In [0]:
%spark.conf

spark.executor.instances=2
spark.executor.memory=1G
spark.kryoserializer.buffer.max=1024m

spark.sql.autoBroadcastJoinThreshold=20971520

Нужно скопировать себе эту тетрадку. Параграфы с генерацией данных и созданием семплов запускать не нужно, они оставлены для ознакомления

In [2]:
import org.apache.spark.mllib.random.RandomRDDs._
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val dates = (0 to 14).map(LocalDate.of(2020, 11, 1).plusDays(_).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))).toSeq

def generateCity(r: Double): String = if (r < 0.9) "BIG_CITY" else "SMALL_CITY_" + scala.math.round((r - 0.9) * 1000)

def generateCityUdf = udf(generateCity _)

// spark.sql("drop table hw2.events_full")
spark.sql("create database hw_4")
for(i <- dates) {
    uniformRDD(sc, 10000000L, 1)
    .toDF("uid")
    .withColumn("date", lit(i))
    .withColumn("city", generateCityUdf($"uid"))
    .selectExpr("date", " sha2(cast(uid as STRING), 256) event_id", "city")
    .withColumn("skew_key", when($"city" === "BIG_CITY", lit("big_event")).otherwise($"event_id"))
    .write.mode("append")
    .partitionBy("date")
    .saveAsTable("hw_4.events_full")
}


In [3]:
spark.table("hw_4.events_full")
.select("event_id")
.sample(0.001)
.repartition(2)
.write.mode("overwrite")
.saveAsTable("hw_4.sample")


In [4]:

