# Multimodal RAG with Multimodal LangChain

## Setup

In [22]:
# %% Cell 1: Imports & Utilities

import base64
import io
import json
import os

from PIL import Image

from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import HumanMessage
from langchain.vectorstores import Chroma

def load_json_file(path: str):
    """Read the UTF-8 text file at `path` and return its decoded JSON value."""
    with open(path, mode="r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)


In [23]:
# %% Cell 2: API Key
# Keep a key that is already exported; otherwise install the "your-key" placeholder
# so the variable always exists for the cells below.
os.environ.setdefault("OPENAI_API_KEY", "your-key")


In [24]:
# %% Cell 3: Embeddings + VectorStore
# No key literal here: the embeddings client reads OPENAI_API_KEY from the
# environment (populated in the API-key cell), so credentials never live in
# the notebook source.
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
)


In [25]:
# %% Cell 4: Retriever
# Retriever over the store built above, returning the three closest matches.
retriever = vectorstore.as_retriever(search_kwargs=dict(k=3))


In [53]:
# %% Cell 5: LLM Inference Client
# Prompts may carry an image_url part, so the client must be a vision-capable
# model. The API key is read from the OPENAI_API_KEY environment variable
# rather than being written into the notebook.
chat = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.0,
)


def _encode_image_as_jpeg_data_url(image_path: str) -> str:
    """Re-encode the image at `image_path` as JPEG and return it as a base64 data URL."""
    with Image.open(image_path) as img:
        # JPEG cannot store alpha or palette data; saving such an image directly
        # raises, so convert anything that is not already RGB/greyscale first.
        jpeg_ready = img if img.mode in ("RGB", "L") else img.convert("RGB")
        buffered = io.BytesIO()
        jpeg_ready.save(buffered, format="JPEG")
    base64_image = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{base64_image}"


def run_lvml_inference(prompt: str, image_path: str = None) -> str:
    """
    Run the chat model on `prompt`, attaching an image when one is available.

    Args:
        prompt: Text sent as the first content part of the user message.
        image_path: Optional path to an image file. It is attached only when the
            path is truthy and exists on disk; otherwise the request is text-only.

    Returns:
        The `content` of the model's response.

    Notes:
        Image handling is best-effort: if the file cannot be read or encoded
        (OSError / ValueError), the error is printed and the request proceeds
        without the image. Other exceptions propagate so that programming
        errors are not silently hidden.
    """
    content_parts = [{"type": "text", "text": prompt}]

    if image_path and os.path.exists(image_path):
        try:
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": _encode_image_as_jpeg_data_url(image_path)},
            })
        except (OSError, ValueError) as e:
            print(f"Error processing image {image_path}: {e}")

    msgs = [HumanMessage(content=content_parts)]
    # Use the client defined in this cell; no dependency on names created by
    # later cells, so the function works on a fresh kernel.
    response = chat(msgs)
    return response.content

In [28]:
def retrieve_video_segment(query: str) -> dict:
    """
    Look up the best-matching video segment for `query`.

    Uses the module-level `retriever_module` and reads the metadata of the
    first returned document.

    Returns:
        An empty dict when nothing is retrieved; otherwise a dict with the keys
        `video_path`, `extracted_frame`, `transcript` and `mid_time_ms`. Any
        metadata field that is absent yields None for its key.
    """
    matches = retriever_module.get_relevant_documents(query)
    if matches:
        top_metadata = matches[0].metadata
        # output key -> metadata key it is read from
        # NOTE(review): `mid_time_ms` is filled from the `start_time` field — confirm
        # this is the intended timestamp.
        field_sources = {
            "video_path": "video_path",
            "extracted_frame": "frame_path",
            "transcript": "transcript",
            "mid_time_ms": "start_time",
        }
        return {out_key: top_metadata.get(src_key) for out_key, src_key in field_sources.items()}
    return {}


### Preprocessing

#### Setup vectorstore

In [29]:
# Directory of the on-disk Chroma index, and the collection to open inside it
PERSIST_DIR, COLLECTION_NAME = "./shared_data/chroma_db", "test_tbl"

# Rebind `vectorstore` to the shared-data collection, reusing the existing embeddings client
vectorstore = Chroma(
    collection_name=COLLECTION_NAME,
    embedding_function=embeddings,
    persist_directory=PERSIST_DIR,
)


### Retrieval Module
#### Initialize Embedding Model

In [30]:
from langchain.embeddings import OpenAIEmbeddings

# The API key is taken from the OPENAI_API_KEY environment variable; no
# credential literal is kept in the notebook.
embedder = OpenAIEmbeddings()


#### Create Retrieval

In [31]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# 1) “uri”  →  where Chroma persists its on-disk index
PERSIST_DIR     = "./shared_data/chroma_db"
# 2) “table_name”  →  Chroma’s collection_name
COLLECTION_NAME = "test_tbl"

