In [None]:
from paho.mqtt import client as mqtt_client
from datetime import datetime
import csv
import os
import json
from adtk.detector import SeasonalAD
from adtk.data import validate_series
import pandas as pd
from adtk.detector import LevelShiftAD
from adtk.detector import QuantileAD
from sklearn.neighbors import LocalOutlierFactor
from adtk.detector import OutlierDetector
import warnings
warnings.filterwarnings("ignore", message="Setting an item of incompatible dtype")

In [5]:
class AnomalyDetection:

    def __init__(self, detector_type):
        self.detector_type = detector_type
        self.prediction = 0
        if self.detector_type == 'LevelShift':
            self.score = 0.5
        elif self.detector_type == 'Quantile':
            self.score = 0.4
        elif self.detector_type == 'OutlierDetection':
            self.score = 0.4

    def predict(self, csv_file):
        if self.detector_type == 'LevelShift':
            anomalies_x_dates_ls, anomalies_y_dates_ls, anomalies_z_dates_ls = getAnomalysLevelShift(csv_file)
            check_x_ls = check_anomaly_in_rows(anomalies_x_dates_ls, csv_file, 6)
            check_y_ls = check_anomaly_in_rows(anomalies_y_dates_ls, csv_file, 6)
            check_z_ls = check_anomaly_in_rows(anomalies_z_dates_ls, csv_file, 6)
            if at_least_two_true(check_x_ls, check_y_ls, check_z_ls):
                self.prediction = 1
            else:
                self.prediction = 0
                
        elif self.detector_type == 'Quantile':
            anomalies_x_dates_q, anomalies_y_dates_q, anomalies_z_dates_q = getAnomalyQuantile(csv_file)
            check_x_q = check_anomaly_in_rows(anomalies_x_dates_q, csv_file, 6)
            check_y_q = check_anomaly_in_rows(anomalies_y_dates_q, csv_file, 6)
            check_z_q = check_anomaly_in_rows(anomalies_z_dates_q, csv_file, 6)
            if at_least_two_true(check_x_q, check_y_q, check_z_q):
                self.prediction = 1
            else:
                self.prediction = 0
                
        elif self.detector_type == 'OutlierDetection':
            anomalies_x_dates_od, anomalies_y_dates_od, anomalies_z_dates_od = getOutlierDetection(csv_file)
            check_x_od = check_anomaly_in_rows(anomalies_x_dates_od, csv_file, 6)
            check_y_od = check_anomaly_in_rows(anomalies_y_dates_od, csv_file, 6)
            check_z_od = check_anomaly_in_rows(anomalies_z_dates_od, csv_file, 6)
            if at_least_two_true(check_x_od, check_y_od, check_z_od):
                self.prediction = 1
            else:
                self.prediction = 0

        return self.prediction

    def get_score(self):
        return self.score

    @staticmethod
    def getAnomalysLevelShift(csv_file):
        """Run a ``LevelShiftAD`` detector on the x, y and z columns of a CSV.

        The file is read with pandas, its ``date`` column is parsed to
        datetimes and used as the index, and each of the ``x``, ``y`` and
        ``z`` columns is passed through ``validate_series``. A fresh
        ``LevelShiftAD(c=6.0, side='both', window=3)`` is then fitted on
        each axis and its output is converted with ``getanomalyList``.

        Declared as a static method because it takes no ``self``; it can be
        called on the class or on an instance.

        Args:
            csv_file: Anything ``pd.read_csv`` accepts. The data must
                contain ``date``, ``x``, ``y`` and ``z`` columns.

        Returns:
            A 3-tuple ``(x, y, z)`` holding the ``getanomalyList`` result
            for each axis.
        """
        frame = pd.read_csv(csv_file)
        frame['date'] = pd.to_datetime(frame['date'])
        frame = frame.set_index('date')

        # Pull out and validate every axis before any detector runs.
        columns = [frame[axis] for axis in ('x', 'y', 'z')]
        validated = [validate_series(column) for column in columns]

        results = []
        for series in validated:
            # One independent detector per axis; none share fitted state.
            detector = LevelShiftAD(c=6.0, side='both', window=3)
            flagged = detector.fit_detect(series)
            # NOTE(review): getanomalyList is defined outside this block;
            # presumably it returns the flagged timestamps - confirm.
            results.append(getanomalyList(flagged))

        return tuple(results)

    @staticmethod
    def getAnomalyQuantile(csv_file):
        """Run a ``QuantileAD`` detector on the x, y and z columns of a CSV.

        The file is read with pandas, its ``date`` column is parsed to
        datetimes and used as the index, and each of the ``x``, ``y`` and
        ``z`` columns is passed through ``validate_series``. A fresh
        ``QuantileAD(high=0.99, low=0.02)`` is then fitted on each axis and
        its output is converted with ``getanomalyList``.

        Declared as a static method because it takes no ``self``; it can be
        called on the class or on an instance.

        Args:
            csv_file: Anything ``pd.read_csv`` accepts. The data must
                contain ``date``, ``x``, ``y`` and ``z`` columns.

        Returns:
            A 3-tuple ``(x, y, z)`` holding the ``getanomalyList`` result
            for each axis.
        """
        frame = pd.read_csv(csv_file)
        frame['date'] = pd.to_datetime(frame['date'])
        frame = frame.set_index('date')

        # Pull out and validate every axis before any detector runs.
        columns = [frame[axis] for axis in ('x', 'y', 'z')]
        validated = [validate_series(column) for column in columns]

        results = []
        for series in validated:
            # One independent detector per axis; none share fitted state.
            detector = QuantileAD(high=0.99, low=0.02)
            flagged = detector.fit_detect(series)
            # NOTE(review): getanomalyList is defined outside this block;
            # presumably it returns the flagged timestamps - confirm.
            results.append(getanomalyList(flagged))

        return tuple(results)

    @staticmethod
    def getOutlierDetection(csv_file):
        """Run a LocalOutlierFactor-based detector on the x, y, z columns.

        The file is read with pandas, its ``date`` column is parsed to
        datetimes and used as the index, and each of the ``x``, ``y`` and
        ``z`` columns is passed through ``validate_series``. Each validated
        series is wrapped in a single-column ``DataFrame`` and fed to a
        fresh ``OutlierDetector(LocalOutlierFactor(contamination=0.02))``.
        The detector output is converted with ``getanomalyList``.

        Declared as a static method because it takes no ``self``; it can be
        called on the class or on an instance.

        Args:
            csv_file: Anything ``pd.read_csv`` accepts. The data must
                contain ``date``, ``x``, ``y`` and ``z`` columns.

        Returns:
            A 3-tuple ``(x, y, z)`` holding the ``getanomalyList`` result
            for each axis.
        """
        frame = pd.read_csv(csv_file)
        frame['date'] = pd.to_datetime(frame['date'])
        frame = frame.set_index('date')

        # Pull out and validate every axis before any detector runs.
        columns = [frame[axis] for axis in ('x', 'y', 'z')]
        validated = [validate_series(column) for column in columns]
        # The detector is given a DataFrame per axis, not a bare Series.
        axis_frames = [pd.DataFrame(series) for series in validated]

        results = []
        for axis_frame in axis_frames:
            # One independent detector per axis; none share fitted state.
            detector = OutlierDetector(LocalOutlierFactor(contamination=0.02))
            flagged = detector.fit_detect(axis_frame)
            # NOTE(review): getanomalyList is defined outside this block;
            # presumably it returns the flagged timestamps - confirm.
            results.append(getanomalyList(flagged))

        return tuple(results)

    def check_anomaly_in_rows(anomaly_timestamps,csv_file, rows):
    
        # Check if the list is empty
        if not anomaly_timestamps:
            return False
            
        df = pd.read_csv(csv_file)
    
        df['date'] = pd.to_datetime(df['date'])
    
        # Set 'date' column as the index
        df.set_index('date', inplace=True)
    
        # Get the last 6 rows of the data
        last_rows = df.tail(rows)
    
        # Check if any of the anomaly timestamps exist within the last 6 seconds of the record
        for anomaly_timestamp in anomaly_timestamps:
            if anomaly_timestamp in last_rows.index:
                return True
                
        return False
        
    
