In [1]:
import findspark
import math
import numpy as np
import pyspark
import time

In [2]:
# Let findspark locate the local Spark installation before the context is created.
findspark.init()
# getOrCreate() hands back the already-running context when this cell is executed
# again; calling the SparkContext constructor a second time in the same kernel
# raises "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster("local[*]").setAppName("Proyecto distribuido")
)

In [3]:
# ---- Experiment configuration ----
# Input data: CSV file handed to readFile().
path = "botnet.csv"
# Number of feature columns per example (not referenced by the code visible in this notebook).
numberOfFeatures = 11

# Optimization settings forwarded to train().
nIter = 20           # iterations of the gradient-descent loop
learningRate = 0.5   # step size applied to the gradients
lambda_reg = 0.1     # L2 regularization strength (also read as a global inside doDw)

In [4]:
def add(x,y):
    """
    Element-wise sum of two operands.

    Both operands are stacked into one array and reduced along the first axis,
    so they must have compatible (equal) shapes.
    :param x: first operand (scalar, sequence or numpy array)
    :param y: second operand, same shape as x
    :return: x + y computed element by element
    """
    stacked = np.asarray([x, y])
    return stacked.sum(axis=0)
def divideByRows(x):
    """
    Divide the second item of a pair by the global row count.

    :param x: indexable pair; x[0] is passed through, x[1] is the value to scale
    :return: tuple (x[0], x[1] / numberOfRows.value)

    Reads the module-level broadcast variable `numberOfRows`, which must exist
    before this function is called.
    """
    key, total = x[0], x[1]
    row_count = numberOfRows.value
    return (key, total / row_count)

In [None]:
def readFile (filename):
    """
    Load the dataset file into an RDD of (features, label) tuples.

    :param filename: path of a comma-separated file whose fields are all numeric. The last column
        is the label and every preceding column is a feature (for the botnet dataset: 11 features
        plus 1 label, 0 if normal traffic, 1 if botnet).
    :return: An RDD with one record per non-blank line of the file. Each record is a tuple (X, y):
        "X" is a numpy array holding the features as floats and "y" is the label as a float.

    Uses the module-level Spark context `sc`. Only transformations are chained here, so Spark
    does not parse the file until an action is run on the returned RDD.
    """

    def row2Tuple(x):
        """
        Takes one text line and creates a float tuple (x, y)
        where x is the array with the data of that row and y is the label
        """
        floatArray=[float(number) for number in x.split(",")]
        return (np.array(floatArray[:-1]),floatArray[-1]) #tuple (array of x),y

    # Blank lines are skipped: float("") would otherwise abort the whole job.
    # `sc` is only read, never rebound, so no `global` statement is required.
    return sc.textFile(filename).filter(lambda line: line.strip()).map(row2Tuple)

In [None]:
def normalize (RDD_Xy):
    """
    Standardize every feature column of the RDD to mean 0 and standard deviation 1.

    :param RDD_Xy: is an RDD containing data examples. Each record of the RDD is a tuple (X,y).
        "X" is an array containing the features (float numbers) of an example
        "y" is the label of the example (0/1); it is passed through untouched
    :return: An RDD of (X, y) tuples with X rescaled column by column. Columns whose standard
        deviation is 0 (constant features) are only centred, so they become all zeros instead
        of NaN/inf.

    Uses the module-level Spark context `sc` and the broadcast row count `numberOfRows`; the
    latter must match the number of records in RDD_Xy for the statistics to be correct.
    Computing the mean and the deviation triggers two Spark jobs; the returned RDD is lazy.
    """
    def normalizeLine(line):
        # Vectorized over the whole feature array instead of looping element by element.
        scaled = (np.asarray(line[0]) - broadcastMean.value) / broadcastStdev.value
        return scaled, line[1]

    features = RDD_Xy.map(lambda line: line[0]) #remove label
    meanArray = features.sum()/numberOfRows.value
    broadcastMean = sc.broadcast(meanArray)

    # Population standard deviation: squared deviations are averaged over all rows.
    squaredDeviation = features.map(lambda x: (x-broadcastMean.value)**2).sum()
    stdevArray = np.sqrt(squaredDeviation/numberOfRows.value)
    # A constant column has deviation 0; divide by 1 there to avoid 0/0.
    stdevArray = np.where(stdevArray == 0, 1.0, stdevArray)
    broadcastStdev = sc.broadcast(stdevArray)

    return RDD_Xy.map(normalizeLine)

