# TASK 1 – Environment Setup & Data Ingestion

### Step 1 — Install & Configure PySpark in Colab

In [224]:
!pip install pyspark==3.5.1
!pip install pymongo



### Step 2 - Environment setup and check secrets are success

In [225]:
# MongoDB config
from google.colab import userdata
import os
from pymongo import MongoClient

# Get Secrets
try:
    mongo_uri = userdata.get('MONGO_URI')

    print(f"Attempting to connect with User: {mongo_uri.split(':')[1].split('@')[0]}...")
except Exception as e:
    print("Error reading secrets! Make sure 'Notebook access' is ON in the sidebar.")
    raise e

# Connect & Test
os.environ["MONGO_URI"] = mongo_uri

# Verify connection
try:
    # prove it work
    client = MongoClient(os.environ["MONGO_URI"])
    client.admin.command('ping')

    print("Authentication Successful! Connected to MongoDB Atlas.")
    print("Ready for Spark integration.")
except Exception as e:
    print(f"Connection failed: {e}")

Attempting to connect with User: //admin...
Authentication Successful! Connected to MongoDB Atlas.
Ready for Spark integration.


### Step 3 – Create SparkSession with MongoDB

In [226]:
from pyspark.sql import SparkSession
from google.colab import userdata

mongo_uri = userdata.get("MONGO_URI")

spark = SparkSession.builder \
    .appName("MongoDB_Final_Fix") \
    .master("local[*]") \
    .config(
        "spark.jars.packages",
        "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0"
    ) \
    .config("spark.mongodb.connection.uri", mongo_uri) \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

print("Spark created successfully ✅")

Spark created successfully ✅


Check Spart connection with MongoDB JAR

In [227]:
print(spark.sparkContext._conf.get("spark.jars.packages"))

org.mongodb.spark:mongo-spark-connector_2.12:10.3.0


### Step 4 - Mount google drive

In [228]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [229]:
BASE_PATH = "/content/drive/MyDrive/BigData_Assigenment02"
DATASET_PATH = BASE_PATH + "/Dataset/E_Commerce_Data_UK.csv"
BRONZE_PATH = BASE_PATH + "/bronze/"


### Step 5 - Load dataset into spark

In [230]:
raw_df = spark.read.csv(
    DATASET_PATH,
    header=True,
    inferSchema=True
)

### Step 6 - Display Schema

In [231]:
raw_df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



### Step 7 - Display record count

In [232]:
total_records = raw_df.count()
print("Total Records in Dataset:", total_records)


Total Records in Dataset: 541909


### Step 8 - Store Bronze Data as Partitioned Parquet

Convert InvoiceDate to Timestamp

In [233]:
from pyspark.sql.functions import to_timestamp

bronze_df = raw_df.withColumn(
    "InvoiceDate",
    to_timestamp("InvoiceDate", "M/d/yyyy HH:mm")
)

Add Year & Month Columns for Partitioning

In [234]:
from pyspark.sql.functions import year, month

bronze_df = bronze_df \
    .withColumn("year", year("InvoiceDate")) \
    .withColumn("month", month("InvoiceDate"))

Store Bronze Data as Partitioned Parquet

In [235]:
bronze_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(BRONZE_PATH)

print("Bronze Layer Saved Successfully ✅")

Bronze Layer Saved Successfully ✅


# TASK 2 – Data Cleaning & Quality Management

### Step 1 - Count total records

In [236]:
total_records = bronze_df.count()
print("Total Records Processed:", total_records)

Total Records Processed: 541909


### Step 2 - Missing customerID handling

In [237]:
from pyspark.sql.functions import col

missing_customer_count = bronze_df.filter(
    col("CustomerID").isNull()
).count()

df_step1 = bronze_df.filter(
    col("CustomerID").isNotNull()
)

print("Missing CustomerID Records Removed:", missing_customer_count)

Missing CustomerID Records Removed: 135080


### step 3 - Negative quantities its same as product return

In [238]:
negative_qty_count = df_step1.filter(
    col("Quantity") < 0
).count()

df_step2 = df_step1.filter(
    col("Quantity") > 0
)

print("Negative Quantity Records Removed:", negative_qty_count)

Negative Quantity Records Removed: 8905


### Step 4 - Cancelled invoices

In [239]:
cancelled_count = df_step2.filter(
    col("InvoiceNo").startswith("C")
).count()

df_step3 = df_step2.filter(
    ~col("InvoiceNo").startswith("C")
)

print("Cancelled Invoice Records Removed:", cancelled_count)

Cancelled Invoice Records Removed: 0


### Step 5 — Invalid or Extreme Prices

In [240]:
invalid_price_count = df_step3.filter(
    col("UnitPrice") <= 0
).count()

df_step4 = df_step3.filter(
    col("UnitPrice") > 0
)

print("Invalid Price Records Removed:", invalid_price_count)

