In [1]:
spark.table("hw_4.events_full").show()

In [2]:
sample_very_big_df = spark.table("hw_4.sample_very_big")
sample_big_df = spark.table("hw_4.sample_big")

sample_very_big_df.join(sample_big_df, "event_id", "inner").count()

In [3]:
spark.table("hw_4.sample_very_big") \
.write.mode("overwrite").bucketBy(10, "event_id") \
.saveAsTable("hw_4.sample_very_big_bucket")


spark.table("hw_4.sample_big") \
.write.mode("overwrite").bucketBy(10, "event_id") \
.saveAsTable("hw_4.sample_big_bucket")

In [4]:
print("HTTPS://arena-hadoop.inno.tech:18088/proxy/" + sc.applicationId + "/jobs/")

In [6]:
sample_very_big_df = spark.table("hw_4.sample_very_big_bucket")
sample_big_df = spark.table("hw_4.sample_big_bucket")

sample_very_big_df.join(sample_big_df, "event_id", "inner").count()


In [7]:
from pyspark.sql.functions import when, lit, col

skew_column = when(col("id") < 900, lit(0)).otherwise(lit(1)).alias("skew_column")

skewed_df = spark.range(1000).withColumn("skew", skew_column).repartition(10, col("skew"))

skewed_df.show()


In [8]:
def print_parts(df):
    ret = df.rdd.mapPartitions(lambda x: [len(list(x))]).collect()
    print(ret)
    
print_parts(skewed_df)

In [9]:
# здесь мы передаем только новое количество партиций и Spark выполнит RoundRobinPartitioning

balanced_df = skewed_df.repartition(11)
print_parts(balanced_df)

In [10]:
# здесь мы добавляем к числу партиций колонку, по которой необходимо сделать репартиционирование,
# поэтому Spark выполнит HashPartitioning

balanced_df = skewed_df.repartition(20, col("id"))
print_parts(balanced_df)

## Добавление соли


In [13]:
%sparkLocal.pyspark
import pyspark.sql.functions as f

df = spark.read.format("csv").options(header=True, inferSchema=True).load("data/airport-codes.csv")

df.groupBy(f.col("type")).count().orderBy(f.col("count").desc()).show(30, False)
df.show()

In [14]:
from pyspark.sql.functions import collect_list, col

skew_grouped = df.groupBy(col("type")).agg(collect_list(col("ident")).alias("ids"))
skew_grouped.show(20, 100)

Поскольку при вычислении агрегата происходит неявный HashPartitioning по ключу (ключам) агрегата, то при выполнении определенных условий происходит нехватка памяти на воркере, которую нельзя исправить, не изменив подход к построению агрегата.

Один из вариантов устранение - соление ключей:



In [16]:
from pyspark.sql.functions import expr

salt = expr("""pmod(round(rand() * 100, 0), 10)""").cast("integer")
salted = df.withColumn("salt", salt)
salted.select(col("type"), col("ident"), col("salt")).sample(0.1).show(20, False)

Это позволяет нам существенно снизить объем данных в каждой партиции (30к vs 3к):


In [18]:
salted.groupBy(col("type"), col("salt")).count().orderBy(col("count").desc()).show(20, False)


Это позволяет нам посчитать требуемый агрегат более оптимальным путем, не смотря на появление второго агрегата:


In [20]:
salted \
    .groupBy(col("type"), col("salt")).agg(collect_list(col("ident")).alias("ids")) \
    .groupBy(col("type")).agg(collect_list(col("ids")).alias("ids")) \
    .show(20, 100)

##  Кеширование
По умолчанию при применении каждого действия Spark пересчитывает весь граф, что может негативно сказать на производительности приложения. Для демонстрации возьмем датасет Airport Codes
Посчитаем несколько действий. Несмотря на то, что only_ru является общим для всех действий, он пересчитывается при вызове каждого действия.

In [22]:
only_ru = df.filter((col("iso_country") == "RU") & (col("elevation_ft") > 1000))
only_ru.show(1, 50, True)

only_ru.count()
only_ru.collect()
only_ru.groupBy(col("municipality")).count().orderBy(col("count").desc()).na.drop("any").show()

