# Daily Video Streaming ETL Processing

This notebook processes one day of video streaming event data and calculates key metrics by device type and country, by content, and by hour.

**Parameters:**
- **execution_date**: The date to process, in YYYY-MM-DD format. If it is empty or cannot be parsed, the current date is used.

In [0]:
# Set up widgets for parameters
# Declares a text widget named "execution_date" with an empty default value and
# the label "Execution Date (YYYY-MM-DD)".
dbutils.widgets.text("execution_date", "", "Execution Date (YYYY-MM-DD)")

# Get the execution date parameter
# The raw value is echoed (quoted) so stray characters or whitespace are visible
# in the run output before any clean-up is applied further down.
execution_date = dbutils.widgets.get("execution_date")
print(f"Raw execution_date parameter received: '{execution_date}'")

# Handle potential date format issues
import datetime
import re

# Clean up the execution date if it has extra characters
def clean_date_string(date_str):
    """Normalize a loosely formatted date parameter to a ``YYYY-MM-DD`` string.

    Resolution order:

    1. ``None``, empty or whitespace-only input -> today's date (local time).
    2. The first ``YYYY-M-D`` pattern found anywhere in the text (for example
       inside an ISO timestamp). Month and day are zero-padded. The pattern is
       only accepted when it names a real calendar date.
    3. An all-digit string longer than 8 characters is read as a Unix epoch
       value: more than 10 digits means milliseconds, otherwise seconds. The
       conversion uses the local time zone.
    4. Anything else -> today's date (local time), after printing a notice.

    Args:
        date_str: Raw parameter text, possibly with surrounding whitespace or
            extra characters around the date.

    Returns:
        A ``YYYY-MM-DD`` string. The function never raises for string input.
    """
    # If empty, use current date
    if not date_str or date_str.strip() == "":
        return datetime.datetime.now().strftime("%Y-%m-%d")

    # Surrounding whitespace must not defeat the all-digits check further down.
    text = date_str.strip()

    # Try to find a pattern that looks like YYYY-MM-DD
    match = re.search(r'(\d{4})-(\d{1,2})-(\d{1,2})', text)
    if match:
        year, month, day = (int(part) for part in match.groups())
        try:
            # Constructing the date rejects values such as month 13 or day 45.
            datetime.date(year, month, day)
        except ValueError:
            pass  # Not a real calendar date; fall through to the fallback.
        else:
            return f"{year:04d}-{month:02d}-{day:02d}"

    # Try to parse as an epoch timestamp if it's a long number
    if text.isdigit() and len(text) > 8:
        try:
            timestamp = int(text) / 1000 if len(text) > 10 else int(text)
            return datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
        except (ValueError, OverflowError, OSError):
            # Only conversion failures are ignored here; unrelated errors
            # (including KeyboardInterrupt) are allowed to propagate.
            pass

    # Return current date as fallback
    print(f"Could not parse date format from '{date_str}', using current date")
    return datetime.datetime.now().strftime("%Y-%m-%d")

# Normalize the raw parameter; the replacement is only reported (and applied)
# when cleaning actually changed the text.
clean_execution_date = clean_date_string(execution_date)
date_was_rewritten = clean_execution_date != execution_date
if date_was_rewritten:
    print(f"Cleaned execution date to: '{clean_execution_date}'")
    execution_date = clean_execution_date

# Final validation: the cleaned value must parse strictly as YYYY-MM-DD.
try:
    parsed_date = datetime.datetime.strptime(execution_date, "%Y-%m-%d")
except ValueError:
    # Still not a valid date after cleaning, so process the current date instead.
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    print(f"Invalid date format after cleaning. Using current date: {current_date}")
    execution_date = current_date
else:
    print(f"Using execution date: {parsed_date.strftime('%Y-%m-%d')}")

## Define Data Paths

In [0]:
# Define data paths.
# Input and output both live under the same scheduling folder; the output is
# written to a per-date "date=<execution_date>" directory.
base_path = "/pyspark/video-streaming-data"
scheduling_root = f"{base_path}/module5-orchestration/scheduling"

events_path = f"{scheduling_root}/daily_events"
file_path = f"{events_path}/events_{execution_date}.csv"
output_path = f"{scheduling_root}/processed_metrics/date={execution_date}"

print(f"Input path: {file_path}", f"Output path: {output_path}", sep="\n")

## Extract: Read Daily Events Data

