In [None]:
# --- Imports (standard library first, then third-party) ----------------------
import os

from google.cloud import bigquery
from google.colab import auth

# --- Colab authentication ----------------------------------------------------
# Must complete before the BigQuery client below is created.
auth.authenticate_user()
print("Authenticated!")

# --- BigQuery configuration --------------------------------------------------
# Project and dataset that every query in this notebook targets.
PROJECT_ID = "even-blueprint-441418-p2"
DATASET_ID = "media_analytics"
# Fully qualified "project.dataset" prefix used when building table names.
FULL_DATASET = f"{PROJECT_ID}.{DATASET_ID}"

# Shared client used by all later cells.
client = bigquery.Client(project=PROJECT_ID)
print(" SCD2 in Silver!")

Authenticated!
 SCD2 in Silver!


In [None]:
def run(sql):
    """Execute a SQL statement on BigQuery and block until it finishes.

    Prints a progress line before submitting the query and a completion line
    afterwards that reports how many rows the statement modified.

    Args:
        sql: The SQL text to submit through the module-level ``client``.

    Returns:
        None.

    Raises:
        Whatever ``job.result()`` raises when the job fails.
    """
    print("Running...")
    job = client.query(sql)
    job.result()  # wait for completion; surfaces job errors here
    # The attribute exists on every query job but is None for non-DML
    # statements (DDL, SELECT), so a getattr default alone never kicks in.
    affected = getattr(job, 'num_dml_affected_rows', None)
    print(f"Done → {'N/A' if affected is None else affected} rows")

# DROP & CREATE SCD2 SILVER TABLE
# Rebuilds the table from scratch: any existing version history is discarded
# and every staging row with a non-NULL video id becomes the current version.
run(f"DROP TABLE IF EXISTS `{FULL_DATASET}.silver_video_enriched_scd2`;")

# NOTE: effective_to must be an explicitly typed NULL. A bare `NULL AS col` in
# CREATE TABLE ... AS SELECT gives the column BigQuery's default type for an
# untyped NULL (INT64), and later writes of a TIMESTAMP into effective_to
# (closing a version) would then fail with a type mismatch.
run(f"""
CREATE TABLE `{FULL_DATASET}.silver_video_enriched_scd2`
PARTITION BY DATE(video_publish_date)
CLUSTER BY video_id
AS
SELECT
  GENERATE_UUID() AS surrogate_key,
  video AS video_id,
  video_title,
  SAFE.PARSE_TIMESTAMP('%b %d, %Y', video_publish_time) AS video_publish_date,
  SAFE_CAST(views AS INT64) AS views,
  SAFE_CAST(watch_time_hours AS FLOAT64) AS watch_time_hours,
  ROUND(SAFE_CAST(watch_time_hours AS FLOAT64) * 60, 2) AS watch_time_minutes,
  `{PROJECT_ID}.{DATASET_ID}.parse_duration`(average_view_duration) AS avg_view_duration_seconds,
  SAFE_CAST(average_percentage_viewed_ AS FLOAT64) AS avg_view_percentage,
  SAFE_CAST(likes AS INT64) AS likes,
  SAFE_CAST(comments_added AS INT64) AS comments_added,
  SAFE_CAST(your_estimated_revenue_usd AS FLOAT64) AS estimated_revenue_usd,
  CURRENT_TIMESTAMP() AS effective_from,
  CAST(NULL AS TIMESTAMP) AS effective_to,
  TRUE AS is_current
FROM `{FULL_DATASET}.staging_aggregated_video`
WHERE video IS NOT NULL;
""")

print("Initial SCD2 Silver table created!")

Running...
Done → None rows
Running...
Done → None rows
Initial SCD2 Silver table created!


In [None]:
# STEP 1: Find videos that are new, or whose tracked attributes (title,
# average view duration) differ from the current Silver version.
# IS DISTINCT FROM is used instead of != so that a change from or to NULL is
# detected; with != the comparison yields NULL and the row is silently skipped.
changed = client.query(f"""
SELECT
  b.video AS video_id,
  b.video_title,
  `{PROJECT_ID}.{DATASET_ID}.parse_duration`(b.average_view_duration) AS new_duration,
  s.surrogate_key AS old_key,
  s.video_title AS old_title,
  s.avg_view_duration_seconds AS old_duration
FROM `{FULL_DATASET}.staging_aggregated_video` b
LEFT JOIN `{FULL_DATASET}.silver_video_enriched_scd2` s
  ON b.video = s.video_id AND s.is_current = TRUE
WHERE b.video IS NOT NULL
  AND (s.video_id IS NULL
    OR b.video_title IS DISTINCT FROM s.video_title
    OR `{PROJECT_ID}.{DATASET_ID}.parse_duration`(b.average_view_duration) IS DISTINCT FROM s.avg_view_duration_seconds)
""").to_dataframe()

print(f"Found {len(changed)} videos to update")

