In [None]:
# Import necessary libraries
import logging

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

# Start (or attach to) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName("VideoStreamingETL")
    .getOrCreate()
)

# Keep the Python-side py4j logger quiet: only errors get through.
logging.getLogger("py4j").setLevel(logging.ERROR)

# Likewise restrict Spark's own log output to ERROR level.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Input locations of the raw datasets.
viewing_history_path = "video_streaming_data/viewing_history.csv"
users_path = "video_streaming_data/users.json"

# Output root interpolated by the Parquet write cells further down.
# NOTE(review): the location is an assumption — point it wherever processed
# data should live in this environment.
processed_path = "video_streaming_data/processed"

# CSV: the first row holds the column names. No schema is supplied, so columns
# are cast explicitly in a later cell.
viewing_history_df = spark.read.option("header", "true").csv(viewing_history_path)
# JSON: a single record may span several lines.
users_df = spark.read.option("multiline", "true").json(users_path)

In [None]:
# Write data to Parquet format. Limit rows during development to reduce memory pressure in this lab environment.
# The sample comes from viewing_history_df (the DataFrame loaded from the CSV) and is
# ordered by watched_at, the event-time column the cleaning cells also use.
sample_df = viewing_history_df.orderBy("watched_at").limit(100000)

# Write to disk, replacing the output of any previous run
sample_df.write.mode("overwrite").parquet(f"{processed_path}/viewing_history.parquet")

print(" Viewing history successfully written to Parquet.")

In [None]:
# Write the viewing history partitioned by device_type
import os

partitioned_path = f"{processed_path}/viewing_history_partitioned"
viewing_history_df.write.partitionBy("device_type").mode("overwrite").parquet(partitioned_path)
print(" viewing history data has been successfully written with partitioning!")

# Verify partitioning by listing the output folder
# (os.listdir only sees the local filesystem)
print("Partitions created:", os.listdir(partitioned_path))

# Verify partitioned column values by reading the data back
df_partitioned = spark.read.parquet(partitioned_path)
df_partitioned.select("device_type").distinct().show()

In [None]:
# Bucket data by video_id (10 buckets, rows sorted by video_id inside each bucket),
# making operations like joins or aggregations more efficient when filtering by video_id.
# The chain is parenthesised so that it can legally span several lines.
(
    viewing_history_df.write
    .bucketBy(10, "video_id")
    .sortBy("video_id")
    .mode("overwrite")
    .format("parquet")
    .option("path", f"{processed_path}/viewing_history_bucketed")
    .saveAsTable("viewing_history_bucketed")
)

print(" Data successfully written with bucketing!")

# Check if the table exists. Printed explicitly: a bare expression that is not
# the last line of a cell produces no output.
print(spark.catalog.listTables())

# Verify bucketing in the table metadata. The bucket details come after the
# column list, so lift show()'s default 20-row / 20-character limits.
spark.sql("DESCRIBE FORMATTED viewing_history_bucketed").show(100, truncate=False)

# Read the table from Spark's Metastore
bucketed_df = spark.read.table("viewing_history_bucketed")
bucketed_df.show(5)

# Time the same point lookup against the un-bucketed DataFrame and the bucketed table
import time
from pyspark.sql.functions import col

# Query execution time before bucketing
# (perf_counter is a monotonic clock intended for measuring elapsed time)
start_time = time.perf_counter()
viewing_history_df.filter(col("video_id") == '56789').show()
end_time = time.perf_counter()

print(f" Query Execution Time (Before Bucketing): {end_time - start_time:.4f} seconds")

# Query execution time after bucketing
start_time = time.perf_counter()
spark.sql("SELECT * FROM viewing_history_bucketed WHERE video_id = '56789'").show()
end_time = time.perf_counter()

print(f" Query Execution Time (After Bucketing): {end_time - start_time:.4f} seconds")

In [None]:
from pyspark.sql.functions import col, sum as _sum, when

# Null tally for the viewing history: one output column per input column,
# each holding the number of rows in which that column is NULL.
print(" Null counts in viewing_history_df:")
viewing_null_exprs = [
    _sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in viewing_history_df.columns
]
viewing_history_df.select(viewing_null_exprs).show()