Invalid Price Records Removed: 40


Remove Extreme Outliers (IQR Method)

In [241]:
quantiles = df_step4.approxQuantile(
    "UnitPrice",
    [0.25, 0.75],
    0
)

Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outlier_count = df_step4.filter(
    (col("UnitPrice") < lower_bound) |
    (col("UnitPrice") > upper_bound)
).count()

df_step5 = df_step4.filter(
    (col("UnitPrice") >= lower_bound) &
    (col("UnitPrice") <= upper_bound)
)

print("Extreme Price Outliers Removed:", outlier_count)

Extreme Price Outliers Removed: 34356


### Step 6 — Remove Duplicate Records

In [242]:
before_dup = df_step5.count()

silver_df = df_step5.dropDuplicates()

after_dup = silver_df.count()

duplicate_removed = before_dup - after_dup

print("Duplicate Records Removed:", duplicate_removed)

Duplicate Records Removed: 4948


Final Clean Record Count

In [243]:
clean_records = silver_df.count()
print("Final Clean Records:", clean_records)

Final Clean Records: 358580


Null Counts Per Column

In [244]:
from pyspark.sql.functions import sum as spark_sum

null_counts = bronze_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in bronze_df.columns
])

null_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+----+-----+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|year|month|
+---------+---------+-----------+--------+-----------+---------+----------+-------+----+-----+
|        0|        0|       1454|       0|          0|        0|    135080|      0|   0|    0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+----+-----+



### Step 7 - Data Quality report

In [245]:
dq_report = spark.createDataFrame([
    ("Total Records Processed", total_records),
    ("Missing CustomerID Removed", missing_customer_count),
    ("Negative Quantity Removed", negative_qty_count),
    ("Cancelled Invoices Removed", cancelled_count),
    ("Invalid Price Removed", invalid_price_count),
    ("Extreme Price Outliers Removed", outlier_count),
    ("Duplicate Records Removed", duplicate_removed),
    ("Final Clean Records", clean_records)
], ["Metric", "Value"])

dq_report.show()

+--------------------+------+
|              Metric| Value|
+--------------------+------+
|Total Records Pro...|541909|
|Missing CustomerI...|135080|
|Negative Quantity...|  8905|
|Cancelled Invoice...|     0|
|Invalid Price Rem...|    40|
|Extreme Price Out...| 34356|
|Duplicate Records...|  4948|
| Final Clean Records|358580|
+--------------------+------+



Export Data Quality Report

In [246]:
dq_report.toPandas().to_csv(
    BASE_PATH + "/reports/data_quality_report.csv",
    index=False
)

print("Data Quality Report Saved ✅")

Data Quality Report Saved ✅


In [247]:
# Define Silver Path (if not defined earlier)
SILVER_PATH = BASE_PATH + "/silver/"

# Write Silver Data
silver_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(SILVER_PATH)

print("Silver Layer Saved Successfully ✅")

Silver Layer Saved Successfully ✅


# TASK 3 – Feature Engineering

### Step 1 — Revenue Calculation

In [248]:
from pyspark.sql.functions import col

feature_df = silver_df.withColumn(
    "Revenue",
    col("Quantity") * col("UnitPrice")
)

### Step 2 — Time-Based Features

Feature 1: Hour

In [249]:
from pyspark.sql.functions import hour

feature_df = feature_df.withColumn(
    "Hour",
    hour("InvoiceDate")
)

Feature 2: Weekday

In [250]:
from pyspark.sql.functions import dayofweek

feature_df = feature_df.withColumn(
    "Weekday",
    dayofweek("InvoiceDate")
)

Feature 3: Month

In [251]:
from pyspark.sql.functions import month

feature_df = feature_df.withColumn(
    "Month",
    month("InvoiceDate")
)

### Step 3 — Basket-Level Metrics

In [252]:
from pyspark.sql.functions import sum, count

basket_metrics = feature_df.groupBy("InvoiceNo") \
    .agg(
        sum("Revenue").alias("BasketRevenue"),
        count("StockCode").alias("ItemsPerBasket")
    )

Additional ItemsPerBasket

In [253]:
feature_df = feature_df.join(
    basket_metrics,
    on="InvoiceNo",
    how="left"
)

### Step 4 — Customer RFM Features

In [254]:
from pyspark.sql.functions import max, datediff, current_date

rfm_df = feature_df.groupBy("CustomerID") \
    .agg(
        datediff(current_date(), max("InvoiceDate")).alias("Recency"),
        count("InvoiceNo").alias("Frequency"),
        sum("Revenue").alias("Monetary")
    )

### Step 5 — Window-Based Feature

In [255]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

window_spec = Window.orderBy(col("Monetary").desc())

rfm_df = rfm_df.withColumn(
    "CustomerRank",
    rank().over(window_spec)
)

### Step 6 - This part we save that data into Gold Layer