Для решения этой проблемы следует использовать методы cache, либо persist. Данные методы сохраняют состояние графа после первого действия, и следующие обращаются к нему. Разница между методами заключается в том, что persist позволяет выбрать, куда сохранить данные, а cache использует значение по умолчанию. В текущей версии Spark это StorageLevel.MEMORY_ONLY. Важно помнить, что данный кеш не предназначен для обмена данными между разными Spark приложения - он является внутренним для приложения. После того, как работа с данными окончена, необходимо выполнить unpersist для очистки памяти


In [24]:
only_ru = df.filter((col("iso_country") == "RU") & (col("elevation_ft") > 1000))
only_ru.cache()
only_ru.show(1, 50, True)
only_ru.count()
only_ru.collect()
only_ru.groupBy(col("municipality")).count().orderBy(col("count").desc()).na.drop("any").show()
# only_ru.unpersist()

In [25]:
only_ru.groupBy(col("municipality")).count().orderBy(col("count").desc()).na.drop("any").show()

## Планы выполнения задач

Любой job в Spark SQL имеет под собой план выполнения, кототорый генерируется на основе написанно запроса. План запроса содержит операторы, которые затем превращаются в Java код. Поскольку одну и ту же задачу в Spark SQL можно выполнить по-разному, полезно смотреть в планы выполнения, чтобы, например:

    убрать лишние shuffle
    убедиться, чтот тот или иной оператор будет выполнен на уровне источника, а не внутри Spark
    понять, как будет выполнен join

Планы выполнения доступны в двух видах:

    метод explain() у DF
    на вкладке SQL в Spark UI



In [27]:
from pyspark.sql.functions import col

df.filter(col("type") == "small_airport").explain(extended=True)

Выполним агрегацию и проверим план выполнения. В нем появляется три оператора: 2 HashAggregate и Exchange hashpartitioning.

Первый HashAggregate содержит функцию partial_count(1). Это означает, что внутри каждого воркера произойдет подсчет строк по каждому ключу. Затем происходит shuffle по ключу агрегата, после которого выполняется еще один HashAggregate с функцией count(1). Использование двух HashAggregate позволяет сократить количество передаваемых данных по сети.



In [29]:
from pyspark.sql.functions import col

df.filter(col("type") == "small_airport").groupBy(col("iso_country")).count().explain()

## Оптимизация соединений и группировок

При выполнении join двух DF важно следовать рекомендациям:

- фильтровать данные до join'а
- использовать equ join
- если можно путем увеличения количества данных применить equ join вместо non-equ join'а, то делать именно так
- всеми силами избегать cross-join'ов
- если правый DF помещается в памяти worker'а, использовать broadcast()

Виды джойнов

- BroadcastHashJoin
-- equ join
-- broadcast
- SortMergeJoin
-- equ join
-- sortable keys
- BroadcastNestedLoopJoin
-- non-equ join
-- using broadcast
- CartesianProduct
-- non-equ join



In [31]:
df.show()


In [32]:
left = df.select(col("type"), col("ident"), col("iso_country")).alias("left").localCheckpoint()
right = df.groupBy(col("type")).count().alias("right").localCheckpoint()

BroadcastHashJoin¶

- работает, когда условие - равенство одного или нескольких ключей
- работает, когда один из датасетов небольшой и полностью вмещается в память воркера
- оставляет левый датасет как есть
- копирует правый датасет на каждый воркер
- составляет hash map из правого датасета, где ключ - кортеж из колонок в условии соединения
- итерируется по левому датасета внутри каждой партиции и проверяет наличие ключей в HashMap
- может быть автоматически использован, либо явно через broadcast(df)

In [34]:
from pyspark.sql.functions import broadcast

result = left.join(broadcast(right), "type", "inner")
# result = left.join(right, "type", "inner")

result.explain()


### SortMergeJoin
+ работает, когда ключи соединения в обоих датасета являются сортируемыми
+ репартиционирует оба датасета в 200 партиций по ключу (ключам) соединения
+ сортирует партиции каждого из датасетов по ключу (ключам) соединения
+ Используя сравнение левого и правого ключей, обходит каждую пару партиций и соединяет строки с одинаковыми ключами


