In [39]:
from moviepy import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from yt_dlp import YoutubeDL
from pprint import pprint
from PIL import Image
import matplotlib.pyplot as plt
import json

In [None]:
import os
# Read the OpenAI key from the environment; it is passed to the multimodal LLM later on.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Assigning None to os.environ raises an opaque TypeError, so fail early
    # with a message that tells the reader what to fix.
    raise RuntimeError(
        "OPENAI_API_KEY is not set; export it before running this notebook."
    )
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

# Show the working directory, since every data path below is relative to it.
print(os.getcwd())

In [6]:
# URL of the YouTube video that this notebook downloads and indexes.
video_url = "https://youtu.be/sw_IK4M7S0A?si=FT_ZFohHnGmzaMHp"

In [7]:
# Directory the source video is downloaded into.
output_video_path = "./video_data/"
# Directory that receives the extracted frames and the transcript text file.
output_folder = "./mixed_data/"
# WAV file holding the extracted audio track; it is deleted once transcribed.
output_audio_path = "./mixed_data/output_audio.wav"

In [8]:
# Path of the downloaded video. os.path.join keeps this correct whether or not
# output_video_path ends with a path separator.
filepath = os.path.join(output_video_path, "input_vid.mp4")
# Make sure the folder for frames and transcript exists before anything writes to it.
Path(output_folder).mkdir(parents=True, exist_ok=True)

In [9]:
def download_video(url, output_path):
    """Download a video with yt-dlp and return a small metadata summary.

    Args:
        url: Address of the video to download.
        output_path: Directory prefix; the file is written as
            ``<output_path>/input_vid.<ext>``.

    Returns:
        dict with the keys ``"Author"``, ``"Title"``, ``"Views"`` and
        ``"Duration"``; a value is ``None`` when the extractor did not
        report the corresponding field.
    """
    ydl_opts = dict(
        outtmpl=f'{output_path}/input_vid.%(ext)s',
        format='best[ext=mp4]/best',  # Prefer mp4 format to avoid merging
        quiet=True,
        no_warnings=True,
    )

    with YoutubeDL(ydl_opts) as downloader:
        video_info = downloader.extract_info(url, download=True)

    # Map the labels used in this notebook to the extractor's field names.
    label_to_field = {
        "Author": "uploader",
        "Title": "title",
        "Views": "view_count",
        "Duration": "duration",
    }
    return {label: video_info.get(field) for label, field in label_to_field.items()}

In [10]:
def video_to_images(video_path, output_folder, fps=0.2):
    """Extract still frames from a video into ``output_folder``.

    Frames are written as ``frame_0000.png``, ``frame_0001.png``, ...

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory the PNG frames are written into.
        fps: Frames to extract per second of video. The default of 0.2
            gives one frame every five seconds.
    """
    # The context manager closes the clip when done, even if writing fails,
    # so the underlying reader is not left open.
    with VideoFileClip(video_path) as clip:
        clip.write_images_sequence(
            os.path.join(output_folder, "frame_%04d.png"), fps=fps
        )
    

In [11]:
def video_to_audio(video_path, output_audio_path):
    """Write the audio track of a video to ``output_audio_path``.

    Args:
        video_path: Path of the video file to read.
        output_audio_path: Destination path of the audio file.

    Raises:
        ValueError: If the video has no audio track.
    """
    # The context manager closes the clip even when writing fails.
    with VideoFileClip(video_path) as clip:
        audio = clip.audio
        if audio is None:
            # Without this check the call below fails with an unhelpful
            # AttributeError on None.
            raise ValueError(f"No audio track found in {video_path!r}")
        audio.write_audiofile(output_audio_path)

In [12]:
def audio_to_text(audio_path):
    """Transcribe an audio file with the Whisper recognizer.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The recognized text, or an empty string when the recognizer could
        not understand the audio.
    """
    recognizer = sr.Recognizer()

    # Read the whole file into memory; the file is only needed for this step.
    with sr.AudioFile(audio_path) as source:
        audio_data = recognizer.record(source)

    try:
        return recognizer.recognize_whisper(audio_data)
    except sr.UnknownValueError:
        print("Audio not recognized")
        # Return a string so callers that write the transcript to disk still
        # work; previously this path hit an unbound local variable.
        return ""

In [13]:
# Fetch the video and keep its author/title/views/duration for the prompt.
metadata_vid = download_video(url=video_url, output_path=output_video_path)

                             

In [14]:
# Dump sampled frames of the downloaded video into the mixed-data folder.
video_to_images(video_path=filepath, output_folder=output_folder)

                                                            

In [None]:
# Extract the audio track so it can be transcribed.
video_to_audio(video_path=filepath, output_audio_path=output_audio_path)

In [16]:
# Transcribe the extracted audio track.
text_data = audio_to_text(audio_path=output_audio_path)

In [None]:
# Save the transcript next to the frames so both are read from the same folder.
# An explicit UTF-8 encoding avoids platform-default encodings that can fail on
# non-ASCII characters in the transcript.
with open(os.path.join(output_folder, "text_data.txt"), "w", encoding="utf-8") as f:
    f.write(text_data)
