In [1]:
import pandas as pd
import numpy as np
import requests
import base64
from datetime import datetime, timedelta
from dateutil import tz
import sys
import time
import json
import os
from dotenv import load_dotenv
from pandas import json_normalize
from glob import glob
from pathlib import Path

In [None]:
class YoutubeScraper:
    """
    Thin wrapper around YouTube Data API (v3) for search and video stats.

    This class provides:
      1) `search(...)`: Save paginated search results to JSON files.
      2) `batch(...)`: Split a DataFrame of video IDs into batches of <=50 IDs.
      3) `video(...)`: Fetch video statistics for IDs in batches and save to disk.

    Attributes:
        BASE_URL (str): Root URL for Google APIs.
        REQUEST_TIMEOUT (int): Seconds to wait for an HTTP response before
            `requests` gives up.
        api_key (str): YouTube Data API key.
        header (dict): Default HTTP headers for requests.
    """

    BASE_URL = "https://www.googleapis.com"
    # Without a timeout `requests.get` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}

    @staticmethod
    def _safe_filename(name: str) -> str:
        """Return `name` with path separators and reserved characters replaced by '_'."""
        reserved = '<>:"/\\|?*'
        return "".join(
            "_" if ch in reserved or ord(ch) < 32 else ch for ch in str(name)
        )

    def _get(self, endpoint: str, params: dict):
        """Send a GET request to `BASE_URL + endpoint` with the default headers.

        Network-level failures (connection errors, timeouts) propagate as
        `requests.RequestException`.
        """
        return requests.get(
            f"{self.BASE_URL}{endpoint}",
            params=params,
            headers=self.header,
            timeout=self.REQUEST_TIMEOUT,
        )

    @staticmethod
    def _save_json(payload: dict, folder_name: str, filename: str) -> str:
        """Write `payload` as pretty-printed UTF-8 JSON and return the file path."""
        file_path = os.path.join(folder_name, filename)
        with open(file_path, "w", encoding="utf-8") as file:
            json.dump(payload, file, ensure_ascii=False, sort_keys=True, indent=2)
        return file_path

    def search(self,
               query: str,
               type_: str = 'video',
               max_result: int = 50,
               ):
        """Search YouTube and save each page of results as a JSON file.

        Uses the `/youtube/v3/search` endpoint and follows `nextPageToken`
        until no further page is reported. Each page is written to
        `data/search/` as `<query>.json` (first page) or
        `<query>-<pageToken>.json` (later pages). Characters in `query` that
        are not valid in file names are replaced by `_` in the file name only.

        Paging stops early, without raising, when the API answers with a
        non-200 status: the status is printed and the pages saved so far are
        kept.

        Args:
            query: Search keywords.
            type_: One of {'video', 'playlist', 'channel'}.
            max_result: Items per page (YouTube caps at 50). Must be in [1, 50].

        Returns:
            Path to the folder containing the saved JSON page files.

        Raises:
            TypeError: If `max_result` or `type_` have wrong types.
            ValueError: If `max_result` is out of range, or `type_` is invalid.
            requests.RequestException: If the request cannot be completed
                (connection error, timeout).
        """
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(max_result, bool) or not isinstance(max_result, int):
            raise TypeError('Input number 1 - 50 only')
        if not 1 <= max_result <= 50:
            raise ValueError('Input number 1 - 50 only')

        if not isinstance(type_, str):
            raise TypeError('Input video|playlist|channel')
        if type_ not in ('video', 'playlist', 'channel'):
            raise ValueError("Input video|playlist|channel")

        endpoint = "/youtube/v3/search"
        folder_name = os.path.join("data", "search")
        os.makedirs(folder_name, exist_ok=True)
        file_stem = self._safe_filename(query)
        page_token = None

        while True:
            params = {
                'part': 'snippet',
                'q': query,
                'type': type_,
                'maxResults': max_result,
                'key': self.api_key,
            }
            if page_token is not None:
                params['pageToken'] = page_token
            print(f"Getting page {page_token}")

            r = self._get(endpoint, params)

            if r.status_code == 403:
                # Retrying with the same key would repeat the same 403 forever,
                # so stop paging instead of looping.
                print("Key reached limit quota")
                break
            if r.status_code != 200:
                print(f"code {r.status_code}")
                break

            print("Code 200 - Using Quota")
            response = r.json()
            if page_token is None:
                filename = f'{file_stem}.json'
            else:
                filename = f'{file_stem}-{page_token}.json'
            self._save_json(response, folder_name, filename)
            print(f"Saved Json file page {filename}")

            page_token = response.get("nextPageToken")
            if page_token is None:
                print("Collected All Search")
                break

        return folder_name

    def batch(self, dataframe: pd.DataFrame, col_name: str = 'video_id', batch_size: int = 50) -> list[tuple[int, str]]:
        """Split a column of IDs into comma-separated batches.

        The YouTube Videos API accepts up to 50 video IDs per request, so the
        full list of IDs is cut into consecutive chunks of `batch_size`.

        Args:
            dataframe: DataFrame holding the IDs.
            col_name: Name of the column with the IDs.
            batch_size: Maximum number of IDs per batch. Must be >= 1.

        Returns:
            A list of `(batch_number, "id1,id2,...")` tuples; batch numbers
            start at 1. Empty when the column has no rows.

        Raises:
            KeyError: If `col_name` is not a column of `dataframe`.
            ValueError: If `batch_size` is smaller than 1.
        """
        if col_name not in dataframe.columns:
            raise KeyError(f"Column '{col_name}' not found in DataFrame")
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        ids = dataframe[col_name].astype(str).tolist()
        return [
            (number, ",".join(ids[start:start + batch_size]))
            for number, start in enumerate(range(0, len(ids), batch_size), start=1)
        ]

    def video(self, dataframe: pd.DataFrame, col_name: str = 'video_id', part: str = 'statistics'):
        """Fetch video data for a set of IDs in batches and save to JSON files.

        Uses `/youtube/v3/videos` with batched comma-separated IDs produced by
        `batch(...)`. Each response is saved to `data/video/<batch_idx>.json`.

        Fetching stops early, without raising, when the API answers with a
        non-200 status: the status is printed and the batches saved so far are
        kept.

        Args:
            dataframe: Pandas DataFrame holding the video IDs.
            col_name: Column of `dataframe` that contains the IDs.
            part: Value sent as the `part` request parameter (comma-separated
                resource parts). Defaults to 'statistics'.

        Returns:
            Path to the folder containing saved JSON files (one per batch).

        Raises:
            KeyError: If `col_name` is not a column of `dataframe`.
            requests.RequestException: If a request cannot be completed
                (connection error, timeout).
        """
        endpoint = '/youtube/v3/videos'
        folder_name = os.path.join("data", "video")
        os.makedirs(folder_name, exist_ok=True)

        for batch_idx, id_list in self.batch(dataframe, col_name):
            params = {
                'part': part,
                'id': id_list,
                'key': self.api_key,
            }
            r = self._get(endpoint, params)

            if r.status_code == 403:
                print("Key reached limit quota")
                break
            if r.status_code != 200:
                print(f"code {r.status_code}")
                break

            print("Code 200 - Using Quota")
            filename = f"{batch_idx}.json"
            self._save_json(r.json(), folder_name, filename)
            print(f"Saved Json file page {filename}")

        return folder_name
        

