Skip to content
Permalink
Browse files

Merge branch 'mini_batch' into refactor_flow

  • Loading branch information...
d-mo committed Jan 18, 2018
2 parents 102e28a + e89db01 commit 789e41395c8b2742591266ee991ffba73cf6b00f
Showing with 151 additions and 79 deletions.
  1. +93 −68 dsio/anomaly_detectors.py
  2. +1 −1 dsio/generate_data.py
  3. +4 −4 dsio/helpers.py
  4. +1 −1 dsio/main.py
  5. +27 −1 dsio/update_formulae.py
  6. +4 −4 examples/detector.py
  7. +20 −0 examples/example.py
  8. +1 −0 requirements.txt
@@ -1,107 +1,132 @@
""" Base anomaly detector class and collection of built-in detectors """

import abc
from collections import namedtuple

import numpy as np
import pandas as pd
from scipy.stats import norm, percentileofscore
from sklearn.base import BaseEstimator

from dsio.update_formulae import (
    convex_combination,
    decision_rule,
    rolling_window_update,
    update_effective_sample_size,
)


class AnomalyMixin(object):
    """Mixin class for all anomaly detectors, compatible with BaseEstimator from scikit-learn.

    Subclasses implement ``fit`` / ``update`` / ``score_anomaly`` /
    ``flag_anomaly``; this mixin supplies the combined ``fit_score``.

    NOTE(review): the class does not use ``abc.ABCMeta`` as metaclass, so
    ``@abc.abstractmethod`` below is documentation only — instantiating a
    subclass that misses a method is not prevented; confirm this is intended.
    """
    _estimator_type = "anomaly"

    def fit_score(self, X):
        """Fits the model on X and scores each datapoint in X.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            Input data

        Returns
        -------
        y : ndarray, shape (n_samples, )
            anomaly scores
        """
        self.fit(X)
        return self.score_anomaly(X)

    @abc.abstractmethod
    def update(self, x):
        # incremental (mini-batch) update of an already fitted detector
        raise NotImplementedError

    @abc.abstractmethod
    def flag_anomaly(self, x):
        # boolean alert decision derived from score_anomaly
        raise NotImplementedError

    @abc.abstractmethod
    def fit(self, x):
        raise NotImplementedError

    @abc.abstractmethod
    def score_anomaly(self, x):
        raise NotImplementedError

    @abc.abstractmethod
    def serialise(self):
        raise NotImplementedError

def compute_confusion_matrix(detector_output, index_anomalies):
    """Compare boolean detector flags against ground-truth anomaly indices.

    :param detector_output: boolean array-like, one flag per datapoint
    :param index_anomalies: iterable of integer positions of true anomalies
    :return: dict with 'TPR' (true anomalies found / total true anomalies)
        and 'FPR' (false alarms / total number of datapoints)
    """
    flagged = set(np.flatnonzero(detector_output))
    truth = set(index_anomalies)
    hits = flagged & truth
    false_alarms = flagged - truth
    return {
        'TPR': len(hits) / (1.0 * len(index_anomalies)),
        'FPR': len(false_alarms) / (1.0 * len(detector_output))}


