In [None]:
! pip install chromadb
! pip install yt-dlp
! pip install requests
! pip install sentence_splitter

In [6]:
import json
import requests
import os
import time

def file_exists(file_path):
    """Report whether ``file_path`` refers to an existing filesystem entry."""
    path_is_present = os.path.exists(file_path)
    return path_is_present

def load_and_parse_json(file_path):
    """Read the file at ``file_path`` and return its parsed JSON content.

    The file is opened explicitly as UTF-8 so that non-ASCII text is decoded
    the same way regardless of the platform's default encoding.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON value (dict, list, str, number, bool or None).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

def get_timestamp():
    """Return the current Unix time in whole microseconds as an ``int``.

    Floor division keeps the arithmetic in integers. Dividing with ``/`` first
    converts the nanosecond count to a float, which cannot represent values of
    that magnitude exactly and can round the result up by one microsecond.
    """
    return time.time_ns() // 1000

In [7]:
# Project configuration.
#
# Create a project folder in your Drive and point BASE_DATA_LOCATION at it.
# Inside that folder create two subfolders:
# - videos
# - transcriptions
#
BASE_DATA_LOCATION = "drive/MyDrive/podcastopedia/YOUR_PROJECT_NAME"
# NOTE(review): not referenced by any cell visible in this notebook - confirm
# whether it is still needed.
CHROMA_API_IMPL="rest"
# Deploy your own Chroma server and put its host here.
# see https://docs.trychroma.com/deployment#simple-aws-deployment
CHROMA_SERVER_HOST=""
# Port handed to the Chroma client together with CHROMA_SERVER_HOST.
CHROMA_SERVER_PORT=8000

In [None]:
################################################################################
#    Grab All videos for channel
################################################################################
# -> head to channel page - https://www.youtube.com/@MyFirstMillionPod/videos
# -> make sure videos are sorted in desc order by publish date
# -> grab all IDs with:
#    let links = []; document.querySelectorAll("a[href^='/watch'].ytd-thumbnail").forEach((el) => links.push(el.getAttribute("href").replace("/watch?v=", "").replace(/\&pp=.+$/ig, "").replace(/\&t=.+$/ig, "") ) ); console.log(JSON.stringify(links));
# -> saved videos to videos.json (added to podcastopedia/mfm/videos.json)

# list of IDs download
videos = load_and_parse_json(f'{BASE_DATA_LOCATION}/videos.json')

for video in videos:
  file = f'{BASE_DATA_LOCATION}/videos/{video}.mp3'
  if file_exists(file):
    print(f"video already downloaded {video}. skipping")
    continue

  print(f"gonna download {video} to {file}")
  !yt-dlp --force-overwrites -x --audio-format mp3 -o $file -- $video



In [8]:
import random
import chromadb
from chromadb.config import Settings

# Client for the remote Chroma server described by the configuration cell.
chroma = chromadb.HttpClient(
    host=CHROMA_SERVER_HOST,
    port=CHROMA_SERVER_PORT,
)

# Reuse the "transcripts" collection, creating it if it does not exist yet.
transcripts_collection = chroma.get_or_create_collection("transcripts")

In [33]:
# Show how many records the "transcripts" collection currently holds.
transcripts_collection.count()


0

In [None]:
import random
from sentence_splitter import split_text_into_sentences

################################################################################
#    Index transcriptions
################################################################################
# Every transcription JSON is processed utterance by utterance: the utterance
# text is split into sentences and each sentence is stored as one document in
# `transcripts_collection`, tagged with the transcription id, the source and
# the utterance's "start"/"end" values.

transcriptions_dir = f"{BASE_DATA_LOCATION}/transcriptions"
transcriptions = []

# look for valid transcriptions + annotate each transcription
# with id and source
for file in os.listdir(transcriptions_dir):
  if file.endswith(".json"):
    # the part of the file name before the first "." is used as the id
    transcript_id = file.split(".")[0]
    source = "youtube"
    d = load_and_parse_json(f"{transcriptions_dir}/{file}")
    # files without an "utterances" key are not indexed
    if "utterances" in d:
      transcriptions.append({
          "id": transcript_id,
          "source": source,
          "transcription": d
      })

try:
  # process transcriptions
  for i, t in enumerate(transcriptions):
    # `transcript_id` instead of `id` so the builtin `id()` is not shadowed
    # for the rest of the kernel session
    transcript_id = t["id"]
    source = t["source"]

    print(f'>>>>>>> processing {transcript_id} {i+1}/{len(transcriptions)}')

    for utterance_index, ts in enumerate(t["transcription"]["utterances"]):
      # drop blank fragments; they carry nothing worth indexing
      sentences = [
        sentence
        for sentence in split_text_into_sentences(text=ts["text"], language='en')
        if sentence.strip()
      ]
      if not sentences:
        # nothing to store for this utterance, so do not send an empty batch
        continue

      transcripts_collection.add(
        documents=sentences,
        # every sentence of an utterance shares that utterance's metadata
        metadatas=[
          {
            "id": transcript_id,
            "source": source,
            "start": ts["start"],
            "end": ts["end"]
          }
          for _ in sentences
        ],
        # Ids are derived from the position of the sentence (source,
        # transcription, utterance, sentence). Unlike a timestamp plus a small
        # random number they cannot collide within a batch and are stable
        # across re-runs.
        ids=[
          f'{source}-{transcript_id}-{utterance_index}-{sentence_index}'
          for sentence_index in range(len(sentences))
        ]
      )
      print(".")

except Exception as e:
  # best effort: the first failure stops indexing and is reported here
  print(f"failed with {e}")