# Anomaly Detection Using Recursive Tree Search

### Initial setup

In [1]:
import warnings
from typing import Tuple, Union

import btrdb
import pandas as pd
from btrdb.stream import Stream
from btrdb.utils.general import pointwidth
from btrdb.utils.timez import ns_to_datetime, to_nanoseconds

Helper functions: one merges consecutive windows (where a window's end time equals the next window's start time) into a single time range, and one converts a search generator into a DataFrame.

In [2]:
def combine_consecutive_windows(
    dataframe,
    event_merge_threshold_ns: int = 0,
    agg: str = "max",
    data_columns: str = "Severity",
    stream=None,
):
    """
    Combine consecutive windows (end time is the same as the consecutive start time) into a
    single time range, such as
    [(2021-03-15 18:34:54.518091776, 2021-03-15 18:35:03.108026368),
     (2021-03-15 18:35:03.108026368, 2021-03-15 18:35:11.697960960)]
    -> [(2021-03-15 18:34:54.518091776, 2021-03-15 18:35:11.697960960)].

    If a stream object is given, a second pass combines neighbouring events whenever
    `stream.count` reports zero points in the gap between them.

    NOTE: DataFrame must have ["StartTime", "EndTime"] columns (and the `data_columns`); an
    error is raised if they are missing. Each row is only compared with its direct
    neighbours, so the rows are presumably expected to be sorted by time, with times given
    as integer nanoseconds (they are compared to `event_merge_threshold_ns` and passed
    through `int()` before being sent to `stream.count`).

    Parameters
    ----------
    dataframe : DataFrame
        A Pandas' DataFrame object.
    event_merge_threshold_ns : int
        If 2 events are within the specified time range in nanoseconds (end of previous event
        to start of consecutive), they are combined as a single event. Events are kept apart
        only when the gap is strictly greater than this threshold.
    agg : str
        Aggregation function applied to the `data_columns` of the rows that are merged.
    data_columns : str or list of str
        Column names to apply aggregation to.
    stream : Stream, optional
        Stream to check for gaps between the events and combine the event windows. Only its
        `count(start, end, precise=True)` method is used.

    Returns
    -------
    DataFrame
        The input `dataframe` itself when it has no rows; otherwise a new DataFrame with
        "StartTime", "EndTime" and the aggregated `data_columns`, indexed from 0.

    Warns
    -----
    UserWarning
        If the number of detected event starts differs from the number of detected event
        ends (e.g. missing values in the time columns); the result is then misaligned.
    """

    def _agg_data_columns(data, mask, starttimes, end_times):
        """
        Apply aggregation to the given `data_columns`.

        One aggregated value (or row, for several columns) is produced per merged event and
        stored at the index label of the event's first row.
        """
        aggregated = data[data_columns].where(mask).copy()
        for startidx, endidx in zip(starttimes.index, end_times.index):
            # `.loc` assigns by label for both a Series (one column) and a DataFrame
            # (list of columns); plain `[]` would add a new column to a DataFrame.
            aggregated.loc[startidx] = data.loc[startidx:endidx, data_columns].agg(agg)
        return aggregated.dropna()

    def _gap_count(gap_start, gap_end):
        """Number of points in the stream between two events (NaN if a bound is missing)."""
        if pd.isna(gap_start) or pd.isna(gap_end):
            return float("nan")
        return stream.count(int(gap_start), int(gap_end), precise=True)

    # DataFrame must have ["StartTime", "EndTime"], error will be thrown if not in columns
    next_start = dataframe.StartTime.shift(-1)
    previous_end = dataframe.EndTime.shift(1)
    gap_before_exceeds = dataframe.StartTime - previous_end > event_merge_threshold_ns

    # An event ends where the gap to the next row exceeds the threshold (or on the last row)
    # and starts where the gap to the previous row exceeds it (or on the first row).
    merge_endtimes = dataframe.EndTime[
        (next_start - dataframe.EndTime > event_merge_threshold_ns) | next_start.isna()
    ]
    merge_starttimes = dataframe.StartTime[gap_before_exceeds | previous_end.isna()]

    if len(merge_endtimes) != len(merge_starttimes):
        warnings.warn("Unequal sizes for the merging start and end times.")
    elif len(merge_endtimes) == 0 or len(merge_starttimes) == 0:
        # nothing to merge
        return dataframe

    severity = _agg_data_columns(
        dataframe, gap_before_exceeds, merge_starttimes, merge_endtimes
    )
    merged_df = pd.concat(
        [
            merge_starttimes.reset_index(drop=True),
            merge_endtimes.reset_index(drop=True),
            severity.reset_index(drop=True),
        ],
        axis=1,
    )

    if stream is not None and merged_df.shape[0] > 1:
        # combine the windows between the events if gaps (if the count == 0)
        # (done after merging to reduce the number of events)
        # Iterate over the columns directly instead of a row-wise `apply`: building rows
        # would upcast the integer nanosecond timestamps to float64 and lose precision.
        gap_counts = [
            _gap_count(gap_start, gap_end)
            for gap_start, gap_end in zip(
                merged_df.EndTime.iloc[:-1], merged_df.StartTime.iloc[1:]
            )
        ]
        # count[i] is the number of points between event i and event i + 1; the last event
        # has no following gap, hence NaN.
        count = pd.Series(gap_counts + [float("nan")], index=merged_df.index)

        # An event starts a merged group unless the gap *before* it is empty. The first row
        # has no previous gap (NaN after the shift), so it is always a start.
        is_group_start = ~count.shift(1).eq(0)
        # An event ends a merged group unless the gap *after* it is empty; the last row
        # (NaN count) is always an end.
        is_group_end = ~count.eq(0) | count.isna()

        merged_start = merged_df.StartTime[is_group_start]
        merged_end = merged_df.EndTime[is_group_end]
        severity = _agg_data_columns(merged_df, is_group_start, merged_start, merged_end)

        merged_df = pd.concat(
            [
                merged_start.reset_index(drop=True),
                merged_end.reset_index(drop=True),
                severity.reset_index(drop=True),
            ],
            axis=1,
        )
    return merged_df


