# Data Collection

## I. Metadata

Note that metadata collection was an iterative process: I ran the scripts below multiple times, in several different configurations, to reach my final dataset. For example, if videos of a particular duration type or channel size were under-represented, I added a filter for them in the scripts below. This was done to replicate the real distribution on YouTube as closely as possible.
<br>Additionally, I collected some of the videos manually from YouTube.


YouTube API documentation used for reference: https://developers.google.com/youtube/v3/docs

In [2]:
from googleapiclient.discovery import build
import requests
import pandas as pd
import datetime
from isodate import parse_duration
from langdetect import detect

import os  # needed only here, to read the key from the environment

# Read the Google Cloud API key from the YOUTUBE_API_KEY environment variable so the
# secret never has to be written into the notebook. Falls back to the original empty
# placeholder when the variable is not set.
API_KEY = os.environ.get("YOUTUBE_API_KEY", "")
youtube = build("youtube", "v3", developerKey=API_KEY)  # client used by the API helper functions below
BASE_URL = "https://www.googleapis.com/youtube/v3/"  # REST root for raw HTTP requests

We will mostly collect videos from the US and UK to filter the language to English.
Let's see the number of different categories from these two countries.

In [3]:
regions = ["US", "GB"]   # Videos from US and UK

category_dict = {}  # category ID -> title; a dict de-duplicates IDs returned for both regions

for region in regions:
    # Reuse BASE_URL and let requests encode the query string; the timeout keeps a
    # stalled connection from hanging the cell forever.
    http_response = requests.get(
        f"{BASE_URL}videoCategories",
        params={"part": "snippet", "regionCode": region, "key": API_KEY},
        timeout=30,
    )
    # Fail loudly on HTTP errors: an error payload has no "items", so without this
    # check a failed request would silently report zero categories.
    http_response.raise_for_status()
    response = http_response.json()

    for item in response.get("items", []):
        # setdefault keeps the first title seen for a given category ID (string)
        category_dict.setdefault(item["id"], item["snippet"]["title"])

# List of (category_id, title) tuples, consumed by get_video_metadata
youtube_categories = list(category_dict.items())

print(f"Total categories: {len(youtube_categories)}")
for cat_id, title in youtube_categories:
    print(f"ID: {cat_id} - Category: {title}")

Total categories: 32
ID: 1 - Category: Film & Animation
ID: 2 - Category: Autos & Vehicles
ID: 10 - Category: Music
ID: 15 - Category: Pets & Animals
ID: 17 - Category: Sports
ID: 18 - Category: Short Movies
ID: 19 - Category: Travel & Events
ID: 20 - Category: Gaming
ID: 21 - Category: Videoblogging
ID: 22 - Category: People & Blogs
ID: 23 - Category: Comedy
ID: 24 - Category: Entertainment
ID: 25 - Category: News & Politics
ID: 26 - Category: Howto & Style
ID: 27 - Category: Education
ID: 28 - Category: Science & Technology
ID: 29 - Category: Nonprofits & Activism
ID: 30 - Category: Movies
ID: 31 - Category: Anime/Animation
ID: 32 - Category: Action/Adventure
ID: 33 - Category: Classics
ID: 34 - Category: Comedy
ID: 35 - Category: Documentary
ID: 36 - Category: Drama
ID: 37 - Category: Family
ID: 38 - Category: Foreign
ID: 39 - Category: Horror
ID: 40 - Category: Sci-Fi/Fantasy
ID: 41 - Category: Thriller
ID: 42 - Category: Shorts
ID: 43 - Category: Shows
ID: 44 - Category: Trailers


### Get video statistics

Note that the region_code tells the API to bias search results toward content popular or relevant in that region. It's not attached to videos or channels — it only influences search and trends.

This is why some videos may not have the correct region_code in our dataset.
Hence, we'll later derive the country column from the channel statistics instead. That value is set by the channel owner when creating the account or editing its settings.

