**Задание**

В PySpark приложении датафреймами(pyspark.sql.DataFrame) заданы продукты, категории и их связи. Каждому продукту может соответствовать несколько категорий или ни одной. А каждой категории может соответствовать несколько продуктов или ни одного. Напишите метод на PySpark, который в одном датафрейме вернет все пары «Имя продукта – Имя категории» и имена всех продуктов, у которых нет категорий.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ca47b4e6cb64d91a51bea8a0b432ff5c5228af6c3217315e90dc2acd513e398a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

In [33]:
my_pyspark = SparkSession.builder.appName("ProductCategoryPairs").getOrCreate()

In [34]:
products = [
    ("Продукт 1", "1"),
    ("Продукт 2", "1"),
    ("Продукт 3", None),
    ("Продукт 4", "2"),
    ("Продукт 5", None),
]

categories = [
    ("1", "Категория 1"),
    ("2", "Категория 2"),
    ("3", "Категория 3"),
]

In [35]:
# создаем исходные датафреймы
products_df = my_pyspark.createDataFrame(products, ["product", "category"])
categories_df = my_pyspark.createDataFrame(categories, ["category", "category_name"])

In [36]:
# объединяем данные
joined_df = products_df.join(categories_df, on="category", how="left")

In [37]:
# фильтруем
products_without_category_df = joined_df.filter(col("category").isNull())

In [38]:
# формируем результирующий датафрейм
result_df = joined_df.select(
    col("product").alias("Имя продукта"),
    when(col("category_name").isNull(), "Нет категории").otherwise(col("category_name")).alias("Имя категории")
).distinct()

In [39]:
result_df.show(truncate=False)

+------------+-------------+
|Имя продукта|Имя категории|
+------------+-------------+
|Продукт 1   |Категория 1  |
|Продукт 5   |Нет категории|
|Продукт 4   |Категория 2  |
|Продукт 2   |Категория 1  |
|Продукт 3   |Нет категории|
+------------+-------------+



In [40]:
my_pyspark.stop()