In [None]:
# Group Members: Kimiya Shabani, Quentin Mathieu, Christopher Steuer

import findspark
 
findspark.init()
 
from pyspark import SparkContext,SparkConf
import random
import math
from operator import add
import numpy as np
import numpy
 
# Build the configuration once and actually hand it to the context; the master
# URL ("local[*]" = all local cores) lives in the configuration as well.
conf = SparkConf().setAppName("assignment_botnet").setMaster("local[*]")
# getOrCreate reuses an already running context, so re-executing this cell in
# the notebook no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

### Function: `readFile`

#### Description
The `readFile` function reads a dataset from a specified file and transforms it into an RDD (Resilient Distributed Dataset) for further processing with Apache Spark. The dataset is expected to have 12 columns, where the first 11 columns are features (X) and the 12th column is the label (Y).

#### Arguments
- `path` (string): The path to the dataset file.
- `cols` (int, optional): Intended to limit the number of columns read from the dataset file. Defaults to `np.inf` (all columns). Note: the current implementation does not use this argument and always reads every column of each line.

#### Returns
- `data_rdd` (RDD): An RDD containing the data from the specified file. Each record in the RDD is a tuple `(X, y)`, where:
  - `X` (list of floats): An array containing the 11 features of an example.
  - `y` (int): The label of the example, which is the 12th column (0 for normal traffic, 1 for botnet).

In [None]:
def readFile(path,cols=np.inf):
    """Load a comma-separated dataset into an RDD of ``(X, y)`` tuples.

    Every non-blank line is split on ``,``; all fields except the last are
    converted to ``float`` and form the feature list ``X``, the last field is
    converted to ``int`` and becomes the label ``y``.

    Args:
        path: Location of the text file, passed to ``sc.textFile``.
        cols: Accepted for interface compatibility; it is not used and every
            column of each line is read.

    Returns:
        An RDD whose records are ``(list_of_floats, int)`` tuples.
    """
    def parse_line(line):
        fields = line.split(',')
        features = [float(value) for value in fields[:-1]]
        return (features, int(fields[-1]))

    lines = sc.textFile(path)
    # Blank lines (for example a stray empty row) would otherwise make
    # int('') raise and abort the whole job.
    non_blank_lines = lines.filter(lambda line: line.strip())
    return non_blank_lines.map(parse_line)

## Function: `normalize`

### Description
The `normalize` function is designed to normalize the features of a dataset represented as a Resilient Distributed Dataset (RDD) in Apache Spark. Normalization is a preprocessing step commonly used in machine learning to scale numeric features to a standard range, typically between 0 and 1 or with a mean of 0 and a standard deviation of 1. This process ensures that each feature contributes equally to the analysis, preventing features with larger scales from dominating the learning algorithm.

### Arguments
- `rdd` (RDD): The RDD representing the dataset to be normalized.

### Returns
- `normalized_rdd` (RDD): An RDD containing the normalized data. Each record in the RDD is a tuple `(X_prime, y)`, where:
  - `X_prime` (list of floats): An array containing the normalized feature values.
  - `y` (int): The label (if applicable) associated with the example.

### Steps
1. **Extract Features and Calculate Aggregates:**
   - The function starts by extracting the features from the RDD's first record. It then defines an inner function `extract_features` to process each data point. This function calculates the sum of feature values, the sum of squared feature values, and the count for each feature across all records in the RDD.
   - These aggregates are computed using a `flatMap` transformation followed by a `reduceByKey` operation.

2. **Calculate Mean and Standard Deviation:**
   - After aggregating the feature values, the function computes the mean and standard deviation for each feature column. This is done by mapping over the aggregated RDD and applying formulas to calculate mean and standard deviation.

3. **Normalize the Data:**
   - Once the mean and standard deviation are calculated for each feature, the function normalizes each feature value in the dataset. This is achieved by subtracting the mean and dividing by the standard deviation for each feature, element-wise.
   - The normalization is applied using a custom function `normalize_row` to each row of the RDD. The normalized RDD is then returned as the result.

In [None]:
def normalize(rdd):
    """Standardize every feature column of ``rdd`` (z-score normalization).

    Each record of ``rdd`` is a tuple ``(X, y)`` where ``X`` is a sequence of
    numeric features. For every feature index the mean and the (population)
    standard deviation are computed over all records, and each value is
    replaced by ``(value - mean) / std``.

    Args:
        rdd: RDD of ``(X, y)`` records.

    Returns:
        A new RDD of ``(X_prime, y)`` records, where ``X_prime`` is the list of
        normalized feature values and ``y`` is passed through unchanged.
    """
    def extract_features(data):
        # Emit (index, (value, value**2, 1)) per feature so that one
        # reduceByKey yields the sum, the sum of squares and the count of
        # every column.
        X, _ = data
        return [(i, (value, value**2, 1)) for i, value in enumerate(X)]

    def sum_aggregates(a, b):
        return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

    # One small entry per feature, so bringing the result to the driver is cheap.
    aggregates = (rdd.flatMap(extract_features)
                     .reduceByKey(sum_aggregates)
                     .collectAsMap())

    mean_elements = []
    std_elements = []
    for index in sorted(aggregates):
        total, total_sq, count = aggregates[index]
        mean = total / count
        # E[x^2] - E[x]^2 can come out slightly negative through rounding
        # (e.g. for a constant column); clamp it so math.sqrt cannot fail.
        variance = max(total_sq / count - mean**2, 0.0)
        std = math.sqrt(variance)
        mean_elements.append(mean)
        # A constant column has std 0; dividing by 1 instead maps all of its
        # values to 0.0 rather than raising ZeroDivisionError.
        std_elements.append(std if std > 0.0 else 1.0)

    def normalize_row(row):
        X_row, y_row = row
        x_prime_row = [(value - mean) / std
                       for value, mean, std in zip(X_row, mean_elements, std_elements)]
        return (x_prime_row, y_row)

    return rdd.map(normalize_row)

### Function: `check_normalized`

#### Description
The `check_normalized` function is used to verify if the normalization of an RDD has been performed correctly. It calculates the mean and standard deviation for each feature in the dataset.

#### Arguments
- `rdd` (RDD): An RDD containing the normalized data. Each record in the RDD is a tuple `(X, y)`, where:
  - `X` (list of floats): An array containing the features of an example.
  - `y` (int): The label of the example.

#### Returns
- `feature_stats` (RDD): An RDD where each element is a tuple `(feature_index, (mean, std_dev))`, representing the mean and standard deviation of each feature.

In [None]:
# Helper function to see if the normalization worked correctly
def check_normalized(rdd):
    """Compute the mean and standard deviation of every feature column.

    Intended as a sanity check after normalization: the caller can inspect
    the returned statistics (e.g. with ``collect()``).

    Args:
        rdd: RDD of ``(X, y)`` records, ``X`` being a sequence of numbers.

    Returns:
        An RDD of ``(feature_index, (mean, std_dev))`` tuples, one per feature.
    """
    def extract_features(data):
        # (index, (value, value**2, 1)) per feature: summing these per index
        # gives the sum, the sum of squares and the count of each column.
        X, _ = data
        return [(i, (value, value**2, 1)) for i, value in enumerate(X)]

    def sum_aggregates(a, b):
        return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

    def to_mean_and_std(entry):
        index, (total, total_sq, count) = entry
        mean = total / count
        # Rounding can push E[x^2] - E[x]^2 marginally below zero, which
        # would make math.sqrt raise; clamp at 0.
        variance = max(total_sq / count - mean**2, 0.0)
        return (index, (mean, math.sqrt(variance)))

    feature_aggregates = rdd.flatMap(extract_features).reduceByKey(sum_aggregates)
    feature_stats = feature_aggregates.map(to_mean_and_std)

    return feature_stats

### Function: `train`

#### Description
The `train` function implements the Gradient Descent algorithm to optimize the weights of a logistic regression model using an RDD containing the dataset.

#### Arguments
- `rdd` (RDD): An RDD containing the training data. Each record in the RDD is a tuple `(X, y)`, where:
  - `X` (list of floats): An array containing the features of an example.
  - `y` (int): The label of the example.
- `iterations` (int): The number of iterations to run the gradient descent algorithm.
- `learning_rate` (float): The learning rate for the gradient descent algorithm.

#### Returns
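- `w` (NumPy array of floats): The optimized weights after training, with the bias term as the last element. (If `iterations` is 0, the initial random weights are returned unchanged as a plain Python list.)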

In [None]:
def train(rdd,iterations,learning_rate):
    """Fit logistic-regression weights with batch gradient descent.

    Args:
        rdd: RDD of ``(X, y)`` records; ``X`` is a sequence of features and
            ``y`` the label.
        iterations: Number of gradient-descent steps to perform.
        learning_rate: Step size applied to the averaged gradient.

    Returns:
        The weights, one per feature followed by the bias as last element.
        After at least one iteration this is a NumPy array; with
        ``iterations == 0`` it is the initial random list.
    """
    # The number of features is taken from the first record.
    num_columns = len(rdd.first()[0])
    print ("num_cols:",num_columns)
    # One random starting weight per feature plus one for the bias (last).
    w = [random.uniform(0.0, 1.0) for _ in range(num_columns + 1)]
    # The dataset does not change between iterations, so count it once
    # instead of launching a count job on every step.
    num_examples = rdd.count()
    for i in range(iterations):
        # Mean cost over the dataset with the current weights
        cost = rdd.map(lambda x_y: fcost(x_y[1], get_y_hat(x_y[0], w))).mean()
        print("Iteration {}: Cost = {}".format(i, cost))
        # Per-record gradients, summed element-wise across the dataset
        gradients = rdd.map(lambda x_y: get_derivatives(x_y, w))
        summed_gradients = gradients.reduce(lambda a, b: np.array(a) + np.array(b))
        # With a single record reduce() returns the plain list untouched;
        # convert so the division below is always element-wise.
        mean_gradients = np.asarray(summed_gradients, dtype=float) / num_examples
        w = update_ws(w, mean_gradients, learning_rate)
    return w

In [None]:
def get_y_hat (x,w):
    """Return the model output for features ``x`` under weights ``w``.

    The linear score from ``get_dot_xw`` is passed through ``sigmoid``.
    """
    linear_score = get_dot_xw(x, w)
    return sigmoid(linear_score)

def get_dot_xw(x,w):
    """Return ``sum(x[i] * w[i]) + w[-1]``, i.e. the linear score with bias.

    ``w`` holds one weight per feature followed by the bias as last element.
    If ``x`` is longer than ``w`` the two lengths are printed before the
    products are accumulated.
    """
    n_features = len(x)
    if n_features > len(w):
        print ("getdot",n_features,len(w))
    total = 0.
    for idx, x_value in enumerate(x):
        total += float(x_value)*float(w[idx])
    bias = w[-1]
    return total + bias

def sigmoid(x):
    """Logistic function ``1 / (1 + e**-x)``.

    When ``math.exp(-x)`` overflows (very negative ``x``) the result is
    reported as ``0.0``.
    """
    try:
        denominator = 1. + math.exp(-x)
    except OverflowError:
        return 0.0
    return 1. / denominator


In [None]:
# Cost function   
def fcost(y, y_hat):
    """Log-loss of one prediction ``y_hat`` against its label ``y``.

    For ``y == 1`` the loss is ``-log(y_hat)``, otherwise ``-log(1 - y_hat)``.
    A non-positive argument of the logarithm is replaced by a small epsilon.
    """
    epsilon=0.00000001
    # Value whose logarithm is taken, selected by the label
    likelihood = y_hat if y == 1 else 1-y_hat
    if not likelihood > 0.:
        likelihood = epsilon
    return -numpy.log(likelihood)
                 

In [None]:
# Update model weights using current weights, their derivatives and the learning rate 
def update_ws(w,dw,lr):
    """Return the new weights ``w - lr * dw`` as a NumPy float array.

    ``w`` and ``dw`` are converted to float arrays first; the inputs are not
    modified.
    """
    weights = numpy.array(w, dtype=float)
    step = lr*numpy.array(dw, dtype=float)
    return weights - step

# Get derivatives of Cost function for each element of the dataset
def get_derivatives(x_y, w):
    """Gradient of the log-loss for a single example.

    Args:
        x_y: Tuple ``(X, y)`` with the feature sequence and the label.
        w: Current weights, one per feature followed by the bias.

    Returns:
        A list with one partial derivative per feature weight
        (``x_i * (y_hat - y)``) followed by the derivative with respect to
        the bias (``y_hat - y``).
    """
    x = numpy.array(x_y[0], dtype=float)
    y = x_y[1]
    diff_y = get_y_hat(x,w) - y
    # dw: each feature scales the prediction error
    res = [x_i*diff_y for x_i in x]
    # db: the bias enters the linear score with factor 1, so its derivative
    # is the prediction error itself (not the current bias value w[-1]).
    res.append(diff_y)

    return res

In [None]:
def predict (x,w):
    """Classify ``x``: 1 if the model output exceeds 0.5, otherwise 0."""
    threshold=0.5
    if get_y_hat(x,w) > threshold:
        return 1
    return 0

### Function: `accuracy`

#### Description
The `accuracy` function calculates the accuracy of the logistic regression model on a given dataset.

#### Arguments
- `rdd_Xy` (RDD): An RDD containing the data examples. Each record of the RDD is a tuple `(X, y)`, where:
  - `X` (list of floats): An array containing the features of an example.
  - `y` (int): The label of the example.
- `wf` (list of floats): The weights of the trained logistic regression model, including the bias term.

#### Returns
- `accuracy` (float): The accuracy of the model as a percentage.

In [None]:
def accuracy(rdd_Xy, wf):
    """Return the percentage of records whose predicted label equals ``y``.

    ``rdd_Xy`` holds ``(X, y)`` records and ``wf`` the model weights
    (bias last) that are handed to ``predict``.
    """
    def is_correct(x_y):
        # 1 for a matching prediction, 0 otherwise
        label = x_y[1]
        predicted = predict(x_y[0], wf)
        return 1 if label == predicted else 0

    # Number of correct predictions
    correct_predictions = rdd_Xy.map(is_correct).reduce(lambda a, b: a + b)
    # Total number of examples
    total_examples = rdd_Xy.count()
    # Share of correct predictions, expressed in percent
    return (correct_predictions / total_examples) * 100.0

In [None]:
# Execution parameters
# nIter: number of gradient-descent iterations passed to train()
# learningRate: step size passed to train()
# path: location of the CSV dataset (absolute path on the local machine)
nIter = 20
learningRate = 1.5
path = "/home/administrador/botnet_tot_syn_l.csv"
 

# Read the text file at `path` into an RDD of (X, y) records
X_y = readFile(path)
 
# Normalize the features (X_y is rebound to the normalized RDD)
X_y = normalize(X_y)
 
# Train the model on the normalized data
ws = train(X_y, nIter, learningRate)

# Calculate the accuracy; note it is measured on the same data used for training
acc = accuracy(X_y, ws)

# Results: print the learned weights and the accuracy percentage
 
print("Final weights of the model:")
print(ws)
print(" ")
print("Accuracy of the model:",acc,"%")
print(" ")