In [None]:
def _build_video_row(item, region_code, cat_id, cat_title):
    """Turn one ``videos.list`` item into a dataset row, or None if it has no duration."""
    # Imported locally under a private alias: a later cell rebinds the global name
    # `datetime` to the class (`from datetime import datetime`), which would break a
    # `datetime.datetime.strptime` call if this code were re-run afterwards.
    from datetime import datetime as _datetime

    snippet = item["snippet"]            # video title, channel info, publishedAt, etc.
    stats = item.get("statistics", {})   # view/like/comment counts (may be hidden)
    content = item["contentDetails"]     # duration, dimension, etc.

    # Without a duration the video cannot be assigned to a length bucket
    if "duration" not in content:
        return None

    # Parse upload datetime from ISO format (e.g., '2023-08-05T12:34:56Z')
    dt = _datetime.strptime(snippet["publishedAt"], "%Y-%m-%dT%H:%M:%SZ")

    # Parse ISO 8601 duration ('PT#H#M#S') and convert to total seconds
    duration_iso = content["duration"]
    duration_sec = parse_duration(duration_iso).total_seconds()

    # Classify duration type: under 5 minutes, up to 15 minutes, longer
    if duration_sec < 300:
        duration_type = "short"
    elif duration_sec <= 900:
        duration_type = "medium"
    else:
        duration_type = "long"

    return {
        "video_id": item["id"],
        "video_title": snippet["title"],
        "video_url": f"https://www.youtube.com/watch?v={item['id']}",
        "region_code": region_code,
        "video_category_id": cat_id,
        "video_category_title": cat_title,
        "upload_date": dt.date(),
        "upload_time": dt.time(),
        "duration_type": duration_type,
        "video_duration": duration_iso,
        "duration_seconds": duration_sec,
        "like_count": stats.get("likeCount"),
        "comment_count": stats.get("commentCount"),
        "view_count": stats.get("viewCount"),
        "channel_id": snippet["channelId"],
        "channel_title": snippet["channelTitle"],
    }


def get_video_metadata(video_data, region_code="US", max_results=3, max_pages=1):
    """Search every known category for videos and append one metadata row per video.

    For each ``(category_id, title)`` pair in the module-level ``youtube_categories``
    this runs a ``search.list`` query limited to uploads from 2023 and 2024, then
    fetches snippet/statistics/contentDetails for the returned IDs via ``videos.list``.
    Videos without a duration are skipped.

    Args:
        video_data: List extended in place with one dict per collected video.
        region_code: Value sent as ``regionCode`` to the search and stored on each row.
        max_results: Results requested per search page (API allows up to 50).
        max_pages: Maximum number of result pages fetched per category.

    Returns:
        None. Rows are appended to ``video_data``.
    """
    for cat_id, cat_title in youtube_categories:     # Loop through all the categories collected above
        # Start every category from its first page: a token returned for one
        # category's query must not be reused for a different category.
        next_page_token = None

        for _ in range(max_pages):
            # Execute a search.list request
            results = youtube.search().list(
                part="snippet",
                regionCode=region_code,
                type="video",
                videoCategoryId=cat_id,
                publishedAfter="2023-01-01T00:00:00Z",   # Videos from 2023 and 2024
                publishedBefore="2024-12-31T23:59:59Z",
                maxResults=max_results,
                pageToken=next_page_token,
            ).execute()

            # Extract video IDs
            video_ids = [
                item["id"]["videoId"]
                for item in results.get("items", [])
                if "videoId" in item["id"]
            ]

            if video_ids:
                # Get video details (contentDetails + statistics)
                video_response = youtube.videos().list(
                    part="snippet,statistics,contentDetails",
                    id=",".join(video_ids),
                ).execute()

                for item in video_response.get("items", []):
                    row = _build_video_row(item, region_code, cat_id, cat_title)
                    if row is not None:
                        video_data.append(row)

            # Stop paging this category once the API reports no further page
            next_page_token = results.get("nextPageToken")
            if not next_page_token:
                break

### Get channel data

Once we have the video IDs and channel IDs, we can use these channel IDs to get the channel statistics like number of subscribers, etc.

