# Spark

- [Ecosistema Spark](#spark)
- [Arquitectura](#arq)

- [RDD](#rdd)
    - [Operaciones sobre RDD](#op_rdd)
        - [Acciones](#actions)
        - [Transformaciones](#transforms)
        
- [Key-Value RDD](#key_rdd)

- [Otros elemenos de Spark Core](#otros)
    - [Variables broadcast](#vb)
    - [Acumuladores](#acum)
    
- [Ejemplo final](#ejemplo)

    
<div id='xx' />

<div id='spark' />

## Ecosistema Spark


Documentación [Spark](https://spark.apache.org/docs/latest/index.html)


### Spark Core

Contiene la funcionalidad básica de Spark, incluyendo componentes para la planificación de tareas, gestión de memoria, recuperación ante fallos, interacción con los sistemas de almacenamiento, etc.

Es también el API que define RDDs (Resilient Distribuited Datasets), que son la principal abstracción en Spark y representan las colecciones de objetos distribuidos a través de muchos nodos de cómputo paralela.

### Spark SQL

Es el paquete de Spark para trabajar con datos estructurados. Permite la consulta de datos a través de SQL y es compatible con muchas fuentes de datos, incluyendo tablas Hive, Parquet y JSON.

Nos permite entremezclar consultas SQL con las manipulaciones de datos de forma pragmática sobre RDDs en Python, Java y Scala, combinando así SQL con analíticas complejas.

### Spark Streaming

Es un componente que permite el procesamiento de secuencias de datos en timepo real. Algunos ejemplos pueden ser:

- Ficheros de logs generados por servidores web de producción.
- Colas de mensajes que contienen las actualizaciones de estado enviados por los usuarios de un servicio web.

Este componente proporciona un API para manipular las secuencias de datos que casi coincide con el API RDD del núcleo de Spark, facilitándo el movernos entre aplicaciones que manipulan los datos almacenados en la memoria, en el disco o que llegan en tiempo real. Además, fue diseñado para proporcionar el mismo grado de tolerancia a fallos, rendimiento y escabilidad que ofrece Spark Core.

### MLib

Es la librería de spark que contiene machine learning. Todos los métodos de esta librería están diseñados para escalar a través de un clúster.

### GraphX

Es una librería que nos proporciona un API para la manipulación de grafos y realizar cálculos de grafos en paralelo con gran rendimiento. Extiende el Spark RDD API permitiéndonos crear grafos con propiedades arbitrarias anexas a cada vértice y enlace.

### Clúster managers

Spark está diseñado para escalar desde uno a cientos de nodos de computación, por ello, para maximizar la flexibilidad Spark puede correr sobre diferentes gestores de clústeres tales como YARN, Apache Mesos o en modo Standalone.

<div id='arq' />

## Arquitectura

Cada aplicación Spark consuste en un programa driver que lanza varias operaciones de computación en paralelo sobre varios nodos 'workers' (también llamados 'executo'). El driver contiene las principales funciones de nuestra aplicación y define y distribuye los datasets en el clúster aplicando operaciones sobre ellos.

Para que el programa acceda a Spark, necesitamos crear un objeto <b>SparkContext</b>. En la shell de spark/pyspark se crea automáticamente una variable con nombre **sc** que es el SparkContext.

Este SparkContext es la parte principal del API de Spark y representa el punto de entrada a todas las funcionalidades de Spark. Al iniciarlo estaremos definiento el tipo de instalación de Spark a la que queremos conectarnos (local, standalone, mesos, yarn) para ejecutar nuestras operaciones.

El esquema de ejecución de una aplicación de Spark sobre YARN es el siguiente:

1. Se invoca al ResourceManager al ejecutar una aplicación Spark.
2. El ResourceManager es el encargado de consultar el estado de los nodos, solicitando recursos para la aplicación y levantando un ApplicationMaster en uno de los nodos worker, el cual será el encargado de monitorizar los spark executors e irá informando del estado de la aplicación al cliente spark.

<div id='rdd' />

## RDD

Documentación [Spark RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

Como ya hemos comentado con anterioridad, se trata de la principal abstracción y unidad fundamental de datos Spark:

- **R**esistente: si los datos en memoria de spierden pueden ser recuperados.
- **D**istribuido: los datos son almacenados de forma distribuida a lo largo del clúster y se accede a ellos mediante programación paralela.

Un RDD es una colección inmutable de objetos distribuidos. spark es el encargado de distribuir de forma automática los datos contenidos en un RDD en el clúster y paraleliza las operaciones que realicemos sobre dicho RDD.

Existen dos formas de crear un RDD. La manera más sencilla es coger una lista que tengamos en memoria y paralelizarla mediante el método *paralelize()* (no muy utilizado ya que se necesita cargar en memoria la colección de datos entera). Lo más habitual en entornos de producción es leer de un fichero externo.

In [1]:
# Comentado ya que solo podemos trabajar sobre una instancia SparkContext

#from pyspark import SparkContext
#sc = SparkContext()
#rdd = sc.parallelize(range(100))
#rdd

In [2]:
from pyspark import SparkContext

sc = SparkContext()
rdd = sc.textFile("../data/test.log")

<div id='op_rdd' />

### Operaciones sobre RDDs

Existen dos tipos de operaciones sobre un RDD:

<div id='actions' />

#### Acciones

Operaciones que se ejecutan sobre un RDD y nos retornan un resultado. escriben información en disco y ejecutan la computación. Por ejemplo: *count()* o *first()*.

La ejecución de las acciones se lleva a cabo de forma distribuida en los nodos executors pero el resultado final siempre es un objeto local en el nodo driver, por lo tanto es impoortante tener en cuenta el tipo de acción a ejecutar (el resultado debe ser lo suficientemente pequeño para no colapsar la memoria).

Cada vez que invocamos una nueva acción, el RDD se recalcula de nuevo. Para evitar esta ineficiencia si repetimos varias acciones sobre un mismo RDD podemos persistir los resultados intermedios mediante los métodos *persist()* o *cache()*.

Cuando persistimos un RDD, cada nodo persiste en su memoria la parte del RDD que almacenaba en disco en ese nodo y acelerando así la reutilización para las distintas acciones que se realicen sobre él. Spark de forma automática se encarga de monitorizar el uso de la cache de cada nodo eliminando las particiones más antiguas que no se han utilizado recientemente. Si queremos eliminar manualmente el RDD de la caché se puede utilizar el método *unpersist()*.

A continuación las acciones más habituales sobre un RDD:

In [3]:
rdd.count()
#rdd.first()

# Retorna 5 elementos del rdd
#rdd.take(5)

# Retorna el contenido del rdd.
#print(rdd.collect())

# Guarda el rdd en disco en el formato XXX (TextFile, SequenceFile, ObjectFile, PickleFile)
#rdd.saveAsXXX()   


54

<div id='transforms' />

#### Transformaciones

Operaciones que se ejecutan sobre un RDD y nos devuelvren un nuevo RDD. Por ejemplo *map()* o *filter()*. 

Una de las principales características de Spark es que es **lazy evaluation** (evaluación perezosa), esto significa que todas las transformaciones sobre un RDD no se ejecutan hasta que se ejecute una acción.

Las transformaciones siempre tienen lugar en los nodos ejecutores.

Cuando invocamos una transformación sobre un RDD, spark almacena el tipo de transformación en el siguiente RDD que queremos crear, construyendo internamente un grafo de transformaciones. Este grafo se llama DAG (grafo dirigido acíclico). Spark usa esta información para generar cada RDD cuando se demanda y también para recuperar datos perdidos.

Las transformaciones más habituales son:

- **rdd.filter()** eliminar elementos del rdd.
- **rdd.map()** modificar elementos del rdd.
- **rdd.flatMap()** modificar elementos del rdd.
- **rdd.join()** unir dos rdd.

La mayor parte de las transformaciones e incluso alguna acción dependen de funciones que son usadas por Spark para computar los datos. Si son sencillas usamos expresiones lambda y para casos más complejos se definen previamente funciones python.

In [4]:
# Filtrar errores

error_rdd = rdd.filter(lambda line: "ERROR" in line)
print("Input had " + str(error_rdd.count()) + " error line")
for line in error_rdd.take(5):
    print(line)

Input had 4 error line
64.242.88.10 - - [07/Mar/2004:16:25:35 -0800] "ERROR /test/error1/primer_error HTTP/1.1" 404 342
64.242.88.10 - - [07/Mar/2004:16:32:51 -0800] "ERROR /test/error1/segundo_error HTTP/1.1" 404 4325
64.242.88.10 - - [07/Mar/2004:16:58:59 -0800] "ERROR /test/error3/tercer_error HTTP/1.1" 404 234125
64.242.88.10 - - [07/Mar/2004:17:21:59 -0800] "ERROR /test/error4/cuarto_error HTTP/1.1" 404 125


In [5]:
# Filtramos errores con una función

def containsError(s):
    return "ERROR" in s

error_rdd = rdd.filter(containsError)
for line in error_rdd.take(5):
    print(line)

64.242.88.10 - - [07/Mar/2004:16:25:35 -0800] "ERROR /test/error1/primer_error HTTP/1.1" 404 342
64.242.88.10 - - [07/Mar/2004:16:32:51 -0800] "ERROR /test/error1/segundo_error HTTP/1.1" 404 4325
64.242.88.10 - - [07/Mar/2004:16:58:59 -0800] "ERROR /test/error3/tercer_error HTTP/1.1" 404 234125
64.242.88.10 - - [07/Mar/2004:17:21:59 -0800] "ERROR /test/error4/cuarto_error HTTP/1.1" 404 125


In [6]:
# Map: longitud de cada línea del fichero de logs

long_rdd = rdd.map(lambda x: len(x))
long_rdd.take(10)

[152, 126, 98, 102, 104, 145, 102, 96, 145, 109]

In [7]:
# FlatMap: obtenemos las palabras del documento

words_rdd = rdd.flatMap(lambda x: x.split(" "))
words_rdd.take(5)

['64.242.88.10', '-', '-', '[07/Mar/2004:16:05:49', '-0800]']

<div id='key_rdd' />

### Key-Value RDD

Cada elemento del RDD puede contener cualquier tipo de dato: texto, números, listas, contenedores... lo único que debemos tener en cuenta es que las funciones que definimos para nuestras transformaciones sean capaces de trabajar con el dato oportuno.

Existen otro tipo de transformaciones que requieren un tipo especial de RDD, los **key-value** RDD. Los datos en este tipo de RDD serán tuplas (clave-valor). Lo habitual será que mediante una transformación transformemos un RDD a un key-value RDD.

Los key-value RDD además de las transformaciones ya estudiadas admiten algunas transformaciones especiales:

- **keys()** retorna RDD con las claves.
- **values()** retorna RDD con los valores.
- **mapValues(func)** retorna un RDD en el que aplica la función definida sobre cada valor sin modificar la clave.
- **flatMapValues(func)** retorna un RDD en el que aplica la función retornando un iterador para cada valor del RDD.
- **reduceByKey(func)** retorna un RDD con los valores con la misma clave combinados con la función definida.
- **groupByKey()** retorna un RDD con los valores agrupados en un iterador por la clave.
- **sortByKey()** retorna un RDD ordenado por la clave.

In [8]:
# Creamos un rdd

pair_rdd = sc.parallelize([("B", [4, 5]), ("B", [6 ,7 , 9]), ("A", [2])])

In [9]:
pair_rdd.collect()

[('B', [4, 5]), ('B', [6, 7, 9]), ('A', [2])]

In [10]:
pair_rdd.values().collect()

[[4, 5], [6, 7, 9], [2]]

In [11]:
pair_rdd.mapValues(lambda x: len(x)).collect()

[('B', 2), ('B', 3), ('A', 1)]

In [12]:
pair_rdd.flatMapValues(lambda x: x).collect()

[('B', 4), ('B', 5), ('B', 6), ('B', 7), ('B', 9), ('A', 2)]

In [13]:
pair_rdd.reduceByKey(lambda x,y: x+y).collect()

[('B', [4, 5, 6, 7, 9]), ('A', [2])]

In [14]:
pair_rdd.groupByKey().collect()

[('B', <pyspark.resultiterable.ResultIterable at 0x7f43b4e73a50>),
 ('A', <pyspark.resultiterable.ResultIterable at 0x7f43b4e73c10>)]

In [15]:
for pair in pair_rdd.groupByKey().collect():
    print(pair[0], list(pair[1]))

B [[4, 5], [6, 7, 9]]
A [[2]]


In [16]:
pair_rdd.sortByKey().collect()

[('A', [2]), ('B', [4, 5]), ('B', [6, 7, 9])]

Transformaciones sobre varios RDD:

- **subtractByKey(otherRdd)** retorna un rdd borrando los elementos con la misma clave.
- **join(otherRdd)** realiza un inner join por la clave.
- **rightOuterJoin(otherRdd)** realiza un right outer join por la clave.
- **leftOuterJoin(otherRdd)** realiza un left outer join por la clave.
- **cogroup(otherRdd)** agrupa los datos de ambos rdd que comparten la misma clave.

In [17]:
# Creamos otro rdd

other_rdd = sc.parallelize([("A", [1])])
pair_rdd.subtractByKey(other_rdd).collect()

[('B', [4, 5]), ('B', [6, 7, 9])]

In [18]:
pair_rdd.join(other_rdd).collect()

[('A', ([2], [1]))]

In [19]:
pair_rdd.rightOuterJoin(other_rdd).collect()

[('A', ([2], [1]))]

In [20]:
pair_rdd.leftOuterJoin(other_rdd).collect()

[('B', ([4, 5], None)), ('B', ([6, 7, 9], None)), ('A', ([2], [1]))]

In [21]:
pair_rdd.cogroup(other_rdd).collect()

[('B',
  (<pyspark.resultiterable.ResultIterable at 0x7f43e4193a10>,
   <pyspark.resultiterable.ResultIterable at 0x7f43e4193650>)),
 ('A',
  (<pyspark.resultiterable.ResultIterable at 0x7f43e41933d0>,
   <pyspark.resultiterable.ResultIterable at 0x7f43dc8de610>))]

In [22]:
for pair in pair_rdd.cogroup(other_rdd).collect():
    print(pair[0], list(pair[1][0]), list(pair[1][1]))

B [[4, 5], [6, 7, 9]] []
A [[2]] [[1]]


Siguiendo con el ejemplo del análisis de un web log, podríamos querer saber las palabras que más veces aparecen en los registros que contienen errores:

In [23]:
# Transformamos en key-value RDD y sumamos las veces que aparece cada palabra
# rdd.flatMapValues(func) -> retorna un RDD en el que aplica la función retornando un iterador para cada valor del RDD.
# rdd.map() -> modificar elementos del rdd.
# rdd.reduceByKey(func) -> retorna un RDD con los valores con la misma clave combinados con la función definida.

kv_rdd = error_rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

# Obtenemos las 5 primeras más usadas

kv_rdd.takeOrdered(5, key = lambda x: -x[1])

[('-', 8), ('-0800]', 4), ('"ERROR', 4), ('64.242.88.10', 4), ('HTTP/1.1"', 4)]

<div id='otros' />

## Otros elementos de Spark Core

<div id='vb' />

### Variables Broadcast

Son variables globales de solo lectura que se envían a todos los nodos workers de spark para que sean usadas en una o más operaciones.

Un buen ejemplo sería cuando necesitamos hacer lookups sobre una tabla en la que tenemos la relación entre status code y su descripción:

In [24]:
# Variables broadcast

my_broadcast_var = sc.broadcast({200: "OK", 401: "Bad Request", 404: "Not Found"})

rdd = sc.textFile("../data/test.log")

def extractStatusCode(line):
    toks = line.split(' ')
    if len(toks) > 7:
        return (my_broadcast_var.value[int(toks[8])], 1)
    else:
        return ("", 1)
    
statusCodes = rdd.map(extractStatusCode).groupByKey()

for line in statusCodes.collect():
    print("Status Code: ", line[0], " => ", sum(line[1]))

Status Code:  Not Found  =>  4
Status Code:  Bad Request  =>  10
Status Code:  OK  =>  40


<div id='acum' />

### Acumuladores

Nos proporciona una sintaxis sencilla para utilizar variables globales compartidas por todos los nodos workes. Se suelen utilizar para agregar valores.

Uno de los usos más comunes es contar los eventos que ocurren durante la ejecución de un trabajo. Por ejemplo, contar las líneas en blanco que tenemos en nuestro fichero de logs

In [25]:
# Creamos acumulador inicializado a 0

blank_lines = sc.accumulator(0)

def extractCallSigns(line):
    global blank_lines
    if (line == ""):
        blak_lines += 1
    return line

call_signs = rdd.map(extractCallSigns)

print("Blank lines: {1} of {0}".format(call_signs.count(), blank_lines.value))

Blank lines: 0 of 54


<div id='ejemplo' />

## Ejemplo final

###### ¿Qué comunidades autónomas han realizado más contratos a mujeres que a hombres durante todo el periodo?

In [26]:
#from pyspark import SparkContext

# Carga de datos

#sc = SparkContext()
cp = sc.textFile("../data/comunidades_provincias.csv")
cm = sc.textFile("../data/contratos_municipio.csv")

# Limpieza de datos

cp_headers = cp.first()
cp = cp.filter(lambda line: line != cp_headers)
headers = cm.first()
cm = cm.filter(lambda line: line != headers)

In [27]:
cm.take(4)

['201601;Almeria;Abla;27;20;7',
 '201601;Almeria;Abrucena;39;26;13',
 '201601;Almeria;Adra;456;259;197',
 '201601;Almeria;Albanchez;7;4;3']

In [28]:
# Tokenizamos datos

def parseLine(line):
    tokens = zip(line.split(";"), headers)
    parsed_tokens = []
    for token in tokens:
        token_type = token[1]
        print("token_type = ", token[0])
        parsed_tokens.append(token[0])
    return parsed_tokens

cp = cp.map(parseLine)
cm = cm.map(parseLine)
cm.take(4)

[['201601', 'Almeria', 'Abla', '27', '20', '7'],
 ['201601', 'Almeria', 'Abrucena', '39', '26', '13'],
 ['201601', 'Almeria', 'Adra', '456', '259', '197'],
 ['201601', 'Almeria', 'Albanchez', '7', '4', '3']]

In [29]:
# Hacemos una prueba para transformar el número de contratos de string a entero.

int(cm.take(4)[1][4])

26

In [30]:
# Dividimos datos para hombres y mujeres

cmh = cm.map(lambda x: (x[1], int(x[4])))
cmm = cm.map(lambda x: (x[1], int(x[5])))

In [31]:
cmh.take(3)

[('Almeria', 20), ('Almeria', 26), ('Almeria', 259)]

In [32]:
cmm.take(3)

[('Almeria', 7), ('Almeria', 13), ('Almeria', 197)]

In [33]:
# Comprobamos que tienen la misma key

cmm.keys().collect() == cmh.keys().collect()

True

In [34]:
# Sumamos la cantidad de contratos de hombres y mujeres por provincia

fh = cmh.reduceByKey(lambda x,y: x+y)
fm = cmm.reduceByKey(lambda x,y: x+y)

# Unimos los datos por provincia

final = fh.cogroup(fm).collect()
prov = []
for pair in final:
    print(pair[0], list(pair[1][0]), list(pair[1][1]))

Almeria [97074] [71785]
Cadiz [231294] [136023]
Granada [165456] [119271]
Sevilla [335311] [239878]
Zaragoza [135031] [107537]
Asturias [98137] [94136]
Balears, Illes [171866] [144575]
Palmas, Las [124122] [107948]
Santa Cruz de Tenerife [101463] [93312]
�vila [15159] [14343]
Palencia [25083] [21602]
Segovia [19806] [17367]
Soria [10440] [9176]
Cuenca [27935] [18387]
Barcelona [666605] [620722]
Girona [91887] [78521]
Castellon/Castello [71152] [50250]
Caceres [56121] [42515]
Bizkaia [136386] [138408]
Cordoba [173253] [104537]
Huelva [137061] [113121]
Jaen [171076] [81942]
Malaga [235880] [180633]
Huesca [32880] [24680]
Teruel [14237] [10401]
Cantabria [67918] [67813]
Burgos [39792] [34683]
Leon [37028] [36131]
Salamanca [33322] [30722]
Valladolid [61531] [55611]
Zamora [14996] [14638]
Albacete [67885] [41392]
Ciudad Real [61271] [37881]
Guadalajara [42710] [28730]
Toledo [76255] [48829]
Lleida [60744] [40048]
Tarragona [96623] [82205]
Alicante/Alacant [200290] [156573]
Valencia/Valenci

In [35]:
# Veamos cómo son los datos de comunidades

cp.take(4)

[['Andalucia', 'Almeria'],
 ['Andalucia', 'Cadiz'],
 ['Andalucia', 'Cordoba'],
 ['Andalucia', 'Granada']]

In [36]:
# Seleccionamos las provincias con mayor contratos de mujeres

provincia = []

for pair in final:
    if list(pair[1][0]) < list(pair[1][1]):
        print(pair[0], list(pair[1][0]), list(pair[1][1]))
        provincia.append(pair[0])

Bizkaia [136386] [138408]
Navarra [97782] [104844]
Gipuzkoa [71690] [81377]
Melilla [5188] [5708]


In [37]:
# Finalmente seleccionamos las comunidades de dichas provincias

comunidad = []

for pair in cp.groupByKey().collect():
    for p in provincia:
        if p in list(pair[1]):
            comunidad.append(pair[0])
            
comunidad = list(set(comunidad))
print(comunidad)

['Navarra, Comunidad Foral de', 'Melilla', 'Pais Vasco']


#### Extra

In [38]:
# Desde el principio con los contratos en la misma rdd.

rdd = cm.map(lambda x: (x[1], (int(x[4]), int(x[5]))))

# Como puedo hacer la suma de contratos por género??

In [39]:
rdd.values().take(3)

[(20, 7), (26, 13), (259, 197)]

In [40]:
rdd.keys().take(3)

['Almeria', 'Almeria', 'Almeria']

In [41]:
rdd.groupByKey().collect()

[('Almeria', <pyspark.resultiterable.ResultIterable at 0x7f43dc88da10>),
 ('Cadiz', <pyspark.resultiterable.ResultIterable at 0x7f43dc8d0910>),
 ('Granada', <pyspark.resultiterable.ResultIterable at 0x7f43dc8d0d10>),
 ('Sevilla', <pyspark.resultiterable.ResultIterable at 0x7f43dc8d0410>),
 ('Zaragoza', <pyspark.resultiterable.ResultIterable at 0x7f43dc8791d0>),
 ('Asturias', <pyspark.resultiterable.ResultIterable at 0x7f43dc87e550>),
 ('Balears, Illes', <pyspark.resultiterable.ResultIterable at 0x7f43b4e8d190>),
 ('Palmas, Las', <pyspark.resultiterable.ResultIterable at 0x7f43dc8d0f50>),
 ('Santa Cruz de Tenerife',
  <pyspark.resultiterable.ResultIterable at 0x7f43b4e3cad0>),
 ('�vila', <pyspark.resultiterable.ResultIterable at 0x7f43b4e3c950>),
 ('Palencia', <pyspark.resultiterable.ResultIterable at 0x7f43b4e73c50>),
 ('Segovia', <pyspark.resultiterable.ResultIterable at 0x7f43dc879690>),
 ('Soria', <pyspark.resultiterable.ResultIterable at 0x7f43dc879590>),
 ('Cuenca', <pyspark.resul