<a href="https://colab.research.google.com/github/Elvirasun28/anomality_detection/blob/master/Low_Pass_Filter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Colab-only setup: mount the user's Google Drive under /content/gdrive so the
# project folder used by the following cells is reachable from the runtime.
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
import os

# Project folder inside the mounted Drive. Later cells use paths relative to
# the working directory (e.g. 'data/sunspots.txt'), so switch into it first.
path = "/content/gdrive/My Drive/Colab Notebooks/anomality_detection"
os.chdir(path)
# Trailing expression: the directory listing is shown as the cell output.
os.listdir(path)

In [0]:
from __future__ import division
from itertools import count
import matplotlib.pyplot as plt 
from numpy import linspace, loadtxt, ones, convolve
import numpy as np
import pandas as pd
import collections
from random import randint 
from matplotlib import style 
style.use('fivethirtyeight')

In [0]:
## load dataset
# The builtin `float` is used as the dtype: the `np.float` alias was deprecated
# in NumPy 1.20 and removed in 1.24, where it raises AttributeError.
data = loadtxt('data/sunspots.txt', float)
# Two-column table: the first column is labelled 'Months', the second 'SunSpots'.
data_as_frame = pd.DataFrame(data, columns=['Months', 'SunSpots'])

In [0]:
def moving_average(data,window_size):
    """Smooth a 1-D sequence with a uniform (boxcar) moving average.

    Parameters
    ----------
    data : array-like
        One-dimensional sequence of numbers.
    window_size : int
        Number of taps in the averaging window. It is truncated with ``int()``.

    Returns
    -------
    numpy.ndarray
        The result of ``np.convolve(data, window, 'same')``. Near both ends the
        window hangs over the edge and the missing samples count as zero, so
        the first and last values are pulled towards zero.

    Raises
    ------
    ValueError
        If the truncated window size is smaller than 1.
    """
    size = int(window_size)
    if size < 1:
        raise ValueError("window_size must be at least 1")
    # Normalise by the real kernel length so the weights always sum to 1.
    # (`np.float` is gone from NumPy >= 1.24, so plain arithmetic is used.)
    window = np.ones(size) / float(size)
    return np.convolve(data, window, 'same')

def explain_anomalies(y,window_size, sigma=1.0):
    """Flag points lying outside a fixed band around the moving average.

    The band half-width is ``sigma`` times the standard deviation of the
    residual ``y - moving_average(y, window_size)``, computed once over the
    whole series.

    Parameters
    ----------
    y : array-like supporting ``y - list`` (e.g. pandas Series or ndarray)
        Observed values.
    window_size : int
        Window length handed to ``moving_average``.
    sigma : float, optional
        Band half-width as a multiple of the residual standard deviation.

    Returns
    -------
    dict
        ``'standard_deviation'``: residual standard deviation rounded to three
        decimals. ``'anomalies_dict'``: ``OrderedDict`` mapping the position of
        each flagged point to its value, in series order.
    """
    smoothed = moving_average(y, window_size).tolist()
    residual = y - smoothed
    std = np.std(residual)
    band = sigma * std

    flagged = collections.OrderedDict()
    for position, (observed, centre) in enumerate(zip(y, smoothed)):
        # A point is anomalous when it leaves the band on either side.
        if observed > centre + band or observed < centre - band:
            flagged[position] = observed

    return {'standard_deviation': round(std, 3),
            'anomalies_dict': flagged}


def explain_anomalies_rolling_std(y,window_size, sigma = 1.0):
    """Flag points lying outside a band that follows the local residual spread.

    The band around the moving average at each position is ``sigma`` times the
    rolling standard deviation of the residual
    ``y - moving_average(y, window_size)``.

    Parameters
    ----------
    y : pandas.Series
        Observed values. The residual must expose ``.rolling``, so a pandas
        Series is required.
    window_size : int
        Window length for both the moving average and the rolling standard
        deviation.
    sigma : float, optional
        Band half-width as a multiple of the rolling standard deviation.

    Returns
    -------
    dict
        ``'stationary standard_deviation'``: standard deviation of the whole
        residual, rounded to three decimals (reported only, not used for the
        thresholds). ``'anomalies_dict'``: ``OrderedDict`` mapping the position
        of each flagged point to its value, in series order.
    """
    avg = moving_average(y, window_size).tolist()
    residual = y - avg
    # Local spread of the residual; positions without a full window are NaN.
    rolling = residual.rolling(window_size).std()
    # Fill NaNs with the first value that has a full window behind it.
    # Positional `.iloc` replaces `.ix`, which was removed in pandas 1.0.
    first_valid = rolling.iloc[window_size - 1]
    rolling_std = rolling.fillna(first_valid).round(3).tolist()
    std = np.std(residual)
    return {
        'stationary standard_deviation': round(std, 3),
        'anomalies_dict': collections.OrderedDict([
            (idx, y_i) for idx, y_i, avg_i, rs_i in zip(count(), y, avg, rolling_std)
            if (y_i > avg_i + (sigma * rs_i)) | (y_i < avg_i - (sigma * rs_i))
        ])
    }

