Define **API Keys** and a **Switching Function** for the Keys **(GOAL: Prevent Quota Exhaustion)**

In [None]:
import csv
import googleapiclient.discovery
import isodate
import time
import random
import json
import os

# YouTube API key rotation.
# List every available key here; switch_api_key() cycles through them in order
# and wraps back to the first one after the last.
API_KEYS = ["YOUR_API_KEY"]  # placeholder value -- replace with real key(s)
current_key = 0  # index into API_KEYS of the key currently in use


def switch_api_key():
    """Rotate to the next API key and rebuild the YouTube client with it.

    Intended for setups with several keys (the authors use one key per
    account). The index wraps around, so after the last key the first one
    is selected again. Both ``current_key`` and the module-level ``youtube``
    client are rebound.
    """
    global current_key, youtube
    next_index = (current_key + 1) % len(API_KEYS)
    current_key = next_index
    youtube = googleapiclient.discovery.build(
        "youtube",
        "v3",
        developerKey=API_KEYS[next_index],
    )

# Initialize the YouTube Data API v3 client with the currently selected key.
# switch_api_key() rebinds this module-level name whenever it rotates keys.
youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=API_KEYS[current_key])


In [None]:
def search_youtube_shorts(query, max_results=50):
    """Search YouTube for Shorts-style videos matching a query.

    Args:
        query: Search term. " #shorts" is appended to it to push the search
            endpoint toward short-form videos.
        max_results: Number of results to request in this call. Defaults to
            50, the value that used to be hard-coded here (the search.list
            reference documents 0-50 as the accepted range).

    Returns:
        The response of ``search().list(...).execute()`` unchanged.

    Raises:
        Any exception raised by ``execute()`` propagates to the caller.
    """
    request = youtube.search().list(
        part='snippet',
        maxResults=max_results,
        q=f"{query} #shorts",  # the "#shorts" suffix biases results toward Shorts
        type='video',
    )
    return request.execute()

def get_video_durations(video_ids):
    """Return formatted durations for the videos that last 60 seconds or less.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        Dict mapping video ID to an ``"M:SS"`` string. Videos longer than
        60 seconds are left out. An empty input yields an empty dict without
        issuing an API request.

    Raises:
        Any exception raised by ``execute()`` propagates to the caller.
    """
    video_ids = list(video_ids)
    if not video_ids:
        # Nothing to look up: avoid spending quota on a request with an empty id.
        return {}

    request = youtube.videos().list(part="contentDetails", id=",".join(video_ids))
    response = request.execute()

    durations = {}
    for item in response.get("items", []):
        # The API reports durations as ISO 8601 strings.
        duration_sec = isodate.parse_duration(
            item["contentDetails"]["duration"]
        ).total_seconds()

        if duration_sec <= 60:
            minutes, seconds = divmod(int(duration_sec), 60)
            durations[item["id"]] = f"{minutes}:{seconds:02d}"

    return durations

In [None]:
def load_existing_video_ids(filename):
    """Load the video IDs already stored in the CSV dataset.

    The ID is taken as the last path segment of each row's ``short_url``.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        Set of video ID strings. Empty if the file does not exist, has no
        ``short_url`` column, or contains no usable URLs.
    """
    if not os.path.exists(filename):
        return set()

    existing_ids = set()
    # newline="" lets the csv module handle line breaks inside quoted fields
    # (e.g. multi-line video descriptions) itself, as its docs recommend.
    with open(filename, "r", newline="", encoding="utf-8") as file:
        for row in csv.DictReader(file):
            short_url = row.get("short_url")
            if not short_url:
                continue  # column missing or blank cell: nothing to extract
            video_id = short_url.rstrip("/").split("/")[-1]
            if video_id:
                existing_ids.add(video_id)
    return existing_ids

# Save Data to CSV
def save_to_csv(data, filename):
    """Write a list of row dicts to a CSV file, replacing any existing file.

    Args:
        data: List of dicts, one per row. The header is the union of all
            keys in first-seen order; a row lacking a column gets an empty
            cell for it.
        filename: Destination path.

    Returns:
        None. When ``data`` is empty nothing is written and an existing file
        is left untouched.
    """
    if not data:
        print(f"No rows to write; {filename} left unchanged.")
        return

    # dict.fromkeys keeps insertion order while dropping repeated keys.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))
    with open(filename, 'w', newline='', encoding='utf-8') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(data)


