# CA Exam — Advanced Database & Big Data
**Landmark University**

In [None]:
!pip install pymongo pyspark pyngrok pandas -q

In [None]:
# Ask for the data files through Colab's file upload helper. Later cells read
# orders.csv, order_items.csv, products.csv, customers.json and events.jsonl
# by relative path, so all five are expected in the working directory.
from google.colab import files
uploaded = files.upload()

In [None]:
from pymongo import MongoClient
from pyngrok import ngrok
import pandas as pd
import json

# Credentials are read from Colab user secrets instead of being embedded in
# the notebook, the same way the Spark UI cell fetches NGROK_AUTH_TOKEN.
# Required secrets:
#   NGROK_AUTH_TOKEN - ngrok auth token
#   MONGODB_URI      - full mongodb+srv:// connection string, including the
#                      database user and password
# NOTE(review): the ngrok token and database password that used to be
# hard-coded in this cell have been exposed and should be rotated.
from google.colab import userdata

ngrok.set_auth_token(userdata.get('NGROK_AUTH_TOKEN'))

client = MongoClient(userdata.get('MONGODB_URI'))
# Round-trip to the server so a bad URI or an unreachable cluster fails here
# rather than in a later cell.
client.admin.command('ping')
print("connected")

In [None]:
# Load the three CSV extracts into pandas DataFrames.
orders, items, products = (
    pd.read_csv(csv_name)
    for csv_name in ("orders.csv", "order_items.csv", "products.csv")
)

# Sanity check: (rows, columns) of each frame, then a preview of orders.
shapes = [frame.shape for frame in (orders, items, products)]
print(*shapes)
orders.head()

# Part A - MongoDB (35 marks)

## A1. Import (10 marks)

In [None]:
db = client['campusmart']

# Drop the target collections first so re-running the cell does not insert
# the same documents twice.
for stale in (db.customers, db.orders, db.order_items):
    stale.drop()

# customers: customers.json is read as JSON Lines (one document per non-blank
# line). The encoding is pinned to UTF-8 so non-ASCII text decodes the same
# way regardless of the platform's default locale.
with open('customers.json', 'r', encoding='utf-8') as f:
    cust_list = [json.loads(line) for line in f if line.strip()]
db.customers.insert_many(cust_list)

# orders: one document per DataFrame row
db.orders.insert_many(orders.to_dict('records'))

# order_items: one document per DataFrame row
db.order_items.insert_many(items.to_dict('records'))

print("customers:", db.customers.count_documents({}))
print("orders:", db.orders.count_documents({}))
print("order_items:", db.order_items.count_documents({}))

## A2. Indexes (5 marks)

In [None]:
# Single-field indexes on the fields the later cells filter or join on:
# customer_id and order_ts on orders, order_id on order_items, country on
# customers.
index_plan = (
    (db.orders, "customer_id"),
    (db.orders, "order_ts"),
    (db.order_items, "order_id"),
    (db.customers, "country"),
)
for collection, field in index_plan:
    collection.create_index(field)

print("indexes created")
list(db.orders.list_indexes())

## A3. CRUD + queries (8 marks)

In [None]:
# A3.1 - five highest-spending customers whose country is "CM"
# Customer keys are taken from customers._id and matched against
# orders.customer_id.
# NOTE(review): assumes customers.json supplies _id as the customer id -
# confirm against the data file.
cm_ids = [doc["_id"] for doc in db.customers.find({"country": "CM"}, {"_id": 1})]

# Order statuses that are included in the spend total.
counted_statuses = ["paid", "shipped", "delivered"]
spend_pipeline = [
    {"$match": {"customer_id": {"$in": cm_ids}, "status": {"$in": counted_statuses}}},
    {"$group": {"_id": "$customer_id", "total": {"$sum": "$total_amount"}}},
    {"$sort": {"total": -1}},
    {"$limit": 5},
]
top5 = list(db.orders.aggregate(spend_pipeline))

for r in top5:
    print(f"{r['_id']}: ${r['total']:.2f}")

