In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Purchase records: each tuple pairs a customer id with a product model they
# bought. Columns: customer (int, nullable) and product_model (string, nullable).
purchase_schema = StructType(
    [
        StructField("customer", IntegerType(), nullable=True),
        StructField("product_model", StringType(), nullable=True),
    ]
)

# Row order is kept as-is so that df1.show() prints rows in the same order.
purchase_data = [
    (1, "iphone13"), (1, "dell i5 core"),
    (2, "iphone13"), (2, "dell i5 core"),
    (3, "iphone13"), (3, "dell i5 core"),
    (1, "dell i3 core"), (1, "hp i5 core"), (1, "iphone14"),
    (3, "iphone14"),
    (4, "iphone13"),
]

df1 = spark.createDataFrame(data=purchase_data, schema=purchase_schema)
df1.show()

# Schema for product_data: a single nullable string column, product_model.
product_schema = StructType([StructField("product_model", StringType(), nullable=True)])

# Each row is a 1-tuple holding one model name; listed in display order.
product_data = [
    (model,)
    for model in (
        "iphone13",
        "dell i5 core",
        "dell i3 core",
        "hp i5 core",
        "iphone14",
    )
]

df2 = spark.createDataFrame(data=product_data, schema=product_schema)
df2.show()

# -------------------------
# Questions to solve using the above DataFrames
# -------------------------

# 2. Find the customers who have bought only iphone13
# Collapse each customer's purchases into a set of distinct models, then keep
# customers whose set has exactly one element and that element is iphone13.
only_iphone13 = (size("product_model") == 1) & array_contains("product_model", "iphone13")
df3 = (
    df1.groupBy("customer")
    .agg(collect_set("product_model").alias("product_model"))
    .where(only_iphone13)
)
df3.show()

# 3. Find customers who upgraded from product iphone13 to product iphone14
# The purchase data has no date/sequence column, so "upgraded" is approximated
# as "bought BOTH iphone13 and iphone14". Note that arrays_overlap() would be
# wrong here: it is true when the set shares ANY element with the given array,
# i.e. it matches customers who bought either model.
df4 = (
    df1.groupBy("customer")
    .agg(collect_set("product_model").alias("product_model"))
    .filter(
        array_contains("product_model", "iphone13")
        & array_contains("product_model", "iphone14")
    )
)
# .show() for consistency with the other questions; display() is a
# notebook-specific helper that is not defined by this script.
df4.show()


# 4. Find customers who have bought all models in the new Product Data
# Comparing only "number of distinct models bought" with df2.count() gives
# false positives: a customer who bought models that are NOT in df2 can reach
# the same count without owning every catalogue model. Duplicate or null rows
# in df2 would also skew the target count. Instead:
#   1. take the distinct, non-null catalogue models,
#   2. keep only purchases of those models (left_semi join keeps df1's columns),
#   3. require the customer's distinct set to cover the whole catalogue.
catalog_models = df2.select("product_model").dropna().distinct()
catalog_size = catalog_models.count()

df5 = (
    df1.join(catalog_models, on="product_model", how="left_semi")
    .groupBy("customer")
    .agg(collect_set("product_model").alias("product_model"))
    .filter(size("product_model") == lit(catalog_size))
)


df5.show()