# Apache Spark para Big Data

Trabajo realizado por Gonzalo Campos Mejías y Alejandro Medina Durán

## Índice

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#1">1. Introducción a spark</a></strong> </span></p>
- <a href="#1.1">1.1 Creación de un RDD </a></strong> </span></p>
- <a href="#1.2">1.2 Graphframe </a></strong> </span></p>
- <a href="#1.3">1.3 Cálculo de estadísticas </a></strong> </span></p>
- <a href="#1.4">1.4 Transformaciones </a></strong> </span></p>
- <a href="#1.5">1.5 Acción collect </a></strong> </span></p>
- <a href="#1.6">1.6 VectorIndexer </a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#2">2. Muestreo de RDDs</a></strong> </span></p>
- <a href="#2.1">2.1 La transformación sample </a></strong> </span></p>
- <a href="#2.2">2.2 La acción takeSample </a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#3">3. Operaciones en RDDs </a></strong> </span></p>
- <a href="#3.1">3.1 Combinaciones usando cartesian </a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#4">4. Agregaciones de datos en RDDs</a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#5">5. RDDs de pares clave/valor</a></strong> </span></p>
- <a href="#5.1">5.1 Agregaciones de datos con RDDs de pares clave/valor</a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#6">6. Análisis de datos </a></strong> </span></p>
- <a href="#6.1">6.1 Un RDD de vectores densos</a></strong> </span></p>
- <a href="#6.2">6.2 Cálculo de estadísticas</a></strong> </span></p>
- <a href="#6.3">6.3 Correlaciones</a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#7">7. SQL</a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#8">8. Regresión Logística</a></strong> </span></p>
- <a href="#8.1">8.1 Matriz de correlación</a></strong> </span></p>
- <a href="#8.2">8.2 Pruebas de hipótesis</a></strong> </span></p>

<p><span style="color: #0000ff; font-size: large;"><strong>
<a href="#9">9. Árboles de decisión</a></strong> </span></p>

<a name="1"></a>
## <span style="color:black">1. Introducción a spark</span>

En primer lugar, vamos a importar todas las librerías necesarias para la ejecución del código. Se usarán para el procesamiento de datos y el análisis en el entorno de Spark y Python:
- `findspark` facilita la configuración del entorno de Spark en Python.
- la clase `SparkSession` de pyspark.sql permite trabajar con Spark SQL para el procesamiento de datos estructurados.
- `urllib.request` y `os` proporcionan herramientas para trabajar con archivos y realizar operaciones de red.
- `GraphFrame` permite trabajar con grafos en Spark.
- `numpy` y `pandas` son librerías populares para manipulación y análisis de datos en Python.
- Las funcionalidades adicionales de `pyspark.sql.functions`, `pyspark.ml.feature`, `pyspark.ml.evaluation`, `pyspark.mllib.stat`, `pyspark.mllib.regression` y `pyspark.mllib.classification` ofrecen herramientas para realizar operaciones avanzadas de análisis de datos, incluyendo la construcción y evaluación de modelos de aprendizaje automático, estadísticas básicas y manipulación de datos en Spark DataFrames.

In [1]:
import findspark
from pyspark.sql import SparkSession

import urllib.request
import os

from time import time
from graphframes import GraphFrame

from math import sqrt

import numpy as np
from numpy import array
import pandas as pd

from pyspark.sql.functions import col, split, expr
from pyspark.sql.types import FloatType
from pyspark.sql import Row

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.mllib.stat import Statistics 
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

El primer paso sería la iniciar una sesión en Spark en un entorno de Python utilizando la biblioteca findspark y la clase SparkSession de PySpark.

In [2]:
findspark.init()
spark = SparkSession.builder \
    .appName("BigDataApacheSpark") \
    .getOrCreate()

Mostramos la sesión de spark:

In [3]:
spark

<a name="1.1"></a>
### <span style="color:black">1.1 Creación de un RDD</span>

La estructura de datos básica de Spark es el **Resilient Distributed Dataset** o **RDD**, un RDD es una colección distribuida de elementos. Todo el trabajo en Spark se realiza como la creación de nuevos RDDs, la transformación de RDDs existentes, o la llamada a acciones en RDDs para calcular un resultado. Spark distribuye automáticamente los datos contenidos en los RDDs a través de su cluster y paraleliza las operaciones que realiza sobre ellos.

El conjunto de datos usados para este proyecto será de la competición KDD Cup 1999, el cual cuenta con casi 5 millones de datos de interacciones de red y se encuentra en el siguiente enlace: [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99).

A continuación, se crea un DataFrame llamado *df* a partir de un archivo del archivo descargado. Este DataFrame contendrá una sola columna llamada "value", donde cada fila corresponderá a una línea del archivo de texto. De esta forma podemos cargar los datos en un formato estructurado que se pueda manipular y analizar utilizando el procesamiento distribuido de Spark.

In [3]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"
filename = "kddcup.data.gz"

def load_data(url, filename, spark):
    if not os.path.isfile(filename):
        urllib.request.urlretrieve(url, filename)

    df = spark.read.text(filename)
    return df

df = load_data(url, filename, spark)

 > Otra forma de crear un RDD sería usando una lista ya existente con `parallelize`

In [8]:
ls = range(50)
new_df = spark.sparkContext.parallelize(ls)
print("Número de elementos:", new_df.count())

Número de elementos: 50


Con `count` se calcula el número total de filas en el DataFrame

In [9]:
df.count()

4898431

Podemos usar `take` para devolver las primeras 5 filas del DataFrame

In [10]:
df.take(5)

[Row(value='0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.'),
 Row(value='0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.'),
 Row(value='0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.'),
 Row(value='0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.'),
 Row(value='0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.')]

<a name="1.2"></a>
### <span style="color:black">1.2 Graphframe</span>

PySpark y GraphFrames  se usan para analizar un conjunto de datos. En primer lugar, se divide una columna llamada "value" en columnas separadas, utilizando la función split. Luego, se crea un nuevo DataFrame llamado df_split con las columnas relevantes renombradas para representar los nodos, tomando valores únicos de la columna "id", y las aristas de un grafo, utilizando las columnas "id" y "service" como origen y destino, respectivamente, y la columna "attack_type" como la relación entre las aristas.

Se crea un objeto GraphFrame con los nodos y aristas. Luego, se filtran las aristas para encontrar conexiones asociadas con un tipo de ataque específico ("buffer_overflow.") y se muestran estas conexiones.

Finalmente, se utiliza el algoritmo PageRank para calcular las clasificaciones de los nodos en el grafo en función de su importancia relativa, y se muestran los 10 nodos con la puntuación más alta, es decir, los mejores clasificados en un rango de [0 - 1].

In [12]:
# Split the "value" column into different columns
df_split = df.withColumn("values", split(df.value, ",")).selectExpr("values[0] as id", "values[1] as protocol", "values[2] as service", "values[3] as flag", "values[41] as attack_type")

nodes = df_split.select("id").distinct() # Create a DataFrame of nodes (vertices) with the column "id" as the identifier

edges = df_split.selectExpr("id as src", "service as dst", "attack_type as relationship") # Create a DataFrame of edges

graph = GraphFrame(nodes, edges)

print("Número de nodos:", graph.vertices.count())
print("Número de aristas:", graph.edges.count())

connections_with_attack = graph.edges.filter("relationship = 'buffer_overflow.'") # Finding the connections associated with a specific type of attack
connections_with_attack.show()

ranks = graph.pageRank(resetProbability=0.15, maxIter=10)
top_ranks = ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(10)
top_ranks.show()



Número de nodos: 9883
Número de aristas: 4898431
+---+--------+----------------+
|src|     dst|    relationship|
+---+--------+----------------+
|184|  telnet|buffer_overflow.|
|305|  telnet|buffer_overflow.|
|150|  telnet|buffer_overflow.|
| 60|  telnet|buffer_overflow.|
|158|  telnet|buffer_overflow.|
|113|  telnet|buffer_overflow.|
| 53|  telnet|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  7|     ftp|buffer_overflow.|
|169|  telnet|buffer_overflow.|
|179|  telnet|buffer_overflow.|
| 49|  telnet|buffer_overflow.|
|290|  telnet|buffer_overflow.|
| 31|  telnet|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
|  0|ftp_data|buffer_overflow.|
+---+--------+----------------+
only showing top 20 rows





+-----+------------------+
|   id|          pagerank|
+-----+------------------+
|  296|0.9999999999998862|
|  467|0.9999999999998862|
|25969|0.9999999999998862|
|11888|0.9999999999998862|
|39458|0.9999999999998862|
|39103|0.9999999999998862|
|31118|0.9999999999998862|
|22596|0.9999999999998862|
|  691|0.9999999999998862|
| 1512|0.9999999999998862|
+-----+------------------+



<a name="1.3"></a>
### <span style="color:black">1.3 Cálculo de estadísticas</span>

Podemos seleccionar las columnas específicas con `col` y luego mostrar estadísticas de esas columnas usando `describe()`

