In [4]:
# Demo: Read YouTube Bronze Data with PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("youtube-bronze-demo")
    .getOrCreate()
)
print(f"Spark version: {spark.version}")
spark

Exception in thread "main" java.lang.IllegalArgumentException: basedir must be absolute: ?/.ivy2/local
	at org.apache.ivy.util.Checks.checkAbsolute(Checks.java:48)
	at org.apache.ivy.plugins.repository.file.FileRepository.setBaseDir(FileRepository.java:137)
	at org.apache.ivy.plugins.repository.file.FileRepository.<init>(FileRepository.java:44)
	at org.apache.spark.deploy.SparkSubmitUtils$.createRepoResolvers(SparkSubmit.scala:1353)
	at org.apache.spark.deploy.SparkSubmitUtils$.buildIvySettings(SparkSubmit.scala:1460)
	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:182)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:343)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1047)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:200)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:223)
	at org.apache.spark.deploy.SparkSubmit

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Read bronze JSONL files (all channels)
# Note: files are named _compacted.jsonl â€” Spark ignores _-prefixed files by default,
# so we read them via Python glob and feed them to Spark as an RDD.
import glob as pyglob

BRONZE_PATH = "/home/hadoop/workspace/data/bronze/metadata/source=channel"

files = pyglob.glob(f"{BRONZE_PATH}/dt=*/*/_compacted.jsonl")
lines = []
for f in files:
    with open(f) as fh:
        lines.extend(line.strip() for line in fh if line.strip())

raw_df = spark.read.json(spark.sparkContext.parallelize(lines))
print(f"Total raw records: {raw_df.count()}")
raw_df.printSchema()

In [None]:
# Flatten into a Silver-like analytical view
silver_df = (
    raw_df
    .select(
        F.col("id").alias("video_id"),
        F.col("snippet.channelId").alias("channel_id"),
        F.col("snippet.channelTitle").alias("channel_name"),
        F.col("snippet.title").alias("title"),
        F.col("snippet.publishedAt").alias("published_at"),
        F.col("statistics.viewCount").cast(LongType()).alias("view_count"),
        F.col("statistics.likeCount").cast(LongType()).alias("like_count"),
        F.col("statistics.commentCount").cast(LongType()).alias("comment_count"),
        F.col("contentDetails.duration").alias("duration"),
    )
    .withColumn("like_view_ratio", F.round(F.col("like_count") / F.col("view_count"), 4))
)

silver_df.show(truncate=50)

In [None]:
# Top videos by engagement (like/view ratio)
print("=== Top 10 Videos by Engagement ===")
(
    silver_df
    .orderBy(F.col("like_view_ratio").desc())
    .select("channel_name", "title", "view_count", "like_count", "like_view_ratio")
    .show(10, truncate=60)
)

In [None]:
# Aggregate stats per channel
print("=== Channel Summary ===")
(
    silver_df
    .groupBy("channel_name")
    .agg(
        F.count("*").alias("video_count"),
        F.sum("view_count").alias("total_views"),
        F.sum("like_count").alias("total_likes"),
        F.round(F.avg("like_view_ratio"), 4).alias("avg_engagement"),
    )
    .orderBy(F.col("total_views").desc())
    .show(truncate=False)
)

In [None]:
spark.stop()