# **YouTube Metadata Extraction with Parallelization**

This Google Colab notebook extracts metadata from YouTube videos using `yt-dlp`, processing multiple videos in parallel to speed up extraction. The workflow includes the following steps:

*   **Retrieve Video IDs**: The notebook reads a list of YouTube video IDs from a CSV file (`clean_video_ids.csv`, column `video_id`) and builds each watch URL from the ID.
*   **Metadata Extraction**: It uses `yt-dlp` to extract metadata (e.g., title, description, upload date) from each video without downloading the video itself.
*   **Parallelization**: Extraction runs in a thread pool, so several videos are processed at the same time. Videos are handled in batches, and progress is checkpointed so an interrupted run can resume.
*   **Handle Errors**: Errors such as private or unavailable videos are logged and recorded in a separate error file (`failed_video_ids.csv`), and processing continues with the remaining videos.
*   **Store the Results**: The metadata for each video is stored in a CSV file (`video_metadata.csv`), which is also uploaded to S3, with the following metadata fields:
   - `id`  
   - `title`  
   - `channel`  
   - `channel_id`  
   - `upload_date`  
   - `duration`  
   - `view_count`  
   - `like_count`  
   - `comment_count`  
   - `categories`  
   - `tags`  
   - `language`  
   - `description`  
*   **Download Results**: Once the extraction is complete, a file containing the extracted metadata is made available for download.
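
The parallelization and error-handling steps above can be sketched without touching the network. This is a minimal illustration, not the notebook's actual extraction code: `fake_extract` is a hypothetical stand-in for the `yt-dlp` call, and it follows the same convention of returning a `(metadata, error)` pair so that one failed video never stops the others.

```python
import concurrent.futures

def fake_extract(video_id):
    """Hypothetical stand-in for the yt-dlp call; returns (metadata, error)."""
    if video_id.startswith("private"):
        # Failures are reported as data instead of raising
        return None, (video_id, "Private video")
    return {"id": video_id, "title": f"Title of {video_id}"}, None

def run_parallel(video_ids, max_workers=4):
    """Run fake_extract over all IDs in a thread pool and split the outcomes."""
    results, errors = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fake_extract, vid): vid for vid in video_ids}
        # as_completed yields futures in completion order, not submission order
        for future in concurrent.futures.as_completed(futures):
            metadata, error = future.result()
            if metadata is not None:
                results.append(metadata)
            else:
                errors.append(error)
    return results, errors

results, errors = run_parallel(["abc", "private_1", "xyz"])
print(sorted(r["id"] for r in results))
print(errors)
```

Because results arrive in completion order, the sketch sorts the IDs before printing; the output CSV should likewise not be assumed to preserve the input order.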

<br>

## **Instructions to Run in Google Colab:**

*   **Upload Video IDs File**: Prepare a CSV file named `clean_video_ids.csv` with a `video_id` column. If the file is not already in the Colab working directory, the notebook prompts you to upload it.

*   **Run the Cells**: Run each cell sequentially by pressing `Shift + Enter` or clicking the **Run** button.  
    The notebook asks for a batch size, the number of parallel workers, and a save interval, then processes the video IDs, extracts metadata, and logs any errors such as private or unavailable videos.

*   **Download the Results**: After the metadata extraction is complete, the results are saved to `video_metadata.csv`, and failed IDs to `failed_video_ids.csv`.  
    The notebook then triggers a browser download of both files and of the log file `metadata_extraction.log`.

By following these steps, you can easily run the notebook in Google Colab to extract YouTube video metadata with parallelization.
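
As a rough illustration of the expected input, the sketch below reads a CSV with a `video_id` column and builds the watch URLs the same way the extraction step does. The sample IDs are placeholders, and `read_video_ids` is a hypothetical helper that uses the standard `csv` module rather than pandas.

```python
import csv
import io

# Hypothetical contents of clean_video_ids.csv; the IDs are placeholders
sample_csv = "video_id\nVIDEO_ID_001\nVIDEO_ID_002\n"

def read_video_ids(handle):
    """Return the non-empty values of the video_id column."""
    reader = csv.DictReader(handle)
    return [row["video_id"].strip() for row in reader if row.get("video_id")]

video_ids = read_video_ids(io.StringIO(sample_csv))
urls = [f"https://www.youtube.com/watch?v={vid}" for vid in video_ids]
print(urls)
```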


In [None]:

# Install the supporting packages once (the repeated installs were redundant)
!pip install -q boto3 tqdm pandas matplotlib seaborn nltk wordcloud concurrent-log-handler
# Install yt-dlp from its GitHub repository to get the latest development version
!pip install -q git+https://github.com/yt-dlp/yt-dlp.git


In [None]:
import pandas as pd
import yt_dlp
import os
import time
import json
import boto3
from tqdm.notebook import tqdm
from google.colab import files
import concurrent.futures
import logging
import random
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('metadata_extraction.log')
    ]
)
logger = logging.getLogger(__name__)


# Placeholder credentials: replace with your own values and never commit real keys
AWS_ACCESS_KEY = 'key'
AWS_SECRET_KEY = 'key'
BUCKET_NAME = 'yt-chunk-mp3'

s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

In [None]:
input_file = 'clean_video_ids.csv'
output_file = 'video_metadata.csv'
error_file = 'failed_video_ids.csv'
checkpoint_file = 'extraction_checkpoint.json'
s3_prefix = 'metadata/'  # S3 folder to store results

metadata_fields = [
    'id', 'title', 'channel', 'channel_id', 'upload_date',
    'duration', 'view_count', 'like_count', 'comment_count',
    'categories', 'tags', 'language', 'description'
]

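if not os.path.exists(input_file):
    print(f"'{input_file}' not found. Please upload it.")
    uploaded = files.upload()

    # files.upload() returns an empty dict if the upload is cancelled
    if not uploaded:
        raise FileNotFoundError(f"No file was uploaded; '{input_file}' is required.")
    if input_file not in uploaded:
        print(f"Warning: '{input_file}' was not uploaded. Using the first uploaded file instead.")
        input_file = next(iter(uploaded))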

'clean_video_ids.csv' not found. Please upload it.


Saving clean_video_ids.csv to clean_video_ids.csv


In [None]:
def download_from_s3(key, local_path):
    try:
        s3.download_file(BUCKET_NAME, key, local_path)
        logger.info(f"Downloaded {key} from S3")
        return True
    except Exception as e:
        logger.error(f"Error downloading from S3: {e}")
        return False

def upload_to_s3(local_path, key):
    try:
        s3.upload_file(local_path, BUCKET_NAME, key)
        logger.info(f"Uploaded {local_path} to S3 as {key}")
        return True
    except Exception as e:
        logger.error(f"Error uploading to S3: {e}")
        return False

def extract_video_metadata(video_id):
    youtube_url = f"https://www.youtube.com/watch?v={video_id}"

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'skip_download': True,
        'writeinfojson': False,
        'extract_flat': False,
    }

    # Short random delay to reduce the chance of being throttled
    time.sleep(random.uniform(0.1, 0.5))

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)

        # Keep only the configured fields; fields missing from the response become None
        metadata = {'id': video_id}
        for field in metadata_fields[1:]:  # Skip 'id' as we already have it
            metadata[field] = info.get(field)

        return metadata, None

    except Exception as e:
        error_msg = str(e)
        if "Video unavailable" in error_msg or "This video is not available" in error_msg:
            logger.warning(f"Video {video_id} is unavailable")
        elif "HTTP Error 429" in error_msg or "Too Many Requests" in error_msg:
            logger.error(f"Rate limited for video {video_id}. Consider reducing concurrency.")
        else:
            logger.error(f"Error for video {video_id}: {error_msg}")
        return None, (video_id, error_msg)

def load_checkpoint():
    # Fall back to the S3 copy when no local checkpoint exists.
    # download_from_s3 logs failures and returns False instead of raising.
    if not os.path.exists(checkpoint_file):
        download_from_s3(s3_prefix + checkpoint_file, checkpoint_file)

    if os.path.exists(checkpoint_file):
        try:
            with open(checkpoint_file, 'r') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.warning(f"Could not read checkpoint, starting fresh: {e}")

    # No usable checkpoint: start with an empty one
    return {
        'processed_ids': [],
        'last_update': datetime.now().isoformat()
    }

def save_checkpoint(processed_ids):
    checkpoint = {
        'processed_ids': list(processed_ids),
        'last_update': datetime.now().isoformat()
    }

    with open(checkpoint_file, 'w') as f:
        json.dump(checkpoint, f)


    upload_to_s3(checkpoint_file, s3_prefix + checkpoint_file)

def process_batch(video_ids, max_workers):
    metadata_results = []
    error_results = []
    processed = set()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_id = {executor.submit(extract_video_metadata, video_id): video_id for video_id in video_ids}

        # Process completed tasks as they come in
        for future in tqdm(concurrent.futures.as_completed(future_to_id), total=len(video_ids)):
            video_id = future_to_id[future]
            try:
                metadata, error = future.result()
                if metadata:
                    metadata_results.append(metadata)
                elif error:
                    error_results.append(error)

                processed.add(video_id)
            except Exception as e:
                logger.error(f"Exception for video {video_id}: {str(e)}")
                error_results.append((video_id, str(e)))
                processed.add(video_id)

    return metadata_results, error_results, processed
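
The checkpoint logic above boils down to a save, reload, and filter cycle. The sketch below shows that cycle in isolation, using a temporary directory and no S3. `write_checkpoint` and `read_processed_ids` are hypothetical names, and the write-to-temp-then-`os.replace` step is an addition of this sketch (it is not what the notebook does) meant to avoid leaving a half-written checkpoint if the runtime is interrupted mid-write.

```python
import json
import os
import tempfile
from datetime import datetime

def write_checkpoint(path, processed_ids):
    """Write the checkpoint to a temp file, then rename it into place."""
    payload = {
        "processed_ids": sorted(processed_ids),
        "last_update": datetime.now().isoformat(),
    }
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)  # replaces the old checkpoint in one step

def read_processed_ids(path):
    """Return the saved IDs, or an empty set if no usable checkpoint exists."""
    try:
        with open(path) as f:
            return set(json.load(f).get("processed_ids", []))
    except (FileNotFoundError, json.JSONDecodeError):
        return set()

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "checkpoint.json")
    first_run = read_processed_ids(path)   # nothing saved yet
    write_checkpoint(path, {"a", "b"})
    resumed = read_processed_ids(path)     # simulates a restarted session

all_ids = ["a", "b", "c"]
remaining = [vid for vid in all_ids if vid not in resumed]
print(remaining)  # ['c']
```

Keeping the processed IDs in a set makes the `vid not in resumed` filter a constant-time lookup, which matters once the ID list grows large.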


In [None]:

def main():

    logger.info(f"Loading video IDs from {input_file}...")
    df = pd.read_csv(input_file)
    video_ids = df['video_id'].tolist()
    total_videos = len(video_ids)
    logger.info(f"Found {total_videos} video IDs to process")


    s3_metadata_key = s3_prefix + output_file
    s3_error_key = s3_prefix + error_file

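    # download_from_s3 logs failures and returns False instead of raising,
    # so check the return values rather than wrapping the calls in try/except
    if not download_from_s3(s3_metadata_key, output_file):
        logger.info(f"No existing metadata file retrieved from S3 ({s3_metadata_key})")
    if not download_from_s3(s3_error_key, error_file):
        logger.info(f"No existing error file retrieved from S3 ({s3_error_key})")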

    metadata_list = []
    if os.path.exists(output_file):
        try:
            existing_df = pd.read_csv(output_file)
            metadata_list = existing_df.to_dict('records')
            logger.info(f"Loaded {len(metadata_list)} existing metadata records")
        except Exception as e:
            logger.error(f"Error loading existing metadata: {e}")

    error_list = []
    if os.path.exists(error_file):
        try:
            error_df = pd.read_csv(error_file)
            error_list = list(zip(error_df['video_id'].tolist(), error_df['error'].tolist()))
            logger.info(f"Loaded {len(error_list)} existing error records")
        except Exception as e:
            logger.error(f"Error loading existing errors: {e}")

    checkpoint = load_checkpoint()
    processed_ids = set(checkpoint.get('processed_ids', []))
    logger.info(f"Loaded {len(processed_ids)} processed IDs from checkpoint")

    if metadata_list:
        processed_ids.update([item['id'] for item in metadata_list])
    if error_list:
        processed_ids.update([item[0] for item in error_list])

    video_ids_to_process = [vid for vid in video_ids if vid not in processed_ids]
    logger.info(f"Already processed: {len(processed_ids)} videos, {len(video_ids_to_process)} remaining")

    try:
        batch_size = int(input("Enter batch size (recommended: 100-500) [default: 200]: ") or "200")
        max_workers = int(input("Enter number of parallel workers (recommended: 10-30) [default: 20]: ") or "20")
        max_videos = input("Enter max number of videos to process (leave empty for all): ")
        max_videos = int(max_videos) if max_videos else len(video_ids_to_process)
        save_interval = int(input("Save progress every N videos (recommended: 100-1000) [default: 200]: ") or "200")

        video_ids_to_process = video_ids_to_process[:max_videos]
    except ValueError:
        logger.warning("Invalid input; falling back to default settings")
        batch_size = 200
        max_workers = 20
        max_videos = len(video_ids_to_process)
        save_interval = 200

    # Guard against zero or negative values, which would break range() and the executor
    batch_size = max(1, batch_size)
    max_workers = max(1, max_workers)

    logger.info(f"Processing {len(video_ids_to_process)} videos with {max_workers} parallel workers")
    logger.info(f"Using batch size of {batch_size} and saving every {save_interval} videos")

    start_time = time.time()
    videos_since_save = 0
    total_processed = 0

    for i in range(0, len(video_ids_to_process), batch_size):
        batch = video_ids_to_process[i:i + batch_size]  # slicing already clamps to the list length
        logger.info(f"Processing batch {i//batch_size + 1}/{(len(video_ids_to_process)+batch_size-1)//batch_size} ({len(batch)} videos)")

        batch_start_time = time.time()
        batch_metadata, batch_errors, batch_processed = process_batch(batch, max_workers)
        batch_duration = time.time() - batch_start_time

        metadata_list.extend(batch_metadata)
        error_list.extend(batch_errors)
        processed_ids.update(batch_processed)

        videos_since_save += len(batch)
        total_processed += len(batch)

        success_count = len(batch_metadata)
        error_count = len(batch_errors)
        success_rate = success_count / len(batch) * 100 if batch else 0

        logger.info(f"Batch complete in {batch_duration:.1f}s: {success_count} successful, {error_count} failed ({success_rate:.1f}% success)")
        logger.info(f"Processing rate: {len(batch)/batch_duration:.1f} videos/second")

        if videos_since_save >= save_interval or i + batch_size >= len(video_ids_to_process):
            if metadata_list:
                metadata_df = pd.DataFrame(metadata_list)
                metadata_df.to_csv(output_file, index=False)
                logger.info(f"Metadata saved to {output_file} ({len(metadata_list)} videos)")
                upload_to_s3(output_file, s3_metadata_key)

            if error_list:
                error_df = pd.DataFrame(error_list, columns=['video_id', 'error'])
                error_df.to_csv(error_file, index=False)
                logger.info(f"Errors saved to {error_file} ({len(error_list)} videos)")
                upload_to_s3(error_file, s3_error_key)

            save_checkpoint(processed_ids)
            logger.info(f"Checkpoint saved with {len(processed_ids)} processed IDs")

            videos_since_save = 0

        elapsed_time = time.time() - start_time
        videos_per_second = total_processed / elapsed_time if elapsed_time > 0 else 0
        remaining_videos = len(video_ids_to_process) - total_processed
        estimated_time_remaining = remaining_videos / videos_per_second if videos_per_second > 0 else 0

        logger.info(f"Overall progress: {total_processed}/{len(video_ids_to_process)} videos processed ({total_processed/len(video_ids_to_process)*100:.1f}%)")
        logger.info(f"Overall rate: {videos_per_second:.2f} videos/second, estimated time remaining: {estimated_time_remaining/60:.1f} minutes")

        time.sleep(2)

    total_duration = time.time() - start_time
    logger.info("\nMetadata extraction completed!")
    logger.info(f"Total time: {total_duration/60:.1f} minutes")
    logger.info(f"Successfully processed: {len(metadata_list)} videos")
    logger.info(f"Failed: {len(error_list)} videos")

    if metadata_list:
        upload_to_s3(output_file, s3_metadata_key)
    if error_list:
        upload_to_s3(error_file, s3_error_key)

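    # Offer each file for download only if it was actually created;
    # e.g. the metadata CSV does not exist when every video failed
    for label, path in [
        ("metadata CSV", output_file),
        ("error CSV", error_file),
        ("log file", 'metadata_extraction.log'),
    ]:
        if os.path.exists(path):
            print(f"\nDownload {label}:")
            files.download(path)
        else:
            print(f"\nNo {label} to download ('{path}' was not created).")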
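
One practical detail when reading `video_metadata.csv` back: `categories` and `tags` are Python lists in memory, and pandas writes such values to CSV as their string representation (for example `"['music', 'live']"`). The sketch below, which assumes that layout and uses placeholder rows, shows one way to turn those cells back into lists with `ast.literal_eval`. `parse_list_cell` is a hypothetical helper.

```python
import ast
import csv
import io

# Hypothetical excerpt of video_metadata.csv; the values are placeholders
sample = '''id,title,tags
VIDEO_ID_001,First,"['music', 'live']"
VIDEO_ID_002,Second,
'''

def parse_list_cell(cell):
    """Convert a stringified list back into a list; empty cells become []."""
    if not cell:
        return []
    # literal_eval only parses Python literals, so it is safer than eval
    return ast.literal_eval(cell)

rows = list(csv.DictReader(io.StringIO(sample)))
tags = {row["id"]: parse_list_cell(row["tags"]) for row in rows}
print(tags)
```

If the list structure needs to survive round trips reliably, writing the results as JSON instead of CSV is an alternative worth considering.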


In [None]:

if __name__ == "__main__":
    main()

ERROR:__main__:Error downloading from S3: An error occurred (404) when calling the HeadObject operation: Not Found


  0%|          | 0/500 [00:00<?, ?it/s]


ERROR: [youtube] -jydr41QpOA: Private video. Sign in if you've been granted access to this video. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies
ERROR:__main__:Error for video -jydr41QpOA: ERROR: [youtube] -jydr41QpOA: Private video. Sign in if you've been granted access to this video. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies



ERROR: [youtube] 05I5CNIpUMc: Private video. Sign in if you've been granted access to this video. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies
ERROR:__main__:Error for video 05I5CNIpUMc: ERROR: [youtube] 05I5CNIpUMc: Private video. Sign in if you've been granted access to this video. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies
ERROR: [youtube] 0Dn5QEFzwjM: Video unavailable. This video has been removed by the uploader

