Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Model Development

In the introductory lab, we encountered alternative approaches to anomaly detection.  We implemented our own solution to labeling data points that were several standard deviations from the mean of all data points.  Then we estimated linear and seasonal trends in the time series, and removed them to perform anomaly detection on the remaining time series.

Here, we are going to apply this approach to our telemetry data.  We will first develop a solution that processes all the data in **batch**, and then optimize the solution so that it detects anomalies **online**, whenever new sensor measurements arrive.

There are two reasons why we want to move to an online solution:
- We don't want to repeatedely process historic data over and over again, this would be a waste of computing resources and time.
- We should only store as much historic data as necessary for robust online anomaly detection.

In this lab, we will do the following:
- Use python port [pyculiarity](https://github.com/nicolasmiller/pyculiarity) of [Twitter's anomaly detection R library](https://github.com/twitter/AnomalyDetection)
- Do some data preprocessing to clean up our timeseries data
- Apply this library to detect anomalies in our telemetry data
- Adapt our solution to perform online anomaly detection

We will have two objectives:
1. Find a way to calculate running averages online
2. Determine how much historic data we need to retain without losing our ability to robustly detect anomalies

In [None]:
# %matplotlib inline

import pandas as pd
import numpy as np
import os
from pyculiarity import detect_ts # python port of Twitter AD lib
import matplotlib.pyplot as plt # for plotting
import seaborn # for decent default settings for figures
import time # so we can time our operations
import inspect
import urllib

## Load data

As usual, we start by loading the data

In [None]:
base_path = 'https://sethmottstore.blob.core.windows.net'
data_dir = os.path.join(base_path, 'predmaint')

print("Reading data ... ", end="")
telemetry = pd.read_csv(os.path.join(data_dir, 'telemetry.csv'))
print("Done.")

print("Parsing datetime...", end="")
telemetry['datetime'] = pd.to_datetime(telemetry['datetime'], format="%m/%d/%Y %I:%M:%S %p")
print("Done.")

print("Reading reference anomaly detection results ... ", end="")
anoms_batch = pd.read_csv(os.path.join(data_dir, 'anoms.csv'))
anoms_batch['datetime'] = pd.to_datetime(anoms_batch['datetime'], format="%Y-%m-%d %H:%M:%S")
print("Done.")

## Data Preparation

Let's pick a random sensor (e.g. volt) and machine id and see what we get.

In [None]:
sensor = 'volt'
machine_id = 67

# Select the slice of the telemetry for this machine 
df_s = telemetry.loc[telemetry.loc[:, 'machineID'] == machine_id, ('datetime', sensor)]
df_s.columns = ['timestamp', 'value']

In [None]:
# let's take a peak at the data
plt.close()
fig = plt.figure(figsize=(16,4))
plt.plot(df_s['timestamp'], df_s['value'])
df_s.describe()
display()

Let's try to run anomaly detection. 

In [None]:
results = detect_ts(df_s) # we use the detect_ts function of pyculiarity
anoms = results['anoms']

plt.close()
plt.plot(df_s['timestamp'], df_s['value'])
plt.plot(anoms['timestamp'], anoms['anoms'], 'o')
display()

Hmmm. Looks like we don't really get too many anomalies this way.  We could try to lower our thresholds. Should we try a lower threshold?

In [None]:
alpha = 0.2 # The level of statistical significance with which to accept or reject anomalies. (default: 0.05)

results = detect_ts(df_s, alpha=alpha)
anoms = results['anoms']

plt.close()
plt.plot(df_s['timestamp'], df_s['value'])
plt.plot(anoms['timestamp'], anoms['anoms'], 'o')
display()

That doesn't really seem too change to much. We could lower the threshold further, but maybe we need to take a completely different approach.

## Denoising data

The data look really noisy. Maybe what we need to do is to calculate running averages, because it is not individual measurements that are a problem, but a *collection* of measurements.

Let's start by aggregating sensor measurements over the last 12h.  Let's use exponential decay, so that more recent measurements inside the time window have more weight that those that are at the beginning of the time window.

In [None]:
com = 6 # Specify decay in terms of center of mass (this approximates averageing over about twice as many hours)
rm = df_s['value'].ewm(com=com).mean()

plt.close()
plt.plot(df_s['timestamp'], df_s['value'])
plt.plot(df_s['timestamp'], rm)
display()

Interesting, now there seem to be some more apparent anomalies.  Apparently, sometimes sensor readings are off for several hours in a row.  Let's try to run anomaly detection on the smoother time series.

In [None]:
# we create a data frame with two columns (timestamp, value), which is what pyculiarity expects as input
df_smooth = pd.DataFrame(data={'timestamp': df_s['timestamp'], 'value': rm})

results = detect_ts(df_smooth)
anoms = results['anoms']

# we store the location of the last anomaly for later
last_anomaly = int(np.where(df_smooth['timestamp'] == results['anoms'].tail(1)['timestamp'].values[0])[0])

In [None]:
plt.close()
plt.plot(df_smooth['timestamp'], df_smooth['value'])
plt.plot(anoms['timestamp'], anoms['anoms'], 'o', alpha=.2)
plt.xlabel("Dates")
plt.ylabel("Sensor Value")
display()

### Hands-on lab

That appears to work, but we want our scoring service to work in real-time.  That means we shouldn't be calculating running averages over all the historic data whenever we receive just one new row of data.  Instead we want to use the last existing value of the running average, integrate it with the new data, to get the new running average.  Put another way, we want to calculate running averages *online*, rather than in *batch*.

### Calculating running averages online

The first step is to create a lightweight function that can calculate running avgs. 

[Welford's online](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_Online_algorithm) algorithm for calculating running variance includes a method to also calculate the mean.  This method is also described in Donald Knuth's Art of Computer Programming, Vol 2, page 232, 3rd edition.

In [None]:
import numpy as np

def running_avg(ts, com=6):
    rm_o = np.zeros_like(ts)
    rm_o[0] = ts[0]
    
    for r in range(1, len(ts)):
        curr_com = float(min(com, r))
        rm_o[r] = <your solution goes here>
    
    return rm_o

In [None]:
# # uncomment the below lines to see the solution
# filename = 'running_avg.py'
# urllib.request.urlretrieve(
#     os.path.join('https://raw.githubusercontent.com/Azure/LearnAI-ADPM/master/solutions/', filename),
#     filename=filename)
# with open(filename) as f:
#     print(f.read())

Let's confirm that your solution is correct. If your online version produces the same results as the batch version, you should just see an orange line in the below plot.  That is, a perfect blend of red and yellow. If you also see red and yellow, that means your online solution falls short of implmenting the batch version for calculating running averages.

In [None]:
rm_o = running_avg(df_s['value'].values)

plt.close()
plt.plot(df_smooth['timestamp'], df_smooth['value'], color='yellow')
plt.plot(df_smooth['timestamp'], rm_o, alpha=.5, color='red')
display()

You can also plot he difference between the two resulting time series.  Except for the couple of first values in the timeseries, this should all be zeros.

In [None]:
plt.close()
plt.plot(df_smooth['timestamp'], df_smooth['value'] - rm_o)
display()

### End of Lab

## Optimizing the window over which to perform Anomaly Detection

One remaining optimization is to only use the last couple of values of the running average for anomaly detection, instead of running it repeatedely on the entire timeseries of the running average.  There is no general answer for how many recent values have to be included to get robust anomaly detection, because the choice of the size of this sliding window depends on all sorts of factors (e.g. machine, sensor, size of anomaly).  

This means we have to develop an empirical approach to finding the right size for this sliding window.   

The first step is to define a function that performs anomaly detection using only those values that fall within a specified time window leading up to the current value.

The smaller the window, the faster the anomaly detection algorithm runs.  However, if we make the window too small, the algorithm will no longer be able to detect linear and seasonal trends, and will fail.

Below is a function that allows you to test different window sizes for anomaly detection.  It expects a data frame with two columns (timestamp, value), a specification of the window size, and the row index of an anomaly in the provided data frame.  The function returns whether it found an anomaly and how long it took to run (in seconds).

In [None]:
def detect_ts_online(df_smooth, window_size, stop):
    is_anomaly = False
    run_time = 9999
    
    start_index = max(0, stop - window_size)
    df_win = df_smooth.iloc[start_index:stop, :]
    
    start_time = time.time()
    
    # call pyculiarity's anomaly detection function
    results = detect_ts(df_win, alpha=0.05, max_anoms=0.02, only_last=None, longterm=False, e_value=False, direction='both')
    
    run_time = time.time() - start_time
    
    if results['anoms'].shape[0] > 0:
        timestamp = df_win['timestamp'].tail(1).values[0]
        if timestamp == results['anoms'].tail(1)['timestamp'].values[0]:
            is_anomaly = True
            
    return is_anomaly, run_time

Let's run this function to test whether we can recover the last anomaly we previously discovered in this time series using the batch method.  We stored the timestamp of the `last_anomaly` above for exactly this purpose.

In [None]:
is_anomaly, run_time = detect_ts_online(df_smooth, 24*7*2, last_anomaly) 

print('Anomaly: %s, run_time: %ss' % (is_anomaly, round(run_time,2)))

### Hands-on lab

We need to systematically explore how different window sizes affect our ability to detect anomalies in the time series.  When we do this, we need to consider both our precision and recall in doing so.  Depending on the operational cost of false positives and negatives, we may have to use different F-measures.

In order to do this, we define a function that samples machines and sensors from the data frame.  It checks whether the batch anomaly detection algorithm detected an anomaly at that time point, and tests whether the online anomaly detection algorithm comes to the same result. 

It records the number of correct and incorrect online anomaly detection results, to calculate the accuracy of the online anomaly detection solution.

The script accepts two input arguments:
- window_size: how much historic data to retain
- com: Specify decay in terms of center of mass for running avg


We already provided the source code for this function below. But we would like you to discuss with your neighbors which of the following metrics you should use (follow the links to read up on the definition of these):
- [precision](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_score.html)
- [recall](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.recall_score.html)
- [f1-score](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.f1_score.html)
- [fbeta-score](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.fbeta_score.html) (which beta?)

Think about the cost and savings of different settings for the size of the sliding window.  What if the cost of a false negative is high (e.g. USD 50000 for a broken machine), what if that cost is low (e.g. USD 10 for replacing a broken part and restarting the machine)?  What if a false positive is really expensive (e.g. USD 1000 for production loss while machine is being restarted)?

To conclude the lab, enter your choice at the bottom of the next script and run it.

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import fbeta_score
import time

from pyculiarity import detect_ts

def sample_run(df, anoms_ref, window_size = 500, com = 12, n_epochs=10):
    """
    This functions expects a dataframe df as mandatory argument.  
    The first column of the df should contain timestamps, the second machine IDs
    
    arguments:
    df: a pandas data frame with two columns: 1. timestamp, 2. value
    anoms_ref: reference anomaly detection results 
    
    Keyword arguments:
    window_size: the size of the window of data points that are used for anomaly detection
    com: decay in terms of center of mass (this approximates averageing over about twice as many hours)
    """

    p_anoms = .1

    def detect_ts_online(df_smooth, window_size, stop):
        is_anomaly = False
        run_time = 9999
        start_index = max(0, stop - window_size)
        df_win = df_smooth.iloc[start_index:stop, :]
        start_time = time.time()
        results = detect_ts(df_win, alpha=0.05, max_anoms=0.02, only_last=None, longterm=False, e_value=False, direction='both')
        run_time = time.time() - start_time
        if results['anoms'].shape[0] > 0:
            timestamp = df_win['timestamp'].tail(1).values[0]
            if timestamp == results['anoms'].tail(1)['timestamp'].values[0]:
                is_anomaly = True
        return is_anomaly, run_time

    def running_avg(ts, com=6):
        rm_o = np.zeros_like(ts)
        rm_o[0] = ts[0]
    
        for r in range(1, len(ts)):
            curr_com = float(min(com, r))
            rm_o[r] = rm_o[r-1] + (ts[r] - rm_o[r-1])/(curr_com + 1)
    
        return rm_o

    # create arrays that will hold the results of batch AD (y_true) and online AD (y_pred)
    y_true = [False] * n_epochs
    y_pred = [True] * n_epochs
    run_times = []
    
    # check which unique machines, sensors, and timestamps we have in the dataset
    machineIDs = df['machineID'].unique()
    sensors = df.columns[2:]
    timestamps = df['datetime'].unique()[window_size:]
    
    # sample n_machines_test random machines and sensors 
    random_machines = np.random.choice(machineIDs, n_epochs)
    random_sensors = np.random.choice(sensors, n_epochs)

    # we intialize an array with that will later hold a sample of timetamps
    random_timestamps = np.random.choice(timestamps, n_epochs)
    
    for i in range(0, n_epochs):
        # take a slice of the dataframe that only contains the measures of one random machine
        df_s = df[df['machineID'] == random_machines[i]]
        
        # smooth the values of one random sensor, using our running_avg function
        smooth_values = running_avg(df_s[random_sensors[i]].values, com)
        
        # create a data frame with two columns: timestamp, and smoothed values
        df_smooth = pd.DataFrame(data={'timestamp': df_s['datetime'].values, 'value': smooth_values})

        # load the results of batch AD for this machine and sensor
        anoms_s = anoms_ref[((anoms_ref['machineID'] == random_machines[i]) & (anoms_ref['errorID'] == random_sensors[i]))]
                
        # find the location of the t'th random timestamp in the data frame
        if np.random.random() < p_anoms:
            anoms_timestamps = anoms_s['datetime'].values
            np.random.shuffle(anoms_timestamps)
            counter = 0
            while anoms_timestamps[0] < timestamps[0]:
                if counter > 100:
                    return 0.0, 9999.0
                np.random.shuffle(anoms_timestamps)
                counter += 1
            random_timestamps[i] = anoms_timestamps[0]
            
        # select the test case
        test_case = df_smooth[df_smooth['timestamp'] == random_timestamps[i]]
        test_case_index = test_case.index.values[0]


        # check whether the batch AD found an anomaly at that time stamps and copy into y_true at idx
        y_true_i = random_timestamps[i] in anoms_s['datetime'].values

        # perform online AD, and write result to y_pred
        y_pred_i, run_times_i = detect_ts_online(df_smooth, window_size, test_case_index)
        
        y_true[i] = y_true_i
        y_pred[i] = y_pred_i
        run_times.append(run_times_i)
            
    return <your solution goes here>

In [None]:
# # uncomment the below lines to see the solution
# filename = 'sample_run.py'
# urllib.request.urlretrieve(
#     os.path.join('https://raw.githubusercontent.com/Azure/LearnAI-ADPM/master/solutions/', filename),
#     filename=filename)
# with open(filename) as f:
#     print(f.read())

Explore the parameter space:
- Different settings for the size of the sliding window (e.g. 50, 100, 5000, 1000).
- Different settings for the decay in terms of center of mass for running avg (e.g. 4,6,8,12 24)

In [None]:
from sklearn.metrics import fbeta_score 

n_epochs = 10 # sample size
window_size = 500
com = 12

fscore, run_time = sample_run(telemetry, anoms_batch, window_size=window_size, com=com, n_epochs=n_epochs)

print("Fbeta-score: %s" % fscore)

If you ran the cell above repeatedely, you may have noticed that you can not get reliable results.  This is because we need to draw a bigger sample (n_epochs).  Unfortunately, the run-time of `sample_run` increases linearly with `n_epochs`.  In the next lab, we will therefore move on to run this on the cloud, using the Azure tools for Machine Learning experimentation. 


### End of lab

## The end

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.