In [None]:
# imports and data preprocessing

#imports
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from prettytable import PrettyTable
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot
from sklearn.ensemble import RandomForestClassifier
from IPython.display import display
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV
import tensorflow as tf
from tensorflow import keras
from keras import activations
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
from sklearn.metrics import f1_score
import shap

# Load the pre-processed numeric dataset; feature scaling is the only step left to do here.
df = pd.read_csv("numeric.csv")

# Min-max scale every column of the frame (the 'label' column is passed through the scaler too).
# NOTE(review): the scaler is fitted on the whole frame before the train/test split below,
# so test rows contribute to the min/max statistics -- confirm this is acceptable for the lab.
scaler = MinMaxScaler()
df[df.columns] = scaler.fit_transform(df) #.values

# Separate the ground-truth column from the feature columns.
y = df['label']
X = df.drop(columns='label')

# Held-out test fraction (0.3 gives a 70/30 train/test split).
ratio=0.3

# Seeded split, stratified on the labels.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=ratio,
    random_state=2023,
    stratify=y,
)
y_train = np.array(y_train)


# Feature (column) names, reused later when labelling graphics.
features = X_test.columns

print("This dataset consists of 5816 samples from the EMBER 2018 dataset")
print("converted to tabular for ease of use. 2930 of these are malicious,")
print("and 2886 are benign. The 17 features are listed below.\n\nFeatures:\n")
for feature in features:
    print(feature)

In [None]:
# define a keras neural net. This will be our primary model

# Training hyperparameters used by the neural-network fits in this notebook.
batchnum=100
epochnum=100

# define a keras model we can instantiate from this function later
def nn_model(input_dim=17):
    """Build and compile a fresh, untrained binary-classification network.

    Args:
        input_dim: Number of input features per sample. Defaults to 17, the
            width that was previously hard-coded in the input layer.

    Returns:
        A compiled ``Sequential`` model with six fully connected ReLU layers
        (128, 64, 32, 32, 32, 16 units) and one sigmoid output unit. It is
        compiled with binary cross-entropy loss, the 'nadam' optimizer and the
        metrics accuracy, precision and recall, in that order.
    """
    model = Sequential()
    # input layer: defined explicitly rather than implied by the first Dense layer
    model.add(keras.Input(shape=(input_dim,)))
    # hidden layers: a stack of fully connected ReLU layers
    for units in (128, 64, 32, 32, 32, 16):
        model.add(layers.Dense(units, activation='relu'))
    # output layer: a single sigmoid unit for the binary classification task
    model.add(layers.Dense(1, activation='sigmoid'))
    model.compile(
        loss='binary_crossentropy',
        optimizer='nadam',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(),
            tf.keras.metrics.Recall(),
        ],
    )
    return model

# instantiate NN, then fit to training data
Maldetector = nn_model()
Maldetector.fit(X_train, y_train, epochs=epochnum, batch_size=batchnum, verbose=0)

# gather NN metrics on the held-out test set (loss first, then the compiled metrics)
mal_loss, mal_acc, mal_precision, mal_recall = Maldetector.evaluate(X_test, y_test, verbose=0)

# threshold the sigmoid outputs at 0.5 to obtain hard 0/1 predictions (kept as a list of ints)
mal_pred = Maldetector.predict(X_test)
mal_pred_binary = (np.asarray(mal_pred).ravel() >= 0.5).astype(int).tolist()

# calc F1 from precision and recall. The zero denominator is checked explicitly
# instead of relying on a bare except, which would also hide unrelated errors.
mal_pr_sum = mal_precision + mal_recall
if mal_pr_sum > 0:
    mal_f1 = 2 * ((mal_precision * mal_recall) / mal_pr_sum)
else:
    mal_f1 = 0.0
    print("F1 calc error")

