# Transcribe and Embed YouTube Videos

This notebook shows how to create a service that transcribes any YouTube video using Whisper and then embeds the transcript into vectors so that it can be searched with vector search.

For more context, see the Atlas blog post, [Atlas: Find anything on Youtube](https://atila.ca/blog/tomiwa/atlas) and part 2 notebook: [Long Form Question Answering on Youtube](https://github.com/atilatech/atila-core-service/blob/master/atlas/notebooks/question_answering_youtube.ipynb).

See Also:

1. https://huggingface.co/docs/inference-endpoints/guides/custom_handler
2. https://www.philschmid.de/custom-inference-handler

An alternative is to use a [Serverless Deployment of the sentence-transformer model](https://aseifert.com/p/serverless-sentence-transformer/).

<a href="https://colab.research.google.com/github/atilatech/atila-core-service/blob/master/notebooks/deploy_whisper_and_sentence_transformer_to_huggingface.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Install Dependencies

In [None]:
!pip install transformers pytube sentence-transformers

# Optional: install PyTorch explicitly (a GPU build makes transcription faster).
# The command below is for Linux and its index URL is meant for CPU-only builds; for GPU builds, macOS and Windows see: https://pytorch.org/get-started/locally/
!pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu

!pip install git+https://github.com/openai/whisper.git -q
!apt install ffmpeg # https://stackoverflow.com/questions/51856340/how-to-install-package-ffmpeg-in-google-colab

## Deploying for Production

### Deploying your ML Inference Handler Model

1. Fork the [tomiwa1a/video-search](https://huggingface.co/tomiwa1a/video-search) repo with [repo_duplicator](https://huggingface.co/spaces/osanseviero/repo_duplicator)
1. Modify `handler.py` as necessary
1. [Deploy as an inference endpoint](https://ui.endpoints.huggingface.co/new?repository=tomiwa1a/video-search) (credit card required)

## Deploying as an API

The functions in this notebook that pre-process the request to the ML model and post-process its response (everything except the Custom Inference Handler) can be combined and deployed as a web server so that other people can interact with this service.

[Deploy using Flask](https://github.com/atilatech/atlas-service/tree/master/atlas): Flask is the simplest and quickest option, but you will have to add database and authentication support yourself.

[Deploy using Django](https://github.com/atilatech/atila-core-service/tree/master/atlas): Django requires a bit more initial setup but gives you more features, such as database and authentication support. If you plan on building something that lasts longer than a weekend, working with others, and having other people use it, I recommend using Django.
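
Whichever framework you pick, the request-handling logic itself can stay framework-independent. The following is a minimal sketch, not code from the linked repositories: `handle_search_request` is a hypothetical helper, and the `transcribe` and `search` callables are stand-ins for this notebook's `send_transcription_request` and `query_model`.

```python
from typing import Callable, Dict, Tuple


def handle_search_request(
    body: Dict,
    transcribe: Callable[[str], Dict],
    search: Callable[[str], Dict],
) -> Tuple[int, Dict]:
    """Validate a JSON body and return (HTTP status code, JSON response).

    Hypothetical helper: `transcribe` and `search` are injected so that a
    Flask view or a Django view could both call this same function.
    """
    video_url = body.get("video_url")
    query = body.get("query")
    if not video_url and not query:
        return 400, {"error": "Provide 'video_url', 'query', or both."}

    response: Dict = {}
    if video_url:
        response["video"] = transcribe(video_url)
    if query:
        response["results"] = search(query)
    return 200, response


# Stub callables so the sketch runs without any model or network access.
status, payload = handle_search_request(
    {"query": "three degrees"},
    transcribe=lambda url: {"url": url},
    search=lambda q: {"matches": [], "query": q},
)
print(status, payload)
```

A web framework view would then only need to parse the request body, call this function, and serialize the returned dictionary with the returned status code.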

In [None]:
from typing import Dict

from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import whisper
import torch
import pytube
import time


class EndpointHandler():
    def __init__(self, path=""):
        # load the model
        WHISPER_MODEL_NAME = "tiny.en"
        SENTENCE_TRANSFORMER_MODEL_NAME = "multi-qa-mpnet-base-dot-v1"

        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f'whisper will use: {device}')

        t0 = time.time()
        self.whisper_model = whisper.load_model(WHISPER_MODEL_NAME).to(device)
        t1 = time.time()

        total = t1 - t0
        print(f'Finished loading whisper_model in {total} seconds')

        t0 = time.time()
        self.sentence_transformer_model = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_NAME)
        t1 = time.time()

        total = t1 - t0
        print(f'Finished loading sentence_transformer_model in {total} seconds')

    def __call__(self, data: Dict[str, str]) -> Dict:
        """
        Args:
            data (:obj:):
                includes the URL to video for transcription
        Return:
            A :obj:`dict`:. transcribed dict
        """
        # process input
        print('data', data)

        if "inputs" not in data:
            raise Exception(f"data is missing 'inputs' key which  EndpointHandler expects. Received: {data}"
                            f" See: https://huggingface.co/docs/inference-endpoints/guides/custom_handler#2-create-endpointhandler-cp")
        video_url = data.pop("video_url", None)
        query = data.pop("query", None)
        encoded_segments = {}
        if video_url:
            video_with_transcript = self.transcribe_video(video_url)
            encode_transcript = data.pop("encode_transcript", True)
            if encode_transcript:
                encoded_segments = self.combine_transcripts(video_with_transcript)
                encoded_segments = {
                    "encoded_segments": self.encode_sentences(encoded_segments)
                }
            return {
                **video_with_transcript,
                **encoded_segments
            }
        elif query:
            query = [{"text": query, "id": ""}]
            encoded_segments = self.encode_sentences(query)

            return {
                "encoded_segments": encoded_segments
            }

    def transcribe_video(self, video_url):
        decode_options = {
            # Set language to None to support multilingual,
            # but it will take longer to process while it detects the language.
            # Realized this by running in verbose mode and seeing how much time
            # was spent on the decoding language step
            "language": "en",
            "verbose": True
        }
        yt = pytube.YouTube(video_url)
        video_info = {
            'id': yt.video_id,
            'thumbnail': yt.thumbnail_url,
            'title': yt.title,
            'views': yt.views,
            'length': yt.length,
            # Although this might seem redundant since we already have the id,
            # it lets the video be opened in one click from the API response
            'url': f"https://www.youtube.com/watch?v={yt.video_id}"
        }
        stream = yt.streams.filter(only_audio=True)[0]
        path_to_audio = f"{yt.video_id}.mp3"
        stream.download(filename=path_to_audio)
        t0 = time.time()
        transcript = self.whisper_model.transcribe(path_to_audio, **decode_options)
        t1 = time.time()
        for segment in transcript['segments']:
            # Remove the tokens array, it makes the response too verbose
            segment.pop('tokens', None)

        total = t1 - t0
        print(f'Finished transcription in {total} seconds')

        # postprocess the prediction
        return {"transcript": transcript, 'video': video_info}

    def encode_sentences(self, transcripts, batch_size=64):
        """
        Encoding all of our segments at once or storing them locally would require too much compute or memory.
        So we do it in batches of 64
        :param transcripts:
        :param batch_size:
        :return:
        """
        # loop through in batches of 64
        all_batches = []
        for i in tqdm(range(0, len(transcripts), batch_size)):
            # find end position of batch (for when we hit end of data)
            i_end = min(len(transcripts), i + batch_size)
            # extract the metadata like text, start/end positions, etc
            batch_meta = [{
                **row
            } for row in transcripts[i:i_end]]
            # extract only text to be encoded by embedding model
            batch_text = [
                row['text'] for row in batch_meta
            ]
            # create the embedding vectors
            batch_vectors = self.sentence_transformer_model.encode(batch_text).tolist()

            batch_details = [
                {
                    **batch_meta[x],
                    'vectors':batch_vectors[x]
                } for x in range(0, len(batch_meta))
            ]
            all_batches.extend(batch_details)

        return all_batches

    @staticmethod
    def combine_transcripts(video, window=6, stride=3):
        """

        :param video:
        :param window: number of sentences to combine
        :param stride: number of sentences to 'stride' over, used to create overlap
        :return:
        """
        new_transcript_segments = []

        video_info = video['video']
        transcript_segments = video['transcript']['segments']
        for i in tqdm(range(0, len(transcript_segments), stride)):
            i_end = min(len(transcript_segments), i + window)
            text = ' '.join(transcript['text']
                            for transcript in
                            transcript_segments[i:i_end])
            # TODO: Should int (float to seconds) conversion happen at the API level?
            start = int(transcript_segments[i]['start'])
            # The combined text spans segments i..i_end-1, so it ends with the last of them
            end = int(transcript_segments[i_end - 1]['end'])
            new_transcript_segments.append({
                **video_info,
                **{
                    'start': start,
                    'end': end,
                    'title': video_info['title'],
                    'text': text,
                    'id': f"{video_info['id']}-t{start}",
                    'url': f"https://youtu.be/{video_info['id']}?t={start}",
                    'video_id': video_info['id'],
                }
            })
        return new_transcript_segments


In [None]:
# Initialize the Handler

my_handler = EndpointHandler(path="")


In [None]:
# prepare sample payload
# Note: the handler raises an exception unless the payload contains an "inputs" key
# payload = {"inputs": "", "video_url": "https://www.youtube.com/watch?v=aNxigRg1yEQ"}
payload = {"inputs": "", "video_url": "https://www.youtube.com/watch?v=bGk8qcHc1A0"} # Joe Rogan & Lex Fridman: Lionel Messi Is The GOAT Over Cristiano Ronaldo


# test the handler
payload_pred = my_handler(payload)
payload_pred

In [None]:
# Embed a search query instead of a video.
# my_handler from the cell above is reused so that both models are not loaded again.
payload = {"query": "mighty mouse", "inputs": ""}


# test the handler
payload_pred = my_handler(payload)
payload_pred

In [None]:
# Transcribe a video and also encode its transcript (reuses my_handler from above).
payload = {"inputs": "", "video_url": "https://www.youtube.com/watch?v=ciKdF97JWpU", "encode_transcript": True} # Jimmy Butler Reveals What Made Him Leave the Philadelphia 76ers | The JJ Redick Podcast | The Ringer


# test the handler
payload_pred = my_handler(payload)
payload_pred


# Transcribe and Search Video

We can either run the `EndpointHandler` locally or send an HTTP request to the remote version deployed on Hugging Face.

Running locally is good because you don't have to pay for a Hugging Face GPU and you can [use a free GPU on Google Colab](https://www.tutorialspoint.com/google_colab/google_colab_using_free_gpu.htm). However, others won't be able to access your service. Note: a GPU is not required. A CPU works too, but it will be much slower.

Running remotely on Hugging Face is good because anyone can access it from any device.

1. Call the model inference handler we created in the last step
    1. Using the `EndpointHandler` class locally
    1. Using the deployed model on Hugging Face

1. Combine 6 segments together to create more meaningful sentences

1. Embed sentences into vectors using transformers

1. Save vectors into a vector database

1. Query phrases using vector database

See also: [Fixing YouTube Search with OpenAI's Whisper](https://www.pinecone.io/learn/openai-whisper/)
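
To make the segment-combining step concrete, here is a small sketch of how a window of 6 with a stride of 3 (the values used by `combine_transcripts` in this notebook) produces overlapping windows. `window_indices` is a hypothetical helper written only for this illustration, and the segment labels are placeholders.

```python
def window_indices(n_segments, window=6, stride=3):
    """Return the (start, end) slice bounds of each combined window.

    `end` is exclusive, so it is capped at n_segments to keep the last segment.
    """
    return [
        (i, min(n_segments, i + window))
        for i in range(0, n_segments, stride)
    ]


# Ten placeholder segments standing in for Whisper transcript segments.
segments = [f"s{i}" for i in range(10)]
bounds = window_indices(len(segments))
for start, end in bounds:
    print(start, end, " ".join(segments[start:end]))
```

Each window starts 3 segments after the previous one, so consecutive windows share 3 segments. The overlap reduces the chance that a sentence split across two Whisper segments is lost between windows.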


## Transcribe Video

In [None]:
!pip install requests

In [None]:
from getpass import getpass

HUGGING_FACE_ENDPOINT_URL = getpass('Enter HUGGING_FACE_ENDPOINT_URL')
HUGGING_FACE_API_KEY = getpass('Enter HUGGING_FACE_API_KEY')



In [None]:
import json
import requests

def send_transcription_request(url: str):
    # Use the `url` argument rather than a global variable.
    # "video_url" is the key the EndpointHandler above reads the video from;
    # "inputs" is also sent because the handler rejects payloads without it.
    payload = json.dumps({
      "inputs": url,
      "video_url": url
    })
    headers = {
      'Authorization': f'Bearer {HUGGING_FACE_API_KEY}',
      'Content-Type': 'application/json'
    }

    response = requests.request("POST", HUGGING_FACE_ENDPOINT_URL, headers=headers, data=payload)
    return response.json()

video_url="https://www.youtube.com/watch?v=bGk8qcHc1A0" # Joe Rogan & Lex Fridman: Lionel Messi Is The GOAT Over Cristiano Ronaldo
video_data = send_transcription_request(video_url)

In [None]:
# verify that it worked
video_data['transcript']['segments'][0]

## Combine Transcript Segments

In [None]:
!pip install tqdm


In [None]:
from tqdm.auto import tqdm

def combine_transcripts(video, window=6, stride=3):
    """
    :param window: number of sentences to combine
    :param stride: number of sentences to 'stride' over, used to create overlap
    """
    # Create the list inside the function so that repeated calls
    # don't accumulate segments from previously processed videos
    new_transcript_segments = []

    video_info = video['video']
    transcript_segments = video['transcript']['segments']
    for i in tqdm(range(0, len(transcript_segments), stride)):
        # slice ends are exclusive, so cap at len() to keep the last segment
        i_end = min(len(transcript_segments), i + window)
        text = ' '.join(transcript['text']
                        for transcript in
                        transcript_segments[i:i_end])
        # TODO: Should int (float to seconds) conversion happen at the API level?
        start = int(transcript_segments[i]['start'])
        # the combined text ends where its last segment ends
        end = int(transcript_segments[i_end - 1]['end'])
        new_transcript_segments.append({
            **video_info,
            **{
                'start': start,
                'end': end,
                'title': video_info['title'],
                'text': text,
                'id': f"{video_info['id']}-t{start}",
                'url': f"https://youtu.be/{video_info['id']}?t={start}",
                'video_id': video_info['id'],
            }
        })
    return new_transcript_segments

combined_transcripts = combine_transcripts(video_data)

In [None]:
combined_transcripts[3]

## Convert Transcripts to Vectors

1. Use Sentence Transformers



In [None]:
# The cells below use the legacy `pinecone.init` API, which was removed in pinecone-client 3.x
!pip install -U sentence-transformers "pinecone-client<3"

In [None]:
from getpass import getpass

PINECONE_API_KEY = getpass('Enter PINECONE_API_KEY')


In [None]:
from sentence_transformers import SentenceTransformer

model_id = "multi-qa-mpnet-base-dot-v1"

sentence_transformer_model = SentenceTransformer(model_id)
sentence_transformer_model

dimensions = sentence_transformer_model.get_sentence_embedding_dimension()

In [None]:
import pinecone  # !pip install pinecone-client

index_id = "youtube-search"

pinecone.init(
    api_key=PINECONE_API_KEY,  # app.pinecone.io
    environment="us-west1-gcp"
)

if index_id not in pinecone.list_indexes():
    pinecone.create_index(
        index_id,
        dimensions,
        metric="dotproduct"
    )

pinecone_index = pinecone.Index(index_id)
pinecone_index.describe_index_stats()

{'dimension': 768,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 30}},
 'total_vector_count': 30}
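
The index above is created with the `dotproduct` metric and 768 dimensions to match the embedding model. As a rough mental model only (a vector database is free to use faster, approximate methods internally), a dot-product query scores every stored vector against the query vector and returns the highest scores. The sketch below is a brute-force toy with made-up 3-dimensional vectors and hypothetical helper names `dot` and `top_k`; it does not use Pinecone.

```python
def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k(query_vector, index, k=2):
    """Score every stored vector against the query and keep the k best."""
    scored = [(dot(query_vector, vec), item_id) for item_id, vec in index.items()]
    scored.sort(reverse=True)
    return [(item_id, score) for score, item_id in scored[:k]]


# Made-up vectors keyed by segment ids in the same style as this notebook.
toy_index = {
    "video-t0": [1.0, 0.0, 0.0],
    "video-t30": [0.5, 0.5, 0.0],
    "video-t60": [0.0, 0.0, 1.0],
}
matches = top_k([1.0, 0.5, 0.0], toy_index)
print(matches)
```

Segments whose vectors point in a similar direction to the query get a higher score, which is why semantically related text ranks first.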

## Upload to Vector Database

In [None]:
# we encode and insert in batches of 64
batch_size = 64

def upload_transcripts_to_vector_db(transcripts_for_upload):
  # loop through in batches of 64
  for i in tqdm(range(0, len(transcripts_for_upload), batch_size)):
      # find end position of batch (for when we hit end of data);
      # slice ends are exclusive, so cap at len() to include the last transcript
      i_end = min(len(transcripts_for_upload), i+batch_size)
      # extract the metadata like text, start/end positions, etc
      batch_meta = [{
          **transcripts_for_upload[x]
      } for x in range(i, i_end)]
      # extract only text to be encoded by embedding model
      batch_text = [
          row['text'] for row in transcripts_for_upload[i:i_end]
      ]
      # create the embedding vectors
      batch_embeds = sentence_transformer_model.encode(batch_text).tolist()
      # extract IDs to be attached to each embedding and metadata
      batch_ids = [
          row['id'] for row in transcripts_for_upload[i:i_end]
      ]
      # 'upsert' (insert) IDs, embeddings, and metadata to index
      to_upsert = list(zip(
          batch_ids, batch_embeds, batch_meta
      ))
      pinecone_index.upsert(to_upsert)
      print(f'Uploaded Batches: {i} to {i_end}')

upload_transcripts_to_vector_db(combined_transcripts)
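
The batch boundaries used for uploading can be sketched in isolation. `batch_ranges` below is a hypothetical helper, not part of the notebook. The point it illustrates is that, because Python slice ends are exclusive, capping the end index at the number of items (rather than one less) is what ensures the final item is included.

```python
def batch_ranges(n_items, batch_size=64):
    """Yield (start, end) slice bounds that cover all n_items exactly once."""
    for start in range(0, n_items, batch_size):
        yield start, min(n_items, start + batch_size)


ranges = list(batch_ranges(130))
print(ranges)

# Every item is covered: the batch sizes add up to the total number of items.
covered = sum(end - start for start, end in ranges)
print(covered)
```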

## Search Transcript

In [None]:
def query_model(query, video_id=""):
  encoded_query = sentence_transformer_model.encode(query).tolist()
  metadata_filter = { "video_id": {"$eq": video_id}} if video_id else None
  return pinecone_index.query(encoded_query, top_k=5,
                              include_metadata=True,
                              filter=metadata_filter)

query_phrase = "basketball"
results = query_model(query_phrase, "bGk8qcHc1A0")

results['matches'][0]

## Add some utility functions

In [None]:
from datetime import timedelta
import urllib.parse  # `import urllib` alone does not guarantee the `parse` submodule is loaded

def convert_seconds_to_string(seconds):
    days, seconds = divmod(seconds, 86400)
    return str(timedelta(days=days, seconds=seconds)).split(',')[-1].strip()


def parse_video_id(url):
    # Parse the URL
    parsed_url = urllib.parse.urlparse(url)
    
    # Check if the URL is a YouTube URL
    if parsed_url.netloc in ['www.youtube.com', 'youtu.be']:
        # Extract the video ID from the path or query parameters
        if parsed_url.netloc == 'www.youtube.com':
            video_id = urllib.parse.parse_qs(parsed_url.query)['v'][0]
        else:
            video_id = parsed_url.path.split('/')[-1]
        return video_id
    else:
        return None

def does_video_exist(video_url):
  # create a placeholder vector of zeros to see if any vectors with the 
  # given video_id match.
  video_id = parse_video_id(video_url)
  query_response = pinecone_index.query(
      top_k=1,
      vector=[0] * dimensions,
      filter={
          "video_id": {"$eq": video_id}
      }
  )
  return len(query_response['matches']) > 0
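
The `parse_video_id` function above handles `www.youtube.com` and `youtu.be` URLs and assumes a `v` query parameter is present. If you expect other URL shapes, a more lenient variant might look like the sketch below. The name `parse_video_id_lenient` and the extra host names in `YOUTUBE_HOSTS` are assumptions made for this illustration, not part of the original service.

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse

# Assumed set of hosts that use the ?v=<id> query parameter.
YOUTUBE_HOSTS = {"www.youtube.com", "youtube.com", "m.youtube.com"}


def parse_video_id_lenient(url: str) -> Optional[str]:
    """Return the video id, or None when the URL is not a recognized video URL."""
    parsed = urlparse(url)
    if parsed.netloc in YOUTUBE_HOSTS:
        # .get avoids a KeyError when the URL has no `v` parameter
        values = parse_qs(parsed.query).get("v")
        return values[0] if values else None
    if parsed.netloc == "youtu.be":
        # Short links carry the id in the path, e.g. /bGk8qcHc1A0
        return parsed.path.strip("/") or None
    return None


print(parse_video_id_lenient("https://www.youtube.com/watch?v=bGk8qcHc1A0"))
print(parse_video_id_lenient("https://youtu.be/bGk8qcHc1A0?t=42"))
print(parse_video_id_lenient("https://example.com/watch?v=abc"))
```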

# Putting it all Together

In [None]:
import time
video_url="https://www.youtube.com/watch?v=lKXv19eRLZg" # Making Friends with Machine Learning
query_phrase = "three degrees"


def transcribe_and_search_video(url, query, verbose=True):
  t0 = time.time()
  if not does_video_exist(url):
    video_with_transcript = send_transcription_request(url)
    video_with_transcript_combined = combine_transcripts(video_with_transcript)

    upload_transcripts_to_vector_db(video_with_transcript_combined)
  else:
    # the f prefix must be on the string that contains {url}
    print('Skipping transcribing and embedding.'
          f' Video already exists: {url}')
  results = query_model(query)
  t1 = time.time()
  total = t1-t0
  if verbose:
    video_length = f"{convert_seconds_to_string(results['matches'][0]['metadata']['length'])} "\
                      "long video" \
      if len(results['matches']) > 0 else 'no video found'
    print(f'Transcribed and searched {video_length} in {total} seconds')
  return results

search_results = transcribe_and_search_video(video_url, query_phrase)
search_results['matches'][0:3]
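
The raw matches are verbose, so it can help to print them as timestamped links. The sketch below assumes each match exposes a `score` and a `metadata` dictionary containing the `start`, `title` and `url` fields written by `combine_transcripts`. `format_matches` is a hypothetical helper and `sample_matches` is made-up data, not real search output.

```python
from datetime import timedelta


def format_matches(matches, limit=3):
    """Turn search matches into short, human-readable lines."""
    lines = []
    for match in matches[:limit]:
        meta = match["metadata"]
        # Pinecone may return numbers as floats, so cast before formatting
        timestamp = str(timedelta(seconds=int(meta["start"])))
        lines.append(
            f"[{timestamp}] {meta['title']}: {meta['url']} "
            f"(score {match['score']:.2f})"
        )
    return lines


# Made-up match in the same shape as the metadata uploaded above.
sample_matches = [
    {
        "score": 12.3456,
        "metadata": {
            "start": 95.0,
            "title": "Sample video",
            "url": "https://youtu.be/VIDEO_ID?t=95",
        },
    },
]
formatted = format_matches(sample_matches)
print("\n".join(formatted))
```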

## Examples

In [None]:
video_url="https://www.youtube.com/watch?v=s5yguqapy6s" # How Jimmy Butler and Mark Wahlberg Became Close Friends
query_phrase = "filming transformers"

results = transcribe_and_search_video(video_url, query_phrase)
results['matches'][0]

In [None]:
video_url="https://www.youtube.com/watch?v=Gqev5NrWnvM" # Rio Ferdinand on Messi | Most Embarrassing Night of my Life
query_phrase = "we would have won"

results = transcribe_and_search_video(video_url, query_phrase)
results['matches'][0]
