<h1 style="font-size:40px;"> Tuning Spark </h1>

![](img/book_tuning.jpg)

Aunque Apache Spark mejora cada día conseguir que un proceso de Spark funcione correctamente puede ser complicado en algunos escenarios: Muchos datos, operaciones muy costosas... 

Muchos de ellos acaban con el famosísimo error `Spark java.lang.StackOverflowError` que para los que vienen del mundo JVM les será bien conocido y da nombre a la famosa web https://stackoverflow.com/.


En esta sección vamos a ver algunos consejos de cómo configurar nuestra aplicación de spark para conseguir una buena *performance* y evitar los errores más típicos, pero ya hay que adelantar que no hay una fórmula mágica. Dependerá mucho de nuestro cluster, nuestro proceso así que cada proceso es distinto y hay que ir probando cada una de las técnicas hasta conseguir que nuestro proceso sea estable y rápido.


Existen varios recursos en internet sobre estos temas, nos hemos basados en los siguientes:


* https://spark.apache.org/docs/latest/tuning.html

* https://databricks.com/training/instructor-led-training/courses/apache-spark-tuning-and-best-practices

* https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

* https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/



Como hemos comentado, cada caso es un mundo, en esta pequeña guía nos vamos a centrar en **Spark on YARN**, pudiendo ser distinto la configuración si usamos otro manager (*standalone*, Mesos, Kubernetes...)


![](img/spark-yarn-cluster.png)

&nbsp;   


## Configuración estándar y estudio de nuestro cluster

Lo primero que tenemos que ver es el tamaño de nuestro cluster (cores totales y RAM total), así cómo saber si el proceso que vamos a ejecutar va a estar solo en toda la máquina o vamos a tener que compartir con otros usuarios al mismo tiempo.


Podemos acceder al Hadoop Task Manager para ver las características de nuestro cluster:




![](img/tamano_cluster.png)
<center>
http://master02.bigdata.alumnos.upcont.es:8088/cluster/nodes
</center>

De aquí podemos extraer que nuestro cluster tiene 400GB de memoria RAM y 256 cores. Además esta RAM y cores está dividida de manera balanceada en 4 nodos (100GB y 64cores).

Si nuestra aplicación se fuese a correr sola en todo el cluster podríamos configurar los executors como:

* `spark.executor.memory` = 100g
* `spark.executor.cores` = 64

