## Creando un RDD, usando parallelize

Una forma de crear un RDD es paralelizar una lista ya existente.


In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
import random
import pprint

sc = SparkContext.getOrCreate()

In [3]:
a = range(100)
data = sc.parallelize(a)

In [4]:
data.count()

100

In [5]:
data.take(5)

[0, 1, 2, 3, 4]

In [8]:
import sys
!{sys.executable} -m pip install wget
# !{sys.executable} -m pip install pprint

Collecting wget
  Downloading wget-3.2.zip (10 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py): started
  Building wheel for wget (setup.py): finished with status 'done'
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9686 sha256=8c0b39bb3913c5457505892459f909f075b69b4d777b74fc84dc24071c82761d
  Stored in directory: c:\users\chris\appdata\local\pip\cache\wheels\bd\a8\c3\3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2


In [10]:
import wget
wget.download("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz")

100% [..........................................................................] 2144903 / 2144903

'kddcup.data_10_percent.gz'

In [11]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [12]:
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [13]:
raw_data.count()

494021

## Transformaciones y Filtros

Esta transformación se puede aplicar a los RDD para mantener solo elementos que satisfagan una determinada condición. Más concretamente, se evalúa una función en cada elemento en el RDD original. 

El nuevo RDD resultante contendrá solo aquellos elementos que hacen que la función devuelva True.

Para esto utilizaremos las funciones lambdas

In [14]:
# Encontrar la frase 'normal.' en una linea de texto
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

Ahora podemos contar cuántos elementos tenemos en el nuevo RDD.

In [15]:
from time import time

t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print("Existen {} interacciones 'normal' ".format(normal_count))
print("La operación Count demoro {} segundos".format(round(tt,3)))

Existen 97278 interacciones 'normal' 
La operación Count demoro 1.217 segundos


In [16]:
normal_raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

Observe que hemos medido el tiempo transcurrido para contar los elementos en el RDD. Hemos hecho esto porque queríamos señalar que los cálculos reales (distribuidos) en Spark tienen lugar cuando ejecutamos acciones y no transformaciones. En este caso, contar es la acción que ejecutamos en el RDD. Podemos aplicar tantas transformaciones como queramos en nuestro RDD y no se realizará ningún cálculo hasta que llamemos a la primera acción que, en este caso, tarda unos segundos en completarse.

## Función de tranformación map

Al usar la transformación del __map__ en Spark, podemos aplicar una función a cada elemento en nuestro RDD. Las lambdas de Python son especialmente expresivas para este particular.

En este caso, queremos leer nuestro archivo de datos en formato CSV. Podemos hacer esto aplicando una función lambda a cada elemento en el RDD de la siguiente manera.

In [19]:
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("la operacion e tomart 5 elementos se completo en {} segundos".format(round(tt,3)) )
print(head_rows[0])

la operacion e tomart 5 elementos se completo en 0.594 segundos
['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']


Nuevamente, todas las acciones ocurren una vez que llamamos a la primera acción de Spark (es decir, tomar en este caso). ¿Qué pasa si tomamos muchos elementos en lugar de solo los primeros?

In [20]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print("la operacion e tomart 100000 elementos se completo en {} segundos".format(round(tt,3)) )

la operacion e tomart 100000 elementos se completo en 1.511 segundos


Podemos ver que lleva más tiempo. La función de mapa se aplica ahora de forma distribuida a muchos elementos en el RDD, de ahí el mayor tiempo de ejecución.

## Usando mapa y funciones predefinidas

Por supuesto, podemos usar funciones predefinidas con el mapa. Imagine que queremos tener cada elemento en el RDD como un par clave-valor donde la clave es la etiqueta (por ejemplo, normal) y el valor es la lista completa de elementos que representa la fila en el archivo con formato CSV. Podríamos proceder de la siguiente manera

In [21]:
# funcion parse_interacciones recibe una linea, y en su transformación devuelve 
# clave, valor, donde el primer elemento es la columnia final de la fila y el segundo 
# es el total de la fila a modo de lista
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

In [22]:
# hacemos el map de RDD en base a la funcion creada, devolvemos el primer elemento 
key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
head_rows[0]

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])

