### Установка и импорт PySpark

In [53]:
!pip install pyspark



In [54]:
from pyspark.sql import SparkSession, DataFrame, functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [55]:
spark = SparkSession.builder.appName("TestPySpark").getOrCreate()

### Я решил создать датафреймы, чтобы проверить работу метода

In [56]:
# Таблица продуктов
products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True)
])
products_data = [
    (1, "Молоко"),
    (2, "Хлеб"),
    (3, "Масло"),
    (4, "Яйца"),
    (5, "Сыр"),
    (6, "Сок"),
    (7, "Батон"),
    (8, "Яблоки")
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Таблица категорий
categories_schema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("category_name", StringType(), True)
])
categories_data = [
    (1, "Молочные продукты"),
    (2, "Хлебобулочные изделия"),
    (3, "Фрукты"),
    (5, "Изделия из пшеницы"),
    (6, "Бакалея")
]
categories_df = spark.createDataFrame(categories_data, schema=categories_schema)

# Связи продуктов с категориями
schema_product_category = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("category_id", IntegerType(), True)
])
product_category_data = [(1, 1),
                      (2, 2),
                      (2, 5),
                      (3, 1),
                      (4, 4),
                      (5, 1),
                      (6, 4),
                      (7, 2),
                      (7, 5),
                      (8, 3)
                      ]
product_category_df = spark.createDataFrame(product_category_data, schema_product_category)

print("Products DataFrame:")
products_df.show()
print("Categories DataFrame:")
categories_df.show()
print("Categories and Products DataFrame:")
product_category_df.show()

Products DataFrame:
+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|      Молоко|
|         2|        Хлеб|
|         3|       Масло|
|         4|        Яйца|
|         5|         Сыр|
|         6|         Сок|
|         7|       Батон|
|         8|      Яблоки|
+----------+------------+

Categories DataFrame:
+-----------+--------------------+
|category_id|       category_name|
+-----------+--------------------+
|          1|   Молочные продукты|
|          2|Хлебобулочные изд...|
|          3|              Фрукты|
|          5|  Изделия из пшеницы|
|          6|             Бакалея|
+-----------+--------------------+

Categories and Products DataFrame:
+----------+-----------+
|product_id|category_id|
+----------+-----------+
|         1|          1|
|         2|          2|
|         2|          5|
|         3|          1|
|         4|          4|
|         5|          1|
|         6|          4|
|         7|          2|
|         7|       

### Итоговый DataFrame  
Сначала объединяю таблицу **products_df** с промежуточной таблицей **product_category_df** (объединение *inner*). Далее объединяю с таблицей категорий **categories_df** (объединение *left*, т.к. в задании сказано, чтобы были все продукты, даже без категории).  
Далее выкидываю ненужные столбцы, переименовываю оставшиеся. Заменяю значения NULL на что-то более понятное.

In [57]:
result = products_df.join(product_category_df, "product_id").\
join(categories_df, "category_id", "left").drop("category_id", "product_id").\
orderBy(functions.asc("product_name"))

result = result.withColumnRenamed("product_name", "Продукт").\
withColumnRenamed("category_name", "Категория")

result = result.na.fill("--Без категории--", subset=["Категория"])

result.show(truncate=False)

+-------+---------------------+
|Продукт|Категория            |
+-------+---------------------+
|Батон  |Изделия из пшеницы   |
|Батон  |Хлебобулочные изделия|
|Масло  |Молочные продукты    |
|Молоко |Молочные продукты    |
|Сок    |--Без категории--    |
|Сыр    |Молочные продукты    |
|Хлеб   |Изделия из пшеницы   |
|Хлеб   |Хлебобулочные изделия|
|Яблоки |Фрукты               |
|Яйца   |--Без категории--    |
+-------+---------------------+



In [58]:
spark.stop()