# Оптимизация Spark вычислений
**Andrey Titov**
Senior Spark Engineer @ NVIDIA

## На этом занятии
+ настройка spark-submit
+ создание SparkSession
+ использование repartition и coalesce
+ schema inference
+ column projection
+ partition pruning
+ predicate pushdown
+ оптимизация join'ов
+ fair scheduler
+ workshop

In [None]:
# Файлы с данными
json_file = 'nbagames.json'
output_parquet_agg = "tmp/nbagames.parquet"
output_test_file = "tmp/test.parquet"

import os
os.environ["TEST_PARQUET_FILE"] = output_test_file

## Настройка spark-submit
Любое приложение, использующее Spark, запускается с помощью spark-submit.

Утилита spark-submit позволяет:
+ выбрать мастер сервис (local, YARN, local, spark, Mesos)
+ выбрать режим запуска (client, cluster)
+ указать количество CPU и RAM у worker'ов
+ указать количество worker'ов
+ указать количество CPU и RAM у драйвера
+ настроить логирование
+ подключить зависимости
+ добавить переменные окружения
+ подгрузить файлы
+ настроить параметры datasource'ов
+ настроить [другие параметры](https://spark.apache.org/docs/latest/configuration.html) приложения

**Скрипт запуска этого ноутбука**
```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=0.0.0.0 --NotebookApp.token= --port=8088'
export PYSPARK_PYTHON=python3
SPARK_VERSION=2.4.0

pyspark2 \
    --conf spark.ui.port=8089 \
    --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:$PWD/conf/log4j.properties" \
    --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:$PWD/conf/log4j.properties" \
    --conf spark.sql.execution.arrow.enabled=true \
    --conf spark.sql.crossJoin.enabled=true \
    --master yarn \
    --deploy-mode client \
    --num-executors 1 \
    --executor-cores 1 \
    --jars /home/bigdatateam_instructor/atitov/spark-examples/lib/udf-funcs_2.11-0.1.jar \
    --packages com.datastax.spark:spark-cassandra-connector_2.11:$SPARK_VERSION,org.apache.spark:spark-sql-kafka-0-10_2.11:$SPARK_VERSION
```

JAVA зависимости следует искать на сайте [mvnrepository.com](https://mvnrepository.com)

Посмотреть все параметры spark-submit можно через ```spark-submit --help```

Остальные параметры берутся из ```$SPARK_CONF_DIR/spark-defaults.conf```, а переменные окружения из ```$SPARK_CONF_DIR/spark-env.sh```

## Создание SparkSession
+ для использования Spark API в приложении должен быть создан ```pyspark.sql.SparkSession``` (для RDD - ```pyspark.SparkContext```)
+ в pyspark и spark-shell данные классы создаются автоматически

**Пример создания ```SparkSession```**
```python
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("SimpleApp") \
    .master("local[*]") \
    .option("foo", "bar") \
    .getOrCreate()
```

**Избегайте установки параметров Spark внутри приложения!**

### Rule of thumb
+ оптимальное число ядер на worker 5-8
+ оптимальное количество памяти на worker 8-24 GB
+ отключайте уровень логирования INFO
+ используйте G1GC
+ при сайзинге соблюдайте баланс между CPU, RAM, Network
+ не избегайте новых версий Spark только потому, что они отсутствуют в коробке Hortonworks/Cloudera

## Использование repartition и coalesce
+ для изменения количества партиций в Spark реализовано две функции: ```repartition()``` и ```coalesce()```
+ ```coalesce()``` позволяет уменьшить количество партиций и при этом не вызывает shuffle
+ ```repartition()``` позволяет уменьшать и увеличивать количество партиций и использует shuffle 

### Пример использования coalesce

In [None]:
# Записывать много маленьких партиций - плохая идея
file_with_many_partitions = spark.range(0,1000,1,100)
print(file_with_many_partitions.rdd.getNumPartitions())
file_with_many_partitions.write.mode("overwrite").parquet(output_test_file)

In [None]:
!ls $TEST_PARQUET_FILE | grep parquet | wc -l

In [None]:
# Использование coalesce() позволяет сократить количество партиций и при этом не передает данные по сети
file_with_many_partitions = spark.range(0,1000,1,100)
file_with_many_partitions.coalesce(4).write.mode("overwrite").parquet(output_test_file)

In [None]:
!ls $TEST_PARQUET_FILE | grep parquet | wc -l

### Пример использования repartition

In [None]:
# Создадим датасет с перекосом данных
from pyspark.sql.functions import lit, col, when, rand
left = spark.range(0,1000, 1, 4) \
    .withColumn("a", when(col("id") < 900, lit("one")).otherwise(lit("two"))) \
    .withColumn("r1", rand() * 1000)

right = spark.range(0,10, 1, 4) \
    .withColumn("b", when(col("id") < 9, lit("one")).otherwise("two")) \
    .withColumn("r2", rand() * 1000)
    

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

skewed = left.join(right, col("a") == col("b"))

print([len(x) for x in skewed.rdd.glom().collect()])

In [None]:
# repartition() убирает перекос данных между воркерами, 
# однако распределение данных по партияим будет произвольным
repartitioned = skewed.repartition(100)
print([len(x) for x in repartitioned.rdd.glom().collect()])

In [None]:
# Можно делать repartition по значению колонки или колонок. Поле "a" - не самый лучший кандидат для этого
repartitioned = skewed.repartition(100, col("a"))
print([len(x) for x in repartitioned.rdd.glom().collect()])

In [None]:
# Правильнее использовать колонки "r1"и "r2"
repartitioned = skewed.repartition(100, col("r1"), col("r2"))
print([len(x) for x in repartitioned.rdd.glom().collect()])

## Schema inference
При работе с большими JSON файлами ```spark.read``` отрабатывает не мгновенно. Это происходит из-за того, что Spark вынужден "выводить" схему данных из файла:

In [None]:
from time import time
start = time()
df = spark.read.json(json_file)
end = time()
print(end - start)

df.printSchema()

Схему можно указать вручную. Для этого есть два варианта:
+ указать схему в формате DDL String 
+ передать класс ```pyspark.sql.types.StructType```

In [None]:
# DDL String
from time import time
start = time()
schema = """`_id` STRUCT<`$oid`: STRING>, teams ARRAY<STRUCT<abbreviation: STRING, city: STRING>>"""
df = spark.read.schema(schema).json(json_file)
end = time()
print(end - start)

df.printSchema()

df.show()

In [None]:
# pyspark.sql.types
from time import time
from pyspark.sql.types import *

start = time()
schema = \
    StructType(
    [
        StructField(
            "_id", 
            StructType([StructField("$oid", StringType())])
        ),
        StructField(
            "teams",
            ArrayType(
                StructType(
                    [
                    StructField("abbreviation", StringType()),
                    StructField("city", StringType())
                    ]
                )
            )
        )
    ]
    )


df = spark.read.schema(schema).json(json_file)
end = time()
print(end - start)

df.printSchema()

df.show()

Используемые схемы также применимы для некоторых функций ```pyspark.sql.functions```, например:
+ ```from_json()```
+ ```cast()```

## Column projection
Данный механизм позволяет избегать вычитывания ненужных колонок при работе с источниками

In [None]:
# Запишем данные в parquet
spark.read.json(json_file).drop("_corrupt_record").write.mode("overwrite").parquet(output_test_file)

In [None]:
from time import time
from pyspark.sql.functions import col

df = spark.read.parquet(output_test_file)
start = time()
some_columns = df.select(col("_id.$oid"), col("date.$date"))

some_columns.cache()
some_columns.count()

some_columns.explain(True)

end = time()

some_columns.unpersist()

print(end - start)

In [None]:
from time import time
from pyspark.sql.functions import col

df = spark.read.parquet(output_test_file)
start = time()
some_columns = df
some_columns.cache()
some_columns.count()
some_columns.explain(True)

end = time()

some_columns.unpersist()

print(end - start)

## Partition pruning
Данный механизм позволяет избежать чтения ненужных партиций

In [None]:
# Запишем данные в parquet
from pyspark.sql.functions import explode, col
spark.read.json(json_file) \
    .select(explode(col("teams")).alias("team")) \
    .select(col("team.*")) \
    .write.mode("overwrite") \
    .partitionBy("city", "abbreviation") \
    .parquet(output_test_file)

In [None]:
df = spark.read.parquet(output_test_file)

agg = df.filter((col("city") == "Los Angeles") & col("abbreviation").isin("LAL", "LAC"))
agg.show()
agg.explain(True)

## Predicate pushdown
Данный механизм позволяет "протолкнуть" условия фильтрации данных на уровень datasource

In [None]:
# Запишем данные в parquet
from pyspark.sql.functions import explode, col
spark.read.json(json_file) \
    .select(explode(col("teams")).alias("team")) \
    .select(col("team.*")) \
    .write.mode("overwrite") \
    .partitionBy("city", "abbreviation") \
    .parquet(output_test_file)

In [None]:
df = spark.read.parquet(output_test_file)

agg = df.filter(col("won") == 1)
agg.show()
agg.explain(True)

## Оптимизация join
При выполнении join таблиц важно следовать рекомендациям:
+ фильтровать данные до join'а
+ использовать equ join 
+ если можно путем увеличения количества данных применить equ join вместо non-equ join'а, то делать именно так
+ всеми силами избегать cross-join'ов
+ если правый DF помещается в памяти worker'а, использовать broadcast()

### Как посмотреть какой вид join'а будет использоваться?
```df.join(other_df).explain(True)```

Виды join'ов (от быстрого к медленному):
+ BroadcastHashJoin
  - equ-join, using broadcast
+ SortMergeJoin
  - equ-join, sortable keys
+ BroadcastNestedLoopJoin
  - using broadcast
+ CartesianProduct
  - все остальные

In [None]:
from pyspark.sql.functions import *

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

left = spark.range(0,100).withColumn("a", lit("a")).withColumnRenamed("id", "id_left")
right = spark.range(0,10).withColumn("b", lit("b")).withColumnRenamed("id", "id_right")


# BroadcastHashJoin
left.join(broadcast(right), col("id_left") == col("id_right"), 'inner')

# SortMergeJoin
left.join(right, col("id_left") == col("id_right"), 'inner')

# BroadcastNestedLoopJoin
left.join(broadcast(right), col("id_left") < col("id_right"), 'inner')

# CartesianProduct
left.join(right, col("id_left") < col("id_right"), 'inner')

# CartesianProduct
left.crossJoin(right).explain(True)

## Fair Scheduler
+ fair scheduler - режим работы Spark, позволяющий выполнять несколько stage параллельно
+ по умолчанию выключен
+ при построении высоконагруженных приложений его следует использовать

### Включение планировщика
```bash
spark.scheduler.mode FAIR
spark.scheduler.allocation.file "/path/to/fairscheduler.xml"
```

Пример fairscheduler.xml
```xml
<?xml version="1.0"?>
<allocations>
  <pool name="pool0">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```

In [None]:
import webbrowser

df = spark.read.json(json_file)

ui_url = sc._jsc.sc().uiWebUrl().get()

webbrowser.open(ui_url + '/stages/')

In [None]:
# Все стейджы выполняются последовательно

from pyspark.sql.functions import col
from time import time

for _ in range(20):
    df.na.drop().count()

![без fair scheduler](https://github.com/tenkeiu8/spark-examples/blob/master/images/without_fair_scheduler.png?raw=true)

In [None]:
# Стейджы выполняются параллельно в дефолтном пуле

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql.functions import col
from time import time

def process_df(df):
    df.na.drop().count()

with ThreadPoolExecutor(4) as executor:
    future_tasks = []
    for _ in range(20):
        future_tasks.append(executor.submit(process_df, df))
    
    for future in as_completed(future_tasks):
        future.result()

![with_fair_scheduler](https://github.com/tenkeiu8/spark-examples/blob/master/images/with_fair_scheduler.png?raw=true)

In [None]:
# Стейджы выполняются параллельно в пуле pool0

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql.functions import col
from time import time

def process_df(spark, df):
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool0")
    df.na.drop().count()

with ThreadPoolExecutor(4) as executor:
    future_tasks = []
    for _ in range(20):
        future_tasks.append(executor.submit(process_df, spark, df))
    
    for future in as_completed(future_tasks):
        future.result()

![fair_scheduler_with_pool](https://github.com/tenkeiu8/spark-examples/blob/master/images/fair_scheduler_with_pool.png?raw=true)