Implementación distribuida y paralelizada de un algoritmo de Machine Learning

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

import numpy as np
from pyspark import RDD

import time
import json


def readFile(filename):
    def transform_to_floats(line:str) -> np.ndarray:
        # Importante. Retornar como tupla
        element = [float(x) for x in line.split(',')]
        return (np.array(element))
    
    data_raw = contex_global.textFile(filename)
    return data_raw.map(transform_to_floats)


def _calculate_mean(rdd:RDD, total_rows:int) -> np.ndarray:
    def reduce_function(value1, value2):
        return value1[:11] + value2[:11]
    
    return rdd.reduce(reduce_function) / total_rows

def _calculate_stdev(rdd:RDD, mean:np.ndarray, total_rows:int) -> np.ndarray:
    def map_function(element):
        return (element[:11] - mean)**2

    def reduce_function(value1, value2):
        return value1[:11] + value2[:11]
    
    rdd_aux = rdd.map(map_function)
    return (rdd_aux.reduce(reduce_function) / total_rows)**(1/2)


def normalize(RDD_Xy:RDD) -> RDD:
    def map_function(element, mean, desvest):
        x_values = (element[:11] - mean) / desvest
        return np.concatenate((x_values, [element[-1]]))

    m = RDD_Xy.count()
    mean = _calculate_mean(RDD_Xy, m)
    desvest = _calculate_stdev(RDD_Xy, mean, m)
    return RDD_Xy.map(lambda element: map_function(element, mean, desvest))


def _sigmoid(z):
    return 1 / (1 + np.exp(-z))

def _map_label_y_hat_diffs(element, W:np.ndarray, b:np.ndarray):
    """Añade después de la y(etiqueta) el y_hat(ŷ) y la diferencia ŷ-y que se
    utiliza para calcular tanto el coste como las derivadas de W y b. Es necesario
    mantener el orden en el RDD.
    """
    dot_product = 0
    n = W.size
    for i in range(n):
        dot_product += element[i] * W[i]
    
    y_hat = float(_sigmoid(dot_product + b))
    diff = y_hat - element[-1]
    to_append = [y_hat, diff]

    return (np.append(element, to_append)) 

def _calculate_y_hat_and_diffs(RDD_Xy, W:np.ndarray, b:np.ndarray) -> RDD:
    return RDD_Xy.map(lambda element: _map_label_y_hat_diffs(element, W, b))

def _calculate_cost_J(RDD_XY) -> float:
    def _map_cost_function_J(element):
        label, y_hat = element
        #DEBUG. Cambiar log10 a log en la versión final
        return label * np.log(y_hat) + (1 - label) * np.log(1 - y_hat)
    
    # Cuidado con los índices, que es aquí donde se hace la "vista".
    RDD_label_y_hat = RDD_XY.map(lambda element: (element[-3], element[-2]))
    aux_costs = RDD_label_y_hat.map(_map_cost_function_J)
    m = RDD_label_y_hat.count()
    return -aux_costs.reduce(lambda x, y: x + y) / m

def _map_label_y_pred(element, W:np.ndarray, b:np.ndarray):
    dot_product = 0
    n = W.size
    for i in range(n):
        dot_product += element[i] * W[i]
    # ¿Hace falta mantener el orden etiqueta <-> y_hat para calcular
    # accuracy? Sí, para ver cuántas veces acierta.
    y_hat = float(_sigmoid(dot_product + b))
    THRESHOLD = 0.5
    if y_hat > THRESHOLD:
        y_pred = 1
    else:
        y_pred = 0
    return (element[-1], y_pred)

def predict(W:np.ndarray, b:np.ndarray, RDD_Xy) -> RDD:
    return RDD_Xy.map(lambda element: _map_label_y_pred(element, W, b))

def accuracy(W, b, RDD_Xy:RDD) -> float:
    def map_function(element):
        label, y_pred = element
        if label == y_pred:
            return 1
        else:
            return 0
    predictions_rdd = predict(W, b, RDD_Xy)
    aux_acc = predictions_rdd.map(map_function)
    m = aux_acc.count()
    accuracy = aux_acc.reduce(lambda x,y: x + y) / m
    return accuracy