In [13]:
split_column = split(df['value'], ',')
df_column = df.withColumn('features', split_column)

value_column = df_column.select(col("features")[0].cast("float").alias("duration"), # number of seconds of the connection
                                     col("features")[4].cast("float").alias("src_bytes"), # number of data bytes from source to destination 
                                     col("features")[8].cast("float").alias("urgent")) # number of urgent packets 

value_column.describe().show()

+-------+-----------------+------------------+--------------------+
|summary|         duration|         src_bytes|              urgent|
+-------+-----------------+------------------+--------------------+
|  count|          4898431|           4898431|             4898431|
|   mean|48.34243046395876|1834.6211678800823|7.961733052889793E-6|
| stddev|723.3298112546826|  941431.070365515|0.007215083685169...|
|    min|              0.0|               0.0|                 0.0|
|    max|          58329.0|       1.3799639E9|                14.0|
+-------+-----------------+------------------+--------------------+



<a name="1.4"></a>
### <span style="color:black">1.4 Transformaciones</span>

Se puede hacer uso de `filter`, por ejemplo, para contar en números de ataque de `neptune`

 > En el siguiente código hemos medido el tiempo transcurrido para contar los elementos en el RDD. Hemos hecho esto porque queríamos destacar que los cálculos reales, que son distribuidos en Spark tienen lugar cuando ejecutamos *acciones* y no *transformaciones*. En este caso `count` es la acción que ejecutamos sobre el RDD. Podemos aplicar tantas transformaciones como queramos sobre nuestro RDD y no se producirá prácticamente ningún cómputo hasta que llamemos a la primera acción que, en este caso tarda unos segundos en completarse.

In [5]:
t0 = time()
neptune_df = df.filter(expr("value LIKE '%neptune.%'"))
t1 = time() - t0

t2 = time()
neptune_count = neptune_df.count()
t3 = time() - t2

neptune_df.show(5, truncate=False)

print ("Hay {} ataques 'neptune'".format(neptune_count))
print("Tiempo de la transformacion 'filter' {} segundos".format(round(t1,3)))
print ("Tiempo de ejecucion de la accion 'count' {} segundos".format(round(t3,3)))

+-----------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|0,tcp,telnet,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,1,0.50,1.00,0.00,0.00,0.50,1.00,0.00,1,2,1.00,0.00,1.00,1.00,1.00,0.50,0.00,0.00,neptune.|
|0,tcp,telnet,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,2,0.67,1.00,0.00,0.00,0.67,0.67,0.00,2,3,1.00,0.00,0.50,0.67,1.00,0.67,0.00,0.00,neptune.|
|0,tcp,telnet,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,3,0.75,1.00,0.00,0.00,0.75,0.50,0.00,3,4,1.00,0.00,0.33,0.50,1.00,0.75,0.00,0.00,neptune.|
|0,tcp,telnet,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5,4,0.80,1.00,0.00,0.00,0.80,0.40,0.00,4,5,1.00,0.00,0.25,0.40,1.00,0.8

Usando la transformación `map`, podemos aplicar una función a cada elemento de nuestro RDD.

In [19]:
t0 = time()
dict_data = df.rdd.map(lambda d: (d.value.split(",")[41], d.value.split(",")[:-1])) # [41] is the position of the tag
t1 = time() - t0

t2 = time()
print(dict_data.take(1))
t3 = time() - t2

print ("Tiempo de ejecución de la transformación 'map' {} segundos".format(round(t1,3)))
print ("Tiempo de ejecución de la acción 'take' {} segundos".format(round(t3,3)))