In [256]:
GOLD_PATH = BASE_PATH + "/gold/"

feature_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(GOLD_PATH)

print("Feature Engineered Data Saved ✅")

Feature Engineered Data Saved ✅


# TASK 4 – MongoDB Data Modeling

### Step 1 - Load Dataset

In [257]:
feature_df = spark.read.parquet("/content/drive/MyDrive/BigData_Assigenment02/gold")

### Step 2 - Create MongoDB Collections

#### Collection 1 - fact_invoices

In [258]:
from pyspark.sql.functions import collect_list, struct, sum, first

fact_invoices = feature_df.groupBy(
    "InvoiceNo",
    "CustomerID",
    "InvoiceDate"
).agg(
    sum("Revenue").alias("TotalInvoiceRevenue"),
    collect_list(
        struct(
            "StockCode",
            "Description",
            "Quantity",
            "UnitPrice",
            "Revenue"
        )
    ).alias("line_items")
)


***Schema Definition (fact_invoices)***


{

   _id: ObjectId,

   Invoice: String,

   CustomerID: Integer,

   InvoiceDate: Date,

   Country: String,

   total_invoice_revenue: Double,

   line_items: [
     
      {
        StockCode: String,
        Description: String,
        Quantity: Integer,
        Price: Double,
        revenue: Double
      }
     ]

}

#### Collection 2 - dim_customers

In [259]:
from pyspark.sql.functions import when

dim_customers = rfm_df.withColumn(
    "CustomerSegment",
    when(rfm_df["Monetary"] >= 5000, "High Value")
    .when(rfm_df["Monetary"] >= 2000, "Medium Value")
    .otherwise("Low Value")
)

***Schema Definition (dim_customers)***

{

  _id: ObjectId,

  CustomerID: Integer,

  recency: Integer,

  frequency: Integer,

  monetary: Double,

  total_invoices: Integer,

  total_revenue: Double,

  customer_segment: String
  
}

#### Collection 3 - dim_products

In [260]:
from pyspark.sql.functions import sum, collect_set

dim_products = feature_df.groupBy(
    "StockCode",
    "Description"
).agg(
    sum("Revenue").alias("TotalProductRevenue"),
    sum("Quantity").alias("TotalQuantitySold"),
    collect_set("Country").alias("CountriesSold")
)

***Schema Definition (dim_products)***

{

  _id: ObjectId,

  StockCode: String,

  Description: String,

  total_quantity_sold: Integer,

  total_product_revenue: Double,

  countries_sold: [String]
  
}

# TASK 5 – MongoDB Indexing & Write Optimization

### Step 1 - Write Gold Datasets to MongoDB

In [261]:
# Write fact_invoices
fact_invoices.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.database", "E-Commerce_Database") \
    .option("spark.mongodb.collection", "fact_invoices") \
    .save()

# Write dim_customers
dim_customers.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.database", "E-Commerce_Database") \
    .option("spark.mongodb.collection", "dim_customers") \
    .save()

# Write dim_products
dim_products.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("spark.mongodb.database", "E-Commerce_Database") \
    .option("spark.mongodb.collection", "dim_products") \
    .save()

print("All Gold datasets written successfully ✅") # Database already created using colab

All Gold datasets written successfully ✅


### Step 3 - Creating Indexes on MongoDB Collections

In [262]:
from pymongo import MongoClient
from google.colab import userdata

# Ensure db client is active. This re-initializes it if needed.
mongo_uri = userdata.get("MONGO_URI")
client = MongoClient(mongo_uri)
db = client["E-Commerce_Database"]

# Index 1 — CustomerID (fact_invoices)
db.fact_invoices.create_index("CustomerID")
print("Index on fact_invoices.CustomerID created.")

# Index 2 - InvoiceDate (fact_invoices)
db.fact_invoices.create_index("InvoiceDate")
print("Index on fact_invoices.InvoiceDate created.")

# Index 3 — CustomerSegment (dim_customers)
db.dim_customers.create_index("CustomerSegment")
print("Index on dim_customers.CustomerSegment created.")

# Index 4 — StockCode (dim_products)
db.dim_products.create_index("StockCode")
print("Index on dim_products.StockCode created.")

print("All specified indexes created successfully ✅")

Index on fact_invoices.CustomerID created.
Index on fact_invoices.InvoiceDate created.
Index on dim_customers.CustomerSegment created.
Index on dim_products.StockCode created.
All specified indexes created successfully ✅


### Step 4 - Demonstrate Query Performance Improvement

Test before index

In [263]:
import time

start = time.time()
list(db.fact_invoices.find({"CustomerID": 17850}))
end = time.time()

print("Query Time:", end - start, "seconds")

Query Time: 0.46863317489624023 seconds


verify index usage

In [264]:
db.fact_invoices.find({"CustomerID": 17850}).explain()

