In [1]:
! pip install pyspark
! pip install pyarrow

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 1.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9519ad39c596a2268499e912c75ff0a6106a3930b3778f8ad5241b2e913937e2
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import os

from pyspark.sql import SparkSession

# Set before the session is created. PySpark's pandas API warns unless this is
# "1" when pyarrow >= 2.0.0 is installed.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# Name the application here, at first creation: any later
# SparkSession.builder.appName(...).getOrCreate() returns this same session
# and does not rename the application that is already running.
spark = SparkSession.builder.appName("ProductCategoryAnalysis").getOrCreate()

# Use Arrow for conversions between pandas and Spark DataFrames.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Last expression of the cell: display the session summary.
spark

In [3]:
# getOrCreate() hands back the active session when one exists, otherwise it
# starts a new one.
# NOTE(review): a session is already created earlier in this notebook, so the
# app name requested here presumably does not rename the running application
# -- confirm.
spark = (
    SparkSession.builder
    .appName("ProductCategoryAnalysis")
    .getOrCreate()
)

In [4]:
# Sample product catalogue: one (ProductID, ProductName) tuple per product.
products_schema = ["ProductID", "ProductName"]
products_data = [
    ("1", "Product1"),
    ("2", "Product2"),
    ("3", "Product3"),
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [5]:
# Sample product-to-category mapping: one (ProductID, CategoryName) tuple per
# assignment. Only ProductIDs "1" and "2" are listed here.
categories_schema = ["ProductID", "CategoryName"]
categories_data = [
    ("1", "Category1"),
    ("2", "Category2"),
]
categories_df = spark.createDataFrame(categories_data, schema=categories_schema)

In [6]:
from pyspark.sql.functions import col

In [10]:
def get_product_category_pairs_with_empty_products(products_df, categories_df):
    """Split products into (product, category) pairs and products lacking a category.

    Left-outer-joins ``products_df`` to ``categories_df`` on the ``ProductID``
    column of each frame, then partitions the joined rows by whether the
    category value is null.

    Args:
        products_df: DataFrame providing ``ProductID`` and ``ProductName`` columns.
        categories_df: DataFrame providing ``ProductID`` and ``CategoryName`` columns.

    Returns:
        A 2-tuple ``(pairs, uncategorised)``:

        * ``pairs`` -- columns ``Product`` and ``Category``; the joined rows
          whose ``Category`` is not null.
        * ``uncategorised`` -- single column ``Product``; the joined rows
          whose ``Category`` is null.
    """
    same_product = products_df['ProductID'] == categories_df['ProductID']

    # Keep every product row; the category side is null where nothing matched.
    labelled = products_df.join(categories_df, same_product, how='left_outer').select(
        products_df['ProductName'].alias('Product'),
        categories_df['CategoryName'].alias('Category'),
    )

    category = col('Category')
    has_category = labelled.filter(category.isNotNull())
    lacks_category = labelled.filter(category.isNull()).select('Product')

    return has_category, lacks_category

In [11]:
# Split the sample frames into matched pairs and products with no category.
result_df, empty_products_df = get_product_category_pairs_with_empty_products(
    products_df=products_df,
    categories_df=categories_df,
)

# Print each result under its heading, pairs first.
for heading, frame in (
    ("Product-Category pairs:", result_df),
    ("Products without categories:", empty_products_df),
):
    print(heading)
    frame.show()

# Shut the session down once both results have been displayed.
spark.stop()

Product-Category pairs:
+--------+---------+
| Product| Category|
+--------+---------+
|Product1|Category1|
|Product2|Category2|
+--------+---------+

Products without categories:
+--------+
| Product|
+--------+
|Product3|
+--------+

