<a href="https://colab.research.google.com/github/Kaveesha23dil/Big_Data_Assignmnet_02/blob/part-5-%26-6/Big_Data_Assignment_02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Google Colab Setup

In [31]:
!pip install pyspark==3.5.0



### Installing MondoDB Client

In [32]:
!pip install pymongo



### Configure Spark with MongoDB Connector

In [33]:
### Configure Spark with MongoDB Connector
from pyspark.sql import SparkSession

# We must include the 'spark.jars.packages' config to download the MongoDB connector
spark = SparkSession.builder \
    .appName("BigDataAssignment02") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.mongodb.write.connection.uri", "mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0") \
    .config("spark.mongodb.read.connection.uri", "mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0") \
    .getOrCreate()


### Store MongoDB URI using environment variables

In [34]:
import os
os.environ["MONGO_URI"] = "mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0"

# TASK 1 – DATA INGESTION (BRONZE LAYER)

### Mount google drive

In [35]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Load dataset

In [36]:
df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/content/drive/MyDrive/Big_data_Assignmnet/Online Retail (1).csv")

### Inspect schema

In [37]:
df_raw.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



### Record count

In [38]:
df_raw.count()

541909

### Add year and month

In [39]:
from pyspark.sql.functions import year, month, to_timestamp

df_bronze = df_raw.withColumn(
    "InvoiceDate", to_timestamp("InvoiceDate")
).withColumn(
    "year", year("InvoiceDate")
).withColumn(
    "month", month("InvoiceDate")
)

### Write Bronze Layer

In [40]:
df_bronze.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("/content/bronze")

# TASK 2 – DATA CLEANING & QUALITY (SILVER)

### Remove cancelled invoices

In [41]:
df_clean = df_bronze.filter(~df_bronze.InvoiceNo.startswith("C"))

### Handle missing CustomerID

In [42]:
df_clean = df_clean.dropna(subset=["CustomerID"])
df_clean.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|year|month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|2010|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|2010|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|    22752|SET 7 BABUSHKA NE...|      

### Handle negative quantities (returns)

In [43]:
df_clean = df_clean.filter(df_clean.Quantity > 0)
df_clean.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|year|month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|2010|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|2010|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|    22752|SET 7 BABUSHKA NE...|      

### Remove invalid prices

In [44]:
df_clean = df_clean.filter(df_clean.UnitPrice > 0)
df_clean.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|year|month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|2010|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|2010|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|
|   536365|    22752|SET 7 BABUSHKA NE...|      

### Remove duplicates

In [45]:
df_clean = df_clean.dropDuplicates()
df_clean.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|year|month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+
|   536401|    20725|LUNCH BAG RED RET...|       1|2010-12-01 11:21:00|     1.65|     15862|United Kingdom|2010|   12|
|   536408|    22537|MAGIC DRAWING SLA...|      24|2010-12-01 11:41:00|     0.42|     14307|United Kingdom|2010|   12|
|   536408|    21212|PACK OF 72 RETROS...|      24|2010-12-01 11:41:00|     0.55|     14307|United Kingdom|2010|   12|
|   536464|   90200A|PURPLE SWEETHEART...|       1|2010-12-01 12:23:00|     4.25|     17968|United Kingdom|2010|   12|
|   536528|    22130|PARTY CONE CHRIST...|      12|2010-12-01 13:17:00|     0.85|     15525|United Kingdom|2010|   12|
|   536530|    22364|GLASS JAR DIGESTI...|      

### Data Quality Report

In [46]:
from pyspark.sql.functions import col, count, isnan, when, sum, min, max, avg, countDistinct
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType, StringType, TimestampType

exprs = []

for field in df_clean.schema.fields:
    c = field.name
    dtype = field.dataType

    # Add count of non-nulls for each column
    exprs.append(count(col(c)).alias(f"{c}_non_null_count"))
    # Add count of nulls for each column
    exprs.append(sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"{c}_null_count"))

    if isinstance(dtype, (DoubleType, FloatType, IntegerType, LongType)):
        # Add min, max, and average for numeric types
        exprs.append(min(col(c)).alias(f"{c}_min"))
        exprs.append(max(col(c)).alias(f"{c}_max"))
        exprs.append(avg(col(c)).alias(f"{c}_avg"))
    elif isinstance(dtype, StringType):
        # Add count of distinct values for string types
        exprs.append(countDistinct(col(c)).alias(f"{c}_distinct_count"))
    # You can add more specific checks for other data types (e.g., TimestampType) if needed.

quality_report = df_clean.agg(*exprs)
quality_report.show(truncate=False)

