In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, lit

In [None]:
spark = SparkSession.builder.appName("product_categories").getOrCreate()

In [None]:
# DataFrame с продуктами
products = spark.createDataFrame(
    [
        (0, "Product A"),
        (1, "Product B"),
        (2, "Product C"),
        (3, "Product D")
    ],
    ["product_id", "product_name"]
)

# DataFrame с категориями
categories = spark.createDataFrame(
    [
        (0, "Category 1"),
        (1, "Category 2"),
        (2, "Category 3"),
        (3, "Category 4")
    ],
    ["category_id", "category_name"]
)

# DataFrame с связями между продуктами и категориями
product_categories = spark.createDataFrame(
    [
        (0, 0),
        (0, 1),
        (1, 2),
        (1, 3),
        (2, 0),
        (2, 2)
    ],
    ["product_id", "category_id"]
)

In [None]:
#   Метод для получения всех пар "Имя продукта - Имя категории"
#    и имен продуктов без категорий.
def get_product_categories(products, categories, product_categories):
    # Объединяем DataFrame с продуктами и категориями
    joined_df = products.join(
        product_categories,
        products.product_id == product_categories.product_id,
        "left"
    ).join(
        categories,
        product_categories.category_id == categories.category_id,
        "left"
    )

    # Собираем все категории для каждого продукта
    product_categories_df = joined_df.groupby("product_name").agg(collect_set("category_name").alias("categories"))

    # Фильтруем продукты без категорий
    products_without_categories = products.join(
        product_categories_df,
        products.product_name == product_categories_df.product_name,
        "leftanti"
    ).select("product_name")

    # Создаем отдельные DataFrame для продуктов с категориями и без
    with_categories_df = product_categories_df.selectExpr("product_name", "explode(categories) as category_name")
    without_categories_df = products_without_categories.withColumn("category_name", lit(None))

    # Объединяем DataFrame с продуктами с категориями и без
    result_df = with_categories_df.union(without_categories_df)

    return result_df

In [None]:
result_df = get_product_categories(products, categories, product_categories)
result_df.show()

In [None]:
spark.stop()