In [1]:
import findspark
findspark.init()
from pyspark import *
import numpy as np

In [2]:
#
#
#

In [3]:
#for some reason the context is already created when using the kernel
conf = (SparkConf()
           .setMaster("local")
           .setAppName("assignment")
           .set("spark.executor.memory", "1g"))
sc = SparkContext.getOrCreate(conf)

In [4]:
# load the file PROTOTYPE
# rdd_initial = sc.textFile("../alumno/Downloads/botnet_tot_syn_l.csv")

In [5]:
#RDD_Xy_raw will be like:
#    {["v","v","v","v","v","v","v","v","v","v","v","l"],
#     ["v","v","v","v","v","v","v","v","v","v","v","l"],
#     ...
#     ["v","v","v","v","v","v","v","v","v","v","v","l"]}

#RDD_Xy will be like:
#    {[("v","v","v","v","v","v","v","v","v","v","v"),("l")],
#     [("v","v","v","v","v","v","v","v","v","v","v"),("l")],
#     ...
#     [("v","v","v","v","v","v","v","v","v","v","v"),("l")]}    

In [6]:
def readFile(fileName):
    
    loaded_data=sc.textFile(filename)
    # take the loaded data and separate the different values (separated by a comma)
    RDD_Xy_raw=loaded_data.map(lambda x : np.array([i for i in x.split(',')],np.float32))
    # cast the label to an integer value
    RDD_Xy = RDD_Xy_raw.map(lambda y: (y[0:-1],[int(y[-1])]))
    return RDD_Xy
    

In [7]:
def normalize(RDD_Xy):   
   
    # Count the number of samples
    n_rows=RDD_Xy.count()
    #Extract only data and "cut" the label
    RDD_X=RDD_Xy.map(lambda x : x[0])
    
    #CALCULATE MEAN
    #start by calculating the summation of all the values
    RDD_summation = RDD_X.reduce(lambda x,y: x + y)
    #divide each summation by the number of samples
    mean = RDD_summation/n_rows
    
    #CALCULATE STANDARD DEVIATION
    #start by calculating the difference between the i-th sample and the mean
    RDD_differences = RDD_X.map(lambda x: np.subtract(x,mean))
    #compute the square of each of the differences
    RDD_squared = RDD_differences.map(lambda x: np.power(x,2))
    #calcualte the summation of all the values
    RDD_summation1 = RDD_squared.reduce(lambda x,y: np.add(x,y))
    #calculate the variance by dividing the summation by the number of samples
    var = RDD_summation1/n_rows
    #the standard deviation is given by the square root of the variance
    std = np.sqrt(var)
    
    
    # Normalize the data as (X-mean)/std
    RDD_Xy_norm = RDD_Xy.map(lambda x: ((x[0]-mean)/std,x[1]))   
    return RDD_Xy_norm

                               

