In [None]:
# This script gets the transcriptions for the videos in the video list

In [None]:
import os
import json
from dotenv import load_dotenv
import pandas as pd
from google.auth.transport.requests import Request as AuthRequest
from googleapiclient.discovery import build
import httplib2
from pprint import pprint
from datetime import timedelta
import re



# Pull the variables defined in the .env file into the process environment
load_dotenv()

# Read the YouTube API key from the environment (None when the variable is unset)
yt_api_key = os.environ.get("YOUTUBE_API_KEY")

In [None]:
# Build the YouTube Data API v3 client, authenticated with the API key loaded above
youtube = build(
    serviceName='youtube',
    version='v3',
    developerKey=yt_api_key,
)

In [None]:
# Read the list of videos to process from the resources folder and preview it
videos_csv_path = '../resources/video_list/videos.csv'
videos_df = pd.read_csv(videos_csv_path)
videos_df.head()

In [None]:
# Function for video details
def get_video_details(video_ids, batch_size=50):
    """Fetch the duration of each video in ``video_ids`` from the YouTube API.

    The IDs are sent in batches so that many videos cost only a few API
    calls. Each request is issued manually through ``httplib2`` so that a
    ``referer`` header can be attached.

    Args:
        video_ids: Iterable of YouTube video ID strings.
        batch_size: Maximum number of IDs sent per request. Defaults to 50.
            NOTE(review): videos.list is understood to cap the ``id`` filter
            at 50 IDs per call -- confirm against the API documentation.

    Returns:
        A list of dicts, one per video returned by the API, of the form
        ``{'video_id': <id>, 'length': <str(timedelta)>}``. The ``video_id``
        key lets callers join the result back onto their own table. Videos
        the API does not return are simply absent from the list.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        RuntimeError: If the API responds with an error payload.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    video_ids = list(video_ids)
    details = []

    # Nothing to look up: avoid spending an API call on an empty request
    if not video_ids:
        return details

    # Create an HTTP instance (reused for every batch)
    http = httplib2.Http()
    headers = {'referer': 'https://youtube.com'}

    for start in range(0, len(video_ids), batch_size):
        batch = video_ids[start:start + batch_size]

        # Build (but do not execute) the request; only its uri/method/body are used
        request = youtube.videos().list(
            part="snippet,contentDetails",  # Include 'contentDetails' to get video duration
            id=','.join(batch)  # Convert the batch of video IDs to a comma-separated string
        )

        # Execute the request
        response, content = http.request(request.uri, method=request.method, body=request.body, headers=headers)
        response_data = json.loads(content)

        # Surface API failures explicitly instead of a bare KeyError on 'items'
        if 'error' in response_data:
            error = response_data['error']
            message = error.get('message', error) if isinstance(error, dict) else error
            raise RuntimeError(f"YouTube API error while fetching video details: {message}")

        # Extract video details from the response
        for item in response_data.get('items', []):
            duration_str = item['contentDetails']['duration']  # Duration is provided in ISO 8601 format
            video_length = parse_duration(duration_str)
            details.append({'video_id': item['id'], 'length': str(video_length)})

    return details


# Function to parse duration string into timedelta object
def parse_duration(duration_str):
    """Convert an ISO 8601 duration string into a ``timedelta``.

    Supports the week, day, hour, minute and second designators, e.g.
    ``'PT4M13S'``, ``'PT1H'``, ``'P1DT2H3M4S'`` or ``'P0D'``. Missing
    components count as zero.

    Args:
        duration_str: Duration string such as ``'PT1H2M3S'``.

    Returns:
        A ``datetime.timedelta`` equal to the encoded duration.

    Raises:
        ValueError: If ``duration_str`` is not a duration in the supported form.
    """
    # Date part (weeks/days) is optional, as is the whole 'T...' time part
    pattern = r'P(?:(\d+)W)?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?'

    # fullmatch rejects trailing garbage that a prefix match would silently ignore
    match = re.fullmatch(pattern, duration_str)
    if match is None:
        raise ValueError(f"Unsupported ISO 8601 duration: {duration_str!r}")

    # Unmatched optional groups are None and contribute zero
    weeks, days, hours, minutes, seconds = (
        int(value) if value else 0 for value in match.groups()
    )

    return timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)


# Function for caption ids
def get_caption_ids(video_id):
    """List the caption tracks for one video.

    The request is built with the API client but sent manually through
    ``httplib2`` so that a ``referer`` header can be attached. The decoded
    JSON response is pretty-printed before the IDs are extracted.

    Args:
        video_id: ID of the video whose caption tracks are listed.

    Returns:
        A list of ``{'id': <caption id>}`` dicts, one per item in the response.
    """
    list_request = youtube.captions().list(part="snippet", videoId=video_id)

    # Send the prepared request ourselves, adding the referer header
    _, raw_body = httplib2.Http().request(
        list_request.uri,
        method=list_request.method,
        body=list_request.body,
        headers={'referer': 'https://youtube.com'},
    )
    payload = json.loads(raw_body)
    pprint(payload)

    # Keep only the caption track IDs
    return [{'id': entry['id']} for entry in payload['items']]


# Function to download and parse caption file to extract transcript text
def download_and_parse_caption(caption_id):
    """Download one caption track in SRT format and return it as text.

    No parsing beyond decoding is done: the returned string is the full
    caption file content decoded as UTF-8.

    NOTE(review): captions.download is understood to require OAuth 2.0
    authorization rather than an API key -- confirm that the module-level
    ``youtube`` client is allowed to make this call.

    Args:
        caption_id: ID of the caption track to download.

    Returns:
        The caption file content as a ``str``.
    """
    # tfmt selects the caption format (e.g., "srt", "vtt")
    raw_caption = youtube.captions().download(id=caption_id, tfmt="srt").execute()

    # Assumes the response bytes are UTF-8 encoded
    return raw_caption.decode('utf-8')

In [None]:
### MAIN

# Test Frame
test_df = videos_df.head(3)

# Set environment variable -- dev is for testing the code against the API in small batches to keep quota usage down
environment = 'dev'

if environment == 'prod':
    df = videos_df
elif environment == 'dev':
    df = test_df.copy()
else:
    # Fail fast: carrying on would only hit a NameError on the undefined `df` below
    raise ValueError(f"Error: please set environment to 'prod' or 'dev' (got {environment!r})")

# Instead of looping over each row of the dataframe and passing the function, we pass one list to the function, which makes one API call
# Step 1: Create a list of video IDs from the DataFrame
video_ids = df['video_id'].tolist()

# Step 2: Call the function to get video details for all video IDs
video_details = get_video_details(video_ids)

# Step 3: Convert the video details to a DataFrame
video_details_df = pd.DataFrame(video_details)

# Step 4: Merge or join the original DataFrame with the video details DataFrame
# (the join key 'video_id' must be present as a column in both frames)
merged_df = pd.merge(df, video_details_df, on='video_id', how='left')

merged_df


In [None]:
# Placeholder: replace with the ID of the video whose captions should be saved
video_id = "YOUR_VIDEO_ID_HERE"

# List the caption tracks for the video (get_caption_ids is the helper defined above)
captions_data = get_caption_ids(video_id)

# Create a directory to save transcripts (no error if it already exists)
os.makedirs("transcripts", exist_ok=True)

# Download and save transcripts for each caption
for caption in captions_data:
    caption_id = caption['id']
    transcript_text = download_and_parse_caption(caption_id)

    # Save transcript text to a file
    file_name = f"transcripts/{video_id}_{caption_id}.txt"
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(transcript_text)

    print(f"Transcript saved to {file_name}")

- Completed Get Videos and copied to new modules folder
-- results saved to resources

- Updating transcripts notebook
-- succeeded in getting caption_ids and lengths
--- TO TEST: Updated function to accept a list of video_ids, thus reducing API calls
--- Saved previous working copy to text file