[('normal.', ['0', 'tcp', 'http', 'SF', '215', '45076', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '0', '0', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00'])]
Tiempo de ejecución de la transformación 'map' 0.002 segundos
Tiempo de ejecución de la acción 'take' 2.196 segundos


<a name="1.5"></a>
### <span style="color:black">1.5 Acción `collect`</span>

`collect` trae todos los elementos del RDD a la memoria para que podamos trabajar con ellos. Por esta razón hay que usarla con cuidado, especialmente cuando se trabaja con RDDs grandes, por eso mismo usaremos un dataset con aproximadamente un 10% del tamaño del dataset original.  

In [4]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
filename = "kddcup.data_10_percent.gz"

df = load_data(url, filename, spark)

t0 = time()
df_collect = df.collect()
t1 = time() - t0
print ("Tiempo de ejecución {} segundos".format(round(t1,3)))

Tiempo de ejecución 9.913 segundos


Cada nodo trabajador de Spark que tiene un fragmento del RDD tiene que coordinarse para recuperar su parte, y luego juntar todos los resultados. 

Como ejemplo, combinando todo lo anterior, recogeremos todas las interacciones `neptune` como pares clave-valor.   

In [8]:
dict_data = df.rdd.map(lambda d: (d[0].split(",")[41], d[0].split(","))) # [41] is the position of the tag
neptune_keys = dict_data.filter(lambda d: d[0] == "neptune.")

t0 = time()
all_neptune = neptune_keys.collect()
all_neptune_count = len(all_neptune)
t1 = time() - t0

print ("Hay {} ataques 'neptune'".format(all_neptune_count))
print ("Tiempo de ejecución {} segundos".format(round(t1,3)))

Hay 107201 ataques 'neptune'
Tiempo de ejecución 10.167 segundos


El nuevo procedimiento requiere más tiempo. Esto se debe a que recuperamos todos los datos con `collect` y luego usamos `len` de Python en la lista resultante. Antes sólo contábamos el número total de elementos en el RDD usando `count`.

<a name="1.6"></a>
### <span style="color:black">1.6 VectorIndexer</span>

`VectorAssembler` es un transformador que combina una lista dada de columnas en una única columna vectorial. Es útil para combinar características originales y características generadas por diferentes transformadores de características en un único vector, con el fin de entrenar modelos de Machine Learning como regresión logística y árboles de decisión.

VectorAssembler acepta los siguientes tipos de columnas de entrada: todos los tipos numéricos, booleanos y vectoriales. En cada fila, los valores de las columnas de entrada se concatenarán en un vector en el orden especificado. 

Queremos combinar *duration*, *src_bytes*, *urgent* en un único vector de características llamado features y utilizarlo para predecir el servicio(*service*). Si establecemos las columnas de entrada de VectorAssembler en *duration*, *src_bytes* y *urgent* y la columna de salida en features, tras la transformación obtendremos el siguiente DataFrame:

In [20]:
value_column = df_column.select(col("features")[0].cast("float").alias("duration"), # number of seconds of the connection
                                     col("features")[4].cast("float").alias("src_bytes"), # number of data bytes from source to destination
                                      col("features")[2].cast("string").alias("service"),
                                     col("features")[8].cast("float").alias("urgent")) # number of urgent packets 

assembler = VectorAssembler(inputCols=["duration", "src_bytes", "urgent"],
                            outputCol="features")
output = assembler.transform(dataset=value_column)
output.select('features','service').show()

+---------------+-------+
|       features|service|
+---------------+-------+
|[0.0,215.0,0.0]|   http|
|[0.0,162.0,0.0]|   http|
|[0.0,236.0,0.0]|   http|
|[0.0,233.0,0.0]|   http|
|[0.0,239.0,0.0]|   http|
|[0.0,238.0,0.0]|   http|
|[0.0,235.0,0.0]|   http|
|[0.0,234.0,0.0]|   http|
|[0.0,239.0,0.0]|   http|
|[0.0,181.0,0.0]|   http|
|[0.0,184.0,0.0]|   http|
|[0.0,185.0,0.0]|   http|
|[0.0,239.0,0.0]|   http|
|[0.0,181.0,0.0]|   http|
|[0.0,236.0,0.0]|   http|
|[0.0,233.0,0.0]|   http|
|[0.0,238.0,0.0]|   http|
|[0.0,235.0,0.0]|   http|
|[0.0,234.0,0.0]|   http|
|[0.0,239.0,0.0]|   http|
+---------------+-------+
only showing top 20 rows



<a name="2"></a>
### <span style="color:black">2. Muestreo de RDDs</span>

En Spark existen dos operaciones de muestreo, la transformación `sample` y la acción `takeSample`.
- Usando una transformación podemos decirle a Spark que aplique transformaciones sucesivas sobre una muestra de un RDD dado.
- Usando una acción recuperamos una muestra dada y podemos tenerla en memoria local para realizar operaciones sobre sus datos.  

<a name="2.1"></a>
### <span style="color:black">2.1 La transformación `sample`</span>

La transformación `sample` toma hasta tres parámetros:
- El primero es si el muestreo se realiza con reemplazo o no.
- El segundo es el tamaño de la muestra.
- Por último, podemos proporcionar opcionalmente una *semilla aleatoria*, la cual no se establece para que valga 0, y así obtener siempre los mismos resultados.  

In [9]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"
filename = "kddcup.data.gz"

df = load_data(url, filename, spark)
df_sample = df.sample(False, 0.1)

sample_size = df_sample.count()
total_size = df.count()

print ("El tamaño de la muestra es {} de un total de {}".format(sample_size, total_size))

El tamaño de la muestra es 488744 de un total de 4898431


Si queremos tener una aproximación de la proporción de interacciones `neptune.`, podríamos hacerlo contando el número total de etiquetas. Sin embargo, si queremos una respuesta más rápida y no necesitamos la respuesta exacta, sino sólo una aproximación, podemos hacerlo con `sample`:   

In [11]:
df_sample_items = df_sample.rdd.map(lambda d: d["value"].split(","))
df_sample_neptune = df_sample_items.filter(lambda d: "neptune." in d)

t0 = time()
df_sample_neptune_count = df_sample_neptune.count()
t1 = time() - t0

sample_neptune_ratio = df_sample_neptune_count / float(sample_size)
print ("La relación de interacciones 'neptune' es {}".format(round(sample_neptune_ratio,3))) 
print ("Tiempo de ejecución {} segundos".format(round(t1,3)))

La relación de interacciones 'neptune' es 0.219
Tiempo de ejecución 7.253 segundos


Comparando este tiempo de ejecución con el de la celda siguiente, podemos ver una gran diferencia:

In [13]:
df_neptune = df.rdd.map(lambda d: d["value"].split(",")).filter(lambda d: "neptune." in d)

t0 = time()
df_neptune_count = df_neptune.count()
t1 = time() - t0

neptune_ratio = df_neptune_count / float(total_size)
print ("La relación de interacciones 'neptune' es {}".format(round(neptune_ratio,3))) 
print ("Tiempo de ejecución {} segundos".format(round(t1,3)))

La relación de interacciones 'neptune' es 0.219
Tiempo de ejecución 58.45 segundos


Podemos observar una ganancia de tiempo, ya que cuantas más transformaciones apliquemos después del muestreo (es decir, el dataframe creado con `sample`), mayor será esta ganancia. Esto se debe a que sin muestreo todas las transformaciones se aplican al conjunto completo de datos.  

<a name="2.2"></a>
### <span style="color:black">2.2 La acción `takeSample`</span>

Si lo que necesitamos es tomar una muestra de datos en bruto de nuestro RDD en la memoria local con el fin de ser utilizado por otras bibliotecas externas a Spark, se puede utilizar `takeSample`. 

La sintaxis es muy similar, pero en este caso especificamos el número de elementos, al contrario de antes donde era el tamaño de la muestra como fracción del tamaño completo de los datos.  

In [14]:
t0 = time()
df_sample = df.rdd.takeSample(False, 400000)
df_sample_neptune = [x for x in list(df_sample) if "neptune." in x["value"]]
t1 = time() - t0

df_sample_neptune_size = len(df_sample_neptune)

neptune_ratio = df_sample_neptune_size / 400000.0
print ("La relación de interacciones 'neptune' es {}".format(neptune_ratio))
print ("Tiempo de ejecución {} segundos".format(round(t1,3)))

La relación de interacciones 'neptune' es 0.21896
Tiempo de ejecución 64.3 segundos


Ha consumido más tiempo que antes, incluso con una muestra ligeramente menor. La razón es que Spark se limitó a distribuir la ejecución del proceso de muestreo, y el filtrado y la división de los resultados se realizaron localmente en un único nodo.  

<a name="3"></a>
### <span style="color:black">3. Operaciones en RDDs</span>

Spark soporta muchas de las operaciones que tenemos en conjuntos matemáticos, como la unión y la intersección. Los RDDs no son conjuntos reales, y por lo tanto operaciones como la unión de RDDs no elimina duplicados. En este apartado se verán las operaciones `exceptAll` y `subtract`.      

Podemos obtener el número de las interacciones de ataque restando las normales del RDD original con `subtract`, el cual elimina los elementos del primer DataFrame que también están presentes en el segundo, en otras palabras, *df_attack* contendrá todos los elementos de *df* que no están presentes en *df_normal*. Si hay filas duplicadas en *df* que coinciden con las filas de *df_normal*, solo se quedará con una instancia de cada duplicado :  

In [16]:
df_normal = df.filter(col("value").contains("normal."))
df_attack_count = df.subtract(df_normal).count()

df_count = df.count()
df_normal_count = df_normal.count()
print ("Hay {} interacciones normales y {} ataques, de un total de {} interacciones".format(df_normal_count,df_attack_count,df_count))

Hay 972781 interacciones normales y 262178 ataques, de un total de 4898431 interacciones


La operación `exceptAll` elimina todas las filas de *df* que también están presentes en *df_normal*, incluso si hay duplicados

In [17]:
df_attack_count = df.exceptAll(df_normal).count()

print ("Hay {} interacciones normales y {} ataques, de un total de {} interacciones".format(df_normal_count,df_attack_count,df_count))

Hay 972781 interacciones normales y 3925650 ataques, de un total de 4898431 interacciones


<a name="3.1"></a>
### <span style="color:black"> 3.1 Combinaciones usando `cartesian`</span>

Podemos calcular el producto cartesiano entre dos RDDs utilizando la transformación `cartesiana`, devolviendo todos los posibles pares de elementos entre dos RDDs. En nuestro caso la utilizaremos para generar todas las combinaciones posibles entre servicio y ataques en nuestras interacciones de red.  

En primer lugar necesitamos aislar cada colección de valores en dos RDDs separados, para ello utilizaremos `distinct`. Sabemos que los ataques están en la columna 41 y el servicio en la 2.  

In [4]:
data = df.rdd.map(lambda d: d.value.split(","))
attacks = data.map(lambda d: d[41]).distinct()
attacks.collect()

['normal.',
 'buffer_overflow.',
 'loadmodule.',
 'perl.',
 'neptune.',
 'smurf.',
 'guess_passwd.',
 'pod.',
 'teardrop.',
 'portsweep.',
 'ipsweep.',
 'land.',
 'ftp_write.',
 'back.',
 'imap.',
 'satan.',
 'phf.',
 'nmap.',
 'multihop.',
 'warezmaster.',
 'warezclient.',
 'spy.',
 'rootkit.']

In [20]:
services = data.map(lambda d: d[2]).distinct()
services.collect()

['http',
 'smtp',
 'domain_u',
 'auth',
 'finger',
 'telnet',
 'eco_i',
 'ftp',
 'ntp_u',
 'ecr_i',
 'other',
 'urp_i',
 'private',
 'pop_3',
 'ftp_data',
 'netstat',
 'daytime',
 'ssh',
 'echo',
 'time',
 'name',
 'whois',
 'domain',
 'mtp',
 'gopher',
 'remote_job',
 'rje',
 'ctf',
 'supdup',
 'link',
 'systat',
 'discard',
 'X11',
 'shell',
 'login',
 'imap4',
 'nntp',
 'uucp',
 'pm_dump',
 'IRC',
 'Z39_50',
 'netbios_dgm',
 'ldap',
 'sunrpc',
 'courier',
 'exec',
 'bgp',
 'csnet_ns',
 'http_443',
 'klogin',
 'printer',
 'netbios_ssn',
 'pop_2',
 'nnsp',
 'efs',
 'hostnames',
 'uucp_path',
 'sql_net',
 'vmnet',
 'iso_tsap',
 'netbios_ns',
 'kshell',
 'urh_i',
 'http_2784',
 'harvest',
 'aol',
 'tftp_u',
 'http_8001',
 'tim_i',
 'red_i']

Una vez tenemos los protocolos y los servicios podemos hacer el producto cartesiano.  

In [21]:
product = attacks.cartesian(services).collect()
print ("Existen {} combinaciones de ataque-servicio".format(len(product)))

Existen 1610 combinaciones de ataque-servicio


En Spark o en análisis de datos en general, el producto cartesiano se utiliza para crear todas las combinaciones posibles entre dos conjuntos de datos. Esto es útil cuando se desea analizar o comparar todos los pares de elementos de dos conjuntos para realizar operaciones como agrupamientos, filtrados o cálculos complejos.

<a name="4"></a>
### <span style="color:black">4. Agregaciones de datos en RDDs</span>

Podemos agregar datos RDD en Spark utilizando tres acciones diferentes: `reduce`, `fold`, y `aggregate`.

La función que pasamos a `reduce` obtiene y devuelve elementos del mismo tipo del RDD. Si queremos sumar duraciones tenemos que extraer ese elemento a un nuevo RDD.

Si queremos saber la duración total de nuestras interacciones para interacciones normales y de ataque. Podemos utilizar `reduce` de la siguiente manera:

In [22]:
normal_duration_data = data.filter(lambda d: d[41]=="normal.").map(lambda d: int(d[0]))
attack_duration_data = data.filter(lambda d: d[41]!="normal.").map(lambda d: int(d[0]))

Ahora podemos aplicar `reduce` a estos nuevos RDDs.  

In [23]:
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print ("La duración total de las interacciones 'normales' es de {}".format(total_normal_duration))
print ("La duración total de las interacciones de 'ataque' es {}".format(total_attack_duration))

La duración total de las interacciones 'normales' es de 211895753
La duración total de las interacciones de 'ataque' es 24906307


Podemos utilizar los recuentos para calcular las medias de duración.  

In [24]:
print ("La duración media de las interacciones 'normales' es {}".format(round(total_normal_duration/float(normal_duration_data.count()),3)))
print ("La duración media de las interacciones de 'ataque' es {}".format(round(total_attack_duration/float(attack_duration_data.count()),3)))

La duración media de las interacciones 'normales' es 217.825
La duración media de las interacciones de 'ataque' es 6.345


La mejor forma de realizar estos cálculos sería haciendo uso de la acción `aggregate`, que permite que lo que devolvamos no tenga porque ser del mismo tipo que el RDD sobre el que estamos trabajando. A continuación, proporcionamos dos funciones, la primera se utiliza para combinar los elementos de nuestro RDD con el acumulador y la segunda función es necesaria para combinar dos acumuladores.  

In [25]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("La duración media de las interacciones 'normales' es {}".format(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

La duración media de las interacciones 'normales' es 217.825


In [26]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("La duración media de las interacciones de 'ataque' es {}".format(round(attack_sum_count[0]/float(attack_sum_count[1]),3)))

La duración media de las interacciones de 'ataque' es 6.345


En la agregación anterior, el primer elemento del acumulador mantiene la suma total, mientras que el segundo elemento mantiene el recuento. Combinar un acumulador con un elemento RDD consiste en sumar el valor e incrementar el recuento. Combinar dos acumuladores requiere sólo una suma por pares. 

<a name="5"></a>
### <span style="color:black">5. RDDs de pares clave/valor</span>

Spark proporciona funciones específicas para tratar con RDDs cuyos elementos son pares clave/valor, se pueden usar para procesar nuestro conjunto de datos de una forma más práctica y eficiente que la utilizada anteriormente.

Primero tenemos que crear un RDD adecuado, donde cada etiqueta es la clave y el resto de valores de la interacción se analiza como una fila que representa el valor.

In [30]:
dict_data = data.map(lambda d: (d[41], d)) # d[41] contains the network interaction tag
dict_data.take(1)

[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '215',
   '45076',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '1',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '0',
   '0',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

<a name="5.1"></a>
### <span style="color:black">5.1 Agregaciones de datos con RDDs de pares clave/valor</span>

Podemos utilizar todas las transformaciones y acciones disponibles para RDDs normales con RDDs de pares clave/valor.

Por ejemplo, tenemos una transformación `reduceByKey` que podemos utilizar de la siguiente manera para calcular la duración total de cada tipo de interacción de red. Podemos verificar que el resultado obtenido en _normal_ es el mismo que en el apartado anterior, y que las sumas de las duraciones de los ataques también coincide. 

In [33]:
dict_duration = data.map(lambda d: (d[41], float(d[0]))) 
durations_by_key = dict_duration.reduceByKey(lambda x, y: x + y)
durations_by_key.collect()

[('normal.', 211895753.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 2.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 24257982.0),
 ('ipsweep.', 13049.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 500.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

Disponemos de una acción de recuento específica para pares clave/valor, `countByKey`.  

In [34]:
counts_by_key = dict_data.countByKey()
counts_by_key

defaultdict(int,
            {'normal.': 972781,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 1072017,
             'smurf.': 2807886,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 10413,
             'ipsweep.': 12481,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 15892,
             'phf.': 4,
             'nmap.': 2316,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

Podemos utilizar `combineByKey` para calcular duraciones medias por tipo de la siguiente manera, el resultado asociado a cada tipo está en forma de par: "tag: (suma total, recuento)". 

In [35]:
sum_counts = dict_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # value, increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{'normal.': (211895753.0, 972781),
 'buffer_overflow.': (2751.0, 30),
 'loadmodule.': (326.0, 9),
 'perl.': (124.0, 3),
 'neptune.': (2.0, 1072017),
 'smurf.': (0.0, 2807886),
 'guess_passwd.': (144.0, 53),
 'pod.': (0.0, 264),
 'teardrop.': (0.0, 979),
 'portsweep.': (24257982.0, 10413),
 'ipsweep.': (13049.0, 12481),
 'land.': (0.0, 21),
 'ftp_write.': (259.0, 8),
 'back.': (284.0, 2203),
 'imap.': (72.0, 12),
 'satan.': (500.0, 15892),
 'phf.': (18.0, 4),
 'nmap.': (0.0, 2316),
 'multihop.': (1288.0, 7),
 'warezmaster.': (301.0, 20),
 'warezclient.': (627563.0, 1020),
 'spy.': (636.0, 2),
 'rootkit.': (1008.0, 10)}

Si queremos obtener las medias, tenemos que hacer la división antes de recoger los resultados.  

In [40]:
duration_means = sum_counts.map(lambda d: (d[0], round(d[1][0] / d[1][1], 3))).collectAsMap()

for tag in sorted(duration_means, key=duration_means.get, reverse=True):
    print (tag, duration_means[tag])

portsweep. 2329.586
warezclient. 615.258
spy. 318.0
normal. 217.825
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
ipsweep. 1.046
back. 0.129
satan. 0.031
neptune. 0.0
smurf. 0.0
pod. 0.0
teardrop. 0.0
land. 0.0
nmap. 0.0


<a name="6"></a>
### <span style="color:black">6. Análisis de datos</span>

En este apartado usaremos la librería MLlIB (Spark's machine learning library) para usar su funcionalidad de estadística. Un **vector local** se utiliza a menudo como tipo base para RDDs en Spark MLlib, que soporta dos tipos de vectores locales: densos y dispersos. Un vector denso está respaldado por una matriz doble que representa sus valores de entrada, mientras que un vector disperso está respaldado por dos matrices paralelas: índices y valores. 

- Para vectores densos, MLlib utiliza *listas* de Python o el tipo *array* de NumPy.
- Para vectores dispersos, se puede construir con los métodos implementados en `Vectors`.  

<a name="6.1"></a>
### <span style="color:black">6.1 Un RDD de vectores densos</span>

Para representar cada interacción de red en nuestro conjunto de datos como un vector denso, utilizaremos el tipo `array` de *NumPy*.  

In [5]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
filename = "kddcup.data_10_percent.gz"
df = load_data(url, filename, spark)

vector_data = df.rdd.map(lambda line: np.array([float(x) for i, x in enumerate(line.value.split(",")) if i not in [1, 2, 3, 41]])) # remove columns containing strings

<a name="6.2"></a>
### <span style="color:black">6.2 Cálculo de estadísticas</span>

MLlib de Spark proporciona estadísticas de resumen de columnas para `RDD[Vector]` a través de la función `colStats` disponible en `Statistics`. El método devuelve el *máximo*, *mínimo*, *media*, *varianza* y *número de no ceros* por columna, así como el *conteo total*.  

In [6]:
summary = Statistics.colStats(vector_data)

print ("Estadísticas de duración:") 
print (" Media: {}".format(round(summary.mean()[0],3))) # 0 is the position of the duration, rounded to 3 decimals
print (" Desviación estándar: {}".format(round(sqrt(summary.variance()[0]),3)))
print (" Valor máximo: {}".format(round(summary.max()[0],3)))
print (" Valor mínimo: {}".format(round(summary.min()[0],3)))
print (" Conteo total: {}".format(summary.count()))
print (" Número de 'no ceros': {}".format(summary.numNonzeros()[0]))

Estadísticas de duración:
 Media: 47.979
 Desviación estándar: 707.746
 Valor máximo: 58329.0
 Valor mínimo: 0.0
 Conteo total: 494021
 Número de 'no ceros': 12350.0


A continuación, lo vamos a calcular por tipo de ataque a la red. Para ello, habría que filtrar nuestro RDD conteniendo etiquetas como claves y vectores como valores.

In [7]:
# (tag, values)
tag_vector_data = df.rdd.map(lambda line: (line.value.split(",")[41], 
                                           np.array([float(x) for i, x in enumerate(line.value.split(",")) if i not in [1, 2, 3, 41]])))

Dado que `values()` es una transformación en un RDD, y no una acción, no realizamos ningún cálculo hasta que llamamos a `colStats`. Para ello, crearemos una función para que podamos reutilizarlo con cualquier etiqueta.

In [8]:
def summary_by_label(dataframe, label):
    tag_vector_data = dataframe.rdd.map(lambda line: (
        line.value.split(",")[41], 
        np.array([float(x) for i,x in enumerate(line.value.split(",")) if i not in [1, 2, 3, 41]]))
                                       ).filter(lambda d: d[0]==label)
    return Statistics.colStats(tag_vector_data.values())

Volvemos a probar con la etiqueta `normal` y `guess_passwd`:

In [9]:
normal_summary = summary_by_label(df, "normal.")

print ("Estadísticas de duración para: {}".format("normal"))
print (" Media: {}".format(normal_summary.mean()[0],3)) # 0 is the position of the duration, rounded to 3 decimals
print (" Desviación estándar: {}".format(round(sqrt(normal_summary.variance()[0]),3)))
print (" Valor máximo: {}".format(round(normal_summary.max()[0],3)))
print (" Valor mínimo: {}".format(round(normal_summary.min()[0],3)))
print (" Conteo total: {}".format(normal_summary.count()))
print (" Número de 'no ceros': {}".format(normal_summary.numNonzeros()[0]))

Estadísticas de duración para: normal
 Media: 216.65732231336938
 Desviación estándar: 1359.213
 Valor máximo: 58329.0
 Valor mínimo: 0.0
 Conteo total: 97278
 Número de 'no ceros': 11690.0


In [10]:
guess_passwd_summary = summary_by_label(df, "guess_passwd.") 

print ("Estadísticas de duración para: {}".format("guess_password"))
print (" Media: {}".format(guess_passwd_summary.mean()[0],3)) # 0 is the position of the duration, rounded to 3 decimals
print (" Desviación estándar: {}".format(round(sqrt(guess_passwd_summary.variance()[0]),3)))
print (" Valor máximo: {}".format(round(guess_passwd_summary.max()[0],3)))
print (" Valor mínimo: {}".format(round(guess_passwd_summary.min()[0],3)))
print (" Conteo total: {}".format(guess_passwd_summary.count()))
print (" Número de 'no ceros': {}".format(guess_passwd_summary.numNonzeros()[0]))

Estadísticas de duración para: guess_password
 Media: 2.7169811320754715
 Desviación estándar: 11.88
 Valor máximo: 60.0
 Valor mínimo: 0.0
 Conteo total: 53
 Número de 'no ceros': 4.0


Podemos ver que la duración de este tipo de ataque es menor que la de una interacción normal. Podríamos construir una tabla con estadísticas de duración para cada tipo de interacción en nuestro conjunto de datos.

In [11]:
distinct_attacks = attacks.collect()
stats_by_label = [(label, summary_by_label(df, label)) for label in distinct_attacks]

Para obtener un dataframe a partir de cualquier variable de nuestro conjunto de datos definiremos una función.  

In [12]:
pd.set_option('display.max_columns', 50)

def get_variable_stats(stats_by_label, num_column):
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[num_column]), float(sqrt(stat[1].variance()[num_column])), float(stat[1].min()[num_column]), float(stat[1].max()[num_column]), int(stat[1].count())])) 
        for stat in stats_by_label
    ]

    data_dict = {
        "Etiqueta": [label[0] for label in column_stats_by_label],
        "Media": [values[0] for label, values in column_stats_by_label],
        "Desviacion estandar": [values[1] for label, values in column_stats_by_label],
        "Minimo": [values[2] for label, values in column_stats_by_label],
        "Maximo": [values[3] for label, values in column_stats_by_label],
        "Numero total": [int(values[4]) for label, values in column_stats_by_label],
    }
    return pd.DataFrame(data_dict, columns=["Etiqueta", "Media", "Desviacion estandar", "Minimo", "Maximo", "Numero total"])

Ahora obtenemos la columna *duración*(su índice es el 0).

In [13]:
print ("Estadísticas de la duración por etiquetas")
get_variable_stats(stats_by_label,0) # 0 is the position of the duration

Estadísticas de la duración por etiquetas


Unnamed: 0,Etiqueta,Media,Desviacion estandar,Minimo,Maximo,Numero total
0,normal.,216.657322,1359.213469,0.0,58329.0,97278
1,buffer_overflow.,91.7,97.514685,0.0,321.0,30
2,loadmodule.,36.222222,41.408869,0.0,103.0,9
3,perl.,41.333333,14.843629,25.0,54.0,3
4,neptune.,0.0,0.0,0.0,0.0,107201
5,smurf.,0.0,0.0,0.0,0.0,280790
6,guess_passwd.,2.716981,11.879811,0.0,60.0,53
7,pod.,0.0,0.0,0.0,0.0,264
8,teardrop.,0.0,0.0,0.0,0.0,979
9,portsweep.,1915.299038,7285.125159,0.0,42448.0,1040


Podemos ver también las de *src_bytes*, cuyo índice es 1:

In [14]:
print ("Estadísticas de 'src_bytes' por etiquetas")
get_variable_stats(stats_by_label,1) # 1 is the position of src_bytes

Estadísticas de 'src_bytes' por etiquetas


Unnamed: 0,Etiqueta,Media,Desviacion estandar,Minimo,Maximo,Numero total
0,normal.,1157.047524,34226.12,0.0,2194619.0,97278
1,buffer_overflow.,1400.433333,1337.133,0.0,6274.0,30
2,loadmodule.,151.888889,127.7453,0.0,302.0,9
3,perl.,265.666667,4.932883,260.0,269.0,3
4,neptune.,0.0,0.0,0.0,0.0,107201
5,smurf.,935.7723,200.0224,520.0,1032.0,280790
6,guess_passwd.,125.339623,3.03786,104.0,126.0,53
7,pod.,1462.651515,125.098,564.0,1480.0,264
8,teardrop.,28.0,0.0,28.0,28.0,979
9,portsweep.,666707.436538,21500670.0,0.0,693375640.0,1040


<a name="6.3"></a>
### <span style="color:black">6.3 Correlaciones</span>

MLlib de Spark soporta Pearson's y Spearman's para calcular métodos de correlación por pares entre muchas series. Ambos son proporcionados por el método `corr` en el paquete `Statistics`.  

Tenemos dos opciones como entrada, o dos `RDD[Double]`s o un `RDD[Vector]`. En el primer caso la salida será un valor `Double`, mientras que en el segundo es una matriz de correlación. Debido a la naturaleza de nuestros datos, obtendremos la segunda.  

In [15]:
correlation_matrix = Statistics.corr(vector_data, method="spearman")

Una vez calculado, podemos ver la matriz de correlación

In [20]:
pd.set_option('display.max_columns', 50)

column_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

df_correlation = pd.DataFrame(correlation_matrix, index=column_names, columns=column_names)

df_correlation

Unnamed: 0,duration,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,num_failed_logins,logged_in,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,is_hot_login,is_guest_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
duration,1.0,0.014196,0.299189,-0.001068,-0.008025,0.017884,0.108639,0.014363,0.159564,0.010687,0.040425,0.026012,0.013401,0.061099,0.008633,0.019407,,,0.205607,-0.259032,-0.250139,-0.074211,-0.073663,-0.025936,-0.02642,0.062291,-0.050875,0.123621,-0.161107,-0.217167,-0.211979,0.231644,-0.065202,0.100692,-0.056753,-0.057298,-0.007759,-0.013891
src_bytes,0.014196,1.0,-0.167931,-0.009404,-0.019358,9.4e-05,0.11392,-0.008396,-0.089702,0.118562,0.003067,0.002282,-0.00205,0.02771,0.014403,-0.001497,,,0.027511,0.66623,0.722609,-0.65746,-0.652391,-0.34218,-0.332977,0.744046,-0.739988,-0.104042,0.130377,0.741979,0.729151,-0.712965,0.815039,-0.140231,-0.645919,-0.641792,-0.297338,-0.300581
dst_bytes,0.299189,-0.167931,1.0,-0.00304,-0.022659,0.007234,0.193156,0.021952,0.882185,0.169772,0.026054,0.012191,-0.003884,0.034154,-5.5e-05,0.065775,,,0.085947,-0.639157,-0.497683,-0.205848,-0.198715,-0.100958,-0.081307,0.229677,-0.222572,0.521003,-0.611972,0.024124,0.055033,-0.035073,-0.396195,0.578557,-0.167047,-0.158378,-0.003042,0.001621
land,-0.001068,-0.009404,-0.00304,1.0,-0.000334,-1.9e-05,-0.000538,-7.5e-05,-0.002784,-0.000449,-7e-05,-3.3e-05,-0.00023,-0.000155,-6.8e-05,-0.000202,,,-0.000249,-0.010939,-0.010128,0.01416,0.014343,-0.000452,-0.001688,0.002153,-0.001846,0.02068,-0.019922,-0.012342,0.002574,-0.001803,0.004265,0.016173,0.013565,0.012264,0.000386,-0.00182
wrong_fragment,-0.008025,-0.019358,-0.022659,-0.000334,1.0,-0.000143,-0.004042,-0.000566,-0.020911,-0.003371,-0.000529,-0.000247,-0.001726,-0.001161,-0.000509,-0.00152,,,-0.001868,-0.057711,-0.029117,-0.00885,-0.023382,0.00043,-0.012676,0.010218,-0.009386,0.012117,-0.029149,-0.058225,-0.04956,0.055542,-0.015449,0.007306,0.010387,-0.024117,0.046655,-0.013666
urgent,0.017884,9.4e-05,0.007234,-1.9e-05,-0.000143,1.0,0.008596,0.062973,0.006821,0.031781,0.067394,-1.4e-05,0.061989,0.061373,-2.9e-05,0.023389,,,-0.000106,-0.00478,-0.004798,-0.001335,-0.001327,-0.000711,-0.00072,0.001524,-0.001526,-0.000781,-0.005898,-0.0057,-0.004081,0.00521,-0.001941,-0.000975,-0.001379,-0.001369,-0.000788,-0.000776
hot,0.108639,0.11392,0.193156,-0.000538,-0.004042,0.008596,1.0,0.112558,0.189126,0.811529,0.101986,-0.000397,0.003096,0.028693,0.009144,0.004223,,,0.463709,-0.120847,-0.114735,-0.035487,-0.034934,0.013468,0.052002,0.041342,-0.040555,0.032141,-0.074178,-0.01796,0.018783,-0.017198,-0.086998,-0.014141,-0.004706,-0.010721,0.199018,0.189142
num_failed_logins,0.014363,-0.008396,0.021952,-7.5e-05,-0.000566,0.062973,0.112558,1.0,-0.002189,0.004621,0.016873,0.072693,0.010047,0.015221,-0.000115,0.005573,,,-0.000421,-0.018024,-0.018027,-0.003674,-0.004027,0.035325,0.034879,0.005716,-0.005538,-0.003099,-0.028371,-0.015092,0.003003,-0.002961,-0.006617,-0.002585,0.014713,0.014914,0.032393,0.032151
logged_in,0.159564,-0.089702,0.882185,-0.002784,-0.020911,0.006821,0.189126,-0.002189,1.0,0.16119,0.025293,0.011814,0.082533,0.05553,0.024356,0.072697,,,0.089318,-0.578287,-0.438947,-0.187114,-0.180122,-0.091962,-0.072287,0.216969,-0.214019,0.503807,-0.682721,0.080352,0.114526,-0.093565,-0.359506,0.659078,-0.143283,-0.132474,0.007236,0.012979
num_compromised,0.010687,0.118562,0.169772,-0.000449,-0.003371,0.031781,0.811529,0.004621,0.16119,1.0,0.085552,0.04897,0.028557,0.031221,0.011261,0.006979,,,-0.002506,-0.097212,-0.091154,-0.030516,-0.030264,0.008573,0.054006,0.035253,-0.034953,0.036497,-0.041615,0.003465,0.038979,-0.039091,-0.078844,-0.020978,-0.005019,-0.004504,0.214115,0.217859


Ahora queremos aquellas variables que están altamente correlacionadas:

In [21]:
boolean_high_correlation = (abs(df_correlation) > .8) & (df_correlation < 1.0)

correlation_index = (boolean_high_correlation==True).any()
correlation_names = correlation_index[correlation_index==True].index

boolean_high_correlation.loc[correlation_names,correlation_names]

Unnamed: 0,src_bytes,dst_bytes,hot,logged_in,num_compromised,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
src_bytes,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
dst_bytes,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
hot,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
logged_in,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
num_compromised,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
count,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
srv_count,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
serror_rate,False,False,False,False,False,False,False,False,True,False,False,True,True,False,False,False,False,False,False,True,True,False,False
srv_serror_rate,False,False,False,False,False,False,False,True,False,False,False,True,True,False,False,False,False,False,False,True,True,False,False
rerror_rate,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,True,True


Si tenemos un grupo de variables que están altamente correlacionadas, podemos quedarnos sólo con una de ellas para representar al grupo bajo el supuesto de que transmiten información similar como predictores. Reducir el número de variables no mejorará la precisión de nuestro modelo, pero lo hará más fácil de entender y también más eficiente de calcular.

<a name="7"></a>
### <span style="color:black">7. SQL</span>

Spark SQL puede convertir un RDD de objetos `Row` en un `DataFrame`, las filas se construyen pasando una lista de pares clave/valor a la clase `Row`.

In [25]:
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"
filename = "kddcup.data.gz"

df = load_data(url, filename, spark).cache()
data = df.rdd.map(lambda d: d.value.split(","))
rows = data.map(lambda d: Row(duration=int(d[0]), protocol_type=d[1], service=d[2], flag=d[3], src_bytes=int(d[4]), dst_bytes=int(d[5]), 
                              land=int(d[6]), wrong_fragment=int(d[7]), urgent=int(d[8]), hot=int(d[9]), num_failed_logins=int(d[10]), 
                              logged_in=int(d[11]), num_compromised=int(d[12]), root_shell=int(d[13]), su_attempted=int(d[14]), 
                              num_root=int(d[15]), num_file_creations=int(d[16]), num_shells=int(d[17]), num_access_files=int(d[18]), 
                              num_outbound_cmds=int(d[19]), is_host_login=int(d[20]), is_guest_login=int(d[21]), count=int(d[22]), 
                              srv_count=int(d[23]), serror_rate=float(d[24]), srv_serror_rate=float(d[25]), rerror_rate=float(d[26]), 
                              srv_rerror_rate=float(d[27]), same_srv_rate=float(d[28]), diff_srv_rate=float(d[29]), 
                              srv_diff_host_rate=float(d[30]), dst_host_count=int(d[31]), dst_host_srv_count=int(d[32]), 
                              dst_host_same_srv_rate=float(d[33]), dst_host_diff_srv_rate=float(d[34]), 
                              dst_host_same_src_port_rate=float(d[35]), dst_host_srv_diff_host_rate=float(d[36]), 
                              dst_host_serror_rate=float(d[37]), dst_host_srv_serror_rate=float(d[38]), 
                              dst_host_rerror_rate=float(d[39]), dst_host_srv_rerror_rate=float(d[40]), attack=d[41]))

# Once we have our RDD of `Row` we can infer and register the schema
df_interaction = spark.createDataFrame(rows)
df_interaction.createOrReplaceTempView("interactions")

Ahora podemos realizar consultas SQL
> Nota: al tener tantas columnas las consultas necesitan más tiempo para ejecutarse, se podrían suprimir algunas y dejar solo las necesarias para las consultas que queramos realizar, de esta forma el tiempo de ejecución disminuirá.

In [28]:
services_count = spark.sql("""SELECT service, COUNT(*) AS interaction_count FROM interactions GROUP BY service ORDER BY interaction_count DESC LIMIT 5""")
services_count.show()

+-------+-----------------+
|service|interaction_count|
+-------+-----------------+
|  ecr_i|          2811660|
|private|          1100831|
|   http|           623091|
|   smtp|            96554|
|  other|            72653|
+-------+-----------------+



Los resultados de las consultas SQL son RDD y admiten todas las operaciones RDD normales.

In [29]:
services_count_out = services_count.rdd.map(lambda t: "Servicio: {}, Conteo: {}".format(t.service, t.interaction_count))
for interaction in services_count_out.take(5):
    print(interaction)

Servicio: ecr_i, Conteo: 2811660
Servicio: private, Conteo: 1100831
Servicio: http, Conteo: 623091
Servicio: smtp, Conteo: 96554
Servicio: other, Conteo: 72653


Podemos ver fácilmente el esquema de los datos utilizando `printSchema`.

In [30]:
df_interaction.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- land: long (nullable = true)
 |-- wrong_fragment: long (nullable = true)
 |-- urgent: long (nullable = true)
 |-- hot: long (nullable = true)
 |-- num_failed_logins: long (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- num_compromised: long (nullable = true)
 |-- root_shell: long (nullable = true)
 |-- su_attempted: long (nullable = true)
 |-- num_root: long (nullable = true)
 |-- num_file_creations: long (nullable = true)
 |-- num_shells: long (nullable = true)
 |-- num_access_files: long (nullable = true)
 |-- num_outbound_cmds: long (nullable = true)
 |-- is_host_login: long (nullable = true)
 |-- is_guest_login: long (nullable = true)
 |-- count: long (nullable = true)
 |-- srv_count: long (nullable = true)
 |-- serror_rate: d

Spark `DataFrame` proporciona un lenguaje específico del dominio para la manipulación de datos estructurados parecido a SQL. Este lenguaje incluye métodos que podemos concatenar para hacer selección, filtrado, agrupación, etc:

In [31]:
df_interaction.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().sort('count').show()

+-------------+-------+
|protocol_type|  count|
+-------------+-------+
|          udp| 194288|
|          tcp|1870598|
|         icmp|2833545|
+-------------+-------+



Si que queremos contar cuántas interacciones duran menos de 1 segundo, sin transferencia de datos desde el destino, agrupadas por tipo de protocolo.  Sólo tenemos que añadir a las llamadas al filtro anteriores:

In [32]:
df_interaction.select("protocol_type", "duration", "dst_bytes").filter(df_interaction.duration<1000).filter(df_interaction.dst_bytes==0).groupBy("protocol_type").count().sort('count').show()

+-------------+-------+
|protocol_type|  count|
+-------------+-------+
|          udp|  39021|
|          tcp|1191230|
|         icmp|2833545|
+-------------+-------+



¿Cuántas interacciones de ataque y normales tenemos? Para este nuevo conjunto de datos reduciremos el número de columnas:

In [5]:
def get_normal_or_attack(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
new_data = data.map(lambda d: Row(duration=int(d[0]), protocol_type=d[1], service=d[2], flag=d[3], src_bytes=int(d[4]), dst_bytes=int(d[5]), 
                                  attack=get_normal_or_attack(d[41])))

new_df = spark.createDataFrame(new_data)

Realizamos la consulta:

In [6]:
new_df.select("attack").groupBy("attack").count().sort('count').show()

+------+-------+
|attack|  count|
+------+-------+
|normal| 972781|
|attack|3925650|
+------+-------+



Ahora queremos contarlos por etiqueta y tipo de protocolo

In [7]:
t0 = time()
new_df.select("attack", "protocol_type").groupBy("attack", "protocol_type").count().sort('count').show()
tt = time() - t0

print ("Tiempo de ejecución: {} segundos".format(round(tt,3)))

+------+-------------+-------+
|attack|protocol_type|  count|
+------+-------------+-------+
|attack|          udp|   2940|
|normal|         icmp|  12763|
|normal|          udp| 191348|
|normal|          tcp| 768670|
|attack|          tcp|1101928|
|attack|         icmp|2820782|
+------+-------------+-------+

Tiempo de ejecución: 117.047 segundos


Por último, seleccionamos las columnas "attack", "protocol_type" y "dst_bytes", agrupamos los datos por estas columnas junto con una condición booleana sobre "dst_bytes==0", y cuenta la cantidad de registros para cada combinación de valores en esas columnas.

In [8]:
t0 = time()
new_df.select("attack", "protocol_type", "dst_bytes").groupBy("attack", "protocol_type", new_df.dst_bytes==0).count().sort('count').show()
tt = time() - t0

print ("Tiempo de ejecución: {} segundos".format(round(tt,3)))

+------+-------------+---------------+-------+
|attack|protocol_type|(dst_bytes = 0)|  count|
+------+-------------+---------------+-------+
|attack|          udp|          false|     70|
|attack|          tcp|          false|   2809|
|attack|          udp|           true|   2870|
|normal|         icmp|           true|  12763|
|normal|          udp|           true|  36151|
|normal|          tcp|           true|  93169|
|normal|          udp|          false| 155197|
|normal|          tcp|          false| 675501|
|attack|          tcp|           true|1099119|
|attack|         icmp|           true|2820782|
+------+-------------+---------------+-------+

Tiempo de ejecución: 119.259 segundos


<a name="8"></a>
### <span style="color:black">8. Regresión Logística</span>

Usaremos MLlib para construir un clasificador de **Regresión Logística** para la detección de ataques de red.

Para ello cargaremos los datos de entrenamiento con casi 5 millones de entradas, y el de test con más de 300 mil.

In [9]:
df = load_data(url, filename, spark)
print ("El tamaño de los datos de entrenamiento es {}".format(df.count()))

El tamaño de los datos de entrenamiento es 4898431


In [10]:
test_data_file = "corrected.gz"
url = "http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz"


df_test = load_data(url, test_data_file, spark)
print ("El tamaño de los datos de test es {}".format(df_test.count()))

El tamaño de los datos de test es 311029


Un punto etiquetado es un vector local asociado a una etiqueta/respuesta. En MLlib, los puntos etiquetados se utilizan en algoritmos de aprendizaje supervisado y se almacenan como dobles. Para la clasificación binaria, una etiqueta debe ser 0 (negativo) o 1 (positivo).

En nuestro caso, queremos detectar los ataques a la red en general, no necesitamos detectar de qué tipo de ataque se trata. Por lo tanto, etiquetaremos cada interacción de red como *normal* o *ataque*.

Preparamos los conjuntos de entrenamiento y pruebas, para esto únuicamente dejaremos las columnas con valores numéricos, es decir, todas excepto las columnas 1, 2, 3 y 41:

In [11]:
# exclude = [1,2,3,41]
training_data = df.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                     array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:41]])))

test_data = df_test.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                      array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:41]])))