In [160]:
class ProcessJSON:
    """Flatten saved YouTube API JSON responses into pandas DataFrames.

    Files are looked up as `<path>/<folder pattern>/*.json`; every file is
    expected to be a JSON object with an `items` list. Files or items that do
    not fit are reported with `print` and skipped.

    Attributes:
        path (str): Root folder that contains the response folders.
    """

    # Fixed schemas so that an empty result still carries its column names.
    _SEARCH_COLUMNS = ['video_id', 'channel_id', 'channel_title', 'title', 'publish_time']
    _VIDEO_COLUMNS = ['video_id', 'total_comment', 'total_favorite', 'total_like', 'views']

    def __init__(self, path: str = 'data'):
        self.path = path

    def _iter_items(self, folder_pattern: str):
        """Yield every entry of the `items` list of each readable JSON file."""
        pattern = str(Path(self.path) / folder_pattern / "*.json")
        # glob order is unspecified; sorting keeps the row order reproducible.
        for p in sorted(glob(pattern)):
            try:
                with open(p, 'r', encoding='utf-8') as file:
                    record = json.load(file)
            except json.JSONDecodeError as e:
                print(f'Check {p}: {e}')
                continue
            except (OSError, UnicodeDecodeError) as e:
                print(f'check {p}: {e}')
                continue

            items = record.get('items') if isinstance(record, dict) else None
            if not isinstance(items, list):
                print(f"check file (no 'items'): {p}")
                continue
            yield from items

    def search(self, file_search_name_format: str = 'search*') -> pd.DataFrame:
        """
        Load a batch of YouTube Search API JSON files into a DataFrame.

        Args:
            file_search_name_format: Glob pattern (relative to `self.path`) of
                the folder(s) holding the search result files.

        Returns:
            DataFrame with columns `video_id`, `channel_id`, `channel_title`,
            `title`, `publish_time`; one row per item that has both a
            `snippet` and an `id.videoId`. Empty (but with these columns) when
            nothing could be loaded.
        """
        all_data = []
        for item in self._iter_items(file_search_name_format):
            try:
                snippet = item['snippet']
                all_data.append({
                    'video_id': item['id']['videoId'],
                    'channel_id': snippet.get('channelId'),
                    'channel_title': snippet.get('channelTitle'),
                    'title': snippet.get('title'),
                    'publish_time': snippet.get('publishTime'),
                })
            except (KeyError, TypeError, AttributeError):
                # Item lacks the expected fields: report it and keep going.
                print(f'check {item}')

        return pd.DataFrame(all_data, columns=self._SEARCH_COLUMNS)

    def video(self, file_video_name_format: str = 'video*') -> pd.DataFrame:
        """
        Load a batch of YouTube Video API JSON files into a DataFrame.

        Args:
            file_video_name_format: Glob pattern (relative to `self.path`) of
                the folder(s) holding the video response files.

        Returns:
            DataFrame with columns `video_id`, `total_comment`,
            `total_favorite`, `total_like`, `views`; one row per item that has
            both `statistics` and `id`, collected from ALL matching files.
            Empty (but with these columns) when nothing could be loaded.
        """
        all_data = []
        for item in self._iter_items(file_video_name_format):
            try:
                stats = item['statistics']
                all_data.append({
                    'video_id': item['id'],
                    'total_comment': stats.get('commentCount'),
                    'total_favorite': stats.get("favoriteCount"),
                    'total_like': stats.get("likeCount"),
                    'views': stats.get('viewCount'),
                })
            except (KeyError, TypeError, AttributeError):
                # Item lacks the expected fields: report it and keep going.
                print(f'check {item} 1234')

        # Built after the loop so every file contributes, not just the first.
        return pd.DataFrame(all_data, columns=self._VIDEO_COLUMNS)

In [None]:
# Step 1: run the search and write the result pages to disk
# (`search_api` receives the output folder path).
search_api = YoutubeScraper('YOUR_API_KEY').search(
    query='YOUR-QUERY',
    type_='video',
    max_result=50,
)

# Step 2: flatten the saved search pages into a DataFrame.
search = ProcessJSON(path='data').search(file_search_name_format='search*')

# Step 3: request statistics for the video IDs found by the search
# (`video_api` receives the output folder path).
video_api = YoutubeScraper('YOUR_API_KEY').video(dataframe=search)

# Step 4: flatten the saved statistics responses into a DataFrame.
video = ProcessJSON(path='data').video(file_video_name_format='video')

# Step 5: join search metadata with statistics on the shared video ID.
df = pd.merge(left=search, right=video, on='video_id')