In [1]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
import numpy as np
import time

# Iniciar random
np.random.seed(6)

# Variables globales con información para acelerar procesamiento
g_filas = g_cols = 0    # Serán números (1.000.000 y 11 en este caso)

ITER = 10
APRN = 0.5
LMBD = 0.000002

In [2]:
# linea es un string de la forma:
# "n0, n1, n2, ..., n10, r" donde nX es una propiedad del tráfico y r es
# su clasificación (1 si es botnet, 0 si no lo es)
def formato_inicial(linea):
    aux   = [float(i) for i in linea.split(',')]
    datos = np.array(aux[:-1])
    resul = aux[-1]
    return (datos, resul)

# Devuelve un RDD de los datos del fichero fn.
# Cada registro del RDD es una tupla (X, y)
#  X: np.array de los datos
#  y: clasificación (1 = botnet, 0 = no botnet)
def readFile(fn):
    global g_filas, g_cols
    sc      = SparkContext("local[*]", "BotnetParalelaPruebas")
    todo    = sc.textFile(fn)
    ret     = todo.map(formato_inicial)
    g_filas = todo.count()
    g_cols  = len(ret.take(1)[0][0])
    return ret

t1  = time.time()
rdd = readFile("../../../datos/botnet_tot_syn_l.csv")
t2  = time.time()
print("Tiempo transcurrido: {} segundos.".format(t2 - t1))

Tiempo transcurrido: 3.6555304527282715 segundos.


In [3]:
# rdd: RDD de 1.000.000 de registros (X, y)
#  X es un array con 11 flotantes
#  y es 1 ó 0
# Devuelve un np.array con las medias de las 11 columnas
def calcular_medias(rdd):
    global g_filas
    medias = np.array(rdd.reduce(lambda x, y: x + y)[0])/g_filas
    return medias

# rdd: RDD de 1.000.000 de registros (X, y)
#  X es un array con 11 flotantes
#  y es 1 ó 0
# medias: np.array de las medias de las 11 columnas
def calcular_stdev(rdd, medias):
    global g_filas
    parcial  = np.array(rdd.map(lambda x: (x[0]-medias)**2).reduce(lambda x, y: x + y))
    varianza = parcial/g_filas
    stdev    = np.sqrt(varianza)
    return stdev

# Toma por parámetro un RDD de 1.000.000 de registros.
# Cada registro tiene la forma (X, y)
# X es un array con 11 flotantes
# y es 1 ó 0
# Devuelve un RDD equivalente donde la X de cada tupla está
# reescalada a N(0, 1) (media = 0, desv. est. = 1)
def normalizar(rdd):
    global g_filas
    t1 = time.time()
    medias = calcular_medias(rdd)
    t2 = time.time()
    stdevs = calcular_stdev(rdd, medias)
    t3 = time.time()
    normal = rdd.map(lambda x: ((x[0] - medias)/stdevs, x[1]))
    t4 = time.time()
    print("Tiempo medias: {} s.".format(t2 - t1))
    print("Tiempo stdevs: {} s.".format(t3 - t2))
    print("Tiempo normal: {} s.".format(t4 - t3))
    return normal
    

t1 = time.time()
datos = normalizar(rdd)
t2 = time.time()
print("Tiempo transcurrido: {} segundos.".format(t2 - t1))
# La media tiene que ser ~0
# La desviación estándar tiene que ser ~1

# 1.2345e-05 = 0.000012345

Tiempo medias: 126.57270693778992 s.
Tiempo stdevs: 3.8516030311584473 s.
Tiempo normal: 1.1682510375976562e-05 s.
Tiempo transcurrido: 130.4247546195984 segundos.


In [4]:
# fila: np.array de 11 flotantes
# salida: predicción para esos 11 flotantes. ŷ.
def sigm(fila, pesos, sesgo):
    entrada = np.sum(fila * pesos) + sesgo
    salida  = 1 / (1 + np.exp(-entrada))
    return salida

def presio(datos, pesos, sesgo):
    global g_filas
    inter = rdd.map(lambda x: (x[1]*np.log(sigm(x[0], pesos, sesgo)))+((1-x[1])*np.log(1-sigm(x[0], pesos, sesgo))))
    suma  = inter.reduce(lambda x, y: x + y)
    coste = -(1/g_filas) * suma
    return coste

# datos: RDD de 1M de tuplas (X, y). X es un np.array de 11 flotantes. Normalizados.
def entrenar(datos, iteraciones, aprendizaje):
    global g_filas, g_cols
    pesos  = np.zeros([g_cols, ])
    dpesos = np.zeros([g_cols, ])
    dsesgo = 0
    sesgo  = 0

    for it in range(iteraciones):
        t1 = time.time()
        dpsum  = rdd.map(lambda x: x[0]*(sigm(x[0], pesos, sesgo)-x[1])).reduce(lambda x,y: x+y)
        dpesos = (1/g_filas) * dpsum
        dsesgo = rdd.map(lambda x: sigm(x[0], pesos, sesgo)-x[1]).reduce(lambda x,y: x+y)/g_filas
        
        pesos  = pesos - dpesos * aprendizaje
        sesgo  = sesgo - dsesgo * aprendizaje
        
        coste = rdd.map(lambda x: (x[1]*np.log(sigm(x[0], pesos, sesgo)))+((1-x[1])*np.log(1-sigm(x[0], pesos, sesgo))))\
                .reduce(lambda x,y: x+y)
        coste = -(1/g_filas) * coste
        t2 = time.time()
        print(str(it) + ": " + str(coste) + " [{}]".format(t2 - t1))
    return pesos, sesgo

t1 = time.time()
PESOS, SESGO = entrenar(datos, ITER, APRN)
t2 = time.time()
print("Tiempo transcurrido: {} segundos.".format(t2 - t1))

0: nan [25.739523887634277]
1: nan [23.08590340614319]


KeyboardInterrupt: 

In [None]:
def precision(datos, pesos, sesgo):
    global g_filas
    paso1 = datos.map(lambda x: (sigm(x[0], pesos, sesgo), x[1]))
    paso2 = paso1.map(lambda x: (np.rint(x[0]) == x[1], 1))
    preds = paso2.reduceByKey(lambda x, y: x + y)
    
    preci = preds.collectAsMap()[True]/g_filas
    return preci

t1 = time.time()
precision_ = precision(datos, PESOS, SESGO)
t2 = time.time()
print("La precisión es: {} [{}]".format(precision_, (t2 - t1)))