In [None]:
# A3.2 - give 20 loyalty points to every customer whose summed total_amount
# over paid/shipped/delivered orders is at least 300
# NOTE(review): $inc adds another 20 points each time this cell is run.
counted_statuses = ["paid", "shipped", "delivered"]
spend_by_customer = [
    {"$match": {"status": {"$in": counted_statuses}}},
    {"$group": {"_id": "$customer_id", "total": {"$sum": "$total_amount"}}},
    {"$match": {"total": {"$gte": 300}}},
]
high_spenders = list(db.orders.aggregate(spend_by_customer))

ids = [h["_id"] for h in high_spenders]
bonus = {"$inc": {"loyalty_points": 20}}
result = db.customers.update_many({"_id": {"$in": ids}}, bonus)
print(f"updated {result.modified_count} customers")

In [None]:
# A3.3 - delete cancelled orders with order_ts before 2025-10-01
# NOTE(review): order_ts is compared with a string, which presumably relies on
# it being stored as ISO-formatted text (the orders CSV was loaded without
# date parsing) - confirm.
stale_cancelled = {"status": "cancelled", "order_ts": {"$lt": "2025-10-01"}}
result = db.orders.delete_many(stale_cancelled)
print(f"deleted {result.deleted_count} orders")

## A4. Aggregation (12 marks)

In [None]:
# Reload the products collection so the second $lookup has something to join.
db.products.drop()
db.products.insert_many(products.to_dict('records'))

# Delivered orders -> their line items -> the product behind each line.
delivered_only = {"$match": {"status": "delivered"}}
join_items = {"$lookup": {"from": "order_items", "localField": "order_id", "foreignField": "order_id", "as": "items"}}
join_products = {"$lookup": {"from": "products", "localField": "items.product_id", "foreignField": "product_id", "as": "prod"}}

# Revenue per (category, product).
# NOTE(review): the "orders" set is collected here but no later stage reads it.
per_product = {"$group": {
    "_id": {"cat": "$prod.category", "pid": "$items.product_id"},
    "rev": {"$sum": "$items.line_total"},
    "orders": {"$addToSet": "$order_id"}
}}

# Products are sorted by revenue inside each category before being pushed, and
# the final $slice keeps the first three.
# NOTE(review): relies on $push preserving the sorted input order - confirm.
rank_within_category = {"$sort": {"_id.cat": 1, "rev": -1}}
per_category = {"$group": {
    "_id": "$_id.cat",
    "total_rev": {"$sum": "$rev"},
    "prods": {"$push": {"pid": "$_id.pid", "rev": "$rev"}}
}}
shape_output = {"$project": {"category": "$_id", "total_rev": 1, "top3": {"$slice": ["$prods", 3]}}}

pipeline = [
    delivered_only,
    join_items,
    {"$unwind": "$items"},
    join_products,
    {"$unwind": "$prod"},
    per_product,
    rank_within_category,
    per_category,
    shape_output,
]

# One header line per category followed by its (up to) three top products.
for r in db.orders.aggregate(pipeline):
    print(f"\n{r['category']}: ${r['total_rev']:.2f}")
    for p in r['top3']:
        print(f"  {p['pid']}: ${p['rev']:.2f}")

# Part B - PySpark (40 marks)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Create (or reuse) a Spark session named "CampusMart" on the local[*] master
# and keep a handle on its SparkContext for the UI tunnel cell.
session_builder = SparkSession.builder.appName("CampusMart").master("local[*]")
spark = session_builder.getOrCreate()
sc = spark.sparkContext
spark

In [None]:
from pyngrok import ngrok

# Set ngrok authtoken
from google.colab import userdata
import os
NGROK_AUTH_TOKEN = userdata.get('NGROK_AUTH_TOKEN')
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

# Kill any existing ngrok tunnels (both pyngrok managed and system-wide)
ngrok.kill()
!pkill ngrok || true

# Get Spark UI port
spark_ui_port = sc.uiWebUrl.split(':')[-1]
print(f"Spark UI port: {spark_ui_port}")

# Start ngrok tunnel
public_url = ngrok.connect(spark_ui_port)
print(f"Spark UI public URL: {public_url}")

## B1. Ingest (8 marks)

In [None]:
# The three CSV sources share the same reader options: the first row is the
# header and column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}
orders_df = spark.read.csv("orders.csv", **csv_options)
items_df = spark.read.csv("order_items.csv", **csv_options)
products_df = spark.read.csv("products.csv", **csv_options)
# Events come from a JSON Lines file.
events_df = spark.read.json("events.jsonl")