In [None]:
def sigmoid(X,W,b):
    """
    Logistic function applied to the linear score of an example.

    :param X: feature array (or matrix of examples)
    :param W: weight array
    :param b: bias term
    :return: 1 / (1 + exp(-(X.W + b)))
    """
    logit = b + np.dot(X, W)
    return 1.0 / (1.0 + np.exp(-logit))

In [None]:
def doDb(dataset,w,b,m):
    """
    Gradient of the cost with respect to the bias.

    :param dataset: RDD of (X, y) tuples
    :param w: weight array
    :param b: bias
    :param m: object exposing the number of examples as `m.value`
    :return: sum over all records of (sigmoid(X, w, b) - y), divided by m.value
    """
    def residual(row):
        features, label = row[0], row[1]
        return sigmoid(features, w, b) - label

    return dataset.map(residual).sum() / m.value

def doDw(dataset,w,b,m):
    """
    Gradient of the regularized cost with respect to the weights.

    :param dataset: RDD of (X, y) tuples
    :param w: weight array
    :param b: bias
    :param m: object exposing the number of examples as `m.value`
    :return: (sum over records of X * (sigmoid(X, w, b) - y) + lambda_reg * w) / m.value

    The regularization strength is read from the module-level `lambda_reg`;
    it is not a parameter of this function.
    """
    def weighted_residual(row):
        features = row[0]
        return features * (sigmoid(features, w, b) - row[1])

    data_gradient = dataset.map(weighted_residual).sum()
    penalty = lambda_reg * w
    return (data_gradient + penalty) / m.value

In [None]:
def costFuncion(dataset,w,b,m,lambda_reg):
    """
    L2-regularized binary cross-entropy of the model over the dataset.

    :param dataset: RDD of (X, y) tuples
    :param w: weight array
    :param b: bias
    :param m: object exposing the number of examples as `m.value`
    :param lambda_reg: regularization rate
    :return: -(1/m) * sum(y*log(p) + (1-y)*log(1-p)) + lambda_reg * sum(w**2) / (2*m),
        where p = sigmoid(X, w, b)
    """
    # When the sigmoid saturates to exactly 0 or 1, log(0) = -inf and 0 * -inf = NaN
    # would poison the whole sum; keep p strictly inside (0, 1).
    eps = 1e-15

    def rowLogLikelihood(row):
        # Evaluate the sigmoid once per row and reuse it for both terms.
        p = np.clip(sigmoid(row[0],w,b), eps, 1 - eps)
        return (row[1]*np.log(p)) + ((1-row[1])*np.log(1-p))

    firstTerm = dataset.map(rowLogLikelihood).sum() / -m.value
    secondTerm = (lambda_reg*np.sum(w**2)/(2*m.value))
    return firstTerm + secondTerm

In [None]:
def train (RDD_Xy, iterations, learning_rate, lambda_reg, seed=None):
    """
    Fit the logistic-regression parameters with batch gradient descent.

    :param RDD_Xy: RDD containing data examples. Each record of the RDD is a tuple (X,y). "X" is an array
        containing the features (float numbers) of an example, "y" is the label of the example (0/1).
        Must contain at least one record.
    :param iterations: number of iterations of the optimization loop
    :param learning_rate: learning rate of the gradient descent
    :param lambda_reg: regularization rate used when evaluating the cost
    :param seed: optional seed for the random initialization of the parameters; when None the
        global numpy random state is used, exactly as before
    :return: A tuple (params, errors). "params" is an array with the weights "w" followed by the
        bias "b" as its last element; "errors" is an array with the cost after every iteration.

    Reads the module-level broadcast `numberOfRows`. Note that doDw is called without a
    regularization argument and reads the module-level `lambda_reg` itself.
    """

    # Size the weight vector from the data instead of hard-coding the column count.
    n_features = len(RDD_Xy.first()[0])
    rng = np.random if seed is None else np.random.RandomState(seed)
    w = rng.rand(n_features)
    b = rng.random_sample()
    listOfErr = []

    for iteration in range(iterations):
        start = time.perf_counter()

        # Both gradients are evaluated with the parameters of the previous step.
        dw = doDw(RDD_Xy,w,b,numberOfRows)
        db = doDb(RDD_Xy,w,b,numberOfRows)

        w -= (learning_rate * dw)
        b -= (learning_rate * db)

        end = time.perf_counter()

        # Cost after the update; timed separately from the gradient step.
        err = costFuncion(RDD_Xy,w,b,numberOfRows,lambda_reg)
        listOfErr.append(err)

        endErr = time.perf_counter()
        print("Iteration {:2d} err: {}, time training: {:.3f} time accuracy: {:.3f}".format(
            iteration,err,end-start,endErr - end))
    return np.append(w,b), np.array(listOfErr)

