In [None]:
!pip install graphframes

In [None]:
# Import libraries

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.ml.feature import HashingTF, MinHashLSH
from graphframes import GraphFrame

In [None]:
# Build (or reuse) the Spark session. spark.jars.packages tells Spark which Maven
# coordinates to resolve, here the GraphFrames JVM package the Python bindings call into.
GRAPHFRAMES_PACKAGE = "graphframes:graphframes:0.8.3-spark3.4-s_2.12"
spark = (
    SparkSession.builder
    .appName("BigDataProject")
    .config("spark.jars.packages", GRAPHFRAMES_PACKAGE)
    .getOrCreate()
)

# GraphFrames requires a checkpoint directory registered on the SparkContext.
spark.sparkContext.setCheckpointDir("/tmp/graphframes_checkpoint")

In [None]:
# One reader configured once: every CSV has a header row, and Spark infers column types.
csv_reader = spark.read.options(header=True, inferSchema=True)

orders = csv_reader.csv("orders.csv")
order_prior = csv_reader.csv("order_products__prior.csv")
order_train = csv_reader.csv("order_products__train.csv")
products = csv_reader.csv("products.csv")
aisles = csv_reader.csv("aisles.csv")
departments = csv_reader.csv("departments.csv")

# Persist each loaded frame as Parquet (typed, columnar) so later runs can skip CSV
# parsing. Existing output is overwritten.
for frame, parquet_path in [
    (products, "products.parquet"),
    (aisles, "aisles.parquet"),
    (departments, "departments.parquet"),
    (orders, "orders.parquet"),
    (order_prior, "order_products__prior.parquet"),
    (order_train, "order_products__train.parquet"),
]:
    frame.write.parquet(parquet_path, mode="overwrite")

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count

# Every raw dataset paired with the label used in the printed reports. Both
# quality checks below walk this same list, in this order.
datasets_to_check = [
    (orders, "orders"),
    (order_prior, "order_prior"),
    (order_train, "order_train"),
    (products, "products"),
    (aisles, "aisles"),
    (departments, "departments"),
]

# Null counts per column. F.when(...) yields a value only for null cells, and F.count
# skips nulls, so each aggregate counts the nulls in its column.
for df, name in datasets_to_check:
    print(f"Missing values in {name}:")
    null_count_exprs = [
        F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns
    ]
    df.select(null_count_exprs).show()

# Duplicate check. Grouping on every column and keeping groups seen more than once
# gives the number of distinct full-row values that repeat. This is not the number
# of surplus rows.
for df, name in datasets_to_check:
    repeated_groups = df.groupBy(df.columns).count().where("count > 1")
    dup_count = repeated_groups.count()
    print(f"Duplicate rows in {name}: {dup_count}")

# Repeated values within a single column (e.g. product_id in the order tables) are
# expected and not flagged here: one product can be ordered many times by the same
# or different customers.

In [None]:
# Attach order-level metadata (user_id, day/hour, days since prior order, ...) to
# every prior order line. The inner join keeps only lines whose order_id is in orders.
merged_prior = order_prior.join(orders, "order_id", "inner")

In [None]:
# Same enrichment for the train order lines. The inner join keeps only lines whose
# order_id is in orders.
merged_train = order_train.join(orders, "order_id", "inner")

In [None]:
# Stack the train and prior order lines into one dataset.
# unionByName matches columns by name instead of by position, so the result stays
# correct even if the two source CSVs list their columns in a different order.
# Plain union() would silently misalign the values in that case.
stacked_df = merged_train.unionByName(merged_prior)

In [None]:
# Co-locate each user's rows by hash-partitioning on user_id, then cache the result,
# since most of the cells below scan stacked_df again.
# NOTE: this rebinds stacked_df, so re-running the cell repartitions once more.
stacked_df = stacked_df.repartition("user_id")
stacked_df = stacked_df.cache()
# count() is an action: it forces the cache to be populated now and shows the row total.
stacked_df.count()

In [None]:
# Number of distinct product_id values across all stacked order lines.
stacked_df.select("product_id").dropDuplicates().count()

In [None]:
# The 20 products appearing in the most order lines. This counts every purchase
# (prior + train), not only reorders. Product details are attached for display.
from pyspark.sql.functions import count

most_popular_products = (
    stacked_df.groupBy("product_id").count()  # column "count" = order lines per product
    .join(products, on="product_id", how="inner")
    .sort("count", ascending=False)  # sorted after the join, so limit() keeps the true top 20
    .limit(20)
)

