# -*- coding: utf-8 -*-
"""youtube_scaffolding_v5.ipynb

A scaffolding script to invoke the YouTube transcriber module and verify its output.
Designed to run in Google Colab.
"""


In [None]:
# !pip install --upgrade openai httpx langchain-openai openai-whisper

In [None]:
# ! pip install -qU langchain-pinecone pinecone-notebooks

In [None]:
# !pip install langchain-community

In [1]:
import subprocess
import sys

# may need to check if these installs are required

def install_requirements():
    """Install required packages using pip."""
    packages = [
        'importlib',
        'openai',
        'httpx',
        'langchain-openai',
        'openai-whisper',
        'docarray',
        'langchain-community',
        'langchain_pinecone',
        'pinecone_notebooks',
        'pathlib'
    ]
    try:
        for package in packages:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", package])
    except Exception as e:
        print(f"Error installing requirements: {str(e)}")
        sys.exit(1)


In [2]:
# Install required packages
install_requirements()


In [3]:
# Import required libraries
import os
import importlib
import importlib.util
import traceback
import time
import logging
import openai
from pathlib import Path
from google.colab import drive
from pydantic import ValidationError


In [4]:
# Mount Google Drive
drive.mount('/content/drive')


Mounted at /content/drive


In [5]:
# VARS: Set these variables
version_number = 'v39'
youtube_url = "https://www.youtube.com/watch?v=cdiD-9MMpb0"
code_version = 'v17'  # Updated to match current transcriber version
# base_dir = "/content/LLM-projects/code/LangChain-proj/RAG_transcriber/"

root_dir = "/content/drive/My Drive/python-projects/kaggle_experiments/transcriber/"

repo_git = "https://github.com/ArindamBanerji/LLM-projects.git" # git repository
local_repo = "./LLM-projects" # local directory base for repo
root_offest = "/LLM-projects/code/LangChain-proj/RAG_transcriber/"
base_dir = root_dir + root_offest


fnm = "youtube_transcriber" + "_" + code_version + ".py"
cur_fnm = fnm
full_fnm = base_dir + cur_fnm
default_transcript_path = base_dir + "transcriber_" + version_number
print (full_fnm, default_transcript_path)


/content/drive/My Drive/python-projects/kaggle_experiments/transcriber//LLM-projects/code/LangChain-proj/RAG_transcriber/youtube_transcriber_v17.py /content/drive/My Drive/python-projects/kaggle_experiments/transcriber//LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39


In [6]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


In [7]:
# go to the root directory - helps for the git pulls
import os
os.chdir(root_dir)

In [8]:
# idempotent git pull
! (test -d $local_repo && git -C $local_repo pull --rebase) || git clone $repo_git

Already up to date.


In [9]:
# change directory into the local repo
import os

cwd = os.getcwd()
if (os.path.samefile(cwd, base_dir) == False) :
    os.chdir(base_dir)

In [10]:
#basic test to make sure that the file exists - should probably exit code
! (test -f "$fnm" && echo "file-exists") || echo "file-not-exists"

file-exists


