In [17]:
# Install all dependencies

!pip install psycopg2-binary --quiet
!pip install pytube --quiet
!pip install boto3 --quiet
!pip install python-dotenv --quiet

In [18]:
# Load environment variables from a local .env file into os.environ

from dotenv import load_dotenv
# Must run before the os.getenv() lookups that configure the DB connection.
load_dotenv()

True

In [None]:
# Get all videos from the database that haven't been transcribed yet

import os
import psycopg2

class Video:
    """Plain record for a video: its id, its source URL and its transcribed flag."""

    def __init__(self, id, url, transcribed):
        self.id, self.url, self.transcribed = id, url, transcribed

    def __str__(self):
        fields = f"id={self.id}, url='{self.url}', transcribed={self.transcribed}"
        return f"Video({fields})"

    def __repr__(self):
        # repr deliberately mirrors str so lists of videos print readably
        return self.__str__()


# Single PostgreSQL connection shared by the notebook; each setting is read
# from the environment and falls back to a local-development default.
conn = psycopg2.connect(**{
    'host': os.getenv('DB_HOST', 'localhost'),
    'port': os.getenv('DB_PORT', 5432),
    'database': os.getenv('DB_NAME', 'yoogle'),
    'user': os.getenv('DB_USER', 'postgres'),
    'password': os.getenv('DB_PASSWORD', 'postgres'),
})

def get_videos_to_transcribe():
    """Fetch every video that has not been transcribed yet.

    Returns:
        list[Video]: one ``Video`` per row of ``videos`` whose ``transcribed``
        column is false; an empty list when nothing is pending.
    """
    # The cursor is only needed for this one query. The context manager closes
    # it even when execute()/fetchall() raises; previously it was never closed.
    with conn.cursor() as cur:
        cur.execute("SELECT id, source_url, transcribed FROM videos WHERE transcribed = false")
        rows = cur.fetchall()
    return [
        Video(id=video_id, url=source_url, transcribed=transcribed)
        for video_id, source_url, transcribed in rows
    ]

# Snapshot of the pending videos; the download cell iterates over this list.
videos_to_transcribe = get_videos_to_transcribe()

In [16]:
# Download the videos from the videos_to_transcribe list

import os
from pytube import YouTube

videos_to_transcribe_dir = "videos_to_transcribe"

# Ensure the directory exists. exist_ok=True makes this one race-free call
# instead of a separate exists() check followed by makedirs().
os.makedirs(videos_to_transcribe_dir, exist_ok=True)

def is_video_downloaded(video_id):
    """Return True when ``<video_id>.mp4`` exists as a file in the download directory."""
    expected_path = os.path.join(videos_to_transcribe_dir, f"{video_id}.mp4")
    return os.path.isfile(expected_path)

