In [None]:
from pyspark import SparkContext
import findspark
import numpy as np
import random as rd
import time
import matplotlib.pyplot as plt

In [None]:
# NOTE(review): findspark.init() is conventionally called *before* importing
# pyspark so that the Spark installation is on sys.path; here pyspark has
# already been imported by the imports cell -- confirm this ordering is intended.
findspark.init()

In [None]:
def readFile(filename):
    '''
    Load the botnet dataset file into an RDD of (X, y) records.

     Arguments:
      filename -- name of the comma-separated dataset file. Every line holds
        the feature columns followed by the label in the last column
        (11 features + 1 label for the files used in this notebook).

     Returns:
      An RDD with one record per line of the file. Each record is a tuple
      (X, y): "X" is a NumPy float array built from every column but the
      last, "y" is the last column converted to int
      (0 if normal traffic, 1 if botnet).

     Note:
      Uses the module-level SparkContext `sc`, which must be active.
    '''
    def parse_line(line):
        # Split once, then separate the feature columns from the label column
        columns = line.split(",")
        features = np.array(columns[:-1], dtype=float)
        label = int(columns[-1])
        return (features, label)

    return sc.textFile(filename).map(parse_line)

In [None]:
def normalize(RDD_Xy):
    '''
    Rescale every feature column to mean 0 and standard deviation 1.

     Arguments:
      RDD_Xy -- RDD containing data examples. Each record is a tuple (X, y):
        "X" is an array with the features (float numbers) of an example,
        "y" is the label of the example (integer 0/1).

     Returns:
      An RDD of (X_normalized, y) tuples where X_normalized is a NumPy float
      array. Columns whose standard deviation is 0 (constant columns) are
      only centred, so they become all zeros instead of NaN/inf.

     Raises:
      ValueError (from RDD.reduce) when RDD_Xy is empty.
    '''
    features = RDD_Xy.map(lambda row: np.asarray(row[0], dtype=float))

    # First pass: number of examples and per-column sums in a single job
    count, column_sums = features.map(lambda x: (1, x)) \
                                 .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    means = column_sums / count

    # Second pass: population variance of every column
    squared_dev_sums = features.map(lambda x: (x - means) ** 2) \
                               .reduce(lambda a, b: a + b)
    stds = np.sqrt(squared_dev_sums / count)

    # A constant column has std == 0; dividing by it would produce NaN/inf,
    # so divide by 1 there (the centred values are already all zero).
    safe_stds = np.where(stds > 0, stds, 1.0)

    return RDD_Xy.map(
        lambda row: ((np.asarray(row[0], dtype=float) - means) / safe_stds, row[1])
    )

In [None]:
def _plot_cost_history(J_values):
    '''
    Plot the cost recorded at each training iteration on the current axes.
    '''
    if not J_values:
        return  # no iteration was run, so there is nothing to draw

    iterations = len(J_values)
    axis = plt.gca()
    plt.plot(range(iterations), J_values, color="darkcyan")
    plt.title("Evolution of cost function per iteration")
    plt.xlabel("iterations")
    plt.ylabel("cost")
    axis.xaxis.set_ticks(range(iterations))
    # np.arange cannot build the tick list from a NaN/inf upper bound
    if np.isfinite(J_values[0]) and J_values[0] > 0:
        axis.yaxis.set_ticks(np.arange(0, J_values[0], 0.1))