In [0]:
def plot_results(x, y, window_size, sigma_value = 1, title_for_plot = None, text_xlabel = 'X Axis', text_ylabel = 'Y Axis', apply_rolling_std = False, x_limits = (0, 1000)):
  """Plot a series, its moving average and the detected anomalies.

  Parameters
  ----------
  x : array-like
      Horizontal coordinates, same length as ``y``.
  y : pandas.Series
      Observed values.
  window_size : int
      Window length for the moving average and the anomaly detector.
  sigma_value : float, optional
      Band half-width in standard deviations passed to the detector.
  title_for_plot : str or None, optional
      Figure title.
  text_xlabel, text_ylabel : str, optional
      Axis labels.
  apply_rolling_std : bool, optional
      If true use ``explain_anomalies_rolling_std``, otherwise
      ``explain_anomalies``.
  x_limits : (float, float) or None, optional
      Visible x-range. Defaults to ``(0, 1000)``, the value that used to be
      hard-coded; pass ``None`` to let matplotlib autoscale.

  Returns
  -------
  None
      The figure is shown with ``plt.show()``.
  """
  plt.figure(figsize=(15,8))
  plt.plot(x, y, 'k.')
  y_av = moving_average(y, window_size)
  plt.plot(x, y_av, color = 'green')
  if x_limits is not None:
    plt.xlim(*x_limits)
  plt.xlabel(text_xlabel)
  plt.ylabel(text_ylabel)
  plt.title(title_for_plot)
  # query for the anomalies and plot the same
  if apply_rolling_std:
    events = explain_anomalies_rolling_std(y, window_size = window_size, sigma = sigma_value)
  else:
    events = explain_anomalies(y, window_size = window_size, sigma = sigma_value)

  anomalies = events['anomalies_dict']
  positions = np.fromiter(anomalies.keys(), dtype = int, count = len(anomalies))
  y_anomal = np.fromiter(anomalies.values(), dtype = float, count = len(anomalies))
  # The detector reports positions within the series; translate them to the
  # matching x coordinates so markers stay on their points even when x is not
  # simply 0..n-1.
  x_anomal = np.asarray(x)[positions]

  plt.plot(x_anomal, y_anomal, "r*", markersize = 12)
  plt.grid(True)
  plt.show()

In [0]:
# Time axis and observed series from the loaded table.
x = data_as_frame.Months
y = data_as_frame.SunSpots

# Plot the series, its 10-sample moving average and the 3-sigma anomalies.
plot_results(
    x,
    y,
    window_size=10,
    sigma_value=3,
    text_xlabel='Months',
    text_ylabel='No. of SunSpots',
)

# NOTE(review): this report uses window_size=5 while the plot above uses 10,
# so the printed anomalies need not match the plotted markers - confirm intent.
events = explain_anomalies(y, window_size=5, sigma=3)
# Display the anomaly dict
print("Information about the anomalies model:{}".format(events))

+ Another Case: Stock Prediction

In [0]:
# Convenience function to add noise
def noise(yval):
    """Return reproducible multiplicative noise for a sequence of values.

    Each element of ``yval`` is multiplied by 20 and by a standard-normal
    draw. The global NumPy random generator is reseeded with 0 on every call,
    so the draws are identical between calls and any seed set by the caller is
    overwritten.
    """
    np.random.seed(0)
    amplified = 20 * np.asarray(yval)
    draws = np.random.normal(size=len(yval))
    return amplified * draws
  
# Generate a random dataset
def generate_random_dataset(size_of_array=1000, random_state=0):
    """Build a synthetic series with a trend and noise that fades over time.

    Parameters
    ----------
    size_of_array : int, optional
        Number of samples to generate.
    random_state : int, optional
        Seed for the base Gaussian draw. ``noise`` reseeds the global
        generator with 0 afterwards.

    Returns
    -------
    tuple
        ``(x, y)`` where ``x`` is ``range(0, size_of_array)`` and ``y`` is a
        pandas Series of the generated values.
    """
    np.random.seed(random_state)
    base = np.random.normal(0, 0.5, size_of_array)
    jitter = noise(base)

    values = []
    for index, (base_i, jitter_i) in enumerate(zip(base, jitter)):
        # Fraction of the series still ahead: 1 at the start, near 0 at the end.
        remaining = (size_of_array - index) / size_of_array
        values.append(base_i + index ** remaining + jitter_i * remaining)

    return range(0, size_of_array), pd.Series(values)

In [0]:
# Lets play
# Thresholds from the stationary (whole-series) standard deviation of the residual.
x1, y1 = generate_random_dataset()
plot_results(x1, y1, window_size=12, title_for_plot="Stationary Standard Deviation",
                    sigma_value=3, text_xlabel="Time in Days", text_ylabel="Value in $")

# Same generator call, this time with thresholds from the rolling standard
# deviation of the residual.
x1, y1 = generate_random_dataset()
plot_results(x1, y1, window_size=50, title_for_plot="Using rolling standard deviation",
             sigma_value=3, text_xlabel="Time in Days", text_ylabel="Value in $", apply_rolling_std=True)