In [2]:
# 17.09.2023 Mikhail Porokhnya

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

In [3]:
import os

# Export the JDK and Spark locations in one step.
# Assumes the setup cell unpacked Spark under /content (the Colab working directory).
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.0-bin-hadoop3",
    }
)

In [4]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark

In [6]:
# With no argument findspark locates Spark through the SPARK_HOME environment variable
# set earlier in this notebook. Passing the relative directory name instead only works
# when the working directory is /content, and findspark would write that relative path
# back into SPARK_HOME, replacing the absolute one.
findspark.init()

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, size

# Create (or reuse) the SparkSession for this notebook.
spark = SparkSession.builder.appName("ProductCategory").getOrCreate()

# Example data: each product carries a (possibly empty) list of category names.
data = [("Product1", ["Category1", "Category2"]),
        ("Product2", ["Category2", "Category3"]),
        ("Product3", [])]

schema = ["Product_name", "Categories"]

df = spark.createDataFrame(data, schema)

# One row per (product, category) pair. explode() produces no rows for an empty or
# null list, so products without categories are collected separately below.
df_exploded = df.select(col("Product_name"), explode(col("Categories")).alias("Categories"))

# Cast to string so both halves of the union share one column type.
df_exploded = df_exploded.withColumn("Categories", col("Categories").cast("string"))

# Products without categories. The check inspects the array itself instead of its
# string rendering: comparing cast("string") with "[]" misses null lists, and a list
# holding a single empty string also renders as "[]", which would emit that product
# twice (once from explode, once from here).
has_no_categories = col("Categories").isNull() | (size(col("Categories")) == 0)

# Keep the "[]" marker in the Categories column for these products.
df_with_empty_categories = df.filter(has_no_categories).select(
    col("Product_name"),
    lit("[]").alias("Categories"),
)

# Combine the pairs with the category-less products.
result_df = df_exploded.union(df_with_empty_categories)

# Display the result.
result_df.show()


+------------+----------+
|Product_name|Categories|
+------------+----------+
|    Product1| Category1|
|    Product1| Category2|
|    Product2| Category2|
|    Product2| Category3|
|    Product3|        []|
+------------+----------+

