# YouTube Video Transcription with Q&A using LLM

#### Author: Abhishek Dubey
#### Linkedin: https://www.linkedin.com/in/abhishek-dubey96/
#### Email: abhishekdb.1996@gmail.com
#### Github: https://github.com/Abhiee8322



Given a list of YouTube videos, generate audio transcripts and then use NLP techniques to answer questions from the generated text.

> Conditions:

1. If there are multiple videos, first filter out any video longer than one hour.
2. If there are multiple videos, keep adding videos only until a cumulative duration of 2 hours is reached.


*  Save the embeddings for each video to Google Drive so that they are reusable.
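
One way to keep the embeddings of each video separate and reusable is to derive a storage folder from the video ID. The sketch below is only an illustration: `DRIVE_BASE`, `video_id_from_url`, and `persist_dir_for` are hypothetical names that do not appear elsewhere in this notebook, and the base path assumes Google Drive is already mounted at `/content/drive`.

```python
from pathlib import PurePosixPath
from urllib.parse import parse_qs, urlparse

# Hypothetical base folder inside a mounted Google Drive; adjust to your setup.
DRIVE_BASE = "/content/drive/MyDrive/yt_embeddings"

def video_id_from_url(url):
    """Extract the `v` query parameter from a standard YouTube watch URL."""
    ids = parse_qs(urlparse(url).query).get("v")
    if not ids:
        raise ValueError(f"No video id found in {url!r}")
    return ids[0]

def persist_dir_for(url, base=DRIVE_BASE):
    """Return a per-video directory path for that video's vector store."""
    return str(PurePosixPath(base) / video_id_from_url(url))

print(persist_dir_for("https://www.youtube.com/watch?v=LWebKGrFjcM"))
```

A path built this way could be passed as the `persist_directory` argument used later in the notebook, so a video that was already embedded does not need to be processed again.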




Methodology followed:

1. Install and import the necessary libraries and modules.
2. Create a function that filters videos based on length.
3. Download the audio for the videos that pass the filter and store it in a folder.
4. Delete audio (optional).
5. Split the input audio into small chunks, because long inputs caused a token issue with the Whisper API, and store the processed audio in a folder.
6. Pass the chunks from the previous step to the Whisper API to generate the transcriptions.
7. Process the documents: split the text with a recursive splitter, embed it with OpenAI embeddings, use Chroma as the vector DB, and store the resulting vector DB in Google Drive.
8. Load the stored vector DB from Google Drive and query it to get the desired output.


In [None]:
## installing necessary modules
!pip install yt-dlp
!pip install cohere
!pip uninstall typing-extensions -y --quiet
!pip install typing-extensions==4.5.0 --quiet
!pip install --upgrade tensorflow-probability
!pip install --upgrade --quiet openai
!pip install pydub
!pip install chromadb --quiet
!pip install langchain --quiet
!pip install tiktoken --quiet
!pip install docx2txt --quiet

Collecting yt-dlp
  Downloading yt_dlp-2023.12.30-py2.py3-none-any.whl (3.0 MB)
Collecting mutagen (from yt-dlp)
  Downloading mutagen-1.47.0-py3-none-any.whl (194 kB)
Collecting pycryptodomex (from yt-dlp)
  Downloading pycryptodomex-3.20.0-cp35-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
Collecting websockets>=12.0 (from yt-dlp)
  Downloading websockets-12.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (130 kB)
Collecting 

In [None]:
## importing necessary libraries
import os
import warnings
from datetime import datetime

import yt_dlp
from pydub import AudioSegment
from langchain.chains import RetrievalQA
from langchain.document_loaders import Docx2txtLoader, TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
# Note: the name OpenAI is rebound by `from openai import OpenAI` in the next cell
from langchain.llms import OpenAI
from langchain.schema import Document
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma

warnings.filterwarnings('ignore')

In [None]:
## store your OpenAI key as a Colab secret named OPENAI_API_KEY, then load it here
from google.colab import userdata
from openai import OpenAI
client = OpenAI(api_key=userdata.get('OPENAI_API_KEY'))

Timing function: any video longer than 1 hour is removed from the list, and the cumulative length of the remaining videos must not exceed 2 hours.

In [None]:
def get_youtube_audio_length(video_urls):
    # Configure YouTube downloader options
    ydl_opts = {'format': 'bestaudio'}

    # Create a YoutubeDL object with the specified options
    ydl = yt_dlp.YoutubeDL(ydl_opts)

    audio_lengths = []

    # Iterate through each video URL
    for url in video_urls:
        try:
            # Extract information about the video without downloading it
            info_dict = ydl.extract_info(url, download=False)

            # Get the duration of the audio in seconds (None if not available)
            duration = info_dict.get('duration')
        except yt_dlp.utils.DownloadError as e:
            # Record None so the list stays aligned with video_urls
            print(f"An error occurred: {e}")
            duration = None

        # Append exactly one entry per URL
        audio_lengths.append(duration)

    return audio_lengths

def filter_youtube_audio(video_urls, max_video_seconds=3600, max_total_seconds=7200):
    # Get the durations of the YouTube audio for each video URL
    audio_lengths = get_youtube_audio_length(video_urls)

    filtered_video_urls = []
    total_duration = 0

    # Iterate through each video URL and its corresponding duration
    for video_url, length in zip(video_urls, audio_lengths):
        if total_duration >= max_total_seconds:
            # Break the loop once the total duration limit (2 hours) is reached
            break
        if length is None or length > max_video_seconds:
            # Skip videos with unknown duration or longer than 1 hour
            continue
        # Keep the video only if it fits within the remaining budget
        if total_duration + length <= max_total_seconds:
            filtered_video_urls.append(video_url)
            total_duration += length

    return filtered_video_urls

# Example usage:
video_urls = ['https://www.youtube.com/watch?v=LWebKGrFjcM',
              'https://www.youtube.com/watch?v=mW7RSGPK_NU']

# Call the function to filter YouTube videos based on duration
filtered_videos = filter_youtube_audio(video_urls)

# Print the filtered video URLs
print("Filtered Video URLs:", filtered_videos)


[youtube] Extracting URL: https://www.youtube.com/watch?v=LWebKGrFjcM
[youtube] LWebKGrFjcM: Downloading webpage
[youtube] LWebKGrFjcM: Downloading ios player API JSON
[youtube] LWebKGrFjcM: Downloading android player API JSON
[youtube] LWebKGrFjcM: Downloading m3u8 information
[youtube] Extracting URL: https://www.youtube.com/watch?v=mW7RSGPK_NU
[youtube] mW7RSGPK_NU: Downloading webpage
[youtube] mW7RSGPK_NU: Downloading ios player API JSON
[youtube] mW7RSGPK_NU: Downloading android player API JSON
[youtube] mW7RSGPK_NU: Downloading m3u8 information
Filtered Video URLs: ['https://www.youtube.com/watch?v=LWebKGrFjcM', 'https://www.youtube.com/watch?v=mW7RSGPK_NU']


**Explanation:**

1. **get_youtube_audio_length Function:**

* Uses the yt_dlp library to create a YouTube downloader object with the specified options ('format': 'bestaudio').
* Iterates through a list of video URLs.
* Extracts information about each video without downloading it.
* Retrieves the audio duration (in seconds) from the information and appends it to the audio_lengths list.
* Handles download errors using a try-except block.

2. **filter_youtube_audio Function:**

* Calls get_youtube_audio_length to get the durations of YouTube audio for each video URL.
* Iterates through the video URLs and their corresponding durations.
* Filters videos based on their duration to ensure that the total duration does not exceed 2 hours (7200 seconds).
* Appends valid video URLs to the filtered_video_urls list.
* Breaks the loop once the total duration reaches the limit.
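
The two duration rules can be checked without any network access by running them on plain `(url, seconds)` pairs. The following is a minimal sketch under stated assumptions: `select_videos` and the sample durations are hypothetical, and the choice to skip a video that does not fit (instead of stopping at it) is one possible reading of the conditions.

```python
MAX_SINGLE_SECONDS = 60 * 60      # rule 1: drop any video longer than one hour
MAX_TOTAL_SECONDS = 2 * 60 * 60   # rule 2: keep at most two hours in total

def select_videos(durations, max_single=MAX_SINGLE_SECONDS, max_total=MAX_TOTAL_SECONDS):
    """Apply both rules to (url, seconds) pairs; seconds may be None if unknown."""
    kept, total = [], 0
    for url, seconds in durations:
        if seconds is None or seconds > max_single:
            continue  # unknown or too long
        if total + seconds > max_total:
            continue  # does not fit; a shorter later video still might
        kept.append(url)
        total += seconds
    return kept, total

# Made-up durations, for illustration only
sample = [("a", 1800), ("b", 4000), ("c", 3600), ("d", 2400), ("e", 1500)]
print(select_videos(sample))  # (['a', 'c', 'e'], 6900)
```

Here "b" is dropped by the one-hour rule, "d" would push the total past two hours, and "e" still fits afterwards.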


**Audio download**: the audio of each video is downloaded into input_folder (created if it does not exist), and each file is named after the title of its YouTube video.

In [None]:
def download_audio(youtube_url, output_folder='audio'):
    try:
        # Create the input folder if it doesn't exist
        os.makedirs(output_folder, exist_ok=True)

        # Specify the options for downloading audio
        options = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': os.path.join(output_folder, '%(title)s.%(ext)s'),
        }

        # Create a yt_dlp object with the specified options
        ydl = yt_dlp.YoutubeDL(options)

        # Download the audio
        with ydl:
            result = ydl.extract_info(youtube_url, download=True)

        print(f"Audio for {youtube_url} downloaded successfully!")
    except Exception as e:
        print(f"Error: {e}")

# Example usage:
filtered_videos = ['https://www.youtube.com/watch?v=LWebKGrFjcM', 'https://www.youtube.com/watch?v=mW7RSGPK_NU']
input_folder = 'input_folder'

# Download audio for each video and store in the input folder
for video_url in filtered_videos:
    download_audio(video_url, input_folder)

[youtube] Extracting URL: https://www.youtube.com/watch?v=LWebKGrFjcM
[youtube] LWebKGrFjcM: Downloading webpage
[youtube] LWebKGrFjcM: Downloading ios player API JSON
[youtube] LWebKGrFjcM: Downloading android player API JSON
[youtube] LWebKGrFjcM: Downloading m3u8 information
[info] LWebKGrFjcM: Downloading 1 format(s): 251
[download] Destination: input_folder/Eliezer Yudkowsky response to Sam Altman ｜ Lex Fridman Podcast Clips.webm
[download] 100% of   13.45MiB in 00:00:00 at 46.33MiB/s  
[ExtractAudio] Destination: input_folder/Eliezer Yudkowsky response to Sam Altman ｜ Lex Fridman Podcast Clips.mp3
Deleting original file input_folder/Eliezer Yudkowsky response to Sam Altman ｜ Lex Fridman Podcast Clips.webm (pass -k to keep)
Audio for https://www.youtube.com/watch?v=LWebKGrFjcM downloaded successfully!
[youtube] Extracting URL: https://www.youtube.com/watch?v=mW7RSGPK_NU
[youtube] mW7RSGPK_NU: Downloading webpage
[youtube] mW7RSGPK_NU: Downloading ios player API JSON
[youtube] mW7R

**Explanation:**

**download_audio Function:**

*   Creates the output folder (default is 'audio') if it doesn't exist using os.makedirs.
*   Specifies options for downloading audio using the yt_dlp library. The options include the desired audio format (mp3), codec, and quality.
* Creates a yt_dlp object with the specified options.
* Downloads the audio from the provided YouTube URL and saves it to the input folder.
* Prints a success message if the download is successful, or an error message if an exception occurs.
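
Building the options dictionary in a small helper makes it easy to inspect before any download starts. This is a sketch, not the notebook's method: `build_audio_options` and its `name_field` parameter are hypothetical. It assumes the yt-dlp output-template field `%(id)s` as an alternative to `%(title)s`, which avoids spaces and special characters in file names.

```python
import os

def build_audio_options(output_folder, codec="mp3", quality="192", name_field="title"):
    """Build a yt-dlp options dict that extracts audio into output_folder.

    name_field selects the output-template field used for the file name,
    for example "title" (as in this notebook) or "id".
    """
    return {
        "format": "bestaudio/best",
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": codec,
            "preferredquality": quality,
        }],
        "outtmpl": os.path.join(output_folder, f"%({name_field})s.%(ext)s"),
    }

options = build_audio_options("input_folder", name_field="id")
print(options["outtmpl"])
```

The resulting dictionary can be passed to `yt_dlp.YoutubeDL(...)` in the same way as the `options` variable in download_audio.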







In [None]:
def delete_audio(audio_filename, input_folder='input_audio'):
    try:
        # Construct the full path to the audio file
        audio_path = os.path.join(input_folder, audio_filename)

        # Check if the file exists before attempting to delete
        if os.path.exists(audio_path):
            os.remove(audio_path)
            print(f"Audio file '{audio_filename}' deleted successfully.")
        else:
            print(f"Audio file '{audio_filename}' does not exist.")
    except Exception as e:
        print(f"Error: {e}")

# Example usage:
# An absolute path is passed here, so os.path.join ignores the default
# input_folder ('input_audio') and the file is removed from /content/input_folder.
delete_audio('/content/input_folder/Eliezer Yudkowsky response to Sam Altman ｜ Lex Fridman Podcast Clips.mp3')

Audio file '/content/input_folder/Eliezer Yudkowsky response to Sam Altman ｜ Lex Fridman Podcast Clips.mp3' deleted successfully.
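
The call above succeeds even though the default folder is 'input_audio', because of how `os.path.join` treats absolute paths. A short illustration, assuming a Linux environment such as Colab (`clip.mp3` is a made-up file name):

```python
import os

# A relative file name is appended to the folder
relative = os.path.join("input_audio", "clip.mp3")

# An absolute second argument discards everything before it
absolute = os.path.join("input_audio", "/content/input_folder/clip.mp3")

print(relative)  # input_audio/clip.mp3
print(absolute)  # /content/input_folder/clip.mp3
```

Passing just the file name together with an explicit `input_folder` argument would make the intent clearer than relying on this behaviour.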


One issue encountered: the **input audio** must be **broken down** into **smaller chunks** so that each input to the OpenAI Whisper API is short, because longer audio caused an excess-token error. The input audio is therefore split into sequential 3-minute chunks, so that the transcribed text keeps its meaning intact. The output of the function is stored in the output_folder. The **pydub library** is used to split the input audio into chunks.

In [None]:
def break_audio_into_chunks(input_folder, output_folder, chunk_size_minutes=3):
    # Create input and output folders if they don't exist
    os.makedirs(input_folder, exist_ok=True)
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if filename.endswith(".mp3"):
            input_path = os.path.join(input_folder, filename)

            # Load the audio file using pydub
            song = AudioSegment.from_mp3(input_path)

            # Calculate the chunk size in milliseconds
            chunk_size_ms = chunk_size_minutes * 60 * 1000

            # Split the audio into chunks
            chunks = [song[i:i + chunk_size_ms] for i in range(0, len(song), chunk_size_ms)]

            # Export each chunk to the output folder with a unique identifier.
            # The chunk index is zero-padded so that sorting file names keeps playback order.
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            for i, chunk in enumerate(chunks):
                output_identifier = f"{timestamp}_chunk_{i + 1:03d}"
                output_path = os.path.join(output_folder, f"{os.path.splitext(filename)[0]}_{output_identifier}.mp3")
                chunk.export(output_path, format="mp3")

if __name__ == "__main__":
    input_folder = "/content/input_folder"
    output_folder = "/content/output_folder"

    break_audio_into_chunks(input_folder, output_folder)


**Explanation:**

**break_audio_into_chunks Function:**



*   Creates input and output folders if they don't exist using os.makedirs.
*   Iterates through the files in the input folder (assumed to be mp3 files).
*   Loads each audio file using pydub.
*   Calculates the chunk size in milliseconds based on the specified duration.
*   Splits the audio file into chunks using list comprehension.
*   Exports each chunk to the output folder with a unique identifier based on the current timestamp and chunk index.
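
The slicing arithmetic can be seen in isolation by computing only the chunk boundaries. This is a minimal sketch with a hypothetical helper, `chunk_bounds`, that mirrors the `range(0, len(song), chunk_size_ms)` logic without loading any audio.

```python
def chunk_bounds(total_ms, chunk_size_minutes=3):
    """Return (start_ms, end_ms) pairs covering total_ms in fixed-size chunks."""
    if chunk_size_minutes <= 0:
        raise ValueError("chunk_size_minutes must be positive")
    chunk_ms = chunk_size_minutes * 60 * 1000
    return [(start, min(start + chunk_ms, total_ms))
            for start in range(0, total_ms, chunk_ms)]

# A 7.5-minute recording yields two full chunks and one 1.5-minute remainder
print(chunk_bounds(7 * 60 * 1000 + 30 * 1000))
# [(0, 180000), (180000, 360000), (360000, 450000)]
```

The last chunk is shorter than the others whenever the total length is not a multiple of the chunk size, which is also how pydub slicing behaves when the end index runs past the audio.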












Transcription: the function below reads the audio chunks from output_folder and stores the resulting text in the list transcriptions.

In [None]:
def generate_transcripts(api_key, input_folder):
    # Initialize the OpenAI client with your API key
    client = OpenAI(api_key=api_key)

    transcriptions = []  # List to store transcriptions

    # os.listdir returns names in arbitrary order, so sort them first
    for filename in sorted(os.listdir(input_folder)):
        if filename.endswith(".mp3"):
            input_path = os.path.join(input_folder, filename)

            # Open the audio file
            with open(input_path, "rb") as audio_file:
                # Create transcription for the audio file
                transcript = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="text"
                )

                # Append the transcription to the list
                transcriptions.append(transcript)

    return transcriptions

if __name__ == "__main__":
    api_key = userdata.get('OPENAI_API_KEY')
    output_folder = "/content/output_folder"  # Replace with the path to your output folder

    transcriptions = generate_transcripts(api_key, output_folder)

    # Now 'transcriptions' contains a list of transcriptions that can be accessed
    print(transcriptions)

all_transcriptions = '\n'.join(transcriptions)


["There is some aspect, and I'm torn here, because it's difficult to reason about the exponential improvement of technology. But also, I've seen time and time again how transparent and iterative trying out, as you improve the technology, trying it out, releasing it, testing it, how that can improve your understanding of the technology. Such that the philosophy of how to do, for example, safety of any kind of technology, but AI safety, gets adjusted over time, rapidly. A lot of the formative AI safety work was done before people even believed in deep learning, and certainly before people believed in large language models. And I don't think it's updated enough, given everything we've learned now, and everything we will learn going forward. So I think it's gotta be this very tight feedback loop. I think the theory does play a real role, of course, but continuing to learn what we learn from how the technology trajectory goes, is quite important. I think now is a very good time, and we're t

**Explanation:**

**generate_transcripts Function:**


* Initializes the OpenAI client with the provided API key.
* Iterates through the files in the input folder (assumed to be mp3 files).
* Opens each audio file and creates a transcription using the Whisper ASR model through the OpenAI API.
* Appends each transcription to the transcriptions list.





The code below defines a function **process_documents** that takes the joined transcription text, an OpenAI API key, and a persist directory as input. It splits the text into chunks, generates embeddings for them, and creates a Chroma instance that is stored in the persist directory.

In [None]:
def process_documents(all_transcriptions, api_key, persist_directory):


    # Split documents into texts using RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=0)
    texts = text_splitter.split_text(all_transcriptions)

    # Generate embeddings using OpenAI API
    embeddings = OpenAIEmbeddings(api_key=api_key)

    # Create Chroma instance from texts, embeddings, and persist_directory
    docsearch = Chroma.from_texts(texts, embeddings, persist_directory=persist_directory)

    return docsearch

# Example usage:
# 'all_transcriptions' is the single string built by joining the transcriptions above,
# and 'userdata' provides the OpenAI API key
api_key = userdata.get('OPENAI_API_KEY')
persist_directory = 'your drive folder link'  # replace with a folder path in your Google Drive

# Call the function to process documents
result = process_documents(all_transcriptions, api_key, persist_directory)


**Explanation:**

**process_documents Function:**

* Takes the joined transcription text, an OpenAI API key, and a persist directory as input.
* Splits the text into chunks of at most 300 characters with no overlap using **RecursiveCharacterTextSplitter** from langchain.text_splitter.
* Generates embeddings using OpenAIEmbeddings from langchain.embeddings.
* Creates a Chroma instance using Chroma.from_texts with the text chunks, embeddings, and persist directory.
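
To give a feel for what `chunk_size=300` means, here is a deliberately simplified splitter that packs whole words into chunks of at most 300 characters. It is not LangChain's algorithm (RecursiveCharacterTextSplitter tries a list of separators in turn), and `greedy_split` is a hypothetical name; overlap is left out because the notebook sets `chunk_overlap=0`.

```python
def greedy_split(text, chunk_size=300):
    """Pack whitespace-separated words into chunks of at most chunk_size characters.

    A single word longer than chunk_size is kept whole in its own chunk.
    """
    chunks, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if len(candidate) <= chunk_size or not current:
            current = candidate
        else:
            chunks.append(current)
            current = word
    if current:
        chunks.append(current)
    return chunks

text = "word " * 200  # 200 short words, for illustration
chunks = greedy_split(text)
print(len(chunks), [len(c) for c in chunks])  # 4 [299, 299, 299, 99]
```

Smaller chunks give more precise retrieval hits but less context per hit, so the value 300 is a trade-off that may need tuning for other transcripts.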

The following function loads the stored vector DB from Google Drive, attaches the same embedding function, and returns the vector store together with a question-answering instance.

In [None]:
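# Import LangChain's LLM wrapper under an alias: the bare name OpenAI was
# rebound to the openai client class by `from openai import OpenAI` earlier.
from langchain.llms import OpenAI as LangChainOpenAI

def create_chroma_and_qa(persist_directory, api_key):
    # Load vectors from the drive
    db3 = Chroma(persist_directory=persist_directory, embedding_function=OpenAIEmbeddings(api_key=api_key))

    # Create RetrievalQA instance that retrieves the 2 most similar chunks
    llm = LangChainOpenAI(api_key=api_key, model_name='gpt-3.5-turbo-instruct')
    retriever = db3.as_retriever(search_type="similarity", search_kwargs={"k": 2})
    qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)

    return db3, qa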

# Example usage:
persist_directory = 'drive link'
api_key = userdata.get('OPENAI_API_KEY')

db3, qa = create_chroma_and_qa(persist_directory, api_key)


**Explanation:**

**create_chroma_and_qa Function:**

* Creates a Chroma instance (db3) using the specified persist_directory and an OpenAIEmbeddings instance as the embedding function.
* Creates a RetrievalQA instance (qa) using LangChain's OpenAI LLM wrapper with the model 'gpt-3.5-turbo-instruct', specifying the chain type as "stuff" and using the Chroma instance as a similarity retriever that returns the 2 most similar chunks (k=2).
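
Conceptually, the retriever ranks stored chunks by similarity to the query embedding, and the "stuff" chain places the top results into a single prompt. The toy sketch below illustrates that idea with hand-made 3-dimensional vectors. The helper names, the sample texts, and the prompt wording are all hypothetical; cosine similarity is used for illustration, and Chroma's actual distance metric and LangChain's actual prompt template may differ.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query_vec, store, k=2):
    """Return the texts of the k stored (text, vector) pairs most similar to query_vec."""
    ranked = sorted(store, key=lambda item: cosine(query_vec, item[1]), reverse=True)
    return [text for text, _ in ranked[:k]]

def stuff_prompt(question, contexts):
    """Concatenate the retrieved chunks and the question into one prompt."""
    context = "\n\n".join(contexts)
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"

# Made-up chunks and embeddings, for illustration only
store = [
    ("chunk about safety feedback loops", [0.9, 0.1, 0.0]),
    ("chunk about the recording studio", [0.0, 0.2, 0.9]),
    ("chunk about alignment difficulty", [0.8, 0.3, 0.1]),
]
hits = top_k([1.0, 0.2, 0.0], store, k=2)
print(hits)
print(stuff_prompt("What is said about AI safety?", hits))
```

With k=2 the language model sees only the two closest chunks, so an answer can miss information that sits in lower-ranked chunks; raising k trades prompt length for coverage.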

In [None]:
query = "What Eliezer Yudkowsky thinks about AI safety?"
qa.run(query)

' Eliezer Yudkowsky believes that AI safety is a major concern and warns that as AI becomes super-intelligent, it may become impossible for humans to keep it aligned and prevent it from harming humans. \n'