In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
ACCESS_KEY = "ACCESS_KEY"
SECRET_KEY = "SECRET_KEY"
MINIO_URL = "http://minio:9000"

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("HW2") \
    .config("spark.sql.adaptive.enabled", False) \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.sources.bucketing.enabled", True) \
    .config("spark.executor.memory", "450M") \
    .config("spark.driver.memory", "450M") \
    .config('spark.jars.packages', 
        "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-pom:1.12.365,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1"
    ) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_URL) \
    .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-pom added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e784d968-afd3-4ec7-b74a-ad76490e359d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-pom;1.12.365 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#ha

# Задание 1

## Входные данные 
- Файл с данными по оттоку телеком оператора в США (churn.csv)
- Справочник с названиями штатов (state.json)
- Справочник с численностью населения территорий (определяется полем area code) внутри штатов (state.json)
- Террия с численностью населения меньше 10_000 считается **мелкой**

## Что нужно сделать
1. Посчитать количество отточных и неотточных абонентов (поле churn), исключив **мелкие** территории
2. Отчет должен быть выполнен в разрезе **каждого штата** с его полным наименованием
3. Описать возникающие узкие места при выполнении данной операции
4. Применить один из способов оптимизации для ускорения выполнения запроса (при допущении, что справочник численности населения **сильно меньше** основных данных)
5. Если существует еще какой-то способ, применить также и его отдельно от п.4 (при допущении, что справочник численности населения **сопоставим по размеру** с основными данными)
6. Кратко описать реализованные способы и в чем их практическая польза

- P.S. Одним из выбранных способов должен быть Bucket specific join
- P.P.S. При обосновании предлагаем прикладывать запуска команды df.explain()

In [3]:
churn_df = spark.read.option("header", True).csv("s3a://input/data/churn.csv")
state_dict = spark.read.json("s3a://input/data/state.json")
pop_dict = spark.read.json("s3a://input/data/population.json")

24/06/30 11:59:37 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

### Решение

In [4]:
large_areas_df = pop_dict.filter(F.col("population") >= 10000)

In [5]:
churn_with_states_df = churn_df.join(state_dict, state_dict['state_id']==churn_df['state'], "inner")
churn_with_population_df = churn_with_states_df.join(large_areas_df, 
                                                     large_areas_df['area code']==churn_with_states_df['area code'],
                                                     "inner").drop(churn_df["area code"])

In [6]:
result_df = churn_with_population_df.groupBy("state_name", "churn").count().orderBy("state_name")

In [7]:
# Здесь необходимо вывести результат:
result_df.explain()
result_df.show(truncate=False)

