# Anomaly detection over heterogeneous data streams

The recipe, inspired by [this paper](https://arxiv.org/abs/1812.02848) by Palladino and Thissen:

1. Split the time line into intervals, and split the data streams into subsets corresponding to these intervals.
1. For each interval:
    1. Compose a graph out of the events belonging to the interval.
    1. Extract a vector of topological features for each vertex.
    1. Reduce the dimension of these feature vectors.
    1. Compute the anomaly indicator according to the reduced feature vectors for the interval.
    
These various steps will be detailed as we go along.
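
As a rough orientation, the control flow of the recipe can be sketched as below. This is only a skeleton under stated assumptions: `run_recipe` and its four callable parameters are hypothetical names introduced for illustration, and the toy lambdas stand in for the actual graph composition, feature extraction, dimension reduction and anomaly indicator, none of which are defined at this point.

```python
from typing import Any, Callable, Dict, Hashable, Mapping


def run_recipe(
    intervals: Mapping[Hashable, Any],
    compose_graph: Callable[[Any], Any],
    extract_features: Callable[[Any], Any],
    reduce_dimension: Callable[[Any], Any],
    indicator: Callable[[Any], float],
) -> Dict[Hashable, float]:
    """Apply the four per-interval steps of the recipe to every interval."""
    scores = {}
    for key, events in intervals.items():
        graph = compose_graph(events)          # graph from the interval's events
        features = extract_features(graph)     # per-vertex feature vectors
        reduced = reduce_dimension(features)   # lower-dimensional features
        scores[key] = indicator(reduced)       # anomaly indicator for the interval
    return scores


# Toy placeholders only: they make the skeleton runnable, nothing more.
toy = run_recipe(
    {0: [("a", "b"), ("b", "c")], 5: [("a", "b")]},
    compose_graph=lambda events: set(events),
    extract_features=lambda graph: [len(graph)],
    reduce_dimension=lambda features: features,
    indicator=lambda reduced: float(sum(reduced)),
)
print(toy)  # {0: 2.0, 5: 1.0}
```

Passing the steps as callables keeps the loop independent of how each step ends up being implemented.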

In [1]:
%load_ext pycodestyle_magic
%flake8_on --max_line_length 120 --ignore W293,E302
%pdb on

Automatic pdb calling has been turned ON


In [2]:
import notebooks_as_modules

In [3]:
from jupytest import Suite, Report, Magic, fail, summarize_results, assert_, eq
suite = Suite('test')
if __name__ == '__main__':
    suite |= Report()
    suite |= Magic()

In [4]:
from growing import growing

## Time line partition

Each data stream is taken in as a Dask dataframe, indexed by a certain key. A heterogeneous data stream is a set of named data streams, all indexed by the same key.

In [5]:
import dask.dataframe as ddf
from typing import Mapping, TypeVar

DomainIndex = TypeVar("DomainIndex")
StreamHeterogeneous = Mapping[str, ddf.DataFrame]  # Data frame is indexed by DomainIndex

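Given a subset of index values (expressed as an ordered, strictly increasing sequence), a *partition* is the heterogeneous stream restricted to the interval between two consecutive values, paired with the lower bound of that interval. A *partitioning* maps each such lower bound to the corresponding restricted heterogeneous stream.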

In [6]:
import pandas as pd
from typing import Mapping, Tuple

Partition = Tuple[DomainIndex, StreamHeterogeneous]
Partitioning = Mapping[DomainIndex, StreamHeterogeneous]

### Partitioning sequence

The partitioning sequence is processed into a sequence of slices, with which to query the index of the heterogeneous stream. Namely, a sequence `[i0, i1, i2, ..., iN]` yields the mutually exclusive slices `i0:(i1 - 1)`, `i1:(i2 - 1)`, ..., `i(N-1):(iN - 1)`, where both bounds of each slice are inclusive. Let's embody partitioning sequences in a class.
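
Before the class-based version, the slicing rule on its own can be sketched for integer indices as a plain function. `int_slices` is a hypothetical helper introduced here for illustration only; it pairs each item of the sequence with the predecessor of the next item.

```python
from typing import Iterable, Iterator, Tuple


def int_slices(seq: Iterable[int]) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (lower, upper) bounds for consecutive items of seq."""
    items = list(seq)
    for lower, following in zip(items, items[1:]):
        # The slice stops just before the next boundary.
        yield (lower, following - 1)


print(list(int_slices([0, 3, 5])))  # [(0, 2), (3, 4)]
print(list(int_slices([0])))        # []
```

A sequence of N + 1 boundaries thus yields N slices, and a sequence with fewer than two items yields none.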

In [7]:
from abc import ABC, abstractmethod
from typing import Generic, Iterator, Optional, Sequence, Tuple


class SequencePartition(ABC, Generic[DomainIndex]):

    @abstractmethod
    def __iter__(self) -> Iterator[DomainIndex]:
        raise NotImplementedError()
        yield 0
        
    @abstractmethod
    def pred(self, n: DomainIndex) -> DomainIndex:
        raise NotImplementedError()
        
    def slices(self) -> Iterator[Tuple[DomainIndex, DomainIndex]]:
        prev: Optional[DomainIndex] = None
        for cur in self:
            if prev is not None:
                yield (prev, self.pred(cur))
            prev = cur

In [8]:
class SequenceInt(SequencePartition[int]):
    
    def __init__(self, seq: Sequence[int]) -> None:
        super().__init__()
        self._seq = seq
        
    def __iter__(self) -> Iterator[int]:
        return iter(self._seq)
    
    def pred(self, n: int) -> int:
        return n - 1

In [9]:
%%test No slice for a sequence of less than two items
for seq in [[], [0]]:
    assert_(eq, actual=len(list(SequenceInt(seq).slices())), expected=0)

Test No slice for a sequence of less than two items passed.


In [10]:
%%test Iterating single slice
assert_(eq, expected=[(0, 1)], actual=list(SequenceInt([0, 2]).slices()))

Test Iterating single slice passed.


In [11]:
%%test With sequences not strictly monotonic, we get degenerate slices
assert_(eq, expected=[(0, 0), (1, 0)], actual=list(SequenceInt([0, 1, 1]).slices()))

Test With sequences not strictly monotonic, we get degenerate slices passed.


In [12]:
%%test Multiple slices from a sequence of integers
assert_(eq, expected=[(0, 2), (3, 4)], actual=list(SequenceInt([0, 3, 5]).slices()))

Test Multiple slices from a sequence of integers passed.


### Partitioning sequence of date-times

In [13]:
from typing import Any


class SequenceDatetime(SequencePartition[pd.Timestamp]):
    
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__()
        self._seq = pd.date_range(*args, **kwargs)
        
    def __iter__(self) -> Iterator[pd.Timestamp]:
        return iter(self._seq)
        
    def pred(self, ts: pd.Timestamp) -> pd.Timestamp:
        return ts - pd.Timedelta(nanoseconds=1)

In [14]:
%%test Multiple slices from a date-time partitioning sequence
assert_(
    eq,
    expected=[
        (pd.Timestamp("2015-01-01T00:00:00"), pd.Timestamp("2015-01-01T00:59:59.999999999")),
        (pd.Timestamp("2015-01-01T01:00:00"), pd.Timestamp("2015-01-01T01:59:59.999999999")),
        (pd.Timestamp("2015-01-01T02:00:00"), pd.Timestamp("2015-01-01T02:59:59.999999999")),
        (pd.Timestamp("2015-01-01T03:00:00"), pd.Timestamp("2015-01-01T03:59:59.999999999"))
    ],
    actual=list(SequenceDatetime("2015-01-01T00:00:00", "2015-01-01T04:00:00", freq=pd.Timedelta(hours=1)).slices())
)

Test Multiple slices from a date-time partitioning sequence passed.


### Partitioning of the heterogeneous stream

In [15]:
def partition(sh: StreamHeterogeneous, seq: SequencePartition) -> Partitioning:
    P = {}
    for lower, upper in seq.slices():
        P[lower] = {name: df.loc[lower:upper] for name, df in sh.items()}
    return P
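
The `partition` function above depends on label-based slicing with `.loc` keeping both end points, which is why each slice ends at the predecessor of the next boundary. The sketch below illustrates this with a plain pandas dataframe rather than a Dask one, on the assumption that Dask's `.loc` follows the same convention; the `events` frame is made-up illustration data.

```python
import pandas as pd

# Made-up data, indexed by sorted integer labels.
events = pd.DataFrame({"value": [10, 20, 30, 40]}, index=[1, 4, 5, 9])

# Label-based slicing keeps both end points, so 0:5 and 5:10 share the row at 5.
overlapping = [events.loc[0:5], events.loc[5:10]]

# Ending each slice at the predecessor of the next boundary removes the overlap.
exclusive = [events.loc[0:4], events.loc[5:9]]

print([df.index.tolist() for df in overlapping])  # [[1, 4, 5], [5, 9]]
print([df.index.tolist() for df in exclusive])    # [[1, 4], [5, 9]]
```

With the predecessor-based bounds, every row lands in exactly one interval, which is the property the partitioning needs.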

In [16]:
import dask.dataframe as ddf
from jupytest import Explanation
import pandas as pd


def equals(expected: pd.DataFrame, actual: ddf.DataFrame) -> Explanation:
    if actual.compute().equals(expected):
        return True
    return Explanation(
        "These dataframes are not equal as expected",
        [("Expected", expected), ("Actual", actual)]
    )

In [17]:
%%test Partitioning an int-indexed heterogeneous stream
import dask


@dask.delayed
def f1():
    return pd.DataFrame({"a": [3, 8, 9], "b": [10, 102, 89]}, index=[1, 4, 6])

@dask.delayed
def f2():
    return pd.DataFrame({"a": [5, 3], "b": [90, 32]}, index=[8, 11])

@dask.delayed
def f3():
    return pd.DataFrame({"a": [2, 11, 7, 3], "b": [56, 98, 99, 67]}, index=[13, 14, 15, 19])

@dask.delayed
def g1():
    return pd.DataFrame(
        {"a": [6, 3, 8, -1, 0], "c": [-3, -8, -11, -7, -10], "d": [106, 203, 267, 308, 429]},
        index=[3, 6, 7, 10, 11]
    )

@dask.delayed
def g2():
    return pd.DataFrame(
        {"a": [2, 8, 9, 5, 5, 11], "c": [-13, -6, -7, -6, -8, -2], "d": [567, 308, 289, 290, 432, 387]},
        index=[14, 15, 17, 19, 21, 24]
    )


sh = {
    "p": ddf.from_delayed(
        [f1(), f2(), f3()],
        meta={"a": "int64", "b": "int64"},
        divisions=[1, 8, 13, 20]
    ),
    "q": ddf.from_delayed(
        [g1(), g2()],
        meta={"a": "int64", "c": "int64", "d": "int64"}, 
        divisions=[3, 14, 28]
    )
}
P = partition(sh, SequenceInt([0, 5, 10, 15, 20]))

assert_(eq, actual=set(P.keys()), expected={0, 5, 10, 15})
assert_(eq, reference={"p", "q"}, **{str(i): set(p.keys()) for i, p in enumerate(P.values())})

assert_(equals, actual=P[0]["p"], expected=pd.DataFrame({"a": [3, 8], "b": [10, 102]}, index=[1, 4]))
assert_(equals, actual=P[0]["q"], expected=pd.DataFrame({"a": [6], "c": [-3], "d": [106]}, index=[3]))
assert_(equals, actual=P[5]["p"], expected=pd.DataFrame({"a": [9, 5], "b": [89, 90]}, index=[6, 8]))
assert_(
    equals,
    actual=P[5]["q"],
    expected=pd.DataFrame({"a": [3, 8], "c": [-8, -11], "d": [203, 267]}, index=[6, 7])
)
assert_(
    equals,
    actual=P[10]["p"],
    expected=pd.DataFrame({"a": [3, 2, 11], "b": [32, 56, 98]}, index=[11, 13, 14])
)
assert_(
    equals,
    actual=P[10]["q"],
    expected=pd.DataFrame(
        {"a": [-1, 0, 2], "c": [-7, -10, -13], "d": [308, 429, 567]},
        index=[10, 11, 14]
    )
)
assert_(
    equals,
    actual=P[15]["p"],
    expected=pd.DataFrame({"a": [7, 3], "b": [99, 67]}, index=[15, 19])
)
assert_(
    equals,
    actual=P[15]["q"],
    expected=pd.DataFrame(
        {"a": [8, 9, 5], "c": [-6, -7, -6], "d": [308, 289, 290]},
        index=[15, 17, 19]
    )
)

Test Partitioning an int-indexed heterogeneous stream passed.


# Test result summary

In [17]:
if __name__ == "__main__":
    _ = summarize_results(suite)

6 passed, 0 failed, 0 raised an error
