## Clean

In [None]:
!pip install SQLAlchemy psycopg2-binary pandas python-dateutil



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [2]:
import os

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# Connection settings are read from the environment so that real credentials
# never have to be committed with the notebook. The fallbacks reproduce the
# original local-development values, so an unconfigured run behaves as before.
PG_USER = os.environ.get("PG_USER", "postgres")
PG_PWD  = os.environ.get("PG_PWD", "postgres")
PG_HOST = os.environ.get("PG_HOST", "127.0.0.1")
PG_PORT = int(os.environ.get("PG_PORT", "5432"))
DB_NAME = os.environ.get("PG_DB", "4260354_gb_youtube_trends")

# URL.create escapes special characters (e.g. "@", ":" or "/" in a password)
# that would corrupt a hand-formatted connection string.
engine = create_engine(
    URL.create(
        "postgresql+psycopg2",
        username=PG_USER,
        password=PG_PWD,
        host=PG_HOST,
        port=PG_PORT,
        database=DB_NAME,
    )
)

# Smoke test: fail fast here if the server is unreachable or the login is wrong.
with engine.connect() as conn:
    print(conn.execute(text("select version()")).scalar())


PostgreSQL 16.10 (Ubuntu 16.10-0ubuntu0.24.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0, 64-bit


In [3]:
from sqlalchemy import text
# Drop gb_videos if it exists (CASCADE also removes objects that depend on it).
# engine.begin() wraps the statement in a transaction that commits on exit.
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS gb_videos CASCADE;"))


In [4]:
import os, pandas as pd
from sqlalchemy import text, Text, Integer, BigInteger, Boolean, Date, DateTime

csv_path = "GBvideos.csv"  # <-- set correctly
print("CSV exists?", os.path.exists(csv_path), csv_path)

# Load the raw file and trim stray whitespace from the header names.
df = pd.read_csv(csv_path).rename(columns=str.strip)
print("Raw rows:", len(df), "| columns:", list(df.columns))

# Robust date parsing (handles multiple common formats)
def parse_trending_series(s):
    """Parse a Series of trending-date strings into datetimes.

    Formats are tried in priority order: ``%y.%d.%m``, then ``%y.%m.%d``, then
    pandas' own format inference. Each fallback is applied only to the values
    that are still unparsed, so a column that mixes formats is handled value by
    value instead of keeping NaT for everything the first format rejected.

    Parameters
    ----------
    s : pandas.Series
        Raw date values (typically strings).

    Returns
    -------
    pandas.Series
        Datetime Series aligned with ``s``; values no format can parse are NaT.
    """
    parsed = pd.to_datetime(s, format="%y.%d.%m", errors="coerce")
    for fmt in ("%y.%m.%d", None):
        missing = parsed.isna()
        if not missing.any():
            break
        if fmt is None:
            # Last resort: let pandas infer the format.
            fallback = pd.to_datetime(s[missing], errors="coerce")
        else:
            fallback = pd.to_datetime(s[missing], format=fmt, errors="coerce")
        # When nothing parsed so far the fallback *is* the result (this keeps
        # the original behaviour, including any dtype the fallback produces);
        # otherwise only the gaps are filled.
        parsed = fallback if missing.all() else parsed.fillna(fallback)
    return parsed

# --- Coerce raw CSV columns to their target types ---
# trending_date -> datetime.date objects (unparsable values become NaT).
df["trending_date"] = parse_trending_series(df["trending_date"]).dt.date

# publish_time -> timezone-aware UTC timestamps.
if "publish_time" in df.columns:
    df["publish_time"] = pd.to_datetime(df["publish_time"], utc=True, errors="coerce")

# Flag columns: case/whitespace-insensitive "true"/"false" -> bool;
# anything else is left missing by the lookup.
flag_cols = ("comments_disabled", "ratings_disabled", "video_error_or_removed")
bool_lookup = {"true": True, "false": False}
for col in flag_cols:
    if col not in df.columns:
        continue
    normalised = df[col].astype(str).str.strip().str.lower()
    df[col] = normalised.map(bool_lookup)

# Counter / id columns: non-numeric values are coerced to NaN.
numeric_cols = ("views", "likes", "dislikes", "comment_count", "category_id")
for col in numeric_cols:
    if col not in df.columns:
        continue
    df[col] = pd.to_numeric(df[col], errors="coerce")

print("Parsed trending_date non-null:", df["trending_date"].notna().sum(), "of", len(df))

# Ensure table exists (no unique index yet to avoid conflicts on first load)
# Target schema: a surrogate BIGSERIAL key plus the 16 CSV columns. Only
# video_id and trending_date are declared NOT NULL. CREATE TABLE IF NOT EXISTS
# leaves an existing table untouched.
ddl = """
CREATE TABLE IF NOT EXISTS gb_videos (
  id BIGSERIAL PRIMARY KEY,
  video_id TEXT NOT NULL,
  trending_date DATE NOT NULL,
  title TEXT,
  channel_title TEXT,
  category_id INT,
  publish_time TIMESTAMPTZ,
  tags TEXT,
  views BIGINT,
  likes BIGINT,
  dislikes BIGINT,
  comment_count BIGINT,
  thumbnail_link TEXT,
  comments_disabled BOOLEAN,
  ratings_disabled BOOLEAN,
  video_error_or_removed BOOLEAN,
  description TEXT
);
"""
# Run the DDL inside a transaction that commits when the block exits.
with engine.begin() as conn:
    conn.execute(text(ddl))

# Filter to known columns and drop empties on key fields
# SQLAlchemy type for every column the table knows about; the key order here
# also fixes the column order of the frame that gets written.
dtype_map = dict(
    video_id=Text(),
    trending_date=Date(),
    title=Text(),
    channel_title=Text(),
    category_id=Integer(),
    publish_time=DateTime(timezone=True),
    tags=Text(),
    views=BigInteger(),
    likes=BigInteger(),
    dislikes=BigInteger(),
    comment_count=BigInteger(),
    thumbnail_link=Text(),
    comments_disabled=Boolean(),
    ratings_disabled=Boolean(),
    video_error_or_removed=Boolean(),
    description=Text(),
)

# Keep only the known columns that are actually present in the CSV.
cols = [name for name in dtype_map if name in df.columns]

# Rows need both key fields, and each (video_id, trending_date) pair is kept once.
key_cols = ["video_id", "trending_date"]
before = len(df)
df = df.dropna(subset=key_cols)
print("After dropna on keys:", len(df), f"(dropped {before-len(df)})")
df = df.drop_duplicates(subset=key_cols)
print("After de-dup keys:", len(df))

# Append to the existing table using multi-row INSERTs, 10,000 rows per chunk.
df.loc[:, cols].to_sql(
    name="gb_videos",
    con=engine,
    if_exists="append",
    index=False,
    chunksize=10000,
    method="multi",
    dtype=dtype_map,
)
print("Inserted rows:", len(df))


CSV exists? True GBvideos.csv
Raw rows: 38916 | columns: ['video_id', 'trending_date', 'title', 'channel_title', 'category_id', 'publish_time', 'tags', 'views', 'likes', 'dislikes', 'comment_count', 'thumbnail_link', 'comments_disabled', 'ratings_disabled', 'video_error_or_removed', 'description']
Parsed trending_date non-null: 38916 of 38916
After dropna on keys: 38916 (dropped 0)
After de-dup keys: 38742
Inserted rows: 38742


In [5]:
from sqlalchemy import text
# Report the row count, then add the indexes in the same transaction.
with engine.begin() as conn:
    n = conn.execute(text("SELECT COUNT(*) FROM gb_videos")).scalar()
    print("Row count now:", n)
    # All three statements are sent in one execute() call. The unique index is
    # on (video_id, trending_date); IF NOT EXISTS skips indexes already present.
    conn.execute(text("""
        CREATE UNIQUE INDEX IF NOT EXISTS ux_gb_videos_vid_trend
          ON gb_videos (video_id, trending_date);
        CREATE INDEX IF NOT EXISTS ix_gb_videos_category_id ON gb_videos (category_id);
        CREATE INDEX IF NOT EXISTS ix_gb_videos_publish_time ON gb_videos (publish_time);
    """))

# Quick peek
import pandas as pd
# Show the five rows with the latest publish_time; rows whose publish_time is
# NULL sort last.
with engine.connect() as conn:
    sample = pd.read_sql("""
        SELECT video_id, title, trending_date, views, publish_time
        FROM gb_videos
        ORDER BY publish_time DESC NULLS LAST
        LIMIT 5
    """, conn)
    display(sample)


Row count now: 38742


Unnamed: 0,video_id,title,trending_date,views,publish_time
0,r63VBOagGAo,Shawn Mendes x Portugal (FPF Official World Cu...,2018-06-14,653114,2018-06-13 13:11:56+00:00
1,YQJmvXamKYg,Conway: People are bending to the will of Pres...,2018-06-14,99048,2018-06-13 12:56:49+00:00
2,-QPdRfqTnt4,Dumbo Official Teaser Trailer,2018-06-14,4427381,2018-06-13 07:00:00+00:00
3,6h8QgZF5Qu4,Drop the Mic w/ Ashton Kutcher & Sean Diddy Combs,2018-06-14,864189,2018-06-13 05:27:27+00:00
4,arY6lepNdzU,"E3 2018 Exclusive Gameplay Demos, Interviews a...",2018-06-13,349122,2018-06-13 04:09:23+00:00


In [None]:
## Extract

In [6]:
# --- imports & engine ---
from datetime import datetime, timezone
import pandas as pd

# Identifier for this extract run, built from the current UTC time
# (e.g. "yt_gb_20251020T192222").
EXTRACT_RUN_ID = datetime.now(timezone.utc).strftime("yt_gb_%Y%m%dT%H%M%S")

# --- Step A: enumerate & order unique dates deterministically ---
# One row per distinct non-NULL trending_date, ascending. The same column is
# selected twice, as trending_date_raw and as parsed_dt.
SQL_ENUM = text("""
SELECT
  trending_date AS trending_date_raw,
  trending_date AS parsed_dt
FROM gb_videos
WHERE trending_date IS NOT NULL
GROUP BY trending_date
ORDER BY trending_date;
""")

with engine.begin() as conn:
    dates_df = pd.read_sql(SQL_ENUM, conn)

# Each distinct trending date is treated as one batch.
num_batches = len(dates_df)
print(f"Unique trending dates (batches): {num_batches}")
display(dates_df.head())


Unique trending dates (batches): 205


Unnamed: 0,trending_date_raw,parsed_dt
0,2017-11-14,2017-11-14
1,2017-11-15,2017-11-15
2,2017-11-16,2017-11-16
3,2017-11-17,2017-11-17
4,2017-11-18,2017-11-18


In [7]:
# --- Step B: batch extractor (generator) ---
def iter_daily_batches(conn, dates_df):
    """
    Lazily walk ``dates_df`` and fetch one day of ``gb_videos`` per step.

    ``dates_df`` must provide the columns ``trending_date_raw`` and
    ``parsed_dt``; rows are visited in their existing order and numbered
    from 1. For every row the table is queried for ``trending_date`` equal to
    the raw value, ordered by ``video_id``.

    Yields (batch_id, trending_date_raw, parsed_dt, dataframe_for_that_day)
    """
    # The statement is the same for every day; only the bound parameter changes.
    day_query = text("""
                SELECT *
                FROM gb_videos
                WHERE trending_date = :t_raw
                ORDER BY video_id, trending_date
            """)

    date_pairs = zip(dates_df["trending_date_raw"], dates_df["parsed_dt"])
    for batch_id, (t_raw, p_dt) in enumerate(date_pairs, start=1):
        # p_dt is passed through untouched (it may be NaT); the lookup itself
        # always uses the raw value.
        df_day = pd.read_sql(day_query, conn, params={"t_raw": t_raw})
        yield batch_id, t_raw, p_dt, df_day

In [8]:
from sqlalchemy import text

# Metadata table for extract runs: each row stores a run id, a batch number,
# the batch's raw and parsed trending date, and its row count. created_at
# defaults to the insert time. IF NOT EXISTS leaves an existing table untouched.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS extract_day_batches (
            id BIGSERIAL PRIMARY KEY,
            extract_run_id TEXT NOT NULL,
            batch_id INT NOT NULL,
            trending_date_raw DATE,
            parsed_dt DATE,
            row_count INT,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
    """))


In [9]:

# --- Step C: run the extract, optionally log metadata ---
import os

# Running totals, kept for the acceptance checks that follow.
total_rows = 0
batch_rows = []

with engine.begin() as conn:
    for batch_id, t_raw, p_dt, df_day in iter_daily_batches(conn, dates_df):
        # One folder per day: extract/day=<date>/gb_videos_day_<date>.csv
        day_dir = f"extract/day={t_raw}"
        out_path = f"{day_dir}/gb_videos_day_{t_raw}.csv"
        os.makedirs(day_dir, exist_ok=True)
        df_day.to_csv(out_path, index=False)

        # In-memory record of the batch.
        rc = len(df_day)
        total_rows += rc
        batch_meta = {
            "batch_id": batch_id,
            "trending_date_raw": t_raw,
            "parsed_dt": p_dt,
            "row_count": rc,
        }
        batch_rows.append(batch_meta)

        # Same record written to the metadata table; a missing parsed date is
        # stored as NULL.
        insert_params = {
            "rid": EXTRACT_RUN_ID,
            "bid": batch_id,
            "traw": t_raw,
            "pdt": None if pd.isna(p_dt) else p_dt,
            "rc": rc,
        }
        conn.execute(
            text("""
                INSERT INTO extract_day_batches (extract_run_id, batch_id, trending_date_raw, parsed_dt, row_count)
                VALUES (:rid, :bid, :traw, :pdt, :rc)
            """),
            insert_params,
        )

print(f"Extract run id: {EXTRACT_RUN_ID}")
print(f"Total rows extracted across all day-batches: {total_rows}")


Extract run id: yt_gb_20251020T192222
Total rows extracted across all day-batches: 38742


In [10]:
# --- Step D: acceptance checks ---
# Re-query the source table so the checks compare against the database rather
# than against values computed during the extract itself.
with engine.begin() as conn:
    src_total = conn.execute(text("SELECT COUNT(*) FROM gb_videos")).scalar()
    uniq_dates = conn.execute(text("SELECT COUNT(DISTINCT trending_date) FROM gb_videos WHERE trending_date IS NOT NULL")).scalar()


def _mark(ok):
    """Return a pass/fail marker for a boolean check result."""
    return "✅" if ok else "❌"


# Each check is evaluated explicitly; the marker reflects the real outcome
# instead of being printed unconditionally.
dates_ok = num_batches == uniq_dates
batches_ok = len(batch_rows) == num_batches
rows_ok = total_rows == src_total

print("ACCEPTANCE CHECKS")
print(f"{_mark(dates_ok)} Unique trending dates listed: {num_batches} (db says {uniq_dates})")
print(f"{_mark(batches_ok)} Expected number of batches: {num_batches} (extracted {len(batch_rows)})")
print(f"{_mark(rows_ok)} Sum(rows in all batches) vs total rows in main table: {total_rows} vs {src_total}")

# (Optional) Inspect the logged batches quickly in Python.
# Explicit columns keep this working even when no batch was extracted.
meta_df = pd.DataFrame(
    batch_rows,
    columns=["batch_id", "trending_date_raw", "parsed_dt", "row_count"],
).sort_values("batch_id")
display(meta_df.head())
display(meta_df.tail())
print("Per-batch row count sum:", meta_df["row_count"].sum())

# Fail loudly so a broken extract cannot go unnoticed under Run All.
assert dates_ok and batches_ok and rows_ok, "Extract acceptance checks failed - see the ❌ lines above."

ACCEPTANCE CHECKS
✅ Unique trending dates listed: 205 (db says 205)
✅ Expected number of batches: 205
✅ Sum(rows in all batches) vs total rows in main table: 38742 vs 38742


Unnamed: 0,batch_id,trending_date_raw,parsed_dt,row_count
0,1,2017-11-14,2017-11-14,200
1,2,2017-11-15,2017-11-15,199
2,3,2017-11-16,2017-11-16,199
3,4,2017-11-17,2017-11-17,200
4,5,2017-11-18,2017-11-18,200


Unnamed: 0,batch_id,trending_date_raw,parsed_dt,row_count
200,201,2018-06-10,2018-06-10,168
201,202,2018-06-11,2018-06-11,166
202,203,2018-06-12,2018-06-12,171
203,204,2018-06-13,2018-06-13,170
204,205,2018-06-14,2018-06-14,165


Per-batch row count sum: 38742


## Transform


In [11]:

def extract_date(datetime_str):
    """Parse ``datetime_str`` with ``pd.to_datetime`` and return its date part."""
    parsed = pd.to_datetime(datetime_str)
    return parsed.date()

def extract_time(datetime_str):
    """Parse ``datetime_str`` with ``pd.to_datetime`` and return its time-of-day part."""
    parsed = pd.to_datetime(datetime_str)
    return parsed.time()


In [12]:
import pandas as pd
from pathlib import Path

# Input and output folders, relative to the working directory.
extract_root = Path("extract")
transform_root = Path("transform")
# Create the output folder if it is missing; an existing folder is not an error.
transform_root.mkdir(exist_ok=True)

def split_music_title(title):
    """Split a music video title into ``(artist, song_title)``.

    The title is cut at the first occurrence of ``" - "``; if that delimiter is
    absent, at the first ``"|"``. Both halves are stripped of surrounding
    whitespace.

    Parameters
    ----------
    title : str or any
        Raw video title. Non-string values (e.g. NaN produced when a CSV cell
        is read back as missing) are returned unchanged instead of raising.

    Returns
    -------
    tuple
        ``(artist, song_title)``. ``artist`` is ``None`` when no delimiter is
        found or ``title`` is not a string; ``song_title`` is then ``title``
        itself, unmodified.
    """
    if not isinstance(title, str):
        # `delim in title` would raise TypeError for floats such as NaN.
        return None, title
    for delim in (" - ", "|"):
        head, sep, tail = title.partition(delim)
        if sep:
            return head.strip(), tail.strip()
    return None, title

def _derive_title_fields(category_id, title, channel_title):
    """Return the ``(Author, Title, Artist)`` triple for one video row.

    * category 10 (Music): the title is split with ``split_music_title``;
      Artist falls back to the channel name when no artist is found.
    * category 23 with a ``"|"`` in the title: Author is the text before the
      first ``"|"``, Title the text after it.
    * everything else: Author is the channel name and Title is the raw title.

    Non-string titles (e.g. NaN) are never split; they are passed through.
    """
    has_text = isinstance(title, str)
    if category_id == 10:  # Music
        artist, song = split_music_title(title) if has_text else (None, title)
        return None, song, artist or channel_title
    if category_id == 23 and has_text and "|" in title:  # Celebrity Parody
        author, rest = title.split("|", 1)
        return author.strip(), rest.strip(), None
    return channel_title, title, None


# Raw columns that are not carried into the transformed output.
cols_to_drop = [
    "tags", "thumbnail_link", "ratings_disabled",
    "video_error_or_removed", "comments_disabled", "description", "title", "channel_title"
]

for day_folder in sorted(extract_root.glob("day=*")):
    date_str = day_folder.name.split("=")[1]
    # Sorted so the chosen file is deterministic if a folder holds several.
    csv_files = sorted(day_folder.glob("*.csv"))
    if not csv_files:
        print(f"⚠️ No CSV found in {day_folder}")
        continue

    raw_batch_df = pd.read_csv(csv_files[0])

    # --- Title & Channel Logic ---
    # Built in one pass and attached as a block so that every batch file has
    # the same Author/Title/Artist columns in the same order, even when a day
    # contains no music rows.
    derived = pd.DataFrame(
        [
            _derive_title_fields(cat_id, title, channel)
            for cat_id, title, channel in zip(
                raw_batch_df["category_id"],
                raw_batch_df["title"],
                raw_batch_df["channel_title"],
            )
        ],
        columns=["Author", "Title", "Artist"],
        index=raw_batch_df.index,
    )
    raw_batch_df = pd.concat([raw_batch_df, derived], axis=1)

    # --- Date Standardization ---
    # publish_time is parsed once, then split into separate date and time columns.
    publish_ts = pd.to_datetime(raw_batch_df["publish_time"])
    raw_batch_df["trending_date"] = pd.to_datetime(raw_batch_df["trending_date"]).dt.date
    raw_batch_df["publish_date"] = publish_ts.dt.date
    raw_batch_df["publish_time"] = publish_ts.dt.time

    # --- Drop Columns ---
    raw_batch_df = raw_batch_df.drop(columns=cols_to_drop, errors="ignore")

    # --- Save ---
    out_path = transform_root / f"day={date_str}.csv"
    raw_batch_df.to_csv(out_path, index=False)
    print(f"✅ Transformed batch saved: {out_path}")


✅ Transformed batch saved: transform/day=2017-11-14.csv
✅ Transformed batch saved: transform/day=2017-11-15.csv
✅ Transformed batch saved: transform/day=2017-11-16.csv


✅ Transformed batch saved: transform/day=2017-11-17.csv
✅ Transformed batch saved: transform/day=2017-11-18.csv
✅ Transformed batch saved: transform/day=2017-11-19.csv
✅ Transformed batch saved: transform/day=2017-11-20.csv
✅ Transformed batch saved: transform/day=2017-11-21.csv
✅ Transformed batch saved: transform/day=2017-11-22.csv
✅ Transformed batch saved: transform/day=2017-11-23.csv
✅ Transformed batch saved: transform/day=2017-11-24.csv
✅ Transformed batch saved: transform/day=2017-11-25.csv
✅ Transformed batch saved: transform/day=2017-11-26.csv
✅ Transformed batch saved: transform/day=2017-11-27.csv
✅ Transformed batch saved: transform/day=2017-11-28.csv
✅ Transformed batch saved: transform/day=2017-11-29.csv
✅ Transformed batch saved: transform/day=2017-11-30.csv
✅ Transformed batch saved: transform/day=2017-12-01.csv
✅ Transformed batch saved: transform/day=2017-12-02.csv
✅ Transformed batch saved: transform/day=2017-12-03.csv
✅ Transformed batch saved: transform/day=2017-12

In [13]:
# raw_batch_df still holds the batch from the transform loop's final iteration;
# list its columns.
print(raw_batch_df.columns)

Index(['id', 'video_id', 'trending_date', 'category_id', 'publish_time',
       'views', 'likes', 'dislikes', 'comment_count', 'Author', 'Title',
       'Artist', 'publish_date'],
      dtype='object')


In [14]:
## tests
# Acceptance checks for one transformed batch. Only the batch left behind in
# `raw_batch_df` / `date_str` by the transform loop is checked here.
issues = []

# The "before" baseline is taken from the untouched extract file, so checks 1
# and 6 compare pre-transform against post-transform state instead of
# comparing the transformed frame with itself.
source_files = sorted((extract_root / f"day={date_str}").glob("*.csv"))
if source_files:
    source_df = pd.read_csv(source_files[0])
    pre_row_count = len(source_df)
    pre_cols = set(source_df.columns)
else:
    pre_row_count = None
    pre_cols = set(raw_batch_df.columns)
    issues.append(f"No extract CSV found for batch {date_str}; row count not verified.")

# --- Acceptance Checks ---

# 1️⃣ Row count consistency
post_row_count = len(raw_batch_df)
if pre_row_count is not None and pre_row_count != post_row_count:
    issues.append(f"Row count mismatch: before={pre_row_count}, after={post_row_count}")

# 2️⃣ Music split validation
if "Artist" in raw_batch_df.columns:
    invalid_music = raw_batch_df[
        (raw_batch_df["category_id"] == 10) & (raw_batch_df["Artist"].isna())
    ]
    if not invalid_music.empty:
        issues.append(f"{len(invalid_music)} music rows missing Artist field.")

# 3️⃣ Category 23 validation
if "Author" in raw_batch_df.columns:
    invalid_cat23 = raw_batch_df[
        (raw_batch_df["category_id"] == 23) & (raw_batch_df["Author"].isna())
    ]
    if not invalid_cat23.empty:
        issues.append(f"{len(invalid_cat23)} parody rows missing Author field.")

# 4️⃣ Trending date ISO check (missing dates render as "NaT" and are flagged)
invalid_dates = raw_batch_df[
    ~raw_batch_df["trending_date"].astype(str).str.match(r"^\d{4}-\d{2}-\d{2}$")
]
if not invalid_dates.empty:
    issues.append(f"{len(invalid_dates)} rows have invalid trending_date format.")

# 5️⃣ Publish date/time columns exist
missing_cols = [c for c in ["publish_date", "publish_time"] if c not in raw_batch_df.columns]
if missing_cols:
    issues.append(f"Missing required columns: {missing_cols}")

# 6️⃣ Dropped columns check: columns that existed before the transform, were
# meant to be dropped, and are still present afterwards.
expected_drops = {
    "tags", "thumbnail_link", "ratings_disabled",
    "video_error_or_removed", "comments_disabled", "description"
}
still_there = pre_cols.intersection(raw_batch_df.columns).intersection(expected_drops)
if still_there:
    issues.append(f"Columns not dropped: {still_there}")

# --- Report Results ---
if issues:
    print(f"⚠️ Issues found in batch {date_str}:")
    for i in issues:
        print(f"   - {i}")
else:
    print(f"✅ Batch {date_str} passed all acceptance checks.")


⚠️ Issues found in batch 2018-06-14:
   - 1 rows have invalid trending_date format.


In [15]:
# Persist the validation outcome next to the transformed batch file.
log_path = transform_root / f"day={date_str}_validation.txt"
# The messages contain emoji, so the encoding is set explicitly; the platform
# default (e.g. cp1252 on Windows) cannot encode them and would raise
# UnicodeEncodeError.
with open(log_path, "w", encoding="utf-8") as f:
    if issues:
        f.write(f"⚠️ Issues in batch {date_str}:\n")
        for i in issues:
            f.write(f" - {i}\n")
    else:
        f.write(f"✅ Batch {date_str} passed all checks.\n")


## Load

In [17]:
# Distinct category_id values in the batch currently held in raw_batch_df
# (missing values show up as nan).
cat = raw_batch_df['category_id'].unique()
print("Unique categories in batch:", cat)

Unique categories in batch: [24.  1. 10. 23. 17. 20. 26. 25. 22. 15. nan 43. 27.]
