# Advanced Spark
**Andrey Titov**  
andrey.titov@bigdatateam.org  
Big Data Instructor @ BigData Team  
https://bigdatateam.org

## На этом занятии
+ Партиционирование
+ Планы выполнения задач
+ Оптимизация соединений и группировок
+ Управление схемой данных
+ Оптимизатор запросов Catalyst

## Партиционирование
RDD и DF являются представляют собой классы, описывающие распределенные коллекции данных. Они (коллекции) разбиты на крупные блоки, которые называются партициями. В графе вычисления, который называется в Spark DAG (Direct Acyclic Graph), есть три основных компонента - `job`, `stage`, `task`.

`job` представляет собой весь граф целиком, от момента создания DF, до применения `action` к нему. Состоит из одной или более `stage`. Когда возникает необходимость сделать `shuffle` данных, Spark создает новый `stage`. Каждый `stage` состоит из большого количества `task`. `task` это базовая операция над данными. Одновременно Spark выполняет N `task`, которые обрабатывают N партиций, где N - это суммарное число доступных потоков на всех воркерах.

Исходя из этого, важно обеспечивать:
+ достаточное количество партиций для распределения нагрузки по всем воркерам
+ равномерное распределение данных между партициями

Создадим датасет с перекосом данных:

In [None]:
from pyspark.sql.functions import when, lit, col

skew_column = when(col("id") < 900, lit(0)).otherwise(lit(1)).alias("skew_column")

skewed_df = spark.range(1000).withColumn("skew", skew_column).repartition(10, col("skew"))

skewed_df.show()

In [None]:
def print_parts(df):
    ret = df.rdd.mapPartitions(lambda x: [len(list(x))]).collect()
    print(ret)

In [None]:
print_parts(skewed_df)

Любые операции с таким датасетом будут работать медленно, т.к.
+ если суммарное количество потоков на всех воркерах больше 10, то в один момент времени работать будут максимум 10, остальные будут простаивать
+ из 10 партицийи только в 2 есть данные и это означает, что только 2 потока будут обрабатывать данные, при этом из-за перекоса данных между ними (900 vs 100) первый станет bottleneck'ом

Обычно перекошенные датасеты возникают после вычисления агрегатов, оконных функций и соединений, но также могут возникать и при чтении источников.

Для устранения проблемы перекоса данных, следует использовать метод `repartition`:

In [None]:
# здесь мы передаем только новое количество партиций и Spark выполнит RoundRobinPartitioning

balanced_df = skewed_df.repartition(20)
print_parts(balanced_df)

In [None]:
# здесь мы добавляем к числу партиций колонку, по которой необходимо сделать репартиционирование,
# поэтому Spark выполнит HashPartitioning

balanced_df = skewed_df.repartition(20, col("id"))
print_parts(balanced_df)

### Добавление соли
Часто при вычислении агрегатов приходится работать с перекошенными данными:

In [None]:
df = spark.read.format("csv").options(header=True, inferSchema=True).load("/tmp/datasets/airport-codes.csv")
df.groupBy(col("type")).count().orderBy(col("count").desc()).show(30, False)

In [None]:
from pyspark.sql.functions import collect_list, col

skew_grouped = df.groupBy(col("type")).agg(collect_list(col("ident")).alias("ids"))
skew_grouped.show(20, 50)

Поскольку при вычислении агрегата происходит неявный `HashPartitioning` по ключу (ключам) агрегата, то при выполнении определенных условий происходит нехватка памяти на воркере, которую нельзя исправить, не изменив подход к построению агрегата.

Один из вариантов устранение - соление ключей:

In [None]:
from pyspark.sql.functions import expr

salt = expr("""pmod(round(rand() * 100, 0), 10)""").cast("integer")
salted = df.withColumn("salt", salt)
salted.select(col("type"), col("ident"), col("salt")).sample(0.1).show(20, False)

Это позволяет нам существенно снизить объем данных в каждой партиции (30к vs 3к):

In [None]:
salted.groupBy(col("type"), col("salt")).count().orderBy(col("count").desc()).show(20, False)

Это позволяет нам посчитать требуемый агрегат более оптимальным путем, не смотря на появление второго агрегата:

