# Parallelized version

In [2]:
import math
import random
import numpy as np
import time
import sklearn.metrics as metrics

## Initializing spark

In [3]:
from pyspark import SparkConf, SparkContext

In [4]:
conf = SparkConf().setAppName("appName").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [5]:
sc.getConf().getAll()

[('spark.app.id', 'local-1671131389649'),
 ('spark.app.name', 'appName'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '10.0.2.15'),
 ('spark.driver.port', '40751'),
 ('spark.ui.showConsoleProgress', 'true')]

## Functions

### Auxiliar functions

In [6]:
def trainTestSplit(dataset):
    train = dataset.filter(lambda x: x[2]==1)
    train = train.map(lambda x: (x[0], x[1]))
    test = dataset.filter(lambda x: x[2]==0)
    test = test.map(lambda x: (x[0], x[1]))
    return (train, test)

In [7]:
def sigmoid(x):
    return 1 / (1 + math.exp(-x))

In [8]:
def rdd_cost_function(RDD_Xyyhat, lambda_ref, m, w):
    y = RDD_Xyyhat[1]
    y_hat = RDD_Xyyhat[2]
    temp = (-1/m) * (
        y * np.log(y_hat) + (1 - y)
        * np.log(1 - y_hat)
    )
    cost = temp + (lambda_ref/(2*m))*sum([i*i for i in w.value])
    return cost

In [9]:
def str_to_number_list(line):
    numbers = line.replace(" ","").split(",")
    numbers = [float(number) for number in numbers]
    
    # Separate X and y
    numbers = (numbers[:-1], numbers[-1])
    
    return numbers

In [10]:
def sum_list_values(list1, list2):
    return [n1 + n2 for n1, n2 in zip(list1, list2)]

In [11]:
def sum_minus_mean_squared(numbers):
    return [(number-means.value[x])**2 for x,number in enumerate(numbers)]

In [12]:
def rdd_mean_by_column(RDD_Xy, m):
    means = RDD_Xy.map(lambda x: x[0]).reduce(sum_list_values)
    means = [x/m for x in means]
    return means

In [13]:
def rdd_mean_by_column(RDD_Xy, m):
    means = RDD_Xy.map(lambda x: x[0]).reduce(sum_list_values)
    means = [x/m for x in means]
    return means

In [14]:
def rdd_std_by_column(RDD_Xy, m):
    stds = RDD_Xy.map(lambda x: x[0]).map(sum_minus_mean_squared).reduce(sum_list_values)
    stds = [math.sqrt(x/m) for x in stds]
    return stds

In [15]:
def multiply_RDDXy_by_w(Xy):
    global b
    tot = 0
    for xi, wi in zip(Xy[0], w.value):
        tot += xi * wi
    tot += b
    return (Xy[0], Xy[1], sigmoid(tot))

In [16]:
def calculate_dw(RDD_Xyyhat):
    dw = []
    for x in RDD_Xyyhat[0]:
        dw.append((RDD_Xyyhat[2]-RDD_Xyyhat[1])*x)
    return dw

### Mandatory functions

In [17]:
def readFile(filename):
    dataset = sc.textFile(filename)
    dataset = dataset.map(str_to_number_list)
    return dataset.sample(False, 1)

In [18]:
def normalize(RDD_Xy):
    m = RDD_Xy.count()
    global means
    means = sc.broadcast(rdd_mean_by_column(RDD_Xy, m))
    stds = sc.broadcast(rdd_std_by_column(RDD_Xy, m))
    norm_rdd = RDD_Xy.map(lambda values: ([(x - mean) / std for x, mean, std in zip(values[0], means.value, stds.value)], values[1]))
    return norm_rdd

In [19]:
def train(RDD_Xy, iterations, learning_rate, lambda_reg):
    global b
    global m
    m = RDD_Xy.count()
    n = len(RDD_Xy.take(1)[0][0])
    global w
    w_temp = w.value.copy()
    for it in range(iterations):
        RDD_Xyyhat = RDD_Xy.map(multiply_RDDXy_by_w)
        print(f"Cost for it {it}:", RDD_Xyyhat.map(lambda x: rdd_cost_function(x, lambda_reg, m, w)).reduce(lambda x,y: x+y))
        dw=[0 for i in range(0,n)]
        for cl in range(n):
                X_cl = RDD_Xyyhat.map(lambda x: (x[2]-x[1])*x[0][cl]).reduce(lambda x, y: x+y)
                dw[cl] = (1/m)*(X_cl)+(lambda_reg/m)*w_temp[cl]
                w_temp[cl] -= learning_rate * dw[cl]
        w = sc.broadcast(w_temp)
        db = (1/m)*RDD_Xyyhat.map(lambda x: x[2]-x[1]).reduce(lambda x, y: x+y)
        b -= learning_rate * db
    return w, b