In [11]:
def verify_transcript_file(transcript_dir):
    """
    Verify that the transcript file exists and is not empty.

    Args:
        transcript_dir (str): Directory where transcript should be located

    Returns:
        tuple: (bool, str) - (Success status, Full path of transcript file if found)
    """
    transcript_path = os.path.join(transcript_dir, "transcript.txt")

    print( "transcript path ", transcript_path )

    if not os.path.exists(transcript_path):
        logger.error(f"Transcript file not found at: {transcript_path}")
        return False, None

    if os.path.getsize(transcript_path) == 0:
        logger.error(f"Transcript file is empty: {transcript_path}")
        return False, None

    # Read first and last lines to verify content
    try:
        with open(transcript_path, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()
            # Go to the end and read last line
            f.seek(0, 2)  # Go to end
            pos = f.tell() - 2
            while pos > 0 and f.read(1) != "\n":
                pos -= 1
                f.seek(pos, 0)
            last_line = f.readline().strip()

        if not first_line or not last_line:
            logger.error(f"Transcript file appears incomplete: {transcript_path}")
            return False, None

        logger.info(f"Transcript file verified at: {transcript_path}")
        return True, transcript_path

    except Exception as e:
        logger.error(f"Error verifying transcript file: {str(e)}")
        return False, None


In [12]:

def get_transcript_dir(base_dir, version_num):
    """
    Construct the full path to the transcript directory.

    Args:
        base_dir (str): Base directory path
        version_num (str): Version number for directory

    Returns:
        str: Full path to transcript directory
    """
    return os.path.join(base_dir, f"transcriber_{version_num}")


In [13]:
def monitor_transcript_progress(transcript_dir):
    """
    Monitor progress by checking for the existence and growth of audio chunks.

    Args:
        transcript_dir (str): Directory to monitor
    """
    chunks_dir = os.path.join(transcript_dir, "chunks")
    if os.path.exists(chunks_dir):
        files = os.listdir(chunks_dir)
        return len(files)
    return 0


In [14]:
def import_transcriber_module(full_fnm, base_dir):
    """
    Import the transcriber module safely.

    Args:
        full_fnm (str): Full path to the module file
        base_dir (str): Base directory path

    Returns:
        module: Imported module object
    """
    try:
        # Add base directory to Python path
        if base_dir not in sys.path:
            sys.path.append(base_dir)

        # Get module name without .py extension
        module_name = os.path.splitext(os.path.basename(full_fnm))[0]

        # Import the module using spec
        spec = importlib.util.spec_from_file_location(module_name, full_fnm)
        if spec is None:
            raise ImportError(f"Could not load spec for module {module_name}")

        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        return module

    except Exception as e:
        logger.error(f"Error importing transcriber module: {str(e)}")
        raise


In [15]:

def invoke_youtube_transcriber_args(full_fnm, base_dir, url, version_num, clean_dir=False):
    """
    Invoke the YouTube transcriber module and verify its output.

    Args:
        full_fnm (str): Full path to the transcriber module
        base_dir (str): Base directory path
        url (str): YouTube URL to process
        version_num (str): Version number for directory naming
        clean_dir (bool): Whether to clean existing directory

    Returns:
        tuple: (bool, str) - (Success status, Full path of transcript file if successful)
    """
    try:
        # Verify transcriber module exists
        if not os.path.exists(full_fnm):
            logger.error(f"Transcriber module not found at: {full_fnm}")
            return False, None

        # Import the module
        module = import_transcriber_module(full_fnm, base_dir)

        # Get transcript directory path
        transcript_dir = get_transcript_dir(base_dir, version_num)

        # Process the video - this is a blocking call
        logger.info(f"Starting transcription process for URL: {url}")
        logger.info("This process may take 30+ minutes depending on video length...")

        start_time = time.time()

        # Start the processing in the main thread (blocking call)
        processing_completed = False
        try:
            module.process_video(
                youtube_url=url,
                version_num=version_num,
                clean_dir=clean_dir,
                base_dir=base_dir,
            )
            processing_completed = True
        except Exception as e:
            logger.error(f"Error in process_video: {str(e)}")
            raise

        if not processing_completed:
            logger.error("Processing did not complete successfully")
            return False, None

        # Additional verification after completion
        elapsed_time = time.time() - start_time
        logger.info(f"Processing completed after {elapsed_time:.1f} seconds")

        # Verify transcript
        success, transcript_path = verify_transcript_file(transcript_dir)

        if success:
            logger.info("Transcription process completed successfully")
            return True, transcript_path
        else:
            logger.error("Failed to verify transcript file")
            return False, None

    except Exception as e:
        logger.error(f"Error during transcription: {str(e)}")
        traceback.print_exc()
        return False, None




In [16]:
def display_transcript_sample(transcript_path, num_lines=5):
    """
    Display the first few lines of the transcript.

    Args:
        transcript_path (str): Path to the transcript file
        num_lines (int): Number of lines to display
    """
    try:
        with open(transcript_path, 'r', encoding='utf-8') as f:
            logger.info(f"\nFirst {num_lines} lines of transcript:")
            for i, line in enumerate(f):
                if i < num_lines:
                    print(line.strip())
                else:
                    break
    except Exception as e:
        logger.error(f"Error reading transcript: {str(e)}")


In [17]:
def process_transcription():
    """Main execution function."""
    logger.info(f"Starting transcription process with video: {youtube_url}")

    try:
        # Change to correct directory
        cwd = os.getcwd()
        if not os.path.samefile(cwd, base_dir):
            os.chdir(base_dir)
        logger.info(f"Working directory: {os.getcwd()}")

        # Verify transcriber file exists
        if not os.path.exists(full_fnm):
            logger.error(f"Transcriber file not found: {full_fnm}")
            sys.exit(1)

        # Invoke transcriber with long-running process handling
        logger.info("Starting transcription process - this may take 30+ minutes...")
        success, transcript_path = invoke_youtube_transcriber_args(
            full_fnm,
            base_dir,
            youtube_url,
            version_number,
            clean_dir=True
        )

        if success:
            logger.info(f"Transcription completed successfully")
            logger.info(f"Transcript file location: {transcript_path}")

            # Display sample of transcript
            display_transcript_sample(transcript_path)
            default_transcript_path = transcript_path
            logger.info(f"Default transcript path: {default_transcript_path}")

        else:
            logger.error("Transcription process failed")
            sys.exit(1)

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        traceback.print_exc()
        sys.exit(1)



In [18]:
def transcribe_video():
    try:
        process_transcription()
    except Exception as e:
        logger.error(f"Scaffolding execution failed: {str(e)}")
        traceback.print_exc()
        sys.exit(1)

In [None]:
# transcribe_video()

In [19]:
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

os.environ["PINECONE_API_KEY"] = userdata.get('PINECONE_API_KEY')
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

In [20]:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "Write a short joke about elections."
        }
    ]
)