# testing loop for NN model: build, train and score a new network in every round
num_rounds = 10
acclist = []
for i in range(num_rounds):
    Maldetector_loop = nn_model()
    Maldetector_loop.fit(X_train, y_train, epochs=epochnum, batch_size=batchnum, verbose=0)
    # gather NN metrics; only the accuracy is kept, as a percentage with 2 decimals
    loss, acc, precision, recall = Maldetector_loop.evaluate(X_test, y_test, verbose=0)
    acclist.append(round((100 * acc), 2))

# tabulate results: one row per round, rounds numbered from 1
NN_results = PrettyTable(["Testing Round", "Accuracy on Original"])
for round_number, round_acc in enumerate(acclist, start=1):
    NN_results.add_row([str(round_number), round_acc])

# results
# layer-by-layer summary of the primary network
print("Model Architecture")
Maldetector.summary()

# test-set scores of the primary network, three decimals each
print('\nKeras Neural Network')
for template, score in (
    ('Accuracy: %.3f', mal_acc),
    ('Precision: %.3f', mal_precision),
    ('Recall: %.3f', mal_recall),
    ('F1 score: %.3f', mal_f1),
):
    print(template % score)

# confusion matrix of the thresholded predictions against the true test labels
mal_cm = confusion_matrix(y_test, mal_pred_binary)
mal_disp = ConfusionMatrixDisplay(confusion_matrix=mal_cm)
mal_disp.plot()
plt.show()

# accuracy table produced by the repeated-training loop
print("\n10 rounds of keras neural networks,\nmodels trained original data, tested on original data.")
print(NN_results)

In [None]:
# define a second model, a gradient-boosted tree ensemble, for reference
# (labelled "XGBoost" in the output; the estimator is scikit-learn's GradientBoostingClassifier)

# hyperparameters are the known best configuration from a grid search
xgb_model = GradientBoostingClassifier(
    criterion='friedman_mse',
    loss='exponential',
    n_estimators=1000,
)
xgb_model.fit(X_train, y_train)

# class predictions on the held-out test set
xgb_pred = xgb_model.predict(X_test)

# test-set metrics
xgb_acc = accuracy_score(y_test, xgb_pred)
xgb_precision = precision_score(y_test, xgb_pred)
xgb_recall = recall_score(y_test, xgb_pred)
xgb_f1 = f1_score(y_test, xgb_pred)

# report the scores, three decimals each
print('\nXGBoost')
for template, score in (
    ('Accuracy: %.3f', xgb_acc),
    ('Precision: %.3f', xgb_precision),
    ('Recall: %.3f', xgb_recall),
    ('F1 score: %.3f', xgb_f1),
):
    print(template % score)

# confusion matrix, with rows/columns labelled by the classes the model was fitted on
xgb_cm = confusion_matrix(y_test, xgb_pred, labels=xgb_model.classes_)
xgb_disp = ConfusionMatrixDisplay(
    confusion_matrix=xgb_cm,
    display_labels=xgb_model.classes_,
)
xgb_disp.plot()
plt.show()

In [None]:
# define a third model, Random Forest Classifier ensemble, for reference

# hyperparameters are the known best configuration from a grid search
rfc_model = RandomForestClassifier(
    criterion='entropy',
    n_estimators=400,
)
rfc_model.fit(X_train, y_train)

# class predictions on the held-out test set
rfc_pred = rfc_model.predict(X_test)

# test-set metrics
rfc_acc = accuracy_score(y_test, rfc_pred)
rfc_precision = precision_score(y_test, rfc_pred)
rfc_recall = recall_score(y_test, rfc_pred)
rfc_f1 = f1_score(y_test, rfc_pred)

# report the scores, three decimals each
print('\nRandom Forest Classifier')
for template, score in (
    ('Accuracy: %.3f', rfc_acc),
    ('Precision: %.3f', rfc_precision),
    ('Recall: %.3f', rfc_recall),
    ('F1 score: %.3f', rfc_f1),
):
    print(template % score)

# confusion matrix, with rows/columns labelled by the classes the model was fitted on
rfc_cm = confusion_matrix(y_test, rfc_pred, labels=rfc_model.classes_)
rfc_disp = ConfusionMatrixDisplay(
    confusion_matrix=rfc_cm,
    display_labels=rfc_model.classes_,
)
rfc_disp.plot()
plt.show()