En nuestro notebook sobre cómo trabajar con pares clave-valor, utilizaremos este tipo de RDD para realizar agregaciones de datos (por ejemplo, contar por clave).

## Collection

Hasta ahora hemos utilizado las acciones de ___count__ y __take__ . Otra acción básica que debemos aprender es __collect__. Básicamente, tendrá __todos los elementos en el RDD en la memoria__ para que podamos trabajar con ellos. Por esta razón, debe __usarse con cuidado__, especialmente cuando se trabaja con RDD grandes.

In [23]:
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print ("collect en {} segundos".format(round(tt,3)))

collect en 2.623 segundos


Eso tomó más tiempo que cualquier otra acción que usamos antes, por supuesto. Cada nodo de trabajo de Spark que tenga un fragmento del RDD debe coordinarse para recuperar su parte y luego reducir todo junto.

Como último ejemplo que combina todo lo anterior, queremos recopilar todas las interacciones normales como par clave-valor

In [24]:
# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print("Data collected in {} seconds".format(round(tt,3)))
print ("There are {} 'normal' interactions".format(normal_count))

Data collected in 2.804 seconds
There are 97278 'normal' interactions


Este recuento coincide con el recuento anterior para las interacciones normales. El nuevo procedimiento lleva más tiempo. Esto se debe a que recuperamos todos los datos con recopilar y luego usamos el len de Python en la lista resultante. Antes solo estábamos contando el número total de elementos en el RDD usando count

## Sampling RDDs

La transformación de __muestra__ toma hasta tres parámetros. 
- Primero es si el muestreo se realiza con reemplazo o no. 
- El segundo es el tamaño de la muestra como fracción. 
- Finalmente, opcionalmente podemos proporcionar una semilla aleatoria.

In [25]:
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print ("Tamaño de la muestra es {} de un total de {}".format(sample_size, total_size))

Tamaño de la muestra es 49493 de un total de 494021


Pero el poder del muestreo como transformación proviene de hacerlo como parte de una secuencia de transformaciones adicionales. Esto se mostrará más poderoso una vez que comencemos a hacer agregaciones y operaciones de pares clave-valor, y será especialmente útil cuando usemos la biblioteca de aprendizaje automático __MLlib de Spark__.

Mientras tanto, imagine que queremos tener una aproximación de la proporción de lo normal. interacciones en nuestro conjunto de datos. Podríamos hacer esto contando el número total de etiquetas . Sin embargo, queremos una respuesta más rápida y no necesitamos la respuesta exacta sino solo una aproximación. Podemos hacerlo de la siguiente manera.

In [26]:
# Aplicaremos las transformaciones
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print( "El ratio de interacciones 'normal' es de {}".format(round(sample_normal_ratio,3)) )
print( "La cuenta duro {} segundos".format(round(tt,3)))

El ratio de interacciones 'normal' es de 0.195
La cuenta duro 1.193 segundos


Comparemos esto con el cálculo de la relación sin muestreo.

In [27]:
# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print( "El ratio de interacciones 'normal' es de {}".format(round(normal_ratio,3)) )
print( "La cuenta duro {} segundos".format(round(tt,3)))

El ratio de interacciones 'normal' es de 0.197
La cuenta duro 1.873 segundos


Podemos ver una ganancia en el tiempo. Cuantas más transformaciones apliquemos después del muestreo, mayor será esta ganancia. Esto se debe a que, sin muestreo, todas las transformaciones se aplican al conjunto completo de datos

## función takeSample

Si lo que necesitamos es tomar una muestra de datos sin procesar de nuestro RDD en la memoria local para que otras bibliotecas que no sean de Spark puedan usar, se puede usar takeSample.

La sintaxis es muy similar, pero en este caso especificamos el número de elementos en lugar del tamaño de la muestra como una fracción del tamaño de datos completo.

In [28]:
t0 = time()
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / 400000.0
print( "El ratio de interacciones 'normal' es de {}".format(round(normal_ratio,3)) )
print( "La cuenta duro {} segundos".format(round(tt,3)))

El ratio de interacciones 'normal' es de 0.197
La cuenta duro 3.499 segundos


En la agregación anterior, el primer elemento acumulador mantiene la suma total, mientras que el segundo elemento mantiene el recuento. La combinación de un acumulador con un elemento RDD consiste en resumir el valor e incrementar el recuento. La combinación de dos acumuladores requiere solo una suma por pares.