spark.table("hw_4.sample")
.limit(100)
.coalesce(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_small")

In [5]:


spark.table("hw_4.events_full")
.select("event_id")
.sample(0.003)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_big")

In [6]:


spark.table("hw_4.events_full")
.select("event_id")
.sample(0.015)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_very_big")

Для упражнений сгрененирован большой набор синтетических данных в таблице hw2.events_full. Из этого набора данных созданы маленькие (относительно исходного набора) таблицы разного размера kotelnikov.sample_[small, big, very_big]. 

Ответить на вопросы:
 * какова структура таблиц
 * сколько в них записей 
 * сколько места занимают данные
 

In [8]:
%pyspark

from pyspark.sql.functions import col

tables = spark.sql("SHOW TABLES IN hw_4").collect()

for table in tables:
    
    # Имя объекта
    print(f"{table['database']}.{table['tableName']}")
    
    # Статистика
    compute_df = spark.sql(f"ANALYZE TABLE {table['database']}.{table['tableName']} COMPUTE STATISTICS")
    describe_df = spark.sql(f"DESCRIBE FORMATTED {table['database']}.{table['tableName']}")
    statistics = describe_df \
        .where(col("col_name") == "Statistics") \
        .select("data_type") \
        .rdd.map(lambda x: x[0]) \
        .collect()
    print(statistics[0])    
    
    # Схема таблицы
    spark.table(f"{table['database']}.{table['tableName']}").printSchema()
    

Информация о том, сколько места занимают данные посмотреть в HDFS UI

Получить планы запросов для джойна большой таблицы hw_4.events_full с каждой из таблиц hw_4.sample, hw_4.sample_big, hw_4.sample_very_big по полю event_id. В каких случаях используется BroadcastHashJoin? 

BroadcastHashJoin автоматически выполняется для джойна с таблицами, размером меньше параметра spark.sql.autoBroadcastJoinThreshold. Узнать его значение можно командой spark.conf.get("spark.sql.autoBroadcastJoinThreshold").

In [11]:
%pyspark

conf = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
print(conf)

events_full_df = spark.table("hw_4.events_full")

# BroadcastHashJoin
sample_small_df = spark.table("hw_4.sample_small")
ef_sample_small_df = events_full_df \
    .join(sample_small_df, "event_id", "inner") \
    .explain()

# BroadcastHashJoin
sample_df = spark.table("hw_4.sample")
ef_sample_df = events_full_df \
    .join(sample_df, "event_id", "inner") \
    .explain()

# SortMergeJoin
sample_big_df = spark.table("hw_4.sample_big")
ef_sample_big_df = events_full_df \
    .join(sample_big_df, "event_id", "inner") \
    .explain()

# SortMergeJoin
sample_very_big_df = spark.table("hw_4.sample_very_big")
ef_sample_very_big_df = events_full_df \
    .join(sample_very_big_df, "event_id", "inner") \
    .explain()

#ef_sample_df.show()


Выполнить джойны с таблицами  hw_4.sample,  hw_4.sample_big в отдельных параграфах, чтобы узнать время выполнения запросов (например, вызвать .count() для результатов запросов). Время выполнения параграфа считается автоматически и указывается в нижней части по завершении

Зайти в spark ui (ссылку сгенерировать в следующем папраграфе). Сколько tasks создано на каждую операцию? Почему именно столько? Каков DAG вычислений?  

In [13]:
%pyspark

# 84 tasks, 
# время выполнения 1 мин 8 сек, 
# количество task меньше, чем в операции ниже за счет наличия broadcastб 
# в даге 2 стейджа, реализующие механизм broadcast - передача малого sample датафрема на worker'ы с их последющим джойном к "большой" таблице event_full
sc.setLocalProperty("callSite.short", "events_full join sample")

events_full_df = spark.table("hw_4.events_full")
sample_df = spark.table("hw_4.sample")

ef_sample_df = events_full_df \
    .join(sample_df, "event_id", "inner") \
    .count()

print(ef_sample_df)

In [14]:
%pyspark

# 284 tasks, время выполнения 2 мин 38 сек,
sc.setLocalProperty("callSite.short", "events_full join sample_big")

events_full_df = spark.table("hw_4.events_full")
sample_big_df = spark.table("hw_4.sample_big")

ef_sample_big_df = events_full_df \
    .join(sample_big_df, "event_id", "inner") \
    .count()

print(ef_sample_big_df)

Оптимизировать джойн с таблицами hw_4.sample_big, hw_4.sample_very_big с помощью broadcast(df). Выполнить запрос, посмотреть в UI, как поменялся план запроса, DAG, количество тасков. Второй запрос не выполнится 

In [16]:
%pyspark
from pyspark.sql.functions import broadcast

sc.setLocalProperty("callSite.short", "events_full join sample_big_broad")

events_full_df = spark.table("hw_4.events_full")
sample_big_df = spark.table("hw_4.sample_big")
sample_big_df_broad = broadcast(sample_big_df)

ef_sample_big_df = events_full_df \
    .join(sample_big_df_broad, "event_id", "inner") \
    .count()

print(ef_sample_big_df)


In [17]:
%pyspark
from pyspark.sql.functions import broadcast

sc.setLocalProperty("callSite.short", "events_full br join sample_big")

events_full_df = spark.table("hw_4.events_full")
sample_very_big_df = spark.table("hw_4.sample_very_big")
sample_very_big_df_broad = broadcast(sample_very_big_df)

ef_sample_big_df = events_full_df \
    .join(sample_very_big_df_broad, "event_id", "inner") \
    .count()

print(ef_sample_big_df)



Таблица hw_4.sample_very_big оказывается слишком большой для бродкаста и не помещается полностью на каждой ноде, поэтому возникает исключение.


Отключить автоматический броадкаст командой spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"). Сделать джойн с семплом hw_4.sample, сравнить время выполнения запроса.

In [20]:
%pyspark

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


In [21]:
%pyspark


sc.setLocalProperty("callSite.short", "events_full join sample without broadcast")

events_full_df = spark.table("hw_4.events_full")
sample_df = spark.table("hw_4.sample")

ef_sample_df = events_full_df \
    .join(sample_df, "event_id", "inner") \
    .count()

print(ef_sample_df)


6 минут 2 секунды в случае, когда отключен автоматический броадкастинг, по сравнению с 2 минутами 1 секундой со включенным бродкастингом.


In [23]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "26214400")