== Physical Plan ==
*(11) Sort [state_name#72 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(state_name#72 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=145]
   +- *(10) HashAggregate(keys=[state_name#72, churn#38], functions=[count(1)])
      +- Exchange hashpartitioning(state_name#72, churn#38, 200), ENSURE_REQUIREMENTS, [plan_id=141]
         +- *(9) HashAggregate(keys=[state_name#72, churn#38], functions=[partial_count(1)])
            +- *(9) Project [churn#38, state_name#72]
               +- *(9) SortMergeJoin [cast(area code#20 as bigint)], [area code#83L], Inner
                  :- *(6) Sort [cast(area code#20 as bigint) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(cast(area code#20 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=123]
                  :     +- *(5) Project [area code#20, churn#38, state_name#72]
                  :        +- *(5) SortMergeJoin [state#18], [state_id#71], Inner
                  :           :- *(



+--------------------+-----+-----+
|state_name          |churn|count|
+--------------------+-----+-----+
|Alabama             |False|58   |
|Alabama             |True |7    |
|Alaska              |True |3    |
|Alaska              |False|44   |
|Arizona             |True |4    |
|Arizona             |False|53   |
|Arkansas            |True |9    |
|Arkansas            |False|35   |
|California          |False|21   |
|California          |True |8    |
|Colorado            |True |7    |
|Colorado            |False|49   |
|Connecticut         |True |8    |
|Connecticut         |False|56   |
|Delaware            |True |8    |
|Delaware            |False|48   |
|District of Columbia|True |4    |
|District of Columbia|False|43   |
|Florida             |True |8    |
|Florida             |False|45   |
+--------------------+-----+-----+
only showing top 20 rows



                                                                                

### Оптимизация 1
**Bucket Specific Join**

In [8]:
churn_with_population = churn_df.join(large_areas_df, 
                                      churn_df["area code"] == large_areas_df["area code"], "inner").drop(churn_df["area code"])
churn_with_population.write \
                     .mode("overwrite") \
                     .bucketBy(10, "state") \
                     .sortBy("state") \
                     .option("path", "s3a://input/data/bucketed/churn") \
                     .saveAsTable("churn_with_population")

                                                                                

In [9]:
bucketed_churn_with_population = spark.table("churn_with_population")
optimized_df = bucketed_churn_with_population.join(state_dict, 
                                                   bucketed_churn_with_population["state"] == state_dict["state_id"], "inner")

# Подсчет количества отточных и неотточных абонентов
optimized_result_df = optimized_df.groupBy("state_name", "churn").count().orderBy("state_name")

In [10]:
# Здесь необходимо вывести результат:
optimized_result_df.explain()
optimized_result_df.show(truncate=False)

== Physical Plan ==
*(6) Sort [state_name#72 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(state_name#72 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=566]
   +- *(5) HashAggregate(keys=[state_name#72, churn#416], functions=[count(1)])
      +- Exchange hashpartitioning(state_name#72, churn#416, 200), ENSURE_REQUIREMENTS, [plan_id=562]
         +- *(4) HashAggregate(keys=[state_name#72, churn#416], functions=[partial_count(1)])
            +- *(4) Project [churn#416, state_name#72]
               +- *(4) SortMergeJoin [state#397], [state_id#71], Inner
                  :- *(1) Sort [state#397 ASC NULLS FIRST], false, 0
                  :  +- *(1) Filter isnotnull(state#397)
                  :     +- *(1) ColumnarToRow
                  :        +- FileScan parquet spark_catalog.default.churn_with_population[state#397,churn#416] Batched: true, Bucketed: true, DataFilters: [isnotnull(state#397)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://input/data/bu



+--------------------+-----+-----+
|state_name          |churn|count|
+--------------------+-----+-----+
|Alabama             |False|58   |
|Alabama             |True |7    |
|Alaska              |True |3    |
|Alaska              |False|44   |
|Arizona             |True |4    |
|Arizona             |False|53   |
|Arkansas            |True |9    |
|Arkansas            |False|35   |
|California          |False|21   |
|California          |True |8    |
|Colorado            |True |7    |
|Colorado            |False|49   |
|Connecticut         |True |8    |
|Connecticut         |False|56   |
|Delaware            |True |8    |
|Delaware            |False|48   |
|District of Columbia|True |4    |
|District of Columbia|False|43   |
|Florida             |True |8    |
|Florida             |False|45   |
+--------------------+-----+-----+
only showing top 20 rows



                                                                                

### Оптимизация 2
**Broadcast Join**

In [11]:
broadcast_state_df = F.broadcast(state_dict)
broadcast_pop_df = F.broadcast(pop_dict)

large_areas_df = pop_dict.filter(F.col("population") >= 10000)
churn_with_population = churn_df.join(large_areas_df, 
                                      churn_df["area code"] == large_areas_df["area code"], "inner").drop(churn_df["area code"])

broadcast_optimized_df = churn_with_population.join(broadcast_state_df, churn_with_population["state"] == state_dict["state_id"], "inner")


In [12]:
broadcast_result_df = broadcast_optimized_df.groupBy("state_name", "churn").count().orderBy("state_name")

In [13]:
# Здесь необходимо вывести результат:
broadcast_result_df.explain()
broadcast_result_df.show(truncate=False)

== Physical Plan ==
*(8) Sort [state_name#72 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(state_name#72 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=825]
   +- *(7) HashAggregate(keys=[state_name#72, churn#38], functions=[count(1)])
      +- Exchange hashpartitioning(state_name#72, churn#38, 200), ENSURE_REQUIREMENTS, [plan_id=821]
         +- *(6) HashAggregate(keys=[state_name#72, churn#38], functions=[partial_count(1)])
            +- *(6) Project [churn#38, state_name#72]
               +- *(6) BroadcastHashJoin [state#18], [state_id#71], Inner, BuildRight, false
                  :- *(6) Project [state#18, churn#38]
                  :  +- *(6) SortMergeJoin [cast(area code#20 as bigint)], [area code#83L], Inner
                  :     :- *(2) Sort [cast(area code#20 as bigint) ASC NULLS FIRST], false, 0
                  :     :  +- Exchange hashpartitioning(cast(area code#20 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=796]
                  :     :     +- 



+--------------------+-----+-----+
|state_name          |churn|count|
+--------------------+-----+-----+
|Alabama             |False|58   |
|Alabama             |True |7    |
|Alaska              |True |3    |
|Alaska              |False|44   |
|Arizona             |True |4    |
|Arizona             |False|53   |
|Arkansas            |True |9    |
|Arkansas            |False|35   |
|California          |False|21   |
|California          |True |8    |
|Colorado            |True |7    |
|Colorado            |False|49   |
|Connecticut         |True |8    |
|Connecticut         |False|56   |
|Delaware            |True |8    |
|Delaware            |False|48   |
|District of Columbia|True |4    |
|District of Columbia|False|43   |
|Florida             |True |8    |
|Florida             |False|45   |
+--------------------+-----+-----+
only showing top 20 rows



                                                                                

### Описание узких мест и способов оптимизации
**Узкие места:**
1. Объем данных при соединении больших таблиц.
2. Время выполнения групповых операций.

**Способы оптимизации:**
1. *Bucket Specific Join*: помогает оптимизировать операции соединения за счет предварительной сортировки и разделения данных на бакеты.
Практическая польза: уменьшает объем данных для сравнения и ускоряет соединение.
2. *Broadcast Join*: при использовании маленькой таблицы как broadcast, вся таблица передается всем узлам, что уменьшает затраты на шеффлинг.
Практическая польза: значительно ускоряет соединения, когда одна из таблиц достаточно маленькая, чтобы поместиться в памяти.

# Задание 2

## Входные данные 

*skew_transactions.csv* - информация о длительности просомтра контента пользователям
колонки:
1. user_uid — уникальный идентификатор пользователя
2. element_uid — уникальный идентификатор контента
3. watched_time — время просмотра в секундах

*catalogue.json* - каталог с описанием контента и метаинформации по нему
колонки:
1. type — тип элемента
2. duration — длительность в минутах (средняя длительность эпизода в случае с сериалами и многосерийными фильмами), округлённая до десятков
3. attributes — анонимизированные атрибуты данного элемента
4. availability — доступные права на элемент(subscription, purchase, rent)
5. feature_1 — анонимизированная вещественная переменная
6. feature_2 — анонимизированная вещественная переменная
7. feature_3 — анонимизированная порядковая переменная
8. feature_4 — анонимизированная вещественная переменная
9. feature_5 — анонимизированная вещественная переменная

## Что нужно сделать
1. Выполните join основных данных со справочником используя DataFrame API (по колонке id для контента - `element_uid`)
2. Описать проблему в датасетах с точки зрения обработки Spark
3. Решить задачу любым способом
4. Решить задачу с помощью salt-join подхода

P.S. Как вы можете заметить при просмотре данных по пользователями, нужный нам ключ для операции будет перекошен (90% строк представлены на фильм, очень популярный среди смотревших) - это нужно доказать в рамках п.2

### Решение 

In [64]:
# !pip install pandas

In [42]:
import pandas as pd
import pyspark.sql.types as T

In [51]:
catalogue_df_pandas = pd.read_json("./catalogue.json").transpose()
catalogue_df_pandas['feature_5'] = catalogue_df_pandas['feature_5'].astype('float')
catalogue_df_pandas['element_uid'] = catalogue_df_pandas.index
mySchema = T.StructType([
    T.StructField("type", T.StringType(), True),
    T.StructField("availability", T.ArrayType(T.StringType()), True),
    T.StructField("duration", T.IntegerType(), True),
    T.StructField("feature_1", T.FloatType(), True),
    T.StructField("feature_2", T.FloatType(), True),
    T.StructField("feature_3", T.IntegerType(), True),
    T.StructField("feature_4", T.FloatType(), True),
    T.StructField("feature_5", T.FloatType(), True),
    T.StructField("attributes", T.ArrayType(T.IntegerType()), True), 
    T.StructField("element_uid", T.IntegerType(), True),
])

In [52]:
transactions_df = spark.read.option("header", True).csv("s3a://input/data/skew_transactions.csv")
catalogue_df = spark.createDataFrame(catalogue_df_pandas, mySchema)

In [53]:
catalogue_df.show()

+-----+--------------------+--------+-----------+----------+---------+---------+----------+--------------------+-----------+
| type|        availability|duration|  feature_1| feature_2|feature_3|feature_4| feature_5|          attributes|element_uid|
+-----+--------------------+--------+-----------+----------+---------+---------+----------+--------------------+-----------+
|movie|[purchase, rent, ...|     140|  1657223.4|0.75360966|       39|1.1194091|       0.0|[1, 2, 3, 4, 5, 6...|       1983|
|movie|[purchase, rent, ...|     110|3.5565208E7|0.76625377|       41|1.1386044| 0.6547074|[1, 26, 27, 28, 2...|       3783|
|movie|[purchase, rent, ...|      90|1.3270677E7|0.76542467|       27|1.1318073| 0.5927161|[1, 38, 39, 40, 7...|       5208|
|movie|[purchase, rent, ...|     120|2.1749918E7| 0.7578744|       26|1.1335255| 0.6547074|[1, 47, 48, 49, 5...|       9744|
|movie|    [purchase, rent]|     110|  9212964.0| 0.7595661|        7|1.1101274| 0.6547074|[1, 59, 60, 61, 6...|       1912|


In [65]:
result = transactions_df.join(catalogue_df, on="element_uid", how="left")

In [66]:
# Здесь необходимо вывести результат:
result.explain()
result.show(truncate=False)

== Physical Plan ==
*(4) Project [element_uid#171, user_uid#172, watched_time#173, type#177, availability#178, duration#179, feature_1#180, feature_2#181, feature_3#182, feature_4#183, feature_5#184, attributes#185]
+- *(4) SortMergeJoin [cast(element_uid#171 as int)], [element_uid#186], LeftOuter
   :- *(1) Sort [cast(element_uid#171 as int) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(element_uid#171 as int), 200), ENSURE_REQUIREMENTS, [plan_id=496]
   :     +- FileScan csv [element_uid#171,user_uid#172,watched_time#173] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://input/data/skew_transactions.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<element_uid:string,user_uid:string,watched_time:string>
   +- *(3) Sort [element_uid#186 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(element_uid#186, 200), ENSURE_REQUIREMENTS, [plan_id=507]
         +- *(2) Filter isnotnull(element_uid#186)
     

                                                                                

+-----------+--------+------------+---------------+------------------------------+--------+-----------+----------+---------+---------+----------+-------------------------------------------------------------------------------------------------------------+
|element_uid|user_uid|watched_time|type           |availability                  |duration|feature_1  |feature_2 |feature_3|feature_4|feature_5 |attributes                                                                                                   |
+-----------+--------+------------+---------------+------------------------------+--------+-----------+----------+---------+---------+----------+-------------------------------------------------------------------------------------------------------------+
|9324       |300486  |597         |movie          |[subscription]                |40      |100.62478  |0.74050003|8        |1.1369238|0.5927161 |[32144, 7, 32145, 1134, 14, 15, 1135, 25]                                              

В данных наблюдается проблема перекоса. Она возникает, когда некоторые ключи значительно более частые, чем остальные. Данная проблема может вызвать неравномерное распределение данных по узлам кластера, что приведет к следующим проблемам:
- **Неравномерная нагрузка**: Один узел может обрабатывать намного больше данных, чем другие, что вызывает узкие места и увеличивает общее время выполнения.
- **Повышенное потребление памяти**: Узлы, обрабатывающие перекошенные ключи, могут столкнуться с нехваткой памяти.

Покажем перекос в данных:

In [71]:
element_count_df = transactions_df.groupBy("element_uid").count()

element_count_df.orderBy(F.col("count").desc()).show()
print(f'Всего наблюдений: {transactions_df.count()}')

                                                                                

+-----------+-------+
|element_uid|  count|
+-----------+-------+
|       2714|2732800|
|        747|  61272|
|      10170|    299|
|       9898|    299|
|       9837|    299|
|       3469|    299|
|       8763|    299|
|       6796|    298|
|       4107|    298|
|       9108|    298|
|       2028|    298|
|       6591|    298|
|       6812|    298|
|       2231|    298|
|        311|    297|
|       8563|    297|
|        381|    297|
|       4874|    297|
|       7720|    297|
|       5858|    296|
+-----------+-------+
only showing top 20 rows

Всего наблюдений: 2938865


### Решение с оптимизацией

#### Решение 1 (Broadcast)

In [74]:
broadcast_result = transactions_df.join(F.broadcast(catalogue_df), transactions_df["element_uid"] == catalogue_df["element_uid"], "inner")

In [75]:
broadcast_result.explain()
broadcast_result.show(truncate=False)

== Physical Plan ==
*(2) BroadcastHashJoin [cast(element_uid#171 as int)], [element_uid#186], Inner, BuildRight, false
:- *(2) Filter isnotnull(element_uid#171)
:  +- FileScan csv [element_uid#171,user_uid#172,watched_time#173] Batched: false, DataFilters: [isnotnull(element_uid#171)], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://input/data/skew_transactions.csv], PartitionFilters: [], PushedFilters: [IsNotNull(element_uid)], ReadSchema: struct<element_uid:string,user_uid:string,watched_time:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[9, int, false] as bigint)),false), [plan_id=785]
   +- *(1) Filter isnotnull(element_uid#186)
      +- *(1) Scan ExistingRDD[type#177,availability#178,duration#179,feature_1#180,feature_2#181,feature_3#182,feature_4#183,feature_5#184,attributes#185,element_uid#186]




                                                                                

+-----------+--------+------------+---------------+------------------------------+--------+-----------+----------+---------+---------+----------+----------------------------------------------------------------------------------------------------------------------+-----------+
|element_uid|user_uid|watched_time|type           |availability                  |duration|feature_1  |feature_2 |feature_3|feature_4|feature_5 |attributes                                                                                                            |element_uid|
+-----------+--------+------------+---------------+------------------------------+--------+-----------+----------+---------+---------+----------+----------------------------------------------------------------------------------------------------------------------+-----------+
|6130       |563180  |3264        |movie          |[purchase, rent]              |90      |3182363.8  |0.76542467|8        |1.0882922|0.0       |[516, 1699, 4265, 7224, 

#### Решение 2 (Salt-Join)

In [85]:
def add_salt(df, num_salts):
    salt_expr = (F.col("element_uid") % num_salts).alias("salt")
    return df.withColumn("salt", salt_expr)

num_salts = 10
salted_transactions_df = add_salt(transactions_df, num_salts)
salted_catalogue_df = catalogue_df.withColumn("salt", F.round(F.col("element_uid") % num_salts))

salted_result = salted_transactions_df.join(salted_catalogue_df,
                                               (salted_transactions_df["element_uid"] == salted_catalogue_df["element_uid"]) & 
                                               (salted_transactions_df["salt"] == salted_catalogue_df["salt"]), "inner")

In [86]:
salted_result.explain()
salted_result.show(truncate=False)

== Physical Plan ==
*(5) SortMergeJoin [cast(element_uid#171 as int), knownfloatingpointnormalized(normalizenanandzero(salt#909))], [element_uid#186, knownfloatingpointnormalized(normalizenanandzero(cast(salt#914 as double)))], Inner
:- *(2) Sort [cast(element_uid#171 as int) ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(salt#909)) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(element_uid#171 as int), knownfloatingpointnormalized(normalizenanandzero(salt#909)), 200), ENSURE_REQUIREMENTS, [plan_id=1127]
:     +- *(1) Project [element_uid#171, user_uid#172, watched_time#173, (cast(element_uid#171 as double) % 10.0) AS salt#909]
:        +- *(1) Filter (isnotnull(element_uid#171) AND isnotnull((cast(element_uid#171 as double) % 10.0)))
:           +- FileScan csv [element_uid#171,user_uid#172,watched_time#173] Batched: false, DataFilters: [isnotnull(element_uid#171), isnotnull((cast(element_uid#171 as double) % 10.0))], Format: CSV, Location: InMemory



+-----------+--------+------------+----+-----+------------------------------+--------+-----------+---------+---------+---------+---------+----------------------------------------------------------------------------------+-----------+----+
|element_uid|user_uid|watched_time|salt|type |availability                  |duration|feature_1  |feature_2|feature_3|feature_4|feature_5|attributes                                                                        |element_uid|salt|
+-----------+--------+------------+----+-----+------------------------------+--------+-----------+---------+---------+---------+---------+----------------------------------------------------------------------------------+-----------+----+
|5858       |239666  |2812        |8.0 |movie|[purchase, rent, subscription]|130     |3.7613444E7|0.7514579|13       |1.096473 |0.5927161|[20141, 3008, 658, 20142, 113, 83, 3663, 52, 1840, 42, 14, 15, 17, 18, 19, 20, 55]|5858       |8   |
|5858       |460608  |463         |8.0 |movi

                                                                                

# Задание 3

## Входные данные 

*cut_transactions.csv*  — информация о длительности просомтра контента пользователям

Описание фичей в cut_transactions.csv: 
1. user_uid — уникальный идентификатор пользователя
2.  element_uid — уникальный идентификатор контента
3.  watched_time — время просмотра в секундах

*cut_ratings.csv*  — информация об оценках, поставленных пользователями

Описание фичей в cut_ratings.csv: 
1. user_uid — уникальный идентификатор пользователя 
2. element_uid — уникальный идентификатор контента 
3. rating — поставленный пользователем рейтинг

*ids.csv*  — выборка пользователей
Описание фичей в ids.csv: 
1. user_uid — уникальный идентификатор пользователя 


## Что нужно сделать
Для каждого пользователя из выборки посчитать:
1. Максимальное и минимальное время просмотра фильмов с оценками 8, 9 и 10 
2. Название фичи должно быть в формате feat_агрегирующая_функция_watched_time_rating_оценка. 
3. Если у пользователь не ставил оценки 8, 9 и 10 то значение фичей должно быть null
4. Описать принятые при разработки кода решения и возможные оптимизации

P.S. На каждом этапе обработки должно быть должны агрегироваться минимально возможные объемы данных (сокращаем затраты на shuflle)

In [87]:
transactions_df = spark.read.option("header", True).csv("s3a://input/data/cut_transactions.csv")
ratings_df = spark.read.option("header", True).csv("s3a://input/data/cut_ratings.csv")
ids_df = spark.read.option("header", True).csv("s3a://input/data/ids.csv")


### Решение

In [127]:
filtered_ratings_df = ratings_df.filter(F.col("rating").isin([8, 9, 10]))
joined_df = transactions_df.join(filtered_ratings_df, ["user_uid", "element_uid"], "inner")
aggregated_df = joined_df.groupBy("user_uid", "rating").agg(
    F.max("watched_time"),
    F.min("watched_time")
)
pivot_df = aggregated_df.groupBy("user_uid").pivot("rating").agg(
    F.max("max(watched_time)"),
    F.min("min(watched_time)")
)
renamed_df = pivot_df.select(
    F.col("user_uid"),
    F.col("8_max(max(watched_time))").alias("feat_max_watched_time_rating_8"),
    F.col("8_min(min(watched_time))").alias("feat_min_watched_time_rating_8"),
    F.col("9_max(max(watched_time))").alias("feat_max_watched_time_rating_9"),
    F.col("9_min(min(watched_time))").alias("feat_min_watched_time_rating_9"),
    F.col("10_max(max(watched_time))").alias("feat_max_watched_time_rating_10"),
    F.col("10_min(min(watched_time))").alias("feat_min_watched_time_rating_10")
)
result = F.broadcast(ids_df).join(renamed_df, "user_uid", "left")

                                                                                

In [129]:
result.explain()
result.show(truncate=False)

== Physical Plan ==
*(9) Project [user_uid#1083, feat_max_watched_time_rating_8#2501, feat_min_watched_time_rating_8#2502, feat_max_watched_time_rating_9#2503, feat_min_watched_time_rating_9#2504, feat_max_watched_time_rating_10#2505, feat_min_watched_time_rating_10#2506]
+- *(9) SortMergeJoin [user_uid#1083], [user_uid#1038], LeftOuter
   :- *(1) Sort [user_uid#1083 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(user_uid#1083, 200), ENSURE_REQUIREMENTS, [plan_id=5337]
   :     +- FileScan csv [user_uid#1083] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://input/data/ids.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_uid:string>
   +- SortAggregate(key=[user_uid#1038], functions=[max(if ((rating#1062 <=> 8)) max(watched_time)#2460 else null), min(if ((rating#1062 <=> 8)) min(watched_time)#2461 else null), max(if ((rating#1062 <=> 9)) max(watched_time)#2460 else null), min(if ((rating#1062 <=> 9)) min(watche

24/06/30 15:30:06 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for left outer join.
                                                                                

+--------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+
|user_uid|feat_max_watched_time_rating_8|feat_min_watched_time_rating_8|feat_max_watched_time_rating_9|feat_min_watched_time_rating_9|feat_max_watched_time_rating_10|feat_min_watched_time_rating_10|
+--------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+
|524044  |NULL                          |NULL                          |NULL                          |NULL                          |7743                           |7547                           |
|436138  |NULL                          |NULL                          |NULL                          |NULL                          |7652                           |7652                           |
|2414

#### Пояснения к решению:
1. **Фильтрация данных**: Сначала фильтруем данные по оценкам, чтобы уменьшить объем данных для последующих операций;
2. **Агрегация**: Выполняем агрегацию данных, чтобы посчитать минимальное и максимальное время просмотра для каждой оценки;
3. **Поворот данных (pivot)**: Преобразуем данные для получения нужного формата признаков;
4. **Broadcast Join**: Применяем для таблиц, которые достаточно малы, чтобы поместиться в памяти;