In [None]:
def get_channel_metadata(channel_ids):
    """Fetch subscriber/video counts and country for a collection of channel IDs.

    Args:
        channel_ids: Iterable of channel ID strings; queried in batches of 50.

    Returns:
        pandas.DataFrame with the columns channel_id, channel_url, subscriber_count,
        channel_size, video_count and country (one row per channel returned by the
        API). The columns are present even when no channel is returned.
    """
    columns = [
        "channel_id",
        "channel_url",
        "subscriber_count",
        "channel_size",
        "video_count",
        "country",
    ]
    channel_ids = list(channel_ids)  # accept any iterable and allow plain positional slicing
    channel_data = []

    for start in range(0, len(channel_ids), 50):  # Process channel IDs in chunks of 50
        batch_ids = channel_ids[start:start + 50]
        response = youtube.channels().list(
            part="snippet,statistics",
            id=",".join(batch_ids),
        ).execute()

        # .get() guards: a response without "items" or an item without a part must
        # not abort the whole collection run with a KeyError
        for item in response.get("items", []):
            stats = item.get("statistics", {})   # may lack subscriberCount if hidden
            snippet = item.get("snippet", {})    # contains country, title, etc.
            # A missing subscriber count is stored as 0 (and therefore bucketed as "Small")
            subs = int(stats.get("subscriberCount", 0))

            # Bucket channels by subscriber count
            if subs <= 10_000:
                size = "Small"
            elif subs <= 100_000:
                size = "Medium"
            else:
                size = "Large"

            channel_data.append({
                "channel_id": item["id"],
                "channel_url": f'https://www.youtube.com/channel/{item["id"]}',
                "subscriber_count": subs,
                "channel_size": size,                             # Small/Medium/Large bucket
                "video_count": int(stats.get("videoCount", 0)),   # Total number of videos uploaded
                "country": snippet.get("country", "Unknown"),     # Unknown if the country field is empty
            })

    return pd.DataFrame(channel_data, columns=columns)

### Previous Video Statistics

Once we have the final video IDs, we can now get the statistics of videos uploaded prior to the current video.

In [None]:
import pandas as pd
from googleapiclient.discovery import build
from tqdm import tqdm
from datetime import datetime
import time

INPUT_CSV = "FinalDataset.csv"  # dataset read below; the loop uses its video_id, channel_id and upload_time columns
MAX_RESULTS = 50  # page size passed as maxResults to playlistItems.list

