---


SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0

# Agentic RAG-Enabled Video Interaction Pipeline with Morpheus, RIVA and VIA Microservices

This notebook walks through the steps to set up a VIA Microservice workflow that integrates NVIDIA Morpheus and the RIVA NIM.

- **VIA Microservice**: Processes videos with Vision-Language Models (VLMs) and provides summarized answers to queries.
- **NVIDIA RIVA NIMs**: A hosted, GPU-accelerated multilingual speech and translation microservice for building fully customizable, real-time conversational AI pipelines. Riva includes automatic speech recognition (ASR) and text-to-speech (TTS).
- **NVIDIA Morpheus**: Lets users build their own optimized pipelines. Here, it is used to create an asynchronous agentic workflow and to integrate NIMs alongside VIA when answering user queries.


We then show a sample use case demonstrating how this workflow can enhance accessibility for the visually impaired.

![pipeline flowchart](images/Video%20Agentic%20RAG%20with%20VIA.png)



## Step 0 - Environment Setup


In [1]:
%%capture 
!pip install grpcio_tools nvidia-riva-client ffmpeg-python
!pip install -r https://raw.githubusercontent.com/nvidia-riva/python-clients/main/requirements.txt
!pip install --force-reinstall git+https://github.com/nvidia-riva/python-clients.git

In [2]:
%load_ext autoreload
%aimport -logging
%autoreload 2

import sys
import os
import warnings
import cudf
import logging

# Ensuring Morpheus is installed correctly
import morpheus._lib

warnings.simplefilter(action='ignore', category=FutureWarning)

sys.path.append(os.path.abspath("../"))

Ensure the necessary environment variables are set. As a last resort, try to load them from a `.env` file.


In [4]:
# Ensure that the current environment is set up with API keys
required_env_vars = ["NVIDIA_API_KEY", "SERPAPI_API_KEY"]

if (not all([var in os.environ for var in required_env_vars])):

    # Try loading an .env file if it exists
    from dotenv import load_dotenv

    load_dotenv()

    # Check again
    if (not all([var in os.environ for var in required_env_vars])):
        raise ValueError(f"Please set the following environment variables: {required_env_vars}")
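When the check above fails, it can help to see exactly which variables are missing rather than the full required list. The following is a minimal sketch; `missing_env_vars` is a hypothetical helper for illustration and is not part of this repository.

```python
import os


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are absent from `environ`."""
    # Hypothetical helper; falls back to the real process environment
    environ = os.environ if environ is None else environ
    return [name for name in required if name not in environ]


# Example with a stand-in environment instead of os.environ
example_env = {"NVIDIA_API_KEY": "placeholder"}
missing = missing_env_vars(["NVIDIA_API_KEY", "SERPAPI_API_KEY"], example_env)
print(missing)
```

The returned list could be interpolated into the `ValueError` message so that only the unset variables are reported.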

Configure logging to allow Morpheus messages to appear in the notebook.


In [5]:
import src

# Create a logger for this module.
logger = logging.getLogger(src.__name__)

# Configure the parent logger log level
logger.parent.setLevel(logging.INFO)
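Setting the level on the parent works because a logger with no level of its own inherits the effective level of its nearest configured ancestor. The standard-library sketch below illustrates this; the logger names `demo_parent` and `demo_parent.child` are placeholders chosen for the example.

```python
import logging

# Placeholder logger names, used only for this illustration
parent = logging.getLogger("demo_parent")
child = logging.getLogger("demo_parent.child")

# The child has no explicit level (NOTSET), so it defers to its ancestors
parent.setLevel(logging.INFO)

print(child.level == logging.NOTSET)
print(child.getEffectiveLevel() == logging.INFO)
print(child.isEnabledFor(logging.DEBUG))
```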

Finally, test out the logger to ensure that it is working correctly. You should see a message printed to the console.


In [6]:
logger.info("Successfully configured logging!")

Successfully configured logging!


In [7]:
! mkdir -p /root/.cache/morpheus/log


## Step 1: Process the video with VIA Microservice and store dense captions in a database

First, we process the complete video using the VIA Microservice. VIA can process long videos and generate insightful summaries. These are then stored in a vector database for text-based RAG over older factual information.


Every time we run this agentic RAG pipeline:

1. The agent can process the last `n_seconds` of the video directly and query it interactively with different prompts using VIA.
2. The vector database generated in this step is used to fetch relevant information from the complete video.
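
The split between the two sources can be pictured as a pair of time ranges. This is a sketch only: `split_timeline` is a hypothetical helper, and clamping the live window to the start of the video is an assumption rather than documented behavior.

```python
def split_timeline(total_runtime, n_seconds):
    """Split [0, total_runtime] into a history range and a live range (seconds)."""
    if total_runtime < 0 or n_seconds < 0:
        raise ValueError("total_runtime and n_seconds must be non-negative")
    # Assumption: if n_seconds exceeds the video length, the live window
    # covers the whole video and the history range is empty
    live_start = max(0, total_runtime - n_seconds)
    history = (0, live_start)            # answered from stored captions (text RAG)
    live = (live_start, total_runtime)   # queried directly through the VLM
    return history, live


history, live = split_timeline(total_runtime=30, n_seconds=20)
print(history, live)
```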


We begin by uploading the relevant video file to VIA Microservice. VIA also supports a live streaming video as input.



The following prompt is used to generate these dense captions:
***

```
You are an expert at world understanding and description. Your task is to capture, in as much detail as possible, the events in a provided ego-centric video. Be sure to capture as much description as possible about the environment, people, objects, and actions performed in the video. Please be explicit about what kinds of objects. Also note shifts in direction of the camera and the relative change in location of the objects in the environment that result.
```

***
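
The preprocessing call that follows passes `chunk_duration=4` and `chunk_overlap=1`. How VIA derives chunk boundaries internally is not shown in this notebook. The sketch below assumes the common sliding-window convention, in which each chunk starts `chunk_duration - chunk_overlap` seconds after the previous one; `chunk_windows` is a hypothetical helper.

```python
def chunk_windows(video_length, chunk_duration, chunk_overlap):
    """Return (start, end) windows in seconds covering [0, video_length]."""
    if chunk_duration <= 0:
        raise ValueError("chunk_duration must be positive")
    if not 0 <= chunk_overlap < chunk_duration:
        raise ValueError("chunk_overlap must be in [0, chunk_duration)")
    stride = chunk_duration - chunk_overlap  # assumed sliding-window stride
    windows = []
    start = 0
    while start < video_length:
        end = min(start + chunk_duration, video_length)
        windows.append((start, end))
        if end == video_length:
            break  # the final (possibly shorter) chunk reached the end
        start += stride
    return windows


print(chunk_windows(video_length=10, chunk_duration=4, chunk_overlap=1))
```

Under this assumption, neighboring chunks share one second of video, so an event that straddles a boundary appears in both captions.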

In [8]:
from src.video_preprocess import VideoPreprocessor

processor = VideoPreprocessor(host="localhost", vlm_port=31012, milvus_port=31014)
file_id = processor.preprocess(file_path = './pov-kitchen-stove.mp4',
                               chunk_duration=4,
                               chunk_overlap=1,
                               faiss_dir='./vdb')

*Pre-processing video.
	**Uploading video
	**Creating dense captions. This could take a few minutes.


`embedding_function` is expected to be an Embeddings object, support for passing in a function will soon be removed.


	**Caching captions in FAISS.
The provided images are frames sampled from a video taken in a kitchen environment. Below is a detailed description of the events, objects, and environment observed in each frame:

### Frame at <0.1001> seconds
- **Environment**: The scene is a modern kitchen with light wooden cabinets and drawers. The countertop is white with a marble-like pattern.
- **Objects**: 
  - On the left side of the countertop, there is a bag of KOS Organic Plant Protein.
  - A wooden bowl with lemons and a small wooden container are also present.
  - A cutting board is placed vertically against the backsplash.
  - A gas stove with a stainless steel finish is partially visible on the right.
  - A window with greenery outside is visible in the background.
- **Camera Direction**: The camera is facing the countertop and cabinets directly.

### Frame at <0.5005> seconds
- **Environment**: The same kitchen setting.
- **Objects**: 
  - The bag of KOS Organic Plant Protein is still visi

## Step 2: Building the Agentic Pipeline

The agentic pipeline works in two steps. First, given a query from a user, a `checklist model` dynamically creates a checklist of items that an agent will work through to answer the query. For this workflow example, the checklist is built using the following prompt:

***

```
You are an expert assistant to the visually impaired, helping to navigate, plan, and complex questions about the environment around you given a first person view video of the environment. Your objective is to add a "Checklist" section containing steps to use for a downstream agent to follow to achieve the result of a given query.\
For each checklist item, start with an action verb, making it clear and actionable

**Context**:
Understand the world around you and planning or describing certain scenarios are a complex task which could require multiple steps. Creating a checklist of to-dos to achieve a certain query helps reduce complexity while maintaining throoughness. 

**Example Format**:
Below is a format for an example that illustrate transforming a query into an actionable checklist, 

Example Query:
Where are the dirty dishes on the counter I'm looking at?

Example Scene Evaluation Checklist:
[
"Verify your environment: Check if the user is looking at a counter. Verify if there are dishes on the counter.",
"Produce absolute directions: Identifying where in the video (field of view) the object is helps provide direction (e.g. to the left)",
"Produce relative landmarks: Identifying the location of the dishes in the video is a mixutre of absolute directions like "to the left", but also relative to other objects in the field of view."
"Plan: Identifying if anything in the field of view needs to be done in order to access the object in the query such as moving things out of the way, etc."
]

**Criteria**:
- Checklists must relate to the information in the specific query.
- Checklists must include checks for mitigating conditions or queries for objects not present in the field of view.
- Avoid repetitive objectives between checklist items. The more concise your list, the better.

**Procedure**:
[
"Understand what the query is asking of you, and what common sense checks need to be done to validate.",
"Produce a checklist of action items or queries.",
"Format the checklist as comma separated list surrounded by square braces.",
"Output the checklist."
]
```
***
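
The prompt asks the model to return the checklist as a bracketed, comma-separated list, so the downstream code has to turn that text back into a Python list. How `src` does this is not shown here; the following is one possible sketch, with `parse_checklist` as a hypothetical helper and a line-based fallback for output that is not quite valid JSON.

```python
import json
import re


def parse_checklist(llm_output):
    """Extract a list of checklist strings from raw model text."""
    # Take the outermost [...] span; models often wrap the list in extra prose
    match = re.search(r"\[.*\]", llm_output, flags=re.DOTALL)
    if match is None:
        raise ValueError("no bracketed checklist found")
    body = match.group(0)
    try:
        items = json.loads(body)
    except json.JSONDecodeError:
        # Fallback for near-JSON output: treat each non-empty line as one item
        lines = body.strip("[]").splitlines()
        items = [line.strip().rstrip(",").strip('"') for line in lines]
    return [str(item).strip() for item in items if str(item).strip()]


raw = 'Checklist:\n[\n"Verify the stove is in view.",\n"Check whether any burner is lit."\n]'
print(parse_checklist(raw))
```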

### Step 2.1 

Next, we'll build a utility function that uses an NVIDIA Morpheus pipeline's `LLMEngine` to allow for dynamic, tool-enabled RAG that is event-driven.

To answer a user's question about a video, the model needs to ask questions about the 'live' last `n_seconds` of video and, if needed, look further back in time using text-based RAG. With multiple tools and knowledge sources (`Video QA Engine` and `Text QA Engine`), we need a framework that lets our LLM choose which tools to use and synthesize the responses. One method we can use is LangChain agents.

An agent in this sense is an LLM that has "agency" to determine what sources of information it needs to retrieve to answer questions. This can be achieved through prompting. The simplest prompt that turns an LLM into an agent with tool usage might look like this:

***
```
You are a helpful assistant. Help the user answer any questions.

You have access to the following tools:

{tools}

In order to use a tool, you can use <tool></tool> and <tool_input></tool_input> tags.
You will then get back a response in the form <observation></observation>
When you are done, respond with a final answer between <final_answer></final_answer>. 

Question: {input}
```
***
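
On the application side, the tags in such a prompt have to be read back out of the model's reply. The sketch below shows one way to do that with regular expressions; `parse_tool_call` is a hypothetical helper and is not how LangChain implements its own parsing.

```python
import re


def parse_tool_call(response):
    """Return ("final", answer), ("tool", name, tool_input), or ("none",)."""
    final = re.search(r"<final_answer>(.*?)</final_answer>", response, re.DOTALL)
    if final:
        return ("final", final.group(1).strip())
    tool = re.search(r"<tool>(.*?)</tool>", response, re.DOTALL)
    if tool:
        tool_input = re.search(r"<tool_input>(.*?)</tool_input>", response, re.DOTALL)
        return ("tool", tool.group(1).strip(), tool_input.group(1).strip() if tool_input else "")
    return ("none",)


reply = "<tool>Video QA Engine</tool><tool_input>Are there scissors on the counter?</tool_input>"
print(parse_tool_call(reply))
```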

Ideally, with just one round of query -> tool -> observation -> final answer, the LLM will get the information it needs to answer simple queries such as `Do I have scissors on the counter?`.

But we want the agent to answer more complex queries and to accommodate the user's vision impairment by providing helpful information about obstructions in the field of view and other tactile reference points. To do this, we prompt the LLM to work through a series of steps (repeated N times): Thought, Action, and Observation. This loop of reasoning and acting is called a ReAct (Reason and Act) agent. The agent's intermediate output then looks similar to this:

***
```
To determine if the user is looking at a counter, I need to analyze the current view from the user's perspective.

Action: FPV Video QA System
Action Input: Describe the current view in the video.

To answer the question, I need to get a detailed description of the objects on the counter from the video feed.

Action: FPV Video QA System
Action Input: Describe the objects on the counter in the video.

To provide absolute directions for each item in the video, I need to first identify all the items present in the user's field of view and their respective locations.

Action: FPV Video QA System
Action Input: Describe all items in the field of view and their locations in the video.

To accurately describe the location of each item relative to other objects on the counter, I need to first identify all the items present on the counter and their positions.
```
***
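
The Thought, Action, Observation loop can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the Morpheus or LangChain implementation: `react_loop` is a hypothetical function, the `Final Answer:` marker is an assumed convention, and the model and tool are scripted stand-ins so the example runs without any service.

```python
import re

ACTION_RE = re.compile(r"Action:\s*(.+?)\s*\nAction Input:\s*(.+)", re.DOTALL)


def react_loop(llm, tools, question, max_steps=5):
    """Alternate between model reasoning and tool calls until a final answer appears."""
    transcript = f"Question: {question}\n"
    for _ in range(max_steps):
        output = llm(transcript)
        transcript += output + "\n"
        if "Final Answer:" in output:
            return output.split("Final Answer:", 1)[1].strip()
        match = ACTION_RE.search(output)
        if match is None:
            transcript += "Observation: could not parse an action, please retry.\n"
            continue
        tool_name, tool_input = match.group(1).strip(), match.group(2).strip()
        tool = tools.get(tool_name)
        observation = tool(tool_input) if tool else f"unknown tool {tool_name!r}"
        transcript += f"Observation: {observation}\n"
    return None  # step budget exhausted without a final answer


# Scripted stand-ins so the sketch runs without a model or video service
scripted = iter([
    "Thought: I need to look at the stove.\nAction: FPV Video QA System\nAction Input: Is any burner lit?",
    "Thought: I have what I need.\nFinal Answer: No burner appears to be lit.",
])
answer = react_loop(
    llm=lambda prompt: next(scripted),
    tools={"FPV Video QA System": lambda q: "No flames are visible on the stove."},
    question="Did I turn off the stove?",
)
print(answer)
```

The `max_steps` bound corresponds to the "repeated N times" limit and keeps a confused model from looping indefinitely.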

The utility function below constructs the pipeline for an asynchronous ReAct agent. All source code can be found in the `src` directory. For this notebook, we will use a `gpt-4o` vision model for the VLM and for the checklist model, and a `gpt-4-turbo` LLM as the agent for reasoning and tool usage. This workflow is **fully compatible with NVIDIA NIMs** and models hosted on `build.nvidia.com`. 

In [9]:
from morpheus.config import Config
from morpheus.config import PipelineModes
from textwrap import dedent

from src.config import EngineConfig
from src.config import LLMModelConfig
from src.config import NVFoundationLLMModelConfig
from src.nim_llm_service import NIMLLMService
from src import config
from src.llm_service import LLMService 
from src.faiss_vdb_service import FaissVectorDBService

from morpheus._lib.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.rag_node import RAGNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes

from src.pipeline_utils import build_acc_llm_engine  # This function initializes the tools and agents given a config.

# Create the pipeline config
pipeline_config = Config()
pipeline_config.mode = PipelineModes.OTHER

async def run_react_pipeline(p_config: Config, e_config: EngineConfig, input_cves: list[str], retry_bad_input = True):
    source_dfs = [cudf.DataFrame({"cve_info": input_cves})]

    # Create a completion task to be used by the DeserializeStage. This indicates which columns to use from the
    # dataframe
    completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["cve_info"], }}

    pipe = LinearPipeline(p_config)

    # Create a source object which will emit our dataframe into the pipeline
    pipe.set_source(InMemorySourceStage(p_config, dataframes=source_dfs))

    # The deserialize stage will take the dataframe and convert it into a ControlMessage object
    pipe.add_stage(
        DeserializeStage(p_config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

    # Add the LLM Engine stage to the pipeline. This executes our agentic workflow and runs the LLM model
    pipe.add_stage(LLMEngineStage(p_config, engine=build_acc_llm_engine(e_config, retry_bad_input)))

    # Add a sink to the pipeline to capture the output of the pipeline
    sink = pipe.add_stage(InMemorySinkStage(p_config))

    # Run the pipeline. This will complete once all messages have been processed
    await pipe.run_async()

    messages = sink.get_messages()
    responses = concat_dataframes(messages)

    # print() does not apply %-style arguments, so format the message explicitly
    print(f"Received {len(messages)} responses:\n{responses[['checklist', 'summary']].to_json(indent=2)}")
    return responses.to_dict(orient='records')[0]['summary']




One way to find the right information in the database is to first embed the query into the same vector space and retrieve the top-k most similar items according to a distance metric. The retrieved information is then added to the LLM's prompt. The neighboring vectors in the database are said to be "semantically similar" to the query and are likely relevant. We'll embed some of the past video as text summaries in a FAISS vector database.

We'll use the pre-built FAISS vector store containing dense captions from the VLM Milvus database.
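
The retrieval idea can be shown with a brute-force search over a handful of vectors. This sketch is illustrative only: the pipeline itself relies on FAISS and a real embedding model, whereas the three-dimensional "embeddings", the caption texts, and the helpers `cosine_similarity` and `top_k` below are invented for the example.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, documents, k=2):
    """Return the k (text, score) pairs most similar to query_vec."""
    scored = [(text, cosine_similarity(query_vec, vec)) for text, vec in documents]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy 3-dimensional "embeddings", invented for illustration only
captions = [
    ("A gas stove with a lit burner.", [0.9, 0.1, 0.0]),
    ("A bowl of lemons on the counter.", [0.1, 0.9, 0.1]),
    ("A window with greenery outside.", [0.0, 0.2, 0.9]),
]
query = [0.8, 0.2, 0.0]  # stands in for an embedded query about the stove
for text, score in top_k(query, captions, k=2):
    print(f"{score:.3f}  {text}")
```

A vector index such as FAISS serves the same purpose but avoids comparing the query against every stored vector one by one in Python.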

### Step 2.2 
Finally, we will build the full `run_pipeline()` function that enables a demonstration of the capability. 

For demonstration, we will simulate a user query by transcribing a voice recording into text using NVIDIA `Riva Speech Services`.

In [10]:
from src.config import EngineConfig
import io
import IPython.display as ipd
import grpc
import numpy as np
from openai import OpenAI
from src import riva_nvcf_utils as riva_utils

async def run_pipeline(query_file_path:str, total_runtime:int, n_seconds:int=10, vector_db_dir:str='./vdb'):
    
    text_captionining_end_time = total_runtime - n_seconds
    
    print("*Transcribing Audio Query")
    text_query = riva_utils.transcribe_file(query_file_path)
    print(f"*\t Received Transcribed Query: {text_query}")
    
    print("*Starting AgenticRAG")
    # Create the engine configuration
    engine_config = EngineConfig.model_validate({
        "checklist": {
            "model": {
                "service": {
                    "type": "NIM", "api_key": None
                },
                "base_url": "https://integrate.api.nvidia.com/v1",
                "model_name": "meta/llama3-70b-instruct", # Checklist model
                "temperature": 0.01
            }
        },
        "agent": {
            "model": {
                "service": {
                    "type": "NIM", "api_key": None
                },
                "base_url": "https://integrate.api.nvidia.com/v1",
                "model_name": "meta/llama3-70b-instruct", #For ReAct Agent
                "temperature": 0.01
            },
            "video": {
                "file_id":file_id,
                "start_timestamp": text_captionining_end_time, #Start of n_seconds
                "end_timestamp" : total_runtime, # End of n_seconds
                "vlm_port": "31012"
            },
            "text_db" : {
                "faiss_dir" : vector_db_dir,
                "embedding_model_name": "all-MiniLM-L6-v2"           
            }
        }
    })

    responses = await run_react_pipeline(pipeline_config, engine_config, [text_query], retry_bad_input=True)
    
    kp_prompt = f"""Based on the context, please produce an answer to my query that is under a sentence long. \n\nQuery: {text_query} \n\nContext: {responses}"""
    
    api_key = os.getenv("NVIDIA_API_KEY")
    base_url = "https://integrate.api.nvidia.com/v1"

    llm_client = OpenAI(
      base_url = base_url,
      api_key = api_key
    )
    
    completion = llm_client.chat.completions.create(
          model="meta/llama3-70b-instruct",
          messages=[{"role":"user","content":kp_prompt}],
          temperature=0.5,
          top_p=1,
          max_tokens=1024,
          stream=False
        )
    
    responses = completion.choices[0].message.content
    print(f"Keyphrase: {responses}")
    
    print("*Completed Agentic RAG. Performing Text-To-Speech")
    riva_utils.generate_audio(responses)

## Step 3: Run the Pipeline

In [None]:
tts_response = await run_pipeline(
    query_file_path = "./audio_Did-I-turn-off-the-stove.wav",
    total_runtime=30,
    n_seconds=20,
)

*Transcribing Audio Query
*	 Recieved Transcribed Query: ## did i turn off the stove 

*Starting AgenticRAG
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-mem-8; InMemorySourceStage(dataframes=[                         cve_info
0  ## did i turn off the stove \n], repeat=1)>
  └─> morpheus.MessageMeta
Added stage: <deserialize-9; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus._lib.messages.ControlMessage'>, task_type=llm_engine, task_payload={'task_type': 'completion', 'task_dict': {'input_keys': ['cve_info']}})>
  └─ morpheus.MessageMeta -> morpheus.ControlMessage
Added stage: <llm-engine-10; LLMEngineStage(engine=<m

In [None]:
path = "output.wav"
with io.open(path, 'rb') as fh:
    content = fh.read()
ipd.Audio(path)