def search_generator_to_dataframe(search_generator):
    """
    Collect the results of a search generator into a DataFrame.

    Parameters
    ----------
    search_generator : generator
        The search_generator object containing the search results.

    Returns
    -------
    df : DataFrame
        The DataFrame containing the search results. The column names depend on how many
        columns the collected results have: a single column is named "Time", two columns
        are named "StartTime" and "EndTime". Any other column count keeps the default
        column labels assigned by pandas.
    """
    names_by_column_count = {
        1: ["Time"],
        2: ["StartTime", "EndTime"],
    }
    results = pd.DataFrame(search_generator)
    known_names = names_by_column_count.get(results.shape[1])
    if known_names is not None:
        results.columns = known_names
    return results

#### Get a stream to search:

In [3]:
# Connect without passing an endpoint or API key explicitly
# (presumably these are picked up from the environment/profile -- confirm for your setup).
conn = btrdb.connect()
# Select the streams in the "sunshine" collection that are tagged with unit == "volts".
streams = conn.streams_in_collection("sunshine", tags={"unit": "volts"})
# The first matching stream is the one used by the searches below.
stream = streams[0]
# Show the stream's UUID as the cell output.
stream.uuid

UUID('d60fc469-a6da-4c98-8763-fd833293d955')

#### Initializing the start, end, and pointwidth:

In this example, the pointwidth is ~13 days.

In [4]:
# Search range: the times of the first results returned by earliest() and latest().
start = stream.earliest()[0].time
end = stream.latest()[0].time
# Pointwidth derived from the full time range, then lowered by 2.
# The result is a pointwidth object (its `.days` property is read below), not a plain int.
initial_pw: pointwidth = pointwidth.from_nanoseconds(end - start) - 2
version = 0
# NOTE(review): the search calls further down do not pass `initial_pw` or `version`, so they
# run with the search functions' own defaults (initial_pw=49, version=0).
start, end, initial_pw.days

(1456790400008333000, 1464738830333333000, 13.03124892178963)