# Returns a playlist which lists all uploaded videos
def get_uploads_playlist_id(channel_id):
    """Return the ID of the channel's "uploads" playlist, or None if it cannot be resolved.

    Args:
        channel_id: Channel ID passed to ``channels.list``.

    Returns:
        The ``relatedPlaylists.uploads`` value of the first returned channel, or None
        when the request fails or the response lacks that field.
    """
    try:
        res = youtube.channels().list(part="contentDetails", id=channel_id).execute()
        return res["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
    except Exception as exc:
        # Still best-effort (callers treat None as "no history"), but `except Exception`
        # instead of a bare `except` lets Ctrl+C / SystemExit stop a long run, and the
        # message makes the reason for a zero-filled row visible.
        print(f"Could not resolve uploads playlist for channel {channel_id}: {type(exc).__name__}: {exc}")
        return None

# Gets tuples of video_id, published_at
def get_all_videos_in_playlist(playlist_id):
    """Collect ``(video_id, published_at)`` tuples for every item of a playlist.

    Pages through ``playlistItems.list`` (``MAX_RESULTS`` items per page) until the
    response carries no ``nextPageToken``.

    Args:
        playlist_id: ID of the playlist to enumerate.

    Returns:
        List of ``(video_id, published_at)`` tuples, where ``published_at`` is the
        raw ``videoPublishedAt`` string from the API. Items lacking either field
        are omitted.
    """
    videos = []
    next_page = None
    while True:
        res = youtube.playlistItems().list(
            part="contentDetails",
            playlistId=playlist_id,
            maxResults=MAX_RESULTS,
            pageToken=next_page
        ).execute()
        for item in res.get("items", []):
            details = item.get("contentDetails", {})
            vid = details.get("videoId")                   # Unique video ID
            time_published = details.get("videoPublishedAt")  # ISO timestamp
            # An entry without an ID or publish time cannot be ordered against the
            # current video, so skip it instead of aborting the run with a KeyError.
            # NOTE(review): presumably this affects unavailable/private entries — confirm.
            if vid is None or time_published is None:
                continue
            videos.append((vid, time_published))
        next_page = res.get("nextPageToken")
        if not next_page:
            break
        time.sleep(0.1) # Reduce risk of rate-limiting
    return videos

# Fetches statistics for the list of video IDs
def get_video_stats(video_ids):
    """Look up view/like/comment counts for the given video IDs.

    IDs are sent to ``videos.list`` in groups of 50, with a short pause after each
    request. Returns a dict mapping video ID to
    ``{"views": int, "likes": int, "comments": int}``; a count absent from the
    response is recorded as 0.
    """
    def _as_int(raw_counts, field):
        # Missing counters fall back to 0
        return int(raw_counts.get(field, 0))

    collected = {}
    for start in range(0, len(video_ids), 50):
        chunk = video_ids[start:start + 50]
        reply = youtube.videos().list(part="statistics", id=",".join(chunk)).execute()
        for entry in reply.get("items", []):
            key = entry["id"]
            raw_counts = entry.get("statistics", {})
            collected[key] = {
                "views": _as_int(raw_counts, "viewCount"),
                "likes": _as_int(raw_counts, "likeCount"),
                "comments": _as_int(raw_counts, "commentCount"),
            }
        time.sleep(0.1)  # brief pause between batch requests
    return collected

def safe_avg(values):
    """Return the mean of ``values``, or 0 when the list is empty (no prior uploads)."""
    return sum(values) / len(values) if values else 0


# Load and parse your CSV
df = pd.read_csv(INPUT_CSV)
# NOTE(review): get_video_metadata stores the upload date and the time of day in two
# separate columns (upload_date / upload_time). If this CSV's upload_time holds only
# a time of day, the comparison below would not be against the real upload moment —
# confirm that the column contains a full timestamp.
df["upload_time"] = pd.to_datetime(df["upload_time"], utc=True)

# Prepare result containers
results = []

# channel_id -> list of (video_id, parsed publish time), or None when the channel
# lookup failed. Rows of the same channel share one playlist fetch instead of
# re-downloading and re-parsing the full upload list for every video.
uploads_by_channel = {}

# Iterate over every row (video) in the dataset with a progress bar
for idx, row in tqdm(df.iterrows(), total=len(df)):
    current_vid = row["video_id"]    # The video ID for which we'll compute history features
    channel_id = row["channel_id"]   # Channel ID to query uploads
    current_upload_time = row["upload_time"]   # Timestamp of the current video's upload (UTC)

    if channel_id not in uploads_by_channel:
        playlist_id = get_uploads_playlist_id(channel_id)  # Playlist listing all the channel's uploads
        if playlist_id:
            uploads_by_channel[channel_id] = [
                (vid, pd.to_datetime(published, utc=True))
                for vid, published in get_all_videos_in_playlist(playlist_id)
            ]
        else:
            uploads_by_channel[channel_id] = None
    channel_uploads = uploads_by_channel[channel_id]

    # If the channel lookup fails or playlist is unavailable, write zeros and continue
    if channel_uploads is None:
        results.append({
            "video_id": current_vid,
            "prev_video_count": 0,
            "avg_views_prev10": 0,
            "avg_likes_prev10": 0,
            "avg_comments_prev10": 0
        })
        continue

    # Keep only the videos published BEFORE the current video's upload time
    prev_vids = [(vid, published) for vid, published in channel_uploads
                 if published < current_upload_time]

    prev_vids.sort(key=lambda x: x[1], reverse=True) # Sort previous uploads by publish time descending (most recent first)
    top_10 = prev_vids[:10]    # Take the 10 most recent prior uploads
    prev_vid_ids = [vid for vid, _ in top_10]  # Extract just the IDs for stats lookup

    stats = get_video_stats(prev_vid_ids)  # Query views/likes/comments for the top-10 previous uploads

    # Build the lists - 0 if stats are missing
    views = [stats.get(vid, {}).get("views", 0) for vid in prev_vid_ids]
    likes = [stats.get(vid, {}).get("likes", 0) for vid in prev_vid_ids]
    comments = [stats.get(vid, {}).get("comments", 0) for vid in prev_vid_ids]

    results.append({
        "video_id": current_vid,             # Current video
        "prev_video_count": len(prev_vids),  # Number of videos uploaded prior to the current video
        "avg_views_prev10": safe_avg(views), # Average views on previous 10 videos
        "avg_likes_prev10": safe_avg(likes), # Average likes on previous 10 videos
        "avg_comments_prev10": safe_avg(comments)  # Average comments on previous 10 videos
    })

# II. Text Data

I have used Whisper-small ASR to extract transcripts from videos. I did try using the youtube_transcript_api, but it worked only for a few videos. Whisper turned out to work much better. 

Due to computational limitations, I have only used the small version here; the medium or large versions could give better results.

In [None]:
import os, gc, json
from pathlib import Path
import pandas as pd
from tqdm import tqdm
from faster_whisper import WhisperModel
import torch

CSV_PATH   = "FinalDataset.csv"            # Input dataset; must contain a 'video_id' column
VIDEO_DIR  = Path("videos")                # Folder where the videos are stored
OUTPUT_CSV = "dataset_with_transcripts.csv"  # Final output; checkpoints are written to OUTPUT_CSV + ".partial"

MODEL_SIZE    = "small"                    # tiny|base|small|medium|large-v3
DEVICE        = "cpu"                      # Passed to WhisperModel(device=...); cleanup() only empties the CUDA cache when this is "cuda"
COMPUTE_TYPE  = "int8_float32"             # Passed to WhisperModel(compute_type=...)
CHUNK_LENGTH  = 15                         # seconds per chunk
VAD_FILTER    = True                       # Voice Activity Detection
BEAM_SIZE     = 5                          # Controls the number of paths that are explored at each step when generating an output.
RELOAD_EVERY  = 50                         # reload model every N videos (set 0 to disable)

# Resume from the checkpoint file left by an interrupted run, if one exists;
# otherwise start from the original dataset.
partial_path = OUTPUT_CSV + ".partial"
resuming = os.path.exists(partial_path)
print("📄 Loading partial dataset..." if resuming else "📄 Loading original dataset...")
df = pd.read_csv(partial_path if resuming else CSV_PATH)

if "video_id" not in df.columns:
    raise ValueError("Dataset must have a 'video_id' column.")

# Make sure both output columns exist before the loop writes into them
for column in ("transcript", "transcript_lang"):
    if column not in df.columns:
        df[column] = None

print("Torch CUDA available:", torch.cuda.is_available())

def load_model():
    """Announce and build a WhisperModel from MODEL_SIZE, DEVICE and COMPUTE_TYPE."""
    banner = f"Loading faster-whisper model='{MODEL_SIZE}', device='{DEVICE}', compute_type='{COMPUTE_TYPE}'"
    print(banner)
    whisper = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)
    return whisper