La **regresión logística** se utiliza ampliamente para predecir una respuesta binaria. Spark implementa dos algoritmos para resolver la regresión logística: *mini-batch gradient descent* y *L-BFGS*. Usaremos L-BFGS para una convergencia más rápida.  

In [12]:
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print ("Se ha entrenado en {} segundos".format(round(tt,3)))

Se ha entrenado en 730.717 segundos


Para medir el error de clasificación en nuestros datos de prueba, utilizamos `map` en el RDD `test_data`

In [13]:
labels_preds = test_data.map(lambda t: (t.label, logit_model.predict(t.features)))

Los resultados de la clasificación se devuelven en pares, con la etiqueta de prueba real y la predicha. Esto se utiliza para calcular el error de clasificación mediante el uso de `filter` y `count` de la siguiente manera:

In [14]:
t0 = time()
test_accuracy = labels_preds.filter(lambda t: t[0] == t[1]).count() / float(test_data.count())
tt = time() - t0

print ("Predicción realizada en {} segundos. Precisión: {}".format(round(tt,3), round(test_accuracy,4)))

Predicción realizada en 42.85 segundos. Precisión: 0.8663


<a name="8.1"></a>
### <span style="color:black">8.1 Matriz de correlación</span>

En nuestra matriz de correlaciones encontramos que las correlaciones entre `src_bytes` y `srv_count` son casi las mismas.
- src_bytes: es el número de bytes enviados desde el origen al destino.
- srv_count: es el número de conexiones al mismo servicio que la conexión actual en los últimos 2 segundos.

