# Tutorial de Spark

Vamos a ejecutar Spark sobre un solo nodo. No tendremos distribución ni HDFS, pero podremos mostrar _cómo_ se escribe código en este _framework_.

In [None]:
import findspark
findspark.init()
import pyspark
import random
sc = pyspark.SparkContext(appName="Pi")

# 1. Leyendo o escribiendo datos

El principal "punto de entrada" a Spark es el objeto global `sc`, el `sparkContext`, o contexto de Spark. El flujo típico de un programa de Spark es el siguiente:

1. se leen datos -_big_ (desde almacenamiento distribuido) o _small_ (desde nuestro cliente)-  *hacia* Spark.
2. se realizan transformaciones y operaciones.
3. se producen resultados *desde* Spark hacia el almacenamiento distribuido o que se traen al cliente.

Vamos a leer una lista de Python con 3 elementos hacia Spark usando `parallelize`:

In [None]:
input_list = [1, 2, 3]
l = sc.parallelize(input_list)

¿Qué contiene?

In [None]:
l

Un `ParallelCollectionRDD` es la representación como `RDD` (luego veremos qué es esto) de una colección básica de Python (una lista, en este caso). No es un objeto al que podamos acceder directamente, solo tenemos una referencia al mismo, pero la colección (representada como estructura en Spark) "vive" gestionada por Spark.

Si queremos retornar un objeto "de vuelta" al cliente usamos `collect`. ¡Ambas operaciones pueden ser muy costosas, sobre todo con datos de gran tamaño!

In [None]:
result = l.collect()

Efectivamente, `result` es una lista con valores `[1, 2, 3]`.

In [None]:
result

Por si queda alguna duda, las listas son **objetos distintos**. Al obtener la lista se crea un nuevo objeto en el cliente, que contiene los valores requeridos.

In [None]:
hex(id(result)) == hex(id(input_list))

# 2. Qué es un RDD

Un `RDD` (_Resilient Distributed Dataset_) es la estructura básica en la que se almacenan los datos en Spark, y es la principal estructura en las primeras versiones de Spark.

Como hemos dicho, un `RDD` "vive" gestionado por Spark. Así que podemos asumir que existe en un nodo y que, posiblemente está distribuido. 

**Conceptualmente** podemos pensar en un `RDD` como una lista de pares `(clave, valor)` inmutable.

En la práctica un RDD puede ni siquiera existir como estructura en memoria; en el ejemplo anterior la sucesión de dos instrucciones crea un DAG con el esquema de procesado que solo se ejecuta cuando se "pide" algo al cliente (en nuestro caso, al hacer collect de `l`. Vamos a ver esto con un ejemplo en el que introduciremos una serie de instrucciones nuevas:

## 2.1 Contando palabras de "El Quijote"

Este es un ejemplo de uso de las primitivas de lectura de archivos, y nuestro primer programa map-reduce. Vamos a usar `textFile` para "leer" un archivo de texto del cliente hacia Spark.

In [None]:
quijote = sc.textFile("el_quijote.txt")

In [None]:
quijote

Tenemos un `MapPartitionsRDD`. Se trata de un RDD de cadenas de caracteres, donde cada una de ellas es una línea. Vamos a ver esto usando la función `first`.

In [None]:
quijote.first()

La instrucción `first` es útil si queremos mostrar parte del RDD y ver si contiene lo que debería. También podríamos hacer `take(N)` para obtener los `N` primeros elementos. En cualquier caso, hay que cuidar que esto no ejecute (aunque sea parcialmente) un grafo muy complicado.

Con esto, hemos creado (y ejecutado) un DAG muy simple. Ahora vamos a contar palabras (lo que implica, como sabéis, una parte `map` y otra `reduce`). En Spark no es necesario escribir clases independientes para cada operación; incluso, ¡se puede hacer todo de una vez!.

In [None]:
palabras = quijote.flatMap(lambda line: line.upper().split(" "))\
                .map(lambda x: (x, 1))\
                .reduceByKey(lambda a, b: (a + b))

In [None]:
palabras

Tenemos como resultado un `RDD`, pero, ¿está hecho el cálculo? Definitivamente **no**. Spark está esperando a que lancemos una instrucción que ejecute el grafo. Por lo general Spark devuelve referencias `lazy`, esto es, que solo tienen un cierto valor si se ejecutan todos los cálculos dependientes y se realiza una operación que verdaderamente requiere dicho valor.

In [None]:
palabras.sortBy(lambda x: -x[1]).keys().take(10)

Lo de arriba calcula las 10 palabras más comunes en "El Quijote". Nótese el uso de `sortBy` con una función anónima: esto es una operación cara que por lo general implica `shuffles` y debe, por tanto evitarse al máximo.

No obstante, `sortBy` es "tan eficiente como puede ser":
    - Primero se ordenan cada una de las particiones.
    - Luego se producen los shuffles correspondientes, siguiendo los órdenes.

# 2.2. Una versión alternativa

In [None]:
sc.parallelize([x.upper().split() for x in open("el_quijote.txt")])\
    .flatMap(lambda x: [(y, 1) for y in x])\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda x: -x[1]).keys().take(10)

