In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
ACCESS_KEY = "Wgnl0MKcUDKSa39rJb4u"
SECRET_KEY = "rT6wUVf6XTpN1DDY032dcY9oeHvupsSgpMkpg1I7"
MINIO_URL = "http://minio:9000"

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("HW2") \
    .config("spark.sql.adaptive.enabled", False) \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.sources.bucketing.enabled", True) \
    .config("spark.executor.memory", "1000M") \
    .config("spark.driver.memory", "600M") \
    .config('spark.jars.packages', 
        "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-pom:1.12.365,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
    ) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_URL) \
    .getOrCreate()

In [None]:
spark

# Задание 1

## Входные данные 
- Файл с данными по оттоку телеком оператора в США (churn.csv)
- Справочник с названиями штатов (state.json)
- Справочник с численностью населения территорий (определяется полем area code) внутри штатов (state.json)
- Террия с численностью населения меньше 10_000 считается **мелкой**

## Что нужно сделать
1. Посчитать количество отточных и неотточных абонентов (поле churn), исключив **мелкие** территории
2. Отчет должен быть выполнен в разрезе **каждого штата** с его полным наименованием
3. Описать возникающие узкие места при выполнении данной операции
4. Применить один из способов оптимизации для ускорения выполнения запроса (при допущении, что справочник численности населения **сильно меньше** основных данных)
5. Если существует еще какой-то способ, применить также и его отдельно от п.4 (при допущении, что справочник численности населения **сопоставим по размеру** с основными данными)
6. Кратко описать реализованные способы и в чем их практическая польза

- P.S. Одним из выбранных способов должен быть Bucket specific join
- P.P.S. При обосновании предлагаем прикладывать запуска команды df.explain()

In [None]:
churn_df = spark.read.option("header", True).csv("s3a://input/data/churn.csv")
state_dict = spark.read.json("s3a://input/data/state.json")
pop_dict = spark.read.json("s3a://input/data/population.json")

### Решение

Проведем сначала небольшой разведочный анализ (EDA) для понимания того, что хранится в каждом нашем датафрейме spark.

In [None]:
churn_df.show(5)

In [None]:
churn_df.printSchema()

In [None]:
state_dict.show(5)

In [None]:
state_dict.printSchema()

In [None]:
pop_dict.show(5)

In [None]:
pop_dict.printSchema()

In [None]:
#зафиксируем трешхолд для последующей оптимизации
POP_THRESHOLD = 10_000

In [None]:
#уберем мелкие территории, так называемый метод Predicate pushdown перенесли шаг фильтрации ближе к источнику
churn_and_pop = churn_df.join(pop_dict, on="area code", how="inner") \
    .filter(F.col("population") > POP_THRESHOLD)

In [None]:
result = churn_and_pop.join(state_dict, churn_and_pop.state==state_dict.state_id, how="left") \
    .groupBy("state_name") \
    .pivot("churn") \
    .agg(F.count("*")) \
    .orderBy("state_name")

In [None]:
result.explain()
result.show(truncate=False)

Следует перечислить следующие узкие места:
* т.к. наши файлы pop_dict и state_dict имеют размерность точно меньше <10МБ , то при соединении мы можем использовать BroadcastJoinThreshold было бы тут эффективнее
* из прочего пункта поскольку у нас sort merge join, то по плану видно, что имеется четыре операции перемешивания exchange до операции merge(логично было бы что два поскольку два join, поэтому вопросики к catalyst в данном случае либо ко мне) 
* сильный перекос в timeline в spark ui stages

### Оптимизация 1 - broadcast has join

In [None]:
pop_dict_broad = F.broadcast(pop_dict)
state_dict_broad = F.broadcast(state_dict)

In [None]:
churn_and_pop_broad = churn_df.join(pop_dict_broad, on="area code", how="inner") \
    .filter(F.col("population") > POP_THRESHOLD)

In [None]:
result_broad = churn_and_pop_broad.join(state_dict_broad, churn_and_pop_broad.state==state_dict.state_id, how="left") \
    .groupBy("state_name") \
    .pivot("churn") \
    .agg(F.count("*")) \
    .orderBy("state_name")

In [None]:
result_broad.explain()
result_broad.show(truncate=False)

Broadcast hash join дает преимущество в виде скорости за счет того, что данные содержит локально на каждом executor и нет необходимости производить сетевую операцию - shuffle

### Оптимизация 2 - bucket specific join

In [None]:
#бакитируем один из датафреймов чтобы репартирование при join происходило без shuffle и sort
churn_df.repartition(1) \
    .write \
    .mode("overwrite") \
    .bucketBy(100, col="area code") \
    .option("path", "s3a://input/data/bucket/churn_bucket") \
    .saveAsTable("churn_bucket")

In [None]:
pop_dict \
    .withColumn("area code", F.col("area code").cast("string")) \
    .repartition(1) \
    .write \
    .bucketBy(100, "area code") \
    .option("path", "s3a://input/data/bucket/pop_bucket") \
    .saveAsTable("pop_bucket")