most_popular_products.show()

In [None]:
# Bring the 20-row Spark result to the driver. Sort ascending because barh draws the
# first row at the bottom, which leaves the most popular product on top.
top_products_pd = most_popular_products.toPandas().sort_values("count", ascending=True)

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 8))
ax.barh(top_products_pd["product_name"], top_products_pd["count"], color='green')
ax.set_xlabel("Number of Orders")
ax.set_title("Top 20 Most Popular Products")
fig.tight_layout()
plt.show()

In [None]:
# The 20 products with the fewest order lines, with product details attached.
# If several products share the cut-off count, which of them land in the 20 is not
# deterministic (the sort key is the count alone).
least_popular_products = (
    stacked_df
    .groupBy("product_id")
    .count()
    .join(products, "product_id", "inner")
    .orderBy("count")
    .limit(20)
)
least_popular_products.show()

In [None]:
# Bring the 20 least popular products to pandas and sort ascending by count, which
# fixes their top-to-bottom display order in the horizontal bar chart.
least_products_pd = least_popular_products.toPandas()
least_products_pd = least_products_pd.sort_values("count", ascending=True)

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 8))
ax.barh(least_products_pd["product_name"], least_products_pd["count"], color='tomato')
ax.set_xlabel("Number of Orders")
ax.set_title("20 Least Popular Products")
fig.tight_layout()
plt.show()


In [None]:
# Number of distinct orders placed in each hour of the day, least busy hour first.
# stacked_df has one row per (order, product) line, so counting rows would count
# purchased items and over-weight large baskets. countDistinct counts each order once.
(
    stacked_df.groupBy("order_hour_of_day")
    .agg(F.countDistinct("order_id").alias("count"))
    .sort("count", ascending=True)
    .show()
)

In [None]:
# Number of distinct orders for each (day of week, hour of day) pair.
# stacked_df has one row per ordered product, so count distinct order_ids rather than
# rows. Otherwise every cell would be weighted by basket size instead of order volume.
heatmap_df = (
    stacked_df.groupBy("order_dow", "order_hour_of_day")
    .agg(F.countDistinct("order_id").alias("order_count"))
)

heatmap_pd = heatmap_df.toPandas()

# Pivot table: rows = hour of day, columns = day of week
heatmap_pivot = heatmap_pd.pivot(index="order_hour_of_day", columns="order_dow", values="order_count")

# Plot
plt.figure(figsize=(12, 6))
sns.heatmap(heatmap_pivot, cmap="YlOrRd", cbar_kws={"label": "Number of Orders"})

plt.title("Order Volume by Day of Week and Hour of Day")
# NOTE(review): the day-of-week encoding (0 = Sunday) is assumed; confirm against the dataset docs.
plt.xlabel("Day of Week (0 = Sunday)")
plt.ylabel("Hour of Day")
plt.xticks(rotation=0)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

In [None]:
# Order lines per department (each purchased product counts once), with department names.
df_departments = stacked_df.join(products, on="product_id", how="inner")

department_popularity = df_departments.groupBy("department_id") \
    .agg(count("*").alias("total_orders")) \
    .orderBy("total_orders", ascending=False)

# A join does not preserve the sort order of its input, so sort again after attaching
# the department names. Otherwise show() may list departments in arbitrary order.
most_popular_departments = (
    department_popularity.join(departments, on="department_id", how="inner")
    .orderBy("total_orders", ascending=False)
)
most_popular_departments.show()

# Ascending order for barh, which draws the first row at the bottom (largest ends on top).
dept_pd = most_popular_departments.toPandas().sort_values("total_orders", ascending=True)

# Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.barh(dept_pd["department"], dept_pd["total_orders"], color="purple")
plt.xlabel("Number of Orders")
plt.title("Most Popular Departments")
plt.tight_layout()
plt.show()


In [None]:
# Order lines per aisle (each purchased product counts once), with aisle names attached.
df_with_aisles = stacked_df.join(products, on="product_id", how="inner")

aisle_popularity = df_with_aisles.groupBy("aisle_id") \
    .agg(count("*").alias("total_orders")) \
    .orderBy("total_orders", ascending=False)

most_popular_aisles = aisle_popularity.join(aisles, on="aisle_id", how="inner")
# The join does not keep aisle_popularity's order, so sort again for display.
most_popular_aisles.orderBy("total_orders", ascending=False).show()