Para la variable `serror_rate` (% de conexiones que tienen errores *SYN* para el mismo host) y `srv_error_rate`, el conjunto de variables con las que están altamente correlacionadas son prácticamente iguales. Del mismo modo pasa con `rerror_rate` y `srv_rerror_rate`, y con las columnas que empiezan por `dst_host_`

Como dijimos antes, podemos quedarnos sólo con una de ellas, por tanto eliminaremos la variables:
- `srv_serror_rate` (columna 25).  
- `srv_rerror_rate` (columna 27).
- `dst_host_same_src_port_rate`, (columna 35).    
- `dst_host_srv_serror_rate` (columna 38).  
- `dst_host_srv_rerror_rate` (columna 40).  

In [15]:
# exclude = [1,2,3,25,27,35,38,40,41]
reduced_training_data = df.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                             array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:25] +
                                                                    line.value.split(",")[26:27] + line.value.split(",")[28:35] +
                                                                    line.value.split(",")[36:38] + line.value.split(",")[39:40]])))

reduced_test_data = df_test.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                              array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:25] +
                                                                     line.value.split(",")[26:27] + line.value.split(",")[28:35] +
                                                                     line.value.split(",")[36:38] + line.value.split(",")[39:40]])))

Entrenamos el modelo:

In [16]:
t0 = time()
logit_model_reduced = LogisticRegressionWithLBFGS.train(reduced_training_data)
tt = time() - t0