In [8]:
def train(RDD_Xy, iterations, learning_rate):
    
    
    num_rows = RDD_Xy.count()
    w = np.random.rand(len(RDD_Xy.take(1)[0][0]))
    b=np.random.randint(0,100)/100
    
    
    
    for i in range(iterations):
        
        
        #we need to be able to compare the predicted output with the actual output, in order to check the performance
        #Map in order to have (x,[y,y_hat])
        RDD_Xy_pred=RDD_Xy.map(lambda x: (x[0],x[1]+[predict(w,b,x[0])]))
        
        
        # CALCULATE LOSS FUNCTION
    
        #in order to calculate the loss function, we need to compute log(y_hat_i) and log(1 - y_hat_i)
        #we want to obtain a tuple like (y,[log(y_hat_i),log(1 - y_hat_i)])
        loss_data = RDD_Xy_pred.map(lambda x: (x[1][0],[x[1][1]+epsilon,1-x[1][1]+epsilon]))
        #Calculate the loss value for each sample applyin the formula:
        # LOSS = -1/m * summation[(y * log(y_hat)) + ((1 - y) * log(1 - y_hat))]
        #start by calculating summation[(y * log(y_hat)) + ((1 - y) * log(1 - y_hat))]
        summation_loss = loss_data.map(lambda x: x[0]*np.log(x[1][0])+(1-x[0])*np.log(x[1][1]))
        # divide by the number of samples obtaining the loss function
        loss= -(summation_loss.reduce(lambda x,y:x+y)) / num_rows
       
    
        # NOTICE:
        # in order to calculate the derivatives we applied the given function
        # (1/m) * SUMMATION[(y_hat - y) * x_1]
        
    
        # CALCULATE WEIGHT DERIVATIVE
        
        #start by calculating (y_hat - y)*x for every row
        mult_diff_w = RDD_Xy_pred.map(lambda x: np.multiply(x[0],(x[1][1]-x[1][0])))
        #divide by the number of rows in order to obtain the derivative
        weight_der = (mult_diff_w.reduce(lambda x,y:x+y)) / num_rows
    
    
        # CALCULATE BIAS DERIVATIVE
        
        #start by calculating the difference (y_hat-y) for every row
        diff_b = RDD_Xy_pred.map(lambda x: x[1][1]-x[1][0])
        #divide by the number of samples in order to obtain the derivative
        bias_der = (diff_b.reduce(lambda x,y:x+y)) / num_rows
    
        
        # UPDATE THE VALUES OF WEIGHT AND BIAS
        
        #the values are updated according to the given formulas:
        # new = old - (learning rate * derivative)   
        w = w -learning_rate * weight_der
        b = b -learning_rate * bias_der
        
        #COMPUTE THE VALUE OF THE ACCURACY (done here in order to have the updated value at each iternation)
        acc=accuracy(w,b,RDD_Xy_pred)

        #PRINT THE epoch, THE loss value AND THE accuracy value
        print("EPOCH",i," ---->  LOSS: ",loss," ACCURACY: ",acc)
       
        

In [9]:
def accuracy(w, b, RDD_Xy):
    
    
    # RDD_Xy has shape (x, [y, y_hat])
    
    #in this function we need to compare the predicted labels with the given ones.
    #in order to make the comparison possible we set all the predicted labels that are under 0.5 to 0, while the others are set to 1  
    RDD_Xy_adjusted = RDD_Xy.map(lambda x: (x[1][0],0) if x[1][1] <= 0.5 else (x[1][0],1))
    #we count the correct predictions assigning a label 1 to the predicted values matching the given values, 0 otherwise
    RDD_check_result= RDD_Xy_adjusted.map(lambda x: 1 if x[0] == x[1] else 0)
    
    n_rows = RDD_Xy_adjusted.count()
    #we compute the % of the correct predictions
    acc = ((RDD_check_result.reduce(lambda x,y: x + y)) / n_rows)*100
    return acc
   
    

In [10]:
def predict(w, b, X):
    
    y_hat = sigmoid(np.dot(w,X.T) + b)
    
    return y_hat

In [11]:
def sigmoid(x):
    y = 1/(1+np.exp(-x))
    return y

In [12]:
# ****** EXECUTION ********

In [13]:
# GLOBAL VARIABLES

filename = "../alumno/Downloads/botnet_tot_syn_l.csv"
epochs = 10
learning_r = 1.5
epsilon = 0.000000000001

In [14]:
#----- PROVIDED EXECUTION CODE ---------

In [15]:
# read data
data = readFile(filename)

# standardize
data_norm = normalize(data)

w, b = train(data_norm, epochs, learning_r)
acc = accuracy(w, b, data_norm)

print("acc: ", acc)


EPOCH 0  ---->  LOSS:  1.0625990845510587  ACCURACY:  68.9594
EPOCH 1  ---->  LOSS:  0.6132352741215287  ACCURACY:  80.0445
EPOCH 2  ---->  LOSS:  0.42975863091405647  ACCURACY:  86.1113
EPOCH 3  ---->  LOSS:  0.34496814055669  ACCURACY:  88.9956
EPOCH 4  ---->  LOSS:  0.29916778329709814  ACCURACY:  90.3188
EPOCH 5  ---->  LOSS:  0.2710746417751432  ACCURACY:  91.0819
EPOCH 6  ---->  LOSS:  0.25222021785624027  ACCURACY:  91.671
EPOCH 7  ---->  LOSS:  0.23873439111569686  ACCURACY:  92.0784
EPOCH 8  ---->  LOSS:  0.22862425077199894  ACCURACY:  92.37729999999999
EPOCH 9  ---->  LOSS:  0.2207656541650843  ACCURACY:  92.6129


TypeError: cannot unpack non-iterable NoneType object