1398. Customers Who Bought Products A and B but Not C
### Table: Customers

| Column Name   | Type    |
|---------------|---------|
| customer_id   | int     |
| customer_name | varchar |

customer_id is the primary key for this table.  
customer_name is the name of the customer.

---

### Table: Orders

| Column Name   | Type    |
|---------------|---------|
| order_id      | int     |
| customer_id   | int     |
| product_name  | varchar |

order_id is the primary key for this table.  
customer_id is the id of the customer who bought the product "product_name".

---

Write an SQL query to report the customer_id and customer_name of customers who bought products "A", "B" but did not buy the product "C" since we want to recommend them buy this product.

Return the result table ordered by customer_id.

---

### Customers table:

| customer_id | customer_name |
|-------------|----------------|
| 1           | Daniel         |
| 2           | Diana          |
| 3           | Elizabeth      |
| 4           | Jhon           |

---

### Orders table:

| order_id | customer_id | product_name |
|----------|-------------|--------------|
| 10       | 1           | A            |
| 20       | 1           | B            |
| 30       | 1           | D            |
| 40       | 1           | C            |
| 50       | 2           | A            |
| 60       | 3           | A            |
| 70       | 3           | B            |
| 80       | 3           | D            |
| 90       | 4           | C            |

---

### Result table:

| customer_id | customer_name |
|-------------|----------------|
| 3           | Elizabeth      |

---

**Explanation:**  
Only the customer_id with id 3 bought the product A and B but not the product C.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, collect_set, array_contains

# Start Spark session
spark = SparkSession.builder.appName("ProductRecommendation").getOrCreate()

# Define schemas
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("customer_name", StringType(), True)
])

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_name", StringType(), True)
])

# Sample data
customers_data = [
    (1, "Daniel"),
    (2, "Diana"),
    (3, "Elizabeth"),
    (4, "Jhon")
]

orders_data = [
    (10, 1, "A"),
    (20, 1, "B"),
    (30, 1, "D"),
    (40, 1, "C"),
    (50, 2, "A"),
    (60, 3, "A"),
    (70, 3, "B"),
    (80, 3, "D"),
    (90, 4, "C")
]

# Create DataFrames
customers_df = spark.createDataFrame(customers_data, customers_schema)
orders_df = spark.createDataFrame(orders_data, orders_schema)


#create tempview
customers_df.createOrReplaceTempView("customers")
orders_df.createOrReplaceTempView("orders")

In [0]:
%sql

Select distinct o.customer_id , customer_name   from orders o inner join customers on o.customer_id=customers.customer_id
where 
o.customer_id in (Select distinct customer_id from orders where product_name='A' ) 
and 
o.customer_id in (Select distinct customer_id from orders where product_name='B' ) 
and 
o.customer_id not in (Select distinct customer_id from orders where product_name='C' ) 



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
df_a = orders_df.filter(col("product_name")== "A").selectExpr("customer_id as cust_a_id")
df_b = orders_df.filter(col("product_name")== "B").selectExpr("customer_id as cust_b_id")
df_c = orders_df.filter(col("product_name")== "C").selectExpr("customer_id as cust_c_id")

df_interim = df_a.join(df_b, col("cust_a_id")==col("cust_b_id"),"inner").selectExpr("cust_a_id as cust_id")
df_filtered =df_interim.join(df_c , col("cust_id")==col("cust_c_id"),"leftanti").selectExpr("cust_id as f_cust_id")
df_filtered.join(customers_df , col("f_cust_id")==col("customer_id"),"inner").select("customer_id","customer_name").display()

In [0]:

# Aggregate products per customer
from pyspark.sql.functions import collect_set
product_df = orders_df.groupBy("customer_id") \
    .agg(collect_set("product_name").alias("products"))

# Filter customers who bought A and B but not C
filtered_df = product_df.filter(
    array_contains(col("products"), "A") &
    array_contains(col("products"), "B") &
    ~array_contains(col("products"), "C")
)

# Join with customer names
result_df = filtered_df.join(customers_df, on="customer_id") \
    .select("customer_id", "customer_name") \
    .orderBy("customer_id")

# Display result
display(result_df)