In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, coalesce

In [2]:
# создание spark сессии
spark = SparkSession.builder.getOrCreate()

In [3]:
# тестовые данные продуктов
products_data = [
    (1, "Product A"),
    (2, "Product B"),
    (3, "Product C"),
    (4, "Product D")
]
products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])
products_df.show()

+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|   Product A|
|         2|   Product B|
|         3|   Product C|
|         4|   Product D|
+----------+------------+



In [4]:
# тестовые данные категорий
categories_data = [
    (1, "Category X"),
    (2, "Category Y"),
    (3, "Category Z"),
    (4, "Category W")
]
categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name"])
categories_df.show()

+-----------+-------------+
|category_id|category_name|
+-----------+-------------+
|          1|   Category X|
|          2|   Category Y|
|          3|   Category Z|
|          4|   Category W|
+-----------+-------------+



In [5]:
# тестовые данные связей продуктов и категорий
product_category_data = [
    (1, 1),
    (1, 2),
    (2, 2),
    (3, 3),
]
product_category_df = spark.createDataFrame(product_category_data, ["product_id", "category_id"])
product_category_df.show()

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         2|          2|
|         3|          3|
+----------+-----------+



In [6]:
def get_product_category_pairs(products_df, categories_df, product_category_df):
    # объединение продуктов и связей с категориями
    product_with_category = products_df.join(
        product_category_df,
        products_df.product_id == product_category_df.product_id,
        "left"
    )
    
    # объединение результов с категориями
    result = product_with_category.join(
        categories_df,
        product_with_category.category_id == categories_df.category_id,
        "left"
    )
    
    # выбор нужных колонок и замена null на "No Category"
    final_result = result.select(
        col("product_name"),
        coalesce(col("category_name"), lit("No Category")).alias("category_name")
    ).distinct()
    
    return final_result

In [7]:
# вызов функции
result_df = get_product_category_pairs(products_df, categories_df, product_category_df)

# вывод результов
result_df.show()

+------------+-------------+
|product_name|category_name|
+------------+-------------+
|   Product A|   Category X|
|   Product C|   Category Z|
|   Product D|  No Category|
|   Product A|   Category Y|
|   Product B|   Category Y|
+------------+-------------+

