<a href="https://colab.research.google.com/github/iarondon3/End-to-End-Retail-Data-Ecosystem/blob/main/05-Big-Data-Analytics/spark_analytics_lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ⚙️ Step 1: Initialize Spark Environment



In [None]:
import os
import time

start_time = time.time()
print("üì¶ Installing Dependencies (Java 8 + FindSpark)...")

# 1. Install Java 8 (supported by Spark 3.x; Spark 4.x requires Java 17+)
# We use 'apt-get' to install this specific version
os.system("apt-get update > /dev/null")
os.system("apt-get install openjdk-8-jdk-headless -qq > /dev/null")

# 2. Install Python Libraries
# Pin PySpark below 4.0, because PySpark 4.x no longer runs on Java 8
os.system('pip install "pyspark<4.0" findspark plotly faker > /dev/null 2>&1')

# 3. Set Environment Variables
# Force system to use Java 8
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# 4. Initialize Spark using FindSpark
import findspark
findspark.init()

print("üöÄ Starting Spark Session...")
from pyspark.sql import SparkSession
import plotly.express as px

try:
    spark = SparkSession.builder \
        .appName("Walgreens_BigData_Lab") \
        .master("local[*]") \
        .config("spark.ui.port", "4050") \
        .getOrCreate()

    elapsed = round(time.time() - start_time, 2)
    print(f"✅ Spark Session Created successfully in {elapsed}s.")
    print(f"   Spark Version: {spark.version}")
    print(f"   Java Used: {os.environ['JAVA_HOME']}")

except Exception as e:
    print(f"❌ Error: {e}")

> **🔍 What just happened?**

> We successfully set up a **Local Spark Environment** to simulate a professional Big Data platform like Databricks.
> * **Dependencies:** We installed **Java 8** and **PySpark**, which are the essential requirements to run Apache Spark.
> * **Engine Start:** We initialized the `SparkSession` in `local[*]` mode (using all local CPU cores), so the next steps can load and process data with the same DataFrame API used on a cluster.
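
Since the Java/Spark pairing is the most common reason this setup cell fails, a small compatibility check can help. The sketch below is plain Python and assumes the support matrix stated in the Spark 3.5 docs (Java 8/11/17) and Spark 4.0 docs (Java 17/21); the `SUPPORTED_JAVA` table and both helpers are hypothetical names, so verify the table against the documentation for your Spark version.

```python
import re

# Assumption: supported Java major versions per Spark "major.minor" line,
# taken from the Spark 3.5 and 4.0 documentation. Verify for your version.
SUPPORTED_JAVA = {
    "3.5": {8, 11, 17},
    "4.0": {17, 21},
}


def java_major(version: str) -> int:
    """Return the Java major version from a version string.

    Legacy releases report '1.8.0_392' (major 8); newer ones report '17.0.9'.
    """
    match = re.match(r"(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognized Java version: {version!r}")
    first, second = int(match.group(1)), match.group(2)
    if first == 1 and second is not None:
        return int(second)  # '1.x' scheme: the real major is the second number
    return first


def is_compatible(spark_version: str, java_version: str) -> bool:
    """True if the Java major is listed for the Spark 'major.minor' line."""
    line = ".".join(spark_version.split(".")[:2])
    return java_major(java_version) in SUPPORTED_JAVA.get(line, set())


print(is_compatible("3.5.1", "1.8.0_392"))  # True
print(is_compatible("4.0.0", "1.8.0_392"))  # False
```

In the notebook, the Spark side would come from `spark.version` and the Java side from the string printed by `java -version`.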

# 🏗️ Step 2: Generate Clickstream Data (Raw Logs)

Simulating high-velocity web logs: Views, Cart Adds, and Payments.
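
In a real Data Lake, raw clickstream logs usually land as newline-delimited JSON (one event per line), which is the layout `spark.read.json` expects by default. Below is a minimal plain-Python sketch of one such line and of flattening its nested `metadata` field into dotted keys. The field names mirror the generator in this step. The values and the `flatten` helper are illustrative assumptions.

```python
import json

# Illustrative event; field names follow the generator below, values are hypothetical.
raw_line = (
    '{"event_id": "e-001", "timestamp": "2024-05-01T10:15:00", '
    '"session_id": "s-42", "user_id": 1234, "event_type": "ADD_TO_CART", '
    '"product_name": "Zyrtec 10mg", "device": "Mobile App", '
    '"metadata": {"os": "iOS", "browser": "Chrome"}}'
)


def flatten(event: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into dotted keys, e.g. 'metadata.os'."""
    flat = {}
    for key, value in event.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


event = flatten(json.loads(raw_line))
print(event["metadata.os"], event["event_type"])  # iOS ADD_TO_CART
```

In the Spark DataFrame built below, Python dicts should be inferred as a map column (Spark's default), so the same field can be read with `F.col("metadata")["os"]`.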

In [None]:

# @markdown **Log Volume:**
LOG_ENTRIES = 90000 # @param {type:"slider", min:10000, max:100000, step:10000}

from faker import Faker
import random
from datetime import datetime, timedelta

fake = Faker()
print(f"üé≤ Generating {LOG_ENTRIES} raw web events...")

# --- WALGREENS PRODUCTS (Consistent) ---
PRODUCTS = [
    "Metformin 500mg", "Lisinopril 10mg", "Atorvastatin 20mg",
    "Allegra 24hr", "Zyrtec 10mg", "Claritin 24hr",
    "CeraVe Moisturizing Cream", "Neutrogena Hydro Boost",
    "Tylenol Extra Strength", "Advil Liqui-Gels"
]

events_data = []
# Simulation Logic:
# Users generate sessions. Most view, some add to cart, few purchase.
session_ids = [fake.uuid4() for _ in range(LOG_ENTRIES // 5)]  # ~5 events per session on average

for _ in range(LOG_ENTRIES):
    session = random.choice(session_ids)

    # Weighted Randomness to simulate a "Leaky Funnel"
    # Notice: High probability of ERROR_PAYMENT vs PURCHASE_COMPLETE
    event_type = random.choices(
        ['VIEW_PRODUCT', 'ADD_TO_CART', 'CHECKOUT_START', 'PURCHASE_COMPLETE', 'ERROR_PAYMENT'],
        weights=[0.50, 0.25, 0.15, 0.03, 0.07], # ERROR_PAYMENT / (ERROR_PAYMENT + PURCHASE_COMPLETE) = 0.07 / 0.10 = 70% checkout failure rate
        k=1
    )[0]

    events_data.append({
        "event_id": fake.uuid4(),
        "timestamp": fake.date_time_between(start_date='-1M', end_date='now').isoformat(),
        "session_id": session,
        "user_id": random.randint(1000, 5000), # Registered users
        "event_type": event_type,
        "product_name": random.choice(PRODUCTS),
        "device": random.choice(['Mobile App', 'Desktop Web', 'Mobile Web']),
        "metadata": {"os": random.choice(["iOS", "Android", "Windows"]), "browser": "Chrome"}
    })

# Create Spark DataFrame
# In real life, we would read from S3/Data Lake: spark.read.json("s3://logs/...")
df_raw = spark.createDataFrame(events_data)

print(f"✅ Data Generation Complete. Raw DataFrame Schema:")
df_raw.printSchema()
print("   Sample Raw Event:")
df_raw.show(1, truncate=False)

> **🔍 What just happened?**

> We generated a **Raw Clickstream Dataset** representing user interactions on the Walgreens website.
> * **Semi-Structured Data:** Unlike the clean tables in previous units, this data mimics JSON logs (common in Data Lakes), containing nested metadata and varying event types.
> * **The Scenario:** We simulated a "Leaky Funnel" where many users reach the checkout page but fail to complete the purchase due to technical errors.
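
The generator's weights fix the expected failure rate before any Spark code runs. Of the two terminal checkout outcomes, `ERROR_PAYMENT` has weight 0.07 and `PURCHASE_COMPLETE` has 0.03, so 0.07 / (0.07 + 0.03) = 70%. Below is a quick plain-Python check of that arithmetic against a seeded simulation. The seed and helper names are arbitrary choices for this sketch.

```python
import random

EVENTS = ['VIEW_PRODUCT', 'ADD_TO_CART', 'CHECKOUT_START', 'PURCHASE_COMPLETE', 'ERROR_PAYMENT']
WEIGHTS = [0.50, 0.25, 0.15, 0.03, 0.07]


def expected_fail_rate(weights=WEIGHTS):
    """ERROR_PAYMENT share of terminal checkout outcomes (errors + purchases)."""
    w = dict(zip(EVENTS, weights))
    return w['ERROR_PAYMENT'] / (w['ERROR_PAYMENT'] + w['PURCHASE_COMPLETE'])


def simulated_fail_rate(n=90_000, seed=7):
    """Draw n event types with the same weights and measure the failure rate."""
    rng = random.Random(seed)
    draws = rng.choices(EVENTS, weights=WEIGHTS, k=n)
    errors = draws.count('ERROR_PAYMENT')
    purchases = draws.count('PURCHASE_COMPLETE')
    return errors / (errors + purchases)


print(round(expected_fail_rate(), 2))  # 0.7
print(round(simulated_fail_rate(), 3))
```

The notebook's generator is unseeded, so the rate measured in Step 3 will vary slightly around 70% from run to run.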

# 🔄 Step 3: PySpark Transformation & Analysis

Extracting insights from the raw logs with distributed DataFrame transformations.
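
A `groupBy(...).count()` does not send every row to one machine. Each partition first computes partial counts, the partials are shuffled by key, and then they are summed. The plain-Python sketch below illustrates that partial-then-merge idea on a tiny hypothetical sample. It is not Spark's actual implementation. In the notebook, `df_funnel.explain()` should show the two aggregation stages on either side of an `Exchange` (shuffle).

```python
from collections import Counter
from functools import reduce


def count_partition(events):
    """'Map side': each partition counts its own rows independently."""
    return Counter(events)


def merge_counts(a, b):
    """'Reduce side': partial counts for the same key are summed after the shuffle."""
    return a + b


# Hypothetical event_type values spread over three partitions.
partitions = [
    ["VIEW_PRODUCT", "VIEW_PRODUCT", "ADD_TO_CART"],
    ["VIEW_PRODUCT", "ERROR_PAYMENT"],
    ["ADD_TO_CART", "PURCHASE_COMPLETE", "VIEW_PRODUCT"],
]

partials = [count_partition(p) for p in partitions]
totals = reduce(merge_counts, partials, Counter())
print(totals.most_common())
```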

In [None]:
from pyspark.sql import functions as F

print("üõ†Ô∏è Executing Spark Transformations...")

# 1. FUNNEL ANALYSIS (Aggregation)
# We group by event type to see the drop-off
df_funnel = df_raw.groupBy("event_type").count().orderBy("count", ascending=False)

# 2. PAYMENT FAILURE DETECTION (Filter)
# Identifying technical issues at the gateway
df_failures = df_raw.filter(F.col("event_type") == "ERROR_PAYMENT")
failure_count = df_failures.count()
total_checkout_attempts = df_raw.filter(F.col("event_type").isin(["PURCHASE_COMPLETE", "ERROR_PAYMENT"])).count()
# Guard against division by zero on very small samples
fail_rate = round(failure_count / total_checkout_attempts * 100, 1) if total_checkout_attempts else 0.0

# 3. HIGH INTENT / NO PURCHASE (Anti-Join Strategy)
# Find users who added to cart BUT did not complete purchase
cart_users = df_raw.filter(F.col("event_type") == "ADD_TO_CART").select("user_id", "product_name").distinct()
buyers = df_raw.filter(F.col("event_type") == "PURCHASE_COMPLETE").select("user_id").distinct()

# The Anti-Join: "Give me users in Cart bucket who are NOT in Buyers bucket"
# This is a classic Big Data pattern for retargeting
df_abandoned = cart_users.join(buyers, on="user_id", how="left_anti")

print("✅ Analysis Complete.")
print(f"   - Payment Errors Detected: {failure_count}")
print(f"   - Critical Failure Rate: {fail_rate}% (Anomaly Detected!)")
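# df_abandoned has one row per (user, product) pair, so count distinct users
print(f"   - Users with Abandoned Carts (Retargeting List): {df_abandoned.select('user_id').distinct().count()}")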

print("\nüîé Top 5 Abandoned Products (Potential Pricing Issue?):")
df_abandoned.groupBy("product_name").count().orderBy("count", ascending=False).show(5, truncate=False)

> **🔍 What just happened?**

> We utilized **Apache Spark** to perform distributed transformations on the log data.
> * **Anomaly Detection:** We calculated the failure rate at the payment gateway (about 70% by construction in the simulation), identifying a critical bottleneck in the sales funnel.
> * **Anti-Join Pattern:** We used a `left_anti` join to identify "High Intent" users, i.e. those who added items to their cart but never appear among the `PURCHASE_COMPLETE` events. This dataset is typically sent to marketing tools for retargeting campaigns.
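
A `left_anti` join keeps only the left-side rows whose key has no match on the right, and returns only the left-side columns. Below is a tiny set-based sketch of the same semantics with hypothetical IDs. Because the cart side holds one row per (user, product) pair, the result can contain several rows per user, so counting users requires counting distinct `user_id`s rather than rows.

```python
# Hypothetical (user_id, product_name) cart rows and buyer IDs, for illustration only.
cart_rows = {
    (1001, "Zyrtec 10mg"),
    (1001, "Advil Liqui-Gels"),
    (1002, "Allegra 24hr"),
    (1003, "Tylenol Extra Strength"),
}
buyer_ids = {1002}


def left_anti(left_rows, right_keys):
    """Keep left rows whose user_id has no match on the right (left_anti semantics)."""
    return {row for row in left_rows if row[0] not in right_keys}


abandoned = left_anti(cart_rows, buyer_ids)
abandoned_users = {user_id for user_id, _ in abandoned}
print(len(abandoned), len(abandoned_users))  # 3 2
```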

# 📊 Step 4: Visualize Insights (Databricks Simulation)

Visualizing the Funnel Drop-off and Error sources.
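
Besides raw counts, a funnel is often read as step-to-step conversion, meaning each stage's count divided by the previous stage's count. Below is a plain-Python sketch of that calculation. The counts are the expected values implied by the generator's weights for 90,000 events (weight × 90,000), not output from this notebook. `step_conversion` is a hypothetical helper.

```python
FUNNEL_ORDER = ['VIEW_PRODUCT', 'ADD_TO_CART', 'CHECKOUT_START', 'PURCHASE_COMPLETE']


def step_conversion(counts, order=FUNNEL_ORDER):
    """Return (stage, count, share of previous stage) for each stage in order."""
    rows, previous = [], None
    for stage in order:
        n = counts.get(stage, 0)
        share = n / previous if previous else None  # None for the first stage or a zero denominator
        rows.append((stage, n, share))
        previous = n
    return rows


# Expected counts implied by the generator weights for 90,000 events.
counts = {'VIEW_PRODUCT': 45_000, 'ADD_TO_CART': 22_500,
          'CHECKOUT_START': 13_500, 'PURCHASE_COMPLETE': 2_700}

for stage, n, share in step_conversion(counts):
    label = "" if share is None else f"{share:.0%}"
    print(f"{stage:<18} {n:>7,} {label}")
```

In the notebook, `counts` could be built with `dict(zip(pdf_funnel['event_type'], pdf_funnel['count']))`. Because each simulated event type is drawn independently, these ratios describe event volumes rather than individual users moving through the stages.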

In [None]:
# @title üìä Step 4: Visualize Insights (Databricks Simulation)
# Visualizing the Funnel Drop-off and Error sources.

import plotly.express as px
import pandas as pd

# Convert Spark DF to Pandas for Plotting (Standard practice for aggregated data)
pdf_funnel = df_funnel.toPandas()

# Define Logic Order for Funnel
funnel_order = ['VIEW_PRODUCT', 'ADD_TO_CART', 'CHECKOUT_START', 'PURCHASE_COMPLETE']
pdf_funnel = pdf_funnel[pdf_funnel['event_type'].isin(funnel_order)]

# Sort by custom order
pdf_funnel['event_type'] = pd.Categorical(pdf_funnel['event_type'], categories=funnel_order, ordered=True)
pdf_funnel = pdf_funnel.sort_values('event_type')

print("üìä Rendering Funnel Visualization...")

# Chart 1: The Funnel
# One color per stage (a discrete sequence); category_orders pins the funnel order
fig = px.funnel(
    pdf_funnel,
    x='count',
    y='event_type',
    title='📉 E-Commerce Conversion Funnel (User Drop-off)',
    labels={'event_type': 'Stage', 'count': 'Events'},
    color='event_type',
    category_orders={'event_type': funnel_order},
    color_discrete_sequence=px.colors.sequential.Viridis
)
fig.show()

# Chart 2: Error Analysis
# Show Error Distribution by Device
pdf_errors = df_raw.filter(F.col("event_type") == "ERROR_PAYMENT").groupBy("device").count().toPandas()

fig2 = px.pie(
    pdf_errors, values='count', names='device',
    title='⚠️ Payment Failures by Device (Technical Debugging)',
    hole=0.4,
    color_discrete_sequence=px.colors.qualitative.Pastel # Nice soft colors for pie charts
)
fig2.show()

> **🔍 What just happened?**

> We visualized the processed data to communicate findings to stakeholders.
> * **Funnel Chart:** Clearly visualizes the drop-off at each stage of the user journey, highlighting the massive gap between "Checkout Start" and "Purchase Complete".
> * **Device Breakdown:** Helps engineers determine whether the payment error is specific to a platform (e.g., the Mobile App) or is a general backend failure.
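
The pie chart shows raw error counts, so a device with more traffic shows more errors even when it fails no more often. Normalizing by checkout outcomes per device separates volume from failure rate. Below is a plain-Python sketch on a hypothetical sample. `error_rate_by_device` is an illustrative helper, not part of the notebook.

```python
from collections import Counter


def error_rate_by_device(events):
    """ERROR_PAYMENT / (ERROR_PAYMENT + PURCHASE_COMPLETE) per device.

    `events` is an iterable of (device, event_type) pairs.
    """
    errors, attempts = Counter(), Counter()
    for device, event_type in events:
        if event_type in ("ERROR_PAYMENT", "PURCHASE_COMPLETE"):
            attempts[device] += 1
            if event_type == "ERROR_PAYMENT":
                errors[device] += 1
    return {device: errors[device] / attempts[device] for device in attempts}


# Hypothetical sample: Desktop Web has more errors in absolute terms but a lower rate.
sample = (
    [("Desktop Web", "ERROR_PAYMENT")] * 6 + [("Desktop Web", "PURCHASE_COMPLETE")] * 6
    + [("Mobile App", "ERROR_PAYMENT")] * 4 + [("Mobile App", "PURCHASE_COMPLETE")] * 1
)
print(error_rate_by_device(sample))  # {'Desktop Web': 0.5, 'Mobile App': 0.8}
```

In PySpark, the same per-device rate could be written as `df_raw.filter(F.col("event_type").isin("ERROR_PAYMENT", "PURCHASE_COMPLETE")).groupBy("device").agg(F.avg((F.col("event_type") == "ERROR_PAYMENT").cast("int")))`. Because this notebook assigns devices uniformly at random, any differences between devices in the simulated data are noise.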