In [0]:
# Check if file exists
import os

# Function to check for alternative dates if file is not found
def find_alternative_date(date_str, base_path, num_days=5):
    """Find the nearest date that has an events file when the requested one is missing.

    Candidates are probed outward from ``date_str``, one day at a time. At each
    distance the earlier date is checked before the later one. A candidate
    counts as available when ``/dbfs/{base_path}/events_<YYYY-MM-DD>.csv``
    exists according to ``os.path.exists``.

    Args:
        date_str: Requested date in ``YYYY-MM-DD`` format.
        base_path: Directory containing the ``events_<date>.csv`` files; it is
            appended to the ``/dbfs/`` prefix as given.
        num_days: Maximum distance, in days, to search in each direction.

    Returns:
        The first matching date as a ``YYYY-MM-DD`` string, or ``None`` when
        ``date_str`` cannot be parsed or no candidate file exists.
    """
    try:
        original_date = datetime.datetime.strptime(date_str, "%Y-%m-%d")
    except (TypeError, ValueError):
        # Unparseable (or non-string) input means there is nothing to search around.
        return None

    # Try up to num_days before and after the requested date
    for distance in range(1, num_days + 1):
        for offset in (-distance, distance):  # earlier date first, then later
            try:
                candidate = original_date + datetime.timedelta(days=offset)
            except OverflowError:
                # Stepped outside the supported date range on this side only;
                # keep probing the other direction.
                continue
            candidate_str = candidate.strftime("%Y-%m-%d")
            candidate_path = f"{base_path}/events_{candidate_str}.csv"
            if os.path.exists(f"/dbfs/{candidate_path}"):
                return candidate_str

    return None  # No alternative found

# Check if the file exists; when it does not, fall back to the closest date
# that has data and re-point both the input file and the output folder at it.
if not os.path.exists(f"/dbfs/{file_path}"):
    print(f"WARNING: No data file found for date {execution_date}")

    # Try to find an alternative date
    alt_date = find_alternative_date(execution_date, events_path)
    if not alt_date:
        raise FileNotFoundError(f"No data file found for date {execution_date} or nearby dates")

    print(f"Found alternative date with data: {alt_date}")
    execution_date = alt_date
    file_path = f"{events_path}/events_{execution_date}.csv"
    output_path = f"{base_path}/module5-orchestration/scheduling/processed_metrics/date={execution_date}"
    print(f"Using file: {file_path}")

# Read the daily events: the first CSV row is the header and column types are
# inferred from the data.
print(f"Reading events from: {file_path}")
csv_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
)
daily_df = csv_reader.csv(file_path)

# Report the size and shape of what was loaded
print(f"Record count: {daily_df.count()}")
print("Schema:")
daily_df.printSchema()

# Preview the first few rows
display(daily_df.limit(5))

In [0]:
# Step 1: Data cleaning
from pyspark.sql.functions import col, when, lit

# Cleaning rules, applied in this order:
#   1. keep one row per event_id
#   2. a missing error_type becomes "" (meaning "no error")
#   3. a missing buffering_count becomes 0
#   4. rows without a duration_seconds value are dropped
cleaned_df = (
    daily_df
    .dropDuplicates(["event_id"])
    .na.fill("", ["error_type"])
    .na.fill(0, ["buffering_count"])
    .filter(col("duration_seconds").isNotNull())
)

print(f"Record count after cleaning: {cleaned_df.count()}")

# Stamp every surviving row with the processing time
from pyspark.sql.functions import current_timestamp
cleaned_df = cleaned_df.withColumn("processing_time", current_timestamp())

In [0]:
# Step 2: Calculate basic metrics
# Note: importing `sum` here replaces Python's built-in sum() with the Spark
# aggregate for the rest of the notebook.
from pyspark.sql.functions import count, avg, sum, round as spark_round, max as spark_max

# 1 for rows whose error_type is non-empty, 0 otherwise; summing it counts errors.
device_error_flag = when(col("error_type") != "", 1).otherwise(0)

# One output row per (device_type, country) pair
device_country_metrics = (
    cleaned_df
    .groupBy("device_type", "country")
    .agg(
        count("*").alias("total_events"),
        spark_round(avg("duration_seconds"), 2).alias("avg_duration_seconds"),
        sum(device_error_flag).alias("error_count"),
        spark_max("duration_seconds").alias("max_duration_seconds"),
        spark_round(avg("buffering_count"), 2).alias("avg_buffering_count"),
    )
)