## Finding Specific Values Using Anomaly Detection
We suggest using this value search for values at the edges of the measurement distribution, so that the search does not have to touch all of the RawPoints.

In [5]:
def search_timestamps_within_bounds(
    stream: Stream,
    start: int,
    end: int,
    bounds: Tuple[float, float],
    initial_pw: Union[int, pointwidth] = 49,
    final_pw: int = 36,
    return_rawpoint_timestamps: bool = False,
    version: int = 0,
) -> tuple:
    """
    Find timestamps in stream between `start` and `end` whose values lie within the given
    (lower, upper) bounds, by walking the BTrDB tree recursively with StatPoints.

    This is a generator function: the queries (and the argument checks) only run while the
    returned generator is being iterated.

    At every level the aligned windows of the current pointwidth are inspected:

    * a window whose [min, max] lies entirely inside the bounds is yielded as a whole
      (only when `return_rawpoint_timestamps` is False);
    * a window whose [min, max] does not overlap the bounds at all is skipped;
    * any other window is searched again with the pointwidth lowered by 2, until the
      pointwidth is at or below `final_pw`. At that depth the window itself is yielded
      (it overlaps the bounds, but is not guaranteed to lie fully inside them), or, when
      `return_rawpoint_timestamps` is True, its raw values are fetched and filtered.

    Parameters
    ----------
    stream: Stream
        BTrDB Stream to search.
    start: int
        The start time in nanoseconds for the range to search from.
    end: int
        The end time in nanoseconds for the range to search from.
    bounds: tuple(float, float), length: 2
        Find events with values within the specified (lower, upper) bounds threshold.
        Both bounds are inclusive.
    initial_pw: int or pointwidth, default: 49
        Initial query pointwidth of tree traversal. Default is 49 (2**49 ns, approximately
        6.5 days).
    final_pw: int, default: 36
        Pointwidth at or below which the tree traversal with StatPoints stops and
        RawPoints are searched instead. Default is 36 (approximately 1.15 minutes).
    return_rawpoint_timestamps: bool, default: False
        Return RawPoint timestamps if `True`, else returns time-range tuple of start and end
        timestamps of the StatPoint windows that is within the specified bounds threshold.
    version: int, default: 0
        Version of the stream to search from.

    Yields
    ------
    tuple
        `(window_start, window_end)` for a StatPoint window, where `window_start` is the
        window's "time" entry and `window_end` is that time plus the window's width; or a
        1-tuple `(time,)` for a RawPoint when `return_rawpoint_timestamps` is True.

    Example
    -------
    For measurement stream with values (280-290):

    >>> result = search_timestamps_within_bounds(stream, start, end, bounds=(0, 1))
    >>> result_timestamps = [timestamp for timestamp in result]
    >>> print(result_timestamps)
    []

    """
    # TODO: add more examples cases
    assert len(bounds) == 2, "Requires a minimum and maximum for the `bounds`"
    lower_bound, upper_bound = bounds
    assert isinstance(
        initial_pw, (int, pointwidth)
    ), "Please provide `initial_pw` as an integer or pointwidth object"
    # Work with a pointwidth object from here on (plain integers are wrapped).
    current_pw = pointwidth(initial_pw) if isinstance(initial_pw, int) else initial_pw

    stat_windows = stream.arrow_aligned_windows(
        start, end, int(current_pw), version
    ).to_pylist()
    for stat_window in stat_windows:
        # Time range covered by this window.
        window_start = stat_window["time"]
        window_end = window_start + pd.Timedelta(current_pw.nanoseconds, unit="ns")

        # Every value of the window is inside the bounds: report the whole window.
        if not return_rawpoint_timestamps and (
            lower_bound <= stat_window["min"] and stat_window["max"] <= upper_bound
        ):
            yield window_start, window_end
            continue

        # The window's value range does not touch the bounds: nothing to find here.
        if stat_window["max"] < lower_bound or stat_window["min"] > upper_bound:
            continue

        if current_pw <= final_pw:
            # Final depth reached: either fetch the raw values or report the window.
            if return_rawpoint_timestamps:
                candidates = stream.arrow_values(
                    window_start, window_end, version
                ).to_pylist()
            else:
                candidates = []
                yield window_start, window_end
        else:
            # Descend to the children of this window (two pointwidth levels down).
            candidates = search_timestamps_within_bounds(
                stream,
                window_start.value,
                window_end.value,
                bounds,
                current_pw - 2,
                final_pw,
                return_rawpoint_timestamps=return_rawpoint_timestamps,
                version=version,
            )

        for candidate in candidates:
            if not isinstance(candidate, dict):
                # Result of the recursive search: pass it through unchanged.
                yield candidate
            elif lower_bound <= candidate["value"] <= upper_bound:
                # Raw point (a dict): keep it only when its value is inside the bounds.
                yield (candidate["time"],)