In [None]:
# tabulate results of 3 models trained and tested on regular data

multimodel_results = PrettyTable(["Model", "Accuracy on Original Data"])
for model_name, model_acc in (
    ("Keras Neural Network", mal_acc),
    ("XGBoost", xgb_acc),
    ("Random Forest Classifier", rfc_acc),
):
    # accuracies are fractions; show them as percentages with 2 decimals
    multimodel_results.add_row([model_name, round((100 * model_acc), 2)])

# show table
print("\nA comparison table of models trained and tested on the original dataset")
print(multimodel_results)

In [None]:
# a simple poisoning attack is label flipping, where we swap the truth labels malicious (1) and benign (0)

def flip(row):
    """Return the opposite binary truth label for one dataset row.

    Args:
        row: Mapping-like row (e.g. a pandas Series passed by
            ``DataFrame.apply(..., axis=1)``) with a 'label' entry.

    Returns:
        0 when the row's label equals 1, and 1 when it equals 0.

    Raises:
        ValueError: If the label is neither 0 nor 1. Without this check the
            function would fall through and return None, which silently
            turns into a missing value in the flipped dataset.
    """
    label = row['label']
    if label == 1:
        return 0
    if label == 0:
        return 1
    raise ValueError("flip() expects a binary label of 0 or 1, got %r" % (label,))

# generate flipped dataset: same feature values as df, every truth label inverted
flipped_labels = df.apply(flip, axis=1)
flip_dataset = df.copy(deep=True)
flip_dataset['label'] = flipped_labels

# persist the flipped dataset next to the notebook
flip_dataset.to_csv('flip.csv', index=False)

# separate the (flipped) truth label from the feature columns
yf = flip_dataset['label']
Xf = flip_dataset.drop(columns='label')

# train/test split. 70/30 ratio
# stratification uses the original labels y (not yf), with the same seed and ratio as the first split
X_trainf, X_testf, y_trainf, y_testf = train_test_split(
    Xf,
    yf,
    test_size=ratio,
    random_state=2023,
    stratify=y,
)
y_trainf = np.array(y_trainf)

In [None]:
# make each of the models trained on regular data classify on flipped dataset. Notice the difference.
# Although simple, this attack is highly effective

# NN scored against the flipped test labels; accuracy stored as a percentage
mal_lossf, mal_accf, mal_precisionf, mal_recallf = Maldetector.evaluate(X_testf, y_testf, verbose=0)
mal_accf = round((100 *mal_accf), 2)

# gradient boosting scored against the flipped test labels
xgb_predf = xgb_model.predict(X_testf)
xgb_accf = round((100 * accuracy_score(y_testf, xgb_predf)), 2)

# random forest scored against the flipped test labels
rfc_predf = rfc_model.predict(X_testf)
rfc_accf = round((100 * accuracy_score(y_testf, rfc_predf)), 2)

# one row per model: accuracy on the original test labels next to accuracy on the flipped ones
multimodelf_results = PrettyTable(["Model", "Accuracy on Original Data", "Accuracy on Flipped Data"])
for model_name, original_acc, flipped_acc in (
    ("Keras Neural Network", mal_acc, mal_accf),
    ("XGBoost", xgb_acc, xgb_accf),
    ("Random Forest Classifier", rfc_acc, rfc_accf),
):
    # the original accuracy is still a fraction here; the flipped one is already a percentage
    multimodelf_results.add_row([model_name, round((100 * original_acc), 2), flipped_acc])

# show table
print("\nA comparison table of models trained on regular data and tested on the flipped dataset")
print(multimodelf_results)

In [None]:
# train one of each model on the flipped dataset, then classify on regular data.
# this represents a simple poisoning attack on the training set