{'explainVersion': '1',
 'queryPlanner': {'namespace': 'E-Commerce_Database.fact_invoices',
  'parsedQuery': {'CustomerID': {'$eq': 17850}},
  'indexFilterSet': False,
  'queryHash': 'B6E1B21F',
  'planCacheShapeHash': 'B6E1B21F',
  'planCacheKey': '445467B7',
  'optimizationTimeMillis': 0,
  'maxIndexedOrSolutionsReached': False,
  'maxIndexedAndSolutionsReached': False,
  'maxScansToExplodeReached': False,
  'prunedSimilarIndexes': False,
  'winningPlan': {'isCached': False,
   'stage': 'FETCH',
   'inputStage': {'stage': 'IXSCAN',
    'keyPattern': {'CustomerID': 1},
    'indexName': 'CustomerID_1',
    'isMultiKey': False,
    'multiKeyPaths': {'CustomerID': []},
    'isUnique': False,
    'isSparse': False,
    'isPartial': False,
    'indexVersion': 2,
    'direction': 'forward',
    'indexBounds': {'CustomerID': ['[17850, 17850]']}}},
  'rejectedPlans': []},
 'executionStats': {'executionSuccess': True,
  'nReturned': 34,
  'executionTimeMillis': 0,
  'totalKeysExamined': 34,
  

# TASK 6 – Analytics & Insights

### Part A - 5 Spark-based analytics queries

#### Monthly Revenue Trends

In [265]:
from pyspark.sql.functions import sum, month, year

monthly_revenue = feature_df \
    .groupBy(year("InvoiceDate").alias("Year"),
             month("InvoiceDate").alias("Month")) \
    .agg(sum("Revenue").alias("TotalRevenue")) \
    .orderBy("Year", "Month")

monthly_revenue.show()

+----+-----+------------------+
|Year|Month|      TotalRevenue|
+----+-----+------------------+
|2010|   12| 480348.9500000139|
|2011|    1| 495617.7300000107|
|2011|    2| 379688.2800000083|
|2011|    3| 492847.0500000031|
|2011|    4| 394154.6910000067|
|2011|    5| 563858.3699999995|
|2011|    6| 532562.6000000084|
|2011|    7| 518686.8710000066|
|2011|    8| 564883.8500000046|
|2011|    9| 849232.1320000029|
|2011|   10| 881896.7099999939|
|2011|   11|1013999.2999999907|
|2011|   12|477047.77000000223|
+----+-----+------------------+



#### Top Customers by Spend

In [266]:
top_customers = feature_df \
    .groupBy("CustomerID") \
    .agg(sum("Revenue").alias("TotalSpend")) \
    .orderBy("TotalSpend", ascending=False) \
    .limit(10)

top_customers.show()

+----------+------------------+
|CustomerID|        TotalSpend|
+----------+------------------+
|     14646|262583.41999999946|
|     18102|221190.81000000006|
|     17450| 180952.7299999999|
|     16446|          168472.5|
|     12415|113631.68000000001|
|     14911| 112862.2499999998|
|     14156| 97088.98000000004|
|     17511| 86658.71999999994|
|     12346|           77183.6|
|     16029| 71848.28999999998|
+----------+------------------+



#### Top Products by Revenue

In [267]:
top_products = feature_df \
    .groupBy("StockCode", "Description") \
    .agg(sum("Revenue").alias("TotalRevenue")) \
    .orderBy("TotalRevenue", ascending=False) \
    .limit(10)

top_products.show()

+---------+--------------------+------------------+
|StockCode|         Description|      TotalRevenue|
+---------+--------------------+------------------+
|    23843|PAPER CRAFT , LIT...|          168469.6|
|   85123A|WHITE HANGING HEA...|100392.09999999966|
|   85099B|JUMBO BAG RED RET...| 85040.54000000026|
|    23166|MEDIUM CERAMIC TO...|          81416.73|
|    47566|       PARTY BUNTING| 68655.75000000003|
|    84879|ASSORTED COLOUR B...|56413.030000000515|
|    23084|  RABBIT NIGHT LIGHT| 51251.24000000005|
|    79321|       CHILLI LIGHTS| 46078.20999999997|
|    22086|PAPER CHAIN KIT 5...|42584.130000000165|
|    21137|BLACK RECORD COVE...|39045.800000000025|
+---------+--------------------+------------------+



#### Country-Level Sales Analysis

In [268]:
country_sales = feature_df \
    .groupBy("Country") \
    .agg(sum("Revenue").alias("TotalRevenue")) \
    .orderBy("TotalRevenue", ascending=False)

country_sales.show()

+---------------+------------------+
|        Country|      TotalRevenue|
+---------------+------------------+
| United Kingdom| 6334870.804000116|
|    Netherlands| 266830.6399999994|
|           EIRE|213121.38999999964|
|        Germany|174099.74999999977|
|         France|162311.06000000017|
|      Australia|126161.75999999994|
|          Spain|47124.960000000094|
|    Switzerland|45040.950000000084|
|         Sweden|36568.429999999986|
|          Japan|35897.469999999994|
|        Belgium| 31353.25000000003|
|         Norway|27446.640000000003|
|       Portugal|24435.969999999987|
|        Finland| 16361.28000000001|
|Channel Islands|15050.439999999997|
|        Denmark|13375.439999999993|
|          Italy|13127.490000000005|
|         Cyprus| 9497.159999999996|
|      Singapore| 7928.389999999997|
|        Austria| 7658.830000000002|
+---------------+------------------+
only showing top 20 rows



#### Return / Cancellation Patterns

In [269]:
returns_analysis = feature_df \
    .filter(feature_df["Quantity"] < 0) \
    .groupBy("Country") \
    .count()

returns_analysis.show()

+-------+-----+
|Country|count|
+-------+-----+
+-------+-----+



### Part B - MongoDB Aggregation Pipelines

In [270]:
from pymongo import MongoClient
from google.colab import userdata

mongo_uri = userdata.get("MONGO_URI")
client = MongoClient(mongo_uri)
db = client["E-Commerce_Database"]

#### Monthly Revenue (fact_invoices)

In [271]:
pipeline = [
    {
        "$group": {
            "_id": {
                "year": {"$year": "$InvoiceDate"},
                "month": {"$month": "$InvoiceDate"}
            },
            "TotalRevenue": {"$sum": "$TotalInvoiceRevenue"}
        }
    },
    {"$sort": {"_id.year": 1, "_id.month": 1}}
]

list(db.fact_invoices.aggregate(pipeline))

[{'_id': {'year': 2010, 'month': 12}, 'TotalRevenue': 480348.95},
 {'_id': {'year': 2011, 'month': 1}, 'TotalRevenue': 495617.73},
 {'_id': {'year': 2011, 'month': 2}, 'TotalRevenue': 379688.28},
 {'_id': {'year': 2011, 'month': 3}, 'TotalRevenue': 492847.05},
 {'_id': {'year': 2011, 'month': 4}, 'TotalRevenue': 394154.691},
 {'_id': {'year': 2011, 'month': 5}, 'TotalRevenue': 563858.37},
 {'_id': {'year': 2011, 'month': 6}, 'TotalRevenue': 532562.6},
 {'_id': {'year': 2011, 'month': 7}, 'TotalRevenue': 518686.871},
 {'_id': {'year': 2011, 'month': 8}, 'TotalRevenue': 564883.85},
 {'_id': {'year': 2011, 'month': 9}, 'TotalRevenue': 849232.132},
 {'_id': {'year': 2011, 'month': 10}, 'TotalRevenue': 881896.71},
 {'_id': {'year': 2011, 'month': 11}, 'TotalRevenue': 1013999.3},
 {'_id': {'year': 2011, 'month': 12}, 'TotalRevenue': 477047.77}]

#### Top Customers (dim_customers)

In [272]:
pipeline = [
    {"$sort": {"Monetary": -1}},
    {"$limit": 10}
]

list(db.dim_customers.aggregate(pipeline))

[{'_id': ObjectId('6996a74105884627736ccd3e'),
  'CustomerID': 14646,
  'Recency': 5187,
  'Frequency': 1944,
  'Monetary': 262583.4199999997,
  'CustomerRank': 1,
  'CustomerSegment': 'High Value'},
 {'_id': ObjectId('6996a74105884627736ccd3f'),
  'CustomerID': 18102,
  'Recency': 5186,
  'Frequency': 400,
  'Monetary': 221190.81,
  'CustomerRank': 2,
  'CustomerSegment': 'High Value'},
 {'_id': ObjectId('6996a74105884627736ccd40'),
  'CustomerID': 17450,
  'Recency': 5194,
  'Frequency': 296,
  'Monetary': 180952.73,
  'CustomerRank': 3,
  'CustomerSegment': 'High Value'},
 {'_id': ObjectId('6996a74105884627736ccd41'),
  'CustomerID': 16446,
  'Recency': 5186,
  'Frequency': 3,
  'Monetary': 168472.5,
  'CustomerRank': 4,
  'CustomerSegment': 'High Value'},
 {'_id': ObjectId('6996a74105884627736ccd42'),
  'CustomerID': 12415,
  'Recency': 5210,
  'Frequency': 676,
  'Monetary': 113631.67999999998,
  'CustomerRank': 5,
  'CustomerSegment': 'High Value'},
 {'_id': ObjectId('6996a741058

#### Top Products by Revenue

In [273]:
pipeline = [
    {"$sort": {"TotalProductRevenue": -1}},
    {"$limit": 10}
]

list(db.dim_products.aggregate(pipeline))

[{'_id': ObjectId('6996a76005884627736ce791'),
  'StockCode': '23843',
  'Description': 'PAPER CRAFT , LITTLE BIRDIE',
  'TotalProductRevenue': 168469.6,
  'TotalQuantitySold': 80995,
  'CountriesSold': ['United Kingdom']},
 {'_id': ObjectId('6996a76405884627736ceac1'),
  'StockCode': '85123A',
  'Description': 'WHITE HANGING HEART T-LIGHT HOLDER',
  'TotalProductRevenue': 100392.09999999964,
  'TotalQuantitySold': 36706,
  'CountriesSold': ['Portugal',
   'Italy',
   'EIRE',
   'Finland',
   'Germany',
   'Switzerland',
   'Malta',
   'Spain',
   'Singapore',
   'Netherlands',
   'Cyprus',
   'France',
   'Channel Islands',
   'Australia',
   'United Kingdom',
   'Israel']},
 {'_id': ObjectId('6996a76405884627736ceab2'),
  'StockCode': '85099B',
  'Description': 'JUMBO BAG RED RETROSPOT',
  'TotalProductRevenue': 85040.54000000026,
  'TotalQuantitySold': 46078,
  'CountriesSold': ['Portugal',
   'Italy',
   'EIRE',
   'Finland',
   'Austria',
   'Germany',
   'RSA',
   'Switzerland',


#### Country Distribution

In [274]:
pipeline = [
    {"$unwind": "$CountriesSold"},
    {
        "$group": {
            "_id": "$CountriesSold",
            "ProductCount": {"$sum": 1}
        }
    },
    {"$sort": {"ProductCount": -1}}
]

list(db.dim_products.aggregate(pipeline))

[{'_id': 'United Kingdom', 'ProductCount': 3596},
 {'_id': 'EIRE', 'ProductCount': 1817},
 {'_id': 'Germany', 'ProductCount': 1561},
 {'_id': 'France', 'ProductCount': 1414},
 {'_id': 'Spain', 'ProductCount': 990},
 {'_id': 'Switzerland', 'ProductCount': 864},
 {'_id': 'Netherlands', 'ProductCount': 744},
 {'_id': 'Belgium', 'ProductCount': 711},
 {'_id': 'Portugal', 'ProductCount': 647},
 {'_id': 'Australia', 'ProductCount': 569},
 {'_id': 'Norway', 'ProductCount': 532},
 {'_id': 'Cyprus', 'ProductCount': 427},
 {'_id': 'Finland', 'ProductCount': 420},
 {'_id': 'Italy', 'ProductCount': 411},
 {'_id': 'Channel Islands', 'ProductCount': 353},
 {'_id': 'Austria', 'ProductCount': 282},
 {'_id': 'Sweden', 'ProductCount': 252},
 {'_id': 'Denmark', 'ProductCount': 232},
 {'_id': 'Japan', 'ProductCount': 211},
 {'_id': 'Israel', 'ProductCount': 197},
 {'_id': 'Unspecified', 'ProductCount': 190},
 {'_id': 'Poland', 'ProductCount': 171},
 {'_id': 'Singapore', 'ProductCount': 167},
 {'_id': 'USA

#### Customer Segment Distribution

In [275]:
pipeline = [
    {
        "$group": {
            "_id": "$CustomerSegment",
            "Count": {"$sum": 1}
        }
    }
]

list(db.dim_customers.aggregate(pipeline))

[{'_id': 'High Value', 'Count': 220},
 {'_id': 'Medium Value', 'Count': 530},
 {'_id': 'Low Value', 'Count': 3556}]

# TASK 7 – Performance Optimization

### Partitioning Strategy (Shuffle Optimization)

Before Optimization

In [276]:
feature_df.rdd.getNumPartitions()

2

Apply Repartitioning by CustomerID

In [277]:
optimized_df = feature_df.repartition(8, "CustomerID")

optimized_df.rdd.getNumPartitions()

8

### Caching & Persistence

Without Cache

In [278]:
feature_df.groupBy("Country").count().show()
feature_df.groupBy("CustomerID").count().show()

+------------------+-----+
|           Country|count|
+------------------+-----+
|            Sweden|  419|
|         Singapore|  202|
|           Germany| 7955|
|               RSA|   45|
|            France| 7388|
|            Greece|  128|
|European Community|   47|
|           Belgium| 1737|
|           Finland|  582|
|             Malta|   93|
|       Unspecified|  212|
|             Italy|  626|
|              EIRE| 6261|
|            Norway|  930|
|             Spain| 2145|
|           Denmark|  343|
|           Iceland|  175|
|            Israel|  218|
|   Channel Islands|  577|
|               USA|  172|
+------------------+-----+
only showing top 20 rows

+----------+-----+
|CustomerID|count|
+----------+-----+
|     17389|  142|
|     15619|    2|
|     13623|   67|
|     16386|   78|
|     13832|    3|
|     13285|  178|
|     15727|  284|
|     15790|   29|
|     15957|   40|
|     12940|   76|
|     16861|    6|
|     17420|   25|
|     17679|   24|
|     16574|   27|
|  

Apply Cache

In [279]:
feature_df.cache()

feature_df.groupBy("Country").count().show()
feature_df.groupBy("CustomerID").count().show()

+------------------+-----+
|           Country|count|
+------------------+-----+
|            Sweden|  419|
|         Singapore|  202|
|           Germany| 7955|
|               RSA|   45|
|            France| 7388|
|            Greece|  128|
|European Community|   47|
|           Belgium| 1737|
|           Finland|  582|
|             Malta|   93|
|       Unspecified|  212|
|             Italy|  626|
|              EIRE| 6261|
|            Norway|  930|
|             Spain| 2145|
|           Denmark|  343|
|           Iceland|  175|
|            Israel|  218|
|   Channel Islands|  577|
|               USA|  172|
+------------------+-----+
only showing top 20 rows

+----------+-----+
|CustomerID|count|
+----------+-----+
|     17389|  142|
|     15619|    2|
|     13623|   67|
|     16386|   78|
|     13832|    3|
|     13285|  178|
|     15727|  284|
|     15790|   29|
|     15957|   40|
|     12940|   76|
|     16861|    6|
|     17420|   25|
|     17679|   24|
|     16574|   27|
|  

### Broadcast Join Optimization

Without Broadcast

In [280]:
joined_df = feature_df.join(dim_customers, "CustomerID")
joined_df.show()

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+------------------+----+-------+------------------+--------------+----+-----+-------+---------+------------------+------------+---------------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|           Revenue|Hour|Weekday|     BasketRevenue|ItemsPerBasket|year|month|Recency|Frequency|          Monetary|CustomerRank|CustomerSegment|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+------------------+----+-------+------------------+--------------+----+-----+-------+---------+------------------+------------+---------------+
|     17389|   575683|    22666|RECIPE BOX PANTRY...|     192|2011-11-10 15:51:00|     2.55|United Kingdom|489.59999999999997|  15|      5|2723.5200000000004|            15|2011|   11|   5186|      142|19642.079999999998|          39|     High Val

With Broadcast

In [281]:
from pyspark.sql.functions import broadcast

optimized_join = feature_df.join(
    broadcast(dim_customers),
    "CustomerID"
)

optimized_join.show()

+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+------------------+----+-------+------------------+--------------+----+-----+-------+---------+------------------+------------+---------------+
|CustomerID|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|       Country|           Revenue|Hour|Weekday|     BasketRevenue|ItemsPerBasket|year|month|Recency|Frequency|          Monetary|CustomerRank|CustomerSegment|
+----------+---------+---------+--------------------+--------+-------------------+---------+--------------+------------------+----+-------+------------------+--------------+----+-----+-------+---------+------------------+------------+---------------+
|     17160|   575091|    22579|WOODEN TREE CHRIS...|      24|2011-11-08 14:05:00|     0.29|United Kingdom| 6.959999999999999|  14|      3| 516.8499999999999|            38|2011|   11|   5217|      226|3471.5299999999997|         374|   Medium Val

### Create a table for optimizer

In [289]:
import time
from pyspark.sql.functions import broadcast

print("TASK 7 – Performance Optimization (Before vs After)")

# TEST QUERY (Used for all performance tests)
def run_query(df):
    # The previous error indicated 'TotalPrice' column was not found.
    # Based on the feature engineering step, 'Revenue' is the correct column.
    return df.groupBy("Country").sum("Revenue").collect()


# 1. PARTITIONING OPTIMIZATION

# Before Partitioning
start = time.time()
run_query(feature_df) # Corrected from gold_df to feature_df
before_partition = time.time() - start

# After Partitioning
partitioned_df = feature_df.repartition("Country") # Corrected from gold_df to feature_df
start = time.time()
run_query(partitioned_df)
after_partition = time.time() - start

partition_improvement = ((before_partition - after_partition) / before_partition) * 100


# 2. CACHING OPTIMIZATION

# Before Caching
start = time.time()
run_query(feature_df) # Corrected from gold_df to feature_df
before_cache = time.time() - start

# After Caching
cached_df = feature_df.cache() # Corrected from gold_df to feature_df
cached_df.count()  # Force cache
start = time.time()
run_query(cached_df)
after_cache = time.time() - start

cache_improvement = ((before_cache - after_cache) / before_cache) * 100


# 3. BROADCAST JOIN OPTIMIZATION

# Normal Join (Before Broadcast)
start = time.time()
normal_join = feature_df.join(dim_customers, "CustomerID", "left") # Corrected from gold_df to feature_df
normal_join.collect()
before_broadcast = time.time() - start

# Broadcast Join (After Optimization)
start = time.time()
broadcast_join = feature_df.join( # Corrected from gold_df to feature_df
    broadcast(dim_customers),
    "CustomerID"
)
broadcast_join.collect()
after_broadcast = time.time() - start

broadcast_improvement = ((before_broadcast - after_broadcast) / before_broadcast) * 100


# FINAL TABLE VALUES
print("\n========== REPORT ==========")
print("Partitioning |", round(before_partition,2), "s |", round(after_partition,2), "s |", round(partition_improvement,2), "%")
print("Caching      |", round(before_cache,2), "s |", round(after_cache,2), "s |", round(cache_improvement,2), "%")
print("Broadcast    |", round(before_broadcast,2), "s |", round(after_broadcast,2), "s |", round(broadcast_improvement,2), "%")

TASK 7 – Performance Optimization (Before vs After)

Partitioning | 0.27 s | 0.74 s | -170.39 %
Caching      | 0.62 s | 0.37 s | 40.68 %
Broadcast    | 14.72 s | 15.49 s | -5.24 %


# BONUS – Customer Cohort Retention Analysis

### Step 1 - Create Invoice Month

In [283]:
from pyspark.sql.functions import trunc

# Extract invoice month (first day of month)
cohort_df = feature_df.withColumn(
    "InvoiceMonth",
    trunc("InvoiceDate", "month")
)

### Step 2 - Identify First Purchase Month (Cohort Month)

In [284]:
from pyspark.sql.functions import min

# Find first purchase month per customer
cohort_month_df = cohort_df.groupBy("CustomerID") \
    .agg(min("InvoiceMonth").alias("CohortMonth"))

### Step 3 - Join Back To Main Data

In [285]:
cohort_data = cohort_df.join(
    cohort_month_df,
    "CustomerID"
)

### STEP 4 - Calculate Months Since First Purchase

In [286]:
from pyspark.sql.functions import months_between

cohort_data = cohort_data.withColumn(
    "CohortIndex",
    months_between("InvoiceMonth", "CohortMonth").cast("int")
)

### STEP 5 - Count Active Customers Per *Cohort*

In [287]:
retention_table = cohort_data.groupBy(
    "CohortMonth", "CohortIndex"
).agg(
    count("CustomerID").alias("ActiveCustomers")
)

retention_table.show()

+-----------+-----------+---------------+
|CohortMonth|CohortIndex|ActiveCustomers|
+-----------+-----------+---------------+
| 2011-03-01|          7|           3271|
| 2010-12-01|         11|          20168|
| 2011-08-01|          1|           1439|
| 2010-12-01|         10|          12331|
| 2011-02-01|          9|           2607|
| 2011-02-01|          8|           2690|
| 2011-02-01|          5|           1826|
| 2011-01-01|          9|           4572|
| 2011-06-01|          1|            858|
| 2011-05-01|          6|           1880|
| 2011-03-01|          2|           2287|
| 2011-05-01|          4|           1391|
| 2011-06-01|          5|           2359|
| 2011-04-01|          5|           1864|
| 2011-02-01|          3|           2201|
| 2011-04-01|          2|           1123|
| 2011-05-01|          5|           2181|
| 2010-12-01|          6|          10022|
| 2011-01-01|          5|           2886|
| 2011-10-01|          1|           3257|
+-----------+-----------+---------

### STEP 6 - Calculate Retention Rate

In [288]:
from pyspark.sql.functions import first

# Get initial cohort size
cohort_size = retention_table.filter(
    retention_table.CohortIndex == 0
).select("CohortMonth",
         col("ActiveCustomers").alias("CohortSize"))

# Join back
retention_rate = retention_table.join(
    cohort_size,
    "CohortMonth"
)

retention_rate = retention_rate.withColumn(
    "RetentionRate",
    col("ActiveCustomers") / col("CohortSize")
)

retention_rate.show()

+-----------+-----------+---------------+----------+-------------------+
|CohortMonth|CohortIndex|ActiveCustomers|CohortSize|      RetentionRate|
+-----------+-----------+---------------+----------+-------------------+
| 2011-05-01|          7|            717|      5379|0.13329615170105968|
| 2011-05-01|          3|            821|      5379|0.15263060048336122|
| 2011-05-01|          1|            969|      5379|0.18014500836586725|
| 2011-05-01|          2|            840|      5379| 0.1561628555493586|
| 2011-05-01|          0|           5379|      5379|                1.0|
| 2011-05-01|          5|           2181|      5379|0.40546569994422754|
| 2011-05-01|          4|           1391|      5379|0.25859825246328316|
| 2011-05-01|          6|           1880|      5379|0.34950734337237405|
| 2011-10-01|          2|            814|     10690|0.07614593077642656|
| 2011-10-01|          0|          10690|     10690|                1.0|
| 2011-10-01|          1|           3257|     10690