+------------------------+--------------------+------------------------+------------------------+--------------------+------------------------+--------------------------+----------------------+--------------------------+-----------------------+-------------------+------------+------------+------------------+--------------------------+----------------------+------------------------+--------------------+-------------+-------------+------------------+-------------------------+---------------------+--------------+--------------+------------------+----------------------+------------------+----------------------+-------------------+---------------+--------+--------+------------------+--------------------+----------------+---------+---------+-----------------+
|InvoiceNo_non_null_count|InvoiceNo_null_count|InvoiceNo_distinct_count|StockCode_non_null_count|StockCode_null_count|StockCode_distinct_count|Description_non_null_count|Description_null_count|Description_distinct_count|Quantity_non_nul

# TASK 3 – FEATURE ENGINEERING

### Revenue Feature

In [47]:
from pyspark.sql.functions import hour, dayofweek

df_feat = df_clean.withColumn(
    "revenue", df_clean.Quantity * df_clean.UnitPrice
)

### Time-based Features

In [48]:
df_feat = df_feat.withColumn("hour", hour("InvoiceDate")) \
                 .withColumn("weekday", dayofweek("InvoiceDate"))

### Basket-Level Features

In [49]:
from pyspark.sql.functions import sum as _sum, countDistinct

basket_metrics = df_feat.groupBy("InvoiceNo").agg(
    _sum("revenue").alias("invoice_total"),
    countDistinct("StockCode").alias("items_per_invoice")
)

### RFM Features (Customer)

In [50]:
from pyspark.sql.functions import max, datediff, current_date

rfm = df_feat.groupBy("CustomerID").agg(
    datediff(current_date(), max("InvoiceDate")).alias("recency"),
    countDistinct("InvoiceNo").alias("frequency"),
    _sum("revenue").alias("monetary")
)

### Window Function Example

In [51]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

window_spec = Window.partitionBy("CustomerID")

df_feat = df_feat.withColumn(
    "customer_total_spend",
    sum("revenue").over(window_spec)
)

# TASK 4 – MONGODB DATA MODELING (GOLD)

### Configure MongoDB Connection (Environment Variables)

In [52]:
import os
os.environ["MONGO_URI"] = "mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0"
os.environ["MONGO_DB"] = "BigDataAssignment02"

import verification

In [53]:
import os
print(os.environ["MONGO_URI"] is not None)
print(os.environ["MONGO_DB"])

True
BigDataAssignment02


In [54]:
import os
print(os.environ["MONGO_URI"])
print(os.environ["MONGO_DB"])

mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0
BigDataAssignment02


Restart Spark with MongoDB Spark Connector

In [55]:
df_feat = df_feat.withColumn("hour", hour("InvoiceDate")) \
                 .withColumn("weekday", dayofweek("InvoiceDate"))

Test Spark to see its working

In [56]:
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [57]:
from pyspark.sql import SparkSession

# The following lines created a new SparkSession that overwrote the
# one configured for MongoDB. We will use the SparkSession created
# in cell JwnRcx10YLaO instead.
# spark = SparkSession.builder \
#     .appName("Retail_ETL") \
#     .getOrCreate()

sc = spark.sparkContext

print("Spark is alive:", not sc._jsc.sc().isStopped())

Spark is alive: True


FACT: fact_invoices

In [59]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

line_item = F.struct(
    F.col("StockCode").alias("stock_code"),
    F.col("Description").alias("description"),
    F.col("Quantity").cast("int").alias("quantity"),
    F.round(F.col("UnitPrice"), 2).alias("unit_price"),
    F.round(F.col("revenue"), 2).alias("line_revenue")
)

# Join df_feat with basket_metrics to get invoice-level aggregates
df_joined = df_feat.join(basket_metrics, on="InvoiceNo", how="inner")

# Group by invoice-level details and collect line items
fact_invoices = df_joined.groupBy(
    F.col("InvoiceNo").alias("invoice_id"),
    F.col("CustomerID").alias("customer_id"),
    F.col("InvoiceDate").alias("invoice_datetime"),
    F.col("Country").alias("country"),
    F.col("hour").alias("invoice_hour"),
    F.col("weekday").alias("invoice_weekday"),
    F.col("customer_total_spend").alias("customer_total_spend"),
    F.col("invoice_total").alias("invoice_total"),
    F.col("items_per_invoice").alias("total_items_in_invoice")
).agg(
    F.collect_list(line_item).alias("line_items")
)

fact_invoices.show(2, truncate=False)

+----------+-----------+-------------------+--------------+------------+---------------+--------------------+------------------+----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|invoice_id|customer_id|invoice_datetime   |country       |invoice_hour|invoice_weekday|customer_total_spend|invoice_total     |total_items_in_invoice|line_items                                                 

### Write fact_invoices to MongoDB