# Row count of every source, in load order.
for label, frame in (
    ("orders:", orders_df),
    ("items:", items_df),
    ("products:", products_df),
    ("events:", events_df),
):
    print(label, frame.count())

In [None]:
# Inferred schema and first five rows of the orders data
orders_df.printSchema()
orders_df.show(5)

In [None]:
# Inferred schema and first five rows of the order items data
items_df.printSchema()
items_df.show(5)

In [None]:
# Inferred schema and first five rows of the products data
products_df.printSchema()
products_df.show(5)

In [None]:
# Schema derived from the JSON events file and its first five rows
events_df.printSchema()
events_df.show(5)

## B2. Data quality + feature engineering (10 marks)

In [None]:
# Parse order_ts into a timestamp, then derive the calendar day (order_day)
# and the week-of-year number (order_week) from the parsed value.
orders_clean = (
    orders_df
    .withColumn("order_ts", to_timestamp(col("order_ts")))
    .withColumn("order_day", to_date(col("order_ts")))
    .withColumn("order_week", weekofyear(col("order_ts")))
)

preview_cols = ["order_id", "order_ts", "order_day", "order_week"]
orders_clean.select(*preview_cols).show(5)

In [None]:
# Parse event_ts into a timestamp column and preview the result
events_clean = events_df.withColumn("event_ts", to_timestamp(col("event_ts")))
events_clean.show(5)

In [None]:
# Recompute each order's total from subtotal + tax + shipping_fee and flag the
# orders whose stored total_amount differs from it by more than 0.05.
recomputed_total = col("subtotal") + col("tax") + col("shipping_fee")
orders_clean = (
    orders_clean
    .withColumn("expected_total", recomputed_total)
    .withColumn("amount_error", col("total_amount") - col("expected_total"))
)

# `abs` is the Spark column function pulled in by the functions star-import,
# not the Python builtin.
outside_tolerance = abs(col("amount_error")) > 0.05
suspicious = orders_clean.filter(outside_tolerance)
print(f"suspicious orders: {suspicious.count()} out of {orders_clean.count()}")
suspicious.show(5)

## B3. KPIs (12 marks)

In [None]:
# B3.1 top 10 products by revenue, counting delivered orders only
delivered = orders_clean.where(col("status") == "delivered")
with_items = delivered.join(items_df, "order_id")
with_prods = with_items.join(products_df, "product_id")

# `sum` is the Spark aggregate pulled in by the functions star-import.
revenue_by_product = (
    with_prods
    .groupBy("product_id", "product_name", "category")
    .agg(sum("line_total").alias("revenue"))
)
top10 = revenue_by_product.orderBy(col("revenue").desc()).limit(10)

top10.show()

In [None]:
# B3.2 revenue by category per week (pivot)
# One row per category and one column per week number; each cell is the
# summed line_total rounded to two decimals.
# NOTE(review): the pivot key is the week number only, so the same week number
# from different years would land in one column.
with_week = with_prods.withColumn("week", weekofyear(col("order_ts")))
weekly_revenue = round(sum("line_total"), 2)
pivot_table = with_week.groupBy("category").pivot("week").agg(weekly_revenue)
pivot_table.show()

In [None]:
# B3.3 repeat rate: share of customers with at least two delivered orders
cust_counts = delivered.groupBy("customer_id").count()
total = cust_counts.count()
repeat = cust_counts.filter(col("count") >= 2).count()
if total:
    print(f"repeat rate: {(repeat/total)*100:.2f}%")
else:
    # With no delivered orders there are no customers to divide by; report
    # that instead of failing with ZeroDivisionError.
    print("repeat rate: n/a (no delivered orders)")

## B4. Clickstream analytics (10 marks)

In [None]:
# B4.1 conversion rate per device: payment_success events as a percentage of
# view events
conv = events_clean.groupBy("device").agg(
    count(when(col("event_type") == "view", 1)).alias("views"),
    count(when(col("event_type") == "payment_success", 1)).alias("payments")
)
# Only divide when the device has views. A device with zero views gets a null
# rate; an unguarded division would raise a divide-by-zero error when Spark
# runs in ANSI mode.
conv = conv.withColumn(
    "conv_rate",
    round(when(col("views") > 0, (col("payments") / col("views")) * 100), 2),
)
conv.show()