# Same tally for the users data.
print(" Null counts in users_df:")
users_null_exprs = [
    _sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in users_df.columns
]
users_df.select(users_null_exprs).show()

# Fill missing device_type with "Unknown"
viewing_history_df = viewing_history_df.fillna({"device_type": "Unknown"})

# Drop account_status column (mostly null)
if "account_status" in viewing_history_df.columns:
    viewing_history_df = viewing_history_df.drop("account_status")

# Drop rows missing user_id or watched_at (critical fields)
viewing_history_df = viewing_history_df.dropna(subset=["user_id", "watched_at"])

# Fill preferred_language with "Unknown"
users_df = users_df.fillna({"preferred_language": "Unknown"})

# Fill missing subscription_date with a placeholder
users_df = users_df.fillna({"subscription_date": "2020-01-01"})

# Verify by rerunning the null counts on the cleaned DataFrames
from pyspark.sql.functions import col, sum as _sum, when

for label, cleaned_df in (("viewing_history_df", viewing_history_df), ("users_df", users_df)):
    print(f" Null counts in {label} after cleaning:")
    cleaned_df.select([
        _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in cleaned_df.columns
    ]).show()

In [None]:
# Duplicate check: compare the row count with the number of distinct user_id values

users_total = users_df.count()
unique_user_ids = users_df.select("user_id").dropDuplicates().count()

# Report total rows, distinct ids, and the surplus (duplicate) rows
print(f"Total user records: {users_total}")
print(f"Unique user_id count: {unique_user_ids}")
print(f"Duplicate user rows: {users_total - unique_user_ids}")

# Keep a single row per user_id
users_df = users_df.drop_duplicates(["user_id"])

# Confirm cleanup
print(f" Users after deduplication: {users_df.count()} rows")

In [None]:
from pyspark.sql.functions import to_timestamp, col

# Schema before the casts
viewing_history_df.printSchema()

# Cast user_id to an integer and parse watched_at into a timestamp,
# expressed as one chained transformation.
viewing_history_df = (
    viewing_history_df
    .withColumn("user_id", col("user_id").cast("int"))
    .withColumn("watched_at", to_timestamp(col("watched_at")))
)

# Schema and a small sample after the casts
viewing_history_df.printSchema()
viewing_history_df.select("user_id", "watched_at").show(5)

In [None]:
# Optional: give the join key the same integer type on both sides
user_id_as_int = col("user_id").cast("int")
viewing_history_df = viewing_history_df.withColumn("user_id", user_id_as_int)
users_df = users_df.withColumn("user_id", user_id_as_int)

# Peek at the key column of each DataFrame before joining
viewing_history_df.select("user_id").show(3)
users_df.select("user_id").show(3)

# Inner join: keeps only viewing rows whose user_id also appears in users_df
full_df = viewing_history_df.join(users_df, "user_id", "inner")

# Preview a few joined columns
preview_columns = ["user_id", "video_id", "watched_at", "email", "preferred_language"]
full_df.select(*preview_columns).show(5)

In [None]:
# Count Views per User per Video
from pyspark.sql.functions import count

# Number of watch events for every (user, video) pair
pair_counts = count("*").alias("watch_count")
most_watched_df = full_df.groupBy("user_id", "video_id").agg(pair_counts)

# Ten most-watched pairs, highest count first
most_watched_df.sort("watch_count", ascending=False).show(10)

## Add Total Views per User
# Number of watch events for every user
user_counts = count("*").alias("total_views")
total_views_df = full_df.groupBy("user_id").agg(user_counts)

# Ten most active users, highest total first
total_views_df.sort("total_views", ascending=False).show(10)

### Rank Top 3 Most-Watched Videos per User
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Rank videos within each user by watch_count, highest first.
# video_id is a tie-breaker: without it, row_number() numbers rows with equal
# watch_count in an arbitrary order, so the "top 3" could differ between runs.
window_spec = Window.partitionBy("user_id").orderBy(
    col("watch_count").desc(),
    col("video_id").asc(),
)

# Add ranking column
ranked_df = most_watched_df.withColumn("rank", row_number().over(window_spec))