def save_raw_data(data, filename):
    """Append new videos to the JSON dataset, skipping duplicate URLs.

    A video counts as a duplicate when its ``short_url`` is already in the
    file or appeared earlier in ``data`` itself.

    Args:
        data: List of video dicts; each must have a ``short_url`` key.
        filename: Path of the JSON file holding a list of video dicts. A
            missing, empty or unparsable file is treated as an empty list.

    Returns:
        None. The file is rewritten only when at least one new video exists.
    """
    existing_data = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as json_file:
            try:
                existing_data = json.load(json_file)
            except json.JSONDecodeError:
                existing_data = []  # empty or corrupt JSON: start over

    # URLs seen so far; extended while scanning so repeats inside the
    # incoming batch are dropped as well.
    seen_urls = {entry['short_url'] for entry in existing_data}

    new_data = []
    for video in data:
        url = video['short_url']
        if url in seen_urls:
            continue
        seen_urls.add(url)
        new_data.append(video)

    if new_data:
        existing_data.extend(new_data)

        with open(filename, 'w', encoding='utf-8') as json_file:
            json.dump(existing_data, json_file, ensure_ascii=False, indent=4)

        print(f"Added {len(new_data)} new videos to {filename}")
    else:
        print("No new videos to add. Dataset is up-to-date.")


In [None]:
# Maps a YouTube category ID (as a string) to a human-readable category name.
# get_video_details() looks up snippet["categoryId"] here and falls back to
# "Unknown" for any ID that is not listed.
CATEGORY_MAPPING = { # category-id -> category-name
    "1": "Film & Animation", "2": "Cars & Vehicles", "10": "Music",
    "15": "Pets & Animals", "17": "Sports",
    "19": "Travel & Events", "20": "Gaming", "22": "People & Blogs",
    "23": "Comedy", "24": "Entertainment", "25": "News & Politics",
    "26": "How-to & Style", "27": "Education", "28": "Science & Technology",
    "29": "Nonprofits & Activism"
}
def get_video_details(video_ids):
    """Fetch metadata for videos and keep those lasting at most 180 seconds.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        Dict mapping video ID to a dict with the keys ``title``,
        ``description``, ``published_at``, ``channel_id``, ``channel_name``,
        ``category_id``, ``category_name``, ``tags`` (comma-separated string),
        ``view_count``, ``like_count``, ``comment_count`` and ``duration``
        (seconds). Videos longer than 180 seconds are omitted. An empty
        input yields an empty dict without issuing an API request.

    Raises:
        Any exception raised by ``execute()`` propagates to the caller.
    """
    video_ids = list(video_ids)
    if not video_ids:
        # Nothing to look up: avoid spending quota on a request with an empty id.
        return {}

    request = youtube.videos().list(
        part="snippet,statistics,contentDetails",
        id=",".join(video_ids)
    )
    response = request.execute()

    video_details = {}
    for item in response.get("items", []):
        snippet = item["snippet"]
        statistics = item.get("statistics", {})

        # The API reports durations as ISO 8601 strings.
        duration_sec = isodate.parse_duration(
            item["contentDetails"]["duration"]
        ).total_seconds()

        # Only an upper bound is applied: anything over 3 minutes is dropped.
        if duration_sec > 180:
            continue

        # Extract category ID & map it to a category name.
        category_id = snippet.get("categoryId", "Unknown")

        video_details[item["id"]] = {
            'title': snippet.get("title", ""),
            'description': snippet.get("description", ""),
            'published_at': snippet.get("publishedAt", ""),
            'channel_id': snippet.get("channelId", ""),
            'channel_name': snippet.get("channelTitle", ""),
            'category_id': category_id,
            'category_name': CATEGORY_MAPPING.get(category_id, "Unknown"),
            'tags': ", ".join(snippet.get("tags", [])),
            # Counts are stored as received; a missing count becomes 0.
            'view_count': statistics.get("viewCount", 0),
            'like_count': statistics.get("likeCount", 0),
            'comment_count': statistics.get("commentCount", 0),
            'duration': duration_sec  # duration in seconds
        }

    return video_details




