### **Import Required Packages**

In [1]:
# !pip install isodate

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, to_timestamp, udf, lit, when,
    current_timestamp
)
from pyspark.sql.types import *
import isodate
import json
print("Packages imported successfully!")

Packages imported successfully!


In [3]:
# Helper (wrapped as a UDF below) to convert an ISO 8601 duration to seconds
def parse_duration(iso_duration):
    """Convert an ISO 8601 duration string (e.g. "PT3M45S") to whole seconds.

    Args:
        iso_duration: The duration string; may be None or empty when the
            source row carries no duration.

    Returns:
        The total number of seconds as an int, or None when the input is
        missing or cannot be parsed.
    """
    # Missing values are an expected case, so handle them explicitly rather
    # than relying on the parser to raise.
    if not iso_duration:
        return None
    try:
        duration = isodate.parse_duration(iso_duration)
        return int(duration.total_seconds())
    except Exception:
        # Best-effort parsing: any malformed value becomes null. Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate instead of being silently swallowed.
        return None

# Wrap the isodate-based parser as a Spark UDF that yields an integer column.
# NOTE: the name parse_duration_udf is reassigned further down in this notebook
# to a regex-based UDF, so this registration is superseded once that cell runs.
parse_duration_udf = udf(parse_duration, returnType=IntegerType())

In [4]:
import os

# Location of the raw YouTube JSON dump. Set the YOUTUBE_RAW_DATA_PATH
# environment variable to run this notebook on another machine; when it is
# unset, the original path is used so existing behavior is unchanged.
RAW_DATA_PATH = os.environ.get(
    "YOUTUBE_RAW_DATA_PATH",
    "/home/george/data_engineering/youtube-analytics-pipeline/data/raw/raw_youtube_data.json",
)

# Set spark session
spark = (
    SparkSession.builder
    .appName("TransformYouTubeData")
    .getOrCreate()
)

# Load raw data ("multiline" lets Spark read a JSON document that spans lines)
raw_df = spark.read.option("multiline", True).json(RAW_DATA_PATH)

25/04/21 16:58:01 WARN Utils: Your hostname, DESKTOP-0MIQQS8 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/04/21 16:58:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/21 16:58:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Bare expression so the notebook displays the DataFrame's repr, which lists
# the column names and types Spark inferred from the raw JSON.
raw_df

DataFrame[contentDetails: struct<caption:string,definition:string,dimension:string,duration:string,licensedContent:boolean,projection:string>, etag: string, id: string, kind: string, snippet: struct<categoryId:string,channelId:string,channelTitle:string,defaultLanguage:string,description:string,liveBroadcastContent:string,localized:struct<description:string,title:string>,publishedAt:string,tags:array<string>,thumbnails:struct<default:struct<height:bigint,url:string,width:bigint>,high:struct<height:bigint,url:string,width:bigint>,maxres:struct<height:bigint,url:string,width:bigint>,medium:struct<height:bigint,url:string,width:bigint>,standard:struct<height:bigint,url:string,width:bigint>>,title:string>, statistics: struct<commentCount:string,favoriteCount:string,likeCount:string,viewCount:string>]

In [6]:
# Transform the data: flatten the nested API response into one typed,
# snake_case column per field of interest.
transformed_df = raw_df.select(
    col("id").alias("video_id"),
    col("snippet.title").alias("title"),
    col("snippet.description").alias("description"),
    to_timestamp(col("snippet.publishedAt")).alias("published_at"),
    # The statistics fields arrive as strings. Cast them to 64-bit longs:
    # a 32-bit int tops out at 2,147,483,647, which popular videos exceed,
    # and an out-of-range cast does not yield the real value (null or an
    # error, depending on Spark's ANSI setting).
    col("statistics.viewCount").cast("long").alias("view_count"),
    col("statistics.likeCount").cast("long").alias("like_count"),
    col("statistics.commentCount").cast("long").alias("comment_count"),
    # Kept as the raw ISO 8601 string here; converted to seconds in a later cell.
    col("contentDetails.duration").alias("raw_duration"),
    col("snippet.tags").alias("tags"),
    col("snippet.categoryId").alias("category_id"),
    col("snippet.channelTitle").alias("channel_title"),
    # Calculate engagement rate later after checking if view_count is non-zero
    current_timestamp().alias("fetched_at")
)

transformed_df.show(truncate=False)

+-----------+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Calculate engagement rate: (likes + comments) divided by views.
# `when` and `col` are already imported in the notebook's import cell, so they
# are not re-imported here.
transformed_df = transformed_df.withColumn(
    "engagement_rate",
    # Guard the division: rows whose view_count is zero, negative, or null do
    # not satisfy the condition and fall through to otherwise(), giving 0.0.
    # NOTE: when view_count > 0 but like_count or comment_count is null, the
    # sum is null and so is the resulting rate.
    when(
        col("view_count") > 0,
        (col("like_count") + col("comment_count")) / col("view_count"),
    ).otherwise(0.0),
)

In [9]:
# Convert YouTube's raw_duration (an ISO 8601 duration string such as
# "PT3M45S" or "PT1H2M10S") into something usable in Spark (an INTERVAL or, as done here, total seconds)
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Parse ISO 8601 duration to seconds
# Compiled once here rather than on every call. Covers the optional day
# component (P#DT#H#M#S) in addition to hours, minutes and seconds.
_ISO8601_DURATION_RE = re.compile(
    r'P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?'
)


def parse_duration_to_seconds(duration_str):
    """Convert an ISO 8601 duration string to a total number of seconds.

    Args:
        duration_str: A string such as "PT3M45S", "PT1H2M10S" or "P1DT2H";
            may be None for rows without a duration.

    Returns:
        The total seconds as an int; 0 when the string does not look like a
        duration; None when duration_str is None (so the UDF emits null
        instead of raising TypeError).
    """
    if duration_str is None:
        return None
    match = _ISO8601_DURATION_RE.match(duration_str)
    if not match:
        return 0
    # Components that are absent from the string come back as None -> 0.
    days, hours, minutes, seconds = (
        int(part) if part else 0 for part in match.groups()
    )
    return days * 86400 + hours * 3600 + minutes * 60 + seconds

# Register UDF (this rebinds parse_duration_udf to the regex-based parser)
parse_duration_udf = udf(parse_duration_to_seconds, returnType=IntegerType())

# Apply transformation: add "duration" computed from raw_duration, then drop
# the raw string column. Because raw_duration is removed, re-running this cell
# on the already-transformed frame fails; re-run the select cell first.
transformed_df = (
    transformed_df
    .withColumn("duration", parse_duration_udf(col("raw_duration")))
    .drop("raw_duration")
)