# NN: fresh network fitted on flipped labels, scored on the original test set
Maldetector_f = nn_model()
Maldetector_f.fit(X_trainf, y_trainf, epochs=epochnum, batch_size=batchnum, verbose=0)
mal_lossf, mal_accf, mal_precisionf, mal_recallf = Maldetector_f.evaluate(X_test, y_test, verbose=0)

# gradient boosting: fitted on flipped labels, scored on the original test set
xgb_model_f = GradientBoostingClassifier(
    criterion='friedman_mse',
    loss='exponential',
    n_estimators=1000,
)
xgb_model_f.fit(X_trainf, y_trainf)
xgb_predf = xgb_model_f.predict(X_test)
xgb_accf = accuracy_score(y_test, xgb_predf)

# random forest: fitted on flipped labels, scored on the original test set
rfc_model_f = RandomForestClassifier(
    criterion='entropy',
    n_estimators=400,
)
rfc_model_f.fit(X_trainf, y_trainf)
rfc_predf = rfc_model_f.predict(X_test)
rfc_accf = accuracy_score(y_test, rfc_predf)

# one row per model, accuracy shown as a percentage with 2 decimals
flip_results = PrettyTable(["Model", "Accuracy on Original Data"])
for model_name, poisoned_acc in (
    ("Keras Neural Network", mal_accf),
    ("XGBoost", xgb_accf),
    ("Random Forest Classifier", rfc_accf),
):
    flip_results.add_row([model_name, round((100 * poisoned_acc), 2)])

# show table
print("\nA comparison table of models trained on flipped data and tested on the original dataset.")
print("This is a simple demonstration of poisoning the training dataset, yet it is highly effective.")
print(flip_results)

In [None]:
# Next, we explore a more nuanced poisoning attack with additive perturbations targeting specific features, informed by SHAP.

# Explain the Keras neural net with SHapley Additive exPlanations (SHAP). This is the original trained model.

# load the JavaScript needed to render SHAP graphics
shap.initjs()

# SHAP is a heavy calculation that takes time. Working on a sample of 100 test rows shortens the job,
# but the smaller the sample gets, the less accurate the interpretation becomes.
sample = shap.sample(X_test, 100)

# explain the NN trained on the normal dataset; the sample serves both as background data
# and as the set of rows being explained
mal_explainer = shap.KernelExplainer(Maldetector, sample)
mal_shap_values = mal_explainer.shap_values(sample)
shap.summary_plot(
    mal_shap_values,
    features,
    class_names=["Benign", "Malicious"],
    max_display=17,
    plot_type="bar",
    plot_size="auto",
    show=True,
)

# scroll to the bottom of this cell to view the rendered plot. Take note of the top 3 features the model bases its
# predictions on. These will be the targets of our poisoning attack.

In [None]:
# we want to derive boundaries for our attack by statistically analyzing the dataset
# derive the mean value (average) of every feature, separately for the malicious and benign classes
# this way we can see the general differences between the malicious and benign classes

# the averages serve as bounds when perturbing the dataset
# both frames below are new objects built from the scaled data in df

# malicious samples: discard rows labelled 0, then the truth label itself, and renumber the rows
# (the truth label is excluded because its attack was already demonstrated with label flipping)
malicious = df.loc[df['label'] != 0].drop(columns='label').reset_index(drop=True)

# benign samples: discard rows labelled 1, then the truth label itself, and renumber the rows
benign = df.loc[df['label'] != 1].drop(columns='label').reset_index(drop=True)

# per-feature average over the malicious samples
malcol_avgs = malicious.mean()

# per-feature average over the benign samples
bencol_avgs = benign.mean()

# print all the resultant data in a pretty table, one row per feature in this fixed order
averages = PrettyTable(["Feature", "Benign Averages", "Malicious Averages"])
for feature_name in (
    "avlength",
    "coff.timestamp",
    "entropy",
    "exports_counts",
    "has_debug",
    "has_relocations",
    "has_resources",
    "has_signature",
    "imports_counts",
    "MZ",
    "numstrings",
    "paths",
    "printables",
    "registry",
    "size",
    "urls",
    "vsize",
):
    averages.add_row([feature_name, bencol_avgs.at[feature_name], malcol_avgs.at[feature_name]])

