In [None]:
import os
import json
import googleapiclient.discovery
import re
import pandas as pd
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi

## Get transcripts from a video

In [None]:
def contains_any(string, phrases):
    """Tell whether at least one of ``phrases`` occurs inside ``string``.

    Args:
        string: The text (or any container supporting ``in``) to search.
        phrases: Iterable of candidate phrases.

    Returns:
        True as soon as one phrase is found, False if none match (including
        when ``phrases`` is empty).
    """
    for candidate in phrases:
        if candidate in string:
            return True
    return False

def get_transcript_segments(video_id, video_title):
  """Split a video's transcript into the chapters listed in its description.

  Chapter boundaries come from timestamps (``H:MM:SS`` or ``M:SS``) found in
  the lines of the video description; the text following a timestamp is used
  as the chapter title. Every chapter whose title does not contain one of the
  sponsor phrases collects the transcript entries that lie entirely between
  its own timestamp and the next chapter's timestamp.

  Relies on the notebook-level global ``API_KEY`` for the YouTube Data API.

  Args:
    video_id: YouTube video id, used for the metadata and transcript lookups.
    video_title: Episode title, embedded in every returned key.

  Returns:
    dict mapping "Episode <title>, Segment <chapter> (<start> <end>)" (with
    "/" replaced by a space and ":" by "-") to the chapter's transcript text
    joined into a single line. The last timestamp found in the description is
    only used as an end boundary, so no entry is produced for the final
    chapter.

  Raises:
    IndexError: if the API response contains no items for ``video_id``.
    Exceptions raised by the YouTube API client or by YouTubeTranscriptApi
    propagate unchanged.
  """
  # Set up the API service object.
  youtube = googleapiclient.discovery.build(
      "youtube", "v3", developerKey=API_KEY)

  # Call the YouTube API to retrieve the metadata of the video.
  response = youtube.videos().list(
      part="id,snippet,contentDetails,statistics",
      id=video_id
  ).execute()

  items = response['items']
  if not items:
    # Same exception type as the bare ``[0]`` lookup, but with a usable message.
    raise IndexError(f"No metadata returned for video id {video_id!r}")

  # Retrieve the chapter timestamps and titles from the video description.
  segments = []
  titles = []
  description = items[0]['snippet'].get('description', '')
  for line in description.split("\n"):
    timestamp_match = re.search(r"\d+:\d+:\d+|\d+:\d+", line)
    if timestamp_match:
      segments.append(timestamp_match.group())
      # Drop the single separator character that follows the timestamp.
      titles.append(line[timestamp_match.end():][1:])

  # Retrieve the transcript of the video.
  # NOTE(review): newer youtube-transcript-api releases may expose this as an
  # instance method (``fetch``) instead -- confirm against the installed version.
  transcript = YouTubeTranscriptApi.get_transcript(video_id)

  sponsor_segment_phrases = ["AG1", "Athletic Greens", "Inside Tracker", "Sponsors", "Sponsor", "ROKA", "Levels", "Magic Spoon", "Blinkist"]

  # Pair every chapter with the next chapter's timestamp as its end boundary.
  matched_segments = {}
  for title, start_segment, end_segment in zip(titles, segments, segments[1:]):
    if contains_any(title, sponsor_segment_phrases):
      continue
    segment_title = title[:100]
    key_title = f"Episode {video_title}, Segment {segment_title} ({start_segment} {end_segment})".replace("/", " ").replace(":", "-")
    # Convert the boundaries once per chapter rather than once per transcript entry.
    start_seconds = to_seconds(start_segment)
    end_seconds = to_seconds(end_segment)
    # Keep only entries that start and finish inside the chapter window.
    matched_segments[key_title] = ' '.join(
        item['text'] for item in transcript
        if item['start'] >= start_seconds
        and item['start'] + item['duration'] <= end_seconds
    ).replace('\n', ' ')
  return matched_segments

def to_seconds(timestamp):
  """Convert an ``M:SS`` or ``H:MM:SS`` timestamp string to whole seconds.

  Args:
    timestamp: Colon-separated string of two or three integer components.

  Returns:
    The total number of seconds as an int.

  Raises:
    ValueError: if a component is not an integer, or if the timestamp does not
      have exactly two or three components (this case used to fall through and
      return None, which only surfaced later as a comparison error).
  """
  parts = [int(part) for part in timestamp.split(":")]
  if len(parts) == 2:
    minutes, seconds = parts
    return minutes * 60 + seconds
  if len(parts) == 3:
    hours, minutes, seconds = parts
    return hours * 3600 + minutes * 60 + seconds
  raise ValueError(f"Unsupported timestamp format: {timestamp!r}")
  
def save_segments_to_txt(matched_segments, output_dir="../data/transcripts"):
  """Write every segment to its own ``<title>.txt`` file.

  Args:
    matched_segments: dict mapping a segment title (used as the file name) to
      the segment text.
    output_dir: Directory the files are written to; created if it is missing.
      Defaults to the notebook's transcript folder.

  Each title is printed before its file is written.
  """
  os.makedirs(output_dir, exist_ok=True)
  for title, text in matched_segments.items():
    print(title)
    # Explicit UTF-8 so non-ASCII transcript text does not depend on the
    # platform's default encoding.
    with open(f"{output_dir}/{title}.txt", "w", encoding="utf-8") as f:
      f.write(text)




In [None]:
# Table of videos to process; the 'videoId' and 'title' columns are used below.
video_data = pd.read_csv('../data/video_metadata.csv')

# Load the YouTube Data API key from the local config file. The file is opened
# in a ``with`` block so the handle is closed as soon as the key is read.
with open('../config/yt_api.json', 'r') as config_file:
    API_KEY = json.load(config_file)['YT_API_KEY']

# Id and title of the first video in the metadata table.
VIDEO_ID = video_data.loc[0, 'videoId']
VIDEO_TITLE = video_data.loc[0, 'title']

In [None]:
# Preview only the first rows; the full table is not needed to confirm the load.
video_data.head()

Saving all podcast segments to files:

In [None]:
failed_videos = []
# Pre-initialised so the final print works even when no video succeeds.
matched_segments = {}
for _, row in video_data.iterrows():
    video_id = row['videoId']
    video_title = row['title']
    print(video_title)
    try:
        matched_segments = get_transcript_segments(video_id, video_title)
    except Exception as error:
        # ``Exception`` rather than a bare ``except`` so Ctrl-C still stops the run.
        print(f"Video failed: {video_title}")
        print(f"  reason: {error!r}")
        failed_videos.append(video_title)
        # Skip saving: ``matched_segments`` still holds the previous video's data.
        continue
    save_segments_to_txt(matched_segments)
# Segments of the last successfully processed video.
print(matched_segments)

In [None]:
# Titles of the videos for which get_transcript_segments raised in the loop above.
failed_videos