In [1]:
# library

import os, time, json, sqlite3
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import pandas as pd
from tqdm import tqdm
import os

In [11]:
#load api key

API_KEY = os.getenv("YOUTUBE_API_KEY")
youtube = build("youtube","v3", developerKey= API_KEY)

In [7]:
# Main with schema suggestions(columns)


def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def get_video_details(video_ids):
    rows = []
    for batch in chunks(video_ids, 50):
        try:
            resp = youtube.videos().list(
                part = "snippet, contentDetails, statistics",
                id = ",".join(batch),
                maxResults = 50
            ).execute()
        except HttpError as e:
             print("HttpError:",e)
             time.sleep(5)
             continue

        for item in resp.get("items", []):
            snip = item.get("snippet",{})
            stats = item.get("statistics",{})
            content = item.get("contentDetails",{})
            rows.append({
             "video_id": item.get("id"),
             "title": snip.get("title"),
             "description": snip.get("description"),
             "tags": json.dumps(snip.get("tags", [])),
             "published_at": snip.get("publishedAt"),
             "channel_id": snip.get("channelId"),
             "channel_title": snip.get("channelTitle"),
             "category_id": snip.get("categoryId"),
             "duration": content.get("duration"),
             "definition": content.get("definition"),
             "viewCount": int(stats.get("viewCount", 0)),
             "likeCount": int(stats.get("likeCount")) if stats.get("likeCount") else None,
             "commentCount": int(stats.get("commentCount")) if stats.get("commentCount") else None,
            })
        time.sleep(0.1)
    return rows
             

In [8]:
# for search videos and fetch & store

def search_videos(query, max_results = 500, regionCode = "IN"):
    results = []
    request = youtube.search().list(
        q = query, part = "id", type = "video", maxResults = 50, regionCode = regionCode
    )
    while request and len(results) < max_results:
        resp = request.execute()
        for it in resp.get("items",[]):
            if it['id']['kind'] == 'youtube#video':
                vid = it["id"]["videoId"]
            results.append(vid)
            if len(results) >= max_results:
                break
        request = youtube.search().list_next(request, resp)
        time.sleep(0.1)
    return results

def fetch_and_store(queries, out_csv="youtube_videos.csv", sqlite_db="youtube_videos.db"):
    all_rows = []
    for q in queries:
        print("Searching:",q)
        ids = search_videos(q, max_results = 500)
        print(f" -> {len(ids)} ids")
        rows = get_video_details(ids)
        all_rows.extend(rows)

# saving to sql & csv file
     
    df = pd.DataFrame(all_rows).drop_duplicates(subset=["video_id"])
    df.to_csv(out_csv, index=False)
    conn = sqlite3.connect(sqlite_db)
    df.to_sql("videos", conn, if_exists= "replace", index = False)
    conn.close()
    print("Saved", len(df), "videos to", out_csv, "and", sqlite_db)

if __name__ == "__main__":
    queries = ["technology","education","music","gaming","sports","comedy","news","cooking","travel"]
    fetch_and_store(queries, out_csv="youtube_sample.csv")

Searching: technology
 -> 500 ids
Searching: education
 -> 350 ids
Searching: music
 -> 350 ids
Searching: gaming
 -> 500 ids
Searching: sports
 -> 500 ids
Searching: comedy
 -> 500 ids
Searching: news
 -> 500 ids
Searching: cooking
 -> 500 ids
Searching: travel
 -> 500 ids
Saved 3779 videos to youtube_sample.csv and youtube_videos.db