**Para pensar:** ¿Cuál es la principal diferencia? ¿Por qué tarda más?

# 3. Spark DataFrames

El uso de `RDDs` puede ser tedioso. Algunas de sus limitaciones:
- No hay flexibilidad (son estructuras `(clave, valor)`).
- Una vez creado un RDD no hay forma de conocer sus tipos. Esto puede inducir a error en operaciones complicadas, por ejemplo, en un `join`.
- Cuando queremos escribir a cliente tenemos una estructura que hay que transformar.

Spark introdujo experimentalmente en su versión 1.6 los `DataFrames`. Estos son una abstracción sobre una tabla, con filas y columnas, similar a los Pandas `DataFrames`. Su interfaz es más sencilla, sobre todo si estamos familiarizados con las bases de datos. Vamos a revisitar nuestro ejemplo:

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
quijote_l = sc.textFile("el_quijote.txt")\
    .flatMap(lambda line: [(x, 1) for x in line.upper().split(" ")])

In [None]:
quijote_l.first()

In [None]:
df_quijote_sin_schema = spark.createDataFrame(quijote_l)
df_quijote_sin_schema

In [None]:
df_quijote_sin_schema.show()

¡Hemos creado nuestro primer `DataFrame`! Se muesltran dos columnas con tipos `string` y `bigint` (inferidos) y nombres poco inteligibles. Repitamos la operación aplicando un schema.

In [None]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [None]:
df_quijote = spark.createDataFrame(
    quijote_l,
    schema=StructType([StructField("palabra", StringType()),
                      StructField("frecuencia", IntegerType())]))

In [None]:
df_quijote

In [None]:
df_quijote.show()

¿Qué podemos hacer con un `DataFrame`? Vamos a mostrar las 10 palabras más frecuentes usando instrucciones "tipo SQL". Veamos la instruccion en 3 etapas:

In [None]:
from pyspark.sql.functions import col

1. GROUPBY palabra + sum (¡igual que en SQL!)

In [None]:
df_quijote.groupby("palabra").sum()

2. orderBy `sum(frecuencia)` (columna creada al sumar frecuencias) en orden descendente.

In [None]:
df_quijote.groupby("palabra").sum().orderBy(
    col("sum(frecuencia)").desc())

3. tomamos las 10 filas más frecuentes.

In [None]:
df_quijote.groupby("palabra").sum().orderBy(
    col("sum(frecuencia)").desc()).limit(10)

4. devolvemos el resultado al cliente a un `pandas DataFrame`

In [None]:
df_quijote_pd = df_quijote.groupby("palabra").sum()\
                          .orderBy(col("sum(frecuencia)").desc())\
                          .limit(10).toPandas()

In [None]:
df_quijote_pd

La interacción con `pandas` es muy conveniente. Desde Pandas podemos trabajar en el cliente (graficar, escribir a csv o a otros formatos, hacer operaciones no costosas, presentar los resultads, etc.).

Obviamente, también podemos "enviar" un DataFrame de Pandas (usualmente un dataset mediano para interactuar con otro grande) hacia Spark

In [None]:
df_quijote_otro = spark.createDataFrame(df_quijote_pd)

In [None]:
df_quijote_otro

In [None]:
df_quijote_otro.count()

In [None]:
df_quijote_otro.show()

# 4. Para saber más

- La documentación de PySpark es muy completa, con tuturiales y una API detallada. https://spark.apache.org/docs/latest/api/python/.
- El libro de Holden Karau "High Performance Spark" es una de las referencias más completas si quieres aprender qué ocurre dentro de Spark, saber cómo se optimizan consultas e ir a temas más avanzados: https://learning.oreilly.com/library/view/high-performance-spark/9781491943199/.


###### 