# 3) Create the embedder. The API key comes from the OPENAI_API_KEY
#    environment variable instead of a literal in the notebook.
embedder = OpenAIEmbeddings()

# 4) Initialize Chroma
vectorstore = Chroma(
    persist_directory=PERSIST_DIR,
    embedding_function=embedder,
    collection_name=COLLECTION_NAME
)

# 5) Create a retriever that returns only the single best match (k=1);
#    `retrieve_video_segment` reads this module-level name.
retriever_module = vectorstore.as_retriever(
    search_kwargs={"k": 1}
)


#### Invoke Retrieval with User Query

In [32]:
# User question, and the segment metadata retrieved for it
query = "What do the astronauts feel about their work?"
data = retrieve_video_segment(query)

if not data:
    print("No relevant video segment found for the query.")
else:
    print("Transcript:\n", data["transcript"])
    print("Frame path:", data["extracted_frame"])

Transcript:
 None
Frame path: None


### LVLM Inference Module

#### Initialize Client and LVLM for Inference

In [33]:
# Expose the inference wrapper under the name `lvlm_inference_module`.
# This is a plain alias: both names refer to the same function object.
lvlm_inference_module = run_lvml_inference


#### Invoke LVLM Inference with User Query

In [34]:
if not data:
    # Nothing was retrieved for the query, so there is nothing to ask the model about.
    print("Cannot run LVLM inference: No data was retrieved in the previous step.")
else:
    # Prefix the user question with the transcript of the retrieved segment.
    augmented_query = f"The transcript associated with the image is '{data['transcript']}'. {query}"
    print("Augmented query is:\n", augmented_query)

    # Send the augmented prompt together with the retrieved frame (if any) to the model.
    answer = run_lvml_inference(prompt=augmented_query, image_path=data["extracted_frame"])
    print("\nAnswer:\n", answer)

Augmented query is:
 The transcript associated with the image is 'None'. What do the astronauts feel about their work?

Answer:
 As an AI, I'm unable to view images or infer emotions. Therefore, I can't provide information about what the astronauts feel about their work based on an image.


In [35]:
# Run the model again, but only when both the retrieved data and the augmented prompt exist.
if not (data and 'augmented_query' in locals()):
    print("Cannot run LVLM inference: `augmented_query` or `data` is not defined.")
else:
    response = run_lvml_inference(prompt=augmented_query, image_path=data["extracted_frame"])
    print('LVLM Response:', response, sep="\n")


LVLM Response:
As an AI, I'm unable to provide an analysis without specific text, audio, or visual data. Please provide more information.


### Prompt Processing Module

In [54]:
def prompt_processing_module(retrieved_results: list, user_query: str) -> dict:
    """
    Build the LLM prompt from the first retrieved document and the user query.

    Args:
        retrieved_results: Non-empty sequence of retrieved documents. Only the
            first item is used; it must expose `.metadata` (a mapping) and
            `.page_content`.
        user_query: The original user question, appended to the prompt.

    Returns:
        A dict with:
          - "prompt": the full prompt text,
          - "image": the `frame_path` metadata value (None if absent),
          - "document_text": the `transcript` metadata value, or the document's
            `page_content` when the transcript is missing or empty.

    Raises:
        IndexError: if `retrieved_results` is empty.
    """
    # The builtin `list` is used in the annotation so that defining this function
    # does not depend on `typing.List` having been imported by another cell.
    doc = retrieved_results[0]
    meta = doc.metadata

    # Fall back to the raw page content when no transcript is stored.
    document_text = meta.get("transcript") or doc.page_content
    frame_path = meta.get("frame_path")

    prompt = (
        f"The relevant document content is: '{document_text}'. "
        f"Based on this, answer the following question: {user_query}"
    )
    return {"prompt": prompt, "image": frame_path, "document_text": document_text}

#### Invoke Prompt Processing Module with Retrieved Results and User Query

### Multimodal RAG

#### Define Multimodal RAG System as a Chain in LangChain

In [62]:
def mm_rag_chain(user_query: str) -> tuple[str, str, str]:
    """
    Answer `user_query` end to end: retrieve, build the prompt, run the model.

    Reads the module-level `VECTOR_STORE`.

    Returns:
        A 3-tuple `(answer, image_path, document_text)`. When the vector store
        is not initialized or nothing is retrieved, the first element is an
        explanatory message and the other two are None.
    """
    if VECTOR_STORE is None:
        return "Error: RAG system is not initialized. Please check the setup.", None, None

    # Step 1: fetch the single best-matching document from the store
    top1_retriever = VECTOR_STORE.as_retriever(search_kwargs={"k": 1})
    hits = top1_retriever.get_relevant_documents(user_query)
    if not hits:
        return "No relevant document found.", None, None

    # Step 2: turn the hit into a prompt, an image path and the source text
    bundle = prompt_processing_module(hits, user_query)
    image_path, document_text = bundle['image'], bundle['document_text']

    # Debug trace of which frame (if any) was retrieved
    print(f"Retrieved image path: {image_path}")

    # Step 3: query the model with the prompt and the optional image
    answer = run_lvml_inference(prompt=bundle['prompt'], image_path=image_path)
    return answer, image_path, document_text