In [None]:
salted \
    .groupBy(col("type"), col("salt")).agg(collect_list(col("ident")).alias("ids")) \
    .groupBy(col("type")).agg(collect_list(col("ids")).alias("ids")) \
    .select(col("type"), expr("""flatten(ids)""").alias("ids")) \
    .show(20, 50)

### Выводы:
+ DF API позволяет строить большое количество агрегатов. При этом необходимо помнить, что операции `groupBy`, `cube`, `rollup` возвращают [org.apache.spark.sql.RelationalGroupedDataset](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset), к которому затем необходимо применить одну из функций агрегации - `count`, `sum`, `agg` и т. п.
+ При вычислении агрегатов необходимо помнить, что эта операция требует перемешивания данных между воркерами, что, в случае перекошенных данных, может привести к OOM на воркере.

## Кеширование
По умолчанию при применении каждого действия Spark пересчитывает весь граф, что может негативно сказать на производительности приложения. Для демонстрации возьмем датасет [Airport Codes](https://datahub.io/core/airport-codes)  

In [None]:
df
df.printSchema()

Посчитаем несколько действий. Несмотря на то, что `only_ru` является общим для всех действий, он пересчитывается при вызове каждого действия.

In [None]:
only_ru = df.filter((col("iso_country") == "RU") & (col("elevation_ft") > 1000))
only_ru.show(1, 50, True)

only_ru.count()
only_ru.collect()
only_ru.groupBy(col("municipality")).count().orderBy(col("count").desc()).na.drop("any").show()

Для решения этой проблемы следует использовать методы `cache`, либо `persist`. Данные методы сохраняют состояние графа после первого действия, и следующие обращаются к нему. Разница между методами заключается в том, что `persist` позволяет выбрать, куда сохранить данные, а `cache` использует значение по умолчанию. В текущей версии Spark это [StorageLevel.MEMORY_ONLY](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). Важно помнить, что данный кеш не предназначен для обмена данными между разными Spark приложения - он является внутренним для приложения. После того, как работа с данными окончена, необходимо выполнить `unpersist` для очистки памяти

In [None]:
only_ru = df.filter((col("iso_country") == "RU") & (col("elevation_ft") > 1000))
only_ru.cache()
only_ru.show(1, 50, True)
only_ru.count()
only_ru.collect()
only_ru.groupBy(col("municipality")).count().orderBy(col("count").desc()).na.drop("any").show()
only_ru.unpersist()

### Выводы:
+ Использование `cache` и `persist` позволяет существенно сократить время обработки данных, однако следует помнить и об увеличении потребляемой памяти на воркерах

## Планы выполнения задач

Любой `job` в Spark SQL имеет под собой план выполнения, кототорый генерируется на основе написанно запроса. План запроса содержит операторы, которые затем превращаются в Java код. Поскольку одну и ту же задачу в Spark SQL можно выполнить по-разному, полезно смотреть в планы выполнения, чтобы, например:
+ убрать лишние shuffle
+ убедиться, чтот тот или иной оператор будет выполнен на уровне источника, а не внутри Spark
+ понять, как будет выполнен `join`

Планы выполнения доступны в двух видах:
+ метод `explain()` у DF
+ на вкладке SQL в Spark UI

Прочитаем датасет [Airport Codes](https://datahub.io/core/airport-codes):

In [None]:
df = spark.read.format("csv").options(header=True, inferSchema=True).load("/tmp/datasets/airport-codes.csv")
df

Используем метод `explain`, чтобы посмотреть план запроса. Наиболее интересным является физический план, т.к. он отражает фактически алгоритм обработки данных. В данном случае в плане присутствует единственный оператор `FileScan csv`:

In [None]:
df.explain(extended=True)

Выполним `filter` и проверим план выполнения. Читать план нужно снизу вверх. В плане появился новый оператор `filter`

In [None]:
from pyspark.sql.functions import col

df.filter(col("type") == "small_airport").explain(extended=True)

Выполним агрегацию и проверим план выполнения. В нем появляется три оператора: 2 `HashAggregate` и `Exchange hashpartitioning`.

Первый `HashAggregate` содержит функцию `partial_count(1)`. Это означает, что внутри каждого воркера произойдет подсчет строк по каждому ключу. Затем происходит `shuffle` по ключу агрегата, после которого выполняется еще один `HashAggregate` с функцией `count(1)`. Использование двух `HashAggregate` позволяет сократить количество передаваемых данных по сети.

In [None]:
from pyspark.sql.functions import col

df.filter(col("type") == "small_airport").groupBy(col("iso_country")).count().explain(extended=True)

<img align="right" width="200" height="200" src="https://cs5.pikabu.ru/post_img/big/2015/12/11/7/1449830295198229367.jpg">

### Выводы:
+ Spark составляет физический план выполнения запроса на основании написанного вами кода
+ Изучив план запроса, можно понять, какие операторы будут применены в ходе обработки ваших данных
+ План выполнения запроса - один из основных инструментов оптимизации запроса

## Оптимизация соединений и группировок
При выполнении `join` двух DF важно следовать рекомендациям:
+ фильтровать данные до join'а
+ использовать equ join 
+ если можно путем увеличения количества данных применить equ join вместо non-equ join'а, то делать именно так
+ всеми силами избегать cross-join'ов
+ если правый DF помещается в памяти worker'а, использовать broadcast()

### Виды соединений
+ **BroadcastHashJoin**
  - equ join
  - broadcast
+ **SortMergeJoin**
  - equ join
  - sortable keys
+ **BroadcastNestedLoopJoin**
  - non-equ join
  - using broadcast
+ **CartesianProduct**
  - non-equ join
  
[Optimizing Apache Spark SQL Joins: Spark Summit East talk by Vida Ha](https://youtu.be/fp53QhSfQcI)

Подготовим два датасета:

In [None]:
left = df.select(col("type"), col("ident"), col("iso_country")).alias("left").localCheckpoint()
right = df.groupBy(col("type")).count().alias("right").localCheckpoint()

### BroadcastHashJoin
+ работает, когда условие - равенство одного или нескольких ключей
+ работает, когда один из датасетов небольшой и полностью вмещается в память воркера
+ оставляет левый датасет как есть
+ копирует правый датасет на каждый воркер
+ составляет hash map из правого датасета, где ключ - кортеж из колонок в условии соединения
+ итерируется по левому датасета внутри каждой партиции и проверяет наличие ключей в HashMap
+ может быть автоматически использован, либо явно через `broadcast(df)`

In [None]:
from pyspark.sql.functions import broadcast

result = left.join(broadcast(right), "type", "inner")

result.explain(True)

### SortMergeJoin
+ работает, когда ключи соединения в обоих датасета являются сортируемыми
+ репартиционирует оба датасета в 200 партиций по ключу (ключам) соединения
+ сортирует партиции каждого из датасетов по ключу (ключам) соединения
+ Используя сравнение левого и правого ключей, обходит каждую пару партиций и соединяет строки с одинаковыми ключами

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(right, "type", "inner")

result.explain(True)

### BroadcastNestedLoopJoin
+ работает, когда один из датасетов небольшой и полностью вмещается в память воркера
+ оставляет левый датасет как есть
+ копирует правый датасет на каждый воркер
+ проходится вложенным циклом по каждой партиции левого датасета и копией правого датасета и проверяет условие
+ может быть автоматически использован, либо явно через `broadcast(df)`

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(broadcast(right), left["type"] != right["type"], "inner")

result.explain(True)

### CartesianProduct
+ Создает пары из каждой партиции левого датасета с каждой партицией правого датасета, релоцирует каждую пару на один воркер и проверяет условие соединения
+ на выходе создает N*M партиций
+ работает медленнее остальных и часто приводит к ООМ воркеров

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(right, left["type"] != right["type"], "inner")

result.explain(True)

### Снижение количества shuffle
В ряде случаев можно уйти от лишних `shuffle` операций при выполнении соединения. Для этого оба DF должны иметь одинаковое партиционирование - одинаковое количество партиций и ключ партиционирования, совпадающий с ключом соединения.

Разница между планами выполнения будет хорошо видна в Spark UI на графе выполнения в Jobs и плане выполнения в SQL

In [None]:
%%time
left = df
right = df.groupBy(col("type")).count()
joined = left.join(right, "type")
joined.count()

In [None]:
%%time
df_rep = df.repartition(200, col("type"))
left = df_rep
right = df_rep.groupBy(col("type")).count()
joined = left.join(right, "type")
joined.count()

### Выводы:
+ В Spark используются 4 вида соединений: `BroadcastHashJoin`, `SortMergeJoin`, `BroadcastNestedLoopJoin`, `CartesianProduct`
+ Выбор алгоритма основывается на условии соединения и размере датасетов
+ `CartesianProduct` обладает самой низкой вычислительной эффективностью и его по возможности стоит избегать

## Управление схемой данных
В DF API каждая колонка имеет свой тип. Он может быть:
+ скаляром - `StringType`, `IntegerType` и т. д.
+ массивом - `ArrayType(T)`
+ словарем `MapType(K, V)`
+ структурой - `StructType()`

DF целиком также имеет схему, описанную с помощью класса `StructType`

Посмотреть список колонок можно с помощью атрибута `columns`:

In [None]:
df.columns

Схема DF доступна через атрибут `schema`

In [None]:
schema = df.schema
schema.simpleString()

In [None]:
df.schema["type"]

In [None]:
foo = df.schema["type"]
foo.dataType

Если указать схему при чтении источника, то spark не будет пытаться определить ее автоматически, что, в случае работы с такими типами файлов, как `csv` и `json`, сократит время создания `df`

In [None]:
df = spark.read.format("csv") \
        .schema(schema) \
        .options(header=True, inferSchema=False) \
        .load("/tmp/datasets/airport-codes.csv")

df.printSchema()

Схема может быть создана вручную:

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

my_schema = \
    StructType(
        [
            StructField("foo", StringType()),
            StructField("bar", StringType()),
            StructField(
                        "boo", 
                        StructType(
                            [
                                StructField("x", IntegerType()),
                                StructField("y", BooleanType())
                            ]
                            )
                       )
        ]
    )

my_schema

### Выводы:
+ Spark использует схемы для описания типов колонок, схемы всего DF, чтения источников и для работы с JSON
+ Схема представляет собой инстанс класса `StructType`
+ Колонки в Spark могут иметь любой тип. При этом вложенность словарей, массивов и структур не ограничена

## Оптимизатор запросов Catalyst
Catalyst выполняет оптимизацию запросов с целью ускорения их выполнения и применяет следующие методы:
 + Column projection
 + Partition pruning
 + Predicate pushdown
 + Constant folding
 
 Подготовим датасет для демонстрации работы Catalyst:

In [None]:
df \
    .write \
    .format("parquet") \
    .partitionBy("iso_country") \
    .mode("overwrite") \
    .save("/tmp/airports.parquet") \

airports = spark.read.parquet("/tmp/airports.parquet")
airports

In [None]:
!hdfs dfs -ls /tmp/airports.parquet

### Column projection
Данный механизм позволяет избегать вычитывания ненужных колонок при работе с источниками

In [None]:
%%time
selected = airports.select(col("ident"))
selected.cache()
selected.count()
selected.unpersist()
selected.explain(True)

In [None]:
%%time
selected = airports
selected.cache()
selected.count()
selected.unpersist()
selected.explain(True)

### Partition pruning
Данный механизм позволяет избежать чтения ненужных партиций

In [None]:
%%time
filtered = airports.filter(col("iso_country") == "RU")
filtered.count()
filtered.explain(True)

### Predicate pushdown
Данный механизм позволяет "протолкнуть" условия фильтрации данных на уровень datasource

In [None]:
%%time
filtered = airports.filter(col("iso_region") == "RU")
filtered.count()
filtered.explain(True)

### Simplify casts
Данный механизм убирает ненужные `cast`

In [None]:
result = spark.range(10).select(col("id").cast("long"))
result.explain(True)

In [None]:
result = spark.range(10).select(col("id").cast("int").cast("long"))
result.explain(True)

### Constant folding
Данный механизм сокращает количество констант, используемых в физическом плане

In [None]:
from pyspark.sql.functions import lit

result = spark.range(10).select((lit(3) >  lit(0)).alias("foo"))
result.explain(True)

In [None]:
from pyspark.sql.functions import lit, col

result = spark.range(10).select((col("id") >  lit(0)).alias("foo"))
result.explain(True)

### Combine filters
Данный механизм объединяет фильтры

In [None]:
result = spark.range(10).filter(col("id") > 0).filter(col("id") != 5).filter(col("id") < 10)
result.explain(True)