# Filter to get top 3 videos per user
top_videos_per_user = ranked_df.filter(col("rank") <= 3)

# Show results
top_videos_per_user.orderBy("user_id", "rank").show(10)

In [None]:
# List the distinct raw device_type labels before normalising them
raw_device_types = viewing_history_df.select("device_type").distinct()
raw_device_types.show()
### Define a Python UDF to clean device labels
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the normalization logic
def normalize_device(device):
    """Map a raw device label to "iPhone", "Android", "Other" or "Unknown".

    Args:
        device: Raw device label (string), or None.

    Returns:
        "Unknown" when the label is None, blank, or the "Unknown" placeholder
        in any letter case; "iPhone" or "Android" when the lower-cased label
        contains that word; "Other" for every remaining label.
    """
    if device is None:
        return "Unknown"
    label = device.strip().lower()
    # The cleaning step fills missing device_type values with the string
    # "Unknown"; treat that placeholder (and blank labels) as missing data
    # rather than as some other real device.
    if label in ("", "unknown"):
        return "Unknown"
    if "iphone" in label:
        return "iPhone"
    if "android" in label:
        return "Android"
    return "Other"

# Wrap the Python function as a Spark UDF that returns strings
normalize_device_udf = udf(normalize_device, returnType=StringType())
### Apply the UDF to your DataFrame
# Add the normalized label as a new column next to the raw one
raw_device_column = viewing_history_df["device_type"]
viewing_history_df = viewing_history_df.withColumn(
    "normalized_device",
    normalize_device_udf(raw_device_column),
)
### Inspect the result
device_mapping = viewing_history_df.select("device_type", "normalized_device").distinct()
device_mapping.show()

PySpark, built on **RDDs** (Resilient Distributed Datasets), provides **distributed computing** capabilities that **scale horizontally** across clusters, handling **terabytes** of data.

1. Transformations and Actions:
Transformations (like `select()`, `filter()`, `withColumn()`) are lazy operations: they only build up an execution plan.
Actions (like `show()`, `count()`) trigger the actual computation and return results.

2. Optimization Techniques:
Predicate pushdown, column pruning, and caching to improve performance
Partitioning strategies for efficient data storage and retrieval
Execution plan analysis for query optimization

3. Advanced Analytics:
Window functions for complex analytical queries
Pivot operations for reshaping datasets
Approximation algorithms for efficient large-scale analytics

Session

In [None]:
#Session
from pyspark.sql import SparkSession
# getOrCreate() hands back the already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("PySpark Transformations")
    .getOrCreate()
)

Loading Data

In [None]:
from pyspark.storagelevel import StorageLevel

# Parquet is a columnar storage format that offers efficient compression and encoding schemes, making it ideal for big data processing.
def load_dataframe(spark, path="transaction_data.parquet"):
    """Read a Parquet dataset and report its storage level right after loading.

    Args:
        spark: Active SparkSession (any object exposing ``read.parquet``).
        path: Location of the Parquet data. Defaults to
            "transaction_data.parquet".

    Returns:
        Tuple ``(df, storage_level)``: the loaded DataFrame and the value of
        its ``storageLevel`` attribute, read before any caching call.
    """
    df = spark.read.parquet(path)
    return df, df.storageLevel

# Load the transactions; no caching call has been made at this point.
load_result = load_dataframe(spark)
df, storage_level = load_result
print("Storage level before caching:", storage_level)


Caching

In [None]:
# Default caching is equivalent to df.persist(StorageLevel.MEMORY_AND_DISK) Other Options are: MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY
def cache_dataframe(df):
    """Cache ``df``, materialise the cache, and report the resulting storage level.

    Args:
        df: DataFrame to cache.

    Returns:
        Tuple ``(df, is_cached, storage_level)``. ``is_cached`` is True when
        the reported storage level keeps data in memory, on disk or off-heap.
    """
    df.cache()
    # cache() only marks the DataFrame; an action is needed to populate the cache.
    df.count()
    level = df.storageLevel
    # Inspect the level's own flags rather than comparing with StorageLevel.NONE.
    # NOTE(review): that constant and StorageLevel equality appear to be absent
    # from older PySpark releases — confirm for the target version.
    is_cached = bool(level.useMemory or level.useDisk or level.useOffHeap)
    return df, is_cached, level