In [None]:
def _call_with_key_rotation(api_call, *args):
    """Run ``api_call(*args)``, rotating to the next API key after a failure.

    One attempt is made per configured key. Returns the call's result, or
    None when every attempt failed.
    """
    for _ in range(len(API_KEYS)):
        try:
            return api_call(*args)
        except Exception as e:  # boundary: any client/API failure triggers rotation
            print(f"API Error: {e}. Switching API key...")
            switch_api_key()
    return None


def _read_existing_rows(filename):
    """Return the rows currently stored in the CSV file ([] if it is absent)."""
    if not os.path.exists(filename):
        return []
    with open(filename, "r", newline="", encoding="utf-8") as csv_file:
        return list(csv.DictReader(csv_file))


def _merge_rows(existing_rows, new_rows):
    """Concatenate old and new rows, padding each to the union of all columns."""
    all_rows = existing_rows + new_rows
    # DictReader stores surplus cells under a None key; that is not a column.
    columns = list(dict.fromkeys(
        key for row in all_rows for key in row if key is not None
    ))
    return [{col: row.get(col, "") for col in columns} for row in all_rows]


def main():
    """Collect Shorts for a list of trending hashtags and update the dataset.

    For each keyword the search results are de-duplicated against the IDs
    already in the CSV, enriched with video details, and accumulated. New
    rows are then written to the CSV together with the rows it already held,
    and appended to the JSON file. Nothing is written when no new video was
    found.
    """
    # General trending hashtags (TikTok-style) used as search queries.
    trending_keywords = [
        "#Shorts", "#YouTubeShorts", "#ViralShorts", "#Trending", "#funnyvideos",
        "#lifestyle", "#football", "#sport", "#hightech", "#tourist", "#travel",
        "#streetphotography", "#food", "#healthyrecipes", "#gaming", "#videogames",
        "#art", "#craft", "#photography", "#easyrecipe", "#petitdejeuner", "#experiment",
        "#savoir", "#creativejournaling", "#fashion", "#entertainment", "#comedy",
        "#reaction", "#challenge", "#funny", "#fitness", "#motivation", "#education",
        "#tech", "#science", "#music", "#dance", "#howto", "#DIY", "#hacks", "#makeup",
        "#beauty", "#carreview", "#motorcycle", "#adventure", "#animals", "#nature",
        "#movieclips", "#cartoons", "#shortfilm", "#vlog", "#behindthescenes",
    ]

    dataset_file = 'categorized_youtube_shorts.csv'
    dataset_file_json = 'categorized_youtube_shorts.json'

    existing_ids = load_existing_video_ids(dataset_file)
    dataset = []

    for query in trending_keywords:
        response = _call_with_key_rotation(search_youtube_shorts, query)
        if response is None:
            continue  # every key failed for this query; move on to the next

        # video_id -> partially filled record, in search-result order
        candidates = {}
        for item in response.get('items', []):
            video_id = item['id']['videoId']
            if video_id in existing_ids:
                continue  # Skip duplicates
            candidates[video_id] = {
                'short_url': f"https://www.youtube.com/shorts/{video_id}",
                'channel_name': item['snippet']['channelTitle'],
            }
            existing_ids.add(video_id)  # Add to seen list

        if candidates:
            details = _call_with_key_rotation(get_video_details, list(candidates))
            if details is None:
                # Details could not be fetched: un-mark these IDs so a later
                # keyword returning the same videos can try again.
                existing_ids.difference_update(candidates)
            else:
                for video_id, video in candidates.items():
                    if video_id in details:
                        video.update(details[video_id])  # Merge details into video data
                        dataset.append(video)

        time.sleep(random.uniform(1, 3))  # Prevent hitting rate limits

    if not dataset:
        print("No new videos found. Dataset is up-to-date.")
        return

    # save_to_csv() replaces the file, so write old and new rows together;
    # otherwise every run would discard what earlier runs collected.
    save_to_csv(_merge_rows(_read_existing_rows(dataset_file), dataset), dataset_file)
    save_raw_data(dataset, dataset_file_json)
    print("Dataset updated successfully!")

# Run the collection only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
