In [47]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, when

# Start (or reuse) the Spark session for this application.
spark = SparkSession.builder.appName("ProductCategoryApp").getOrCreate()

# All three CSVs are read with the first line treated as the header and with
# schema inference enabled, so the options are declared once and shared.
_CSV_OPTIONS = {"header": True, "inferSchema": True}

products_df = spark.read.csv("products.csv", **_CSV_OPTIONS)
categories_df = spark.read.csv("categories.csv", **_CSV_OPTIONS)
product_category_mapping_df = spark.read.csv(
    "product_category_mapping.csv", **_CSV_OPTIONS
)


def get_product_category_info():
    """Build the product/category listing from the module-level DataFrames.

    Products are left-joined to the product-category mapping on
    ``product_id`` and the result is full-joined to the categories on
    ``category_id``. Products without a category and categories without a
    product therefore both appear, with a null in the missing column.

    Returns:
        A DataFrame with the columns ``product_name`` and ``category_name``,
        sorted by ``product_name`` ascending with null product names last.
    """
    # Keep every product, whether or not it has a mapping row.
    products_with_mapping = products_df.join(
        product_category_mapping_df, on="product_id", how="left"
    )

    # The full join also keeps categories that no product maps to.
    products_and_categories = products_with_mapping.join(
        categories_df, on="category_id", how="full"
    )

    listing = products_and_categories.select("product_name", "category_name")

    # Named products first in ascending order, rows without a product last.
    return listing.orderBy(col("product_name").asc_nulls_last())

# Build and display the listing. The session is stopped in ``finally`` so it
# is released even if building or showing the DataFrame raises.
try:
    product_category_info_df = get_product_category_info()
    product_category_info_df.show()
finally:
    spark.stop()

+------------+-------------+
|product_name|category_name|
+------------+-------------+
|   Product A|   Category X|
|   Product A|   Category Y|
|   Product B|   Category X|
|   Product B|   Category Z|
|   Product C|   Category Y|
|   Product D|   Category X|
|   Product E|         NULL|
|        NULL|   Category W|
|        NULL|   Category U|
+------------+-------------+