def download_video(url, video_id=None):
    """Download a YouTube video as ``<video_id>.mp4`` unless it is already on disk.

    Args:
        url: YouTube URL of the video.
        video_id: Id used for the local file name. The previous implementation
            silently read the global loop variable ``video`` instead of taking
            the id as an argument; that lookup is kept only as a fallback so
            existing one-argument calls keep working.

    Raises:
        RuntimeError: if no progressive MP4 stream is available for ``url``.
    """
    if video_id is None:
        # Legacy fallback: depends on a module-level ``video`` being set by the caller.
        video_id = video.id
    if is_video_downloaded(video_id):
        return

    # Highest-resolution progressive MP4 stream.
    stream = (
        YouTube(url)
        .streams.filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    if stream is None:
        # first() returns None when the filter matched nothing; fail with a
        # clear message instead of an AttributeError on None.
        raise RuntimeError(f"No progressive mp4 stream available for {url}")
    stream.download(
        output_path=videos_to_transcribe_dir,
        filename=f"{video_id}.mp4"
    )

for video in videos_to_transcribe:
    # Pass the id explicitly so the function no longer relies on hidden state.
    download_video(video.url, video.id)

In [21]:
# Upload videos to S3

import os
import boto3

# Name of the S3 bucket used by the upload and transcription cells
bucket = 'yoogle-videos'

# Module-level S3 client shared by the S3 helper functions in this notebook
s3_client = boto3.client('s3')

def upload_videos_to_s3(local_dir, bucket):
    """Upload each entry of ``local_dir`` to ``bucket`` and delete the local copy.

    The S3 key is the bare file name. Entries already present in the bucket
    are not uploaded again, but their local copy is still removed.
    """
    for entry in os.listdir(local_dir):
        source = os.path.join(local_dir, entry)
        already_in_bucket = check_if_file_exists_in_s3(bucket, entry)
        if not already_in_bucket:
            s3_client.upload_file(source, bucket, entry)
            print(f"Uploaded {entry} to S3 bucket {bucket}")
        os.remove(source)
        print(f"Deleted local file {entry}")

def check_if_file_exists_in_s3(bucket, s3_path):
    """Return True if ``s3_path`` exists in ``bucket``, False if S3 reports it missing.

    Raises:
        ClientError: for any S3 error other than "not found" (for example 403
            access denied or a 5xx). These used to be swallowed by a bare
            ``except`` and reported as "file does not exist".
    """
    try:
        s3_client.head_object(Bucket=bucket, Key=s3_path)
    except s3_client.exceptions.ClientError as err:
        error_code = str(err.response.get('Error', {}).get('Code', ''))
        # head_object signals a missing key with a 404 error code.
        if error_code in ('404', 'NoSuchKey', 'NotFound'):
            return False
        raise
    return True

# Push every downloaded video to the bucket; each local copy is removed afterwards.
upload_videos_to_s3(videos_to_transcribe_dir, bucket)

Deleted local file 7.mp4
Deleted local file 6.mp4
Deleted local file 4.mp4
Deleted local file 5.mp4
Deleted local file 3.mp4
Deleted local file 8.mp4


In [20]:
# For each video in the S3 bucket, create an AWS Transcription job

# Module-level Amazon Transcribe client shared by the transcription helpers
transcribe_client = boto3.client('transcribe')

def create_transcription_job(bucket, file_name, job_name):
    """Start a Transcribe job named ``job_name`` for ``s3://<bucket>/<file_name>``.

    The media format (mp4) and language (en-US) are fixed.
    """
    request = {
        'TranscriptionJobName': job_name,
        'Media': {'MediaFileUri': f"s3://{bucket}/{file_name}"},
        'MediaFormat': 'mp4',
        'LanguageCode': 'en-US',
    }
    transcribe_client.start_transcription_job(**request)

def list_s3_files(bucket):
    """Return the keys of every object in ``bucket`` (empty list for an empty bucket).

    A single list_objects_v2 call returns at most 1000 keys, so the results
    are collected through the paginator rather than from one response.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket):
        # 'Contents' is absent from a page that holds no objects.
        keys.extend(content['Key'] for content in page.get('Contents', []))
    return keys

def create_transcription_jobs_for_all_videos(bucket):
    """Start one transcription job for every object currently in ``bucket``.

    The job name is ``transcription_`` plus the part of the key before its
    first dot, with spaces replaced by underscores and lower-cased.
    """
    for key in list_s3_files(bucket):
        stem = key.partition('.')[0]
        normalized = stem.replace(' ', '_').lower()
        create_transcription_job(bucket, key, f"transcription_{normalized}")
        print(f"Started transcription job for {key}")

# Starts a job for every object in the bucket, not only the files uploaded in this run.
create_transcription_jobs_for_all_videos(bucket)

Started transcription job for 3.mp4
Started transcription job for 4.mp4
Started transcription job for 5.mp4
Started transcription job for 6.mp4
Started transcription job for 7.mp4
Started transcription job for 8.mp4


In [23]:
# Download the transcriptions from completed AWS Transcription jobs
# then delete the transcription job

import os
import requests

download_dir = 'transcripts'
local_video_dir = 'videos_to_upload'

# exist_ok=True creates the directory if needed without the race between a
# separate exists() check and makedirs().
os.makedirs(download_dir, exist_ok=True)

def delete_transcription_job(job_name):
    """Delete the Transcribe job called ``job_name`` and log the deletion."""
    request = {'TranscriptionJobName': job_name}
    transcribe_client.delete_transcription_job(**request)
    message = f"Deleted transcription job {job_name}"
    print(message)

def delete_video_from_s3(bucket, file_name):
    """Delete the object ``file_name`` from ``bucket`` and log the deletion."""
    target = {'Bucket': bucket, 'Key': file_name}
    s3_client.delete_object(**target)
    message = f"Deleted {file_name} from S3 bucket {bucket}"
    print(message)

def download_transcripts_from_transcribe_then_delete(job_name, download_dir):
    """Save the transcript of a Transcribe job locally, then delete the job.

    Args:
        job_name: Name of the transcription job whose transcript is fetched.
        download_dir: Existing directory that receives ``<job_name>.json``.

    Raises:
        requests.HTTPError: if the transcript URL answers with an error status.
        requests.Timeout: if the server stops responding while downloading.
    """
    response = transcribe_client.get_transcription_job(TranscriptionJobName=job_name)
    transcript_uri = response['TranscriptionJob']['Transcript']['TranscriptFileUri']

    local_path = os.path.join(download_dir, f"{job_name}.json")
    # requests has no default timeout, so a stalled connection would block
    # this cell forever without one.
    with requests.get(transcript_uri, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(local_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    print(f"Downloaded transcript for {job_name} to {local_path}")

    # Only reached after a successful download, so a failed fetch never
    # deletes the job that still holds the transcript.
    delete_transcription_job(job_name)

# Collect the completed jobs created by this notebook. Two fixes over a single
# unfiltered list call:
#   * results are paged, so follow NextToken until the listing is exhausted;
#   * only "transcription_*" jobs are touched, because every job processed
#     below is deleted and unrelated jobs in the account must be left alone.
job_names = []
list_kwargs = {'Status': 'COMPLETED', 'JobNameContains': 'transcription_'}
while True:
    response = transcribe_client.list_transcription_jobs(**list_kwargs)
    job_names.extend(
        job['TranscriptionJobName']
        for job in response['TranscriptionJobSummaries']
        # JobNameContains is a substring match; require the exact prefix.
        if job['TranscriptionJobName'].startswith('transcription_')
    )
    next_token = response.get('NextToken')
    if not next_token:
        break
    list_kwargs['NextToken'] = next_token

for job_name in job_names:
    download_transcripts_from_transcribe_then_delete(job_name, download_dir)

Downloaded transcript for transcription_3 to transcripts/transcription_3.json
Deleted transcription job transcription_3


In [30]:
# Parse the transcripts into sentences

import glob
import json
from typing import List, Optional

class Sentence:
    """One transcribed sentence with the id of its video and its start/end times."""

    def __init__(self, video_id, sentence, start_time, end_time):
        self.video_id, self.sentence = video_id, sentence
        self.start_time, self.end_time = start_time, end_time

    def __str__(self):
        details = f"video_id={self.video_id}, sentence={self.sentence}, start_time={self.start_time}, end_time={self.end_time}"
        return f"Sentence({details})"

    def __repr__(self):
        # repr deliberately mirrors str so lists of sentences print readably
        return self.__str__()
    
# Look in download_dir (where the previous cell saved the transcripts) instead
# of repeating the directory name as a second hard-coded literal.
transcripts = glob.glob(os.path.join(download_dir, '*.json'))
# Filled by parse_transcript(); re-running this cell resets it to empty.
sentences: List[Sentence] = []

def parse_transcript(transcript):
    """Split one transcript JSON file into sentences and append them to ``sentences``.

    Args:
        transcript: Path to a transcript file named ``<prefix>_<video_id>.json``;
            the video id is taken from the part of the file name between the
            first underscore and the following dot.

    Words are collected until a sentence-ending punctuation item ('.', '?' or
    '!') is reached; any other punctuation is skipped. Words left over at the
    end of the file are emitted as a final sentence instead of being dropped.
    The result is appended to the module-level ``sentences`` list; nothing is
    returned.
    """
    video_id = os.path.basename(transcript).split('_')[1].split('.')[0]

    # Read as UTF-8 explicitly so the result does not depend on the platform's
    # default encoding.
    with open(transcript, 'r', encoding='utf-8') as file:
        data = json.load(file)

    items = data['results']['items']

    sentence_terminators = ('.', '?', '!')
    current_sentence: List[dict] = []

    def _flush():
        """Emit the words collected so far as one Sentence (no-op when empty)."""
        # A terminator with no preceding words (for example two in a row) used
        # to raise IndexError on current_sentence[-1]; now it is ignored.
        if not current_sentence:
            return
        sentences.append(Sentence(
            video_id=video_id,
            sentence=' '.join(word['alternatives'][0]['content'] for word in current_sentence),
            start_time=current_sentence[0]['start_time'],
            end_time=current_sentence[-1]['end_time']
        ))
        current_sentence.clear()

    for item in items:
        if item['type'] == 'punctuation':
            # Punctuation never becomes part of the sentence text; terminators
            # additionally close the current sentence.
            if item['alternatives'][0]['content'] in sentence_terminators:
                _flush()
            continue
        current_sentence.append(item)

    # Keep trailing words that were not followed by a terminator.
    _flush()

# parse_transcript() appends to the shared `sentences` list, so re-running only
# this loop (without resetting `sentences`) adds every sentence a second time.
for transcript in transcripts:
    parse_transcript(transcript)

Sentence(video_id=6, sentence=Bathurst, start_time=0.009, end_time=0.419)
Sentence(video_id=6, sentence=I'll be in your town tomorrow, start_time=0.56, end_time=1.83)
Sentence(video_id=6, sentence=Get your tickets, start_time=1.84, end_time=2.789)
Sentence(video_id=6, sentence=Worst of series where we go and explore the entrails of certain services and industries and see how whiny people are about that specific thing existing that any caveman would be like on their knees and crying about enjoy like every single time people hate it with a passion doesn't matter what it is, start_time=2.799, end_time=23.229)
Sentence(video_id=6, sentence=So let's find out together, start_time=23.569, end_time=25.93)
Sentence(video_id=6, sentence=Do people like airports specifically Melbourne and Sydney I'm guessing that this is gonna buck the trend, start_time=26.02, end_time=32.79)
Sentence(video_id=6, sentence=Everyone's gonna come in together and say no we live in a miracle age where you can fly aroun

In [33]:
# Insert the sentences into the local DB and remove the transcripts from the local filesystem

# Module-level cursor; insert_sentence_into_local_db() below uses it as a global.
cur = conn.cursor()

def insert_sentence_into_local_db(sentence):
    """Queue an INSERT of ``sentence`` into ``sentences`` on the shared cursor.

    Nothing is committed here; the caller commits the connection.
    """
    query = "INSERT INTO sentences (video_id, sentence, start_at, end_at) VALUES (%s, %s, %s, %s)"
    values = (sentence.video_id, sentence.sentence, sentence.start_time, sentence.end_time)
    cur.execute(query, values)

def remove_transcript_from_local_filesystem(transcript):
    """Delete the transcript file at path ``transcript``."""
    # os.unlink is the same operation as os.remove under its other name
    os.unlink(transcript)

# Insert everything in one transaction: commit only when every row went in,
# roll back otherwise so a failed run leaves no half-inserted transcript.
try:
    for sentence in sentences:
        insert_sentence_into_local_db(sentence)
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    # Close only the cursor. The connection must stay open because the next
    # cell calls conn.cursor() again; closing it here made that cell fail.
    cur.close()

# Reached only after a successful commit, so transcripts are never deleted
# before their sentences are stored.
for transcript in transcripts:
    remove_transcript_from_local_filesystem(transcript)

In [34]:
# Update the videos table to set the transcribed flag to true for the video

# Module-level cursor; update_video_transcribed_flag() below uses it as a global.
cur = conn.cursor()

def update_video_transcribed_flag(video_id):
    """Queue an UPDATE that marks video ``video_id`` as transcribed.

    Nothing is committed here; the caller commits the connection.
    """
    query = "UPDATE videos SET transcribed = TRUE WHERE id = %s"
    params = (video_id,)
    cur.execute(query, params)

# One UPDATE per distinct video is enough; iterating over every sentence
# repeated the same statement once per sentence. dict.fromkeys() removes
# duplicates while keeping first-seen order.
try:
    for video_id in dict.fromkeys(sentence.video_id for sentence in sentences):
        update_video_transcribed_flag(video_id)
    conn.commit()
except Exception:
    # Leave the table untouched if any update fails.
    conn.rollback()
    raise

cur.close()
conn.close()


In [36]:
# Remove the directories

# os.rmdir only removes empty directories, so leftover files raise instead of being lost.
for working_dir in (videos_to_transcribe_dir, download_dir):
    os.rmdir(working_dir)