class Gaussian1D(BaseEstimator, AnomalyMixin):
    """Streaming univariate Gaussian anomaly detector.

    Tracks a running mean and standard deviation; a point's anomaly score
    is the Gaussian CDF of its absolute z-score, so scores lie in [0.5, 1).

    Parameters
    ----------
    ff : float
        Forgetting factor in (0, 1] — controls how quickly old data is
        down-weighted when `update` folds in a new mini-batch.
    threshold : float
        Score level above which `flag_anomaly` raises an alert.
    """

    def __init__(
            self,
            ff=0.9,
            threshold=0.99
    ):
        self.ff = ff
        self.threshold = threshold

    def fit(self, x):
        """Estimate mean, std and effective sample size from a batch."""
        x = pd.Series(x)
        self.mu_ = np.mean(x)
        self.std_ = np.std(x, ddof=1)  # sample std (ddof=1) at fit time
        self.ess_ = len(x)

    def update(self, x):  # allows mini-batch
        """Incrementally fold a new batch into the fitted model.

        Raises
        ------
        RuntimeError
            If called before `fit` (detected via the missing `mu_`).
        """
        try:
            getattr(self, "mu_")
        except AttributeError:
            raise RuntimeError("You must fit the detector before updating it")
        x = pd.Series(x)
        self.ess_, weight = update_effective_sample_size(
            effective_sample_size=self.ess_,
            batch_size=len(x),
            forgetting_factor=self.ff
        )
        # Blend the old mean with the batch mean using the ESS-derived weight.
        self.mu_ = convex_combination(
            self.mu_,
            np.mean(x),
            weight=weight
        )
        # NOTE(review): the std is *replaced* by the batch's population std
        # (ddof=0), not blended like the mean — confirm this asymmetry is
        # intended rather than an oversight.
        self.std_ = np.std(x)

    def score_anomaly(self, x):
        """Return scores in [0.5, 1): the CDF of each point's |z-score|."""
        x = pd.Series(x)
        scaled_x = np.abs(x - self.mu_) / (1.0 * self.std_)
        return norm.cdf(scaled_x)

    def flag_anomaly(self, x):
        """Boolean flags: True where the anomaly score breaches threshold."""
        return decision_rule(self.score_anomaly(x), self.threshold)

class Percentile1D(BaseEstimator, AnomalyMixin):
    """Rolling-window percentile anomaly detector.

    Keeps a window of recent observations as a reference sample; each new
    point is scored by its percentile rank (rescaled to [0, 1]) within
    that window.

    Parameters
    ----------
    ff : float
        Forgetting factor — kept for interface symmetry with Gaussian1D;
        not used by the percentile computation itself.
    window_size : int
        Number of recent points retained as the reference sample.
    threshold : float
        Score level above which `flag_anomaly` raises an alert.
    """

    def __init__(
            self,
            ff=1.0,
            window_size=300,
            threshold=0.99
    ):
        self.ff = ff
        self.window_size = window_size
        self.threshold = threshold

    def fit(self, x):
        """Seed the reference window with the first `window_size` points."""
        x = pd.Series(x)
        self.sample_ = x[:int(np.floor(self.window_size))]

    def update(self, x):  # allows mini-batch
        """Slide the reference window forward over a new batch."""
        x = pd.Series(x)
        window = rolling_window_update(
            old=self.sample_, new=x,
            w=int(np.floor(self.window_size))
        )
        self.sample_ = window

    def score_anomaly(self, x):
        """Percentile rank of each point in the window, scaled to [0, 1]."""
        x = pd.Series(x)
        scores = pd.Series([0.01 * percentileofscore(self.sample_, z) for z in x])
        return scores

    def flag_anomaly(self, x):
        """Boolean flags: True where the anomaly score breaches threshold."""
        return decision_rule(self.score_anomaly(x), self.threshold)
