Service

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from typing import Tuple


class ProductCategoryService:
    """
    Service for working with products and categories.

    Wraps three DataFrames (products, categories and the many-to-many link
    table between them) and returns:
        - All pairs "Product name — Category name"
        - All products without categories

    The presence of the required columns is checked once, at construction.
    Extra columns on any input are allowed and ignored.
    """

    # Columns each input must expose. They are also the only columns that
    # take part in the joins below.
    _PRODUCT_COLUMNS = ("product_id", "product_name")
    _CATEGORY_COLUMNS = ("category_id", "category_name")
    _LINK_COLUMNS = ("product_id", "category_id")

    def __init__(self, products_df: DataFrame, categories_df: DataFrame, product_category_df: DataFrame):
        """
        :param products_df: DataFrame with columns [product_id, product_name]
        :param categories_df: DataFrame with columns [category_id, category_name]
        :param product_category_df: DataFrame with columns [product_id, category_id]
        :raises ValueError: if any input lacks one of its required columns
        """
        self.products_df = products_df
        self.categories_df = categories_df
        self.product_category_df = product_category_df

        self._validate_inputs()

    def _validate_inputs(self) -> None:
        """
        Check that every input exposes its required columns.

        Inputs are checked in the order products, categories, links; the
        first one with missing columns aborts the check.

        :raises ValueError: naming the offending input and the missing columns
        """
        expectations = (
            ("products_df", self.products_df, self._PRODUCT_COLUMNS),
            ("categories_df", self.categories_df, self._CATEGORY_COLUMNS),
            ("product_category_df", self.product_category_df, self._LINK_COLUMNS),
        )
        for name, df, required in expectations:
            missing = set(required) - set(df.columns)
            if missing:
                raise ValueError(f"{name} missing required columns: {missing}")

    def get_product_category_data(self) -> Tuple[DataFrame, DataFrame]:
        """
        Build both result DataFrames.

        Both joins of the first result are left joins starting from the link
        table, so a link whose id has no match in products or categories is
        kept, with a null name on the unmatched side.

        :return: tuple of
            - DataFrame [product_name, category_name] with all pairs
              "Product name — Category name" (one row per link row)
            - DataFrame [product_name] with all products without categories
        """
        # Project every input down to the columns that are actually used
        # *before* joining. Without this, an extra column that shares a name
        # with another input (e.g. a `product_id` column on categories_df)
        # is duplicated by the join, and the following join/select fails
        # with an ambiguous-reference error.
        products = self.products_df.select(*self._PRODUCT_COLUMNS)
        categories = self.categories_df.select(*self._CATEGORY_COLUMNS)
        links = self.product_category_df.select(*self._LINK_COLUMNS)

        product_categories = (
            links
            .join(categories, on="category_id", how="left")
            .join(products, on="product_id", how="left")
            .select(
                col("product_name"),
                col("category_name")
            )
        )

        # A left_anti join keeps each left row at most once regardless of how
        # many right rows match, so the linked ids need no de-duplication.
        products_without_categories = (
            products
            .join(links.select("product_id"), on="product_id", how="left_anti")
            .select("product_name")
        )

        return product_categories, products_without_categories


Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName("ProductCategoryTest").getOrCreate()

# ------------------------------------------------------------
# Schemas (every column is declared non-nullable)
# ------------------------------------------------------------
products_schema = StructType([
    StructField("product_id", IntegerType(), nullable=False),
    StructField("product_name", StringType(), nullable=False),
])

categories_schema = StructType([
    StructField("category_id", IntegerType(), nullable=False),
    StructField("category_name", StringType(), nullable=False),
])

product_category_schema = StructType([
    StructField("product_id", IntegerType(), nullable=False),
    StructField("category_id", IntegerType(), nullable=False),
])

# ------------------------------------------------------------
# Rows
# ------------------------------------------------------------
# Product 5 ("Orange") never appears in the link rows below, so it is the
# sample's product without a category.
products_data = [
    (1, "iPhone 14"),
    (2, "Samsung Galaxy S23"),
    (3, "MacBook Air"),
    (4, "Xiaomi Redmi"),
    (5, "Orange"),
]

# Category 4 ("Automobile") is not referenced by any link row.
categories_data = [
    (1, "Smartphones"),
    (2, "Laptops"),
    (3, "Apple Products"),
    (4, "Automobile"),
]

# Many-to-many links as (product_id, category_id).
product_category_data = [
    (1, 1),  # iPhone 14 -> Smartphones
    (1, 3),  # iPhone 14 -> Apple Products
    (2, 1),  # Samsung Galaxy -> Smartphones
    (3, 2),  # MacBook Air -> Laptops
    (3, 3),  # MacBook Air -> Apple Products
    (4, 1),  # Xiaomi -> Smartphones
]

# ------------------------------------------------------------
# DataFrames
# ------------------------------------------------------------
products_df = spark.createDataFrame(products_data, products_schema)
categories_df = spark.createDataFrame(categories_data, categories_schema)
product_category_df = spark.createDataFrame(product_category_data, product_category_schema)

# ------------------------------------------------------------
# Show the fixtures
# ------------------------------------------------------------
print("Products:")
products_df.show()

print("Categories:")
categories_df.show()

print("Product-Category Links:")
product_category_df.show()

Example

In [None]:
# Build the service from the sample DataFrames; the constructor validates
# that each one has its required columns.
service = ProductCategoryService(
    products_df=products_df,
    categories_df=categories_df,
    product_category_df=product_category_df,
)

# First frame: "product name - category name" pairs.
# Second frame: names of products that have no category.
pairs_df, no_category_df = service.get_product_category_data()

print("With category")
pairs_df.show(truncate=False)

print("Without category")
no_category_df.show(truncate=False)