print ("Se ha entrenado en {} segundos".format(round(tt,3)))

Se ha entrenado en 848.531 segundos


Obtenemos su precisión:

In [17]:
labels_preds = reduced_test_data.map(lambda t: (t.label, logit_model_reduced.predict(t.features)))
t0 = time()
test_accuracy = labels_preds.filter(lambda l: l[0] == l[1]).count() / float(reduced_test_data.count())
tt = time() - t0

print ("Predicción realizada en {} segundos. Precisión: {}".format(round(tt,3), round(test_accuracy,4)))

Predicción realizada en 57.789 segundos. Precisión: 0.8135


Podemos ver que aunque el tiempo de ejecución ha sido menor, también ha disminuido ligeramente la precisión del modelo.

<a name="8.2"></a>
### <span style="color:black">8.2 Pruebas de hipótesis</span>

La prueba de hipótesis es una herramienta muy útil en la inferencia estadística y el aprendizaje para determinar si un resultado es estadísticamente significativo, lo calcularemos con 'Pearson's chi-squared'. Los métodos que vamos a utilizar forman parte de `Statistics`.

En nuestro caso queremos realizar una selección de características, por lo que proporcionaremos un RDD de `LabeledPoint`. Internamente, MLlib calculará una matriz de contingencia y realizará la prueba chi-cuadrado de Persons, las características deben ser categóricas. En este caso, sólo tendremos en cuenta las características que tengan valores booleanos o sólo unos pocos valores numéricos diferentes en nuestro conjunto de datos.  

