#### list the details the sold products and the details of the customer who buy them


In [0]:
customer = spark.read.csv('/mnt/transformed-data/walmart/customer/',inferSchema=True,header=True)
invoicedetails = spark.read.csv('/mnt/transformed-data/walmart/invoicedetails/',inferSchema=True,header=True)
buys = spark.read.csv('/mnt/transformed-data/walmart/buys/',inferSchema=True,header=True)
product = spark.read.csv('/mnt/transformed-data/walmart/product/',inferSchema=True,header=True)

# list the details the sold products and the details of the customer who buy them
result_df = customer \
    .join(invoicedetails, customer["customer_id"] == invoicedetails["cust_id"], "inner") \
    .join(buys, invoicedetails["inv_id"] == buys["invoice_id"], "inner") \
    .join(product, buys["product_id"] == product["product_id"], "inner") \
    .select(
        customer["customer_id"],
        customer["customer_name"],
        buys["invoice_id"],
        product["product_id"],
        product["product_type"],
        product["brand"]
    )

result_df.show()



#### list the top 5 highest sold product


In [0]:
from pyspark.sql.functions import col, count

joined_df = buys.join(product, buys["product_id"] == product["product_id"], "inner")

# Group by product_id and count the occurrences
result_df = joined_df.groupBy(buys["product_id"], product["brand"]) \
    .agg(count("*").alias("count")) \
    .orderBy(col("count").desc()) \
    .limit(10)

result_df.show()


#### find the top 5 customers with the most product counts


In [0]:
from pyspark.sql.functions import col, count, dense_rank
from pyspark.sql.window import Window


top_5_customers = customer.join(invoicedetails, customer["customer_id"] == invoicedetails["cust_id"], "inner") \
    .join(buys, invoicedetails["inv_id"] == buys["invoice_id"], "inner") \
    .join(product, buys["product_id"] == product["product_id"], "inner") \
    .groupBy(customer["customer_id"], customer["customer_name"]) \
    .agg(count("*").alias("product_count")) \
    .withColumn("cust_rank", dense_rank().over(Window.orderBy(col("product_count").desc())))

# retrieve the top 5 customers
result_df = top_5_customers.filter(col("cust_rank") <= 5) \
    .select(
        top_5_customers["customer_id"],
        top_5_customers["customer_name"],
        top_5_customers["product_count"],
        top_5_customers["cust_rank"]
    )

result_df.show()