def train(RDD_Xy, iterations, learning_rate, lambda_reg):
    '''
    Train a regularized logistic regression model with gradient descent.

     Arguments:
      RDD_Xy -- RDD containing data examples. Each record is a tuple (X, y):
        "X" is an array with the features (float numbers) of an example,
        "y" is the label of the example (integer 0/1).
      iterations -- number of iterations of the optimization loop
      learning_rate -- learning rate of the gradient descent
      lambda_reg -- regularization rate

     Returns:
      A list containing the weights "w" followed by the bias "b" (last
      element) at the end of the training process. With iterations == 0 the
      random initial values are returned unchanged.

     Notes:
      * Weights and bias start from rd.uniform(-1, 1) draws.
      * The regularization term is lambda_reg / (2 * n) * sum(w_i ** 2), where
        n is the number of features; the bias is not regularized.
      * Cost and gradient are accumulated in a single pass over RDD_Xy per
        iteration.
      * The cost curve is plotted on the current matplotlib axes.
    '''
    num_examples = RDD_Xy.count()
    num_features = len(RDD_Xy.first()[0])

    # Initialize weights and bias (bias is stored last)
    ws = [rd.uniform(-1, 1) for _ in range(num_features + 1)]

    J_values = []  # cost value of every iteration
    eps = 1e-15    # keeps log() finite when the sigmoid saturates to 0 or 1

    for _ in range(iterations):
        weights = np.array(ws[:-1], dtype=float)
        bias = ws[-1]

        def cost_and_gradient(row):
            x = np.asarray(row[0], dtype=float)
            y = row[1]

            # Sigmoid of the linear combination x . w + b
            y_pred = 1 / (1 + np.exp(-(np.dot(x, weights) + bias)))

            # Only the cost uses the clipped value; the gradient keeps the raw one
            p = min(max(y_pred, eps), 1 - eps)
            cost = y * np.log(p) + (1 - y) * np.log(1 - p)

            error = y_pred - y
            # Per-example gradient: one entry per weight, then the bias entry
            return (cost, np.append(error * x, error))

        total_cost, gradient_sum = RDD_Xy.map(cost_and_gradient) \
                                         .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

        # Cost function with the regularization term
        J = -total_cost / num_examples
        J += (lambda_reg / (2 * num_features)) * float(np.sum(weights ** 2))
        J_values.append(J)

        # Average the gradient and add the regularization derivative (weights only)
        gradient = gradient_sum / num_examples
        gradient[:-1] += (lambda_reg / num_features) * weights

        # Gradient descent step for weights and bias
        ws = [w - learning_rate * g for w, g in zip(ws, gradient)]

    _plot_cost_history(J_values)

    return ws

In [None]:
def predict(X, ws):
    '''
    Predict the class of a single example with a 0.5 decision threshold.

     Arguments:
      X -- example to be predicted (sequence of feature values)
      ws -- weights followed by the bias as last element

     Returns:
      The result of comparing the sigmoid output against 0.5: a true value
      (compares equal to 1) for the positive class, a false value
      (compares equal to 0) otherwise.
    '''
    # Start from the bias and accumulate x_i * w_i feature by feature
    linear = ws[-1]
    for position, feature in enumerate(X):
        linear += feature * ws[position]

    # Squash the linear score into (0, 1) with the sigmoid function
    probability = 1 / (1 + np.exp(-linear))

    return (probability >= 0.5)

In [None]:
def predict_with_threshold(X, ws, threshold):
    '''
    Predict the class of a single example with a configurable threshold.

     Arguments:
      X -- example to be predicted (sequence of feature values)
      ws -- weights followed by the bias as last element
      threshold -- minimum sigmoid output for the positive class

     Returns:
      The result of comparing the sigmoid output against threshold: a true
      value (compares equal to 1) when it is >= threshold, a false value
      (compares equal to 0) otherwise.
    '''
    # Start from the bias and accumulate x_i * w_i feature by feature
    linear = ws[-1]
    for position, feature in enumerate(X):
        linear += feature * ws[position]

    # Squash the linear score into (0, 1) with the sigmoid function
    probability = 1 / (1 + np.exp(-linear))

    return (probability >= threshold)

In [None]:
def accuracy(RDD_Xy, ws):
    '''
    Compute the accuracy of the logistic regression model.

     Arguments:
      RDD_Xy -- RDD containing the examples to be predicted, as (X, y) tuples
      ws -- weights & bias

     Returns:
      accuracy -- the number of predictions that are correct divided by the
      number of records (examples) in RDD_Xy. Each example is predicted with
      the predict function.

     Raises:
      ValueError when RDD_Xy contains no records.
    '''
    # Count correct predictions and total records in the SAME pass, so that
    # numerator and denominator always refer to the same set of records
    # (two separate jobs could disagree if RDD_Xy is recomputed in between).
    correct_count, total_count = RDD_Xy \
        .map(lambda row: (int(predict(row[0], ws) == row[1]), 1)) \
        .fold((0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1]))

    if total_count == 0:
        raise ValueError("Cannot compute the accuracy of an empty RDD")

    return correct_count / total_count