# Module-level model instance used by transcribe_one
model = load_model()

def transcribe_one(video_path: Path):
    """Transcribe one video file with the module-level ``model``.

    Returns a ``(text, lang, segs)`` tuple: the segment texts joined with single
    spaces and stripped, the ``language`` attribute of the info object (None if
    absent), and a list of ``{"start", "end", "text"}`` dicts, one per segment.
    """
    # transcribe() hands back a generator of segments plus an info object
    segment_stream, info = model.transcribe(
        str(video_path),
        vad_filter=VAD_FILTER,
        beam_size=BEAM_SIZE,
        chunk_length=CHUNK_LENGTH,   # important for memory
        language=None,               # auto-detect; set "en" to force
    )

    structured = []   # per-segment dicts with timing
    pieces = []       # raw segment texts, joined into one string below
    for seg in segment_stream:  # consume the generator right away
        if seg.end is not None:
            stop = seg.end
        else:
            # NOTE(review): fallback relies on the segment exposing `duration` — confirm
            stop = seg.start + seg.duration
        structured.append({"start": float(seg.start), "end": float(stop), "text": seg.text})
        pieces.append(seg.text)

    transcript = " ".join(pieces).strip()   # full transcript (simple whitespace join)
    detected = getattr(info, "language", None)
    return transcript, detected, structured