def search_timestamps_at_value(
    stream: Stream,
    start: int,
    end: int,
    value: Union[int, float],
    tol: float = 1e-6,
    initial_pw: Union[int, pointwidth] = 49,
    final_pw: int = 36,
    return_rawpoint_timestamps: bool = False,
    version: int = 0,
) -> tuple:
    """
    Find points in stream between `start` and `end` that are equal to the specified value
    (within a tolerance) using StatPoints recursively through BTrDB tree.

    This is a generator function that delegates to `search_timestamps_within_bounds` with
    the bounds `(value - tol, value + tol)`.

    Parameters
    ----------
    stream: Stream
        BTrDB Stream to search.
    start: int
        The start time in nanoseconds for the range to search from.
    end: int
        The end time in nanoseconds for the range to search from.
    value : int, float
        The value to search for in the tree.
    tol : float, default: 1e-6
        Tolerance around `value`; values in `[value - tol, value + tol]` count as a match.
    initial_pw: int or pointwidth, default: 49
        Initial query pointwidth of tree traversal. Default is 49 (2**49 ns, approximately
        6.5 days). It is used as given for the first query.
    final_pw: int, default: 36
        Pointwidth at or below which the tree traversal with StatPoints stops and
        RawPoints are searched instead. Default is 36 (approximately 1.15 minutes).
    return_rawpoint_timestamps: bool, default: False
        Return RawPoint timestamps if `True`, else returns time-range tuple of start and end
        timestamps of the StatPoint windows that match the value within the tolerance.
    version: int, default: 0
        Version of the stream to search from.

    Yields
    ------
    tuple
        The tuples yielded by `search_timestamps_within_bounds`: `(start, end)` of a
        matching StatPoint window, or `(time,)` of a matching RawPoint when
        `return_rawpoint_timestamps` is True.

    Example
    -------
    >>> result = search_timestamps_at_value(stream, start, end, value)
    >>> result_timestamps = [timestamp for timestamp in result]
    >>> print(result_timestamps)
    """
    # TODO: add more examples cases
    bounds = (value - tol, value + tol)
    # Pass `initial_pw` through unchanged: lowering the pointwidth by 2 is the recursion
    # step inside `search_timestamps_within_bounds`, not something to apply to the
    # caller's requested starting level.
    yield from search_timestamps_within_bounds(
        stream,
        start,
        end,
        bounds,
        initial_pw,
        final_pw,
        return_rawpoint_timestamps=return_rawpoint_timestamps,
        version=version,
    )

The example stream has the following StatPoint from March 1, 2016 to May 31, 2016:

In [6]:
# Query with a window width equal to the whole [start, end) range, so the result is a
# single StatPoint (min/mean/max/count/stddev) summarising the stream.
stream.arrow_windows(start, end, width=end - start).to_pylist()

[{'time': Timestamp('2016-03-01 00:00:00.008333+0000', tz='UTC'),
  'min': 240.4445343017578,
  'mean': 284.714006053764,
  'max': 293.46502685546875,
  'count': 949451443,
  'stddev': 1.6748411051700447}]

Set the min, mean, max, and standard deviation to use in the examples below:

In [7]:
# Values copied from the StatPoint output of the previous cell.
min_value = 240.4445343017578
mean_value = 284.714006053764
max_value = 293.46502685546875
stddev_value = 1.6748411051700447