In [None]:
churn_bucket = spark.table("churn_bucket")
pop_bucket = spark.table("pop_bucket")

In [None]:
churn_and_pop_bucket = churn_bucket.join(pop_bucket, on="area code", how="inner") \
    .filter(F.col("population") > POP_THRESHOLD)

In [None]:
result_bucket = churn_and_pop_bucket.join(state_dict, churn_and_pop_bucket.state==state_dict.state_id, how="left") \
    .groupBy("state_name") \
    .pivot("churn") \
    .agg(F.count("*")) \
    .orderBy("state_name")

In [None]:
result_bucket.explain()
result_bucket.show(truncate=False)

Видим по плану запроса, что у нас отсутвует shuffle по area code для таблиц pop_bucket и churn_bucket, тем самым мы получили прирост производительсности по времени.

# Задание 2

## Входные данные 

*skew_transactions.csv* - информация о длительности просомтра контента пользователям
колонки:
1. user_uid — уникальный идентификатор пользователя
2. element_uid — уникальный идентификатор контента
3. watched_time — время просмотра в секундах

*catalogue.json* - каталог с описанием контента и метаинформации по нему
колонки:
1. type — тип элемента
2. duration — длительность в минутах (средняя длительность эпизода в случае с сериалами и многосерийными фильмами), округлённая до десятков
3. attributes — анонимизированные атрибуты данного элемента
4. availability — доступные права на элемент(subscription, purchase, rent)
5. feature_1 — анонимизированная вещественная переменная
6. feature_2 — анонимизированная вещественная переменная
7. feature_3 — анонимизированная порядковая переменная
8. feature_4 — анонимизированная вещественная переменная
9. feature_5 — анонимизированная вещественная переменная

## Что нужно сделать
1. Выполните join основных данных со справочником используя DataFrame API (по колонке id для контента - `element_uid`)
2. Описать проблему в датасетах с точки зрения обработки Spark
3. Решить задачу любым способом
4. Решить задачу с помощью salt-join подхода

P.S. Как вы можете заметить при просмотре данных по пользователями, нужный нам ключ для операции будет перекошен (90% строк представлены на фильм, очень популярный среди смотревших) - это нужно доказать в рамках п.2

### Решение 

In [None]:
%pip install pandas

In [None]:
import pandas as pd

In [None]:
skew_df = spark.read.option("header", True).csv("s3a://input/data/skew_transactions.csv")

In [None]:
catalogue_df = pd.read_json("datasets/catalogue.json", dtype=str)

In [None]:
catalogue_df.head(5)

In [None]:
#транспонируем датафрейм и также индекс обозначим как столбец element_uid

catalogue_df = catalogue_df.transpose().reset_index()
catalogue_df = catalogue_df.rename(columns={"index": "element_uid"})


In [None]:
catalogue_df.head(5)

In [None]:
#посмотрим на уникальные значения
catalogue_df["element_uid"].value_counts(ascending=False)

In [None]:
catalogue_df = spark.createDataFrame(catalogue_df)

In [None]:
skew_df.show(5)

In [None]:
#также посмотрим на униклаьные значения element_uid, чтобы увидеть что распределение столбца element_uid перекошено
skew_df.groupBy("element_uid").count().orderBy(F.col("count").desc()).show(5)

In [None]:
result = skew_df.join(catalogue_df.select("element_uid", "type", "availability", "duration"), on="element_uid", how="left")

In [None]:
result.explain()
result.show(truncate=False)

Как видно по плану у нас выполняется дефолтный sort merge join, мы имеем shuffle, а с учетом перекоса ключа есть следующие проблемы:
* **Излишняя нагрузка на некоторые узлы**: В sort-merge join данные должны быть отсортированы и объединены по ключам. Если ключи распределены неравномерно, узлы, отвечающие за частые ключи, получат больше данных для обработки. Это приводит к дисбалансу нагрузки(судя по spark ui timeline), когда некоторые узлы будут перегружены, становясь узкими местами (bottlenecks), а другие будут простаивать.
* **Использование памяти в режиме memory spill**: Не в данном кейсе, но в целом, перегруженные узлы могут исчерпать доступную память из-за большого объема данных, что может привести к сбоям или необходимости использования дискового пространства для выполнения операций, что замедляет процесс.
* **Большое количество операций shuffle**: Операция происходит между узлами, что прямо сказывается на производительсности и скорости выполенения

### Решение с оптимизацией - broadcast hash join
Опять же поскольку одна из таблиц, а конкретнее catalogue_df у нас 2.7Mb < 10Mb (смотрел в хранилище minio, не отрицаю возможно есть более релевантный способ), то мы можем использовать broadcast hash join чтобы избежать лишнего shuffle и следовательно лишнего сетевого взаимодействия

In [None]:
catalogue_broad = F.broadcast(catalogue_df)
skew_repartition = skew_df.repartition(5)

In [None]:
result_broad = skew_repartition.join(catalogue_broad.select("element_uid", "type", "availability", "duration"), on="element_uid", how="left")

