# Анализ связей «Продукт–Категория» в PySpark

# Подключение сделано скорее с целью демонстрации

In [38]:
%pip install pyspark findspark

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'C:\Users\ratmir\source\repos\Mindbox-JuniorDE-Test-Task-PySpark\venv\Scripts\python.exe -m pip install --upgrade pip' command.


In [39]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Product-Category Analysis").getOrCreate()

In [40]:
df_products = spark.read.csv("products.csv", header=True, inferSchema=True)

In [41]:
df_categories = spark.read.csv("categories.csv", header=True, inferSchema=True)

In [42]:
df_product_categories = spark.read.csv("product_categories.csv", header=True, inferSchema=True)

# Требуемая по заданию функция

In [45]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def get_product_category_pairs(
    products: DataFrame,
    product_categories: DataFrame,
    categories: DataFrame
) -> DataFrame:
    """
    Возвращает датафрейм со всеми парами (Имя продукта – Имя категории),
    а для продуктов без категорий выводит строки в формате (Имя строки - NULL)

    :param products: df товаров
    :param product_categories: df связей категорий и товаров по id
    :param categories: df категорий
    :return: DataFrame(product_name STRING, category_name STRING)
    """
    return (
        products.alias("p")
        .join(
            product_categories.alias("pc"),
            col("p.id") == col("pc.product_id"),
            how="left"
        )
        .join(
            categories.alias("c"),
            col("pc.category_id") == col("c.id"),
            how="left"
        )
        .select(
            col("p.name").alias("product_name"),
            col("c.name").alias("category_name")
        )
    )

# Запуск демонстрации

In [46]:
products = df_products.withColumn("id", col("id").cast("long"))
categories = df_categories.withColumn("id", col("id").cast("long"))
product_categories = df_product_categories \
    .withColumn("product_id", col("product_id").cast("long")) \
    .withColumn("category_id", col("category_id").cast("long"))

result_df = get_product_category_pairs(products, product_categories, categories)

result_df.show(n=100, truncate=False)
#spark.stop()

+--------------------+-------------+
|product_name        |category_name|
+--------------------+-------------+
|Смартфон Samsung    |Канцтовары   |
|Смартфон Samsung    |Техника      |
|Яблоки              |Продукты     |
|Джинсы Levi's       |Одежда       |
|Ноутбук Lenovo      |Техника      |
|Молоко Простоквашино|Продукты     |
|Футболка Nike       |Одежда       |
|Пылесос Dyson       |Техника      |
|Платье Mango        |Одежда       |
|Сырники             |Продукты     |
|Сок Добрый          |Продукты     |
|Телевизор LG        |Техника      |
|Кроссовки Adidas    |Одежда       |
|Пальто Zara         |Одежда       |
|Бананы              |Продукты     |
|Куртка Columbia     |Одежда       |
|Хлеб Бородинский    |Продукты     |
|Куриное филе        |Продукты     |
|Сыр Российский      |Продукты     |
|Ручка Parker        |Канцтовары   |
|Блокнот Moleskine   |NULL         |
+--------------------+-------------+