# The with-block has already closed the file; no explicit close() is needed.
print("Text data saved to text_data.txt")

In [None]:
# Delete the intermediate WAV file. missing_ok=True keeps this cell re-runnable
# once the file is already gone.
Path(output_audio_path).unlink(missing_ok=True)
print("Audio file removed after processing.")

In [19]:
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.lancedb import LanceDBVectorStore

In [None]:
from llama_index.core import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# Register a HuggingFace embedding model as the global llama_index embedding model.
embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5",
)
Settings.embed_model = embed_model

In [21]:
# Two tables in the same "lancedb" database: one for text, one for images.
text_store, image_store = (
    LanceDBVectorStore(uri="lancedb", table_name=table_name)
    for table_name in ("text_collection", "image_collection")
)

In [22]:
# Bundle the text and image stores into one storage context for the index.
storage_context = StorageContext.from_defaults(image_store=image_store, vector_store=text_store)

In [23]:
# Load everything in the mixed-data folder as documents.
documents = SimpleDirectoryReader(input_dir=output_folder).load_data()


In [None]:
# Build the multimodal index from the loaded documents, backed by the stores above.
index = MultiModalVectorStoreIndex.from_documents(documents, storage_context=storage_context)

In [25]:
# Retrieve the single best text chunk and the three best images per query.
retriever_engine = index.as_retriever(
    similarity_top_k=1,
    image_similarity_top_k=3,
)

In [27]:

from llama_index.core.response.notebook_utils import display_source_node
from llama_index.core.schema import ImageNode

In [28]:
def retrieve(retriever_engine, query_str):
    """Run a query and split the hits into image paths and text snippets.

    Text hits are also rendered in the notebook via ``display_source_node``.

    Args:
        retriever_engine: Object exposing ``retrieve(query_str)``.
        query_str: The query to run.

    Returns:
        Tuple ``(image_paths, texts)``: the ``file_path`` metadata of every
        image hit and the text of every other hit, both in retrieval order.
    """
    image_paths, text_chunks = [], []

    for hit in retriever_engine.retrieve(query_str):
        node = hit.node
        if not isinstance(node, ImageNode):
            # Non-image hit: show a short preview and keep its text.
            display_source_node(hit, source_length=200)
            text_chunks.append(hit.text)
            continue
        image_paths.append(node.metadata["file_path"])

    return image_paths, text_chunks

In [29]:
# Question asked about the video; used for retrieval and in the final prompt.
query = "What is langchain and how does it work?"

In [None]:
# img: paths of the retrieved frames; text: retrieved transcript snippets.
img, text = retrieve(retriever_engine=retriever_engine, query_str=query)

In [31]:
def plot_images(image_paths):
    """Show up to five images from ``image_paths`` in a 2x3 grid.

    Paths that do not point to an existing file are skipped.

    Args:
        image_paths: Iterable of image file paths.
    """
    max_images = 5
    images_shown = 0
    # plt.figure (lowercase) creates the figure and makes it current, so the
    # subplots below use it. plt.Figure only builds a detached object, which
    # left figsize without any effect.
    plt.figure(figsize=(16, 9))

    for img_path in image_paths:
        if not os.path.isfile(img_path):
            continue
        # Close the file handle once the image has been handed to matplotlib.
        with Image.open(img_path) as image:
            plt.subplot(2, 3, images_shown + 1)
            plt.imshow(image)
        plt.axis('off')
        images_shown += 1
        if images_shown >= max_images:
            break

In [None]:
# Display the frames retrieved for the query.
plot_images(image_paths=img)

In [38]:
# Prompt template for the multimodal LLM. Placeholders: {context_str} (retrieved
# transcript text), {metadata_str} (video metadata) and {query_str} (the question).
qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved context from the video, "
    " accurately and precisely answer the query without any additional prior knowledge.\n"
    "Please ensure honesty and responsibility, refraining from any racist or sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str} \n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)

In [50]:
from llama_index.core.schema import ImageNode

# Wrap every retrieved frame that exists on disk in an ImageNode for the LLM call.
image_nodes = [
    ImageNode(image_path=frame_path)
    for frame_path in img
    if os.path.isfile(frame_path)
]

In [44]:
from llama_index.multi_modal_llms.openai import OpenAIMultiModal

In [None]:
# Multimodal OpenAI client; gpt-4o is used for its better vision capabilities.
mm_llm = OpenAIMultiModal(model="gpt-4o", max_new_tokens=1500, api_key=OPENAI_API_KEY)

In [None]:
# Fill the template with the query, the retrieved text and the video metadata,
# then send it to the model together with the retrieved frames.
prompt_text = qa_tmpl_str.format(
    context_str="\n".join(text),
    metadata_str=json.dumps(metadata_vid),
    query_str=query,
)
result = mm_llm.complete(prompt=prompt_text, image_documents=image_nodes)

In [None]:
# Pretty-print the text of the model's answer.
pprint(result.text)