In [36]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(right, "type", "inner")

result.explain()

### BroadcastNestedLoopJoin
+ работает, когда один из датасетов небольшой и полностью вмещается в память воркера
+ оставляет левый датасет как есть
+ копирует правый датасет на каждый воркер
+ проходится вложенным циклом по каждой партиции левого датасета и копией правого датасета и проверяет условие
+ может быть автоматически использован, либо явно через `broadcast(df)`


In [38]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(broadcast(right), left["type"] != right["type"], "inner")

result.explain()

### CartesianProduct
+ Создает пары из каждой партиции левого датасета с каждой партицией правого датасета, релоцирует каждую пару на один воркер и проверяет условие соединения
+ на выходе создает N*M партиций
+ работает медленнее остальных и часто приводит к ООМ воркеров


In [40]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

result = left.join(right, left["type"] != right["type"], "inner")

result.explain()

## Снижение количества shuffle

В ряде случаев можно уйти от лишних shuffle операций при выполнении соединения. Для этого оба DF должны иметь одинаковое партиционирование - одинаковое количество партиций и ключ партиционирования, совпадающий с ключом соединения.

Разница между планами выполнения будет хорошо видна в Spark UI на графе выполнения в Jobs и плане выполнения в SQL



In [42]:
df.unpersist()

left = df
right = df.groupBy(col("type")).count()
joined = left.join(right, "type")
joined.count()

In [43]:
df_rep = df.repartition(200, col("type"))

left = df_rep
right = df_rep.groupBy(col("type")).count()

joined = left.join(right, "type")
joined.count()

## Оптимизатор запросов Catalyst
Catalyst выполняет оптимизацию запросов с целью ускорения их выполнения и применяет следующие методы:
 + Column projection
 + Partition pruning
 + Predicate pushdown
 + Constant folding
 
 Подготовим датасет для демонстрации работы Catalyst:


In [45]:
df \
    .write \
    .format("parquet") \
    .partitionBy("iso_country") \
    .mode("overwrite") \
    .save("/tmp/airports.parquet") 

airports = spark.read.parquet("/tmp/airports.parquet")
airports

In [48]:
spark.read.parquet("/tmp/airports.parquet/iso_country=AD/part-00000-ef581cca-e2a2-4a9e-b75f-d3cf7387ed30.c000.snappy.parquet").printSchema()

### Column projection
Данный механизм позволяет избегать вычитывания ненужных колонок при работе с источниками


In [50]:
selected = airports.select(col("ident"))
selected.cache()
selected.count()
selected.unpersist()
selected.explain()

In [51]:
selected = airports
selected.cache()
selected.count()
selected.unpersist()
selected.explain()

### Partition pruning
Данный механизм позволяет избежать чтения ненужных партиций


In [53]:
filtered = airports.filter(col("iso_country") == "RU")
filtered.count()
filtered.explain(True)

### Predicate pushdown
Данный механизм позволяет "протолкнуть" условия фильтрации данных на уровень datasource


In [55]:
filtered = airports.filter(col("iso_region") == "RU")
filtered.count()
filtered.explain()

### Simplify casts
Данный механизм убирает ненужные `cast`


In [57]:
result = spark.range(10).select(col("id").cast("long"))
result.explain(True)

In [58]:
result = spark.range(10).select(col("id").cast("int").cast("long"))
result.explain(True)

### Constant folding
Данный механизм сокращает количество констант, используемых в физическом плане


In [60]:
from pyspark.sql.functions import lit

result = spark.range(10).select((lit(3) > lit(0)).alias("foo"))
result.explain(True)

In [61]:
from pyspark.sql.functions import lit, col

result = spark.range(10).select((col("id") >  lit(0)).alias("foo"))
result.explain(True)

### Combine filters
Данный механизм объединяет фильтры

In [63]:
result = spark.range(10).filter(col("id") > 0) \
    .filter(col("id") != 5) \
    .filter(col("id") < 10)
    
result.explain(True)
