# Programando con Python Spark (PySpark)

Recordando:
- El driver program accesa al ambiente de Spark mediante un objeto SparkContext
- El tipo de dato y concepto clave en Spark es el dataset llamados RDD
- Cargamos datos en un RDD y hacemos operaciones

## ¡Nuestro primer programa!

In [63]:
!wget https://github.com/mirumee/saleor/blob/master/README.md

--2018-09-07 02:43:31--  https://github.com/mirumee/saleor/blob/master/README.md
Resolving github.com (github.com)... 192.30.253.112, 192.30.253.113
Connecting to github.com (github.com)|192.30.253.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘README.md.3’

README.md.3             [  <=>               ] 104.10K   333KB/s    in 0.3s    

2018-09-07 02:43:33 (333 KB/s) - ‘README.md.3’ saved [106597]



In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext(master = "local[*]", appName="Israel")

In [4]:
lineas = sc.textFile("README.md", 4)

In [5]:
lineas

README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
lineas.count()

19

In [7]:
lineasPython =  lineas.filter(lambda line :  "Python" in line)

In [8]:
lineasPython

PythonRDD[3] at RDD at PythonRDD.scala:49

In [10]:
lineasPython.take(1)

[]

----

## RDD

**R**esilient **D**istributed **D**ateset (**RDD**)

Recordando y agregando:
- Contiene **Datos distribuidos** mediante **particiones** (de Workers)
- Habilita **operaciones** para su **ejecución en paralelo**
- Son **inmutables**
- **En caso de pérdida, la computación ejecutada se re-ejecuta**


Tres maneras de crear RDDs:

- Mediante un dataset externo:

In [11]:
lineas = sc.textFile("README.md", 4)

In [12]:
lineas

README.md MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:0

- Distribuyendo una colección:

In [13]:
lineas = sc.parallelize([1, 2, 3])

In [14]:
lineas

ParallelCollectionRDD[10] at parallelize at PythonRDD.scala:184

- Transformando un RDD existente:

In [15]:
lineasPython =  lineas.filter(lambda line :  "Python" in line)

In [16]:
lineasPython

PythonRDD[11] at RDD at PythonRDD.scala:49

### Operaciones sobre RDDs

#### Transformaciones

Crean un nuevo RDD a partir de otro previo.

P. ej.:
*map()*


#### Acciones

Corre/ejecuta/computa un resultado basado en un RDD existente.

P. ej.:
*count()*

## Programación Funcional con Python

- Muchas transformaciones y algunas acciones esperan una función
- En algunos casos, pueden ser funciones para operaciones más complejas
- Para funciones simples, una expresión lambda es conveniente:
```python
>>> lambda line: “Python” in line
```


### map()

- Lee un elemento a la vez
- Toma un valor, crea un nuevo valor


In [17]:
rdd = sc.parallelize([1, 2, 3, 4])

In [18]:
rdd.map(lambda x: x * 2)

PythonRDD[13] at RDD at PythonRDD.scala:49

In [19]:
rdd.map(lambda x: x * 2).collect()

[2, 4, 6, 8]

### filter()

- Lee un elemento a la vez
- Evalua cada elemento
- Regresa los elementos que pasan el filtro  (filtro)

In [20]:
rdd = sc.parallelize([1, 2, 3, 4])

In [21]:
BBB = rdd.filter(lambda x: x % 2 == 0)

In [22]:
BBB.collect()

[2, 4]

In [23]:
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

### flatMap()

Produce multiples elementos por cada elemento de entrada

In [24]:
rdd = sc.parallelize([1,2,3])

In [25]:
rdd.map(lambda x: [x, x * 2]).take(2)

[[1, 2], [2, 4]]

In [26]:
rdd.flatMap(lambda x: [x, x * 2])

PythonRDD[21] at RDD at PythonRDD.scala:49

# Transformations are lazy!
## Featuring: Lazy evaluation!! 🔥🙈