@@ -30,6 +30,6 @@ def gen_data_with_obvious_anomalies(
pd.DataFrame(data=x, columns=['simulated_data']).to_csv(filename, index=False)
return None
else:
return x
return x, index_of_anomalies


@@ -10,14 +10,14 @@

from .exceptions import SensorsNotFoundError, TimefieldNotFoundError
from .exceptions import ModuleLoadError, DetectorNotFoundError
from .anomaly_detectors import AnomalyDetector
from .anomaly_detectors import AnomalyMixin


def parse_arguments():
""" Parse command line arguments """
parser = argparse.ArgumentParser()
parser.add_argument("--detector", help="Anomaly detector",
default="Quantile1D")
default="Gaussian1D")
parser.add_argument("--modules",
help="Python modules that define additional anomaly "
"detectors",
@@ -169,7 +169,7 @@ def load_detector(name, modules):
raise ModuleLoadError('Failed to load module %s. Exception: %r', (module, exc))

# Load selected anomaly detector
for detector in AnomalyDetector.__subclasses__():
for detector in AnomalyMixin.__subclasses__():
if detector.__name__.lower() == name.lower():
return detector

@@ -181,5 +181,5 @@ def init_detector_models(sensors, training_set, detector):
models = {}
for sensor in sensors:
models[sensor] = detector()
models[sensor].train(training_set[sensor])
models[sensor].fit(training_set[sensor])
return models
@@ -90,7 +90,7 @@ def threaded_restream_dataframe(dataframe, sensors, detector, timefield,
first_pass = True
for batch in batches:
for sensor in sensors: # Apply the scores
batch['SCORE_{}'.format(sensor)] = models[sensor].score(batch[sensor])
batch['SCORE_{}'.format(sensor)] = models[sensor].score_anomaly(batch[sensor])

end_time = np.min(batch[timefield])
recreate_index = first_pass
@@ -1,11 +1,14 @@
"""
This is the "convex_combination" module.
This is the "update_formulae" module.
It performs a number of weighted updates that are needed for streaming learning, e.g.,
>>> convex_combination(10,20,0.3)
13.0
It also hosts certain decision formulae.
"""

import numpy as np


@@ -58,3 +61,26 @@ def rolling_window_update(old, new, w=100):
out = out[(len(out)-w):]
return out


def decision_rule(score, threshold=0.99, two_sided=True):
    """Turn a normalised anomaly score into a boolean alert flag.

    :param score: a score, assumed normalised (between 0 and 1) representing anomalousness
    :param threshold: a user-specified threshold above which an alert should be raised
    :param two_sided: if True, we flag anomalies that are either smaller than 1-threshold or larger than threshold
    :return: a boolean flag
    >>> decision_rule(score=0.9)
    False
    >>> decision_rule(score=0.95, threshold=0.9)
    True
    >>> decision_rule(score=0.0001, threshold=0.99)
    True
    >>> decision_rule(score=0.001, two_sided=False)
    False
    """
    above_upper = score > threshold
    if not two_sided:
        return above_upper
    below_lower = score < 1 - threshold
    return np.logical_or(below_lower, above_upper)
@@ -1,11 +1,11 @@
import numpy as np
from sklearn.base import BaseEstimator

from dsio.anomaly_detectors import AnomalyDetector
from dsio.anomaly_detectors import AnomalyMixin


class Percentile(AnomalyDetector):
def __init__(self, threshold=0.99):
self.threshold = threshold
class Greater_Than_Max_Rolling(BaseEstimator, AnomalyMixin):
def __init__(self, ):

def detect(self, x):
score = self.score(x)
@@ -0,0 +1,20 @@
# Example: fit, score and incrementally update both built-in detectors
# on synthetic data with known (planted) anomalies, printing TPR/FPR
# before and after the mini-batch update.
from dsio.anomaly_detectors import Gaussian1D, Percentile1D, compute_confusion_matrix
from dsio.generate_data import gen_data_with_obvious_anomalies

# x: simulated series; index_anomalies: positions of the planted anomalies
x, index_anomalies = gen_data_with_obvious_anomalies(n=1000, anomalies=50)

# Gaussian detector: fit on the first 50 points, flag the whole series,
# then fold in the tail as a mini-batch update and flag again.
detector1 = Gaussian1D()
detector1.fit(x[:50])
detector_output1 = detector1.flag_anomaly(x)
print(compute_confusion_matrix(detector_output1, index_anomalies))
detector1.update(x[101:])  # NOTE(review): points 50-100 are never seen — confirm the gap is intentional
detector_output1 = detector1.flag_anomaly(x)
print(compute_confusion_matrix(detector_output1, index_anomalies))

# Percentile detector: same fit / flag / update / flag sequence.
detector2 = Percentile1D()
detector2.fit(x[:50])
detector_output2 = detector2.flag_anomaly(x)
print(compute_confusion_matrix(detector_output2, index_anomalies))
detector2.update(x[101:])
detector_output2 = detector2.flag_anomaly(x)
print(compute_confusion_matrix(detector_output2, index_anomalies))
@@ -5,3 +5,4 @@ numpy
scipy
pandas
kibana-dashboard-api
scikit-learn

0 comments on commit 789e413

Please sign in to comment.
You can’t perform that action at this time.