def train(RDD_Xy:RDD, iterations, learning_rate):
    def b_map(element):
        return element[-1] # Se corresponde con las diferencias
    
    def W_map(element):
        return np.array([element[-1] * element[i] for i in range(num_W)])
    
    m = RDD_Xy.count()
    MODEL_PARAMS = 12 # 11 pesos + 1 sesgo
    num_W = MODEL_PARAMS - 1

    W = 2 * np.random.rand(num_W) - 1
    b = 2 * np.random.rand(1) - 1

    for it in range(iterations):
        rdd_xy_y_hat_diffs = _calculate_y_hat_and_diffs(RDD_Xy, W, b)

        dW_aux = rdd_xy_y_hat_diffs.map(W_map)
        dW = dW_aux.reduce(lambda x,y: x + y) / m
        W = W - learning_rate * dW
        
        diffs_label_hat = rdd_xy_y_hat_diffs.map(b_map)
        db = diffs_label_hat.reduce(lambda x,y: x + y) / m
        b = b - learning_rate * db

        cost = _calculate_cost_J(rdd_xy_y_hat_diffs)
        cost_list.append(cost)
        print(f'[train] Iteración {it}, Coste = {cost}')

    return W,b

def save_data(resultado, filename):
    try:
        with open(filename, 'r') as infile:
            data = json.load(infile)
    except FileNotFoundError:
        data = []  # Si el archivo no existe, comenzamos con una lista vacía
    
    data.append(resultado)

    with open(filename, 'w') as outfile:
        json.dump(data, outfile, indent=4)


#FILE_NAME = '../../0-SPAI/1-datos/botnet_tot_syn_l.csv'
#FILE_NAME = 'botnet_sample.csv'
FILE_NAME = '../../0-SPAI/1-datos/botnet_100k.csv'

LEARNING_RATE = 1.5
N_ITER = 10

OUTPUT_FILE = 'run_times_100k_3.json'


for num_workers in range(1,7):
    if 'session_global' in globals():
        session_global.stop()
    
    session_global = SparkSession.builder.master(f'local[{num_workers}]').getOrCreate()
    contex_global = session_global.sparkContext

    start = time.time()
    print('Número de workers ', num_workers)

    # read data
    data_raw_rdd = readFile(FILE_NAME)

    # standarize
    normal_data = normalize(data_raw_rdd)

    # train
    cost_list = []
    W, b = train(normal_data, N_ITER, LEARNING_RATE)

    acc = accuracy(W, b, normal_data)
    print('Accuracy ', acc)

    end = time.time()
    runtime = end - start
    print('Tiempo de ejecución (s) ', runtime)

    results = {
        "num_workers": num_workers,
        "accuracy": acc,
        "runtime": runtime,
        "costs": cost_list
    }

    save_data(results, OUTPUT_FILE)


24/10/19 19:03:30 WARN Utils: Your hostname, daniel-fpga resolves to a loopback address: 127.0.1.1; using 192.168.0.201 instead (on interface enp2s0f2)
24/10/19 19:03:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/19 19:03:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Número de workers  1


                                                                                

[train] Iteración 0, Coste = 1.2477727975978592


                                                                                

[train] Iteración 1, Coste = 0.7677959667822504


                                                                                

[train] Iteración 2, Coste = 0.5271132041282294


                                                                                

[train] Iteración 3, Coste = 0.40128478073027096


                                                                                

[train] Iteración 4, Coste = 0.3324726695399143


                                                                                

[train] Iteración 5, Coste = 0.2921492023723284


                                                                                

[train] Iteración 6, Coste = 0.2664668043394709


                                                                                

[train] Iteración 7, Coste = 0.24887554617362348


                                                                                

[train] Iteración 8, Coste = 0.23612721498318645


                                                                                

[train] Iteración 9, Coste = 0.22648151698337085


                                                                                

Accuracy  0.92353
Tiempo de ejecución (s)  223.4232795238495




Número de workers  2


                                                                                

[train] Iteración 0, Coste = 1.0462166459648352


                                                                                

[train] Iteración 1, Coste = 0.5725751510225985


                                                                                

[train] Iteración 2, Coste = 0.4077845170867011


                                                                                

