# Video/Document Continuous Ingestion from Object Storage

## Purpose

This notebook demonstrates an **automated document and video ingestion pipeline** that:

1. Monitors emulated object storage for new uploads via Kafka events
2. Routes files to the appropriate AI service based on file type (document and video ingestion are currently supported)
3. Enables a RAG agent for semantic search and contextual Q&A over all ingested content
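
The routing step can be pictured as a lookup on the file extension. The sketch below is illustrative only: the extension sets and the `route_object` name are assumptions, and the consumer shipped in the repo may recognize a different list of types.

```python
from pathlib import PurePosixPath

# Hypothetical extension sets; the consumer in the repo may use a different list.
DOCUMENT_EXTENSIONS = {".pdf", ".docx", ".pptx", ".txt", ".md"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}


def route_object(object_key: str) -> str:
    """Return 'document', 'video', or 'unsupported' for an object key."""
    # Object keys use forward slashes, so PurePosixPath is a safe parser here.
    suffix = PurePosixPath(object_key).suffix.lower()
    if suffix in DOCUMENT_EXTENSIONS:
        return "document"
    if suffix in VIDEO_EXTENSIONS:
        return "video"
    return "unsupported"


for key in ["reports/q1.PDF", "Game Highlights.mp4", "notes.xyz"]:
    print(f"{key} -> {route_object(key)}")
```

Matching on a lowercased suffix keeps the check case-insensitive, so `q1.PDF` and `q1.pdf` end up on the same path.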

## What Gets Deployed

1. **NVIDIA RAG** - Document indexing, vector search, and AI-powered Q&A (NIMs, Milvus, Ingestor)
2. **NVIDIA VSS** - Video understanding and summarization (VLM, LLM NIMs, VSS Engine)
3. **Continuous Ingestion** - Event-driven ingestion pipeline (Kafka, MinIO, Consumer)


## Prerequisites

### Hardware
- **GPU**: 8x RTX PRO 6000 Blackwell

### Software (must be pre-installed)
- Ubuntu 22.04 or later
- Docker 24.0+ with Docker Compose v2
- NVIDIA Driver 570+
- NVIDIA Container Toolkit

### API Keys

<table style="margin-left: 0;">
<tr><th>Key</th><th>Purpose</th><th>How to Get</th></tr>
<tr><td><code>NGC_API_KEY</code></td><td>Docker login, NIM deployments</td><td><a href="https://org.ngc.nvidia.com/setup/api-keys">NGC Portal</a> → Generate API Key</td></tr>
<tr><td><code>HF_TOKEN</code></td><td>Download VSS models</td><td><a href="https://huggingface.co/settings/tokens">HuggingFace Tokens</a> → Create token with Read access</td></tr>
</table>


## Table of Contents

<table style="margin-left: 0;">
<tr><th>Section</th><th>Description</th></tr>
<tr><td><b>Setup</b></td><td>Clone repo, install deps, set API keys, load helpers</td></tr>
<tr><td><b>Deploy RAG</b></td><td>NIMs, Vector DB, Ingestor, RAG Server</td></tr>
<tr><td><b>Deploy VSS</b></td><td>Clone VSS, deploy NIMs and VLM</td></tr>
<tr><td><b>Deploy Continuous Ingestion</b></td><td>Kafka, MinIO, Consumer</td></tr>
<tr><td><b>Testing</b></td><td>Upload documents & videos, query RAG</td></tr>
<tr><td><b>Clean Up</b></td><td>Stop services, clean data</td></tr>
</table>


## References

- **RAG Blueprint**: [NVIDIA RAG Documentation](https://github.com/NVIDIA-AI-Blueprints/rag/blob/develop/docs/deploy-docker-self-hosted.md)
- **VSS**: [Video Search & Summarization Documentation](https://docs.nvidia.com/vss/latest/index.html)
- **NIM**: [NVIDIA NIM Documentation](https://docs.nvidia.com/nim/index.html)


# Setup

Clone the repository, configure API keys, and load helper functions.



## 1. Clone Repository

Clone the RAG Blueprint repo to `~/rag`. This includes the consumer source code, deploy configs, and sample test data.



In [None]:
import subprocess, sys, os

RAG_REPO_DIR = os.path.expanduser("~/rag")
RAG_REPO_URL = "https://github.com/NVIDIA-AI-Blueprints/rag.git"

# Clone the default branch (skipped if the directory already exists)
if not os.path.exists(RAG_REPO_DIR):
    subprocess.run(f"git clone {RAG_REPO_URL} {RAG_REPO_DIR}", shell=True, check=True)
else:
    print(f"[OK] RAG repo already exists: {RAG_REPO_DIR}")
subprocess.run("git lfs pull", shell=True, cwd=RAG_REPO_DIR, check=True)

# Verify
for path in ["deploy/compose", "examples/rag_event_ingest/kafka_consumer", "examples/rag_event_ingest/data"]:
    status = "[OK]" if os.path.exists(os.path.join(RAG_REPO_DIR, path)) else "[MISSING]"
    print(f"  {status} {path}")



## 2. Install Dependencies


In [None]:
import shutil

def check_install_system_pkg(cmd: str, install_cmd: str):
    if shutil.which(cmd):
        print(f"  [OK] {cmd} found")
        return True
    print(f"  [INSTALLING] {cmd}...")
    result = subprocess.run(install_cmd, shell=True, capture_output=True, text=True)
    if result.returncode == 0:
        print(f"  [OK] {cmd} installed")
        return True
    print(f"  [ERROR] Failed to install {cmd}. Please install manually: {install_cmd}")
    return False

check_install_system_pkg("git", "sudo apt-get update && sudo apt-get install -y git")
check_install_system_pkg("git-lfs", "sudo apt-get install -y git-lfs && git lfs install")

# Install Python packages
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "minio", "aiohttp", "requests", "python-dotenv", "pyyaml"])
print("[OK] Ready")

## 3. Set API Keys

Configure NGC and HuggingFace API keys for NIM deployments and model downloads.



In [None]:
import getpass

def set_api_key(env_var: str, prompt: str, required: bool = True):
    if os.environ.get(env_var):
        # Do not echo any part of the secret into the notebook output
        print(f"  [OK] {env_var} already set")
        return True
    key = getpass.getpass(prompt)
    if key:
        os.environ[env_var] = key
        print(f"  [OK] {env_var} set")
        return True
    if required:
        print(f"  [ERROR] {env_var} is required")
        return False
    print(f"  [SKIP] {env_var} (optional)")
    return True

set_api_key("NGC_API_KEY", "Enter NGC_API_KEY (starts with 'nvapi-'): ", required=True)
set_api_key("HF_TOKEN", "Enter HF_TOKEN (optional, press Enter to skip): ", required=False)


## 4. Helper Functions

Shared utilities for deployment, file upload, status checks, and RAG queries.



In [None]:
# Install dependencies
import sys
!{sys.executable} -m pip install -q minio aiohttp requests python-dotenv

In [None]:
import os, sys, json, re, subprocess, time, socket, asyncio
import aiohttp, requests
from typing import List, Optional, Dict

try:
    from minio import Minio
    from minio.error import S3Error
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "minio"])
    from minio import Minio
    from minio.error import S3Error

# =============================================================================
# CONFIGURATION
# =============================================================================

# Paths relative to RAG repo root
RAG_REPO_DIR = os.path.expanduser("~/rag")
EXAMPLE_DIR = os.path.join(RAG_REPO_DIR, "examples/rag_event_ingest")
AIDP_COMPOSE_FILE = os.path.join(EXAMPLE_DIR, "deploy/docker-compose.yaml")
DATA_DIR = os.path.join(EXAMPLE_DIR, "data")
RAG_SERVER_URL = "http://localhost:8081"
INGESTOR_URL = "http://localhost:8082"

VSS_DIR = os.path.expanduser("~/video-search-and-summarization")
VSS_UI_PORT = 9110
VSS_API_PORT = 8110
VSS_LLM_PORT = 8107
VSS_EMBED_PORT = 8106
VSS_RERANK_PORT = 8105
LOCAL_NIM_CACHE = os.path.expanduser("~/.cache/nim")

MINIO_ENDPOINT = "localhost:9201"
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"
MINIO_BUCKET = "aidp-bucket"
MINIO_COLLECTION = "aidp_bucket"
MINIO_CONSOLE_PORT = 9211

# =============================================================================
# SHARED UTILITIES
# =============================================================================

def run_command(cmd: str, capture: bool = False) -> Optional[str]:
    """Execute a shell command and print it."""
    print(f"$ {cmd}")
    result = subprocess.run(cmd, shell=True, capture_output=capture, text=True)
    return result.stdout if capture else None

def get_host_ip() -> str:
    """Get host IP address for external access URLs."""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except OSError:
        return "localhost"

def get_minio_client() -> Minio:
    """Create MinIO client for AIDP bucket operations."""
    return Minio(MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)

def upload_file(local_path: str, object_name: Optional[str] = None) -> bool:
    """Upload a local file to MinIO AIDP bucket."""
    if not os.path.exists(local_path):
        print(f"[ERROR] File not found: {local_path}")
        return False
    obj = object_name or os.path.basename(local_path)
    try:
        client = get_minio_client()
        if not client.bucket_exists(MINIO_BUCKET):
            client.make_bucket(MINIO_BUCKET)
        client.fput_object(MINIO_BUCKET, obj, local_path)
        print(f"[OK] Uploaded: {obj}")
        return True
    except S3Error as e:
        print(f"[ERROR] {e}")
        return False

def verify_file_in_storage(object_name: str, bucket: str = MINIO_BUCKET) -> bool:
    """Check if a file exists in MinIO bucket and print verification status."""
    try:
        client = get_minio_client()
        stat = client.stat_object(bucket, object_name)
        print(f"[OK] File verified in storage:")
        print(f"  Bucket:   {bucket}")
        print(f"  Object:   {object_name}")
        print(f"  Size:     {stat.size:,} bytes")
        print(f"  Modified: {stat.last_modified}")
        return True
    except S3Error as e:
        print(f"[ERROR] File not found in storage: {object_name}")
        print(f"  Error: {e}")
        return False

def get_consumer_logs(lines: int = 30) -> None:
    """Show recent Kafka consumer logs."""
    run_command(f"docker logs kafka-consumer --tail {lines}")

async def query_rag(question: str, collection: Optional[str] = None) -> Optional[str]:
    """Query RAG system and print the answer."""
    coll = collection or MINIO_COLLECTION
    print(f"Q: {question}\nCollection: {coll}\n" + "-" * 40)

    payload = {
        "messages": [{"role": "user", "content": question}],
        "use_knowledge_base": True,
        "collection_name": coll,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{RAG_SERVER_URL}/generate", json=payload,
                timeout=aiohttp.ClientTimeout(total=120),
            ) as resp:
                text = await resp.text()
                # Parse SSE response: extract content from each "data: {...}" line
                chunks = []
                for line in text.split("\n"):
                    if not line.startswith("data: ") or line[6:] == "[DONE]":
                        continue
                    try:
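                        # Fall back to an empty choice so an empty "choices" list cannot raise IndexError
                        choices = json.loads(line[6:]).get("choices") or [{}]
                        msg = choices[0].get("message", {})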
                        if msg.get("content"):
                            chunks.append(msg["content"])
                    except json.JSONDecodeError:
                        pass
                answer = "".join(chunks)
                print(f"Answer: {answer}")
                return answer
    except aiohttp.ClientError as e:
        print(f"[ERROR] {e}")
        return None

print(f"[OK] Helpers loaded | Host IP: {get_host_ip()}")
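
The SSE parsing inside `query_rag` can be exercised offline, without a running RAG server. The following is a minimal sketch of the same idea as a standalone function. The `extract_sse_content` name and the sample body are assumptions made for illustration; only the `choices[0].message.content` shape is taken from the helper above.

```python
import json
from typing import List


def extract_sse_content(body: str) -> str:
    """Join the message content of every 'data: {...}' line in an SSE body."""
    chunks: List[str] = []
    for line in body.splitlines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):].strip()
        if data == "[DONE]":
            continue
        try:
            payload = json.loads(data)
        except json.JSONDecodeError:
            continue  # skip malformed or partial lines
        if not isinstance(payload, dict):
            continue
        # Tolerate a missing or empty "choices" list
        choices = payload.get("choices") or [{}]
        content = (choices[0].get("message") or {}).get("content")
        if content:
            chunks.append(content)
    return "".join(chunks)


# Hypothetical response body, shaped like the lines query_rag expects
sample = "\n".join([
    'data: {"choices": [{"message": {"content": "Seattle "}}]}',
    'data: {"choices": [{"message": {"content": "won."}}]}',
    'data: {"choices": []}',
    "data: not-json",
    "data: [DONE]",
])
print(extract_sse_content(sample))
```

Keeping the parser separate from the HTTP call makes it possible to test edge cases such as empty `choices` lists or malformed lines on plain strings.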


# Deploy NVIDIA RAG

Deploy the NVIDIA RAG stack: NIMs (LLM, Embedding, Reranker), the Milvus vector database, the Ingestor server, and the RAG server.


In [None]:
ngc_key = os.environ.get("NGC_API_KEY")
if not ngc_key:
    raise RuntimeError("NGC_API_KEY not set! Run the API keys cell first.")

os.chdir(RAG_REPO_DIR)

# Set env vars needed by docker compose
os.environ["NGC_API_KEY"] = ngc_key
os.environ["USERID"] = f"{os.getuid()}:{os.getgid()}"
os.environ["COLLECTION_NAME"] = MINIO_COLLECTION

# Load RAG .env defaults (MODEL_DIRECTORY, etc.)
from dotenv import load_dotenv
env_file = os.path.join(RAG_REPO_DIR, "deploy/compose/.env")
if os.path.exists(env_file):
    load_dotenv(env_file, override=False)

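# Login to nvcr.io (the key is passed on stdin, and a failed login is reported)
login = subprocess.run(["docker", "login", "nvcr.io", "-u", "$oauthtoken", "--password-stdin"],
                       input=ngc_key, capture_output=True, text=True)
if login.returncode != 0:
    print(f"[ERROR] docker login failed: {login.stderr.strip()}")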

# Deploy components
for label, compose_file in [
    ("NIMs",      "deploy/compose/nims.yaml"),
    ("Vector DB", "deploy/compose/vectordb.yaml"),
]:
    print(f"Deploying {label}...")
    run_command(f"docker compose -f {compose_file} up -d")

print("Waiting 30s for Milvus...")
time.sleep(30)

for label, compose_file in [
    ("Ingestor", "deploy/compose/docker-compose-ingestor-server.yaml"),
    ("RAG Server", "deploy/compose/docker-compose-rag-server.yaml"),
]:
    print(f"Deploying {label}...")
    run_command(f"docker compose -f {compose_file} up -d")

ip = get_host_ip()
print(f"\nRAG deployed: http://{ip}:8081 (server) | http://{ip}:8082 (ingestor) | http://{ip}:8090 (UI)")
print(f"COLLECTION_NAME: {MINIO_COLLECTION}")
print("Wait 2-5 minutes for NIMs to load models, then run the status check cell.")


Verify RAG services are healthy. Wait 2-5 minutes for NIMs to load models.

The deployment status should be:
```
NAMES                            STATUS
rag-frontend                     Up About a minute
rag-server                       Up About a minute
ingestor-server                  Up About a minute
milvus-standalone                Up 2 minutes (healthy)
milvus-etcd                      Up 2 minutes (healthy)
milvus-minio                     Up 2 minutes (healthy)
nim-llm-ms                       Up 2 minutes (healthy)
```



In [None]:
# Check service status and print access URLs
print("Wait 2-5 minutes for services to become healthy.")
print("Run this cell again after waiting.\n")

ip = get_host_ip()
for name, port, path in [
    ("RAG Server", 8081, "/health"), ("Ingestor", 8082, "/health"),
    ("Frontend", 8090, "/"), ("Milvus", 19530, "/v1/vector/collections"),
]:
    try:
        s = "[OK]" if requests.get(f"http://localhost:{port}{path}", timeout=10).status_code == 200 else "[WARN]"
    except requests.ConnectionError:
        s = "[DOWN]"
    except requests.Timeout:
        s = "[TIMEOUT]"
    print(f"  {s} {name}: http://{ip}:{port}")
run_command("docker ps --format 'table {{.Names}}\t{{.Status}}' | grep -E '(rag|milvus|ingestor|nim|NAMES)'")
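
Instead of re-running the status cell by hand, a polling loop can wait until a check passes or a deadline expires. This is a minimal sketch; `wait_until` and its parameters are hypothetical names, not part of the repo, and the demo uses a stand-in check rather than a real HTTP request.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout_s: float = 300.0,
               interval_s: float = 10.0,
               sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call check() until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


# Demo with a stand-in check that succeeds on the third attempt
attempts = {"n": 0}


def fake_check() -> bool:
    attempts["n"] += 1
    return attempts["n"] >= 3


print(wait_until(fake_check, timeout_s=5, interval_s=0.01))
```

In the notebook, `check` could wrap a `requests.get` call against a health endpoint such as `http://localhost:8081/health` and return `False` on connection errors or timeouts.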


# Deploy NVIDIA VSS

Deploy NVIDIA VSS: the NIMs (LLM, Embedding, Reranker) and the VLM for video analysis.


In [None]:
# VSS deployment configuration
VSS_REPO_URL = "https://github.com/NVIDIA-AI-Blueprints/video-search-and-summarization.git"
VSS_TAG = "2.4.1"  # VSS Blueprint version (must match VIA_IMAGE tag)
VSS_GPU_DEVICE = 4       # GPU for NIMs (LLM, Embedding, Reranker)
VSS_VLM_GPU_DEVICE = 5   # GPU for VLM (via-server with Cosmos-Reason2)

NIM_IMAGES = {
    "vss-llm":       ("nvcr.io/nim/meta/llama-3.1-8b-instruct:1.12.0",      VSS_LLM_PORT),
    "vss-embedding": ("nvcr.io/nim/nvidia/llama-3.2-nv-embedqa-1b-v2:1.9.0", VSS_EMBED_PORT),
    "vss-reranker":  ("nvcr.io/nim/nvidia/llama-3.2-nv-rerankqa-1b-v2:1.7.0", VSS_RERANK_PORT),
}

ngc_key = os.environ.get("NGC_API_KEY", "")
hf_token = os.environ.get("HF_TOKEN", "")
if not ngc_key:
    raise RuntimeError("NGC_API_KEY not set!")

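# Docker login (the key is passed on stdin, and a failed login is reported)
login = subprocess.run(["docker", "login", "nvcr.io", "-u", "$oauthtoken", "--password-stdin"],
                       input=ngc_key, capture_output=True, text=True)
if login.returncode != 0:
    print(f"[ERROR] docker login failed: {login.stderr.strip()}")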

# Clone VSS repo with correct release tag
if not os.path.exists(VSS_DIR):
    print(f"Cloning {VSS_REPO_URL} (tag: {VSS_TAG})...")
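    # check=True stops the cell here if the clone fails, before the deploy directory is used
    subprocess.run(f"git clone --branch {VSS_TAG} --depth 1 {VSS_REPO_URL} {VSS_DIR}", shell=True, check=True)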
else:
    print(f"[OK] VSS repo exists: {VSS_DIR}")

# Deploy NIM containers (all on same GPU)
os.makedirs(LOCAL_NIM_CACHE, exist_ok=True)
for name, (image, port) in NIM_IMAGES.items():
    subprocess.run(f"docker rm -f {name} 2>/dev/null", shell=True, capture_output=True)
    cmd = f"""docker run -d --name {name} \
        -u $(id -u) --gpus '"device={VSS_GPU_DEVICE}"' --shm-size=16GB \
        --network nvidia-rag -e NGC_API_KEY={ngc_key} \
        -v "{LOCAL_NIM_CACHE}:/opt/nim/.cache" \
        -p {port}:8000 -e NIM_LOW_MEMORY_MODE=1 -e NIM_RELAX_MEM_CONSTRAINTS=1 \
        {image}"""
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True, executable="/bin/bash")
    status = "[OK]" if result.returncode == 0 else "[ERROR]"
    print(f"  {status} {name} -> port {port}")

# Deploy VSS application (VLM on separate GPU)
vss_deploy_dir = f"{VSS_DIR}/deploy/docker/local_deployment_single_gpu"
env_content = f"""NGC_API_KEY={ngc_key}
HF_TOKEN={hf_token}
VIA_IMAGE=nvcr.io/nvidia/blueprint/vss-engine:{VSS_TAG}
FRONTEND_PORT={VSS_UI_PORT}
BACKEND_PORT={VSS_API_PORT}
MILVUS_DB_HTTP_PORT=19091
MILVUS_DB_GRPC_PORT=29530
MINIO_PORT=9002
MINIO_WEBUI_PORT=9003
GRAPH_DB_USERNAME=neo4j
GRAPH_DB_PASSWORD=password
ARANGO_DB_USERNAME=arangodb
ARANGO_DB_PASSWORD=password
CA_RAG_CONFIG=./config.yaml
GUARDRAILS_CONFIG=./guardrails
NVIDIA_VISIBLE_DEVICES={VSS_VLM_GPU_DEVICE}
VLM_MODEL_TO_USE=cosmos-reason2
MODEL_PATH=git:https://huggingface.co/nvidia/Cosmos-Reason2-8B
VLLM_GPU_MEMORY_UTILIZATION=0.4
VLM_MAX_MODEL_LEN=20480
DISABLE_GUARDRAILS=true
DISABLE_CV_PIPELINE=true
ENABLE_AUDIO=false
"""
with open(f"{vss_deploy_dir}/.env", "w") as f:
    f.write(env_content)

# Patch config.yaml to use our NIM ports
config_file = f"{vss_deploy_dir}/config.yaml"
if os.path.exists(config_file):
    with open(config_file) as f:
        cfg = f.read()
    cfg = re.sub(r":8007/v1", f":{VSS_LLM_PORT}/v1", cfg)
    cfg = re.sub(r":8006/v1", f":{VSS_EMBED_PORT}/v1", cfg)
    cfg = re.sub(r":8005/v1", f":{VSS_RERANK_PORT}/v1", cfg)
    with open(config_file, "w") as f:
        f.write(cfg)

cmd = f"cd {vss_deploy_dir} && set -a && source .env && set +a && docker compose up -d"
subprocess.run(cmd, shell=True, capture_output=True, text=True, executable="/bin/bash")

ip = get_host_ip()
print(f"\nVSS deployed: http://{ip}:{VSS_UI_PORT} (UI) | http://{ip}:{VSS_API_PORT} (API)")
print("Wait 2-5 minutes for NIMs to load models, then run the status check cell.")
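
The config patch in the cell above rewrites the default NIM ports to the ports used in this notebook. The sketch below shows the effect on a made-up config fragment. The YAML keys and host names are assumptions, and the real `config.yaml` in the VSS repo contains more fields. `remap_ports` is a hypothetical helper that applies all substitutions in a single pass.

```python
import re

# Hypothetical fragment; the real config.yaml in the VSS repo has more fields.
sample_cfg = """\
llm:
  base_url: http://host.docker.internal:8007/v1
embedding:
  base_url: http://host.docker.internal:8006/v1
reranker:
  base_url: http://host.docker.internal:8005/v1
"""

# default port -> port used in this notebook
PORT_MAP = {8007: 8107, 8006: 8106, 8005: 8105}


def remap_ports(cfg: str, port_map: dict) -> str:
    """Rewrite ':<old>/v1' to ':<new>/v1', leaving unknown ports untouched."""
    def swap(match: re.Match) -> str:
        old = int(match.group(1))
        return f":{port_map.get(old, old)}/v1"
    return re.sub(r":(\d+)/v1", swap, cfg)


patched = remap_ports(sample_cfg, PORT_MAP)
print(patched)
```

A single pass with a lookup table means each port is rewritten at most once, so the result does not depend on the order of the substitutions.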


Verify VSS services are healthy. Wait 2-5 minutes for NIMs to load models.

The deployment status should be:
```
NAMES                                             STATUS
local_deployment_single_gpu-via-server-1          Up About a minute
local_deployment_single_gpu-elasticsearch-1       Up About a minute
local_deployment_single_gpu-graph-db-1            Up About a minute
local_deployment_single_gpu-minio-1               Up About a minute
local_deployment_single_gpu-arango-db-1           Up About a minute
local_deployment_single_gpu-milvus-standalone-1   Up About a minute (healthy)
vss-reranker                                      Up About a minute
vss-embedding                                     Up About a minute
vss-llm                                           Up About a minute
```



In [None]:
# Check service status and print access URLs
ip = get_host_ip()
for name, port, path in [
    ("VSS UI", VSS_UI_PORT, "/"), ("VSS API", VSS_API_PORT, "/"),
    ("LLM NIM", VSS_LLM_PORT, "/v1/health/ready"),
    ("Embedding", VSS_EMBED_PORT, "/v1/health/ready"),
    ("Reranker", VSS_RERANK_PORT, "/v1/health/ready"),
]:
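    try:
        code = requests.get(f"http://localhost:{port}{path}", timeout=10).status_code
        # Flag server errors (5xx) instead of reporting every response as OK
        s = "[OK]" if code < 500 else "[WARN]"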
    except requests.ConnectionError:
        s = "[DOWN]"
    except requests.Timeout:
        s = "[TIMEOUT]"
    print(f"  {s} {name}: http://{ip}:{port}")
run_command("docker ps --format 'table {{.Names}}\t{{.Status}}' | grep -E '(vss|via|local_deployment|NAMES)'")


# Deploy Continuous Ingestion from Emulated Object Storage

Deploy the continuous ingestion stack: the Kafka message broker, MinIO object storage, and the Kafka consumer for automated ingestion.

## 1. Configure Video Analysis Prompts

Customize the prompts used by the Kafka consumer when processing videos through VSS.


In [None]:
# Consumer prompts for video analysis - customize these for your video content
# These are passed to the Kafka consumer which sends them to VSS for video summarization

CONSUMER_VSS_PROMPT = """Analyze this sports video. TIMESTAMP FORMAT: Convert seconds to MM:SS (40s=00:40, 80s=01:20, 150s=02:30, 200s=03:20). \
Describe: key plays, scoring, turnovers, big gains, defensive stops, celebrations."""

CONSUMER_VSS_SYSTEM_PROMPT = """You are a sports broadcaster. CRITICAL: Convert all timestamps from seconds to MM:SS format. \
Examples: 40 seconds = 00:40, 90 seconds = 01:30, 200 seconds = 03:20, 350 seconds = 05:50. Focus on action, field position, execution, and game momentum."""

CONSUMER_VSS_CAPTION_SUMMARIZATION_PROMPT = """Format: [MM:SS] Play description. CONVERT seconds to MM:SS (40s=00:40, 80s=01:20, 150s=02:30). \
Describe: what happened, how it was executed, the result. Be vivid like a TV broadcast."""

CONSUMER_VSS_SUMMARY_AGGREGATION_PROMPT = """Create game summary with MM:SS timestamps. CONVERT all times: 40s=00:40, 80s=01:20, 200s=03:20, 350s=05:50. \
Highlight scoring plays, turnovers, momentum shifts, spectacular plays. Write like a highlight reel narrator - vivid and exciting."""

print("Consumer VSS prompts configured:")
print(f"  VSS_PROMPT: {len(CONSUMER_VSS_PROMPT)} chars")
print(f"  VSS_SYSTEM_PROMPT: {len(CONSUMER_VSS_SYSTEM_PROMPT)} chars")
print(f"  VSS_CAPTION_SUMMARIZATION_PROMPT: {len(CONSUMER_VSS_CAPTION_SUMMARIZATION_PROMPT)} chars")
print(f"  VSS_SUMMARY_AGGREGATION_PROMPT: {len(CONSUMER_VSS_SUMMARY_AGGREGATION_PROMPT)} chars")
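
The prompts above ask the model to convert second offsets into MM:SS. The same conversion is easy to do deterministically, for example to spot-check timestamps in a generated summary. This is a minimal sketch and `seconds_to_mmss` is a hypothetical helper, not part of the consumer.

```python
def seconds_to_mmss(seconds: float) -> str:
    """Format a non-negative offset in seconds as zero-padded MM:SS."""
    if seconds < 0:
        raise ValueError("seconds must be non-negative")
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes:02d}:{secs:02d}"


# The same examples the prompts use
for s in (40, 80, 90, 150, 200, 350):
    print(f"{s}s = {seconds_to_mmss(s)}")
```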

## 2. Deploy Services

Deploy Kafka, MinIO, and the Kafka consumer with custom prompts.

In [None]:
# Verify prerequisites
net_check = subprocess.run("docker network inspect nvidia-rag", shell=True, capture_output=True)
if net_check.returncode != 0:
    raise RuntimeError("nvidia-rag network not found. Deploy RAG first.")

ngc_key = os.environ.get("NGC_API_KEY", "")
if not ngc_key:
    raise RuntimeError("NGC_API_KEY not set!")

host_ip = get_host_ip()

# Set environment variables for docker compose (including custom prompts)
os.environ["VSS_SERVER_URL"] = f"http://{host_ip}:{VSS_API_PORT}"
os.environ["VSS_PROMPT"] = CONSUMER_VSS_PROMPT
os.environ["VSS_SYSTEM_PROMPT"] = CONSUMER_VSS_SYSTEM_PROMPT
os.environ["VSS_CAPTION_SUMMARIZATION_PROMPT"] = CONSUMER_VSS_CAPTION_SUMMARIZATION_PROMPT
os.environ["VSS_SUMMARY_AGGREGATION_PROMPT"] = CONSUMER_VSS_SUMMARY_AGGREGATION_PROMPT

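# Login + pull + build (the key is passed on stdin, and a failed login is reported)
login = subprocess.run(["docker", "login", "nvcr.io", "-u", "$oauthtoken", "--password-stdin"],
                       input=ngc_key, capture_output=True, text=True)
if login.returncode != 0:
    print(f"[ERROR] docker login failed: {login.stderr.strip()}")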

compose = f"docker compose -f {AIDP_COMPOSE_FILE}"
subprocess.run(f"{compose} pull --ignore-pull-failures", shell=True, capture_output=True, text=True, executable="/bin/bash")
subprocess.run(f"{compose} up -d --build", shell=True, capture_output=True, text=True, executable="/bin/bash")

print(f"Continuous Ingestion deployed:")
print(f"  Kafka UI:      http://{host_ip}:8080")
print(f"  MinIO Console: http://{host_ip}:{MINIO_CONSOLE_PORT}")
print(f"  Credentials:   minioadmin / minioadmin")
print("  Custom prompts: ✓ passed to consumer")

Verify continuous ingestion services are running.

The deployment status should be:
```
NAMES                            STATUS
kafka-consumer                   Up About a minute
aidp-kafka-ui                    Up About a minute
aidp-minio-mc                    Up About a minute
aidp-minio                       Up About a minute (healthy)
kafka                            Up About a minute (healthy)
```



In [None]:
# Check service status and print access URLs
ip = get_host_ip()
print(f"  Kafka UI:      http://{ip}:8080")
print(f"  MinIO Console: http://{ip}:{MINIO_CONSOLE_PORT}")
run_command("docker ps --format 'table {{.Names}}\t{{.Status}}' | grep -E '(kafka|minio|NAMES)'")


# Testing

Test the deployment by uploading documents and videos, then querying via RAG.


## 1. Document Upload

Upload a PDF document to MinIO, which triggers automatic ingestion via Kafka consumer.
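
To make the trigger concrete: MinIO publishes bucket notifications as S3-style JSON events, and a consumer reads the bucket and object key from each record. The sketch below parses one such message. It assumes the S3 event layout (`Records[].s3.bucket.name`, `Records[].s3.object.key`) with URL-encoded keys. The `objects_from_event` name and the sample payload are made up for illustration, and the consumer in the repo may handle events differently.

```python
import json
from typing import List, Tuple
from urllib.parse import unquote_plus


def objects_from_event(raw: bytes) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs for ObjectCreated records in one message."""
    event = json.loads(raw)
    found: List[Tuple[str, str]] = []
    for record in event.get("Records", []):
        if not record.get("eventName", "").startswith("s3:ObjectCreated:"):
            continue  # ignore deletes and other event types
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name", "")
        # Assumption: keys arrive URL-encoded, as in S3 event notifications
        key = unquote_plus(s3.get("object", {}).get("key", ""))
        if bucket and key:
            found.append((bucket, key))
    return found


# Hypothetical message value, shaped like an S3-style notification
sample = json.dumps({
    "EventName": "s3:ObjectCreated:Put",
    "Key": "aidp-bucket/Game+Highlights.mp4",
    "Records": [{
        "eventName": "s3:ObjectCreated:Put",
        "s3": {"bucket": {"name": "aidp-bucket"},
               "object": {"key": "Game+Highlights.mp4", "size": 1024}},
    }],
}).encode()
print(objects_from_event(sample))
```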


### 1.1 Upload to Storage

Upload the document to MinIO object storage.


In [None]:
# Sample documents are included in the repo under examples/rag_event_ingest/data/
pdf_path = os.path.join(DATA_DIR, "documents", "Seahawks-Patriots in Super Bowl LX_ What We Learned from Seattle's 29-13 win.pdf")
upload_file(pdf_path, "Seahawks-Patriots_SuperBowl_LX_Analysis.pdf")


### 1.2 Verify Upload in Storage

Confirm that the document landed in the MinIO bucket.



In [None]:
# Verify file landed in object storage
verify_file_in_storage("Seahawks-Patriots_SuperBowl_LX_Analysis.pdf")

### 1.3 Verify Document Ingestion

Check consumer logs to verify document processing status.

The logs should show the document being picked up and successfully ingested:
```
services.document_indexer - INFO - Task ...: PENDING (0s)
services.document_indexer - INFO - Task ...: PENDING (5s)
handlers.base - INFO - [DocumentHandler] ✓ Seahawks-Patriots_SuperBowl_LX_Analysis.pdf → SUCCESS
consumer - INFO - ✓ SUMMARY: Seahawks-Patriots_SuperBowl_LX_Analysis.pdf | Collection: aidp_bucket | Duration: 12.76s | Status: SUCCESS
```

In [None]:
# Check consumer logs for ingestion status
print("Waiting for document processing...")
get_consumer_logs(50)
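
Rather than reading the logs by eye, the `SUMMARY` line can be parsed to pull out the file name and status. This is a sketch that assumes the log format shown in the expected output above; `parse_summary` is a hypothetical helper, and the pattern would need adjusting if the consumer's log format changes.

```python
import re
from typing import Dict, Optional

# Pattern derived from the sample SUMMARY line in the expected output
SUMMARY_RE = re.compile(
    r"SUMMARY: (?P<name>.+?) \| Collection: (?P<collection>\S+) "
    r"\| Duration: ~?(?P<duration>[\d.]+)s \| Status: (?P<status>\w+)"
)


def parse_summary(line: str) -> Optional[Dict[str, str]]:
    """Return the fields of a consumer SUMMARY line, or None if it does not match."""
    match = SUMMARY_RE.search(line)
    return match.groupdict() if match else None


line = ("consumer - INFO - SUMMARY: Seahawks-Patriots_SuperBowl_LX_Analysis.pdf "
        "| Collection: aidp_bucket | Duration: 12.76s | Status: SUCCESS")
print(parse_summary(line))
```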

### 1.4 Query Document via RAG

You can query the ingested document either **programmatically** below or via the **RAG Frontend UI**.

> **💡 RAG Frontend**: Open `http://<host-ip>:8090` in your browser for an interactive Q&A interface.
> Make sure to select the collection **`aidp_bucket`** in the UI.


In [None]:
# Query the document
await query_rag("What was the final score and who won Super Bowl LX?", MINIO_COLLECTION)

Ask another question about the document.


In [None]:
# Query about key takeaways
await query_rag("What were the key lessons learned from Seattle's victory in Super Bowl LX?", MINIO_COLLECTION)

## 2. Video Upload

Upload a video to MinIO, which triggers automatic ingestion via Kafka consumer → VSS for video analysis → RAG for indexing.



### 2.1 Upload to Storage

Upload the video to MinIO object storage.


In [None]:
# Sample videos are included in the repo under examples/rag_event_ingest/data/
video_path = os.path.join(DATA_DIR, "videos", "Seattle Seahawks vs New England Patriots - Super Bowl LX Game Highlights.mp4")
upload_file(video_path)

print("\nVideo processing takes longer than documents. Check consumer logs for progress.")


### 2.2 Verify Upload in Storage

Confirm that the video landed in the MinIO bucket.



In [None]:
# Verify video landed in object storage
video_filename = os.path.basename(video_path)
verify_file_in_storage(video_filename)

### 2.3 Verify Video Ingestion

Check consumer logs to verify video processing status.

The logs should show the video being picked up and processed by VSS:
```
handlers.video - INFO - [VideoHandler] Processing video: Seattle Seahawks vs New England Patriots - Super Bowl LX Game Highlights.mp4
services.video_analyzer - INFO - Submitting video to VSS...
services.video_analyzer - INFO - VSS processing complete
handlers.base - INFO - [VideoHandler] ✓ Seattle Seahawks...mp4 → SUCCESS
consumer - INFO - ✓ SUMMARY: Seattle Seahawks...mp4 | Collection: aidp_bucket | Duration: ~120s | Status: SUCCESS
```

In [None]:
# Check consumer logs for ingestion status
print("Waiting for video processing...")
get_consumer_logs(50)

### 2.4 Query Video via RAG

Query the video content.


In [None]:
# Query about the video content
await query_rag("Summarize the video content", MINIO_COLLECTION)

Query about a specific time range in the video.


In [None]:
# Query about specific time range
await query_rag("What happened between 15:00 and 20:00?", MINIO_COLLECTION)


Additional query: analyze key defensive plays and turnovers.


In [None]:
# Defensive Analysis
await query_rag("Describe the key defensive plays and turnovers that impacted the game outcome.", MINIO_COLLECTION)

Additional query: identify critical momentum-changing plays in the second half.


In [None]:
# Momentum Shifts
await query_rag("What were the critical momentum-changing plays in the second half of the game?", MINIO_COLLECTION)

# Clean Up

Stop all services and clean up ingested data.


## 1. Stop RAG Deployment

Stop all RAG services (NIMs, Milvus, Ingestor, RAG server).


In [None]:
os.chdir(RAG_REPO_DIR)
for f in [
    "deploy/compose/docker-compose-rag-server.yaml",
    "deploy/compose/docker-compose-ingestor-server.yaml",
    "deploy/compose/vectordb.yaml",
    "deploy/compose/nims.yaml",
]:
    run_command(f"docker compose -f {f} down")
print("[OK] RAG stopped")


## 2. Stop VSS Deployment

Stop all VSS services (NIMs, VLM, via-server).


In [None]:
vss_deploy_dir = f"{VSS_DIR}/deploy/docker/local_deployment_single_gpu"
if os.path.exists(vss_deploy_dir):
    subprocess.run(f"cd {vss_deploy_dir} && set -a && source .env 2>/dev/null && set +a && docker compose down",
                   shell=True, executable="/bin/bash", capture_output=True)
for name in ["vss-llm", "vss-embedding", "vss-reranker"]:
    subprocess.run(f"docker rm -f {name} 2>/dev/null", shell=True, capture_output=True)
print("[OK] VSS stopped")


## 3. Stop Continuous Ingestion Deployment

Stop the Continuous Ingestion services (Kafka, MinIO, Consumer).


In [None]:
run_command(f"docker compose -f {AIDP_COMPOSE_FILE} down")
print("[OK] Continuous ingestion stopped")
