In [None]:
%spark.conf

spark.executor.instances=2
spark.executor.memory=1G
spark.kryoserializer.buffer.max=1024m

spark.sql.autoBroadcastJoinThreshold=20971520

Генерация events таблицы

In [2]:
import org.apache.spark.mllib.random.RandomRDDs._
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val dates = (0 to 14).map(LocalDate.of(2020, 11, 1).plusDays(_).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))).toSeq

def generateCity(r: Double): String = if (r < 0.9) "BIG_CITY" else "SMALL_CITY_" + scala.math.round((r - 0.9) * 1000)

def generateCityUdf = udf(generateCity _)

// spark.sql("drop table hw2.events_full")
spark.sql("create database hw_4")
for(i <- dates) {
    uniformRDD(sc, 10000000L, 1)
    .toDF("uid")
    .withColumn("date", lit(i))
    .withColumn("city", generateCityUdf($"uid"))
    .selectExpr("date", " sha2(cast(uid as STRING), 256) event_id", "city")
    .withColumn("skew_key", when($"city" === "BIG_CITY", lit("big_event")).otherwise($"event_id"))
    .write.mode("append")
    .partitionBy("date")
    .saveAsTable("hw_4.events_full")
}


Генерация events_sample

In [3]:
spark.table("hw_4.events_full")
.select("event_id")
.sample(0.001)
.repartition(2)
.write.mode("overwrite")
.saveAsTable("hw_4.sample")

Генерация events_small