print("Device and Country Metrics:")
display(device_country_metrics)

In [0]:
# Step 3: Calculate content popularity metrics
# 1 for rows whose error_type is non-empty, 0 otherwise; summing it counts errors.
content_error_flag = when(col("error_type") != "", 1).otherwise(0)

# One row per content_id, most-viewed first
content_metrics = (
    cleaned_df
    .groupBy("content_id")
    .agg(
        count("*").alias("view_count"),
        spark_round(avg("duration_seconds"), 2).alias("avg_view_duration"),
        sum(content_error_flag).alias("error_count"),
    )
    .orderBy(col("view_count").desc())
)

print("Top 10 Content by Views:")
display(content_metrics.limit(10))

In [0]:
# Step 4: Calculate hourly distribution
from pyspark.sql.functions import hour, from_utc_timestamp, col

# Hour-of-day of each event.
# NOTE(review): the timestamp is shifted from UTC to "UTC", which looks like a
# no-op; confirm which time zone the hour buckets are meant to use.
event_hour = hour(from_utc_timestamp(col("timestamp"), "UTC"))
hourly_df = cleaned_df.withColumn("hour", event_hour)

# Event volume and mean duration per hour, in chronological order
hourly_metrics = (
    hourly_df
    .groupBy("hour")
    .agg(
        count("*").alias("event_count"),
        spark_round(avg("duration_seconds"), 2).alias("avg_duration"),
    )
    .orderBy("hour")
)

print("Hourly Distribution:")
display(hourly_metrics)

## Load: Save Processed Metrics

In [0]:
# Ensure output directory exists
dbutils.fs.mkdirs(output_path)

# Each metric set is written as Parquet into its own sub-folder of output_path,
# replacing whatever an earlier run left there.
metric_outputs = [
    ("device/country", "device_country_metrics", device_country_metrics),
    ("content", "content_metrics", content_metrics),
    ("hourly", "hourly_metrics", hourly_metrics),
]

for metric_label, folder_name, metrics_df in metric_outputs:
    target_path = f"{output_path}/{folder_name}"
    print(f"Saving {metric_label} metrics to: {target_path}")
    metrics_df.write.mode("overwrite").parquet(target_path)

## Quality Checks

In [0]:
# Perform data quality checks
from pyspark.sql.functions import count, when, col, isnan

# Input volume versus what survived cleaning
total_records = daily_df.count()
records_after_cleaning = cleaned_df.count()
dropped_record_count = total_records - records_after_cleaning

# Row counts of each metric DataFrame
device_country_count = device_country_metrics.count()
content_metrics_count = content_metrics.count()
hourly_metrics_count = hourly_metrics.count()

# A negative duration is reported here, not removed during cleaning
negative_durations = cleaned_df.filter(col("duration_seconds") < 0).count()

# Assemble the summary as (check_name, value) rows, in reporting order
quality_results = {
    "total_input_records": total_records,
    "records_after_cleaning": records_after_cleaning,
    "dropped_record_count": dropped_record_count,
    "device_country_metrics_count": device_country_count,
    "content_metrics_count": content_metrics_count,
    "hourly_metrics_count": hourly_metrics_count,
    "records_with_negative_duration": negative_durations,
}
quality_checks = spark.createDataFrame(
    list(quality_results.items()),
    ["check_name", "value"],
)

# Persist the summary next to the metrics it describes
quality_checks.write.mode("overwrite").parquet(f"{output_path}/quality_checks")

# Display quality summary
print("Data Quality Check Summary:")
display(quality_checks)

## Summary and Completion

In [0]:
import json

# Human-readable run summary
print(f"ETL processing completed successfully for date: {execution_date}")
print(f"Processed {records_after_cleaning} records after cleaning")
print(f"Generated {device_country_count} device/country metric records")
print(f"Generated {content_metrics_count} content metric records")
print(f"Generated {hourly_metrics_count} hourly metric records")
print(f"All metrics saved to: {output_path}")

# Return success for the scheduler.
# dbutils.notebook.exit is documented as taking a string, so the result is
# serialized to JSON; the caller can recover the fields with json.loads.
exit_payload = {
    "status": "success",
    "date": execution_date,
    "record_count": records_after_cleaning,
    "processing_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
dbutils.notebook.exit(json.dumps(exit_payload))