# Spark high perfomance

    - Configuração do cluster
    - Analise de jobs
    - otimização de join
    - Fair Scheduling
    - Serialization
    - Garbage Colector

## 1. Configuração do cluster
        
    --num-executors= "numero de cores por nó"
    --executor-cores= "total de cores por nó
    --executor-memory= "total de memoria por nó"
    
    - Taxa de transferência do HDFS: o cliente HDFS tem problemas com toneladas de threads simultâneos. Observou-se que o HDFS atinge a taxa de transferência total de gravação com ~5 tarefas por executor. Portanto, é bom manter o número de núcleos por executor abaixo desse número.


### - Spark Architecture

<img src="Spark-cluster-computing-master-slave-architecture.png" width="700" height="900">

### - Worker
  
<img src="1_fbAp1O2_d9pH0dCcvgfqrA.jpeg" width="700" height="900">

### - Um executor por nó
<img src="1_OsiBkBLQi-Qxb9TTxcb0FA.jpeg" width="700" height="900">

    - 1 executor com 15 núcleos significa que você terá 1 JVM que pode executar no máximo 15 tarefas.
    - Taxa de transferência do HDFS será prejudicada e resultará alto tempo GC.
    - Se uma das tarefas nesses núcleos executar OOM ou travar de maneira incorreta, até 15 tarefas precisarão ser reprocessadas.
   

### - Um executor por core
<img src="1_fP06qHvupflNshcUg6jg7w.jpeg" width="700" height="900">

    - 16 executor com 1 CORE significa que você terá 16 JVM e cada JVM pode executar uma tarefa.
    - variáveis ​​compartilhadas/em cache, como variáveis ​​de transmissão e acumuladores, serão replicadas em cada núcleo dos nós, que é 15 vezes.
    - Pouca memoria por executor (1/16), pode ocorrer OOM. 
    - Os executores normalmente não compartilham memória entre si. Portanto, se um executor não estiver usando tanta memória quanto outro, a memória não poderá ser fornecida ao executor com maiores necessidades de memória. Ou seja, a alocação de memória é menos eficiente. A menos que spark.memory.offHeap.enabled seja definido como true (é definido como false padrão).

### - Cinco executores com 3 cores ou três executores com 5 cores (Equilíbrio)
<img src="1_5oy8I6Jwj1QPMF3XKdAiaA.jpeg" width="700" height="900">

    - Com base nas recomendações mencionadas acima, vamos atribuir 5 núcleos por executores --executor-cores = 5(para uma boa taxa de transferência do HDFS)
    - Deixe 1 núcleo por nó para daemons Hadoop/Yarn => Número de núcleos disponíveis por nó = 16-1 = 15
    - Portanto, Total disponível de núcleos no cluster = 15 x 10 = 150
    - Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30
    - Deixando 1 executor para ApplicationManager => --num-executors= 29
    - Número de executores por nó = 30/10 = 3
    - Memória por executor = 64 GB/3 = 21 GB
    - Contando a sobrecarga de heap = 10% de 21 GB = 3 GB. Então, real --executor-memory= 21 - 3 = 18 GB


#### http://spark-configuration.luminousmen.com/

   
```
spark.default.parallelism	725
spark.executor.memory	18g
spark.executor.instances	29
spark.driver.cores	5
spark.executor.cores	5
spark.driver.memory	18g
spark.driver.maxResultSize	18g
spark.driver.memoryOverhead	1843m
spark.executor.memoryOverhead	1843m
spark.dynamicAllocation.enabled	false
spark.sql.adaptive.enabled	true

spark.memory.fraction	0.8
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures	5
spark.rdd.compress	true
spark.shuffle.compress	true
spark.shuffle.spill.compress	true
spark.serializer	org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions	-XX:+UseG1GC -XX:+G1SummarizeConcMark
spark.driver.extraJavaOptions	-XX:+UseG1GC -XX:+G1SummarizeConcMark

```

## Analise de jobs

In [3]:
# https://towardsdatascience.com/skewed-data-in-spark-add-salt-to-compensate-16d44404088b

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, asc, desc
from pyspark.sql.functions  import spark_partition_id
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

In [21]:
df = spark.createDataFrame([
    Row(a=1, b=4., c='GFG1', d=date(2000, 8, 1),
        e=datetime(2000, 8, 1, 12, 0)),
   
    Row(a=2, b=8., c='GFG2', d=date(2000, 6, 2),
        e=datetime(2000, 6, 2, 12, 0)),
   
    Row(a=4, b=5., c='GFG3', d=date(2000, 5, 3),
        e=datetime(2000, 5, 3, 12, 0))
])
 
# show table
df.show()
 
# show schema
df.printSchema()

# numero de partioções
n_partitions = df.rdd.getNumPartitions()
print(f'Numero de partições: {n_partitions}\n')

print('Numero de linhas por partição')
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

spark.sparkContext.getConf().getAll()

+---+---+----+----------+-------------------+
|  a|  b|   c|         d|                  e|
+---+---+----+----------+-------------------+
|  1|4.0|GFG1|2000-08-01|2000-08-01 12:00:00|
|  2|8.0|GFG2|2000-06-02|2000-06-02 12:00:00|
|  4|5.0|GFG3|2000-05-03|2000-05-03 12:00:00|
+---+---+----+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

Numero de partições: 8

Numero de linhas por partição
+-----------+-----+
|partitionId|count|
+-----------+-----+
|          5|    1|
|          7|    1|
|          2|    1|
+-----------+-----+

Tamanho aproximado do dataframe: 2.8 MB
Tamanho aproximado de cada partição: 0.35 MB

spark ui ->
http://192.168.0.15:4040


In [2]:
# spark.serializer org.apache.spark.serializer.KryoSerializer
# http://spark-configuration.luminousmen.com/
# https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
# https://www.codetd.com/pt/article/10830437
# https://databricks.com/session_na20/fine-tuning-and-enhancing-performance-of-apache-spark-jobs