# free Python and CUDA memory between videos
def cleanup():
    """Run the garbage collector and, when DEVICE is "cuda", empty the CUDA cache."""
    gc.collect()
    if DEVICE != "cuda":
        return
    torch.cuda.empty_cache()


total = len(df)  # number of rows in the dataset
processed = 0    # videos we attempted to transcribe during this run

# Walk the dataset row by row, showing a progress bar
progress_rows = tqdm(df.iterrows(), total=total, desc="Transcribing videos")
for i, record in progress_rows:
    video_id = str(record["video_id"])   # string form, used to build the file name

    # A non-blank transcript means this row was finished in an earlier run
    previous = record.get("transcript")
    if isinstance(previous, str) and previous.strip():
        print(f"Skipping {video_id} as transcript found.")
        continue

    video_path = VIDEO_DIR / f"{video_id}.mp4"

    # Missing file: record nulls and move on (not counted as an attempt)
    if not video_path.exists():
        print(f"[{i+1}/{total}] Missing file: {video_path}")
        df.at[i, "transcript"] = None
        df.at[i, "transcript_lang"] = None
        continue

    try:
        print(f"[{i+1}/{total}] {video_path.name}")
        transcript, language, _segments = transcribe_one(video_path)
        df.at[i, "transcript"] = transcript
        df.at[i, "transcript_lang"] = language
    except Exception as err:
        # One bad file must not stop the batch: log it and store nulls
        print(f"  ERROR {video_path.name}: {type(err).__name__}: {err}")
        df.at[i, "transcript"] = None
        df.at[i, "transcript_lang"] = None

    processed += 1

    # Checkpoint every 10 attempted videos so an interruption loses little work
    if processed % 10 == 0:
        df.to_csv(OUTPUT_CSV + ".partial", index=False)

    # Periodically rebuild the model (disabled when RELOAD_EVERY is 0)
    if RELOAD_EVERY and processed % RELOAD_EVERY == 0:
        del model             # drop the current instance first
        cleanup()             # free memory before allocating the new one
        model = load_model()

    cleanup()   # run after every attempted file

df.to_csv(OUTPUT_CSV, index=False)
print("Saved:", OUTPUT_CSV)

# III. Video Data

### Video Download

I have downloaded most of the videos using yt-dlp. However, it failed to download some of them, hence I had to manually download them from the web. 

In [None]:
import pandas as pd
import subprocess
import os

df = pd.read_csv('FinalDataset.csv')

# Folder the downloads are written to (created on first use)
output_dir = 'videos'
os.makedirs(output_dir, exist_ok=True)

# Unique, non-null video URLs from the dataset
video_urls = df['video_url'].dropna().drop_duplicates().tolist()
url_count = len(video_urls)

failed_downloads = []   # (url, reason) pairs for every download that did not succeed

# yt-dlp options that are the same for every URL
shared_options = [
    "-o", f"{output_dir}/%(id)s.%(ext)s",     # Output name: videoID.ext
    "-f", "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]",  # Best mp4 video + m4a audio, else best single mp4
    "--merge-output-format", "mp4",       # Merge separate audio/video streams into one mp4
    "--no-playlist",              # Treat each URL as a single video
    "--ffmpeg-location", r"C:\Users\Mansi Jadhav\ffmpeg\bin"  # Location of ffmpeg executables
]

for i, url in enumerate(video_urls, start=1):
    position = f"[{i}/{url_count}]"
    try:
        # check=True raises on a non-zero exit status; timeout caps each download at 180 seconds
        subprocess.run(["yt-dlp", url, *shared_options], timeout=180, check=True)
        print(f"{position} Downloaded: {url}")
    except subprocess.TimeoutExpired:      # yt-dlp ran longer than the timeout
        print(f"{position} Timeout: {url}")
        failed_downloads.append((url, "timeout"))
    except subprocess.CalledProcessError:  # yt-dlp exited with a non-zero status
        print(f"{position} Failed: {url}")
        failed_downloads.append((url, "error"))
    except Exception as e:                 # anything else, e.g. OSError if yt-dlp is not found
        print(f"{position} Unexpected Error: {url} ({str(e)})")
        failed_downloads.append((url, str(e)))

# Write the failures to a CSV so they can be retried later
if not failed_downloads:
    print("\nAll videos downloaded successfully.")
