In [None]:
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, when, row_number
from pyspark.sql.functions import count, to_timestamp, udf
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

In [None]:
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under this pipeline's application name.
spark = (
    SparkSession.builder
    .appName("VideoStreamingETL")
    .getOrCreate()
)

# Keep cell output readable: only ERROR-level messages from Spark itself and
# from the py4j Python<->JVM bridge logger are emitted.
spark.sparkContext.setLogLevel("ERROR")
logging.getLogger("py4j").setLevel(logging.ERROR)

**Extract**

In [None]:
# Raw sources: CSV viewing history (header row, inferred column types),
# multi-line JSON user records, and the Parquet video catalog.
# NOTE(review): the *_path variables are not defined in the visible cells —
# confirm they are set in a configuration cell before this one runs.
df_views = spark.read.option("header", True).option("inferSchema", True).csv(viewing_history_path)
df_users = spark.read.option("multiline", "true").json(users_path)
df_videos_catalog = spark.read.parquet(videos_catalog_path)
# NOTE(review): this table is only written by the bucketing cell in the Load
# section, so on a fresh kernel/catalog this read may fail — confirm ordering.
bucketed_df = spark.read.table("viewing_history_bucketed")

# Extract incremental updates
df_sub_updates = spark.read.option("header", True).csv(subscription_updates_path)
# Filter the frame that was just loaded (the original referenced an undefined
# `subscription_updates_df`). The CSV is read without inferSchema, so
# change_date is compared as a string; this presumably relies on ISO
# yyyy-MM-dd formatting — verify against the data.
df_latest_updates = df_sub_updates.filter(col("change_date") >= "2024-02-07")

**Transform**

EDA

In [None]:
# Preview two columns. `.show()` returns None, so keep the projected
# DataFrame in `df_filtered` and display it as a separate step.
df_filtered = df.select("col1", "col2")
df_filtered.show(5)

# One-row DataFrame holding the number of NULLs in every column of `df`.
null_count = df.select(
    [_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]
)

# Row count of `df` versus the number of distinct user_id values.
total_users = df.count()
unique_users = df.select("user_id").distinct().count()

Cleaning

In [None]:
# DataFrames are immutable: every transformation returns a new DataFrame, so
# each result must be assigned back or the step is silently lost.
# NOTE(review): "col_name", "col1" and "col2" look like template placeholders
# (the same column is dropped and then reused) — substitute real column names.
df = df.drop("col_name")
df = df.dropna(subset=["col1", "col2"])
df = df.dropDuplicates(["col1"])
df = df.fillna({"col_name": "Unknown"})
# Column.cast takes a Spark type name or DataType, not the Python `int` class.
df = df.withColumn("col_name", col("col_name").cast("int"))
df = df.withColumn("col_name", to_timestamp(col("col_name")))

Joining 2 data sources

In [None]:
# Inner join: keeps only rows whose "col_name" key appears in both inputs.
# The right-hand side was misspelled `d2` in the original.
df_full = df1.join(df2, on="col_name", how="inner")

Window

In [None]:
# Number of rows per (user, video) pair — input to the per-user ranking.
df_most_watched = df_full.groupBy("user_id", "video_id").agg(count("*").alias("watch_count"))

# Number of rows per user; display the five users with the highest totals.
total_views_df = df_full.groupBy("user_id").agg(count("*").alias("total_views"))
total_views_df.orderBy("total_views", ascending=False).show(5)

In [None]:
# Rank each user's videos from most to least watched. The descending order
# belongs on the column expression, not on the WindowSpec. video_id is a
# secondary sort key so row_number() breaks ties deterministically.
window_spec = Window.partitionBy("user_id").orderBy(
    col("watch_count").desc(), col("video_id")
)
df_ranked = df_most_watched.withColumn("rank", row_number().over(window_spec))

# Keep the top three videos per user and preview the result.
df_top_videos_per_user = df_ranked.filter(col("rank") <= 3)
df_top_videos_per_user.orderBy("user_id", "rank").show(5)

UDF

In [None]:
def normalize_device(device):
  """Map a raw device string to a canonical device label.

  Args:
    device: Raw device description, or None when the value is missing.

  Returns:
    "Unknown" when `device` is None, "iPhone" when the text contains
    "iphone" in any letter case, and "Other" for everything else.
  """
  if device is None:
    return "Unknown"
  device = device.lower()
  # Substring test on the lower-cased value; the original compared against an
  # undefined name `iphone`, which raised NameError for every non-null input.
  if "iphone" in device:
    return "iPhone"
  return "Other"

# Wrap the plain-Python normalizer as a Spark UDF that returns a string, then
# derive a "normalized_device" column from the existing "device_type" column.
normalize_device_udf = udf(normalize_device, returnType=StringType())
df = df.withColumn(
    "normalized_device",
    normalize_device_udf(df["device_type"]),
)

**Load**

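Partitioning reduces query scan time by storing data in separate folders, one per distinct value of the partition column, so a query that filters on that column can skip the folders it does not need.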

In [None]:
# Write to disk, one sub-folder per distinct device_type value. The path is
# built once so the write and the verification read cannot drift apart.
partitioned_path = f"{processed_path}/viewing_history_partitioned"
df.write.partitionBy("device_type").mode("overwrite").parquet(partitioned_path)

# List only the partition folders (named "device_type=<value>"); Spark also
# writes marker/checksum files such as _SUCCESS into the same directory.
# os.listdir reads the driver's local filesystem — presumably processed_path
# is a local path; verify before pointing this at HDFS or object storage.
partition_dirs = sorted(
    name for name in os.listdir(partitioned_path) if name.startswith("device_type=")
)
print("Partitions created:", partition_dirs)

# Read the data back and confirm the partition column survived the round trip.
df_partitioned = spark.read.parquet(partitioned_path)
df_partitioned.select("device_type").distinct().show()

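Bucketing improves performance for filtering and joins on bucketed columns: rows are hashed on the bucket column into a fixed number of files, so Spark can often avoid a shuffle when joining or aggregating on that column.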

In [None]:
# Persist `df` as a Parquet-backed table bucketed into 10 buckets by video_id
# and sorted by video_id within each bucket. The whole chain is wrapped in
# parentheses so it can span several lines; the original had only the closing
# parenthesis and did not parse.
(
    df.write
    .bucketBy(10, "video_id")
    .sortBy("video_id")
    .mode("overwrite")
    .format("parquet")
    .option("path", f"{processed_path}/viewing_history_bucketed")
    .saveAsTable("viewing_history_bucketed")
)

# Check if the table exists. Only the last expression of a cell is displayed,
# so the catalog listing is printed explicitly.
print(spark.catalog.listTables())
# Verify bucketing in table schema. Show up to 100 untruncated rows: the
# default show() stops at 20 rows and shortens long values, which can hide
# the bucket metadata rows that follow the column list.
spark.sql("DESCRIBE FORMATTED viewing_history_bucketed").show(n=100, truncate=False)