## Combined models
1) rf-benign-specialize
2) rf-general
3) rf-lexical

In [116]:
import os
import pickle

from collections import defaultdict
import numpy as np
import pandas as pd 
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MaxAbsScaler

In [117]:
def classification_type(type):
    '''
    Convert classification type into values:
        1) Benign = 0
        2) Defacement = 1
        3) Phishing = 2
        4) Malware = 3    
    '''
    if type == "benign":
        return 0
    elif type == "defacement":
        return 1
    elif type == "phishing":
        return 2
    elif type == "malware":
        return 3
    else:
        print(f"Unable to find proper type: {type}")


def calc_FNR_accuracy(y_true, y_pred):
    conf_matrix = confusion_matrix(y_true, y_pred)
    for label_class in range(4):
        FN = sum(conf_matrix[label_class][i] for i in range(len(conf_matrix)) if i != label_class)  
        
        TP = conf_matrix[label_class][label_class]  
        
        TN = np.sum(np.delete(np.delete(conf_matrix, label_class, axis=0), label_class, axis=1))
        
        accuracy = (TP + TN) / np.sum(conf_matrix)
        print("Accuracy for class", label_class, ":", accuracy)

        FNR = FN / (FN + TP) if (FN + TP) > 0 else -1
        print("FNR for class", label_class, ":", FNR)

In [118]:
# Load models
model_files = [file for file in os.listdir(os.getcwd()) if file.endswith(".pkl")]
models = []
for file in model_files:
    with open(file, "rb") as file:
        models.append((file.name, pickle.load(file)))

In [119]:
# Data for training ensemble model
training_dataset = {'rf-benign-specialize.pkl': 'rf-benign-specialize-features.csv',
                    'rf-general.pkl': 'rf-general-features.csv',
                    'rf-lexical.pkl': 'rf-lexical-features.csv'}

df = pd.read_csv('../datasets/malicious_phish.csv')
y = df.iloc[:, 1]
y = y.apply(classification_type)
y = y.values.ravel()

In [120]:
testing_ensemble_X = {} # test size 0.2 values <model_name: (X_test, scaler)>
testing_ensemble_y = None # test size 0.2 values
scalers = {} # stores scaler models <model_name: scaler>

# Prepare predictions from models
for i in range(len(models)):
    model_name, model = models[i]
    data = pd.read_csv(training_dataset[model_name], header=None, skiprows=1)
    X_train, X_test, _, y_test = train_test_split(data, y, test_size=0.2, random_state=69)
    
    scaler = MaxAbsScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    if testing_ensemble_y is None:
        testing_ensemble_y = y_test

    # input_ensemble[:, i] = y_pred
    testing_ensemble_X[model_name] = X_test
    scalers[model_name] = scaler

In [121]:
# Attached to endpoint prediction of above models
# priority = [('rf-general.pkl', 3), ('rf-lexical.pkl', 3), ('rf-lexical.pkl', 1),
#             ('rf-benign-specialize.pkl', 3), ('rf-benign-specialize.pkl', 0), ('rf-general.pkl', 1),
#             ('rf-general.pkl', 2), ('rf-benign-specialize.pkl', 1), ('rf-lexical.pkl', 0),
#             ('rf-lexical.pkl', 2), ('rf-benign-specialize.pkl', 2), ('rf-general.pkl', 0)]

priority = [('rf-general.pkl', 3), ('rf-general.pkl', 2), ('rf-general.pkl', 1), ('rf-benign-specialize.pkl', 0), ('rf-lexical.pkl', 0), ('rf-general.pkl', 0)]
# 0.94, 0.90, 0.67

### Test on 0.2 validation split initial dataset

In [122]:
y_ensemble_pred = None # holds predictions of all models
model_col = {} # Takes note of which column the model predictions are in <index, model_name>

for i in range(len(models)):
    model_name, model = models[i]
    X_test = testing_ensemble_X[model_name] # Already scaled
    
    y_pred = model.predict(X_test)
    if y_ensemble_pred is None:
        y_ensemble_pred = np.empty((y_pred.shape[0], 3))

    y_ensemble_pred[:, i] = y_pred
    model_col[i] = model_name

# Look at priority list
pred_combined_y = np.empty(shape=testing_ensemble_y.shape)