[train] Iteración 3, Coste = 0.3320229458003188


                                                                                

[train] Iteración 4, Coste = 0.29013834690724316


                                                                                

[train] Iteración 5, Coste = 0.2641038204493191


                                                                                

[train] Iteración 6, Coste = 0.24652052100169872


                                                                                

[train] Iteración 7, Coste = 0.2339002021813875


                                                                                

[train] Iteración 8, Coste = 0.22441739502321695


                                                                                

[train] Iteración 9, Coste = 0.21703532096667044


                                                                                

Accuracy  0.92916
Tiempo de ejecución (s)  136.04958081245422




Número de workers  3


                                                                                

[train] Iteración 0, Coste = 1.103247187376731


                                                                                

[train] Iteración 1, Coste = 0.5604633952917382


                                                                                

[train] Iteración 2, Coste = 0.414662876376823


                                                                                

[train] Iteración 3, Coste = 0.35052602491150153


                                                                                

[train] Iteración 4, Coste = 0.3121970573747008


                                                                                

[train] Iteración 5, Coste = 0.28626268348355394


                                                                                

[train] Iteración 6, Coste = 0.26748247350970183


                                                                                

[train] Iteración 7, Coste = 0.2532586293400961


                                                                                

[train] Iteración 8, Coste = 0.24212389153919167


                                                                                

[train] Iteración 9, Coste = 0.2331794697010248


                                                                                

Accuracy  0.91924
Tiempo de ejecución (s)  138.35452437400818




Número de workers  4


                                                                                

[train] Iteración 0, Coste = 1.283753363703815


                                                                                

[train] Iteración 1, Coste = 0.6173950811008204


                                                                                

[train] Iteración 2, Coste = 0.4262125140223182


                                                                                

[train] Iteración 3, Coste = 0.34936989897393855


                                                                                

[train] Iteración 4, Coste = 0.3068861970476136


                                                                                

[train] Iteración 5, Coste = 0.2797489161092997


                                                                                

[train] Iteración 6, Coste = 0.2608925331662933


                                                                                

[train] Iteración 7, Coste = 0.24702386080066063


                                                                                

[train] Iteración 8, Coste = 0.23639060765926595


                                                                                

[train] Iteración 9, Coste = 0.2279737436978856


                                                                                

Accuracy  0.92446
Tiempo de ejecución (s)  134.15969133377075




Número de workers  5


                                                                                

[train] Iteración 0, Coste = 0.7774200607106855


                                                                                

[train] Iteración 1, Coste = 0.4415983651298855


                                                                                

[train] Iteración 2, Coste = 0.326464814625356


                                                                                

[train] Iteración 3, Coste = 0.2798865763546628


                                                                                

[train] Iteración 4, Coste = 0.25548073354995154


                                                                                

[train] Iteración 5, Coste = 0.24020705625816438


                                                                                

[train] Iteración 6, Coste = 0.22954971549540082


                                                                                

[train] Iteración 7, Coste = 0.22157284195259702


                                                                                

[train] Iteración 8, Coste = 0.215309415359924


                                                                                

[train] Iteración 9, Coste = 0.21022030440207853


                                                                                

Accuracy  0.92976
Tiempo de ejecución (s)  132.11510372161865




Número de workers  6


                                                                                

[train] Iteración 0, Coste = 1.7286200123595807


                                                                                

[train] Iteración 1, Coste = 0.8858963378983948


                                                                                

[train] Iteración 2, Coste = 0.5795149282332092


                                                                                

[train] Iteración 3, Coste = 0.44894946769269023


                                                                                

[train] Iteración 4, Coste = 0.37609948317511344


                                                                                

[train] Iteración 5, Coste = 0.32995206908898006


                                                                                

[train] Iteración 6, Coste = 0.298538824628371


                                                                                

[train] Iteración 7, Coste = 0.27606710958062475


                                                                                

[train] Iteración 8, Coste = 0.2593594020771583


                                                                                

[train] Iteración 9, Coste = 0.2465363705719562


[Stage 45:>                                                         (0 + 2) / 2]

Accuracy  0.91464
Tiempo de ejecución (s)  130.7578227519989


                                                                                