In [20]:
def checkPrediction(y, y_hat):
    if (y == y_hat and y_hat==0):
        return (0, 1, 0, 0)
    if (y == y_hat and y_hat==1):
        return (1, 0, 0, 0)
    if (y != y_hat and y_hat==1):
        return (0, 0, 1, 0)
    if (y != y_hat and y_hat==0):
        return (0,0,0,1)

In [21]:
def metrics(ws, b, RDD_Xy, treshold=0.5):
    total = RDD_Xy.count()
    y_and_y_hat = RDD_Xy.map(lambda x: (x[1], predict_with_teshold(ws, x[0], b, treshold)))
    result = y_and_y_hat.map(lambda x: checkPrediction(x[0], x[1]))
    
    tp = result.map(lambda x: x[0]).reduce(lambda x,y: x+y)
    tn = result.map(lambda x: x[1]).reduce(lambda x,y: x+y)
    fp = result.map(lambda x: x[2]).reduce(lambda x,y: x+y)
    fn = result.map(lambda x: x[3]).reduce(lambda x,y: x+y)
    acc = (tp+tn)/total
    
    tpr = tp/(tp+fn)
    fpr = fp/(fp+tn)
    
    return acc, tp, tn, fp, fn, tpr, fpr

In [22]:
def accuracy(ws, b, RDD_Xy):
    total = RDD_Xy.count()
    y_and_y_hat = RDD_Xy.map(lambda x: (x[1], predict(ws, x[0], b)))
    result = y_and_y_hat.map(lambda x: checkPrediction(x[0], x[1]))
    tp = result.map(lambda x: x[0]).reduce(lambda x,y: x+y)
    tn = result.map(lambda x: x[1]).reduce(lambda x,y: x+y)
    fp = result.map(lambda x: x[2]).reduce(lambda x,y: x+y)
    fn = result.map(lambda x: x[3]).reduce(lambda x,y: x+y)
    acc = (tp+tn)/total
    return acc

In [23]:
def predict(w, X, b):
    tot = 0
    for xi, wi in zip(X, w.value):
        tot += xi * wi
    tot += b
    val = sigmoid(tot)
    if (val>=0.5):
        return 1.0
    return 0.0

In [24]:
def predict_with_teshold(w, X, b, tresh):
    tot = 0
    for xi, wi in zip(X, w.value):
        tot += xi * wi
    tot += b
    val = sigmoid(tot)
    if (val>=tresh):
        return 1.0
    return 0.0

## Testing

In [25]:
RDD_Xy = readFile("../data/botnet_tot_syn_l.csv")
RDD_Xy.count()

1000000

In [26]:
RDD_Xy_normalized = normalize(RDD_Xy)

In [27]:
RDD_Xy_presplit = RDD_Xy_normalized.map(lambda x: (x[0], x[1], 1 if random.random() < 0.75 else 0))
train_data, test_data = trainTestSplit(RDD_Xy_presplit)

#### Initialize some stuff before training

In [28]:
np.random.seed(33)

global b  
b = 0

global n
n = len(RDD_Xy.take(1)[0][0])

global w
w = sc.broadcast(np.random.rand(n))

#### Training

In [29]:
w_final, b = train(train_data, 10, 1.5, 0)

Cost for it 0: 1.4437574320575264
Cost for it 1: 0.7363756795746913
Cost for it 2: 0.45069962013729137
Cost for it 3: 0.3459850526977412
Cost for it 4: 0.29716128878021786
Cost for it 5: 0.2700473828360465
Cost for it 6: 0.2514499034307337
Cost for it 7: 0.23886534610202373
Cost for it 8: 0.2295874836432782
Cost for it 9: 0.2217807106242988


#### Accuracy

In [30]:
acc = accuracy(w_final, b, test_data)
acc

0.9267038268023796

In [35]:
# experiment to get the different datapoints for roc and precision-recall curve
# treshold = 0.00
# x = []
# y = []
# while treshold <= 1:
#     acc, tp, tn, fp, fn, tpr, fpr = metrics(w_final, b, test_data, treshold)
#     precision = tp/(tp+fp)
#     recall = tp/(tp+fn)
#     x.append(precision)
#     y.append(recall)
#     treshold += 0.01

## Experiments

In [36]:
elapsed_times = {}