row = 0
for entry in y_ensemble_pred:
    # Match to model
    statements = defaultdict(int)

    for i, prediction in enumerate(entry):
        statements[(model_col[i], prediction)] = 1
    for p in priority:
        if statements[p] == 1:
            # Take the choice
            choice_for_entry = p[1]
            pred_combined_y[row] = choice_for_entry
            break
    row += 1




In [123]:
precision = precision_score(testing_ensemble_y, pred_combined_y, average='weighted')
recall = recall_score(testing_ensemble_y, pred_combined_y, average='weighted')
f1 = f1_score(testing_ensemble_y, pred_combined_y, average='weighted')
val_accuracy = accuracy_score(testing_ensemble_y, pred_combined_y)
print(f'OVERALL: Accuracy: {val_accuracy:.8f}, Precision: {precision:.8f}, Recall: {recall:.8f}, F1 Score: {f1:.8f}')

class_test_precision, class_test_recall, class_test_f1, class_ = precision_recall_fscore_support(testing_ensemble_y, pred_combined_y)
for i in range(4):
    print(f'Class {i}:\tTest Precision: {class_test_precision[i]:.8f},\tTest Recall: {class_test_recall[i]:.8f},\tTest f1: {class_test_f1[i]:.8f}')
calc_FNR_accuracy(testing_ensemble_y, pred_combined_y)

OVERALL: Accuracy: 0.83582491, Precision: 0.83410359, Recall: 0.83582491, F1 Score: 0.82365547
Class 0:	Test Precision: 0.83218979,	Test Recall: 0.95302977,	Test f1: 0.88852000
Class 1:	Test Precision: 0.83694664,	Test Recall: 0.66891471,	Test f1: 0.74355573
Class 2:	Test Precision: 0.79875479,	Test Recall: 0.44548320,	Test f1: 0.57196749
Class 3:	Test Precision: 0.95099905,	Test Recall: 0.90946315,	Test f1: 0.92976744
Accuracy for class 0 : 0.8427276007954606
FNR for class 0 : 0.04697022767075307
Accuracy for class 1 : 0.9317101636222637
FNR for class 1 : 0.3310852874040257
Accuracy for class 2 : 0.9041684902371794
FNR for class 2 : 0.5545168011111705
Accuracy for class 3 : 0.9930435583811301
FNR for class 3 : 0.0905368516833485


### Test on phishing dataset

In [124]:
training_dataset_phishing = {'rf-benign-specialize.pkl': 'rf-benign-specialize-test-phishing.csv',
                    'rf-general.pkl': 'rf-general-test-phishing.csv',
                    'rf-lexical.pkl': 'rf-lexical-test-phishing.csv'}

input_ensemble_phishing = None

# Prepare predictions from models
for i in range(len(models)):
    model_name, model = models[i]
    X_test = pd.read_csv(training_dataset_phishing[model_name], header=None, skiprows=1)
    X_test = scalers[model_name].transform(X_test)

    y_pred = model.predict(X_test) # prediction for model
    if input_ensemble_phishing is None:
        input_ensemble_phishing = np.empty((y_pred.shape[0], 3))

    input_ensemble_phishing[:, i] = y_pred



In [125]:
# Phishing is label 2.
y_phishing = np.full((input_ensemble_phishing.shape[0],), 2)
pred_combined_y_phishing = np.empty(shape=y_phishing.shape)
row = 0
for entry in input_ensemble_phishing:
    # Match to model
    statements = defaultdict(int)

    for i, prediction in enumerate(entry):
        statements[(model_col[i], prediction)] = 1
    for p in priority:
        if statements[p] == 1:
            # Take the choice
            choice_for_entry = p[1]
            pred_combined_y_phishing[row] = choice_for_entry
            break
    row += 1

In [126]:
precision = precision_score(y_phishing, pred_combined_y_phishing, average='weighted')
recall = recall_score(y_phishing, pred_combined_y_phishing, average='weighted')
f1 = f1_score(y_phishing, pred_combined_y_phishing, average='weighted')
val_accuracy = accuracy_score(y_phishing, pred_combined_y_phishing)
print(f'OVERALL: Accuracy: {val_accuracy:.8f}, Precision: {precision:.8f}, Recall: {recall:.8f}, F1 Score: {f1:.8f}')