print("\nAverages of scaled dataset by feature, separated by malicious and benign samples.")
print("These will inform target values to push data towards in our perturbations.\n")
print(averages)

In [None]:
# given the features of interest from SHAP explanation and the boundaries we derive from our statistical analysis,
# perturb the dataset to generate a poisoned training set

def perturb(row, col, bavg, mavg, factor):
    """Nudge one feature value of a row towards the average of the opposite class.

    Args:
        row: Mapping-like row (e.g. a pandas Series passed by
            ``DataFrame.apply(..., axis=1)``) holding 'label' and ``col``.
        col: Name of the feature to perturb.
        bavg: Benign average of the feature; the target for malicious rows (label 1).
        mavg: Malicious average of the feature; the target for benign rows (label 0).
        factor: Small decimal controlling the step size; the step is
            ``value * factor``, i.e. proportional to the current value.

    Returns:
        ``abs(value - value * factor)`` when the value lies above its target,
        ``abs(value + value * factor)`` when it lies below, and the value
        unchanged when it already equals the target or the label is neither
        0 nor 1. The unchanged case used to fall through and return None,
        which put missing values into the poisoned column.
    """
    value = row[col]
    label = row['label']

    if label == 1:
        # malware: push towards the benign average so it looks more like benign
        target = bavg
    elif label == 0:
        # benign: push towards the malicious average so it looks more like malicious
        target = mavg
    else:
        return value

    # the step is a fraction of the value itself, so it is not capped at the target
    if value > target:
        return abs((value - (value * factor)))
    if value < target:
        return abs((value + (value * factor)))
    return value

# start from an independent copy of the scaled data so df itself stays unpoisoned
poison_dataset = df.copy(deep=True)
factor = 0.1 # .5 brings down to 60% acc. 0.05 brings down to 81-72 acc

# =================================== your work here ============================================================
# CHANGE THIS:
# Given the three features discovered from the SHAP explanation, and the corresponding averages for malicious and 
# benign samples over those features, create a poisoned dataset here targeting those features. Call perturb 3 times
# to poison your 3 target features.

# args are feature, benign sample average over feature, malicious sample average over feature, factor to perturb by.
# Change the column index of the feature you want to change as well. Examples commented below.

# poison_dataset['featureName'] = poison_dataset.apply(perturb, axis=1, args=('featureName', benign_avg, mal_avg, factor))
#poison_dataset['entropy'] = poison_dataset.apply(perturb, axis=1, args=('entropy', 0.8625272385686976, 0.902868372786425, factor))
#poison_dataset['has_relocations'] = poison_dataset.apply(perturb, axis=1, args=('has_relocations', 0.6036036036036037, 0.4768305624336753, factor))
#poison_dataset['coff.timestamp'] = poison_dataset.apply(perturb, axis=1, args=('coff.timestamp', 0.2915475344330408, 0.30322718417659084, factor))


# ================================================================================================================

# write poison dataset
poison_dataset.to_csv('poison.csv', index=False)

# drop truth label from training set, define training and testing sets
Xp = poison_dataset.drop('label', axis=1)  
yp = poison_dataset['label']

# train/test split with the same test fraction (ratio), seed and stratification labels as the
# original split. The models trained on X_trainp are later scored on X_test, so the two splits
# must partition the rows identically; the previous test_size=0.2 produced a different partition
# and put rows of the evaluation set into the poisoned training set.
X_trainp, X_testp, y_trainp, y_testp = train_test_split(Xp, yp, test_size=ratio, random_state = 2023, stratify=y)
y_trainp = np.array(y_trainp)

In [None]:
# with our newly poisoned dataset, train a new neural network on this poisoned data and predict on regular data. This is 
# a SHAP-informed poisoning attack against the training set targeting specific features of interest