# Keep the top 25 aisles, then flip to ascending order. barh draws the first row at
# the bottom, so this puts the most popular aisle at the top, the same layout as the
# product and department bar charts.
aisle_pd = (
    most_popular_aisles.toPandas()
    .sort_values("total_orders", ascending=False)
    .head(25)
    .sort_values("total_orders", ascending=True)
)

# Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 8))
plt.barh(aisle_pd["aisle"], aisle_pd["total_orders"], color="teal")
plt.xlabel("Number of Orders")
plt.title("Most Popular Aisles")
plt.tight_layout()
plt.show()

In [None]:
# Order lines per (department, aisle) pair, drawn as a treemap so each department's
# area is subdivided into its aisles.
df_joined = (
    stacked_df
    .join(products, "product_id", "inner")
    .join(aisles, "aisle_id", "inner")
    .join(departments, "department_id", "inner")
)

df_treemap = (
    df_joined
    .groupBy("department", "aisle")
    .agg(count("*").alias("total_orders"))
)
treemap_pd = df_treemap.toPandas()

import plotly.express as px

# Hierarchy department -> aisle; both area and colour encode order-line volume.
fig = px.treemap(
    treemap_pd,
    path=["department", "aisle"],
    values="total_orders",
    color="total_orders",
    color_continuous_scale="YlGnBu",
    title="Top-Selling Aisles Within Each Department",
)
fig.show()

In [None]:
# Number of order lines for each value of the reordered flag (expected 0 / 1),
# ordered by flag value, then collected to pandas for plotting.
reorder_counts = (
    stacked_df
    .groupBy("reordered")
    .agg(count("*").alias("count"))
    .orderBy("reordered")
)
reorder_pd = reorder_counts.toPandas()

In [None]:
# Bar chart of order-line counts for reordered = 0 vs 1. pandas Series are passed to
# matplotlib as-is; x positions 0 and 1 are then relabelled with readable names.
fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(reorder_pd["reordered"], reorder_pd["count"], color=["lightcoral", "mediumseagreen"])
ax.set_title("Reordered vs Not Reordered")
ax.set_xlabel("Reordered Flag (0 = No, 1 = Yes)")
ax.set_ylabel("Number of Products")
ax.set_xticks([0, 1])
ax.set_xticklabels(["Not Reordered", "Reordered"])
fig.tight_layout()
plt.show()

In [None]:
# NOTE(review): this import shadows Python's built-in sum() for every later cell;
# F.sum would be safer. It is kept so later cells keep resolving `sum` the same way.
from pyspark.sql.functions import sum

# Total reorder count per product (only rows where reordered = 1)
reordered_counts = stacked_df.filter(stacked_df.reordered == 1) \
    .groupBy("product_id") \
    .agg(sum("reordered").alias("reorder_count")) \
    .orderBy("reorder_count", ascending=False)

# Add product names to reorder counts
most_reordered_products = reordered_counts.join(products, on="product_id", how="inner")

# Sort AFTER the join. A join does not preserve its input's sort order, so calling
# limit(20) straight after it could return an arbitrary 20 products rather than the top 20.
top_reordered = most_reordered_products \
    .orderBy("reorder_count", ascending=False) \
    .limit(20)

# Ascending order for barh, which draws the first row at the bottom (largest ends on top).
top_reordered_pd = top_reordered.toPandas().sort_values("reorder_count", ascending=True)

# Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 8))
plt.barh(top_reordered_pd["product_name"], top_reordered_pd["reorder_count"], color="seagreen")
plt.xlabel("Reorder Count")
plt.title("Top 20 Most Reordered Products")
plt.tight_layout()
plt.show()


In [None]:
# Distinct orders by days since the user's previous order, split by day of week.
# stacked_df has one row per ordered product, so count distinct order_ids rather than
# rows. Otherwise every bar would be weighted by basket size.
grouped_orders = (
    stacked_df.groupBy("days_since_prior_order", "order_dow")
    .agg(F.countDistinct("order_id").alias("order_count"))
)

# Convert to pandas
grouped_pd = grouped_orders.toPandas()

# Pivot so days_since_prior_order becomes the index and order_dow becomes the columns
pivot_df = grouped_pd.pivot(index="days_since_prior_order", columns="order_dow", values="order_count")