In [None]:
result_broad.explain()
result_broad.show(truncate=False)

Мы получили прирост производительности:
* было - 2с
* стало - 0.5с
Конечно кидает warning, но в данной задаче он прям не критичен. Мы избегаем этап shuffle.

### Решение с оптимизацией - salting

In [None]:
def data_skew_helper(left, right, key, number_of_partitions, how="inner"):
    #generate random salt for left dataframe
    salt_value = F.lit(F.rand() * number_of_partitions % number_of_partitions).cast("int")
    left = left.withColumn("salt", salt_value)

    #Creating a column with an array of all possible salt values for right dataframe
    salt_col = F.explode(F.array([F.lit(i) for i in range(number_of_partitions)])).alias("salt")
    right = right.select("*",  salt_col)

    return left.join(right, [key, "salt"], how)

In [None]:
result_salt = data_skew_helper(skew_df, catalogue_df.select("element_uid", "type", "availability", "duration"), "element_uid", 5, how="left")

In [None]:
result_salt.explain()
result_salt.show(truncate=False)

In [None]:
#чтобы убедиться что распределение по партициям у нас лучше, чем при простом merge sort join без salt
result_salt_check_partition = result_salt.groupBy("element_uid", "salt").count().orderBy(F.col("count").desc())

In [None]:
result_salt_check_partition.show(truncate=False)

В данном случае мы получили время выполнения 0.6с, что быстрее по сравнению с раннее двумя использованными способами. Это происходит за счет того, что происходит смягчение перекоса данных, что позволяет равномерно распределить данные по нескольким ключам, тем самым помогая сбалансировать нагрузку между узлами. Но как видим по плану работ shuffle при этом у нас сохраняется как при простом sort merge join.

# Задание 3

## Входные данные 

*cut_transactions.csv*  — информация о длительности просомтра контента пользователям

Описание фичей в cut_transactions.csv: 
1. user_uid — уникальный идентификатор пользователя
2.  element_uid — уникальный идентификатор контента
3.  watched_time — время просмотра в секундах

*cut_ratings.csv*  — информация об оценках, поставленных пользователями

Описание фичей в cut_ratings.csv: 
1. user_uid — уникальный идентификатор пользователя 
2. element_uid — уникальный идентификатор контента 
3. rating — поставленный пользователем рейтинг

*ids.csv*  — выборка пользователей
Описание фичей в ids.csv: 
1. user_uid — уникальный идентификатор пользователя 


## Что нужно сделать
Для каждого пользователя из выборки посчитать:
1. Максимальное и минимальное время просмотра фильмов с оценками 8, 9 и 10 
2. Название фичи должно быть в формате feat_агрегирующая_функция_watched_time_rating_оценка. 
3. Если у пользователь не ставил оценки 8, 9 и 10 то значение фичей должно быть null
4. Описать принятые при разработки кода решения и возможные оптимизации

P.S. На каждом этапе обработки должно быть должны агрегироваться минимально возможные объемы данных (сокращаем затраты на shuflle)

In [None]:
transactions_df = spark.read.option("header", True).csv("s3a://input/data/cut_transactions.csv")
ratings_df = spark.read.option("header", True).csv("s3a://input/data/cut_ratings.csv")
ratings_df = F.broadcast(ratings_df)
sample = spark.read.option("header", True).csv("s3a://input/data/ids.csv")
sample = F.broadcast(sample)

In [None]:
transactions_df.printSchema()

In [None]:
transactions_df.show(5)

In [None]:
ratings_df.printSchema()

In [None]:
ratings_df.show(5)

In [None]:
sample.printSchema()

In [None]:
sample.show(5)

### Решение

In [None]:
full_table = transactions_df.join(ratings_df , on=["user_uid", "element_uid"], how="inner").join(sample, on="user_uid", how="inner")

In [None]:
full_table.show(5)

**Непосредственно решение нашей задачи**

In [None]:
filtered_table = full_table.filter(F.col("rating").isin([8, 9, 10]))
full_table_filter = filtered_table.groupBy("user_uid", "rating") \
                                  .agg(F.min("watched_time").alias("min_watched_time"), 
                                       F.max("watched_time").alias("max_watched_time"))


In [None]:
# fulfilling the requirement of column format 
ratings = [8, 9, 10]
result_df = sample

for rating in ratings:
    rating_df = full_table_filter.filter(F.col("rating") == rating) \
                                 .select(
                                     F.col("user_uid"),
                                     F.col("min_watched_time").alias(f"feat_min_watched_time_rating_{rating}"),
                                     F.col("max_watched_time").alias(f"feat_max_watched_time_rating_{rating}")
                                 )
    result_df = result_df.join(rating_df, on="user_uid", how="left")

# Sort table
result_table = result_df.orderBy("user_uid")

In [None]:
result_table.show(10)

In [None]:
result_table.explain()
result_table.show(truncate=False)

Опять же использовали подход с использованием broadcast hash join, поскольку наша табличка sample и cut_ratings небольшая, то это позволяет избежать shuffle операций и значительно ускорить join операции.