# STEP 2 + 3: Close the superseded versions and insert the new ones.
# Both statements run in one BigQuery transaction so a failure cannot leave a
# video with its old version closed but no current version. The new rows are
# built server-side with the same expressions as the initial load, which
# replaces the per-video round trips and keeps types and NULL handling
# identical between initial and incremental rows. Ids are passed as array
# query parameters rather than interpolated into the SQL text.
# NOTE(review): assumes staging holds one row per video; every staging row
# for a changed video is inserted — confirm against the staging load.
if len(changed) > 0:
    video_ids = changed["video_id"].drop_duplicates().tolist()
    # old_key is NULL for brand-new videos (no current Silver row to close).
    old_keys = changed["old_key"].dropna().drop_duplicates().tolist()

    scd2_job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("video_ids", "STRING", video_ids),
            bigquery.ArrayQueryParameter("old_keys", "STRING", old_keys),
        ]
    )

    # load_ts is captured once so that effective_to of the closed version and
    # effective_from of its replacement are exactly the same instant.
    # effective_to is omitted from the INSERT column list and defaults to NULL.
    scd2_sql = f"""
    DECLARE load_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP();

    BEGIN TRANSACTION;

    UPDATE `{FULL_DATASET}.silver_video_enriched_scd2`
    SET is_current = FALSE, effective_to = load_ts
    WHERE surrogate_key IN UNNEST(@old_keys);

    INSERT INTO `{FULL_DATASET}.silver_video_enriched_scd2` (
      surrogate_key,
      video_id,
      video_title,
      video_publish_date,
      views,
      watch_time_hours,
      watch_time_minutes,
      avg_view_duration_seconds,
      avg_view_percentage,
      likes,
      comments_added,
      estimated_revenue_usd,
      effective_from,
      is_current
    )
    SELECT
      GENERATE_UUID(),
      video,
      video_title,
      SAFE.PARSE_TIMESTAMP('%b %d, %Y', video_publish_time),
      SAFE_CAST(views AS INT64),
      SAFE_CAST(watch_time_hours AS FLOAT64),
      ROUND(SAFE_CAST(watch_time_hours AS FLOAT64) * 60, 2),
      `{PROJECT_ID}.{DATASET_ID}.parse_duration`(average_view_duration),
      SAFE_CAST(average_percentage_viewed_ AS FLOAT64),
      SAFE_CAST(likes AS INT64),
      SAFE_CAST(comments_added AS INT64),
      SAFE_CAST(your_estimated_revenue_usd AS FLOAT64),
      load_ts,
      TRUE
    FROM `{FULL_DATASET}.staging_aggregated_video`
    WHERE video IN UNNEST(@video_ids);

    COMMIT TRANSACTION;
    """

    print("Running...")
    client.query(scd2_sql, job_config=scd2_job_config).result()
    print(f"Closed {len(old_keys)} old versions")
    print(f"Inserted {len(video_ids)} new versions")

# FINAL PREVIEW
df = client.query(f"""
SELECT video_id, video_title, is_current, effective_from, effective_to
FROM `{FULL_DATASET}.silver_video_enriched_scd2`
ORDER BY video_id, effective_from DESC LIMIT 10
""").to_dataframe()
display(df)
print("SCD2 IN SILVER – DONE")

Found 0 videos to update


Unnamed: 0,video_id,video_title,is_current,effective_from,effective_to
0,-3d1NctSv0c,"Ken Jee Q & A Live Stream (50,000 Sub Special!)",True,2025-11-07 16:38:47.961547+00:00,
1,-ONQ628CXKQ,Data Scientist Reacts: REAL Data Science Job A...,True,2025-11-07 16:38:47.961547+00:00,
2,-kX2b6TF_9k,By The Numbers: Where Should The NBA Put a 4 P...,True,2025-11-07 16:38:47.961547+00:00,
3,-pdXWmj9xxU,How I Learn Data Science Through Studying Othe...,True,2025-11-07 16:38:47.961547+00:00,
4,-zbLpoJVBMI,What the Heck is WSL 2? (My New Favorite Tool),True,2025-11-07 16:38:47.961547+00:00,
5,0jTtHYie3CU,Should You Be Excited About Web 3? (As a Data ...,True,2025-11-07 16:38:47.961547+00:00,
6,143WWA5Sy9k,I Eat a Papaya Live on Stream (Plus Q&A for 15...,True,2025-11-07 16:38:47.961547+00:00,
7,15c7WD-lKUY,ML Ops: What is it REALLY?,True,2025-11-07 16:38:47.961547+00:00,
8,1Cf7SdnBncg,My Top 5 Data Science Internship Tips,True,2025-11-07 16:38:47.961547+00:00,
9,1FrY7ARSf10,9 Ways You Can Make Extra Income as a Data Sci...,True,2025-11-07 16:38:47.961547+00:00,


SCD2 IN SILVER – 100% DONE!