In [60]:
(fact_invoices.write
  .format("mongodb")
  .mode("overwrite")
  .option("database", "BigDataAssignment02")
  .option("collection", "fact_invoices")
  .save())


### Build dim_customers

In [62]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import date

# Calculate snapshot_date using the maximum InvoiceDate from df_feat
snapshot_date = df_feat.agg(F.max("InvoiceDate").alias("max_dt")).collect()[0]["max_dt"]

# Convert snapshot_date to a date object for datediff to work correctly if it's a timestamp
if isinstance(snapshot_date, date):
    snapshot_date_col = F.lit(snapshot_date)
else:
    snapshot_date_col = F.to_date(F.lit(str(snapshot_date.date())))

cust = (df_feat
    .groupBy("CustomerID")
    .agg(
        F.first("Country", ignorenulls=True).alias("primary_country"),
        F.max("InvoiceDate").alias("last_purchase_date"),
        F.countDistinct("InvoiceNo").alias("total_orders"),
        F.sum("Quantity").alias("total_products"),
        F.avg("UnitPrice").alias("avg_unit_price"),
        F.max("UnitPrice").alias("max_unit_price"),
        F.min("UnitPrice").alias("min_unit_price"),
        F.avg("Quantity").alias("avg_quantity")
    )
)

# Join with the rfm DataFrame to include recency, frequency, and monetary values
dim_customers = cust.join(rfm, on="CustomerID", how="left_outer") \
    .withColumn("recency", F.datediff(snapshot_date_col, F.col("last_purchase_date")))

# Select and reorder columns for dim_customers
dim_customers = dim_customers.select(
    F.col("CustomerID").alias("customer_id"),
    F.col("primary_country"),
    F.col("last_purchase_date"),
    F.col("recency"),
    F.col("frequency"),
    F.col("monetary"),
    F.col("total_orders"),
    F.col("total_products"),
    F.col("avg_unit_price"),
    F.col("max_unit_price"),
    F.col("min_unit_price"),
    F.col("avg_quantity")
)

dim_customers.show(5, truncate=False)

+-----------+---------------+-------------------+-------+---------+--------+------------+--------------+------------------+--------------+--------------+------------------+
|customer_id|primary_country|last_purchase_date |recency|frequency|monetary|total_orders|total_products|avg_unit_price    |max_unit_price|min_unit_price|avg_quantity      |
+-----------+---------------+-------------------+-------+---------+--------+------------+--------------+------------------+--------------+--------------+------------------+
|12346      |United Kingdom |2011-01-18 10:01:00|325    |1        |77183.6 |1           |74215         |1.04              |1.04          |1.04          |74215.0           |
|12347      |Iceland        |2011-12-07 15:52:00|2      |7        |4310.0  |7           |2458          |2.6440109890109893|12.75         |0.25          |13.505494505494505|
|12348      |Finland        |2011-09-25 13:13:00|75     |4        |1797.24 |4           |2341          |5.764838709677418 |40.0        

### Write dim_customers

In [63]:
(dim_customers.write
  .format("mongodb")
  .mode("overwrite")
  .option("database", "BigDataAssignment02")
  .option("collection", "dim_customers")
  .save())


### Build dim_products

In [65]:
from pyspark.sql import functions as F

country_dist = (df_feat
    .groupBy("StockCode", "Description", "Country") # Group by Description to include it in the struct
    .agg(
        F.sum(F.col("Quantity").cast("int")).alias("qty"),
        F.round(F.sum("revenue"), 2).alias("revenue")
    )
    .groupBy("StockCode", "Description") # Group by StockCode and Description for product aggregates
    .agg(
        F.collect_list(
            F.struct(
                F.col("Country").alias("country"),
                F.col("qty").alias("quantity_sold"),
                F.col("revenue").alias("country_revenue")
            )
        ).alias("country_sales_distribution")
    )
)

# Aggregate product-level features from df_feat
dim_products = df_feat.groupBy("StockCode", "Description").agg(
    F.sum("Quantity").alias("total_quantity_sold"),
    F.sum("revenue").alias("total_product_revenue"),
    F.avg("UnitPrice").alias("avg_unit_price"),
    F.max("UnitPrice").alias("max_unit_price"),
    F.min("UnitPrice").alias("min_unit_price")
).join(country_dist, on=["StockCode", "Description"], how="left_outer") \
.select(
    F.col("StockCode").alias("product_id"),
    F.col("Description").alias("description"),
    F.col("total_quantity_sold"),
    F.col("total_product_revenue"),
    F.col("avg_unit_price"),
    F.col("max_unit_price"),
    F.col("min_unit_price"),
    F.col("country_sales_distribution")
)

