# **FIUBA Organización de Datos 75.06**
# **Introduccion a [Apache Spark](https://spark.apache.org/) usando [pySpark]((https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)**

## Arquitectura

En Spark, para poder realizar una tarea la comunicacion ocurre entre un **driver** y una serie de **executors** (ejecutores). El driver tiene **jobs** y para ejecutarlos se rompen en **tareas (tasks)** que se envian a los executors. Los resultados de esas tareas se envian denuevo al **driver**.  

En este IPython Notebook la funcion del driver de Spark se ejecuta dentro del kernel asociado al notebook. 

Al utilizar localmente o conectandose a un cluster, el software driver 
es **[PySpark shell](https://spark.apache.org/docs/latest/programming-guide.html#using-the-shell)**. Este contiene el main loop del programa que **creara datasets distribuidos (RDDs)** en el cluster y **aplicará operaciones (transformations & actions)** a esos datasets.

### Ejemplo de Cluster

En este diagrama podemos ver un ejemplo de un cluster, suponiendo que estamos corriendo una aplicacion o un determinada tarea en el cluster, se pueden ver en el grafico, marcados en violeta una serie de cores dedicados a ese job o aplicacion.

![executors](http://spark-mooc.github.io/web-assets/images/executors.png)

Se puede ver **a través WebUI de Spark [http://localhost:4040](http://localhost:4040)**, información sobre los executors disponibles en el cluster, los jobs que se corrieron y que estan corriendo, etc.

A alto nivel una aplicacion de Spark Consiste en un programa **driver** que **lanza** varias **operaciones paralelas en multiples executors JVMs que corren en un cluster o localmente en la misma maquina** (localmente puede correr cada uno en cada core disponible).

## SparkContext

Al utilizar Spark, uno crea una nueva aplicacion de Spark creando un `SparkContext`, el cual nos permitirá interactuar con el API de Spark. 

Cuando se crea el `SparkContext` se le pide al master algunos cores para poder ejecutar tasks. El master separar estos cores para esa aplicacion, no siendo utilizados por otras aplicaciones.

Al usar el entorno de lab desde el IPython Notebook el `SparkContext` esta creado en la variable `sc`

In [4]:
type(sc)

pyspark.context.SparkContext

In [5]:
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(__builtin__.object)
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create L{RDD} and
 |  broadcast variables on that cluster.
 |  
 |  Methods defined here:
 |  
 |  __enter__(self)
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |  
 |  __exit__(self, type, value, trace)
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |      
 |      Specifically stop the context on exit of the with block.
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |      Create a new SparkContext. At least the master and app name should be set,
 |      either through the named parameters here or through C{conf}.
 |   

Para mas información sobre como inicializar el entorno de spark en un contexto de cluster ver [https://spark.apache.org/docs/latest/programming-guide.html#initializing-spark](https://spark.apache.org/docs/latest/programming-guide.html#initializing-spark)

## Construyendo RDDs

Los **RRDs (Resilent Distributed Datasets)**, son colecciones tolerantes a fallos cuyos elementos pueden ser operados en paralelo. Se pueden crear de a partir de colecciones o **a partir de external datasets (distribuidos o no)** que veremos con algunos ejemplos.

### Paralelizando una coleccion de python

Inicialmente creamos una coleccion de datos en python que usaremos para crear el RDD usando `sc.parallelize`

In [6]:
## creamos 1000 enteros en una lista
integersList = xrange(1,1001)
integersList[0]
len(integersList)

1000

In [7]:
## Paralelizamos la coleccion utilizando 8 particiones o slices
## Esta operacion es una transformacion de datos en un RDD
## Dado que Spark usa lazy evaluation, no corren jobs de Spark
## hasta el momento
integersListRDD = sc.parallelize(integersList, 10)
type(integersListRDD)

pyspark.rdd.PipelinedRDD

In [8]:
## podemos ver tambien otra informacion interesante del RDD
## el numero de particiones
integersListRDD.getNumPartitions()

10

In [9]:
## el conjunto de transformaciones que se aplica
print integersListRDD.toDebugString()

(10) PythonRDD[1] at RDD at PythonRDD.scala:43 []
 |   ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:392 []


In [10]:
## para ver mas metodos disponibles del RDD
help(integersListRDD)

Help on PipelinedRDD in module pyspark.rdd object:

class PipelinedRDD(RDD)
 |  Pipelined maps:
 |  
 |  >>> rdd = sc.parallelize([1, 2, 3, 4])
 |  >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  
 |  Pipelined reduces:
 |  >>> from operator import add
 |  >>> rdd.map(lambda x: 2 * x).reduce(add)
 |  20
 |  >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
 |  20
 |  
 |  Method resolution order:
 |      PipelinedRDD
 |      RDD
 |      __builtin__.object
 |  
 |  Methods defined here:
 |  
 |  __del__(self)
 |  
 |  __init__(self, prev, func, preservesPartitioning=False)
 |  
 |  id(self)
 |  
 |  ----------------------------------------------------------------------
 |  Methods inherited from RDD:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).colle

## ¿Qué sucede en el cluster?: Información distribuida

En Spark los datasets son representados como una lista de entradas, la cual se rompe en distintas particiones, cada una guarda en un maquina distinta del cluster. 

Cada particion tiene un unico subconjunto de las entradas de la lista. La abstraccion que usa Spark para manipularlos se conoce como "Resilent Distributed Datasets" (RDDs) y una de las particularidades que tienen es que estan disponibles en memoria (de ser posible).

El escenario de nuestro ejemplo se veria de la siguiente forma si consideramos algunos executors

![partitions](http://spark-mooc.github.io/web-assets/images/partitions.png)

### Creando RDDs desde external Datasets

PySpark puede crear datasets distribuidos desde cualquier storage soportado por Hadoop (local filesystem, HDFS, Cassandra, HBase, AWS S3, etc). Soportando archivos de texto y otros tipos de formatos de entrada soportados por Hadoop ([SequenceFiles](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html) y [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html)).

### Cargando un archivo de texto

Aprovechando que tenemos en nuestra VM: [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). Vamos a convertir el archivo de texto en un RDD usando el método `SparkContext.textFile()`. 

In [11]:
## creamos el RDD a partir de un archivo de texto
shakespeareRDD = sc.textFile('data/cs100/lab1/shakespeare.txt',8)
## aplicamos una accion para tomar los 15 items del RDD
shakespeareRDD.take(15)

[u'1609',
 u'',
 u'THE SONNETS',
 u'',
 u'by William Shakespeare',
 u'',
 u'',
 u'',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,"]

## Transformaciones: `map()`

Hasta el momento **creamos datasets distribuidos que se han dividido en varias particiones, donde cada particion se encuentra en una maquina de nuestro cluster**, veamos que sucede al hacer una operacion a nuestro dataset.

Vamos a partir de la operacion mas usual que podemos llegar a querer hacer sobre nuestro **'por cada uno de los elementos del dataset hagamos una accion y apliquemos devolvamos un resultado'**. Esto se logra con una **transformacion `map()`** que aplicara la función f a cada resultado.

In [12]:
## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados

integersListRDD.map(lambda a: a-1).take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
## generamos una tupla que tenga el valor original, el anterior y siguiente
## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados

integersListRDD.map(lambda a: (a, a-1, a+1)).take(10)

[(1, 0, 2),
 (2, 1, 3),
 (3, 2, 4),
 (4, 3, 5),
 (5, 4, 6),
 (6, 5, 7),
 (7, 6, 8),
 (8, 7, 9),
 (9, 8, 10),
 (10, 9, 11)]

## ¿Qué sucede en el cluster?

Cuando ejecutamos `map()` en el dataset, un unico **stage** de tareas se lanza. Un **stage** es un grupo de tareas que van a realizar el mismo computo pero con distintos datos de entrada. Una tarea entonces se lanzara para cada una de las particiones.

En el grafico de abajo vemos que sucede entonces con cada una de las particiones al ejecutarse la tarea y el resultado que producen.

![foo](http://spark-mooc.github.io/web-assets/images/map.png)

## Acciones: `collect()`,`count()`

Luego de haber aplicado una transformacion en el RDD, podemos 
queres volver a obtener informacion en el **driver**, podemos utilizar una accion `collect()`, usualmente se utiliza despues de alguna operacion que limite la cantidad de resultados (`filter()`) para asegurarnos que el resultado devuelto entre en la memoria disponible del driver.

Ya usamos anteriormente otra accion `take()` para traer un numero de resultados al driver.

Volviendo a nuestro ejemplo con los numero enteros podemos:

In [14]:
## aplicamos una transformacion map para restar a todos 1
## aplicamos la accion take para mostrar 10 resultados

subRDD = integersListRDD.map(lambda a: a-1)
print subRDD.collect()



[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221,

En el cluster sucede la siguiente situacion, al realizarse el `collect()`
devuelviendo los resultados a SparkContext.

![collect](http://spark-mooc.github.io/web-assets/images/collect.png)

Otra accion interesante es `count()` que permite contar la cantidad de elementos en el RDD

In [15]:
# por ejemplo para los RDD con los que estuvimos trabajando

print integersListRDD.count()
print subRDD.count()
print shakespeareRDD.count()

1000
1000
122395


## Mas Transformaciones: `filter()`

La transformacion `filter()` transformacion permite aplicar una funcion al evaluarse solamente emitira a la salida aquellos que la funcion filtro haya devuelto `True`.

In [16]:
## obtener los numeros pares
print integersListRDD.filter(lambda a: a % 2).take(10)
print integersListRDD.filter(lambda a: a % 2).count()

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
500


In [17]:
# eliminar lineas vacias de the complete works of shakespeare
shakespeareRDD.filter(lambda a: a != "").take(15)

[u'1609',
 u'THE SONNETS',
 u'by William Shakespeare',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,",
 u'  Making a famine where abundance lies,',
 u'  Thy self thy foe, to thy sweet self too cruel:',
 u"  Thou that art now the world's fresh ornament,",
 u'  And only herald to the gaudy spring,',
 u'  Within thine own bud buriest thy content,']

In [18]:
print shakespeareRDD.filter(lambda a: a != "").count()
print shakespeareRDD.count()

112902
122395


## Reduce

La accion `reduce()` reduce los elementos de un RDD a un unico valor al aplicar una funcion que toma dos parametros y retorna un unico valor.

La funcion planteada tiene que ser **conmutativa y asociativa**, ya que `reduce()` **es aplicado a nivel de particion y luego nuevamente para agregar resultados de particiones**. Si esto no se respeta, los resultados de `reduce()` seran inconsistentes.

In [19]:
## la suma es asociativa y conmutativa
print integersListRDD.reduce(lambda a, b: a + b)
print integersListRDD.repartition(4).reduce(lambda a, b: a + b)

500500
500500


In [20]:
## la resta no no asociativa y conmutativa
print integersListRDD.reduce(lambda a, b: a - b)
print integersListRDD.repartition(4).reduce(lambda a, b: a - b)

481384
163978


## Otras Acciones: `first()`,`takeOrdered()`,`top()`

Estas son otras acciones que pueden ser utiles para procesar


In [21]:
## first() devuelve el primer elemento del RDD
print shakespeareRDD.first()

1609


In [22]:
## traemos los 3 elementos mas pequeños
integersListRDD.takeOrdered(3)
## traemos los 3 elementos mas grandes
integersListRDD.top(3)

[1000, 999, 998]

In [23]:
## se puede utilizar una funcion para fijar el orden en takeOrdered
## por ejemplo para revertirlo
integersListRDD.takeOrdered(3, lambda a: -a)

[1000, 999, 998]

Otras acciones avanzadas para investigar son [`takeSample()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeSample) y [`countByValue()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByValue)

## Transformaciones Adicionales

### `flatMap()`

Similar a map, pero permite que cada item de entrada se mapee a cero o mas elementos de salida.

In [24]:
simpleRDD = sc.parallelize([2, 3, 4])
print simpleRDD.map(lambda x: range(1, x)).collect()
print simpleRDD.flatMap(lambda x: range(1, x)).collect()

[[1], [1, 2], [1, 2, 3]]
[1, 1, 2, 1, 2, 3]


### Transformaciones para pair RDDs

Las siguientes transformaciones aplican a RDDs donde los elementos 
son tuplas del tipo `(key, value)`


### `reduceByKey()`

la transformacion `reduceByKey()` junta pares que tienen la misma key y aplica la funcion a los dos valores asociados a la vez. Esta opera primero aplicando la funcion a cada particion para las mismas claves y luego a traves de las particiones (similar al reduce).

### `groupByKey()`

Agrupa todos los elementos de una misma clave generando una clave con una lista con los elementos.

Ambas se pueden usar para resolver el mismo problema, pero reduceBykey es mas eficiente en datasets distribuidos grandes, dado que en este escenario Spark sabe que puede combinar output de un mismo key (en la misma maquina) antes de redistribuir datos entre los distintos nodos (shuffling).

In [25]:
from operator import add

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# usamos mapValues para mejorar el formato de impresion
print pairRDD.groupByKey().mapValues(lambda x: list(x)).collect()

# Diferentes formas de sumar por clave
print pairRDD.groupByKey().map(lambda (k, v): (k, sum(v))).collect()
# Using mapValues, which is recommended when they key doesn't change
print pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect()
# reduceByKey is more efficient / scalable
print pairRDD.reduceByKey(add).collect()

[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]


Otras transformaciones interesantes para investigar son [combineByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.combineByKey) y [foldByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foldByKey).

## Ejercicio: Buscando la linea de máxima longitud

In [26]:
# generando tuplas para obtener la linea 
# de maxima longitud de todo el texto

result = (shakespeareRDD
 .map(lambda a: (a, len(a)))
 .reduce(lambda a,b: a if a[1] > b[1] else b))

resultCount = (shakespeareRDD
               .map(lambda a: len(a))
               .top(1))

assert result[1] == resultCount[0]
print resultCount
print result[0]

[85]
    whither wilt?' ROSALIND. Nay, you might keep that check for it, till you met your
