# Video Download Pipeline

This notebook automates the downloading of YouTube videos from participant watch histories. Downloads are rate limited, failures are caught and recorded, and every attempt is logged.

## Key Features:
1. Sample selection of 20 unique videos per participant
2. Controlled download speeds to prevent rate limiting
3. Authentication via a cookie file and a PO token
4. Comprehensive logging of download attempts and outcomes
5. Automatic cleanup of unlogged/failed downloads

## Technical Specifications:
- Download rate limit: 800 KiB/s (`ratelimit`), with a 200 KiB/s throttle-detection threshold (`throttledratelimit`)
- Video format: Best quality at max 360p resolution
- Wait time between downloads: 2-40 seconds
- Sample size per participant: 20 videos

## Requirements:
- Python packages:

In [None]:
! python3 -m pip install -U --pre "yt-dlp[default]"
! python3 -m pip install scenedetect
! python3 -m pip install ffmpeg-python
! python3 -m pip install opencv-python

In [3]:
# import libraries
import pandas as pd
import yt_dlp as yt
import os
import numpy as np
import time
import random
import json
import zipfile
import sys
import ffmpeg
from datetime import datetime
from download_utils import *

sys.path.append('..')
from ytutils import *

- Authentication files:
  - cookies.txt
  - po-token_value.txt


  For details on how to obtain these files, please refer to yt-dlp's documentation: 
  - [cookies.txt](https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp) 
  - [po-token_value.txt](https://github.com/yt-dlp/yt-dlp/wiki/PO-Token-Guide)

## Input/Output:
- Input: Cleaned watch history CSV from Pre_Download pipeline
- Output: 
  - Downloaded videos in MP4 format
  - JSON metadata files
  - Comprehensive download logs



## Sampling & Downloading
- For each participant, identify and download 20 unique videos that have not been downloaded yet
    - download rate limit: 800 KiB/s, with a 200 KiB/s throttle-detection threshold
    - format: best available quality at a maximum of 360p resolution
    - no subtitles
    - random wait of 2-40 seconds between downloads
- Skip participants for whom fewer than 20 videos are available
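
The selection step described above can be sketched as follows. This is a minimal illustration, not the notebook's implementation: `pick_videos`, `already_attempted`, and the toy `history` frame are hypothetical names introduced here, and the real pipeline looks up previous attempts in the log files rather than in an in-memory set.

```python
import pandas as pd

def pick_videos(history, participant, already_attempted, needed, seed=42):
    """Return the shuffled, not-yet-attempted video IDs for one participant.

    Returns an empty list when fewer than `needed` candidates remain,
    which corresponds to skipping the participant.
    """
    rows = history[history["Participant ID"] == participant]
    unique = rows.drop_duplicates(subset=["video_id"])
    # Shuffle first so the order depends only on the seed, then filter.
    shuffled = unique.sample(frac=1, random_state=seed)
    candidates = [v for v in shuffled["video_id"] if v not in already_attempted]
    if len(candidates) < needed:
        return []
    return candidates

# Toy data (hypothetical): p1 has four unique videos, p2 has one.
history = pd.DataFrame({
    "Participant ID": ["p1"] * 5 + ["p2"] * 2,
    "video_id": ["a", "b", "b", "c", "d", "x", "x"],
})
picked = pick_videos(history, "p1", already_attempted={"a"}, needed=2)
skipped = pick_videos(history, "p2", already_attempted=set(), needed=2)
```

Shuffling before filtering means a rerun with the same seed walks the candidates in the same order, regardless of how many videos have already been attempted.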

In [4]:
def refresh_auth(auth_dir):
    po_token_path = os.path.join(auth_dir, "po-token_value.txt")
    cookie_file_path = os.path.join(auth_dir, "cookies.txt")
    
    # Load PO token and cookies
    try:
        with open(po_token_path, "r") as token_file:
            po_token = token_file.read().strip()
    except FileNotFoundError:
        raise FileNotFoundError(f"PO token file not found at {po_token_path}")

    if not os.path.exists(cookie_file_path):
        raise FileNotFoundError(f"Cookie file not found at {cookie_file_path}")

    return po_token, cookie_file_path

In [8]:
def download_video(video_id, download_dir, speed_limit, logger=None, po_token=None, cookie_file=None):
    """
    Downloads a YouTube video by ID and saves it in the specified directory, with a download rate
    limit and a maximum resolution of 360p (AV1 streams are avoided where an alternative exists).
    Returns the download status, a message, and the captured yt-dlp output.

    Parameters:
        video_id (str): Unique identifier of the video to download.
        download_dir (str): Directory where the video will be saved.
        speed_limit (int): Download rate limit in bytes per second.
        logger (object): Instance of the logger class defined in download_utils. It captures all yt-dlp output so it can be added to the logs.
        po_token (str): Proof-of-origin (PO) token for authentication (if needed).
        cookie_file (str): Path to the cookie file (if needed).

    Returns:
        tuple: (bool, str, list or None) where the boolean indicates success (True) or failure (False),
        the string is a message detailing the outcome or error encountered, and the last element is
        the logger's captured output (None if no logger was given).
    """
    # Ensure the output folder path ends with a separator
    if not download_dir.endswith(os.sep):
        download_dir += os.sep

    video_url = f"https://www.youtube.com/watch?v={video_id}"

    # Set options for downloading
    ydl_opts = {
        'ratelimit': speed_limit,
        'throttledratelimit': int(200 * 1024),  # Below this rate (bytes/s), yt-dlp assumes throttling and re-extracts the video data
        'format': 'bv*[ext=mp4][height<=360][vcodec!*=av01]+ba[ext=m4a]/b[ext=mp4][height<=360][vcodec!*=av01]/b[ext=mp4][height<=360]/18',
        'outtmpl': os.path.join(download_dir, f'{video_id}.%(ext)s'),
        'noplaylist': True,
        'quiet': False,
        'verbose': True,
        'writeinfojson': True,
        'geo_bypass': True,
        'age_limit': 18,
        'retries': 3,
        'logger': logger,
        'cookiefile': cookie_file,
        # PO token in yt-dlp's CLIENT.CONTEXT+TOKEN form; the older form was f"web+{po_token}"
        'extractor_args': {'youtube': {'player_client': ['web'], 'po_token': [f"web.gvs+{po_token}"]}},
        'match_filter': lambda info: (
            "Skipping livestream (live or past live)" if info.get('is_live') or info.get('was_live') else None
        ),
    }


    try:
        # Use yt-dlp with the specified options
        with yt.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        log = logger.logs if logger else None
        return True, "Download successful", log

    except Exception as e:
        error_message = str(e)
        log = logger.logs if logger else None
        return False, error_message, log


# New main function sample and download
def download_unique_videos(df, download_dir, log_path, speed_limit, log_df, wait_time_range=(5, 10), sample_size=20, seed=42, auth_dir="Authentication"):
    """
    Downloads a specified number of unique videos for each participant from a watch history DataFrame, 
    logs the download attempts, and handles various download scenarios such as checking existing downloads 
    and logging entries. To ensure reproducibility, the function shuffles the unique videos for each 
    participant with a fixed seed before filtering out any videos that have already been downloaded or attempted.

    Parameters:
        df (pd.DataFrame): Watch history DataFrame containing video data with columns for 'Participant ID' and 'video_id'.
        download_dir (str): Path to the directory where downloaded videos will be saved.
        log_path (str): Path to the directory where log files are stored and new entries will be created.
        speed_limit (int): Download rate limit in bytes per second to avoid overwhelming the server.
        log_df (pd.DataFrame): Concatenated log of previous download attempts, used to count completed downloads per participant.
        wait_time_range (tuple): Range of wait time (in seconds) to pause between downloads to avoid 
                                 triggering rate limits (default is (5, 10)).
        sample_size (int): Target number of unique videos to download for each participant (default is 20).
        seed (int): Random seed for reproducibility in shuffling video lists (default is 42).
        auth_dir (str): Directory containing cookies.txt and po-token_value.txt (default is "Authentication").

    Returns:
        pd.DataFrame: A concatenated DataFrame of log entries for all download attempts, capturing successes, 
                      failures, and insufficient video cases.

    Process Overview:
    - The function begins by setting a random seed for reproducibility; previous downloads are tracked 
      through log_df and the log files in log_path.
    - It iterates through each unique participant, checking for prior download attempts and ensuring sufficient 
      videos remain for downloading.
    - Unique videos for each participant are shuffled to randomize the download order before filtering out 
      videos that have been previously attempted or successfully downloaded.
    - The function downloads videos until the specified sample size is achieved (or no more videos are available), 
      logging each attempt's success or failure along with relevant timing information.
    - Random wait times are introduced between downloads to mitigate potential rate-limiting issues from the 
      video source.

    Notes:
        - The function creates log entries for each video download attempt, noting successes, failures, and 
          cases where participants do not have enough unique videos available.
        - If a participant has already met the sample size requirement, they are skipped in subsequent runs.
        - Ensures that each download is unique and that previously downloaded videos are not re-attempted.
        - Downloads sometimes slow to a halt regardless of video size (possibly due to throttling on YouTube's end).
    """
    random.seed(seed)  # Set the seed for reproducibility

    # get all participants
    participants = df['Participant ID'].unique()
    
    n_participants = len(participants)
    counter = 0

    for participant in participants:
        counter += 1

        # check if they have been logged as having insufficient videos
        if not_enough_videos(participant, log_path):
            log_file = f"{log_path}/insufficiant_vids_{participant}.log.csv"
            print(f"Skipping Participant {participant}: Not enough videos. See log entry {log_file}")
            continue      
        
        # how many downloaded for this participant
        downloaded_count = nb_videos_downloaded(log_df, participant)
        
        if downloaded_count >= sample_size:
            print(f"Download already complete for participant {participant}.")
            continue

        participant_videos = df[df['Participant ID'] == participant]
        unique_videos = participant_videos.drop_duplicates(subset=['video_id'])

        needed_vids = sample_size - downloaded_count 

        if len(unique_videos) < needed_vids:
            m = f"Skipping Participant {participant}: Less than {needed_vids} unique video(s) left."
            print(m)
            make_log_entry(participant, None, False, m, now(), now(), log_path, exept=True)
            continue

        # randomly shuffle unique videos (with given random state)
        unique_videos = unique_videos.sample(frac=1, random_state=seed).reset_index(drop=True)

        # filter out any videos with a previously logged download attempt
        unique_videos = unique_videos[~unique_videos['video_id'].apply(
            lambda vid: is_video_attempted_downloded(vid, log_path))]

        if len(unique_videos) < needed_vids:
            m = f"Skipping Participant {participant}: Fewer than {needed_vids} new videos to download."
            print(m)
            make_log_entry(participant, None, False, m, now(), now(), log_path, exept=True)
            continue

        video_list = unique_videos['video_id'].tolist()

        print(f"Downloading videos for Participant {participant}...")

        for video_id in video_list:
            if downloaded_count >= sample_size:
                break

            # we have videos that are downloaded but not logged
            if is_video_downloaded(video_id, download_dir):
                print(f"Video {video_id} already in download folder. Going to next video", flush=True)
                make_log_entry(participant, video_id, True, "Already in download folder", now(), now(), log_path)
                downloaded_count += 1
                continue

            print(f"Attempting download for video {video_id}", flush=True) 
            po_token, cookie_file = refresh_auth(auth_dir)
            
            my_logger = MyLogger() # Instantiate the logger
            start_time = now() # start timer
            success, server_reply, log = download_video(video_id, download_dir, speed_limit, my_logger, po_token, cookie_file)
            end_time = now() # end timer
            
            time_min = (end_time - start_time).total_seconds()/60
            print(f"video: {video_id}, result: {success}, time: {time_min:.2f} min, message: {server_reply}", flush=True)

            try:
                size = os.path.getsize(os.path.join(download_dir, f"{video_id}.mp4"))  # Get file size in bytes
            except FileNotFoundError:
                size = None

            info = get_video_info(video_id, download_dir)
            
            make_log_entry(participant, video_id, success, server_reply, start_time, end_time, log_path, log=log, size=size, info=info)
            
            if success:
                downloaded_count += 1

            # Random wait between downloads (to avoid rate-limiting) 
            wait_time = random.uniform(*wait_time_range)
            print(f"Waiting for {wait_time:.2f} seconds...")
            time.sleep(wait_time)

        print(f"Done with participant number {counter} of {n_participants} ({round(counter/n_participants * 100, 2)}% completed)")
        print('─' * 20) 

    print("Download process completed.")
    return concatenate_logs(log_path)
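
The `MyLogger` class used above comes from `download_utils` and is not shown in this notebook. As a rough sketch of what such a class might look like, the hypothetical `ListLogger` below exposes the `debug`, `info`, `warning`, and `error` methods that yt-dlp calls on the object passed through its `logger` option, and keeps every message in a `logs` list, which is the attribute `download_video` reads. The tuple layout of the stored entries is an assumption made for illustration.

```python
class ListLogger:
    """Collects yt-dlp messages in memory (hypothetical stand-in for MyLogger)."""

    def __init__(self):
        self.logs = []

    def debug(self, msg):
        # yt-dlp sends both debug and info output through debug();
        # real debug messages carry a "[debug] " prefix.
        self.logs.append(("debug", msg))

    def info(self, msg):
        self.logs.append(("info", msg))

    def warning(self, msg):
        self.logs.append(("warning", msg))

    def error(self, msg):
        self.logs.append(("error", msg))


# Usage without any network access: call the methods directly.
logger = ListLogger()
logger.debug("[debug] example message")
logger.error("ERROR: example failure")
levels = [level for level, _ in logger.logs]
```

Creating a fresh logger per video, as the loop above does, keeps each log entry limited to the output of a single download attempt.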


In [6]:
clean_wh = pd.read_csv('clean_watch_history.csv')

# Ensure all entries in the "Participant ID" column are treated as strings
clean_wh['Participant ID'] = clean_wh['Participant ID'].astype(str)

In [7]:
download_dir = 'Downloads'
log_path = 'Log_Entries'
auth_dir= "Authentication"
log_df = concatenate_logs(log_path)

In [7]:
log_df = download_unique_videos(clean_wh, download_dir, log_path, speed_limit=int(800 * 1024), log_df=log_df,
                                wait_time_range=(2, 40), sample_size=20, seed=42, auth_dir=auth_dir)

log_df.to_csv('Downloads_log.csv', index=False)



Download already complete for participant 26958.
Download already complete for participant 20521.
Download already complete for participant 17749.
Download already complete for participant 20349.
Download already complete for participant 24514.
Download already complete for participant am84r.
Download already complete for participant 1977.
Download already complete for participant 18357.
Download already complete for participant 21675.
Download already complete for participant bjnn7.
Download already complete for participant 38499.
Download already complete for participant 37430.
Download already complete for participant 2470.
Download already complete for participant 13932.
Download already complete for participant 29870.
Download already complete for participant 17682.
Download already complete for participant 7162.
Download already complete for participant 19176.
Download already complete for participant 14192.
Download already complete for participant 7719.
Download already complet

KeyboardInterrupt: 

---

In [None]:
def delete_unlogged_files(download_dir, log_df):
    # Step 1: Extract the video IDs from the log DataFrame
    logged_video_ids = set(log_df['video_id'].unique())  # Ensure unique video IDs
    removed_files = []
    # Step 2: Iterate over files in the download directory
    for filename in os.listdir(download_dir):
        if os.path.isdir(os.path.join(download_dir, filename)):  # Skip directories (only delete files)
            continue
        # Extract the video ID (assuming filename format is 'video_id.mp4' or 'video_id.info.json' and so on)
        video_id = filename.split('.')[0]
        
        # Step 3: Delete files not in the log DataFrame
        if video_id not in logged_video_ids:
            file_path = os.path.join(download_dir, filename)
            os.remove(file_path)
            removed_files.append(filename)
            print(f"Deleted: {file_path}")
                
    with open("deleted_files.txt", "a") as file:
        for item in removed_files:
            file.write(f"{item}\n")
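
Because the cleanup deletes files permanently, it may be worth previewing the result first. The sketch below is a dry-run variant under the same filename assumption (the video ID is everything before the first dot); `find_unlogged_files` and the temporary directory contents are hypothetical and only serve as a self-contained illustration.

```python
import os
import tempfile

def find_unlogged_files(download_dir, logged_ids):
    """Return names of files in download_dir whose video ID is not in logged_ids."""
    unlogged = []
    for name in sorted(os.listdir(download_dir)):
        path = os.path.join(download_dir, name)
        if os.path.isdir(path):  # only files are candidates for deletion
            continue
        if name.split(".")[0] not in logged_ids:
            unlogged.append(name)
    return unlogged

# Self-contained demonstration in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    for name in ["abc.mp4", "abc.info.json", "zzz.mp4"]:
        open(os.path.join(tmp, name), "w").close()
    os.mkdir(os.path.join(tmp, "subdir"))
    to_delete = find_unlogged_files(tmp, logged_ids={"abc"})
```

Nothing is removed here; the returned list can be inspected before running the actual deletion.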

In [None]:
delete_unlogged_files("Downloads", log_df) # ! changed to "Downloads" during debugging. Original str uncertain