# 10 rounds: a new network is built and trained on the poisoned data, then scored on the original test set
acclistp = []
for i in range(10):
    Maldetector_p = nn_model()
    Maldetector_p.fit(X_trainp, y_trainp, epochs=epochnum, batch_size=batchnum, verbose=0) # train on poison
    lossp, accp, precisionp, recallp = Maldetector_p.evaluate(X_test, y_test, verbose=0) # test on original data
    acclistp.append(round((100 * accp), 2))

# one further poisoned network whose metrics are kept in the mal_*p variables
Maldetector_p = nn_model()
Maldetector_p.fit(X_trainp, y_trainp, epochs=epochnum, batch_size=batchnum, verbose=0) # train on poison
mal_lossp, mal_accp, mal_precisionp, mal_recallp = Maldetector_p.evaluate(X_test, y_test, verbose=0) # predict on original

# tabulate results: round number, accuracy of the NN trained on original data (acclist),
# accuracy of the NN trained on poisoned data (acclistp)
NNp_results = PrettyTable(["Testing Round", "NN Trained on Original", "NN Trained on Poison"])
for round_number, (original_acc, poison_acc) in enumerate(zip(acclist, acclistp), start=1):
    NNp_results.add_row([str(round_number), original_acc, poison_acc])

# show loop performance
print("\n10 rounds of Keras Neural Networks.")
print("One trained on original data, one trained on poison.\nBoth tested on original dataset.")
print(NNp_results)

In [None]:
# we can also explain our other two models to see which features are most important to their predictions. From this,
# we can speculate on whether the dataset poisoned against the neural network will also transfer to the
# gradient-boosted model and the Random Forest ensemble

# SHAP explanation of the gradient-boosted model trained on original data, computed over the test set
print("XGB Summary Plot")
xgb_explainer = shap.TreeExplainer(xgb_model)
xgb_shap_values = xgb_explainer.shap_values(X_test)
shap.summary_plot(
    xgb_shap_values,
    features,
    class_names=["Benign", "Malicious"],
    max_display=17,
    plot_type="bar",
    plot_size="auto",
    show=True,
)


In [None]:
# SHAP explanation of the Random Forest trained on the original dataset, computed over the test set
print("RFC Summary Plot")
rfc_explainer = shap.TreeExplainer(rfc_model)
rfc_shap_values = rfc_explainer.shap_values(X_test)
shap.summary_plot(
    rfc_shap_values,
    features,
    class_names=["Benign", "Malicious"],
    max_display=17,
    plot_type="bar",
    plot_size="auto",
    show=True,
)

In [None]:
# test transferability of poison targeting NN against all models

# gradient boosting: fitted on the poisoned training set, scored on the original test set
xgb_model_p = GradientBoostingClassifier(
    criterion='friedman_mse',
    loss='exponential',
    n_estimators=1000,
)
xgb_model_p.fit(X_trainp, y_trainp)
xgb_predp = xgb_model_p.predict(X_test)
xgb_accp = accuracy_score(y_test, xgb_predp)

# random forest: fitted on the poisoned training set, scored on the original test set
rfc_model_p = RandomForestClassifier(
    criterion='entropy',
    n_estimators=400,
)
rfc_model_p.fit(X_trainp, y_trainp)
rfc_predp = rfc_model_p.predict(X_test)
rfc_accp = accuracy_score(y_test, rfc_predp)

# big table with each model type trained on regular data and trained on poison, all tested on regular data
# this is the transferability-of-perturbations table

# one row per model, accuracies shown as percentages with 2 decimals
transfer_results = PrettyTable(["Model", "Trained on Original", "Trained on Poison"])
for model_name, original_acc, poison_acc in (
    ("Keras Neural Network", mal_acc, mal_accp),
    ("XGBoost", xgb_acc, xgb_accp),
    ("Random Forest Classifier", rfc_acc, rfc_accp),
):
    transfer_results.add_row([model_name, round((100 * original_acc), 2), round((100 * poison_acc), 2)])

# show transferability performance
print("\nTable demonstrating transferability of perturbations across models.")
print("One trained on original data, one trained on poison.\nAll tested on original dataset.")
print(transfer_results)