# Capstone Project: Event Analytics Pipeline
## Build Pipeline Orchestration Notebook

**Author:** Akshay Ravi (aravi7)  
**Purpose:** This notebook orchestrates the ETL pipeline for transforming raw e-commerce events from JSON to Parquet format.

### Pipeline Overview
```
Raw Events (JSON, gzipped)     →     Glue Spark ETL     →     Processed Events (Parquet)
s3://capstone-events-...            (with bookmarks)         s3://capstone-events-processed-...
```

### What This Notebook Does
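1. **Configuration** - Set up AWS clients and parameters
2. **Verify Infrastructure** - Confirm the CloudFormation stack created the buckets, Glue database, tables, and job
3. **Check Raw Data** - Summarize the raw event files in the source bucket
4. **Upload ETL Script** - Deploy the PySpark transformation code to S3
5. **Discover Partitions** - Run MSCK REPAIR on the raw events table
6. **Execute ETL Job** - Trigger the Glue job with bookmark-enabled incremental processing
7. **Discover Output Partitions** - Run MSCK REPAIR on the processed events table
8. **Validate** - Run test queries to verify the pipeline
9. **Run Required Queries** - Execute the five analytical queries
10. **Summary** - List deliverables and created resources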

### Idempotency
This notebook is designed to be run multiple times safely:
- ETL script upload overwrites existing script
- Glue job bookmarks ensure only new data is processed
- MSCK REPAIR only adds partitions missing from the catalog, so existing partitions are never duplicated

---
## 1. Configuration & Setup

In [1]:
import boto3
import time
from datetime import datetime
import pandas as pd

# =============================================================================
# CONFIGURATION - Update these values for your environment
# =============================================================================
STUDENT_ID = 'aravi7'
AWS_ACCOUNT_ID = '410367694421'
AWS_REGION = 'us-west-2'

# Derived names (match CloudFormation template)
SOURCE_BUCKET = f'capstone-events-{STUDENT_ID}-{AWS_ACCOUNT_ID}'
PROCESSED_BUCKET = f'capstone-events-processed-{STUDENT_ID}-{AWS_ACCOUNT_ID}'
GLUE_DATABASE = f'capstone_{STUDENT_ID}_db'
RAW_TABLE = 'raw_events'
PROCESSED_TABLE = 'processed_events'
GLUE_JOB_NAME = f'capstone-etl-{STUDENT_ID}'

# Initialize AWS clients
s3_client = boto3.client('s3', region_name=AWS_REGION)
glue_client = boto3.client('glue', region_name=AWS_REGION)
athena_client = boto3.client('athena', region_name=AWS_REGION)

print("Configuration:")
print(f"  Student ID:       {STUDENT_ID}")
print(f"  AWS Account:      {AWS_ACCOUNT_ID}")
print(f"  Region:           {AWS_REGION}")
print(f"  Source Bucket:    {SOURCE_BUCKET}")
print(f"  Processed Bucket: {PROCESSED_BUCKET}")
print(f"  Glue Database:    {GLUE_DATABASE}")
print(f"  Glue Job:         {GLUE_JOB_NAME}")

Configuration:
  Student ID:       aravi7
  AWS Account:      410367694421
  Region:           us-west-2
  Source Bucket:    capstone-events-aravi7-410367694421
  Processed Bucket: capstone-events-processed-aravi7-410367694421
  Glue Database:    capstone_aravi7_db
  Glue Job:         capstone-etl-aravi7

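The derived bucket names must also satisfy S3's general-purpose bucket naming rules: 3 to 63 characters; only lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit. A minimal pre-flight sketch, assuming a hypothetical helper `is_valid_bucket_name` that covers only these core rules (not every S3 restriction, e.g. IP-address-style names):

```python
import re

# Hypothetical helper: checks only the core S3 bucket naming rules
# (length 3-63; lowercase letters, digits, dots, hyphens; must start and
# end with a letter or digit; no consecutive dots). Not every S3 rule.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")

def is_valid_bucket_name(name: str) -> bool:
    """Return True if `name` passes the core S3 bucket naming rules."""
    return bool(_BUCKET_RE.fullmatch(name)) and ".." not in name

student_id = "aravi7"
account_id = "410367694421"
for bucket in (
    f"capstone-events-{student_id}-{account_id}",
    f"capstone-events-processed-{student_id}-{account_id}",
):
    print(f"{bucket}: {len(bucket)} chars, valid={is_valid_bucket_name(bucket)}")
```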

---
## 2. Verify Infrastructure Exists

Before proceeding, let's verify that the CloudFormation stack has created all required resources.

In [2]:
from botocore.exceptions import ClientError

# Catch only AWS API errors: a bare `except:` would also swallow typos
# (NameError) and KeyboardInterrupt, reporting them as "missing".

def check_bucket_exists(bucket_name):
    """Check if S3 bucket exists (and is accessible to this caller)."""
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        return True
    except ClientError:
        return False

def check_glue_database_exists(database_name):
    """Check if Glue database exists."""
    try:
        glue_client.get_database(Name=database_name)
        return True
    except ClientError:
        return False

def check_glue_table_exists(database_name, table_name):
    """Check if Glue table exists."""
    try:
        glue_client.get_table(DatabaseName=database_name, Name=table_name)
        return True
    except ClientError:
        return False

def check_glue_job_exists(job_name):
    """Check if Glue job exists."""
    try:
        glue_client.get_job(JobName=job_name)
        return True
    except ClientError:
        return False

# Verify all resources
print("Verifying infrastructure...\n")

checks = [
    ("Source Bucket", check_bucket_exists(SOURCE_BUCKET)),
    ("Processed Bucket", check_bucket_exists(PROCESSED_BUCKET)),
    ("Glue Database", check_glue_database_exists(GLUE_DATABASE)),
    ("Raw Events Table", check_glue_table_exists(GLUE_DATABASE, RAW_TABLE)),
    ("Processed Events Table", check_glue_table_exists(GLUE_DATABASE, PROCESSED_TABLE)),
    ("Glue ETL Job", check_glue_job_exists(GLUE_JOB_NAME)),
]

all_passed = True
for name, exists in checks:
    status = "✅" if exists else "❌"
    print(f"  {status} {name}")
    if not exists:
        all_passed = False

if all_passed:
    print("\n✅ All infrastructure components verified!")
else:
    print("\n❌ Some components missing. Please check your CloudFormation stack.")

Verifying infrastructure...

  ✅ Source Bucket
  ✅ Processed Bucket
  ✅ Glue Database
  ✅ Raw Events Table
  ✅ Processed Events Table
  ✅ Glue ETL Job

✅ All infrastructure components verified!
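A plain True/False hides *why* a check failed. `head_bucket` reports a missing bucket with error code `404` and an inaccessible one with `403`, while Glue `get_*` calls raise `EntityNotFoundException` for missing databases, tables, and jobs. A minimal sketch, assuming a hypothetical helper `classify_error` that receives the `err.response` dict of a caught `botocore.exceptions.ClientError`:

```python
# Hypothetical helper: maps the error dict carried by a botocore ClientError
# (i.e. `err.response`) to a short diagnosis for the infrastructure checks.
NOT_FOUND_CODES = {"404", "NoSuchBucket", "EntityNotFoundException"}
ACCESS_DENIED_CODES = {"403", "AccessDenied", "AccessDeniedException"}

def classify_error(error_response: dict) -> str:
    """Return 'missing', 'forbidden', or 'other' for a ClientError response dict."""
    code = error_response.get("Error", {}).get("Code", "")
    if code in NOT_FOUND_CODES:
        return "missing"
    if code in ACCESS_DENIED_CODES:
        return "forbidden"
    return "other"

# Usage inside a check function, e.g.:
#   except ClientError as err:
#       print(name, classify_error(err.response))
```

Separating "forbidden" from "missing" matters here because an IAM permission gap would otherwise look like a failed CloudFormation deployment.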


---
## 3. Check Raw Data Availability

Let's see how much raw data has been generated by the Lambda function.

In [3]:
def count_raw_files():
    """Count the number of raw event files in the source bucket."""
    paginator = s3_client.get_paginator('list_objects_v2')
    
    file_count = 0
    total_size = 0
    files = []
    
    for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix='events/'):
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.jsonl.gz'):
                file_count += 1
                total_size += obj['Size']
                files.append({
                    'key': obj['Key'],
                    'size_mb': round(obj['Size'] / (1024*1024), 2),
                    'last_modified': obj['LastModified']
                })
    
    return file_count, total_size, files

file_count, total_size, files = count_raw_files()

print(f"Raw Data Summary:")
print(f"  Total files:     {file_count}")
print(f"  Total size:      {round(total_size / (1024*1024), 2)} MB")
print(f"  Est. events:     {file_count * 625000:,} (avg 625K per file)")
print(f"  Est. data hours: {file_count / 12:.1f} hours (12 files per hour)")


# Show recent files
print(f"\nMost recent files:")
for f in sorted(files, key=lambda x: x['last_modified'], reverse=True)[:5]:
    print(f"  {f['key']} ({f['size_mb']} MB)")

Raw Data Summary:
  Total files:     152
  Total size:      1537.28 MB
  Est. events:     95,000,000 (avg 625K per file)
  Est. data hours: 12.7 hours (12 files per hour)

Most recent files:
  events/year=2025/month=12/day=06/hour=21/minute=21/events-20251206-212118.jsonl.gz (9.27 MB)
  events/year=2025/month=12/day=06/hour=21/minute=16/events-20251206-211623.jsonl.gz (11.69 MB)
  events/year=2025/month=12/day=06/hour=21/minute=11/events-20251206-211122.jsonl.gz (11.46 MB)
  events/year=2025/month=12/day=06/hour=21/minute=06/events-20251206-210618.jsonl.gz (8.99 MB)
  events/year=2025/month=12/day=06/hour=21/minute=01/events-20251206-210116.jsonl.gz (8.23 MB)
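The estimates above rest on two assumptions built into the cell: an average of 625K events per file, and one file every five minutes (12 per hour). A small sketch that isolates that arithmetic so the assumptions become explicit parameters (the function name `estimate_volume` is hypothetical):

```python
def estimate_volume(file_count: int,
                    events_per_file: int = 625_000,
                    files_per_hour: int = 12) -> tuple[int, float]:
    """Return (estimated events, estimated hours of data) for `file_count` files.

    Both defaults mirror the assumptions in the cell above (625K events per
    file, one file every 5 minutes); they are estimates, not measured values.
    """
    est_events = file_count * events_per_file
    est_hours = file_count / files_per_hour
    return est_events, est_hours

events, hours = estimate_volume(152)
print(f"{events:,} events, {hours:.1f} hours")
```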


---
## 4. Upload ETL Script to S3

The Glue job needs its PySpark script stored in S3. This script:
- Reads raw JSON events using Glue bookmarks (incremental processing)
- Parses timestamps and adds derived columns
- Writes output as partitioned Parquet files
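The derived columns are pure functions of the event timestamp. A plain-Python sketch of the same mapping, useful for checking expectations without a Spark cluster; it assumes ISO-8601 timestamp strings (the raw timestamp format is not shown in this notebook) and mirrors the zero-padding that `lpad(..., 2, "0")` applies in the Spark script:

```python
from datetime import datetime

def derive_partition_columns(ts: str) -> dict:
    """Mirror the Spark derived-column logic for one timestamp string.

    Assumes an ISO-8601 timestamp; this is an assumption, not the confirmed
    raw format.
    """
    parsed = datetime.fromisoformat(ts)
    return {
        "event_date": parsed.date().isoformat(),  # to_date()
        "event_hour": parsed.hour,                # hour(), an integer
        "year": str(parsed.year),                 # string partition key
        "month": f"{parsed.month:02d}",           # lpad(..., 2, "0")
        "day": f"{parsed.day:02d}",
        "hour": f"{parsed.hour:02d}",
    }

print(derive_partition_columns("2025-12-06T09:05:30"))
```

Zero-padded string keys keep the S3 prefixes (`month=12/day=06/hour=09`) aligned with the raw layout and sort correctly as text.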

In [4]:
# ETL Script - PySpark code for transforming JSON to Parquet

ETL_SCRIPT = '''
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, to_date, hour, year, month, dayofmonth, lpad

# Initialize Glue context
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'SOURCE_DATABASE',
    'SOURCE_TABLE',
    'TARGET_PATH',
    'TARGET_DATABASE',
    'TARGET_TABLE'
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(f"Starting ETL job: {args['JOB_NAME']}")
print(f"Source: {args['SOURCE_DATABASE']}.{args['SOURCE_TABLE']}")
print(f"Target: {args['TARGET_PATH']}")

# Read from source table with bookmarks enabled
print("Reading source data with bookmarks...")

source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database=args['SOURCE_DATABASE'],
    table_name=args['SOURCE_TABLE'],
    transformation_ctx="source_data"
)

record_count = source_dyf.count()
print(f"Records to process: {record_count:,}")

if record_count == 0:
    print("No new data to process. Exiting.")
    job.commit()
    sys.exit(0)

# Transform
print("Applying transformations...")

df = source_dyf.toDF()

df_transformed = df \
    .withColumn("timestamp", to_timestamp(col("timestamp"))) \
    .withColumn("event_date", to_date(col("timestamp"))) \
    .withColumn("event_hour", hour(col("timestamp"))) \
    .withColumn("year", year(col("timestamp")).cast("string")) \
    .withColumn("month", lpad(month(col("timestamp")).cast("string"), 2, "0")) \
    .withColumn("day", lpad(dayofmonth(col("timestamp")).cast("string"), 2, "0")) \
    .withColumn("hour", lpad(hour(col("timestamp")).cast("string"), 2, "0"))

df_final = df_transformed.select(
    "timestamp", "user_id", "session_id", "event_type", "product_id",
    "quantity", "price", "category", "search_query", "event_date", "event_hour",
    "year", "month", "day", "hour"
)

print("Schema after transformation:")
df_final.printSchema()

# Write to Parquet
print("Writing to Parquet...")

output_dyf = DynamicFrame.fromDF(df_final, glueContext, "output_data")

glueContext.write_dynamic_frame.from_options(
    frame=output_dyf,
    connection_type="s3",
    connection_options={
        "path": args['TARGET_PATH'],
        "partitionKeys": ["year", "month", "day", "hour"]
    },
    format="parquet",
    format_options={"compression": "snappy"},
    transformation_ctx="sink_data"
)

print(f"Successfully wrote {record_count:,} records to {args['TARGET_PATH']}")
job.commit()
print("Job completed successfully!")
'''


# Upload script to S3
script_key = 'scripts/etl_job.py'

s3_client.put_object(
    Bucket=PROCESSED_BUCKET,
    Key=script_key,
    Body=ETL_SCRIPT.encode('utf-8'),
    ContentType='text/x-python'
)

print(f"✅ ETL script uploaded to s3://{PROCESSED_BUCKET}/{script_key}")

✅ ETL script uploaded to s3://capstone-events-processed-aravi7-410367694421/scripts/etl_job.py
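Because the ETL script is shipped as a string, a syntax error would only surface once Glue starts the job. One low-cost guard, sketched here as a hypothetical helper `script_compiles`, is Python's built-in `compile()`, which parses the source without executing it, so the Glue-only `awsglue` modules don't need to be installed locally:

```python
def script_compiles(source: str, filename: str = "etl_job.py") -> bool:
    """Return True if `source` is syntactically valid Python.

    compile() only parses and byte-compiles; it does not run the code or
    resolve imports, so modules such as awsglue are not needed.
    """
    try:
        compile(source, filename, "exec")
        return True
    except SyntaxError as err:
        print(f"Syntax error in {filename}, line {err.lineno}: {err.msg}")
        return False

# Usage before put_object: assert script_compiles(ETL_SCRIPT)
print(script_compiles("import awsglue\nprint('ok')\n"))
print(script_compiles("def broken(:\n    pass\n"))
```

This catches only syntax errors; wrong column names or missing job arguments still fail at run time.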


---
## 5. Discover Raw Data Partitions (MSCK REPAIR)

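Before running the ETL job, we need to register the raw data's partitions in the Glue catalog. The Lambda function writes data with Hive-style partitioning (`key=value` path segments such as `year=2025/month=12`), but the catalog doesn't discover new partitions automatically. `MSCK REPAIR TABLE` scans the table's S3 location and adds any partitions that aren't registered yet.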
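Partition discovery works because each S3 key encodes its partition values as Hive-style `key=value` segments. A minimal sketch of that mapping, assuming hypothetical helpers `parse_hive_partition` and `add_partition_ddl`; the second builds an Athena `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` statement, an explicit alternative to MSCK that registers only the partitions you name. Whether the raw table declares `minute` as a partition column depends on the CloudFormation template, so the partition keys are passed in; the sketch also assumes string-typed partition columns and that the table location is the source bucket's `events/` prefix:

```python
def parse_hive_partition(key: str, partition_keys: list[str]) -> dict:
    """Extract Hive-style key=value segments from an S3 object key."""
    values = {}
    for segment in key.split("/"):
        name, sep, value = segment.partition("=")
        if sep and name in partition_keys:
            values[name] = value
    return values

def add_partition_ddl(table: str, table_location: str,
                      partition: dict, partition_keys: list[str]) -> str:
    """Build an idempotent Athena ADD PARTITION statement for one partition."""
    spec = ", ".join(f"{k} = '{partition[k]}'" for k in partition_keys)
    path = "/".join(f"{k}={partition[k]}" for k in partition_keys)
    location = f"{table_location.rstrip('/')}/{path}/"
    return (f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION ({spec}) LOCATION '{location}'")

key = ("events/year=2025/month=12/day=06/hour=21/minute=21/"
       "events-20251206-212118.jsonl.gz")
keys = ["year", "month", "day", "hour", "minute"]  # assumed partition columns
part = parse_hive_partition(key, keys)
print(add_partition_ddl("raw_events",
                        "s3://capstone-events-aravi7-410367694421/events/",
                        part, keys))
```

`IF NOT EXISTS` makes the statement safe to rerun, the same idempotency property the notebook relies on from MSCK.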

In [5]:
def run_athena_query(query, database, wait=True):
    """
    Execute an Athena query and optionally wait for completion.
    Returns query execution ID and results if wait=True.
    """
    # Start query execution
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={
            'OutputLocation': f's3://{PROCESSED_BUCKET}/athena-results/'
        }
    )
    
    execution_id = response['QueryExecutionId']
    print(f"Query started: {execution_id}")
    
    if not wait:
        return execution_id, None
    
    # Wait for query to complete
    while True:
        status_response = athena_client.get_query_execution(
            QueryExecutionId=execution_id
        )
        status = status_response['QueryExecution']['Status']['State']
        
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        
        print(f"  Status: {status}...")
        time.sleep(2)
    
    if status == 'SUCCEEDED':
        print(f"  ✅ Query completed successfully")
        
        # Get results
        results = athena_client.get_query_results(
            QueryExecutionId=execution_id
        )
        return execution_id, results
    else:
        error_message = status_response['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
        print(f"  ❌ Query failed: {error_message}")
        return execution_id, None


# Run MSCK REPAIR on raw_events table
print("Discovering partitions in raw_events table...")
print(f"Running: MSCK REPAIR TABLE {GLUE_DATABASE}.{RAW_TABLE}\n")

execution_id, results = run_athena_query(
    f"MSCK REPAIR TABLE {RAW_TABLE}",
    GLUE_DATABASE
)

Discovering partitions in raw_events table...
Running: MSCK REPAIR TABLE capstone_aravi7_db.raw_events

Query started: 5a8230dd-1e17-4eba-a089-2a3356480823
  Status: QUEUED...
  Status: RUNNING...
  Status: RUNNING...
  ✅ Query completed successfully


In [6]:
# Verify partitions were discovered
print("\nVerifying partitions in raw_events table...")

execution_id, results = run_athena_query(
    f"SELECT DISTINCT year, month, day, hour FROM {RAW_TABLE} ORDER BY year, month, day, hour",
    GLUE_DATABASE
)

if results:
    rows = results['ResultSet']['Rows'][1:]  # Skip header
    print(f"\nFound {len(rows)} distinct hour partitions:")
    for row in rows[:10]:  # Show first 10
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"  year={values[0]}/month={values[1]}/day={values[2]}/hour={values[3]}")
    if len(rows) > 10:
        print(f"  ... and {len(rows) - 10} more")


Verifying partitions in raw_events table...
Query started: 408eeea4-d760-41a5-b09c-bcfee9850068
  Status: QUEUED...
  Status: RUNNING...
  ✅ Query completed successfully

Found 14 distinct hour partitions:
  year=2025/month=12/day=06/hour=08
  year=2025/month=12/day=06/hour=09
  year=2025/month=12/day=06/hour=10
  year=2025/month=12/day=06/hour=11
  year=2025/month=12/day=06/hour=12
  year=2025/month=12/day=06/hour=13
  year=2025/month=12/day=06/hour=14
  year=2025/month=12/day=06/hour=15
  year=2025/month=12/day=06/hour=16
  year=2025/month=12/day=06/hour=17
  ... and 4 more
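`run_athena_query` calls `get_query_results` once, and Athena returns at most 1,000 rows per call, so a larger result set would be silently truncated. For `SELECT` queries the first row of the first page is the column header, which is why these cells skip `Rows[0]`. A sketch of a paginated reader, assuming a hypothetical helper `fetch_all_rows` that follows `NextToken` and returns each row as a dict keyed by column name; it accepts any object with a boto3-style `get_query_results` method, so it can also be exercised with a stub:

```python
def fetch_all_rows(client, execution_id: str) -> list[dict]:
    """Read every page of an Athena SELECT result as a list of dicts.

    Follows NextToken until exhausted. Assumes the first row of the first
    page is the header row (true for SELECT queries, not for DDL).
    """
    header = None
    rows = []
    kwargs = {"QueryExecutionId": execution_id, "MaxResults": 1000}
    while True:
        page = client.get_query_results(**kwargs)
        for row in page["ResultSet"]["Rows"]:
            values = [cell.get("VarCharValue") for cell in row["Data"]]
            if header is None:
                header = values  # first row: column names
            else:
                rows.append(dict(zip(header, values)))
        token = page.get("NextToken")
        if not token:
            return rows
        kwargs["NextToken"] = token

# Usage: rows = fetch_all_rows(athena_client, execution_id)
```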


---
## 6. Run Glue ETL Job

Now we trigger the Glue ETL job. Key features:
- **Bookmarks enabled**: Only processes files not seen in previous runs
- **Idempotent**: Safe to run multiple times
- **Incremental**: New runs only process new data
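For S3 sources, Glue job bookmarks use object last-modified times to decide which files a run has already seen. The sketch below is a deliberately simplified model of that idea, not Glue's actual implementation: the hypothetical `SimpleBookmark` keeps the newest committed timestamp plus the keys seen at exactly that timestamp, and advances only on `commit()`, matching how `job.commit()` is called at the end of the ETL script. The file timestamps are synthetic (one file every 300 seconds).

```python
from dataclasses import dataclass, field

@dataclass
class SimpleBookmark:
    """Toy model of an S3 job bookmark (not Glue's real implementation)."""
    high_water: float = float("-inf")                # newest committed mtime
    keys_at_high_water: set = field(default_factory=set)

    def new_files(self, files: dict) -> dict:
        """Return files (key -> mtime) not yet covered by this bookmark."""
        return {
            k: t for k, t in files.items()
            if t > self.high_water
            or (t == self.high_water and k not in self.keys_at_high_water)
        }

    def commit(self, processed: dict) -> None:
        """Advance the bookmark after a successful run."""
        if not processed:
            return
        newest = max(processed.values())
        if newest < self.high_water:
            return  # nothing newer than what is already committed
        if newest > self.high_water:
            self.high_water = newest
            self.keys_at_high_water = set()
        self.keys_at_high_water |= {k for k, t in processed.items() if t == newest}

# First run sees 15 files; the second run sees all 152.
all_files = {f"file-{i:03d}": i * 300.0 for i in range(152)}
bookmark = SimpleBookmark()
first = bookmark.new_files({k: all_files[k] for k in list(all_files)[:15]})
bookmark.commit(first)
second = bookmark.new_files(all_files)
print(len(first), len(second))
```

Because the bookmark only advances on commit, a run that fails before `job.commit()` leaves the state unchanged and the next run picks up the same files again.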

In [7]:
def run_glue_job(job_name, wait=True):
    """
    Start a Glue job and optionally wait for completion.
    """
    print(f"Starting Glue job: {job_name}")
    
    # Start job run
    response = glue_client.start_job_run(JobName=job_name)
    run_id = response['JobRunId']
    print(f"Job run started: {run_id}")
    
    if not wait:
        return run_id, None
    
    # Wait for completion
    print("Waiting for job to complete...")
    
    while True:
        status_response = glue_client.get_job_run(
            JobName=job_name,
            RunId=run_id
        )
        status = status_response['JobRun']['JobRunState']
        
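        # Include every terminal JobRunState; omitting ERROR/EXPIRED would loop forever
        if status in ['SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED']: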
            break
        
        # Show progress
        elapsed = status_response['JobRun'].get('ExecutionTime', 0)
        print(f"  Status: {status} (elapsed: {elapsed}s)")
        time.sleep(30)  # Check every 30 seconds
    
    # Final status
    if status == 'SUCCEEDED':
        duration = status_response['JobRun'].get('ExecutionTime', 0)
        print(f"\n✅ Job completed successfully in {duration} seconds")
        return run_id, status_response['JobRun']
    else:
        error_message = status_response['JobRun'].get('ErrorMessage', 'Unknown error')
        print(f"\n❌ Job failed with status: {status}")
        print(f"   Error: {error_message}")
        return run_id, status_response['JobRun']


# Run the ETL job
run_id, job_run = run_glue_job(GLUE_JOB_NAME, wait=True)

Starting Glue job: capstone-etl-aravi7
Job run started: jr_11178f1ec6f01f52c67f0fd90bc10931f5b50b09fc5ecdfce16e48a390a8b859
Waiting for job to complete...
  Status: RUNNING (elapsed: 0s)
  Status: RUNNING (elapsed: 23s)
  Status: RUNNING (elapsed: 53s)
  Status: RUNNING (elapsed: 83s)
  Status: RUNNING (elapsed: 113s)
  Status: RUNNING (elapsed: 143s)
  Status: RUNNING (elapsed: 174s)
  Status: RUNNING (elapsed: 204s)
  Status: RUNNING (elapsed: 234s)
  Status: RUNNING (elapsed: 264s)
  Status: RUNNING (elapsed: 294s)
  Status: RUNNING (elapsed: 324s)
  Status: RUNNING (elapsed: 354s)
  Status: RUNNING (elapsed: 384s)
  Status: RUNNING (elapsed: 415s)
  Status: RUNNING (elapsed: 445s)
  Status: RUNNING (elapsed: 475s)
  Status: RUNNING (elapsed: 505s)
  Status: RUNNING (elapsed: 535s)
  Status: RUNNING (elapsed: 565s)
  Status: RUNNING (elapsed: 595s)
  Status: RUNNING (elapsed: 625s)
  Status: RUNNING (elapsed: 655s)
  Status: RUNNING (elapsed: 686s)
  Status: RUNNING (elapsed: 716s)
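The output above stops while the run is still `RUNNING`, and the polling loop in `run_glue_job` has no upper bound, so a stuck run would block the notebook indefinitely. A generic sketch of polling with a deadline, assuming a hypothetical `poll_until_terminal` that takes any zero-argument status function (for example a lambda wrapping `glue_client.get_job_run`); the terminal set lists the Glue `JobRunState` values after which a run no longer changes:

```python
import time
from typing import Callable

# Glue JobRunState values after which the run will not change again.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}

def poll_until_terminal(get_state: Callable[[], str],
                        interval_s: float = 30.0,
                        timeout_s: float = 3600.0,
                        sleep: Callable[[float], None] = time.sleep) -> str:
    """Call get_state() until it returns a terminal state or the deadline passes.

    Raises TimeoutError if no terminal state is seen within timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state} after {timeout_s:.0f}s")
        sleep(interval_s)

# Usage with the notebook's client (not executed here):
# state = poll_until_terminal(lambda: glue_client.get_job_run(
#     JobName=GLUE_JOB_NAME, RunId=run_id)["JobRun"]["JobRunState"])
```

Injecting `sleep` keeps the helper testable without real waiting; timing out in the notebook does not stop the Glue run itself.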


---
## ⚠️ Note on Incremental Processing

**This notebook demonstrates incremental ETL processing.**

The pipeline was initially run on **15 files (~8.5 million events, 1.2 hours of data)** to validate the setup. That first run completed in **254 seconds**.

This current execution is running **~14 hours later**, with **152 total files** now in the source bucket. Thanks to Glue job bookmarks, the ETL job will **only process the 137 new files** (~86 million new events), skipping the 15 files already processed in the first run.

This is the key value of bookmark-enabled incremental processing:
- ✅ No reprocessing of historical data
- ✅ Costs scale with new data only
- ✅ Idempotent - safe to run multiple times

---
## 7. Discover Processed Data Partitions

After the ETL job completes, we need to update the Glue catalog with the new Parquet partitions.

In [8]:
# Check what was written to the processed bucket
print("Checking processed data output...\n")

paginator = s3_client.get_paginator('list_objects_v2')
parquet_files = []

for page in paginator.paginate(Bucket=PROCESSED_BUCKET, Prefix='events/'):
    for obj in page.get('Contents', []):
        if obj['Key'].endswith('.parquet'):
            parquet_files.append({
                'key': obj['Key'],
                'size_mb': round(obj['Size'] / (1024*1024), 2)
            })

# Separate name so the raw-data total_size (bytes) from Section 3 isn't overwritten
total_size_mb = sum(f['size_mb'] for f in parquet_files)

print(f"Processed Data Summary:")
print(f"  Parquet files: {len(parquet_files)}")
print(f"  Total size:    {total_size_mb:.2f} MB")

if parquet_files:
    print(f"\nSample files:")
    for f in parquet_files[:5]:
        print(f"  {f['key']} ({f['size_mb']} MB)")

Checking processed data output...

Processed Data Summary:
  Parquet files: 152
  Total size:    1875.24 MB

Sample files:
  events/year=2025/month=12/day=06/hour=08/part-00002-7305b1b3-ea16-4c0b-b116-81b0fd84c035.c000.snappy.parquet (14.35 MB)
  events/year=2025/month=12/day=06/hour=08/part-00005-7305b1b3-ea16-4c0b-b116-81b0fd84c035.c000.snappy.parquet (12.5 MB)
  events/year=2025/month=12/day=06/hour=08/part-00012-7305b1b3-ea16-4c0b-b116-81b0fd84c035.c000.snappy.parquet (12.17 MB)
  events/year=2025/month=12/day=06/hour=09/part-00000-7305b1b3-ea16-4c0b-b116-81b0fd84c035.c000.snappy.parquet (10.25 MB)
  events/year=2025/month=12/day=06/hour=09/part-00001-7305b1b3-ea16-4c0b-b116-81b0fd84c035.c000.snappy.parquet (10.03 MB)


In [9]:
# Run MSCK REPAIR on processed_events table
print("Discovering partitions in processed_events table...")
print(f"Running: MSCK REPAIR TABLE {GLUE_DATABASE}.{PROCESSED_TABLE}\n")

execution_id, results = run_athena_query(
    f"MSCK REPAIR TABLE {PROCESSED_TABLE}",
    GLUE_DATABASE
)

Discovering partitions in processed_events table...
Running: MSCK REPAIR TABLE capstone_aravi7_db.processed_events

Query started: b80d716d-5d85-407f-9480-fe27a0a5d356
  Status: QUEUED...
  Status: RUNNING...
  ✅ Query completed successfully


---
## 8. Validate Pipeline Output

Let's run some validation queries to ensure the pipeline worked correctly.

In [10]:
# Count records in processed table
print("Validating processed data...\n")

execution_id, results = run_athena_query(
    f"SELECT COUNT(*) as total_records FROM {PROCESSED_TABLE}",
    GLUE_DATABASE
)

if results:
    count = results['ResultSet']['Rows'][1]['Data'][0]['VarCharValue']
    print(f"Total records in processed_events: {int(count):,}")

Validating processed data...

Query started: 9aaf2fba-60ac-44e2-a3ad-248c4571e1a5
  Status: QUEUED...
  ✅ Query completed successfully
Total records in processed_events: 94,383,522


In [11]:
# Check event type distribution
print("Event type distribution:\n")

execution_id, results = run_athena_query(
    f"""
    SELECT 
        event_type, 
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM {PROCESSED_TABLE}
    GROUP BY event_type
    ORDER BY count DESC
    """,
    GLUE_DATABASE
)

if results:
    print(f"{'Event Type':<20} {'Count':>15} {'Percentage':>12}")
    print("-" * 50)
    for row in results['ResultSet']['Rows'][1:]:
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{values[0]:<20} {int(values[1]):>15,} {values[2]:>11}%")

Event type distribution:

Query started: 760d4cb3-31e1-4c92-b5f2-49e79a71c688
  Status: QUEUED...
  ✅ Query completed successfully
Event Type                     Count   Percentage
--------------------------------------------------
page_view                 47,184,334       49.99%
add_to_cart               18,882,867       20.01%
remove_from_cart           9,444,106       10.01%
purchase                   9,439,073        10.0%
search                     9,433,142        9.99%


In [12]:
# Check partitions in processed data
print("Processed data partitions:\n")

execution_id, results = run_athena_query(
    f"""
    SELECT 
        year, month, day, hour,
        COUNT(*) as record_count
    FROM {PROCESSED_TABLE}
    GROUP BY year, month, day, hour
    ORDER BY year, month, day, hour
    """,
    GLUE_DATABASE
)

if results:
    print(f"{'Partition':<35} {'Records':>15}")
    print("-" * 52)
    for row in results['ResultSet']['Rows'][1:]:
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        partition = f"year={values[0]}/month={values[1]}/day={values[2]}/hour={values[3]}"
        print(f"{partition:<35} {int(values[4]):>15,}")

Processed data partitions:

Query started: 437ee074-936f-4f1e-b82c-31a41e0615c5
  Status: QUEUED...
  ✅ Query completed successfully
Partition                                   Records
----------------------------------------------------
year=2025/month=12/day=06/hour=08         1,965,478
year=2025/month=12/day=06/hour=09         7,127,272
year=2025/month=12/day=06/hour=10         7,375,367
year=2025/month=12/day=06/hour=11         7,877,870
year=2025/month=12/day=06/hour=12         7,720,866
year=2025/month=12/day=06/hour=13         7,390,843
year=2025/month=12/day=06/hour=14         7,453,672
year=2025/month=12/day=06/hour=15         7,699,679
year=2025/month=12/day=06/hour=16         7,370,158
year=2025/month=12/day=06/hour=17         7,208,048
year=2025/month=12/day=06/hour=18         7,476,351
year=2025/month=12/day=06/hour=19         7,240,978
year=2025/month=12/day=06/hour=20         7,427,761
year=2025/month=12/day=06/hour=21         3,049,179
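Two cheap consistency checks follow from the outputs in this section: the per-partition counts and the per-event-type counts should each sum to the table total. A sketch with a hypothetical helper `counts_reconcile`, fed with the numbers printed above:

```python
def counts_reconcile(group_counts: list[int], total: int) -> bool:
    """True if per-group record counts add up exactly to the table total."""
    return sum(group_counts) == total

TOTAL = 94_383_522  # SELECT COUNT(*) result above

partition_counts = [
    1_965_478, 7_127_272, 7_375_367, 7_877_870, 7_720_866, 7_390_843, 7_453_672,
    7_699_679, 7_370_158, 7_208_048, 7_476_351, 7_240_978, 7_427_761, 3_049_179,
]
event_type_counts = [47_184_334, 18_882_867, 9_444_106, 9_439_073, 9_433_142]

print(counts_reconcile(partition_counts, TOTAL),
      counts_reconcile(event_type_counts, TOTAL))
```

A mismatch would point to rows with NULL partition or event-type values, or to partitions the catalog hasn't registered.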


---
## 9. Run the 5 Required Queries

Finally, let's run all 5 required queries to validate our analytical dataset.

In [13]:
# Query 1: Conversion Funnel
print("=" * 70)
print("QUERY 1: Conversion Funnel")
print("=" * 70)

query1 = f"""
SELECT 
    product_id,
    COUNT(CASE WHEN event_type = 'page_view' THEN 1 END) AS view_count,
    COUNT(CASE WHEN event_type = 'add_to_cart' THEN 1 END) AS add_to_cart_count,
    COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchase_count,
    ROUND(
        CAST(COUNT(CASE WHEN event_type = 'add_to_cart' THEN 1 END) AS DOUBLE) / 
        NULLIF(COUNT(CASE WHEN event_type = 'page_view' THEN 1 END), 0) * 100, 
        2
    ) AS view_to_cart_rate_pct,
    ROUND(
        CAST(COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS DOUBLE) / 
        NULLIF(COUNT(CASE WHEN event_type = 'add_to_cart' THEN 1 END), 0) * 100, 
        2
    ) AS cart_to_purchase_rate_pct
FROM {PROCESSED_TABLE}
WHERE product_id IS NOT NULL
GROUP BY product_id
ORDER BY view_count DESC
LIMIT 10
"""

execution_id, results = run_athena_query(query1, GLUE_DATABASE)

if results:
    print(f"\n{'Product':<12} {'Views':>10} {'Carts':>10} {'Purchases':>10} {'V→C %':>10} {'C→P %':>10}")
    print("-" * 65)
    for row in results['ResultSet']['Rows'][1:]:
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{values[0]:<12} {int(values[1]):>10,} {int(values[2]):>10,} {int(values[3]):>10,} {values[4]:>9}% {values[5]:>9}%")

QUERY 1: Conversion Funnel
Query started: 6040ef23-1ae9-45ab-a188-eb4db0595d85
  Status: QUEUED...
  ✅ Query completed successfully

Product           Views      Carts  Purchases      V→C %      C→P %
-----------------------------------------------------------------
p_3001        1,575,319    629,563    313,829     39.96%     49.85%
p_1005        1,574,487    627,893    314,460     39.88%     50.08%
p_1001        1,574,465    629,406    315,007     39.98%     50.05%
p_5003        1,574,392    629,643    314,771     39.99%     49.99%
p_3004        1,574,301    630,454    313,983     40.05%      49.8%
p_1003        1,573,627    630,137    314,988     40.04%     49.99%
p_5004        1,573,607    629,010    314,122     39.97%     49.94%
p_1002        1,573,515    628,239    314,772     39.93%      50.1%
p_5002        1,573,440    628,542    314,407     39.95%     50.02%
p_6005        1,573,205    630,315    314,374     40.07%     49.88%
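Each funnel rate is a ratio of conditional counts, and `NULLIF(..., 0)` turns a zero denominator into `NULL` instead of a division error. A Python sketch of the same arithmetic (the helper names `safe_rate_pct` and `funnel_rates` are hypothetical), with `None` standing in for SQL `NULL`, checked against the `p_3001` row above:

```python
from typing import Optional

def safe_rate_pct(numerator: int, denominator: int) -> Optional[float]:
    """Percentage rounded to 2 decimals; None when the denominator is 0 (like NULLIF)."""
    if denominator == 0:
        return None
    return round(numerator / denominator * 100, 2)

def funnel_rates(views: int, carts: int, purchases: int):
    """Return (view_to_cart_rate_pct, cart_to_purchase_rate_pct)."""
    return safe_rate_pct(carts, views), safe_rate_pct(purchases, carts)

print(funnel_rates(1_575_319, 629_563, 313_829))
```

The `CAST(... AS DOUBLE)` in the SQL plays the role of Python's `/` here: without it, Athena would perform integer division and most rates would round to 0.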


In [14]:
# Query 2: Hourly Revenue
print("=" * 70)
print("QUERY 2: Hourly Revenue")
print("=" * 70)

query2 = f"""
SELECT 
    DATE_TRUNC('hour', timestamp) AS revenue_hour,
    ROUND(SUM(price * quantity), 2) AS total_revenue,
    COUNT(*) AS purchase_count,
    ROUND(AVG(price * quantity), 2) AS avg_order_value
FROM {PROCESSED_TABLE}
WHERE event_type = 'purchase'
GROUP BY DATE_TRUNC('hour', timestamp)
ORDER BY revenue_hour
"""

execution_id, results = run_athena_query(query2, GLUE_DATABASE)

if results:
    print(f"\n{'Hour':<25} {'Revenue':>15} {'Purchases':>12} {'Avg Order':>12}")
    print("-" * 65)
    for row in results['ResultSet']['Rows'][1:]:
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{values[0]:<25} ${float(values[1]):>14,.2f} {int(values[2]):>12,} ${float(values[3]):>11,.2f}")

QUERY 2: Hourly Revenue
Query started: 6d7a14b6-0b71-4730-a7a1-3614f7597793
  Status: QUEUED...
  ✅ Query completed successfully

Hour                              Revenue    Purchases    Avg Order
-----------------------------------------------------------------
2025-12-06 08:00:00.000   $ 91,526,394.67      196,718 $     465.27
2025-12-06 09:00:00.000   $331,330,239.79      712,862 $     464.79
2025-12-06 10:00:00.000   $343,023,225.10      737,960 $     464.83
2025-12-06 11:00:00.000   $366,600,468.53      788,002 $     465.23
2025-12-06 12:00:00.000   $358,347,867.81      771,372 $     464.56
2025-12-06 13:00:00.000   $343,224,052.85      738,759 $     464.60
2025-12-06 14:00:00.000   $346,099,943.73      744,343 $     464.97
2025-12-06 15:00:00.000   $358,354,212.65      770,715 $     464.96
2025-12-06 16:00:00.000   $343,080,118.47      737,882 $     464.95
2025-12-06 17:00:00.000   $335,286,832.18      720,866 $     465.12
2025-12-06 18:00:00.000   $347,664,570.00      747,385 $

In [15]:
# Query 3: Top 10 Products by Views
print("=" * 70)
print("QUERY 3: Top 10 Products by Views")
print("=" * 70)

query3 = f"""
SELECT 
    product_id,
    category,
    COUNT(*) AS view_count
FROM {PROCESSED_TABLE}
WHERE event_type = 'page_view'
GROUP BY product_id, category
ORDER BY view_count DESC
LIMIT 10
"""

execution_id, results = run_athena_query(query3, GLUE_DATABASE)

if results:
    print(f"\n{'Rank':<6} {'Product':<12} {'Category':<15} {'Views':>12}")
    print("-" * 50)
    for i, row in enumerate(results['ResultSet']['Rows'][1:], 1):
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{i:<6} {values[0]:<12} {values[1]:<15} {int(values[2]):>12,}")

QUERY 3: Top 10 Products by Views
Query started: fc058e37-e494-46b6-a65b-0f8694cf38e4
  Status: QUEUED...
  ✅ Query completed successfully

Rank   Product      Category               Views
--------------------------------------------------
1      p_3001       home               1,575,319
2      p_1005       electronics        1,574,487
3      p_1001       electronics        1,574,465
4      p_5003       sports             1,574,392
5      p_3004       home               1,574,301
6      p_1003       electronics        1,573,627
7      p_5004       sports             1,573,607
8      p_1002       electronics        1,573,515
9      p_5002       sports             1,573,440
10     p_6005       toys               1,573,205


In [16]:
# Query 4: Category Performance
print("=" * 70)
print("QUERY 4: Category Performance (Daily)")
print("=" * 70)

query4 = f"""
SELECT 
    category,
    event_date,
    COUNT(*) AS total_events,
    COUNT(CASE WHEN event_type = 'page_view' THEN 1 END) AS page_views,
    COUNT(CASE WHEN event_type = 'add_to_cart' THEN 1 END) AS add_to_carts,
    COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchases
FROM {PROCESSED_TABLE}
WHERE category IS NOT NULL
GROUP BY category, event_date
ORDER BY event_date, category
"""

execution_id, results = run_athena_query(query4, GLUE_DATABASE)

if results:
    print(f"\n{'Category':<15} {'Date':<12} {'Total':>10} {'Views':>10} {'Carts':>10} {'Purchases':>10}")
    print("-" * 70)
    for row in results['ResultSet']['Rows'][1:16]:  # Skip header, show first 15 data rows
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{values[0]:<15} {values[1]:<12} {int(values[2]):>10,} {int(values[3]):>10,} {int(values[4]):>10,} {int(values[5]):>10,}")
    
    total_rows = len(results['ResultSet']['Rows']) - 1
    if total_rows > 15:
        print(f"... and {total_rows - 15} more rows")

QUERY 4: Category Performance (Daily)
Query started: 54e36e12-2807-4ab9-a7de-69db0641bea8
  Status: QUEUED...
  ✅ Query completed successfully

Category        Date              Total      Views      Carts  Purchases
----------------------------------------------------------------------
books           2025-12-06   14,150,338  7,859,415  3,143,641  1,574,437
clothing        2025-12-06   14,157,363  7,861,952  3,148,340  1,574,409
electronics     2025-12-06   14,163,313  7,867,771  3,146,438  1,573,472
home            2025-12-06   14,165,166  7,868,531  3,150,940  1,571,755
sports          2025-12-06   14,158,856  7,864,183  3,147,361  1,572,759
toys            2025-12-06   14,155,344  7,862,482  3,146,147  1,572,241


In [17]:
# Query 5: User Activity
print("=" * 70)
print("QUERY 5: User Activity (Daily)")
print("=" * 70)

query5 = f"""
SELECT 
    event_date,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(DISTINCT session_id) AS unique_sessions,
    COUNT(*) AS total_events,
    ROUND(CAST(COUNT(*) AS DOUBLE) / COUNT(DISTINCT session_id), 2) AS events_per_session
FROM {PROCESSED_TABLE}
GROUP BY event_date
ORDER BY event_date
"""

execution_id, results = run_athena_query(query5, GLUE_DATABASE)

if results:
    print(f"\n{'Date':<12} {'Users':>12} {'Sessions':>12} {'Events':>15} {'Events/Session':>15}")
    print("-" * 70)
    for row in results['ResultSet']['Rows'][1:]:
        values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
        print(f"{values[0]:<12} {int(values[1]):>12,} {int(values[2]):>12,} {int(values[3]):>15,} {float(values[4]):>15.2f}")

QUERY 5: User Activity (Daily)
Query started: 55562d2d-b894-489e-bd2c-462b756d3600
  Status: QUEUED...
  Status: RUNNING...
  ✅ Query completed successfully

Date                Users     Sessions          Events  Events/Session
----------------------------------------------------------------------
2025-12-06         90,000       90,000      94,383,522         1048.71


---
## 10. Summary & Deliverables


DELIVERABLES:

1. CloudFormation Template:
   s3://au2025-csed516-aravi7/capstone/capstone-starter.cfn.yaml

2. Orchestration Notebook:
   s3://au2025-csed516-aravi7/capstone/build_pipeline.ipynb (this file)

3. S3 URI of Analytical Dataset:
   s3://capstone-events-processed-aravi7-410367694421/events/

4. Queries File:
   s3://au2025-csed516-aravi7/capstone/queries.sql


RESOURCES CREATED:

- Source Bucket:    s3://capstone-events-aravi7-410367694421/
- Processed Bucket: s3://capstone-events-processed-aravi7-410367694421/
- Glue Database:    capstone_aravi7_db
- Raw Table:        capstone_aravi7_db.raw_events
- Processed Table:  capstone_aravi7_db.processed_events
- ETL Job:          capstone-etl-aravi7