In [3]:
import sys
import os
# Append the parent of the current working directory to the import path.
# NOTE(review): presumably this is what lets `isolated_ad_model` below resolve;
# it assumes the kernel's working directory is the notebook's own folder.
notebook_dir = os.getcwd()
sys.path.append(os.path.abspath(os.path.join(notebook_dir, '..')))

# Third-party numerical / ML stack.
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import classification_report, f1_score
# Project-local preprocessing (must come after the sys.path tweak above).
from isolated_ad_model.processing import PREPROCESS
# Adversarial example generator used by AdversarialAttack.a2pm_attack.
from a2pm import A2PMethod

In [4]:
class AdversarialAttack:
    """Train an IsolationForest anomaly detector and attack it with A2PM.

    Workflow:
        1. ``__init__`` loads the UE dataset and keeps the ground-truth labels.
        2. ``train_model`` preprocesses the data and tunes an IsolationForest.
        3. ``attack`` perturbs the training data with A2PM and reports how the
           F1 score of the tuned model changes.

    Label convention used throughout: ``1`` = anomaly/outlier, ``0`` = inlier.
    IsolationForest itself returns ``-1`` for outliers and ``1`` for inliers.
    """

    # Class-level defaults so that ``attack()`` can reliably detect an
    # untrained instance instead of failing with an AttributeError.
    model = None
    training_data = None

    def __init__(self):
        """Load the dataset from disk and extract the ground-truth labels."""
        self.dataset = pd.read_csv('../isolated_ad_model/ue.csv')
        self.cols = self.dataset.columns
        self.true_anomalies = self.dataset['Viavi.UE.anomalies']

    @staticmethod
    def _to_anomaly_flags(raw_pred):
        """Map IsolationForest output (-1 outlier / 1 inlier) to 1 / 0 flags."""
        return [1 if p == -1 else 0 for p in raw_pred]

    @staticmethod
    def a2pm_attack(pattern, training_data, model):
        """Generate adversarial samples for ``training_data`` against ``model``.

        Args:
            pattern: A2PM pattern configuration (tuple of pattern dicts).
            training_data: DataFrame of samples to perturb.
            model: Fitted classifier exposing ``predict``.

        Returns:
            DataFrame with the same columns as ``training_data`` holding the
            perturbed samples.
        """
        a2pm_method = A2PMethod(pattern)
        a2pm_method.fit(training_data.values)

        raw_adv_training_data = a2pm_method.generate(model, training_data.values)

        return pd.DataFrame(raw_adv_training_data, columns=training_data.columns)

    @staticmethod
    def predict(model, input):
        """Predict anomaly flags (1 = outlier, 0 = inlier) for a DataFrame.

        Args:
            model: Fitted estimator whose ``predict`` returns -1 / 1.
            input: DataFrame of samples; only ``input.values`` is used.

        Returns:
            List of ints, one flag per row.
        """
        return AdversarialAttack._to_anomaly_flags(model.predict(input.values))

    @staticmethod
    def get_scorer(true_labels):
        """Build a scoring callable returning the F1 score vs ``true_labels``.

        The returned callable compares predictions for ``X`` against the fixed
        ``true_labels``, so ``X`` must contain exactly the rows those labels
        belong to, in the same order.

        Args:
            true_labels: Ground-truth 0/1 anomaly flags.

        Returns:
            Callable ``scorer(estimator, X, y=None) -> float``.
        """
        def scorer(estimator, X, y=None):
            # ``y`` is accepted (and ignored) so the callable works whether or
            # not the search passes a target array.
            pred = AdversarialAttack._to_anomaly_flags(estimator.predict(X))
            return f1_score(true_labels, pred)
        return scorer

    def get_iso_forest(self, training_data):
        """Tune an IsolationForest with a randomized search and return the best.

        Args:
            training_data: Preprocessed DataFrame used for fitting and scoring.

        Returns:
            The best estimator found by the search, refit on the data.
        """
        random_state = 4
        parameter = {'contamination': list(np.arange(0.01, 0.5, 0.02)),
                     'n_estimators': list(range(200, 1001, 100)),
                     'max_samples': [0.005, 0.01, 0.1, 0.15, 0.2, 0.3, 0.4]}
        # Single "fold" that trains and scores on the full dataset; this keeps
        # the scored rows aligned with self.true_anomalies.
        cv = [(slice(None), slice(None))]
        scorer = self.get_scorer(self.true_anomalies)
        iso = IsolationForest(random_state=random_state, bootstrap=True, warm_start=False)
        # Seed the search as well; otherwise the sampled candidates (and hence
        # the selected model) differ on every run.
        model = RandomizedSearchCV(iso, parameter, scoring=scorer, cv=cv, n_iter=50,
                                   random_state=random_state)
        md = model.fit(training_data.values)
        return md.best_estimator_

    @staticmethod
    def print_metrics(pred):
        """Print how many predictions are inliers (0) and outliers (1)."""
        print(f"Total number of inlier = {sum(p == 0 for p in pred)}")
        print(f"Total number of outlier = {sum(p == 1 for p in pred)}")

    def train_model(self):
        """Preprocess the dataset and fit the tuned IsolationForest.

        Sets ``self.training_data`` and ``self.model``.
        """
        ps = PREPROCESS(self.dataset)  # TODO: Is it possible to get rid of src/scale dependency?
        ps.process()
        self.training_data = ps.data

        # TODO: Original ADS does not use cross-validation.

        # iso = IsolationForest(n_estimators=100, contamination=0.25, max_features=1.0, random_state=42) # TODO: Import original ADS configuration as a baseline
        # model = iso.fit(training_data)

        self.model = self.get_iso_forest(self.training_data)

    def attack(self):
        """Run the A2PM attack and print F1 before/after plus class counts.

        Raises:
            AssertionError: If ``train_model`` has not been called yet.
        """
        assert self.model is not None, "Model needs to be trained first to test attack"

        pred = self.predict(self.model, self.training_data)
        f1_normal = f1_score(self.true_anomalies, pred)
        self.print_metrics(pred)

        # TODO: Figure out patterns - how to optimize them
        # pattern = (

        #         {
        #             "type": "interval",
        #             "features": None,
        #             "ratio": 0.1,
        #             "probability": 0.6,
        #             "momentum": 0.99
        #         },
        #     )

        pattern = (

                # First pattern to be applied: Interval
                {
                    "type": "interval",
                    "features": None,
                    "integer_features": None,
                    "ratio": 0.1,
                    "max_ratio": 0.3,
                    "missing_value": 0.0,
                    "probability": 0.6,
                },

                # # Second pattern to be applied: Combination
                # {
                #     "type": "combination",
                #     "features": None,
                #     "locked_features": None,
                #     "probability": 0.4,
                # },
            )

        adv_training_data = self.a2pm_attack(pattern, self.training_data, self.model)

        adv_pred = self.predict(self.model, adv_training_data)

        f1_adversarial = f1_score(self.true_anomalies, adv_pred)

        print("Regular f1_score ", f1_normal, ' vs. adversarial ', f1_adversarial, ' difference ', f1_normal- f1_adversarial)

        self.print_metrics(adv_pred)

        
   


In [5]:
# Load the dataset (reads the CSV in __init__) and fit the tuned model.
attack = AdversarialAttack()
attack.train_model()

In [6]:
# Run the A2PM attack against the trained model; prints class counts and F1 before/after.
attack.attack()

Total number of inlier = 5900
Total number of outlier = 4100
Regular f1_score  0.687762447510498  vs. adversarial  0.39124134599903393  difference  0.29652110151146405
Total number of inlier = 146
Total number of outlier = 9854