In [18]:
training_data_categorical = df.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                                 array([float(x) for x in line.value.split(",")[6:41]])))

In [19]:
chi = Statistics.chiSqTest(training_data_categorical)

In [21]:
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

feature_names = column_names[3:]
df_chi = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","pvalue"])

df_chi 

Unnamed: 0,Statistic,pvalue
land,0.4649835,0.4953041
wrong_fragment,306.8555,0.0
urgent,38.71844,2.705761e-07
hot,19463.31,0.0
num_failed_logins,127.7691,0.0
logged_in,3273098.0,0.0
num_compromised,2011.863,0.0
root_shell,1044.918,0.0
su_attempted,434.0,0.0
num_root,22871.68,0.0


Si nos fijamos en la columna de _pvalue_ observamos que hay dos valores cercanos a 1, de ello concluimos que los predictores `land` y `num_outbound_cmds` pueden eliminarse de nuestro modelo sin que ello afecte drásticamente a nuestra precisión.

Para realizar este cambio, la única modificación será eliminar las columnas 6 y 19, correspondientes a los dos predictores que queremos que no formen parte de nuestro modelo.  

In [22]:
# exclude = [1,2,3,6,19,41]
training_data = df.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                     array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:6] + 
                                                            line.value.split(",")[7:19] + line.value.split(",")[20:41]])))