# Cache the DataFrame and show what is reported afterwards.
cache_result = cache_dataframe(df)
df, is_cached, storage_level = cache_result
print("Is DataFrame cached:", is_cached)
print("Storage level after caching:", storage_level)

Debugging

In [None]:
# Understanding execution plans helps identify bottlenecks and helps in query Debugging
def exe_plan(df):
    """Return the string form of ``df``'s query-execution object.

    The value is obtained through the private ``_jdf`` handle by calling
    ``queryExecution().toString()`` on it.

    Args:
        df: DataFrame whose execution details are wanted.

    Returns:
        The string produced by ``toString()`` (presumably the logical and
        physical plans — verify against the Spark version in use).
    """
    return df._jdf.queryExecution().toString()

Transformation

In [None]:
 # Column Pruning, Partitioning, Predicate pushdown(filter rows on condition)
def filter_df(df):
    """Prune columns, write a year/month-partitioned copy, and filter the rows.

    Side effects:
        Writes the pruned data (plus derived ``year`` and ``month`` columns)
        as Parquet partitioned by year and month, overwriting the target, and
        prints the extended query plan of the amount filter.

    Args:
        df: DataFrame with at least the columns transaction_id, customer_id,
            amount, category and transaction_date.

    Returns:
        DataFrame with the selected columns plus ``year`` and ``month``,
        restricted to rows with amount > 50 and category "Clothes" or
        "Electronics".
    """
    # Column pruning: keep only the columns used downstream.
    selected_df = df.select("transaction_id", "customer_id", "amount", "category", "transaction_date")

    # Derive the partition columns from the transaction date.
    df_with_partitions = (
        selected_df
        .withColumn("year", F.year(F.col("transaction_date")))
        .withColumn("month", F.month(F.col("transaction_date")))
    )

    # NOTE(review): `output_path` is read from the notebook's global scope and is
    # not assigned in the visible cells — define it before calling this function.
    df_with_partitions.write.partitionBy("year", "month").mode("overwrite").parquet(output_path)

    filtered_df = df_with_partitions.filter(F.col("amount") > 50)
    # explain() shows the logical and physical plans for the query. Look for terms
    # like "PushedFilters" or "PartitionFilters" in the plan to confirm pushdown.
    filtered_df.explain(extended=True)

    return filtered_df.filter(F.col("category").isin(["Clothes", "Electronics"]))

 # Data Enrichment
def enrich_df(df):
    """Add a rounded amount and a readable description to each transaction.

    Args:
        df: DataFrame with the columns amount, transaction_id and category.

    Returns:
        DataFrame without ``amount``, with ``amount_rounded`` (amount rounded
        to 0 decimals) and ``transaction_desc``
        ("Transaction #<transaction_id> - <category>") added.
    """
    description = F.concat(
        F.lit("Transaction #"),
        F.col("transaction_id"),
        F.lit(" - "),
        F.col("category"),
    )
    return (
        df
        # F.round is the Spark column function; Python's built-in round()
        # cannot round a Column.
        .withColumn("rounded_amount", F.round(F.col("amount"), 0))
        .withColumn("transaction_desc", description)
        .drop("amount")
        .withColumnRenamed("rounded_amount", "amount_rounded")
    )

Analyzing

In [None]:
# Aggregation (Groupby Operation and Stats functions)
def aggregate_df(df):
    """Summarise transactions per category.

    Args:
        df: DataFrame with the columns category, transaction_id and amount.

    Returns:
        One row per category with ``transactioner`` (count of transaction_id),
        ``total_amount``, ``average_amount``, ``max_amount`` and ``min_amount``
        (all cast to double), ordered by ``total_amount`` descending.
    """
    aggregated_df = df.groupBy("category").agg(
        F.count("transaction_id").alias("transactioner"),
        F.sum("amount").cast(DoubleType()).alias("total_amount"),
        F.avg("amount").cast(DoubleType()).alias("average_amount"),
        F.max("amount").cast(DoubleType()).alias("max_amount"),
        F.min("amount").cast(DoubleType()).alias("min_amount"),
    )
    return aggregated_df.orderBy(F.desc("total_amount"))


