In [0]:
# Open the raw YouTube video table as a streaming source so that each run
# reads incrementally instead of re-reading the whole table.
df_raw = (
    spark.readStream
    .table("yt_vids_data.raw")
)

In [0]:
from pyspark.sql.functions import col
# Give the raw string fields proper types. All other columns pass through
# unchanged and the column order is preserved.
#  - published_at: ISO-8601 string -> timestamp
#  - view/like/comment counts: numeric strings (e.g. "47341951") -> long
df_cleaned = df_raw.select(
    "video_id",
    "title",
    col("published_at").cast("timestamp"),
    "duration",
    *[col(count_col).cast("long") for count_col in ("view_count", "like_count", "comment_count")],
    "created_date",
)

In [0]:
from pyspark.sql.functions import col, regexp_extract, coalesce, lit, format_string, nullif, current_timestamp

# Parse the ISO-8601 `duration` string (e.g. "PT1H2M3S", "P1DT2H") into an
# "HH:MM:SS" string. HH is the TOTAL number of hours: days are folded into
# hours, so "P1DT2H" -> "26:00:00" and HH can exceed 24.
#
# A component that is missing from the string counts as 0. As a result, a NULL
# duration also renders as "00:00:00".
# NOTE(review): confirm that is preferable to keeping NULL.


def _duration_part(pattern):
    """Return the integer in capture group 1 of `pattern` applied to `duration`, or 0.

    `regexp_extract` yields "" when the pattern does not match (and NULL when
    `duration` is NULL). nullif/coalesce turn both cases into 0.
    """
    extracted = regexp_extract(col("duration"), pattern, 1)
    return coalesce(nullif(extracted, lit("")).cast("int"), lit(0))


days = _duration_part(r"(\d+)D")
hours = _duration_part(r"(\d+)H")
# Read minutes only from the time part (after "T"), so that a months designator
# such as "P1M" is not mistaken for minutes.
minutes = _duration_part(r"T(?:\d+H)?(\d+)M")
# Capture only the integer part of fractional seconds ("PT1.5S" -> 1, not 5).
seconds = _duration_part(r"(\d+)(?:\.\d+)?S")

total_seconds_col = days * 86400 + hours * 3600 + minutes * 60 + seconds

# `/` on Columns produces a fractional result. Casting to int truncates it,
# which equals floor division because every component is non-negative.
total_hours = (total_seconds_col / 3600).cast("int")
final_m = ((total_seconds_col % 3600) / 60).cast("int")
final_s = (total_seconds_col % 60).cast("int")

# Format: 50:30:10
df_total_hours = df_cleaned.withColumn(
    "duration_formatted",
    format_string("%02d:%02d:%02d", total_hours, final_m, final_s),
)

In [0]:
# Project the columns written to the processed table and rename them to their
# presentation names. The published timestamp is reduced to a calendar date.
# NOTE(review): the timestamp -> date cast is evaluated in the Spark session
# time zone; confirm that is the intended day boundary.
df_selected = df_total_hours.select(
    "video_id",
    col("title").alias("Video_Title"),
    col("published_at").cast("date").alias("Published_Date"),
    *[
        col(source_name).alias(target_name)
        for source_name, target_name in (
            ("view_count", "View_Count"),
            ("like_count", "Like_Count"),
            ("comment_count", "Comment_Count"),
            ("duration_formatted", "Duration"),
        )
    ],
    "created_date",
)


In [0]:
# 1. Create the target database if it does not already exist.
spark.sql("CREATE DATABASE IF NOT EXISTS yt_vids_data")

# 2. Output locations in Azure storage (abfss): the table data and the
#    streaming checkpoint. The checkpoint records which source data has
#    already been processed, so each run only picks up new rows.
save_path = "abfss://processed@youtubevideosmetadata951.dfs.core.windows.net/yt_vids_data"
processed_checkpoint = "abfss://processed@youtubevideosmetadata951.dfs.core.windows.net/checkpoints/silver_vids"

# 3. Append new rows to the external Delta table `yt_vids_data.processed`.
#    availableNow=True processes everything currently available and then stops.
processed_query = (
    df_selected.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", processed_checkpoint)  # required for streaming writes
    .option("path", save_path)                           # makes it an external table
    .trigger(availableNow=True)
    .toTable("yt_vids_data.processed")
)
# toTable() starts the query and returns immediately. Block until it finishes so
# that later cells reading the table see the new rows, and so that a stream
# failure is raised here instead of going unnoticed.
processed_query.awaitTermination()

In [0]:
%sql
-- Spot-check a single known video in the processed table.
SELECT *
FROM yt_vids_data.processed
WHERE video_id = 'yERIXEwAQjU'