Sin embargo, tardó más, incluso con una muestra un poco más pequeña. La razón es que Spark acaba de distribuir la ejecución del proceso de muestreo. El filtrado y la división de los resultados se realizaron localmente en un solo nodo

## Obteniendo interacciones attack usando restar


Con fines ilustrativos, imagine que ya tenemos nuestro RDD con interacciones sin attack (normal) de algún análisis previo.

In [29]:
normal_raw_data = raw_data.filter(lambda x: "normal." in x)

Podemos obtener interacciones de ataque restando las normales del RDD original sin filtrar de la siguiente manera.


In [30]:
attack_raw_data = raw_data.subtract(normal_raw_data)

Hagamos algunos recuentos para verificar nuestros resultados.


In [31]:
from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print( "All count in {} secs".format(round(tt,3)))

All count in 1.031 secs


In [32]:
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print ("Normal count in {} secs".format(round(tt,3)))

Normal count in 1.164 secs


In [33]:
# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print( "Attack count in {} secs".format(round(tt,3)))

Attack count in 5.209 secs


In [34]:
print( "There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count))

There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions


## Protocolo y combinaciones de servicios usando cartesiano



Podemos calcular el producto cartesiano entre dos RDD utilizando la transformación cartesiana. Devuelve todos los pares de elementos posibles entre dos RDD. En nuestro caso lo usaremos para generar todas las combinaciones posibles entre servicio y protocolo en nuestras interacciones de red.

En primer lugar, necesitamos aislar cada colección de valores en dos RDD separados. Para eso usaremos distinto en el conjunto de datos analizados por CSV. Por la descripción del conjunto de datos, sabemos que el protocolo es la segunda columna y el servicio es la tercera (la etiqueta es la última y no la primera como aparece en la página).

Primero, obtengamos __protocols__.

In [35]:
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

['tcp', 'udp', 'icmp']

Haremos lo mismo para __Service__

In [36]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

['http',
 'smtp',
 'finger',
 'domain_u',
 'auth',
 'telnet',
 'ftp',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'rje',
 'time',
 'mtp',
 'link',
 'remote_job',
 'gopher',
 'ssh',
 'name',
 'whois',
 'domain',
 'login',
 'imap4',
 'daytime',
 'ctf',
 'nntp',
 'shell',
 'IRC',
 'nnsp',
 'http_443',
 'exec',
 'printer',
 'efs',
 'courier',
 'uucp',
 'klogin',
 'kshell',
 'echo',
 'discard',
 'systat',
 'supdup',
 'iso_tsap',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'sql_net',
 'vmnet',
 'bgp',
 'Z39_50',
 'ldap',
 'netstat',
 'urh_i',
 'X11',
 'urp_i',
 'pm_dump',
 'tftp_u',
 'tim_i',
 'red_i']

Ahora hagamos un producto cartesianos con __protocol x service__.

In [37]:
product = protocols.cartesian(services).collect()
print ("Existen {} combinaciones de protocol X service".format(len(product)))

Existen 198 combinaciones de protocol X service


Obviamente, para RDDs tan pequeños realmente no tiene sentido usar el producto cartesiano Spark. Podríamos haber recogido perfectamente los valores después de utilizar distintos y hacer el producto cartesiano localmente. Además, las operaciones distintas y cartesianas son costosas, por lo que deben usarse con cuidado cuando los conjuntos de datos operativos son grandes.

## Inspección de la duración de la interacción por etiqueta

Tanto fold como reduce toman una función como argumento que se aplica a dos elementos del RDD. La acción de plegado difiere de la reducción en que obtiene un valor cero inicial adicional que se utilizará para la llamada inicial. Este valor debe ser el elemento de identidad para la función proporcionada.

Como ejemplo, imagine que queremos saber la duración total de nuestras interacciones para interacciones normales y de ataque. Podemos usar reducir de la siguiente manera.

In [38]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

La función que pasamos para reducir obtiene y devuelve elementos del mismo tipo de RDD. Si queremos sumar duraciones, necesitamos extraer ese elemento en un nuevo RDD.

In [39]:
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

Ahora podemos reducir estos nuevos RDD.

In [40]:

total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print ("Total duration for 'normal' interactions is {}".\
    format(total_normal_duration))
print ("Total duration for 'attack' interactions is {}".\
    format(total_attack_duration))

Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792


Tenemos un primer enfoque (y demasiado simplista) para identificar las interacciones de ataque.

## Una mejor manera, usando: aggregate


La acción agregada nos libera de la restricción de que el retorno sea del mismo tipo que el RDD en el que estamos trabajando. Al igual que con fold, proporcionamos un valor cero inicial del tipo que queremos devolver. Luego proporcionamos dos funciones. El primero se usa para combinar los elementos de nuestro RDD con el acumulador. La segunda función es necesaria para fusionar dos acumuladores. Veámoslo en acción calculando la media que hicimos antes.

In [41]:

normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("Promedio de 'normal' por interaccion es  {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

Promedio de 'normal' por interaccion es  216.657


En la agregación anterior, el primer elemento acumulador mantiene la suma total, mientras que el segundo elemento mantiene el recuento. La combinación de un acumulador con un elemento RDD consiste en resumir el valor e incrementar el recuento. La combinación de dos acumuladores requiere solo una suma por pares.

Podemos hacer lo mismo con las interacciones de tipo ataque.

In [42]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("Promedio de 'attack' por interaccion es {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3)))

Promedio de 'attack' por interaccion es 6.621


## Agregaciones de datos con RDD de pares clave / valor

Podemos usar todas las transformaciones y acciones disponibles para RDD normales con RDD de pares clave / valor. Solo necesitamos hacer que las funciones funcionen con elementos de par. Además, Spark proporciona funciones específicas para trabajar con RDD que contienen elementos de par. Son muy similares a los disponibles para los RDD generales.

Por ejemplo, tenemos una transformación reduceByKey que podemos usar de la siguiente manera para calcular la duración total de cada tipo de interacción de red.


In [43]:
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

In [44]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

[('normal.', 21075991.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 1991911.0),
 ('ipsweep.', 43.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 64.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

Tenemos una acción de conteo específica para pares clave / valor.

In [45]:
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(int,
            {'normal.': 97278,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 107201,
             'smurf.': 280790,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 1040,
             'ipsweep.': 1247,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 1589,
             'phf.': 4,
             'nmap.': 231,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

## Usando combineByKey

Esta es la más general de las funciones de agregación por clave. La mayoría de los otros combinadores por tecla se implementan usándolo. Podemos considerarlo como el equivalente agregado, ya que permite al usuario devolver valores que no son del mismo tipo que nuestros datos de entrada.

Por ejemplo, podemos usarlo para calcular las duraciones promedio por tipo de la siguiente manera.

In [46]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{'normal.': (21075991.0, 97278),
 'buffer_overflow.': (2751.0, 30),
 'loadmodule.': (326.0, 9),
 'perl.': (124.0, 3),
 'neptune.': (0.0, 107201),
 'smurf.': (0.0, 280790),
 'guess_passwd.': (144.0, 53),
 'pod.': (0.0, 264),
 'teardrop.': (0.0, 979),
 'portsweep.': (1991911.0, 1040),
 'ipsweep.': (43.0, 1247),
 'land.': (0.0, 21),
 'ftp_write.': (259.0, 8),
 'back.': (284.0, 2203),
 'imap.': (72.0, 12),
 'satan.': (64.0, 1589),
 'phf.': (18.0, 4),
 'nmap.': (0.0, 231),
 'multihop.': (1288.0, 7),
 'warezmaster.': (301.0, 20),
 'warezclient.': (627563.0, 1020),
 'spy.': (636.0, 2),
 'rootkit.': (1008.0, 10)}

In [47]:
sum_counts.take(5)

[('normal.', (21075991.0, 97278)),
 ('buffer_overflow.', (2751.0, 30)),
 ('loadmodule.', (326.0, 9)),
 ('perl.', (124.0, 3)),
 ('neptune.', (0.0, 107201))]

In [48]:
duration_means_by_type = sum_counts.map(lambda x: (x[0], round(x[1][0]/x[1][1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print( tag, duration_means_by_type[tag])

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
neptune. 0.0
smurf. 0.0
pod. 0.0
teardrop. 0.0
land. 0.0
nmap. 0.0