test_data = df_test.rdd.map(lambda line: LabeledPoint(1.0 if line.value.split(",")[41] != 'normal.' else 0.0,
                                                      array([float(x) for x in line.value.split(",")[0:1] + line.value.split(",")[4:6] + 
                                                             line.value.split(",")[7:19] + line.value.split(",")[20:41]])))

Volvemos a entrenar y evaluar el modelo:

In [23]:
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print ("Se ha entrenado en {} segundos".format(round(tt,3)))

Se ha entrenado en 428.298 segundos


In [24]:
labels_preds = test_data.map(lambda t: (t.label, logit_model_chi.predict(t.features)))
t0 = time()
test_accuracy = labels_preds.filter(lambda l: l[0] == l[1]).count() / float(test_data.count())
tt = time() - t0

print ("Predicción realizada en {} segundos. Precisión: {}".format(round(tt,3), round(test_accuracy,4)))

Predicción realizada en 25.57 segundos. Precisión: 0.872


Por tanto, podemos ver que, utilizando la prueba de hipótesis, hemos podido eliminar dos predictores aumentando la precisión de la prueba, incluso el tiempo de entrenamiento se ha mantenido constante (pudiendo incluso ser inferior al anterior, ya que en cada ejecución puede variar ligeramente).

En este punto, podemos calcular una serie de métricas para medir la calidad y rendimiento del modelo usando `MulticlassClassificationEvaluator`. Esta clase se utiliza para evaluar modelos de clasificación multiclase. Se le proporcionan las columnas de etiquetas reales (labelCol) y predicciones (predictionCol) para calcular diversas métricas.

Las métricas a calcular son:

- accuracy (ó exactitud): Representa la proporción de predicciones correctas sobre el total de predicciones realizadas.

- weightedPrecision: Es la precisión promedio ponderada por la cantidad de instancias de cada clase. Es útil cuando hay un desequilibrio en las clases.

- weightedRecall (ó sensibilidad): mide la proporción de instancias positivas que el modelo logró detectar de manera correcta de entre todas las instancias que realmente son positivas en el conjunto de datos.

- F1: Es una medida que combina precisión y recall en una sola métrica. Es útil para evaluar el rendimiento general de un modelo de clasificación.

In [25]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

df_labels_preds = labels_preds.toDF(["label", "prediction"])
df_labels_preds = df_labels_preds.withColumn("prediction", col("prediction").cast("double"))

accuracy = evaluator.evaluate(df_labels_preds, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(df_labels_preds, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(df_labels_preds, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(df_labels_preds, {evaluator.metricName: "f1"})

print("Exactitud: {}".format(accuracy))
print("Precision: {}".format(precision))
print("Sensibilidad: {}".format(recall))
print("F1 Score: {}".format(f1))

Exactitud: 0.8720119345784477
Precision: 0.9194976734830171
Sensibilidad: 0.8720119345784477
F1 Score: 0.882015454152442


<a name="9"></a>
### <span style="color:black">9. Árboles de decisión</span>

El algoritmo de Árbol de Decisión es una técnica de aprendizaje supervisado ampliamente utilizada en el campo de la inteligencia artificial y el aprendizaje automático. Su objetivo principal es construir un modelo predictivo que pueda tomar decisiones basadas en ciertas características de los datos de entrada. El proceso comienza dividiendo el conjunto de datos inicial en subconjuntos más pequeños de manera recursiva. En cada paso, se selecciona una característica del conjunto de datos que mejor divide los datos en función de algún criterio.

Para entrenar a este modelo volveremos a diferenciar entre *normal* y *attack*

Para beneficiarnos de la capacidad de los árboles para trabajar con variables categóricas, tenemos que convertirlas en factores numéricos, pero primero necesitamos obtener todos los valores posibles:

In [26]:
data = df.rdd.map(lambda d: d.value.split(","))
data_test = df_test.rdd.map(lambda d: d.value.split(","))

protocols = data.map(lambda d: d[1]).distinct().collect()
services = data.map(lambda d: d[2]).distinct().collect()
flags = data.map(lambda d: d[3]).distinct().collect()

Creamos el conjunto de entrenamiento y test. Si un atributo no está en los datos de entrenamiento, le asignamos un valor especial.

In [30]:
def create_labeled_point(line_split):
    new_line = line_split[0:41]  # exclude = [41]
    try: 
        new_line[1] = protocols.index(new_line[1])
    except:
        new_line[1] = len(protocols)
        
    try:
        new_line[2] = services.index(new_line[2])
    except:
        new_line[2] = len(services)
    
    try:
        new_line[3] = flags.index(new_line[3])
    except:
        new_line[3] = len(flags)
    
    return array([float(x) for x in new_line])

training_data = data.map(lambda line: LabeledPoint(1.0 if line[41] != 'normal.' else 0.0, create_labeled_point(line)))
test_data = data_test.map(lambda line: LabeledPoint(1.0 if line[41] != 'normal.' else 0.0, create_labeled_point(line)))

En el siguiente código para entrenar al modelo:

- **training_data:** Es el conjunto de datos de entrenamiento que se utilizará para entrenar el modelo de Árbol de Decisión.
- **numClasses:** Especifica el número de clases en el problema de clasificación, se establece en 2, lo que indica un problema de clasificación binaria.
- **categoricalFeaturesInfo:** Es un diccionario que especifica qué características son categóricas y cuántos valores únicos tienen. La longitud de cada lista en este diccionario representa la cantidad de valores únicos que tiene cada característica categórica.
- **impurity:** Especifica el criterio de impureza a utilizar para la división de nodos en el árbol. En este caso, se utiliza 'gini', que es el índice Gini, una medida de impureza comúnmente utilizada en árboles de decisión para evaluar la calidad de una división de nodos.
- **maxDepth:** Especifica la profundidad máxima del árbol de decisión, para evitar un sobreajuste
- **maxBins:** Especifica el número máximo de contenedores (bins) a utilizar al dividir características continuas. Un número más alto de bins puede proporcionar una mayor resolución en la división de características numéricas.

In [31]:
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={1: len(protocols), 2: len(services),3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print ("Se ha entrenado en {} segundos".format(round(tt,3)))

Se ha entrenado en 181.676 segundos


Calculamos las predicciones y obtenemos la precisión:

In [35]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_preds = test_data.map(lambda t: t.label).zip(predictions)

t0 = time()
test_accuracy = labels_preds.filter(lambda t: t[0] == t[1]).count() / float(test_data.count())
tt = time() - t0

print ("Predicción realizada en {} segundos. Precisión: {}".format(round(tt,3), round(test_accuracy,4)))

Predicción realizada en 20.993 segundos. Precisión: 0.9157


Usando el método `toDebugString` en nuestro modelo de árbol podemos obtener mucha información sobre divisiones, nodos, etc. 

In [36]:
print ("Modelo de árbol de clasificación:")
print (tree_model.toDebugString())

Modelo de árbol de clasificación:
DecisionTreeModel classifier of depth 4 with 25 nodes
  If (feature 22 <= 35.5)
   If (feature 38 <= 0.875)
    If (feature 36 <= 0.445)
     If (feature 34 <= 0.925)
      Predict: 0.0
     Else (feature 34 > 0.925)
      Predict: 1.0
    Else (feature 36 > 0.445)
     If (feature 2 in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 1.0
   Else (feature 38 > 0.875)
    If (feature 3 in {0.0,1.0,2.0})
     Predict: 0.0
    Else (feature 3 not in {0.0,1.0,2.0})
     If (feature 36 <= 0.255)
      Predict: 1.0
     Else (feature 36 > 0.255)
      Predict: 0.0
  Else (feature 22 > 35.5)
   If (feature 5 <= 2.0)
    If (feature 2 in {11.0,66.0})
     Predict: 0.0
    Else (feature 2 not in {11.0,66.0})
     If (feature 11 <= 0.5)
      

Por último, para detener la sesión de Spark se usa spark.stop(). Cuando se ha terminado de trabajar con Spark y ya no se necesita los recursos del clúster, se llama a `spark.stop()` para liberar esos recursos cuando todo el trabajo esté completo, evitando así el uso innecesario de recursos y posibles conflictos con otros trabajos en el clúster.

In [37]:
spark.stop()