In [24]:
spark.sql("clear cache")

В процессе обработки данных может возникнуть перекос объёма партиций по количеству данных (data skew). В таком случае время выполнения запроса может существенно увеличиться, так как данные распределятся по исполнителям неравномерно. В следующем параграфе происходит инициализация датафрейма, этот параграф нужно выполнить, изменять код нельзя. В задании нужно работать с инициализированным датафреймом.

Датафрейм разделен на 30 партиций по ключу city, который имеет сильно  неравномерное распределение.

In [26]:
%pyspark 
from pyspark.sql.functions import col

skew_df = spark.table("hw_4.events_full")\
.where("date = '2020-11-01'")\
.repartition(30, col("city"))\
.cache()

skew_df.count()

Посчитать количество event_count различных событий event_id , содержащихся в skew_df с группировкой по городам. Результат упорядочить по event_count.

В spark ui в разделе jobs выбрать последнюю, в ней зайти в stage, состоящую из 30 тасков (из такого количества партиций состоит skew_df). На странице стейджа нажать кнопку Event Timeline и увидеть время выполнения тасков по экзекьюторам. Одному из них выпала партиция с существенно большим количеством данных. Остальные экзекьюторы в это время бездействуют -- это и является проблемой, которую предлагается решить далее.

In [28]:
%pyspark

from pyspark.sql.functions import count, col

skew_grouped_df = skew_df \
    .groupBy("city").agg(count("*").alias("event_count")) \
    .orderBy(col("event_count").desc())
    
skew_grouped_df.show()

один из способов решения проблемы агрегации по неравномерно распределенному ключу является предварительное перемешивание данных. Его можно сделать с помощью метода repartition(p_num), где p_num -- количество партиций, на которые будет перемешан исходный датафрейм

In [30]:
%pyspark

from pyspark.sql.functions import count, col

skew_grouped_df = skew_df \
    .repartition(30) \
    .groupBy("city").agg(count("*").alias("event_count")) \
    .orderBy(col("event_count").desc())
    
skew_grouped_df.show()


Другой способ исправить неравномерность по ключу -- создание синтетического ключа с равномерным распределением. В нашем случае неравномерность исходит от единственного значения city='BIG_CITY', которое часто повторяется в данных и при группировке попадает к одному экзекьютору. В таком случае лучше провести группировку в два этапа по синтетическому ключу CITY_SALT, который принимает значение BIG_CITY_rand (rand -- случайное целое число) для популярного значения BIG_CITY и CITY для остальных значений. На втором этапе восстанавливаем значения CITY и проводим повторную агрегацию, которая не занимает времени, потому что проводится по существенно меньшего размера данным. 

Такая же техника применима и к джойнам по неравномерному ключу, см, например https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8

Что нужно реализовать:
* добавить синтетический ключ
* группировка по синтетическому ключу
* восстановление исходного значения
* группировка по исходной колонке

In [32]:
%pyspark

# Добавляем колонку с "солью": для BIG_CITY - случайное целое от 0 до 20 включительно, для SMALL_CITY - 21, 
# и выводим кусочки датафрейма для BIG_CITY и SMALL_CITY для контроля правильности выполненной процедуры

from pyspark.sql.functions import expr, when, sum

salt = expr("""pmod(round((rand() * 100), 0), 20)""").cast("integer")

salted_df = skew_df.withColumn("salt", salt) \
    .withColumn("salt", when(col("city") == "BIG_CITY", col("salt")).otherwise(21)) 

salted_df.show()


In [33]:
%pyspark

salted_count_df = salted_df \
    .groupBy("city", "salt") \
    .count() 

result_df = salted_count_df \
    .groupBy("city").agg(sum("count").alias("event_count")) \
    .orderBy(col("event_count").desc())
    
result_df.show()


In [34]:
%pyspark

spark.stop()