### Example: Finding the global minimum value from the above StatPoint window data.

In [8]:
%%time
# Search for the global minimum with the default tolerance and pointwidths.
equal_min_generator = search_timestamps_at_value(stream, start, end, value=min_value)
# The generator is lazy: the queries run while the DataFrame is being built.
# (Despite its name, `timestamps_series` is a DataFrame.)
timestamps_series = search_generator_to_dataframe(equal_min_generator)
timestamps_series

CPU times: user 5.01 ms, sys: 5.12 ms, total: 10.1 ms
Wall time: 66.9 ms


Unnamed: 0,StartTime,EndTime
0,2016-03-15 22:16:42.297966592+00:00,2016-03-15 22:17:16.657704960+00:00


Next we search for the global `mean` value of the stream. Because the mean lies inside the value range of most windows, the search essentially touches every level down to a pointwidth of 36, the default value of `final_pw` (approximately 1.15 minutes). Let's stop 6 levels higher, at `final_pw=42`, which corresponds to windows of approximately 1.22 hours.

Note the time it took to run the search, and see the timestamp gap between rows 2 and 3:

- Row 2's EndTime   : 2016-03-01 02:05:31.221811200+00:00
- Row 3's StartTime : 2016-03-01 02:42:10.245066752+00:00

In [9]:
%%time
# Search for the global mean, stopping the tree descent at final_pw=42.
equal_mean_generator = search_timestamps_at_value(
    stream, start, end, value=mean_value, final_pw=42  # ~1.22 hours
)
# The generator is lazy: the queries run while the DataFrame is being built.
timestamps_series = search_generator_to_dataframe(equal_mean_generator)
# NOTE: DataFrame.size counts cells (rows x columns), not rows.
print(timestamps_series.size)
timestamps_series.head()

5822
CPU times: user 897 ms, sys: 392 ms, total: 1.29 s
Wall time: 7.81 s


Unnamed: 0,StartTime,EndTime
0,2016-03-01 00:15:34.152044544+00:00,2016-03-01 00:52:13.175300096+00:00
1,2016-03-01 00:52:13.175300096+00:00,2016-03-01 01:28:52.198555648+00:00
2,2016-03-01 01:28:52.198555648+00:00,2016-03-01 02:05:31.221811200+00:00
3,2016-03-01 02:42:10.245066752+00:00,2016-03-01 03:18:49.268322304+00:00
4,2016-03-01 04:32:07.314833408+00:00,2016-03-01 05:08:46.338088960+00:00


Because the search works on StatPoints, `return_rawpoint_timestamps` defaults to **`False`** in all of the search functions.
Let's change that to see the RawPoint timestamps:

In [10]:
# Passed to the searches in the following cells so they yield RawPoint timestamps
# instead of StatPoint window ranges.
return_rawpoint_timestamps = True

In [11]:
%%time
# Search for the global minimum again, this time yielding RawPoint timestamps.
equal_min_generator = search_timestamps_at_value(
    stream,
    start,
    end,
    value=min_value,
    return_rawpoint_timestamps=return_rawpoint_timestamps,
)
# Each result is a 1-tuple, so the resulting DataFrame has a single "Time" column.
timestamps_series = search_generator_to_dataframe(equal_min_generator)
timestamps_series

CPU times: user 24.4 ms, sys: 1.81 ms, total: 26.2 ms
Wall time: 58.9 ms


Unnamed: 0,Time
0,2016-03-15 22:16:58.574999+00:00


To return RawPoints, let's search for a value 2 standard deviations above the mean:

In [None]:
%%time
# Search for RawPoints whose value is 2 standard deviations above the mean
# (within the default tolerance), stopping the tree descent at final_pw=42.
equal_generator = search_timestamps_at_value(
    stream,
    start,
    end,
    value=mean_value + 2 * stddev_value,
    final_pw=42,  # ~1.22 hours
    return_rawpoint_timestamps=return_rawpoint_timestamps,
)
# The generator is lazy: the queries run while the DataFrame is being built.
timestamps_series = search_generator_to_dataframe(equal_generator)
timestamps_series