In [None]:
# B4.2 schema improvement for search
# Written answer only: the proposal is printed as text and no data is changed.
print("""Schema improvement for search queries:
- add event_type = 'search'
- add search_query column (string)
- add search_results_count (int)
This helps track what users are searching for""")

In [None]:
# B4.3 avg events per session
# Tag every event with the number of events in its session, collapse to one
# row per session, then average those per-session counts.
per_session = Window.partitionBy("session_id")
events_in_session = count("*").over(per_session)
sess = (
    events_clean
    .withColumn("cnt", events_in_session)
    .select("session_id", "cnt")
    .distinct()
)
sess.agg(avg("cnt").alias("avg_events_per_session")).show()

# Part C - Hadoop/HDFS (25 marks)

## C1. HDFS basics (8 marks)

In [None]:
# Written answer: HDFS shell commands to (1) create the raw directory,
# (2) upload the data files, (3) list the directory and show its size, and
# (4) copy a file back to the local disk. The text is only printed; nothing
# is executed.
print("""HDFS commands:

1) hdfs dfs -mkdir -p /campusmart/raw/

2) hdfs dfs -put data/* /campusmart/raw/

3) hdfs dfs -ls /campusmart/raw/
   hdfs dfs -du -h /campusmart/raw/

4) hdfs dfs -get /campusmart/raw/orders.csv ./
""")

## C2. Spark on YARN (6 marks)

In [None]:
# Written answer: the spark-submit invocation for running the job on YARN.
# A raw string is used so the trailing backslashes are printed literally. In
# a normal string literal a backslash at the end of a line joins it with the
# next line, so the command would print as one long line with the shell
# continuations missing.
print(r"""spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
    --driver-memory 2g --executor-memory 2g \
    --num-executors 4 \
    spark_job.py hdfs:///campusmart/raw/orders.csv
""")

## C3. Integration pipeline (11 marks)

In [None]:
# Written answer: describes the MongoDB -> Spark -> HDFS/MongoDB pipeline and
# the validation checks. The text is only printed; the cells that follow
# compute the weekly KPIs and write them to the weekly_kpis collection.
print("""Pipeline architecture:

1. MongoDB stores raw data (customers, orders, order_items, products)
2. Daily spark job reads from mongo, computes weekly category revenue
3. Results written to HDFS as parquet + MongoDB weekly_kpis collection

Validation:
- check no nulls in category/revenue
- revenue > 0
- row count matches expected
""")

In [None]:
# run the pipeline: revenue and distinct order count per (year, week, category)
# for delivered orders
delivered = orders_clean.filter(col("status") == "delivered")

# order_week comes from weekofyear(), which numbers weeks the ISO-8601 way
# (weeks start on Monday), so a week that straddles New Year belongs to one
# single "week year". Pair it with that week year - the calendar year of the
# week's Thursday, i.e. its Monday plus three days - instead of year(order_ts).
# Otherwise e.g. 2025-12-30 (ISO week 1 of 2026) would be filed under week 1
# of 2025, together with orders from January 2025.
week_monday = to_date(date_trunc("week", col("order_ts")))
delivered = delivered.withColumn("year", year(date_add(week_monday, 3)))

joined = delivered.join(items_df, "order_id").join(products_df, "product_id")

kpis = joined.groupBy("year", "order_week", "category").agg(
    round(sum("line_total"), 2).alias("revenue"),
    countDistinct("order_id").alias("orders")
).withColumn("processed_at", current_timestamp())

kpis.show(15)

In [None]:
# write to mongodb: collect the KPI rows to pandas and replace the contents of
# the weekly_kpis collection with them
kpis_pd = kpis.toPandas()
weekly_kpis = db.weekly_kpis
weekly_kpis.drop()
weekly_kpis.insert_many(kpis_pd.to_dict('records'))
print(f"inserted {db.weekly_kpis.count_documents({})} records")

In [None]:
# Release everything the notebook opened: the ngrok tunnel to the Spark UI,
# the MongoDB connection and the Spark session.
ngrok.kill()  # the tunnel was never closed and would otherwise outlive Spark
client.close()
spark.stop()
print("done")