In [4]:
spark.table("hw_4.sample")
.limit(100)
.coalesce(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_small")

Генерация events_big

In [5]:
spark.table("hw_4.events_full")
.select("event_id")
.sample(0.003)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_big")

Генерация events_very_big

In [6]:
spark.table("hw_4.events_full")
.select("event_id")
.sample(0.015)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_very_big")

Сгрененирован большой набор синтетических данных в таблице hw2.events_full. Из этого набора данных созданы маленькие (относительно исходного набора) таблицы разного размера kotelnikov.sample_[small, big, very_big]. 

Ответить на вопросы:
 * какова структура таблиц
 * сколько в них записей 
 * сколько места занимают данные
 

In [8]:
%pyspark

tables = spark.sql("SHOW TABLES IN hw_4").collect()
for table in tables:
    print(f'Схема таблицы {table[0]}.{table[1]}:')
    spark.table(f'{table[0]}.{table[1]}').printSchema()
    print(f"Число записей в таблице {table[0]}.{table[1]}: {spark.table(f'{table[0]}.{table[1]}').count()}")
    print("-------------------------------------------------")
    print()

Получить планы запросов для джойна большой таблицы hw_4.events_full с каждой из таблиц hw_4.sample, hw_4.sample_big, hw_4.sample_very_big по полю event_id. В каких случаях используется BroadcastHashJoin? 

In [10]:
%pyspark

tables = spark.sql("SHOW TABLES IN hw_4").collect()
for table in tables:
    if table[1] in ['sample', 'sample_big', 'sample_very_big']:
        print(f'План запроса для таблицы hw_4.events_full с таблией {table[0]}.{table[1]}:')
        spark.table('hw_4.events_full').join(
            spark.table(f'{table[0]}.{table[1]}'), 
            on='event_id', 
            how='inner'
            ).explain()
        print("-------------------------------------------------")
        print()

In [11]:
%pyspark

print(f"Значение по умолчанию параметра autoBroadcastJoinThreshold: {spark.conf.get('spark.sql.autoBroadcastJoinThreshold')}")

BroadcastHashJoin используется только при джоине с таблицей hw_4.sample, так как он размером 7 Кб, а размер по умолчанию autoBroadcastJoinThreshold 10 Мб. Размер остальных таблиц больше 10 Мб.

Выполнить джойны с таблицами  hw_4.sample,  hw_4.sample_big в отдельных параграфах, чтобы узнать время выполнения запросов (например, вызвать .count() для результатов запросов). Время выполнения параграфа считается автоматически и указывается в нижней части по завершении

Зайти в spark ui (ссылку сгенерировать в следующем папраграфе). Сколько tasks создано на каждую операцию? Почему именно столько? Каков DAG вычислений?  

In [14]:
%pyspark

sc.setLocalProperty("callSite.short", "events_full join sample")

spark.table('hw_4.events_full').join(
            spark.table('hw_4.sample'), 
            on='event_id', 
            how='inner'
            ).count()

In [15]:
%pyspark

sc.setLocalProperty("callSite.short", "events_full join sample_big")

spark.table('hw_4.events_full').join(
            spark.table('hw_4.sample_big'), 
            on='event_id', 
            how='inner'
            ).count()

82 таски для sample (44 секунды) и 284 для sample_big (2 мин. 8 сек.)

Насильный broadcast

Оптимизировать джойн с таблицами hw_4.sample_big, hw_4.sample_very_big с помощью broadcast(df). Выполнить запрос, посмотреть в UI, как поменялся план запроса, DAG, количество тасков. 

In [18]:
%pyspark

from pyspark.sql.functions import broadcast

sc.setLocalProperty("callSite.short", "events_full join sample_big with broadcast")

spark.table('hw_4.events_full').join(
            broadcast(spark.table('hw_4.sample_big')), 
            on='event_id', 
            how='inner'
            ).count()

In [19]:
%pyspark

sc.setLocalProperty("callSite.short", "events_full join sample_very_big with broadcast")

spark.table('hw_4.events_full').join(
            broadcast(spark.table('hw_4.sample_very_big')), 
            on='event_id', 
            how='inner'
            ).count()

82 таски для sample_big (1 мин 20 сек.), для sample_very_big с принудительным бродкастом не полняется, так как sample_very_big не помещается на каждой ноде, возникает исключение(org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow.).

Отключение auto broadcast

Отключить автоматический броадкаст командой spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"). Сделать джойн с семплом hw_4.sample, сравнить время выполнения запроса.

In [22]:
%pyspark

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

sc.setLocalProperty("callSite.short", "events_full join sample without broadcast")

spark.table('hw_4.events_full').join(
            spark.table('hw_4.sample'), 
            on='event_id', 
            how='inner'
            ).count()

284 таски для sample (4 мин. 16 сек.) с отключенным бродкастом по сравнению с 44 секундами.

In [24]:
%pyspark

#Вернуть настройку к исходной
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

In [25]:
%pyspark

spark.conf.get('spark.sql.autoBroadcastJoinThreshold')

In [26]:
%pyspark
spark.sql("clear cache")

В процессе обработки данных может возникнуть перекос объёма партиций по количеству данных (data skew). В таком случае время выполнения запроса может существенно увеличиться, так как данные распределятся по исполнителям неравномерно. 

Инициализация датафрейма.

In [28]:
%pyspark 
from pyspark.sql.functions import col

skew_df = spark.table("hw_4.events_full")\
.where("date = '2020-11-01'")\
.repartition(30, col("city"))\
.cache()

skew_df.count()

Датафрейм разделен на 30 партиций по ключу city, который имеет сильно  неравномерное распределение.

4.1. Наблюдение проблемы

Посчитать количество event_count различных событий event_id , содержащихся в skew_df с группировкой по городам. Результат упорядочить по event_count.

In [30]:
%pyspark

from pyspark.sql.functions import count, col

sc.setLocalProperty("callSite.short", "example of a problem")

grouped = skew_df.groupBy("city")\
                 .agg(count("event_id")\
                 .alias("event_count"))\
                 .orderBy(col("event_count").desc())

grouped.show()

В spark ui stage джобы, состоящем из 30 тасков (из такого количества партиций состоит skew_df) можно увидеть время выполнения тасков по экзекьюторам. Одному из них выпала партиция с существенно большим количеством данных. Остальные экзекьюторы в это время бездействуют - это является проблемой.

4.2. repartition

Предварительное перемешивание данных с помощью метода repartition(p_num), где p_num - количество партиций, на которые будет перемешан исходный датафрейм

In [32]:
%pyspark

import pyspark.sql.functions as f

sc.setLocalProperty("callSite.short", "repartition")

grouped = skew_df.repartition(100)\
                 .groupBy("city")\
                 .agg(f.count("event_id")\
                 .alias("event_count"))\
                 .orderBy(f.col("event_count").desc())

grouped.show()

4.3. Key Salting

Другой способ - создание синтетического ключа с равномерным распределением. В нашем случае неравномерность исходит от единственного значения city='BIG_CITY', которое часто повторяется в данных и при группировке попадает к одному экзекьютору. В таком случае лучше провести группировку в два этапа по синтетическому ключу CITY_SALT, который принимает значение BIG_CITY_rand (rand - случайное целое число) для популярного значения BIG_CITY и CITY для остальных значений. На втором этапе восстанавливаем значения CITY и проводим повторную агрегацию, которая не занимает времени, потому что проводится по существенно меньшего размера данным. 

Такая же техника применима и к джойнам по неравномерному ключу, см, например https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8

Что нужно реализовать:
* добавить синтетический ключ
* группировка по синтетическому ключу
* восстановление исходного значения
* группировка по исходной колонке

In [34]:
%pyspark

# Добавляем колонку с "солью": для BIG_CITY - случайное целое от 0 до 20 включительно, для SMALL_CITY - 21, 
# Выводим кусочки датафрейма для BIG_CITY и SMALL_CITY для контроля правильности выполненной процедуры

import pyspark.sql.functions as F

sc.setLocalProperty("callSite.short", "Key Salting (salt)")

BIG_CITY_rand = F.expr("""pmod(round(rand() * 100, 0), 20)""").cast("integer")
SMALL_CITY = 21

salted = skew_df.withColumn("salt", F.when(F.col("city") == "BIG_CITY", BIG_CITY_rand).otherwise(SMALL_CITY))

salted.filter(salted.city=="BIG_CITY").show()
salted.filter(salted.city!="BIG_CITY").show()

In [35]:
%pyspark

sc.setLocalProperty("callSite.short", "Key Salting (grouping)")

result = salted.groupBy(F.col("city"), F.col("salt")).agg(F.count("*").alias("count")) \
               .groupBy(F.col("city")).agg(F.sum("count").alias("event_count")) \
               .orderBy(F.col("event_count").desc())
      
result.show()