Pero esta configuración no suele ser la adecuada ya que suele ser mayor que lo que permite YARN en la variable `yarn.scheduler.maximum-allocation-mb` además según estudios realizados si vamos a trabajar en YARN y con HDFS no se suele recomendar usar más de 5 cores por JVM ([ HDFS I/O throughput](https://www.cloudera.com/documentation/enterprise/5-13-x/topics/admin_spark_tuning.html)).

Además del problema del I/O con HDFS suele ser bueno hacer executors más pequeños para que el proceso pueda convivir con otros usuarios. Siguiendo la idea de intentar sacar el mayor provecho a los recursos vamos a calcular el ratio RAM(Gb)/cores de nuestro cluster:


$$
\frac{400}{256} \approx 1.6
$$

Así que si fijamos los cores a 5 e intentamos mantener esta proporción, podemos calcular que el número de GB adecuado sería 8.

Para terminar hay que tener en cuenta otra configuración `spark.executor.memoryOverhead`:


![](https://ndu0e1pobsf1dobtvj5nls3q-wpengine.netdna-ssl.com/wp-content/uploads/2019/08/spark-tuning2-f1.png)


Además de la memoria que reservamos para cada executor tenemos que tener en cuenta también la configuración del *overhead* que es un trozo de memoria que se deja reservado y que por defecto:

![](img/overhead.png)

Así que, si usamos el valor por defecto tendríamos que tener en cuenta ese 10% en nuestra cuenta:

$$
RAM = \frac{1.6 \cdot 5}{1.1} \approx  7.27
$$

De este modo llegamos a la configuración:

* `spark.executor.memory` = 7g
* `spark.executor.cores` = 5


In [1]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

In [2]:
conf = (

    SparkConf()
    .setAppName(u"[ICAI] Spark Tuning")
    .set("spark.executor.memory", "7g") 
    .set("spark.executor.cores", "5")

)

In [3]:
spark = (

    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()

)

Vamos a forzar cargar varios executors para ver la distribución en el cluster:

In [5]:
meta = spark.table('jayuso.ratings').repartition(1000).sample(True, 100.0).cache()

In [6]:
meta.count()

1534180093

Consiguiendo:


![](img/proceso_distribuido.png)

&nbsp;   


Que si hacemos los calculos:


In [7]:
394240 / 1024 / 241

1.5975103734439835

In [8]:
meta.unpersist()

DataFrame[id_contenido: double, id_user: double, franja: string, ratings: double]

## Dataframe vs RDD:  Tungsten y Catalyst

Como ya hemos visto la API DataFrame en general es más rápida y eficiente que trabajar directamente con RDD's. Esto es en gran medida a los proyectos Tungsten y Catalyst.

Veamos como los mismos datos ocupan mucho menos en formato DataFrame (que usa el proyecto Tungsten para serializar los datos) que en formato RDD:

In [9]:
meta = spark.table('jayuso.ratings').cache() #aqui comprime los datos en parquet y es muu compromodo

In [10]:
meta.count()

15342422

In [11]:
meta2 = meta.rdd.cache() #esto es mucho mas grnade

In [12]:
meta2.count()

15342422

![](img/df_rdd.png)

Una vez que tenemos nuestros datos en formato DataFrame podemos usar todas las funciones que ya hemos visto (las `F.*`). Estas funciones saben trabajar directamente en el formato de Tungsten y así ser más eficiente.


Si tenemos que usar una o más UDF de python/scala puro lo mejor sería hacerlas todas a la vez y así no tener que serializar/deserializar varias veces:


<img src="https://s3-us-west-2.amazonaws.com/curriculum-release/images/tuning/interleaved-lambdas.png" alt="Interleaved Lambdas" style="border: 1px solid #cccccc; margin: 20px"/>
<img src="https://s3-us-west-2.amazonaws.com/curriculum-release/images/tuning/chained-lambdas.png" alt="Chained Lambdas" style="border: 1px solid #cccccc; margin: 20px"/><br/>


> **Conclusión:**  Siempre que podamos utilizar DataFrame y utilizar las funciones propias de Spark. Si tenemos que usar UDF hacerlas si se puede juntas y cuando los datos sean lo más pequeños posibles (después de hacer los filtros y seleccionar las columnas estrictamente necesarias)

In [13]:
meta.unpersist()
meta2.unpersist()

MapPartitionsRDD[30] at javaToPython at NativeMethodAccessorImpl.java:0

## Shuffling (¿barajando?)

Dentro de las transformaciones que hacemos en Spark podemos distinguir dos tipos:

#### Transformaciones Wide vs Narrow

#### Transformaciones Narrow

<img src="https://s3-us-west-2.amazonaws.com/curriculum-release/images/105/transformations-narrow.png" alt="Narrow Transformations" style="height: 300px"/>

Las transformaciones de tipo Narrow pueden hacerse todas a la vez en un único *stage*. (FILTER, SELECT, WITHCOLUMN)


#### Transformaciones Wide

<img src="https://s3-us-west-2.amazonaws.com/curriculum-release/images/105/transformations-wide.png" alt="Wide Transformations" style="height: 300px"/>

- Las transformaciones Wide causan *shuffling* y esto produce varios *stages*
- Algunas de estas transformaciones son: `distinct`, `join`, `orderBy`, `groupBy`.
SIMEPRE QUE HAGAMOS ESTAS FUNCIONES VA A SER MAS COSTOSO
Primero hacer selec, filters etc para conseguir un dataset mas pequeño

&nbsp;    


Veamos algún ejemplo:

In [14]:
#todas estas transformaciones son narrow
meta_narrow = (

    spark.read.load('/datos/reviews_amazon.parquet')
    .withColumn('md5_col', F.md5('review'))
    .withColumn('dummy', F.lit(100))
    .withColumn('nuevo', F.col('rating') + F.col('dummy'))

)

In [15]:
meta_narrow.count()

19959

![](img/narrow.png)

In [16]:
#con transformaciones wide
meta_wide = (

    spark.read.load('/datos/reviews_amazon.parquet')
    .withColumn('md5_col', F.md5('review'))
    .withColumn('md5_col_subt', F.substring('md5_col', 0, 1))
    .groupBy('md5_col_subt')
    .count()
    .withColumn('nuevo', F.col('count') + F.rand())
    .orderBy(F.desc('nuevo'))

)

In [17]:
meta_wide.count()

16

![](img/wide.png)

Las operaciones de tipo *wide* son mucho más costosas, así que hay que hacerlas cuando sean necesarias y siempre intentar conseguir el DataFrame más pequeño posible (filtrar y seleccionar las columnas necesarias).

## Nivel de paralelismo

El nivel de paralelismo es clave para que un proceso de Spark (sobre todo los que involucran transformaciones *wide*) funcione o no, o tarde unos minutos u horas.

Veamos algún ejemplo y entendamos las dos configuraciones importantes, se ha ejecutado (no lo hacemos ahora por tardar mucho):
SIN NADA DE SHUFFLES

```python
(
    
    spark.table("jayuso.huge_table_v2")
    .withColumn('palabras', F.split('review', '\s+'))
    .withColumn('palabras', F.explode('palabras'))

).write.mode('overwrite').saveAsTable("jayuso.huge_table")
```

El proceso contiene multitud de errores como este:


```
ExecutorLostFailure (executor 15 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.0 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
```

y tarda mucho:


![](img/muy_lento.png)
 
 
&nbsp;   

> (1) **¿Qué significa el 399?**
Son las particiones que hace en paralelo


&nbsp;   



Veamos otro ejemplo: CON SHUFFLES


```python
(
    
    spark.table("jayuso.huge_table_v2")
    .withColumn('dummy', F.substring('md5_col', 0, 1))
    .groupBy('dummy')
    .agg(F.collect_list('review').alias('lista'))

).write.mode('overwrite').saveAsTable("jayuso.huge_table")
```

&nbsp;   


![](img/error.png)


&nbsp;   


> (2) **¿Qué significa el 200 del segundo *stage*?**



&nbsp;  




Para entender estos números es necesario entender el nivel de paralelismo es decir el número de particiones en el que está dividido nuestro DataFrame en el cluster.



### Ejemplo 1

Los 399 es un número que calcular spark de manera automática analizando el DafaFrame que va a leer y estima el número de particiones necesario para trabajar con él. Veamos:

In [7]:
meta = spark.table("jayuso.huge_table_v2")

In [8]:
meta.rdd.getNumPartitions() #numero de particiones por defecto que consigue spark haciendo un analisis rapido cerca de 399
#SE PUEDE MODIFICAR

400

Este número se calcula como:



máximo(`fileSize` / `maxPartitionSize`, `spark.sparkContext.defaultParallelism`)



dónde, `fileSize` es la estimación que hace Spark sobre el DataFrame que va a leer y los otros dos son parámetros de configuración con valores por [defecto](https://spark.apache.org/docs/latest/configuration.html).

En general este valor que se calcula de manera automática está bien para casi todos los usos, aunque si vamos a tener que generar muchos datos más como en el ejemplo 1 con `F.explode` nos interesa que el nivel de paralelismo sea mayor. Esto lo podemos hacer con `repartition` o mejor configuración con un valor a mano la variable `spark.sparkContext.defaultParallelism`.


&nbsp;   


### Ejemplo 2 (LO MAS IMPORTANTE DEL NOTEBOOK!!!! - entra en el exm)

En el segundo ejemplo, hemos realizado una transformación de tipo *wide* (`groupBy`) el primer stage de lectura tienen 399 particiones, el segundo tienen 200. El motivo de este 200 es un parámetro muy importante:

In [20]:
spark.conf.get('spark.sql.shuffle.partitions') #=el nivel de paralelismo que usara para una transf tipo WIDE

'200'

Por defecto Spark siempre hará las operaciones shuffle con 200 particiones, este número **no depende de la magnitud de nuestros datos**, así que es muy importante tenerlo en cuenta e ir variándolo.


Vamos a iniciar una nueva sesión de spark cambiando estos parámetros:

In [21]:
spark.stop()

In [22]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

In [23]:
conf = (

    SparkConf()
    .setAppName(u"[ICAI] Spark Tuning")
    .set("spark.executor.memory", "7g")
    .set("spark.executor.cores", "5")
    .set("spark.default.parallelism", 3000) #DA IGUAL EL DATA SET QUE VAYA A LEER, LO VA A TRABAJAR CON 3000 PARTICIONES
            #para la formula de la particion de un RDD max(.., ..)
    .set("spark.sql.shuffle.partitions", 3000) #PARA FUNCIONES SHUFFLE 

)
#en el master con 200 nos va a valer siempre

In [24]:
spark = (

    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()

)

In [25]:
meta = spark.table("jayuso.huge_table_v2")

In [26]:
meta.rdd.getNumPartitions() #puede ser cercano a 3000 no exactamene 3000

3000

Entonces podríamos lanzar de nuevo los comandos y funcionarán (no lo hacemos que tarda mucho):

```python
(

    spark.table("jayuso.huge_table_v2")
    .withColumn('palabras', F.split('review', '\s+'))
    .withColumn('palabras', F.explode('palabras'))

).write.mode('overwrite').saveAsTable("jayuso.huge_table")
```

```python
(

    spark.table("jayuso.huge_table_v2")
    .withColumn('dummy', F.substring('md5_col', 0, 1))
    .groupBy('dummy')
    .agg(F.collect_list('review').alias('lista'))

).write.mode('overwrite').saveAsTable("jayuso.huge_table")
```

# Broadcast Join

&nbsp;   


<img src="https://s3-us-west-2.amazonaws.com/curriculum-release/images/tuning/broadcast-join.png" style="height:300px;"  alt="Spill to disk"/><br/><br/>    

Este tipo de *join* es muy importante cuando tenemos un DataFrame grande y otro pequeño veamos un ejemplo:
PASA LA TABLA PEQUENA A TODAS LAS MAQUINAS PARA NO TENER QUE HACER PRIMERO UN SORT, MERGE, JOIN (ordenar por el campo join y mandar cachos ordenados a las mismas maquinas = primer cacho de una tabla y de otra a la misma maquina)
ASI LA MAQUINA GRANDE NO SE TIENE QUE MOVER

In [27]:
audiencias = spark.read.parquet('/datos/ejercicio_audis/audiencias.parquet')
catalogo = spark.read.json('/datos/ejercicio_audis/info_contenidos.json')

In [28]:
audiencias.count()

25595651

In [29]:
catalogo.count()

116290

In [30]:
catalogo

DataFrame[duracion: bigint, id_contenido: bigint]

In [31]:
merge1 = audiencias.join(catalogo, 'id_contenido')

In [32]:
merge1.explain()

== Physical Plan ==
*(2) Project [id_contenido#218, id_user#217, franja#219, segundos_visualizados#220L, duracion#231L]
+- *(2) BroadcastHashJoin [id_contenido#218], [cast(id_contenido#232L as double)], Inner, BuildRight
   :- *(2) Project [id_user#217, id_contenido#218, franja#219, segundos_visualizados#220L]
   :  +- *(2) Filter isnotnull(id_contenido#218)
   :     +- *(2) FileScan parquet [id_user#217,id_contenido#218,franja#219,segundos_visualizados#220L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/datos/ejercicio_audis/audiencias.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(id_contenido)], ReadSchema: struct<id_user:double,id_contenido:double,franja:string,segundos_visualizados:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, bigint, true] as double)))
      +- *(1) Project [duracion#231L, id_contenido#232L]
         +- *(1) Filter isnotnull(id_contenido#232L)
            +- *(1) FileScan json [duracion

Podemos observar como por defecto ha realizado un `BroadcastHashJoin` en la tercera linea

In [33]:
%timeit merge1.count()

The slowest run took 4.73 times longer than the fastest. This could mean that an intermediate result is being cached.
1.9 s ± 1.26 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


Por defecto spark hará *broadcast join* si detecta que uno de los dos DataFrames involucrados es menor que:

In [34]:
spark.conf.get('spark.sql.autoBroadcastJoinThreshold')

'10485760'

Si por ejemplo desactivamos este parámetro:

In [35]:
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1) #DESACTIVAMOS EL BROADCAST JOIN

In [36]:
merge2 = audiencias.join(catalogo, 'id_contenido')

In [37]:
merge2.explain() #sortMergeJoin 

== Physical Plan ==
*(5) Project [id_contenido#218, id_user#217, franja#219, segundos_visualizados#220L, duracion#231L]
+- *(5) SortMergeJoin [id_contenido#218], [cast(id_contenido#232L as double)], Inner
   :- *(2) Sort [id_contenido#218 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id_contenido#218, 3000)
   :     +- *(1) Project [id_user#217, id_contenido#218, franja#219, segundos_visualizados#220L]
   :        +- *(1) Filter isnotnull(id_contenido#218)
   :           +- *(1) FileScan parquet [id_user#217,id_contenido#218,franja#219,segundos_visualizados#220L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/datos/ejercicio_audis/audiencias.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(id_contenido)], ReadSchema: struct<id_user:double,id_contenido:double,franja:string,segundos_visualizados:bigint>
   +- *(4) Sort [cast(id_contenido#232L as double) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cast(id_contenid

Podemos ver como ahora el join es de tipo `SortMergeJoin`

In [38]:
%timeit merge2.count()

5.86 s ± 2.6 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
merge2 = audiencias.join(F.broadcast(catalogo), 'id_contenido') #PARA FORZAR EL BROADCAST JOIN AUNQUE ESTE DESACTIVADO

# Bonus Track

Para terminar vamos a ver un último parámetro que pude ser útil para cuando trabajamos en un cluster compartido como es el caso.


El parámetro `spark.dynamicAllocation.maxExecutors` limita el número de executors máximos que podemos pedir. 

#### Ejercicio en clase

* ¿Cuántos *executors* podríamos pedir cada alumno si en total somos 22 en clase?
* Probar todos la misma configuración y comprobar que podemos trabajar

# CONFIGURACION SPARK PARA TRABAJAR EN CLASE

In [2]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

In [3]:
conf = (

    SparkConf()
    .setAppName(u"[ICAI] Spark Tuning")
    .set("spark.executor.memory", "7g")
    .set("spark.executor.cores", "5")
    .set("spark.default.parallelism", 400) #DA IGUAL EL DATA SET QUE VAYA A LEER, LO VA A TRABAJAR CON 3000 PARTICIONES
            #para la formula de la particion de un RDD max(.., ..)
    .set("spark.sql.shuffle.partitions", 400) #PARA FUNCIONES SHUFFLE 
    .set("spark.dynamicAllocation.maxExecutors", 2) #el maximo que soy capaz de coger son 49 (si no pones nada los coge todos)

)
#en el master con 200 nos va a valer siempre

In [4]:
spark = (

    SparkSession.builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()

)

In [9]:
spark.stop()