In [None]:
def confusion_matrix(RDD_Xy, ws, threshold=0.5):
    '''
    Count true/false positives and negatives of the model on a dataset.

     Arguments:
      RDD_Xy -- RDD containing the examples to be predicted, as (X, y) tuples
      ws -- weights & bias
      threshold -- decision threshold passed to predict_with_threshold

     Returns:
      A tuple (TP, FP, TN, FN). An empty RDD yields (0, 0, 0, 0).
    '''
    def to_counts(row):
        predicted = predict_with_threshold(row[0], ws, threshold)
        actual = row[1]
        # One indicator per cell of the confusion matrix: (TP, FP, TN, FN)
        return (int(predicted == 1 and actual == 1),
                int(predicted == 1 and actual == 0),
                int(predicted == 0 and actual == 0),
                int(predicted == 0 and actual == 1))

    # Single pass over the data: all four counters are summed together instead
    # of running one job (and re-running every prediction) per counter.
    TP, FP, TN, FN = RDD_Xy.map(to_counts) \
        .fold((0, 0, 0, 0), lambda a, b: tuple(x + y for x, y in zip(a, b)))

    return (TP, FP, TN, FN)

In [None]:
def get_error_metrics(data,ws,threshold):
    '''
    Compute and print the classification metrics for a given threshold.

     Arguments:
      data -- RDD containing the examples to be predicted, as (X, y) tuples
      ws -- weights & bias
      threshold -- decision threshold forwarded to confusion_matrix

     Returns:
      A tuple (precision, accuracy, recall, f1_score, specificity).
      Undefined ratios (zero denominator) fall back to: 1.0 for precision,
      recall and specificity, 0.0 for accuracy and f1_score.
    '''
    (TP,FP,TN,FN) = confusion_matrix(data,ws,threshold)
    total = TP+FP+TN+FN

    # Calculate precision
    precision = TP/(TP+FP) if (TP+FP) != 0 else 1.0
    print("Precision:", precision)

    # Calculate accuracy (named accuracy_value to avoid shadowing accuracy())
    accuracy_value = (TP+TN)/total if total != 0 else 0.0
    print("Accuracy:", accuracy_value)

    # Calculate recall
    recall = TP/(TP+FN) if (TP+FN) != 0 else 1.0
    print("Recall:", recall)

    # Calculate f1_score; precision and recall are both 0 when TP == 0 with
    # FP > 0 and FN > 0, which would otherwise divide by zero
    f1_score = 2*(precision*recall)/(precision+recall) if (precision+recall) != 0 else 0.0
    print("F1-Score:", f1_score)

    # Calculate specificity; no actual negatives means no false positives
    specificity = TN/(TN+FP) if (TN+FP) != 0 else 1.0
    print("Specificity:", specificity)

    return (precision, accuracy_value, recall, f1_score, specificity)

In [None]:
def evaluate(data, ws):
    '''
    Sweep the decision threshold and collect the points of the ROC and
    precision-recall curves.

     Arguments:
      data -- RDD containing the examples to be predicted, as (X, y) tuples
      ws -- weights & bias

     Returns:
      A tuple (fprs, tprs, precisions) of lists with one entry per threshold,
      for the 11 thresholds 0 * 0.1, 1 * 0.1, ..., 10 * 0.1.
      fprs holds 1 - specificity, tprs holds the recall.
    '''
    fprs, tprs, precisions = [], [], []

    for step in range(11):
        threshold = step * 0.1  # thresholds from 0.0 to 1.0
        (precision, _, recall, _, specificity) = get_error_metrics(data,ws,threshold)

        precisions.append(precision)
        tprs.append(recall)
        fprs.append(1-specificity)

    return (fprs,tprs,precisions)

