## Simple Anomaly Detection using Live Data

### Setup

#### Install the Cognite Python SDK

In [1]:
!pip install cognite-sdk



#### Import further required packages

In [2]:
import os
import datetime as dt

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from bokeh.io import output_notebook, push_notebook, show
from bokeh.layouts import column
from bokeh.models import Band, LinearAxis, Range1d
from bokeh.models.sources import ColumnDataSource
from bokeh.palettes import Category10
from bokeh.plotting import figure

from cognite.client import CogniteClient

#### Connect to the Cognite Data Platform

* This client object is how all queries will be sent to the Cognite API to retrieve data.
* When prompted for your API key, use the key provided.

In [3]:
client = CogniteClient(api_key="ODg4MWMwZjktNDlmOS00ZTcxLTg3ODgtMDZjMjhhMzNjYTEx")

**Accessing Cognite Data Platform (CDP)**

The CDP organizes digital information about the physical world.
There are different kinds of resources stored on the CDP. Here, we care about assets, time series and datapoints.

 * [Assets](https://doc.cognitedata.com/api/0.5/#tag/Assets) are digital representations of physical objects or groups of objects, and assets are organized into an asset hierarchy. For example, an asset can represent a water pump which is part of a subsystem on an oil platform.


 * A [Time Series](https://doc.cognitedata.com/api/0.5/#tag/Time-series) consists of a sequence of data points connected to a single asset. For example: A water pump asset can have a temperature time series that records a data point in units of °C every second.


 * It is important to refer back to the [SDK](https://cognite-sdk-python.readthedocs-hosted.com/en/latest/cognite.html) for specific details on arguments on all avaiable methods on how to access these objects.

In this example, we are working with a model windmill and want to perform simple anomaly detection using *RPM*, *torque* and *wind speeds* sensor readings. In terms of CDP resources, we need to obtain the *datapoints* corresponding to those three *time series*.


A **Datapoint** in the CDP is stored as a key value pair:

 * timestamp is the time since epoch in milliseconds

 * value is the reading from the sensor

**Checking if we are receiving sensor readings: pulling RPM data**

In [17]:
a = client.datapoints.get_latest('rpm').to_pandas()
a.timestamp = pd.to_datetime(a.timestamp, unit='ms')
a

Unnamed: 0,timestamp,value
0,2019-04-08 16:46:01.187,67.84


### Generating training data

#### Live Plotter class

The class below downloads datapoints for the specified time series nearly continuously using the method `client.datapoints.get_latest` (returning a Python generator) and creates a plot in *Bokeh*

In [5]:
class LivePlotter:
    def __init__(self, tags, update_frequency=0.1, n_show=10, training_data_threshold=60):
        self.tags = tags
        self.update_frequency = update_frequency
        self.n_show = n_show
        self.training_data_threshold = training_data_threshold
        output_notebook()

    def plot(self):
        if self.tags[0][-1] in [2, 3]:
            plot_labels = [tag[:-1] for tag in self.tags]
        else:
            plot_labels = self.tags.copy()

        plot_colors = Category10[3]
        P = [client.datapoints.get_latest(name=tag).to_pandas() for tag in self.tags]
        self.start_time = np.min([pp["timestamp"].values[0] for pp in P])
        x = [pd.to_datetime(self.start_time, unit="ms")]
        y = [pp["value"].values for pp in P]

        my_figure = figure(
            title="Windmill Sensor Values",
            tools="",
            plot_width=900,
            plot_height=500,
            x_axis_type="datetime",
            y_range=[-50, 200],
        )
        test_data = ColumnDataSource(
            dict(x=[x.copy()] * len(self.tags), y=y.copy(), colors=plot_colors.copy(), labels=plot_labels.copy())
        )
        line = my_figure.multi_line(
            xs="x", ys="y", line_color="colors", legend="labels", source=test_data, line_width=5
        )

        my_figure.legend.location = "center_left"

        new_data = dict(x=[x] * len(self.tags), y=y, colors=plot_colors, labels=plot_labels)

        handle = show(my_figure, notebook_handle=True)

        start_data_gen = dt.datetime.now()
        printed_enough_data = False

        try:

            for P in zip(*[client.datapoints.live_data_generator(name=tag, update_frequency=self.update_frequency) for tag in self.tags]):

                x.append(np.max([pd.to_datetime(pp["timestamp"], unit="ms") for pp in P]))  # naive synchronization
                y = [np.append(yyy, ppp["value"]) for yyy, ppp in zip(y, P)]

                nump = np.minimum(self.n_show, len(x))
                x = x[-nump:]  # prevent filling ram
                new_data["x"] = [x] * len(self.tags)
                new_data["y"] = y = [yyy[-nump:] for yyy in y]  # prevent filling ram

                test_data.stream(new_data, nump)
                self.end_time = int(x[-1].timestamp() * 1000)
                push_notebook(handle=handle)
                if (dt.datetime.now() - start_data_gen).total_seconds() > self.training_data_threshold and not printed_enough_data:
                    print("Sufficient training data size.")
                    printed_enough_data = True

        except KeyboardInterrupt:

            print('Interrupted')

#### Plot live data
The windmill is uploading sensor data to CDP, while the `LivePlotter.plot` method is downloading the resulting datapoints as described above.

In [10]:
# choose windmill 
windmill_no = 1

# no of most recent secs to plot
n_show = 60

# length of training sample
training_data_threshold = 60 # in secs: 60 as default

if windmill_no in [2,3, 5]:
    tags = [tag + str(int(windmill_no)) for tag in ["rpm", "torque", "wind"]]  
else:
    tags = ["rpm", "torque", "wind"]
    
plotter = LivePlotter(tags, update_frequency=0.1, n_show=n_show, training_data_threshold=training_data_threshold)
plotter.plot()

Sufficient training data size.
Interrupted


#### Retrieve timestamps at start and end of the above plotting exercise

Below, we will download datapoints for the RPM, torque and wind speed time series in the form of a (synchronized) Pandas dataframe. However, we will have to specify a *start* and *end* time for the datapoints we want to fetch. There are two alternative ways:
 * use start and end timestamps from your own run of `LivePlotter.plot`
 * use mine (you will obtain exactly the same training data)

**Using your run of the live plotter**

In [11]:
start = plotter.start_time
end   = plotter.end_time

In [14]:
# assign this value to the start variable
start = 1554740843080

In [15]:
# assign this value to the end variable
end = 1554740908355

**Type in values provided by me manually**

In [91]:
#start, end = # type in values here

### Train a model

#### Download a Pandas DataFrame of synchronized datapoints for RPM, torque and windspeed

In [16]:
data_train = client.datapoints.get_datapoints_frame(
            time_series=tags,
            start=int(start),
            end=int(end),
            aggregates=['avg'],
            granularity='1s'
        )

#### Build a simple anomaly detector

To keep things as simple as possible, we recommend the following trivial version of the *functional leave-one-out approach*:
 * use a simple [Scikit-learn linear regression](https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LinearRegression.html)
 * for each sensor (RPM, torque and wind speed):
   * train a linear model for the given sensor using the other two (e.g. train a model for RPM using torque and wind speed as predictors)
   * compute the residuals (*actual vs. predicted on the training data*) as well as the mean and standard deviation of the latter (you could *split your training data in half*: the first half is used to train the linear regression model, the second half to compute the residuals and their mean and standard deviation)
 * write a function `predict_AD` that takes a DataFrame `data_test` with the same columns as `data_train` as an argument and does the following for each sensor:
   * computes the residuals/prediction errors using the above models
   * standardizes the residuals using the means and standard deviations computed above:
   $$resid_{std} = (resid - mean(resid))/(std(resid))$$
   * creates a binary anomaly indicator, which takes the value **1 for a given datapoint** if for **any** sensor the standardized residual is greater than one:
   $$resid_{std} > 1.0$$
   * returns an array containing the anomaly indicator
 * instead of linear regression you can try a [Scikit-learn Random Forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)
 
 
 

In [98]:
# write your code here

### Run the anomaly detector on test data

#### Plot some more live data for testing (with anomalies this time)

In [None]:
plotter_test = LivePlotter(tags, update_frequency=0.1, n_show=n_show, training_data_threshold=training_data_threshold)
plotter_test.plot()

Sufficient training data size.


#### Set start and end time for downloading test data

In [None]:
start_test = plotter_test.start_time
end_test   = plotter_test.end_time

In [None]:
# assign this value to the start_test variable
start_test = 

In [None]:
# assign this value to the end_test variable
end_test = 

In [99]:
#start_test, end_test = # type in values here

#### Download a Pandas DataFrame of synchronized datapoints for testing

In [None]:
data_test = client.datapoints.get_datapoints_frame(
            time_series=tags,
            start=int(start_test),
            end=int(end_test),
            aggregates=['avg'],
            granularity='1s'
        )

#### Compute anomaly indicator using the function `predict_AD` from above

In [102]:
# write your code here