else:
    failed_df = pd.DataFrame(failed_downloads, columns=["video_url", "reason"])
    failed_df.to_csv("failed_downloads.csv", index=False)
    print("\nSome downloads failed. Logged to 'failed_downloads.csv'.")

### Frame Extraction

We have extracted frames using OpenCV.

The average video duration in our dataset was 694 seconds. Hence, I decided to keep roughly one frame per second for an average-length video and extracted 700 frames per video.
<br>We have extracted frames uniformly, meaning we have divided the total number of frames into 700 parts and extracted one frame from each. This ensures we get content from the entire video, not just the start.
<br>In case there are fewer than 700 frames, we have repeated frames to reach the count.

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import isodate
from tqdm import tqdm
from PIL import Image
import shutil
import glob
import statistics

def FrameCaptureUniformOpenCV(directory, output_dir, frames_to_extract=700):
    """Save a fixed number of uniformly spaced 256x256 JPEG frames for every .mp4 in a folder.

    For each ``<name>.mp4`` in ``directory`` the frames are written to
    ``output_dir/<name>/<name>_frameNNNN.jpg``. A video whose folder already holds
    exactly ``frames_to_extract`` .jpg files is skipped; a folder with any other
    count is deleted and rebuilt. Videos with fewer frames than requested have
    their frame sequence repeated until the count is reached.

    Args:
        directory: Folder containing the .mp4 files.
        output_dir: Root folder for the per-video frame folders (created if missing).
        frames_to_extract: Number of frames to save per video.

    Returns:
        None. Frames are written to disk and progress is printed.
    """
    os.makedirs(output_dir, exist_ok=True)

    video_files = sorted(f for f in os.listdir(directory) if f.endswith(".mp4"))  # Get the list of video files

    for filename in tqdm(video_files, desc="Processing videos"):
        video_path = os.path.join(directory, filename)       # Full path of the video file
        video_name = os.path.splitext(filename)[0]           # File name without extension (the video ID)
        video_folder = os.path.join(output_dir, video_name)  # Output folder for this video

        # Skip if the folder already holds exactly the requested number of frames
        if os.path.exists(video_folder):
            existing_frames = [f for f in os.listdir(video_folder) if f.endswith(".jpg")]
            if len(existing_frames) == frames_to_extract:
                print(f"Skipping {filename}: already has {frames_to_extract} frames")
                continue
            # Any other count means an incomplete earlier run: start over
            print(f"Deleting incomplete folder for {filename}")
            shutil.rmtree(video_folder)

        os.makedirs(video_folder, exist_ok=True)  # Create an output folder

        cap = cv2.VideoCapture(video_path)  # Open the video
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # Reported number of frames

            # An unopened file or a non-positive frame count cannot be sampled; a
            # negative count would otherwise crash np.linspace below.
            if not cap.isOpened() or total_frames <= 0:
                print(f"Skipping {filename}: no readable frames")
                continue

            # Generate frame indices
            if total_frames >= frames_to_extract:
                # Evenly spaced indices over [0, total_frames - 1]
                frame_indices = np.linspace(0, total_frames - 1, frames_to_extract, dtype=int)
            else:
                # Too few frames: repeat the full index sequence until the count is reached
                repeats = int(np.ceil(frames_to_extract / total_frames))
                frame_indices = np.tile(np.arange(total_frames), repeats)[:frames_to_extract]

            saved = 0
            for idx in frame_indices:     # Loop over the selected frame indices
                idx = int(idx)            # plain int for the OpenCV property setter
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)  # Seek to the frame
                success, frame = cap.read()            # Read the frame
                if not success:
                    print(f"Failed to read frame {idx} in {filename}")
                    continue

                frame = cv2.resize(frame, (256, 256))   # Resize to lower resolution 256x256

                frame_path = os.path.join(video_folder, f"{video_name}_frame{saved:04d}.jpg")  # Output path
                cv2.imwrite(frame_path, frame)   # Write the frame to disk
                saved += 1   # Count only frames actually written
        finally:
            cap.release()   # Always release the decoder, even if an error interrupts extraction

        print(f"{filename}: saved {saved} frames")