In [None]:
def show_roc_curve(fprs, tprs):
    '''
    Display the ROC curve in a new figure.

     Arguments:
      fprs -- false positive rates (x axis)
      tprs -- true positive rates (y axis)
    '''
    _, ax = plt.subplots()
    ax.plot(fprs, tprs, color='darkorange', lw=2, label='ROC curve')
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic (ROC) Curve')
    ax.legend(loc="lower right")
    plt.show()

In [None]:
def show_precision_recall_curve(recalls, precisions):
    '''
    Display the precision-recall curve in a new figure.

     Arguments:
      recalls -- recall values (x axis)
      precisions -- precision values (y axis)
    '''
    _, ax = plt.subplots()
    ax.plot(recalls, precisions, color='darkorange', lw=2, label='Precision-Recall curve')
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall curve')
    ax.legend(loc="lower left")
    plt.show()

In [None]:
# MAIN

def main(path="botnet_tot_syn_l.csv", iterations=10, learning_rate=1.5, lambda_reg=0.1):
    '''
    Read, normalize, train and report the training-set accuracy and the
    elapsed time.

     Arguments:
      path -- dataset file to load (e.g. "botnet_reduced_l.csv" for the
        reduced dataset)
      iterations -- number of gradient descent iterations
      learning_rate -- learning rate of the gradient descent
      lambda_reg -- regularization rate

     Requires an active module-level SparkContext `sc`.
    '''
    start_time = time.time()

    # The normalized RDD is traversed by every training iteration and by the
    # accuracy computation; cache it so the file is not re-read and
    # re-normalized on each of those jobs.
    data = normalize(readFile(path)).cache()

    ws = train(data, iterations, learning_rate, lambda_reg)

    acc = accuracy(data,ws)
    print("accuracy: ", acc)

    end_time = time.time()
    print("Execution time: ", end_time - start_time)

In [None]:
sc = SparkContext("local[*]", "Botnet classifier with Spark")
try:
    main()
finally:
    # Always stop the context, even if main() fails; otherwise the next cell
    # cannot create its own SparkContext.
    sc.stop()

In [None]:
def main_with_metrics(path="botnet_tot_syn_l.csv", iterations=10, learning_rate=1.5, lambda_reg=0.1):
    '''
    Train the model and display its ROC and precision-recall curves.

     Arguments:
      path -- dataset file to load (e.g. "botnet_reduced_l.csv" for the
        reduced dataset)
      iterations -- number of gradient descent iterations
      learning_rate -- learning rate of the gradient descent
      lambda_reg -- regularization rate

     Requires an active module-level SparkContext `sc`.
    '''
    # Training and the threshold sweep in evaluate() run many jobs over the
    # same normalized data; cache it so it is computed only once.
    data = normalize(readFile(path)).cache()

    ws = train(data, iterations, learning_rate, lambda_reg)

    (fprs, tprs, precisions) = evaluate(data, ws)

    show_roc_curve(fprs, tprs)
    show_precision_recall_curve(tprs, precisions)

In [None]:
sc = SparkContext("local[*]", "Botnet classifier with Spark")
try:
    main_with_metrics()
finally:
    # Always stop the context, even if the run fails; otherwise the next cell
    # cannot create its own SparkContext.
    sc.stop()

In [None]:
# Measure the execution time of main() for an increasing number of local
# worker threads and plot the performance and speedup curves.
num_cores = [1, 2, 3, 4, 5, 6, 7, 8]
execution_times = []

for cores in num_cores:
    sc = SparkContext(master=f"local[{cores}]", appName="Botnet PerformanceTest")
    try:
        start_time = time.time()

        # Running
        main()

        end_time = time.time()
        execution_times.append(end_time - start_time)
    finally:
        # Release the context even if main() fails, so that the next
        # SparkContext (next iteration or next cell) can be created.
        sc.stop()

# Compute speedup
base_time = execution_times[0]  # ExecTime with 1 worker
speedups = [base_time / elapsed for elapsed in execution_times]