[Haskell Lazy Evaluation](https://wiki.haskell.org/Lazy_evaluation):

>Lazy evaluation is a method to evaluate a Haskell program. It means that expressions are not evaluated when they are bound to variables, but their evaluation is deferred until their results are needed by other computations. In consequence, arguments are not evaluated before they are passed to a function, but only when their values are actually used. 

[The Incomplete Guide to Lazy Evaluation (in Haskell)](https://hackhands.com/guide-lazy-evaluation-haskell/):

> Originally, I wanted to write a complete guide to lazy evaluation, but then.

>Lazy evaluation is the most widely used method for executing Haskell program code on a computer. It determines the time and memory usage of Haskell programs, and it allows new and powerful ways to write modular code. To make full use of purely functional programming, a good understanding of lazy evaluation is very helpful.


Un RDD solo es ejecutado cuando las acciones corren sobre el mismo:

In [30]:
lineas = sc.textFile("README.md", 4)
lineasPython =  lineas.filter(lambda line :  "Python" in line)

In [31]:
lineasPython

PythonRDD[29] at RDD at PythonRDD.scala:49

In [32]:
lineasPython.first()

ValueError: RDD is empty

Al usar la evaluación floja, Spark puede contener en memoria RDD que se procesa unicamente cuando se le requiere. Sin la necesidad de cargar a memoria todas las lineas conteniendo "Python".


💻🐍

## Acciones

- Las acciones causan transformaciones para ser almacenadas en RDDs nuevos
- También regresan resultados a ambas partes: el *driver* o un almacenamiento externo
- Los RDDs son re-calculados por cada acción que se les ejecuta
- Pueden ser almacenados para un uso posterior: `rdd.persist()`

### count()

Obtiene las instancias en el RDD:

In [33]:
rdd = sc.parallelize([1, 2, 3, 4])

In [34]:
rdd

ParallelCollectionRDD[32] at parallelize at PythonRDD.scala:184

In [35]:
rdd.count()

4

### collect()
- `collect()` recupera el RDD completo 🚒
- Útil para inspeccionar datasets pequeños de manera local y para unit-testing
- **LOS RESULTADOS DEBEN CABER EN LA MEMORIA DEL EQUIPO LOCAL**

In [36]:
rdd = sc.parallelize([1, 2, 3])

In [37]:
rdd

ParallelCollectionRDD[34] at parallelize at PythonRDD.scala:184

In [38]:
rdd.takeOrdered(3)

[1, 2, 3]

### take(), takeSample(), first(), top(), takeOrdered() 

- ```take(n)``` regresa los primeros *n* elementos de un RDD
- ```take(n)``` puede obtener resultados sezgados. Su uso es adecuado solo para pruebas o debugging
- ```takeSample()``` como el nombre lo indica, es el más adecuado para tomar una muesta del dataset
- ```first(n)``` al igual que ```take(n)```, obtiene los primeros *n* elementos de un RDD
- ```top(), takeOrdered()``` como métodos más formales para obtener elementos ordenados de un RDD

### takeOrdered()

In [39]:
rdd = sc.parallelize([5, 1, 3, 2])

In [40]:
rdd.takeOrdered(4)

[1, 2, 3, 5]

In [41]:
rdd.takeOrdered(4, lambda n: -n)

[5, 3, 2, 1]

### reduce()
Toma dos elementos del mismo tipo y regresa un nuevo elemento:

In [42]:
rdd = sc.parallelize([1,2,3])

In [43]:
rdd.reduce(lambda x, y:  x*y)

6

## Persistencia
- Spark re-calcula los RDDs cada vez que se llama a una acción:
    - Esto puede ser caro y también causar un tráfico innecesario desde el disco (lectura)
- Podemos evitar esto almacenando datos en caché con ```persist()```.

In [44]:
lineas = sc.textFile("README.md", 4)

In [45]:
lineas.count()

19

In [46]:
lineasPython =  lineas.filter(lambda line :  "Python" in line)

In [48]:
# Causa a Spark el recargar la variable "lineas" desde el disco 🙊
%time lineasPython.count()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 141 ms


0

### Aplicando persistencia

In [49]:
lineas = sc.textFile("README.md", 4)

In [50]:
lineas.persist() # Ahora, este RDD se mantiene en RAM

README.md MapPartitionsRDD[47] at textFile at NativeMethodAccessorImpl.java:0

In [51]:
lineas.count()

19

In [52]:
lineasPython =  lineas.filter(lambda line :  "Python" in line)

In [60]:
# Spark no volverá a hacer el cómputo para "lineas" cada vez que es usado
%time lineasPython.count()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 135 ms


0

## Construyendo un Pipeline de operaciones para RDDs



```python
>>> lineas = sc.textFile("README.md")
>>> lineas.map(...).filter(...).count(...)



>>> lineas = sc.textFile("README.md")
>>> (lineas
     .map(...)
     .filter(...)
     .count(...))
```

In [68]:
!wget https://github.com/mirumee/saleor/blob/master/README.md

--2018-09-07 02:43:56--  https://github.com/mirumee/saleor/blob/master/README.md
Resolving github.com (github.com)... 192.30.253.112, 192.30.253.113
Connecting to github.com (github.com)|192.30.253.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘README.md.4’

README.md.4             [  <=>               ] 104.10K   372KB/s    in 0.3s    

2018-09-07 02:43:58 (372 KB/s) - ‘README.md.4’ saved [106597]



In [71]:
lineas = sc.textFile("README.md.4").filter(lambda line :  "Python" in line).count()

In [72]:
lineas

4

---

### Ejercicio 01: Crear un nuevo RDD con la cadena "Hola Spark" e imprimirla en pantalla al obtener el primer elemento

### Ejercicio 02: Completar el siguiente bloque de código, para usar el archivo README.md e imprimir el numero de lineas y el conteo de palabras en el archivo

```python

# Crear un RDD a partir de un dataset
readme_rdd =  
# Imprimir en pantalla el num. de lineas del RDD
print('Conteo de lineas: ')
print()
print('Conteo de palabras: ')
palabras_lista = readme_rdd.flatMap(lambda linea: linea.split(" ")) \
                            . # map
                            .reduceByKey(lambda a, b: a + b) \
                            . # collect
  print(palabras_lista)

```

---
### Soluciones:

#### Ejercicio 01

#### Ejercicio 02: reduceByKey() y collect()


#### Word count con otro dataset y takeOrdered

Referencia:
```python
Sort by keys (ascending):

>>> RDD.takeOrdered(5, key = lambda x: x[0])

Sort by keys (descending):

>>> RDD.takeOrdered(5, key = lambda x: -x[0])

Sort by values (ascending):

>>> RDD.takeOrdered(5, key = lambda x: x[1])

Sort by values (descending):

>>> RDD.takeOrdered(5, key = lambda x: -x[1])
```



In [None]:
!wget http://www.gutenberg.org/files/74/74-0.txt

In [None]:
sc.stop()