# Ad-hoc check at cell level: distinct transaction ids per category, largest first.
df.groupBy('category').agg(F.countDistinct('transaction_id').alias('transactioner')).orderBy(F.desc('transactioner')).show()

IndentationError: unexpected indent (ipython-input-3552336789.py, line 10)

In [None]:
# Window functions for data partitioning (ranking, percentiles, and moving aggregations with row context and SQL OVER Clause)
def window_df(df):
    """Rank transactions by amount within each category and keep the top ranks.

    Args:
        df: DataFrame with the columns category and amount.

    Returns:
        The input rows with ``rank``, ``dense_rank`` and ``row_number`` columns
        added (computed per category, amount descending), restricted to rows
        whose ``rank`` is at most 3.
    """
    window_spec = Window.partitionBy("category").orderBy(F.desc("amount"))
    windowed_df = (
        df
        .withColumn("rank", F.rank().over(window_spec))
        .withColumn("dense_rank", F.dense_rank().over(window_spec))
        .withColumn("row_number", F.row_number().over(window_spec))
    )
    # rank() gives tied amounts the same rank, so a category can contribute
    # more than three rows when amounts tie.
    top_transactions = windowed_df.filter(F.col("rank") <= 3)
    return top_transactions

In [None]:
# Pivot Operation: Pivoting can be expensive for large datasets because it may cause significant data shuffling. Limiting the rows (with .limit(50000)) is one way to mitigate this for large datasets
def pivot_df(df):
    """Pivot per-customer spending into one column per category.

    Args:
        df: DataFrame with the columns customer_id, category and amount.

    Returns:
        One row per customer_id (taken from at most 50000 input rows) with one
        double column per category holding the summed amount, 0.0 where the
        customer has no rows in that category.
    """
    limited_df = df.limit(50000)
    pivoted_df = (
        limited_df
        .groupBy("customer_id")
        .pivot("category")
        # F.sum is the Spark aggregate; Python's built-in sum() cannot
        # aggregate a column name.
        .agg(F.sum("amount").cast(DoubleType()))
    )
    # Every column other than the grouping key is a per-category total, so the
    # category names are taken from the pivot result instead of a second
    # distinct/collect pass over the data.
    category_columns = [name for name in pivoted_df.columns if name != "customer_id"]
    if category_columns:
        pivoted_df = pivoted_df.fillna(0.0, subset=category_columns)
    return pivoted_df

In [None]:
# Approximation algorithms
from pyspark.sql.functions import approx_count_distinct, percentile_approx, array, lit
def approx_df(df):
    """Compute approximate per-category statistics.

    Args:
        df: DataFrame with the columns category, customer_id and amount.

    Returns:
        One row per category with ``approx_distinct_customers``,
        ``median_amount`` (approximate 0.5 percentile of amount) and
        ``quartiles`` (approximate 0.25 / 0.5 / 0.75 percentiles of amount),
        the percentiles computed with an accuracy argument of 100.
    """
    distinct_customers = approx_count_distinct("customer_id").alias("approx_distinct_customers")
    median_amount = percentile_approx("amount", 0.5, 100).alias("median_amount")
    quartile_points = array(lit(0.25), lit(0.5), lit(0.75))
    quartiles = percentile_approx("amount", quartile_points, 100).alias("quartiles")
    return df.groupBy("category").agg(distinct_customers, median_amount, quartiles)

In [None]:
#  Summary statistics for Exploratory Analysis & data quality issues like outliers, missing values, or unexpected distributions.
def summarize_df(df):
    """Return summary statistics for the ``amount`` column.

    Args:
        df: DataFrame containing an ``amount`` column.

    Returns:
        The result of ``summary`` on the amount column for the statistics
        count, mean, stddev, min, 25%, 50%, 75% and max.
    """
    statistics = ("count", "mean", "stddev", "min", "25%", "50%", "75%", "max")
    amount_only = df.select("amount")
    return amount_only.summary(*statistics)