class_test_precision, class_test_recall, class_test_f1, class_ = precision_recall_fscore_support(y_phishing, pred_combined_y_phishing)
for i in range(4):
    print(f'Class {i}:\tTest Precision: {class_test_precision[i]:.8f},\tTest Recall: {class_test_recall[i]:.8f},\tTest f1: {class_test_f1[i]:.8f}')
calc_FNR_accuracy(y_phishing, pred_combined_y_phishing)

OVERALL: Accuracy: 0.56006882, Precision: 1.00000000, Recall: 0.56006882, F1 Score: 0.71800528
Class 0:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Class 1:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Class 2:	Test Precision: 1.00000000,	Test Recall: 0.56006882,	Test f1: 0.71800528
Class 3:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Accuracy for class 0 : 0.7058803684086232
FNR for class 0 : -1
Accuracy for class 1 : 0.8653554198576296
FNR for class 1 : -1
Accuracy for class 2 : 0.5600688235889477
FNR for class 2 : 0.4399311764110523
Accuracy for class 3 : 0.988833035322695
FNR for class 3 : -1


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


### Test on benign dataset

In [127]:
training_dataset_benign = {'rf-benign-specialize.pkl': 'rf-benign-specialize-test-benign.csv',
                    'rf-general.pkl': 'rf-general-test-benign.csv',
                    'rf-lexical.pkl': 'rf-lexical-test-benign.csv'}

input_ensemble_benign = None

# Prepare predictions from models
for i in range(len(models)):
    model_name, model = models[i]
    X_test = pd.read_csv(training_dataset_benign[model_name], header=None, skiprows=1)
    X_test = scalers[model_name].transform(X_test)

    y_pred = model.predict(X_test) # prediction for model
    if input_ensemble_benign is None:
        input_ensemble_benign = np.empty((y_pred.shape[0], 3))

    input_ensemble_benign[:, i] = y_pred



In [128]:
# Benign is label 0.
y_benign = np.full((input_ensemble_benign.shape[0],), 2)
pred_combined_y_benign = np.empty(shape=y_benign.shape)
row = 0
for entry in input_ensemble_benign:
    # Match to model
    statements = defaultdict(int)

    for i, prediction in enumerate(entry):
        statements[(model_col[i], prediction)] = 1
    for p in priority:
        if statements[p] == 1:
            # Take the choice
            choice_for_entry = p[1]
            pred_combined_y_benign[row] = choice_for_entry
            break
    row += 1

In [129]:
precision = precision_score(y_benign, pred_combined_y_benign, average='weighted')
recall = recall_score(y_benign, pred_combined_y_benign, average='weighted')
f1 = f1_score(y_benign, pred_combined_y_benign, average='weighted')
val_accuracy = accuracy_score(y_benign, pred_combined_y_benign)
print(f'OVERALL: Accuracy: {val_accuracy:.8f}, Precision: {precision:.8f}, Recall: {recall:.8f}, F1 Score: {f1:.8f}')

class_test_precision, class_test_recall, class_test_f1, class_ = precision_recall_fscore_support(y_benign, pred_combined_y_benign)
for i in range(4):
    print(f'Class {i}:\tTest Precision: {class_test_precision[i]:.8f},\tTest Recall: {class_test_recall[i]:.8f},\tTest f1: {class_test_f1[i]:.8f}')
calc_FNR_accuracy(y_benign, pred_combined_y_benign)

OVERALL: Accuracy: 0.09557237, Precision: 1.00000000, Recall: 0.09557237, F1 Score: 0.17447021
Class 0:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Class 1:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Class 2:	Test Precision: 1.00000000,	Test Recall: 0.09557237,	Test f1: 0.17447021
Class 3:	Test Precision: 0.00000000,	Test Recall: 0.00000000,	Test f1: 0.00000000
Accuracy for class 0 : 0.18938618260069764
FNR for class 0 : -1
Accuracy for class 1 : 0.9141257252601681
FNR for class 1 : -1
Accuracy for class 2 : 0.09557236982917701
FNR for class 2 : 0.904427630170823
Accuracy for class 3 : 0.9920604619683112
FNR for class 3 : -1


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
