In [1]:
import itertools
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.feature_extraction import FeatureHasher
from sklearn.feature_selection import SelectFromModel
from sklearn.metrics import auc
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import make_scorer, average_precision_score
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
#from imblearn.under_sampling import RandomUnderSampler

#ignore warnings
warnings.filterwarnings('ignore')

In [None]:
data, is_anomaly = generate_data(
    n_features=6,
    train_only=True,
    random_state=1234
    )


data = pd.DataFrame(data, columns = ['x1','x2','x3','x4','x5', 'x6'])
data['class'] = is_anomaly

In [None]:
# Preview the last rows of the generated frame (feature columns x1..x6 plus the 'class' label).
data.tail()

In [None]:
plt.rcParams["figure.figsize"] = (5,3)
np.random.seed(1234)
sns.scatterplot(
    data = data, 
    x = 'x1', 
    y = 'x2', 
    hue = 'class'
)

plt.title("Generated Random Data")
plt.show()

In [None]:
#train isolation forest
# Build the feature frame once. The derived columns written at the bottom of this cell are
# excluded too (errors='ignore' makes that a no-op on the first run), so re-running the cell
# never feeds a previous run's predictions/scores back into the model as features.
features = data.drop(columns=['class', 'predicted_class', 'anomaly_score'], errors='ignore')

clf = IsolationForest(max_samples=100, random_state=1234)
clf.fit(features)

# decision_function: the lower the score, the more abnormal the sample.
anomaly_score = clf.decision_function(features)

# predict() yields 1 for inliers and -1 for outliers; recode to 0 = normal, 1 = anomaly
# so the values are directly comparable with data['class'].
predictions = np.where(clf.predict(features) == -1, 1, 0)

#add the predictions as a column to data
data['predicted_class'] = predictions
data['anomaly_score'] = anomaly_score

In [None]:
# Colour encodes the label column ('class'); marker style encodes the model's 'predicted_class'.
sns.scatterplot(data=data, x='x1', y='x2', hue='class', style='predicted_class')

In [None]:
# Distribution of the Isolation Forest decision score, split by predicted class.
sns.boxplot(data=data, x='predicted_class', y='anomaly_score')

In [None]:
def isolation_Forest(parameters):
    """Fit an IsolationForest on the global ``data`` frame and score every row.

    Parameters
    ----------
    parameters : dict
        Keyword arguments forwarded unchanged to ``IsolationForest``.

    Returns
    -------
    predictions : numpy.ndarray
        Per-row labels recoded to 0 (inlier) / 1 (outlier).
    anomaly_score : numpy.ndarray
        ``decision_function`` output for every row.

    Side effects
    ------------
    Overwrites the ``predicted_class`` and ``anomaly_score`` columns of the global ``data``.
    """
    # Exclude the label AND the columns this function writes back below. Without this, every
    # call after the first would train on the previous call's predictions and scores.
    features = data.drop(columns=['class', 'predicted_class', 'anomaly_score'], errors='ignore')

    model = IsolationForest(**parameters)
    model.fit(features)
    anomaly_score = model.decision_function(features)

    # predict() yields 1 for inliers and -1 for outliers; recode to 0 / 1.
    predictions = np.where(model.predict(features) == -1, 1, 0)

    #add the predictions as a column to data
    data['predicted_class'] = predictions
    data['anomaly_score'] = anomaly_score

    return predictions, anomaly_score

In [None]:
#set a parameter grid for the isolation forest
param_grid = {
    'n_estimators': [50, 100, 200, 500],
    'max_samples': [50, 100, 200, 500],
    'contamination': [0.01, 0.05, 0.1, 0.2]
}

#set a list to store the results
results = []

#loop through the parameter grid
# itertools.product varies the last factor fastest, i.e. the same visiting order as three
# nested loops over n_estimators -> max_samples -> contamination.
for n_estimators, max_samples, contamination in itertools.product(
    param_grid['n_estimators'],
    param_grid['max_samples'],
    param_grid['contamination'],
):
    parameters = {
        'n_estimators': n_estimators,
        'max_samples': max_samples,
        'contamination': contamination,
        'random_state': 42
    }

    predictions, anomaly_score = isolation_Forest(parameters)

    results.append({
        'n_estimators': n_estimators,
        'max_samples': max_samples,
        'contamination': contamination,
        # A PR curve needs a continuous score that is higher for the positive class
        # (1 = anomaly here). decision_function is lower for more abnormal samples, so it
        # is negated. Hard 0/1 predictions would collapse the curve to a single threshold.
        'precision_recall_curve': precision_recall_curve(data['class'], -anomaly_score),
        'f1_score': f1_score(data['class'], predictions),
    })