# Fix column order to day codes 0..6 (raises KeyError if any day is absent from the data)
pivot_df = pivot_df[[0, 1, 2, 3, 4, 5, 6]]

# Plot
pivot_df.plot(
    kind="bar",
    stacked=True,
    figsize=(14, 6),
    colormap="tab10"
)

plt.title("Orders by Days Since Prior Order (stacked by Day of Week)")
plt.xlabel("Days Since Prior Order")
plt.ylabel("Number of Orders")
plt.legend(title="Day of Week (0=Sunday)", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()

In [None]:
# Per-product features computed from prior orders only, in a single aggregation
# (one shuffle instead of three groupBys followed by two joins).
# The earlier join-based version dropped any null product_id group, because null keys
# never match in an equi-join. The filter keeps exactly the same rows.
features1 = (
    order_prior
    .filter(F.col("product_id").isNotNull())
    .groupBy("product_id")
    .agg(
        F.avg("add_to_cart_order").alias("avg_cart_position"),  # mean position in the cart
        F.count("order_id").alias("total_orders"),              # order lines containing the product
        F.sum("reordered").alias("total_reorders"),             # sum of the reordered flag
    )
)

# Single-feature views and the two-feature frame, kept under their original names
# because later cells may refer to them (e.g. `features` in the enrichment join).
avg_cart_position = features1.select("product_id", "avg_cart_position")
total_orders = features1.select("product_id", "total_orders")
total_reorders = features1.select("product_id", "total_reorders")
features = features1.select("product_id", "avg_cart_position", "total_orders")

features1.show()

In [None]:
# Positive class: every product reordered at least once in the training orders.
reordered_products = (
    order_train
    .where(F.col("reordered") == 1)
    .select("product_id")
    .distinct()
    .withColumn("label", F.lit(1))
)

# Left-join labels onto the product features. Products with no match (never reordered
# in train) get a null label, which is filled with 0.
labeled_df = (
    features1
    .join(reordered_products, "product_id", "left")
    .fillna(0, subset=["label"])
)

In [None]:
# Attach each product's aisle_id and department_id to the labelled feature rows.
# NOTE: labeled_df is rebound, so re-running this cell joins these columns a second time.
product_info = products.select(["product_id", "aisle_id", "department_id"])
labeled_df = labeled_df.join(product_info, "product_id", "left")

In [None]:
# Union the merged prior and merged train order lines to capture all order-product details.
# unionByName matches columns by name, so a different column order in the two source
# CSVs cannot silently misalign values (as positional union() would).
all_orders = merged_prior.unionByName(merged_train)

In [None]:
# Attach the per-product features (avg_cart_position, total_orders) to every order line.
# The left join keeps order lines whose product has no prior-order features (null features).
# NOTE(review): this uses `features`, not `features1`, so total_reorders is not attached;
# confirm that is intended.
enriched_orders = all_orders.join(features, "product_id", "left")
enriched_orders.show()

In [None]:
# User and order info is already present: all_orders was built from merged_prior /
# merged_train, which were each joined with `orders` on order_id. Joining `orders`
# again only duplicated every orders column (the reason for the later column-dedupe
# step) and cost another large join. Assuming order_id is unique in orders, the extra
# left join added no rows and no new information, so reuse enriched_orders directly.
final_df = enriched_orders

In [None]:
# Add product_name, aisle_id and department_id; the inner join drops lines whose
# product_id is missing from products.
official_df = final_df.join(products, "product_id", "inner")

In [None]:
from collections import Counter
import re

# Spark lets a DataFrame hold several columns with the same name (e.g. after joins on
# tables sharing non-key columns), which makes those names ambiguous to select.
orig_names = official_df.schema.names
counts     = Counter(orig_names)

# Rename positionally: the first occurrence of a name keeps it, and the k-th repeat
# becomes "<name>_<k>" (k = 1, 2, ...).
name_counts = Counter()
new_names   = []
for col_name in orig_names:
    occurrence = name_counts[col_name]
    name_counts[col_name] = occurrence + 1
    new_names.append(f"{col_name}_{occurrence}" if occurrence else col_name)

official_df = official_df.toDF(*new_names)

# Drop every suffixed repeat, so only the first occurrence of each name survives.
to_drop = [
    f"{dup_name}_{k}"
    for dup_name, total in counts.items()
    for k in range(1, total)
]
official_df = official_df.drop(*to_drop)

# Each column name now appears exactly once.
print(official_df.columns)