print(completion.choices[0].message)

ChatCompletionMessage(content='Why did the voter bring a ladder to the election?\n\nBecause they heard the stakes were high!', refusal=None, role='assistant', audio=None, function_call=None, tool_calls=None)


In [21]:
from langchain_openai.chat_models import ChatOpenAI

model = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model="gpt-3.5-turbo")

In [22]:
model.invoke("What MLB team won the World Series during the COVID-19 pandemic?")

AIMessage(content='The Los Angeles Dodgers won the World Series during the COVID-19 pandemic in 2020. They defeated the Tampa Bay Rays in six games to capture their first championship since 1988.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 38, 'prompt_tokens': 21, 'total_tokens': 59, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-fbcf61ae-7e1b-4483-880b-269403da5f70-0', usage_metadata={'input_tokens': 21, 'output_tokens': 38, 'total_tokens': 59, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [23]:
from langchain_core.output_parsers import StrOutputParser

parser = StrOutputParser()

chain = model | parser
chain.invoke("What MLB team won the World Series during the COVID-19 pandemic?")

'The Los Angeles Dodgers won the World Series during the COVID-19 pandemic. They defeated the Tampa Bay Rays in the 2020 World Series to claim their first championship since 1988.'

In [24]:
from langchain.prompts import ChatPromptTemplate

template = """
Answer the question based on the context below. If you can't
answer the question, reply "I don't know".

Context: {context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)
prompt.format(context="Mary's sister is Susana", question="Who is Mary's sister?")

'Human: \nAnswer the question based on the context below. If you can\'t\nanswer the question, reply "I don\'t know".\n\nContext: Mary\'s sister is Susana\n\nQuestion: Who is Mary\'s sister?\n'

In [25]:
chain = prompt | model | parser
chain.invoke({
    "context": "Mary's sister is Susana",
    "question": "Who is Mary's sister?"
})

'Susana'

In [26]:
translation_prompt = ChatPromptTemplate.from_template(
    "Translate {answer} to {language}"
)

In [27]:
from operator import itemgetter

translation_chain = (
    {"answer": chain, "language": itemgetter("language")} | translation_prompt | model | parser
)

translation_chain.invoke(
    {
        "context": "Mary's sister is Susana. She doesn't have any more siblings.",
        "question": "How many sisters does Mary have?",
        "language": "Hindi",
    }
)

'मेरी पास एक बहन है, सुसना।'

In [28]:
transcribe_video()

[youtube] Extracting URL: https://www.youtube.com/watch?v=cdiD-9MMpb0
[youtube] cdiD-9MMpb0: Downloading webpage
[youtube] cdiD-9MMpb0: Downloading ios player API JSON
[youtube] cdiD-9MMpb0: Downloading mweb player API JSON
[youtube] cdiD-9MMpb0: Downloading player 03dbdfab
[youtube] cdiD-9MMpb0: Downloading m3u8 information
[info] cdiD-9MMpb0: Downloading 1 format(s): 251
[download] Destination: /content/drive/My Drive/python-projects/kaggle_experiments/transcriber/LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39/audio_20241219_091528.webm
[download] 100% of  167.01MiB in 00:00:09 at 16.76MiB/s  
[ExtractAudio] Destination: /content/drive/My Drive/python-projects/kaggle_experiments/transcriber/LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39/audio_20241219_091528.mp3
Deleting original file /content/drive/My Drive/python-projects/kaggle_experiments/transcriber/LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39/audio_20241219_091528.webm (pa

In [29]:
print (default_transcript_path)

/content/drive/My Drive/python-projects/kaggle_experiments/transcriber//LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39


In [30]:
!pwd

/content/drive/MyDrive/python-projects/kaggle_experiments/transcriber/LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39


In [31]:
transcript_fnmm = default_transcript_path + "/transcript.txt"
print (transcript_fnmm)

/content/drive/My Drive/python-projects/kaggle_experiments/transcriber//LLM-projects/code/LangChain-proj/RAG_transcriber/transcriber_v39/transcript.txt


In [32]:
with open("transcript.txt") as file:
    transcription = file.read()

transcription[:100]

"I think it's possible that physics has exploits and we should be trying to find them. Arranging some"

In [33]:
try:
    chain.invoke({
        "context": transcription,
        "question": "Is reading papers a good idea?"
    })
except Exception as e:
    print(e)

Error code: 400 - {'error': {'message': "This model's maximum context length is 16385 tokens. However, your messages resulted in 47380 tokens. Please reduce the length of the messages.", 'type': 'invalid_request_error', 'param': 'messages', 'code': 'context_length_exceeded'}}


In [34]:
from langchain_community.document_loaders import TextLoader

loader = TextLoader("transcript.txt")
text_documents = loader.load()
text_documents

[Document(metadata={'source': 'transcript.txt'}, page_content="I think it's possible that physics has exploits and we should be trying to find them. Arranging some kind of a crazy quantum mechanical system that somehow gives you buffer overflow, somehow gives you a rounding error in the floating point. Synthetic intelligences are kind of like the next stage of development. And I don't know where it leads to, like at some point I suspect the universe is some kind of a puzzle. These synthetic AIs will uncover that puzzle and solve it. The following is a conversation with Andrej Karpathy, previously the director of AI at Tesla. And before that at OpenAI and Stanford. He is one of the greatest scientists, engineers and educators in the history of artificial intelligence. This is the Lex Friedman podcast. To support it, please check out our sponsors. And now dear friends, here's Andrej Karpathy. What is a neural network? And why does it seem to do such a surprisingly good job of learning? W

In [35]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)
text_splitter.split_documents(text_documents)[:5]

[Document(metadata={'source': 'transcript.txt'}, page_content="I think it's possible that physics has exploits and we should be trying to find them. Arranging some"),
 Document(metadata={'source': 'transcript.txt'}, page_content='Arranging some kind of a crazy quantum mechanical system that somehow gives you buffer overflow,'),
 Document(metadata={'source': 'transcript.txt'}, page_content='buffer overflow, somehow gives you a rounding error in the floating point. Synthetic intelligences'),
 Document(metadata={'source': 'transcript.txt'}, page_content="intelligences are kind of like the next stage of development. And I don't know where it leads to,"),
 Document(metadata={'source': 'transcript.txt'}, page_content='where it leads to, like at some point I suspect the universe is some kind of a puzzle. These')]

In [36]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=20)
documents = text_splitter.split_documents(text_documents)

In [37]:
from langchain_openai.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
embedded_query = embeddings.embed_query("Who is Mary's sister?")

print(f"Embedding length: {len(embedded_query)}")
print(embedded_query[:10])

Embedding length: 1536
[-0.00137994228862226, -0.03450090438127518, -0.011502386070787907, 0.001241557183675468, -0.02611961029469967, 0.009081818163394928, -0.015649249777197838, 0.0017278597224503756, -0.011827629990875721, -0.03319992497563362]


In [38]:
sentence1 = embeddings.embed_query("Mary's sister is Susana")
sentence2 = embeddings.embed_query("Pedro's mother is a teacher")

In [39]:
from sklearn.metrics.pairwise import cosine_similarity

query_sentence1_similarity = cosine_similarity([embedded_query], [sentence1])[0][0]
query_sentence2_similarity = cosine_similarity([embedded_query], [sentence2])[0][0]

query_sentence1_similarity, query_sentence2_similarity

(0.9173235346162653, 0.7679756802174766)

In [54]:
from pydantic import ValidationError

In [55]:
from langchain_community.vectorstores import DocArrayInMemorySearch

vectorstore1 = DocArrayInMemorySearch.from_texts(
    [
        "Mary's sister is Susana",
        "John and Tommy are brothers",
        "Patricia likes white cars",
        "Pedro's mother is a teacher",
        "Lucia drives an Audi",
        "Mary has two siblings",
    ],
    embedding=embeddings,
)

In [42]:
vectorstore1.similarity_search_with_score(query="Who is Mary's sister?", k=3)

[(Document(metadata={}, page_content="Mary's sister is Susana"),
  0.917323542883913),
 (Document(metadata={}, page_content='Mary has two siblings'),
  0.9045029978848255),
 (Document(metadata={}, page_content='John and Tommy are brothers'),
  0.8013182122337678)]

In [43]:
retriever1 = vectorstore1.as_retriever()
retriever1.invoke("Who is Mary's sister?")

[Document(metadata={}, page_content="Mary's sister is Susana"),
 Document(metadata={}, page_content='Mary has two siblings'),
 Document(metadata={}, page_content='John and Tommy are brothers'),
 Document(metadata={}, page_content="Pedro's mother is a teacher")]

In [44]:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

setup = RunnableParallel(context=retriever1, question=RunnablePassthrough())
setup.invoke("What color is Patricia's car?")

{'context': [Document(metadata={}, page_content='Patricia likes white cars'),
  Document(metadata={}, page_content='Lucia drives an Audi'),
  Document(metadata={}, page_content="Pedro's mother is a teacher"),
  Document(metadata={}, page_content="Mary's sister is Susana")],
 'question': "What color is Patricia's car?"}

In [45]:
chain = setup | prompt | model | parser
chain.invoke("What color is Patricia's car?")

'White'

In [46]:
chain.invoke("What car does Lucia drive?")

'Lucia drives an Audi.'

In [47]:
vectorstore2 = DocArrayInMemorySearch.from_documents(documents, embeddings)

In [48]:
chain = (
    {"context": vectorstore2.as_retriever(), "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)
chain.invoke("What is synthetic intelligence?")

'Synthetic intelligence is described as the next stage of development, where synthetic beings will be created that will share both the digital and physical realms with humans.'

In [49]:
from pinecone import Pinecone, ServerlessSpec

pinecone_api_key = os.environ.get("PINECONE_API_KEY")

pc = Pinecone(api_key=pinecone_api_key)

In [50]:
import time

index_name = "youtube-rag-index-new"  # change if desired

existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]

if index_name not in existing_indexes:
    pc.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    while not pc.describe_index(index_name).status["ready"]:
        time.sleep(1)

index = pc.Index(index_name)

In [51]:
from langchain_pinecone import PineconeVectorStore

# Use the index you created with dimension 1536
index_name = "youtube-rag-index-new"

pinecone = PineconeVectorStore.from_documents(
    documents, embeddings, index_name=index_name
)

In [52]:
pinecone.similarity_search("What is Hollywood going to start doing?")[:3]

[Document(id='b367990d-06a9-4fd3-b68b-8c6bf5e00e6f', metadata={'source': 'transcript.txt'}, page_content="to your phone to get your video. So if Hollywood will start using that to generate scenes, which completely opens up. Yeah. So you can make a movie like Avatar eventually for under a million dollars. Much less, maybe just by talking to your phone. I mean, I know it sounds kind of crazy. And then there'd be some voting mechanism. Like how do you have a, would there be a show on Netflix that's generated completely automatically? Semi-automatically. Yeah, potentially, yeah. And what does it look like also when you can just generate it on demand and there's infinity of it? Yeah. Oh man. All the synthetic content. I mean, it's humbling because we treat ourselves as special for being able to generate art and ideas and all that kind of stuff. If that can be done in an automated way by AI. I think it's fascinating to me how these, the predictions of AI and what it's going to look like and 

In [53]:
chain = (
    {"context": pinecone.as_retriever(), "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)

chain.invoke("What is Hollywood going to start doing?")

'Hollywood is going to start using AI technology to generate scenes, which opens up the possibility of creating movies like Avatar for under a million dollars.'