dim_products.show(3, truncate=False)

+----------+--------------------------------+-------------------+---------------------+------------------+--------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id|description                     |total_quantity_sold|total_product_revenue|avg_unit_price    |max_unit_price|min_unit_price|country_sales_distribution                                                                                                                                                                                                                                                                                             |
+----------+--------------------------------+-------------------+---------------------+---------------

### Write dim_products

In [66]:
(dim_products.write
  .format("mongodb")
  .mode("overwrite")
  .option("database", "BigDataAssignment02")
  .option("collection", "dim_products")
  .save())


# Task 5 MongoDB Indexing & Write Optimization

### Configure MongoDB Connection (Environment Variables)

In [67]:
import os

os.environ["MONGO_URI"] = "mongodb+srv://cit23020025_db_user:Z98uYpIUdMYn0ylH@cluster0.0ew6syh.mongodb.net/?appName=Cluster0"
os.environ["MONGO_DB"] = "BigDataAssignment02"

### Install Required Libraries

In [68]:
!pip install pyspark pymongo



### Start Spark with MongoDB Connector

In [69]:
from pyspark.sql import SparkSession
import os

spark = (
    SparkSession.builder
    .appName("BigDataAssignment")
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2")
    .config("spark.mongodb.connection.uri", os.environ["MONGO_URI"])
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
print("Spark Connected to MongoDB")


Spark Connected to MongoDB


### Test MongoDB Connection

In [70]:
from pymongo import MongoClient

client = MongoClient(os.environ["MONGO_URI"])
db = client[os.environ["MONGO_DB"]]

print("Collections:", db.list_collection_names())


Collections: ['dim_customers', 'dim_products', 'fact_invoices']


### Write Gold datasets to MongoDB

In [71]:
import os

def write_to_mongo(df, collection_name):
    (df.write
      .format("mongodb")
      .mode("overwrite")  # use "append" if needed
      .option("database", os.environ["MONGO_DB"])
      .option("collection", collection_name)
      .save())

write_to_mongo(fact_invoices, "fact_invoices")
write_to_mongo(dim_customers, "dim_customers")
write_to_mongo(dim_products, "dim_products")

print("Gold collections written to MongoDB")

Gold collections written to MongoDB


### Create 4+ MongoDB indexes

In [73]:
from pymongo import MongoClient, ASCENDING, DESCENDING
import os

client = MongoClient(os.environ["MONGO_URI"])
db = client[os.environ["MONGO_DB"]]

# List of index names that might exist from previous runs or need to be dropped and recreated.
index_names_to_drop = [
    "idx_invoice_date",
    "idx_customer_id",
    "idx_customer_revenue",
    "idx_product_id",
    "idx_lineitems_stockcode",
    "idx_rfm_segment"
]

# Drop existing indexes to ensure clean creation (optional, but good for development)
for collection_name in db.list_collection_names():
    collection = db[collection_name]
    existing_indexes = collection.index_information().keys()
    for index_name in index_names_to_drop:
        if index_name in existing_indexes:
            try:
                collection.drop_index(index_name)
                print(f"Dropped index '{index_name}' from '{collection_name}'")
            except Exception as e:
                print(f"Error dropping index '{index_name}' from '{collection_name}': {e}")

# 1) fact_invoices: Index on invoice_datetime for time-series queries
if "fact_invoices" in db.list_collection_names():
    db.fact_invoices.create_index([("invoice_datetime", ASCENDING)], name="idx_invoice_date")
    # Index on customer_id for customer-specific invoice retrieval
    db.fact_invoices.create_index([("customer_id", ASCENDING)], name="idx_customer_id_fact")
    # Index for filtering by country and hour
    db.fact_invoices.create_index([("country", ASCENDING), ("invoice_hour", ASCENDING)], name="idx_country_hour")
    # Index on line_items.stock_code for product analytics within invoices
    db.fact_invoices.create_index([("line_items.stock_code", ASCENDING)], name="idx_lineitems_stockcode")

# 2) dim_customers: Index on customer_id for fast lookups and joins
if "dim_customers" in db.list_collection_names():
    db.dim_customers.create_index([("customer_id", ASCENDING)], name="idx_customer_id")
    # Index for RFM analysis or filtering by monetary value
    db.dim_customers.create_index([("monetary", DESCENDING)], name="idx_customer_monetary")
    # Index for recency-based queries
    db.dim_customers.create_index([("recency", ASCENDING)], name="idx_customer_recency")


# 3) dim_products: Index on product_id for fast lookups
if "dim_products" in db.list_collection_names():
    db.dim_products.create_index([("product_id", ASCENDING)], name="idx_product_id")
    # Index for filtering by total product revenue
    db.dim_products.create_index([("total_product_revenue", DESCENDING)], name="idx_product_revenue")

print("Indexes created")

Indexes created


### Show performance improvement

In [74]:
from datetime import datetime

query = {
    "Country": "United Kingdom",
    "InvoiceDate": {"$gte": datetime(2011, 1, 1), "$lt": datetime(2011, 12, 31)}
}

# Explain after indexes
exp = db.fact_invoices.find(query).explain()
print("nReturned:", exp["executionStats"]["nReturned"])
print("totalDocsExamined:", exp["executionStats"]["totalDocsExamined"])
print("executionTimeMillis:", exp["executionStats"]["executionTimeMillis"])

nReturned: 0
totalDocsExamined: 20002
executionTimeMillis: 18


In [76]:
from datetime import datetime
from pymongo import MongoClient
import os

# Assuming 'client' and 'db' are already available from previous cells' execution.
# If not, they would need to be re-initialized, e.g.:
# client = MongoClient(os.environ["MONGO_URI"])
# db = client[os.environ["MONGO_DB"]]

def summarize_explain_stats(explain_result):
    """
    Summarizes key execution statistics from a MongoDB explain plan.
    """
    execution_stats = explain_result.get("executionStats", {})
    return {
        "nReturned": execution_stats.get("nReturned", 0),
        "totalDocsExamined": execution_stats.get("totalDocsExamined", 0),
        "executionTimeMillis": execution_stats.get("executionTimeMillis", 0),
    }

# Corrected query using actual field names from fact_invoices: 'country' and 'invoice_datetime'
# The query in cell R62Oc_jR7mk9 used 'Country' and 'InvoiceDate', which are incorrect field names.
# This correction ensures the query can leverage the created indexes.
query_corrected_field_names = {
    "country": "United Kingdom",
    "invoice_datetime": {"$gte": datetime(2011, 1, 1), "$lt": datetime(2011, 12, 31, 23, 59, 59, 999999)} # Include full day for comparison
}

print("Running explain for the corrected query (after indexes):")
exp_after_corrected = db.fact_invoices.find(query_corrected_field_names).explain()
stats_after = summarize_explain_stats(exp_after_corrected)
print("After indexes (corrected query):", stats_after)

# To truly show 'Before' (without indexes) vs 'After' (with indexes) performance,
# one would typically perform these steps in a controlled environment:
# 1. Drop the relevant indexes on 'fact_invoices' collection.
#    # Example: db.fact_invoices.drop_index("idx_invoice_date")
# 2. Run: exp_before = db.fact_invoices.find(query_corrected_field_names).explain()
# 3. Recreate the indexes (as done in cell g6TFgp8v7W68).
# 4. Then run the explain again (as done above for exp_after_corrected).
# As this cell focuses on fixing the `SyntaxError` and showing performance *after* indexing
# with the correct query, the 'before' part is omitted for simplicity in this specific fix.

Running explain for the corrected query (after indexes):
After indexes (corrected query): {'nReturned': 16610, 'totalDocsExamined': 18442, 'executionTimeMillis': 25}


# Task 6 Analytics & Insights

### Monthly revenue trends

In [78]:
from pyspark.sql import functions as F

spark_monthly = (
    fact_invoices
    .withColumn("year_month", F.date_format("invoice_datetime", "yyyy-MM"))
    .groupBy("year_month")
    .agg(F.round(F.sum("invoice_total"), 2).alias("monthly_revenue"))
    .orderBy("year_month")
)

spark_monthly.show(20, truncate=False)

+----------+---------------+
|year_month|monthly_revenue|
+----------+---------------+
|2010-12   |821651.05      |
|2011-01   |695374.21      |
|2011-02   |531223.03      |
|2011-03   |747217.96      |
|2011-04   |539774.51      |
|2011-05   |772439.49      |
|2011-06   |760768.39      |
|2011-07   |719703.6       |
|2011-08   |758697.3       |
|2011-09   |1060225.36     |
|2011-10   |1152048.8      |
|2011-11   |1507833.87     |
|2011-12   |637790.33      |
+----------+---------------+



### Top customers by spend

In [80]:
top_customers = (
    fact_invoices
    .groupBy("customer_id")
    .agg(F.round(F.sum("invoice_total"), 2).alias("total_spend"),
         F.countDistinct("invoice_id").alias("invoice_count"))
    .orderBy(F.desc("total_spend"))
)

top_customers.show(10, truncate=False)

+-----------+-----------+-------------+
|customer_id|total_spend|invoice_count|
+-----------+-----------+-------------+
|15287      |1798818.88 |1430         |
|14646      |280206.02  |73           |
|18102      |259657.3   |60           |
|17450      |194390.79  |46           |
|16446      |168472.5   |2            |
|14911      |144895.39  |201          |
|12415      |124914.53  |21           |
|14156      |117210.08  |55           |
|17511      |94399.14   |31           |
|16029      |80850.84   |63           |
+-----------+-----------+-------------+
only showing top 10 rows



### Top products by revenue

In [81]:
items = fact_invoices.select(F.explode("line_items").alias("item"))

top_products = (
    items
    .groupBy(F.col("item.stock_code").alias("stock_code"))
    .agg(F.round(F.sum(F.col("item.line_revenue")), 2).alias("product_revenue"),
         F.sum(F.col("item.quantity")).alias("units_sold"))
    .orderBy(F.desc("product_revenue"))
)

top_products.show(10, truncate=False)


+----------+---------------+----------+
|stock_code|product_revenue|units_sold|
+----------+---------------+----------+
|DOT       |206248.77      |706       |
|22423     |174156.54      |13851     |
|23843     |168469.6       |80995     |
|85123A    |104462.75      |37641     |
|47566     |99445.23       |18283     |
|85099B    |94159.81       |48371     |
|23166     |81700.92       |78033     |
|POST      |78101.88       |3150      |
|M         |77750.27       |6984      |
|23084     |66870.03       |30739     |
+----------+---------------+----------+
only showing top 10 rows



### Country-level sales analysis

In [83]:
country_sales = (
    fact_invoices
    .groupBy("country")
    .agg(F.round(F.sum("invoice_total"), 2).alias("total_revenue"),
         F.countDistinct("invoice_id").alias("orders"))
    .orderBy(F.desc("total_revenue"))
)

country_sales.show(15, truncate=False)

+--------------+-------------+------+
|country       |total_revenue|orders|
+--------------+-------------+------+
|United Kingdom|9061634.72   |18019 |
|Netherlands   |285446.34    |94    |
|EIRE          |284324.74    |288   |
|Germany       |228678.4     |457   |
|France        |211142.05    |392   |
|Australia     |138453.81    |57    |
|Spain         |61558.56     |90    |
|Switzerland   |57067.6      |54    |
|Belgium       |41196.34     |98    |
|Sweden        |38367.83     |36    |
|Japan         |37461.94     |19    |
|Norway        |36165.44     |36    |
|Portugal      |33683.05     |58    |
|Finland       |22546.08     |41    |
|Singapore     |21279.29     |7     |
+--------------+-------------+------+
only showing top 15 rows



### Return or cancellation patterns

In [85]:
return_stats = (
    fact_invoices
    .groupBy("country")
    .agg(F.lit(0).alias("return_invoices"), # Since fact_invoices contains no returns
         F.count("invoice_id").alias("total_invoices")) # Counting invoice_id to avoid counting the whole row that is already filtered for invalid invoices
    .withColumn("return_rate", F.round(F.col("return_invoices") / F.col("total_invoices"), 4))
    .orderBy(F.desc("return_rate"))
)

return_stats.show(15, truncate=False)

+------------------+---------------+--------------+-----------+
|country           |return_invoices|total_invoices|return_rate|
+------------------+---------------+--------------+-----------+
|Sweden            |0              |36            |0.0        |
|Singapore         |0              |7             |0.0        |
|Germany           |0              |457           |0.0        |
|RSA               |0              |1             |0.0        |
|France            |0              |393           |0.0        |
|Greece            |0              |5             |0.0        |
|European Community|0              |4             |0.0        |
|Belgium           |0              |98            |0.0        |
|Finland           |0              |41            |0.0        |
|Malta             |0              |5             |0.0        |
|Unspecified       |0              |13            |0.0        |
|Italy             |0              |38            |0.0        |
|EIRE              |0              |289 

### MongoDB aggregation pipelines

In [86]:
def run_pipeline(coll, pipeline, limit=10):
    res = list(db[coll].aggregate(pipeline, allowDiskUse=True))
    for r in res[:limit]:
        print(r)
    return res


### Monthly revenue trends

In [87]:
pipeline_monthly = [
  {"$group": {
      "_id": {"$dateToString": {"format": "%Y-%m", "date": "$invoice_date"}},
      "monthly_revenue": {"$sum": "$revenue"},
      "orders": {"$sum": 1}
  }},
  {"$sort": {"_id": 1}},
  {"$project": {"_id": 0, "month": "$_id", "monthly_revenue": {"$round": ["$monthly_revenue", 2]}, "orders": 1}}
]

run_pipeline("fact_invoices", pipeline_monthly, limit=20)


{'orders': 20002, 'month': None, 'monthly_revenue': 0}


[{'orders': 20002, 'month': None, 'monthly_revenue': 0}]

### Top customers by spend

In [88]:
pipeline_top_customers = [
  {"$group": {"_id": "$customer_id", "total_spend": {"$sum": "$revenue"}, "orders": {"$sum": 1}}},
  {"$sort": {"total_spend": -1}},
  {"$limit": 10},
  {"$project": {"_id": 0, "customer_id": "$_id", "total_spend": {"$round": ["$total_spend", 2]}, "orders": 1}}
]

run_pipeline("fact_invoices", pipeline_top_customers)


{'orders': 2, 'customer_id': 17190, 'total_spend': 0}
{'orders': 1, 'customer_id': 17538, 'total_spend': 0}
{'orders': 7, 'customer_id': 14696, 'total_spend': 0}
{'orders': 1, 'customer_id': 13922, 'total_spend': 0}
{'orders': 23, 'customer_id': 14051, 'total_spend': 0}
{'orders': 4, 'customer_id': 14273, 'total_spend': 0}
{'orders': 1, 'customer_id': 16292, 'total_spend': 0}
{'orders': 1, 'customer_id': 12446, 'total_spend': 0}
{'orders': 6, 'customer_id': 16899, 'total_spend': 0}
{'orders': 7, 'customer_id': 16477, 'total_spend': 0}


[{'orders': 2, 'customer_id': 17190, 'total_spend': 0},
 {'orders': 1, 'customer_id': 17538, 'total_spend': 0},
 {'orders': 7, 'customer_id': 14696, 'total_spend': 0},
 {'orders': 1, 'customer_id': 13922, 'total_spend': 0},
 {'orders': 23, 'customer_id': 14051, 'total_spend': 0},
 {'orders': 4, 'customer_id': 14273, 'total_spend': 0},
 {'orders': 1, 'customer_id': 16292, 'total_spend': 0},
 {'orders': 1, 'customer_id': 12446, 'total_spend': 0},
 {'orders': 6, 'customer_id': 16899, 'total_spend': 0},
 {'orders': 7, 'customer_id': 16477, 'total_spend': 0}]

### Top products by revenue

In [89]:
pipeline_top_products = [
  {"$unwind": "$line_items"},
  {"$group": {
      "_id": "$line_items.stock_code",
      "product_revenue": {"$sum": "$line_items.line_revenue"},
      "units_sold": {"$sum": "$line_items.quantity"}
  }},
  {"$sort": {"product_revenue": -1}},
  {"$limit": 10},
  {"$project": {"_id": 0, "stock_code": "$_id", "product_revenue": {"$round": ["$product_revenue", 2]}, "units_sold": 1}}
]

run_pipeline("fact_invoices", pipeline_top_products)


{'units_sold': 706, 'stock_code': 'DOT', 'product_revenue': 206248.77}
{'units_sold': 13851, 'stock_code': '22423', 'product_revenue': 174156.54}
{'units_sold': 80995, 'stock_code': '23843', 'product_revenue': 168469.6}
{'units_sold': 37641, 'stock_code': '85123A', 'product_revenue': 104462.75}
{'units_sold': 18283, 'stock_code': '47566', 'product_revenue': 99445.23}
{'units_sold': 48371, 'stock_code': '85099B', 'product_revenue': 94159.81}
{'units_sold': 78033, 'stock_code': '23166', 'product_revenue': 81700.92}
{'units_sold': 3150, 'stock_code': 'POST', 'product_revenue': 78101.88}
{'units_sold': 6984, 'stock_code': 'M', 'product_revenue': 77750.27}
{'units_sold': 30739, 'stock_code': '23084', 'product_revenue': 66870.03}


[{'units_sold': 706, 'stock_code': 'DOT', 'product_revenue': 206248.77},
 {'units_sold': 13851, 'stock_code': '22423', 'product_revenue': 174156.54},
 {'units_sold': 80995, 'stock_code': '23843', 'product_revenue': 168469.6},
 {'units_sold': 37641, 'stock_code': '85123A', 'product_revenue': 104462.75},
 {'units_sold': 18283, 'stock_code': '47566', 'product_revenue': 99445.23},
 {'units_sold': 48371, 'stock_code': '85099B', 'product_revenue': 94159.81},
 {'units_sold': 78033, 'stock_code': '23166', 'product_revenue': 81700.92},
 {'units_sold': 3150, 'stock_code': 'POST', 'product_revenue': 78101.88},
 {'units_sold': 6984, 'stock_code': 'M', 'product_revenue': 77750.27},
 {'units_sold': 30739, 'stock_code': '23084', 'product_revenue': 66870.03}]

### Country-level sales analysis

In [90]:
pipeline_country = [
  {"$group": {"_id": "$country", "total_revenue": {"$sum": "$revenue"}, "orders": {"$sum": 1}}},
  {"$sort": {"total_revenue": -1}},
  {"$limit": 15},
  {"$project": {"_id": 0, "country": "$_id", "total_revenue": {"$round": ["$total_revenue", 2]}, "orders": 1}}
]

run_pipeline("fact_invoices", pipeline_country, limit=15)


{'orders': 7, 'country': 'Iceland', 'total_revenue': 0}
{'orders': 26, 'country': 'Channel Islands', 'total_revenue': 0}
{'orders': 16, 'country': 'Cyprus', 'total_revenue': 0}
{'orders': 2, 'country': 'Czech Republic', 'total_revenue': 0}
{'orders': 36, 'country': 'Sweden', 'total_revenue': 0}
{'orders': 19, 'country': 'Poland', 'total_revenue': 0}
{'orders': 20, 'country': 'Japan', 'total_revenue': 0}
{'orders': 6, 'country': 'Canada', 'total_revenue': 0}
{'orders': 5, 'country': 'Malta', 'total_revenue': 0}
{'orders': 1, 'country': 'Saudi Arabia', 'total_revenue': 0}
{'orders': 38, 'country': 'Italy', 'total_revenue': 0}
{'orders': 54, 'country': 'Switzerland', 'total_revenue': 0}
{'orders': 90, 'country': 'Spain', 'total_revenue': 0}
{'orders': 457, 'country': 'Germany', 'total_revenue': 0}
{'orders': 17, 'country': 'Austria', 'total_revenue': 0}


[{'orders': 7, 'country': 'Iceland', 'total_revenue': 0},
 {'orders': 26, 'country': 'Channel Islands', 'total_revenue': 0},
 {'orders': 16, 'country': 'Cyprus', 'total_revenue': 0},
 {'orders': 2, 'country': 'Czech Republic', 'total_revenue': 0},
 {'orders': 36, 'country': 'Sweden', 'total_revenue': 0},
 {'orders': 19, 'country': 'Poland', 'total_revenue': 0},
 {'orders': 20, 'country': 'Japan', 'total_revenue': 0},
 {'orders': 6, 'country': 'Canada', 'total_revenue': 0},
 {'orders': 5, 'country': 'Malta', 'total_revenue': 0},
 {'orders': 1, 'country': 'Saudi Arabia', 'total_revenue': 0},
 {'orders': 38, 'country': 'Italy', 'total_revenue': 0},
 {'orders': 54, 'country': 'Switzerland', 'total_revenue': 0},
 {'orders': 90, 'country': 'Spain', 'total_revenue': 0},
 {'orders': 457, 'country': 'Germany', 'total_revenue': 0},
 {'orders': 17, 'country': 'Austria', 'total_revenue': 0}]

### Return or cancellation patterns

In [91]:
pipeline_returns = [
  {"$group": {
      "_id": "$country",
      "return_invoices": {"$sum": {"$cond": [{"$eq": ["$is_return", True]}, 1, 0]}},
      "total_invoices": {"$sum": 1}
  }},
  {"$project": {
      "_id": 0,
      "country": "$_id",
      "return_invoices": 1,
      "total_invoices": 1,
      "return_rate": {"$round": [{"$divide": ["$return_invoices", "$total_invoices"]}, 4]}
  }},
  {"$sort": {"return_rate": -1}},
  {"$limit": 15}
]

run_pipeline("fact_invoices", pipeline_returns, limit=15)


{'return_invoices': 0, 'total_invoices': 20, 'country': 'Japan', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 7, 'country': 'Iceland', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 13, 'country': 'Unspecified', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 17, 'country': 'Austria', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 2, 'country': 'Czech Republic', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 3, 'country': 'United Arab Emirates', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 26, 'country': 'Channel Islands', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 7, 'country': 'Singapore', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 457, 'country': 'Germany', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 36, 'country': 'Sweden', 'return_rate': 0.0}
{'return_invoices': 0, 'total_invoices': 58, 'country': 'Portugal', 'return_rate': 0.0}
{'return_invoi

[{'return_invoices': 0,
  'total_invoices': 20,
  'country': 'Japan',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 7,
  'country': 'Iceland',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 13,
  'country': 'Unspecified',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 17,
  'country': 'Austria',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 2,
  'country': 'Czech Republic',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 3,
  'country': 'United Arab Emirates',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 26,
  'country': 'Channel Islands',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 7,
  'country': 'Singapore',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 457,
  'country': 'Germany',
  'return_rate': 0.0},
 {'return_invoices': 0,
  'total_invoices': 36,
  'country': 'Sweden',
  'return_rate': 0.0},
 {'return_invoices': 0

# New Section

# New Section