In [None]:
# Tabulate the grid-search records: one row per parameter combination.
results = pd.DataFrame(results)

# Pick the row whose F1 score is the highest.
best_row_label = results['f1_score'].idxmax()
best_parameters = results.loc[best_row_label]

print(best_parameters)


In [None]:
# F1 score of every parameter combination, in the order the grid was evaluated.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(results['f1_score'])
ax.set_xlabel('Parameter Set')
ax.set_ylabel('F1 Score')
ax.set_title('F1 Score for Different Parameter Sets')
plt.show()

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class HashingTransformer(BaseEstimator, TransformerMixin):
    """One-hot encode the first column and feature-hash every remaining column.

    Parameters
    ----------
    n_features_per_category : int, default=20
        Number of hashed output columns produced for each input column after the first.
    input_type : str, default="categorical"
        How cells of the hashed columns are presented to ``FeatureHasher``.
        ``"categorical"`` (kept as the default for backward compatibility) and ``"string"``
        both hash each cell as a single string token, using ``FeatureHasher``'s ``"string"``
        mode. Any other value is forwarded to ``FeatureHasher`` unchanged and each row of the
        column slice is passed through as-is.

    Attributes
    ----------
    encoders : list
        Populated by ``fit``: index 0 is the ``OneHotEncoder`` for column 0, index ``i`` is
        the ``FeatureHasher`` for column ``i``.

    Notes
    -----
    NOTE(review): the original comments call the first column ``protocol_type``; nothing in
    this class depends on that name.
    """

    def __init__(self, n_features_per_category=20, input_type="categorical"):
        self.n_features_per_category = n_features_per_category
        self.input_type = input_type
        self.encoders = []  # filled by fit(); one encoder per input column

    @staticmethod
    def _make_one_hot_encoder():
        """Build a dense-output OneHotEncoder on both new and old scikit-learn releases."""
        try:
            return OneHotEncoder(sparse_output=False)
        except TypeError:
            # Older scikit-learn releases only know the `sparse` keyword.
            return OneHotEncoder(sparse=False)

    def _hasher_input_type(self):
        """Translate this transformer's input_type into one FeatureHasher understands."""
        return "string" if self.input_type == "categorical" else self.input_type

    def _column_samples(self, X, col):
        """Return column ``col`` of ``X`` as the per-row samples expected by the hasher."""
        if self._hasher_input_type() == "string":
            # One single-token sample per row; str() lets non-string cells be hashed too.
            return [[str(value)] for value in X[:, col]]
        return X[:, col:col + 1]

    def fit(self, X, y=None):
        """Fit the one-hot encoder on column 0 and create one hasher per remaining column.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_columns)
        y : ignored

        Returns
        -------
        self
        """
        X = np.asarray(X)

        # Rebuild the list from scratch so repeated fit() calls do not stack encoders.
        self.encoders = [self._make_one_hot_encoder().fit(X[:, 0:1])]

        # One hasher for every column after the first, *including* the last one.
        for _ in range(1, X.shape[1]):
            hasher = FeatureHasher(n_features=self.n_features_per_category,
                                   input_type=self._hasher_input_type())
            # FeatureHasher is stateless; fit() only validates its parameters.
            self.encoders.append(hasher.fit())

        return self

    def transform(self, X, y=None):
        """Encode ``X`` into a single dense array.

        Returns
        -------
        numpy.ndarray
            The one-hot block for column 0 followed by ``n_features_per_category`` hashed
            columns for each remaining input column.
        """
        X = np.asarray(X)
        protocol_encoded = self.encoders[0].transform(X[:, 0:1])

        # encoders[i] belongs to column i, so enumeration has to start at 1.
        # FeatureHasher returns a scipy sparse matrix; densify before concatenating.
        hashed_blocks = [
            hasher.transform(self._column_samples(X, col)).toarray()
            for col, hasher in enumerate(self.encoders[1:], start=1)
        ]

        # Building one list keeps this valid when there are no hashed columns at all.
        return np.concatenate([protocol_encoded, *hashed_blocks], axis=1)