for cores in range(1,13,1):
    sc.stop()
    conf = SparkConf().setAppName("appName").setMaster(f"local[{cores}]")
    sc = SparkContext(conf=conf)
    
    print(f"---------- Starting execution with {cores} cores ----------")
    
    
    # ---------- Execution ----------
    RDD_Xy = readFile("../data/botnet_tot_syn_l.csv")
    RDD_Xy_normalized = normalize(RDD_Xy)
    RDD_Xy_presplit = RDD_Xy_normalized.map(lambda x: (x[0], x[1], 1 if random.random() < 0.75 else 0))
    train_data, test_data = trainTestSplit(RDD_Xy_presplit)
    start = time.time()
    
    np.random.seed(33)
    global b  
    b = 0
    global n
    n = len(RDD_Xy.take(1)[0][0])
    global w
    w = sc.broadcast(np.random.rand(n))
    w_final, b = train(train_data, 10, 1.5, 0)
    acc = accuracy(w_final, b, test_data)
    print(f"The accuracy for the test set is: {acc}")
    total = tp+tn+fp+fn
    precision = tp/(tp+fp)
    recall = tp/(tp+fn)
    f1 = (2*precision*recall)/(precision+recall)
    
    print(f"The accuracy for the test set is: {acc}")
    print(f"The tp percentage is: {tp/total}")
    print(f"The tn percentage is: {tn/total}")
    print(f"The fp percentage is: {fp/total}")
    print(f"The fn percentage is: {fn/total}")
    print(f"The precision is: {precision}")
    print(f"The recall is: {recall}")
    print(f"The f1 score is: {f1}")
    # ---------- Execution ----------

    end = time.time()
    print(f"---------- Finished execution with {cores} cores ----------")
    elapsed_time = end - start
    elapsed_times[cores] = elapsed_time
    print(f"Elapsed time for cores {cores} is {elapsed_time} seconds")

---------- Starting execution with 1 cores ----------
Cost for it 0: 1.4444992469992146
Cost for it 1: 0.7367831672926854
Cost for it 2: 0.4508677978384475
Cost for it 3: 0.3457035964892895
Cost for it 4: 0.29695737811338846
Cost for it 5: 0.2693352131372768
Cost for it 6: 0.25231691777626386
Cost for it 7: 0.2393206230899147
Cost for it 8: 0.2295978400480014
Cost for it 9: 0.22154720316480497
The accuracy for the test set is: 0.9288507208482261
The accuracy for the test set is: 0.9288507208482261
The tp percentage is: 0.02336176851595619
The tn percentage is: 0.5006958398004323
The fp percentage is: 6.416966459318437e-05
The fn percentage is: 0.4758782220190183
The precision is: 0.9972607430234549
The recall is: 0.04679466580976864
The f1 score is: 0.08939464859846073
---------- Finished execution with 1 cores ----------
Elapsed time for cores 1 is 1812.720507144928 seconds
---------- Starting execution with 2 cores ----------
Cost for it 0: 1.445560858102925
Cost for it 1: 0.73775052

Cost for it 4: 0.2968261959391132
Cost for it 5: 0.2697404195531727
Cost for it 6: 0.25107929011633623
Cost for it 7: 0.2386298334316958
Cost for it 8: 0.22914921673187727
Cost for it 9: 0.22159424835757469
The accuracy for the test set is: 0.9245964731574303
The accuracy for the test set is: 0.9245964731574303
The tp percentage is: 0.02336176851595619
The tn percentage is: 0.5006958398004323
The fp percentage is: 6.416966459318437e-05
The fn percentage is: 0.4758782220190183
The precision is: 0.9972607430234549
The recall is: 0.04679466580976864
The f1 score is: 0.08939464859846073
---------- Finished execution with 10 cores ----------
Elapsed time for cores 10 is 376.042405128479 seconds
---------- Starting execution with 11 cores ----------
Cost for it 0: 1.4465499046709596
Cost for it 1: 0.7358562563053956
Cost for it 2: 0.45036143191526096
Cost for it 3: 0.3456640295496929
Cost for it 4: 0.29671316223893607
Cost for it 5: 0.26947389940943156
Cost for it 6: 0.25186897915073736
Cost

In [37]:
elapsed_times

{1: 1812.720507144928,
 2: 938.6837751865387,
 3: 660.2266867160797,
 4: 527.590255022049,
 5: 481.8547022342682,
 6: 470.32348465919495,
 7: 474.94063806533813,
 8: 371.04762053489685,
 9: 374.56031608581543,
 10: 376.042405128479,
 11: 373.9481461048126,
 12: 376.0982964038849}