In [None]:
def accuracy (w, b, RDD_Xy):
    """
    This function computes accuracy of the model
    :param w: weights
    :param b: bias
    :param RDD_Xy: RDD containing examples to be predicted. Each record is a tuple (X, y)
    :return: The number of predictions that are correct divided by the number of records (examples)
        in RDD_Xy; 0.0 when the RDD is empty
    """

    def compare(row):
        # Emit (is_correct, 1) so hits and record count are accumulated in a single pass.
        return (1 if predict(w,b,row[0]) == row[1] else 0, 1)

    # The denominator is counted from RDD_Xy itself rather than taken from the global row
    # count, so the result is also right for a subset such as a held-out test split.
    hits, total = RDD_Xy.map(compare).fold(
        (0, 0), lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1]))
    return hits / total if total else 0.0

In [None]:
def predict (w, b, X):
    """
    Classify a single example with the trained parameters.

    :param w: weights
    :param b: bias
    :param X: Example to be predicted
    :return: The sigmoid output for X rounded to the nearest integer value (0/1)
    """
    probability = sigmoid(X, w, b)
    return np.round(probability)

In [None]:
# Build the (features, label) RDD for the configured CSV file and time the call.
# readFile() only chains textFile/map transformations, so the measured interval
# presumably covers RDD setup rather than a full pass over the file.
startRead = time.perf_counter()
data = readFile(path)
endRead = time.perf_counter()

In [None]:
# Count the records once and broadcast the total. normalize(), train(), accuracy() and
# divideByRows() read it through numberOfRows.value, so this cell must run before them.
numberOfRows = sc.broadcast(data.count())

In [None]:
startNormalize= time.perf_counter()
# normalize() computes the column means and deviations (two Spark jobs) and returns a lazy RDD.
# The result is cached because training runs several jobs per iteration over it; without the
# cache every one of those jobs would re-read the CSV and re-apply the normalization.
data = normalize(data).cache()
endNormalize = time.perf_counter()

print("Read time: {:.3f}, normalize time: {:.3f}".format(endRead-startRead,endNormalize - startNormalize))

Read time: 3.233, normalize time: 68.698


In [None]:
# Sanity check: show the first record of the normalized RDD as a (features, label) tuple.
data.take(1)

[(array([-0.79240977, -0.81309371, -0.42245076, -0.46646975, -0.52239296,
         -0.35631957,  0.7370103 ,  0.52834963,  0.82717799,  0.47316616,
          0.15895172]), 1.0)]

In [None]:
# Fit the model on the normalized data; train() returns the packed parameters
# and the cost recorded after every iteration.
ws, listOffErr = train(data, nIter, learningRate, lambda_reg)

# The bias is stored as the last element, after the weights.
w, b = ws[:-1], ws[-1]

# Accuracy is measured on the same data that was used for training.
acc = accuracy(w,b,data)
print("acc: ", acc)

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt

# One tick per recorded value. The positions come from the error list itself so the
# axis stays correct even if nIter was changed after the training cell ran.
x = list(range(len(listOffErr)))

plt.plot(x, listOffErr, marker="o")
plt.xlabel("Epoch")
plt.ylabel("Error")
plt.title("Training error per epoch")
plt.xticks(x)
plt.show()