# Plotting Performance curve
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(num_cores, execution_times, marker='o')
plt.title("Performance curve")
plt.xlabel("Number of workers")
plt.ylabel("Execution time (seconds)")

# Plotting Speedup curve
plt.subplot(1, 2, 2)
plt.plot(num_cores, speedups, marker='o', color='green')
plt.title("Speedup curve")
plt.xlabel("Number of workers")
plt.ylabel("Time with one worker/time with n workers")

plt.tight_layout()
plt.show()

In [None]:
#Cross Validation Part

In [None]:
def get_block_data(data_cv, index):
    '''
    Split the block-indexed data into the training and test sets of one fold.

     Arguments:
      data_cv -- RDD of (block_index, record) tuples
      index -- block used as test set for this fold

     Returns:
      A tuple (train_data, test_data) of RDDs holding only the records
      (block index removed): test_data has the records of block `index`,
      train_data the records of every other block.
    '''
    def records_where(keep_block):
        # Keep the rows whose block index passes the check, then drop the index
        return data_cv.filter(lambda row: keep_block(row[0])) \
                      .map(lambda row: row[1])

    test_data = records_where(lambda block: block == index)
    train_data = records_where(lambda block: block != index)

    return train_data, test_data

# Preprocesses the data for cross-validation by randomly indexing elements
def transform(data, num_blocks=10, seed=None):
    '''
    Tag every record with a random block index for cross-validation.

     Arguments:
      data -- RDD with the records to be split into blocks
      num_blocks -- number of blocks; indices are drawn uniformly from
        0 .. num_blocks - 1 (default 10, i.e. 0 to 9)
      seed -- optional seed for a reproducible assignment; when None a base
        seed is drawn once on the driver

     Returns:
      An RDD of (block_index, record) tuples.

     Notes:
      Each partition uses its own generator seeded from the base seed and the
      partition index, so re-evaluating the returned (lazy) RDD gives every
      record the same block again -- assuming the parent RDD yields the same
      partitions in the same order when recomputed.
    '''
    base_seed = rd.randrange(2 ** 32) if seed is None else seed

    def assign_blocks(partition_index, records):
        rng = rd.Random(f"{base_seed}:{partition_index}")
        for record in records:
            # randrange excludes the upper bound, so exactly num_blocks
            # distinct indices can be produced (randint(0, 10) produced 11)
            yield (rng.randrange(num_blocks), record)

    return data.mapPartitionsWithIndex(assign_blocks)

In [None]:
def cross_valid(path="botnet_tot_syn_l.csv"):
    '''
    Run a 10-block cross-validation of the classifier and print the accuracy
    of every block, their average and the elapsed time.

     Arguments:
      path -- dataset file to load (e.g. "botnet_reduced_l.csv" for the
        reduced dataset)

     Requires an active module-level SparkContext `sc`.
    '''
    # Local import: StorageLevel is only needed by this function
    from pyspark import StorageLevel

    start_time = time.time()

    data = readFile(path)
    data = normalize(data)

    num_blocks_cv=10

    # Shuffle Rows and transform data.
    # The block-indexed RDD is persisted so that the block assignment is
    # computed once and shared by every fold, instead of being recomputed
    # (together with reading and normalizing the file) on every Spark job.
    data_cv=transform(data).persist(StorageLevel.MEMORY_AND_DISK)
    accuracys = []
    try:
        for i in range(num_blocks_cv) :
            print("it : ", i)
            tr_data, test_data = get_block_data(data_cv,i)

            ws = train(tr_data, 10, 1.5, 0.1)

            accuracys.append(accuracy(test_data, ws))
    finally:
        # Free the persisted blocks whether or not the folds completed
        data_cv.unpersist()

    avg_acc = sum(accuracys) / num_blocks_cv
    print ("average acc:",avg_acc)
    print(accuracys)

    end_time = time.time()
    print("Execution time: ", end_time - start_time)

In [None]:
sc = SparkContext("local[*]", "Botnet classifier with Spark")
try:
    cross_valid()
finally:
    # Always stop the context, even if the run fails; otherwise a later cell
    # cannot create its own SparkContext.
    sc.stop()