***

# Create Gradio Interface

In [63]:
import os
import gradio as gr
from typing import List
import json

# --- Global Configuration and Setup ---

# The OpenAI API key is read from the OPENAI_API_KEY environment variable.
# It is deliberately NOT assigned here: writing a literal into os.environ
# would overwrite a valid key that is already exported and would store a
# credential in the notebook source.

# Initialize LangChain components
llm = ChatOpenAI(temperature=0.7, model_name="gpt-4o-mini")
embeddings = OpenAIEmbeddings()

# Path to the Chroma database (assuming it's already populated with your data)
PERSIST_DIR = "./shared_data/chroma_db"
COLLECTION_NAME = "test_tbl"

# Global variable for the vector store; set by init_rag_system()
VECTOR_STORE = None
CHAT_HISTORY = []

# --- Backend Functions ---

def init_rag_system():
    """
    Open the persisted Chroma collection and store it in `VECTOR_STORE`.

    Returns:
        A human-readable status string: a success message, a "directory not
        found" message when `PERSIST_DIR` does not exist, or an error message
        carrying the text of any exception raised while loading.
    """
    global VECTOR_STORE

    try:
        if os.path.exists(PERSIST_DIR):
            # The directory is present: load the collection from disk
            VECTOR_STORE = Chroma(
                persist_directory=PERSIST_DIR,
                embedding_function=embeddings,
                collection_name=COLLECTION_NAME
            )
            status = "RAG system initialized successfully from local data."
        else:
            status = f"Error: The directory '{PERSIST_DIR}' was not found. Please ensure your ChromaDB is at this location."
    except Exception as e:
        status = f"An error occurred during RAG system initialization: {str(e)}"

    return status

def respond(message, chat_history):
    """
    Answer a user message with the multimodal RAG chain and update the chat.

    Args:
        message: The text the user submitted.
        chat_history: The current chatbot history supplied by Gradio, as a list
            of (user, bot) pairs. May be None or empty.

    Returns:
        A 3-tuple `("", updated_history, extracted_text)`: the empty string
        clears the input box, `updated_history` is the new chatbot value, and
        `extracted_text` is the document text the answer was based on.
    """
    # Build on the history Gradio passes in rather than a process-wide list, so
    # that clearing the chatbot really resets the conversation and separate
    # sessions do not see each other's messages.
    history = list(chat_history or [])

    response, image_path, extracted_text = mm_rag_chain(message)

    history.append((message, response))

    if image_path and os.path.exists(image_path):
        # In the tuple history format a tuple-valued message is interpreted as
        # (filepath, alt_text). The retrieved frame is therefore added as its own
        # bot message, with the file path first, instead of being paired with
        # the answer text.
        history.append((None, (image_path,)))

    return "", history, extracted_text

# --- Gradio Interface ---

with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # Welcome to VidChat !
        Ask questions based on your locally processed video.
        """
    )

    # Read-only status line; its initial value is the result of initializing the RAG system
    status_message = gr.Textbox(label="Status", value=init_rag_system(), interactive=False)

    # Read-only box showing the document text behind the latest answer
    extracted_text_display = gr.Textbox(label="Extracted Content", lines=5, interactive=False)

    # Conversation view
    chatbot = gr.Chatbot(label="Chat about your video", height=500)

    # Question input
    msg = gr.Textbox(
        label="Your Question",
        placeholder="e.g., What is the main topic of the content?",
        interactive=True,
    )

    # Send / clear controls side by side
    with gr.Row():
        submit_button = gr.Button("Send")
        clear_button = gr.ClearButton([msg, chatbot, extracted_text_display])

    # The Send button and the Enter key both route through `respond`
    submit_button.click(fn=respond, inputs=[msg, chatbot], outputs=[msg, chatbot, extracted_text_display])
    msg.submit(fn=respond, inputs=[msg, chatbot], outputs=[msg, chatbot, extracted_text_display])

# Launch the app, but only when this code is executed as the main module
# (not when it is imported from elsewhere).
if __name__ == "__main__":
    # The 'inline=True' parameter tells Gradio to display the app within the notebook
    # instead of opening a new browser tab.
    demo.launch(inline=True)


  chatbot = gr.Chatbot(


* Running on local URL:  http://127.0.0.1:7871
* To create a public link, set `share=True` in `launch()`.


Retrieved image path: None
Retrieved image path: None
