In [None]:
# Install required packages
!pip install gradio boto3 faiss-cpu numpy

Collecting gradio
  Downloading gradio-5.23.3-py3-none-any.whl.metadata (16 kB)
Collecting boto3
  Downloading boto3-1.37.27-py3-none-any.whl.metadata (6.7 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.8.0 (from gradio)
  Downloading gradio_client-1.8.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.m

In [None]:
# Convert search_video_motive.ipynb to Gradio interface

import base64
import html
import json
import logging
import os
import tempfile
import time
from datetime import datetime

import boto3
import faiss
import gradio as gr
import numpy as np

# AWS credentials.
# Read from the environment instead of being hardcoded in the notebook, so secrets
# never end up in the saved file. When a key is unset the value is None.
# NOTE(review): boto3 is expected to fall back to its default credential chain when
# the explicit key arguments are None — confirm for the target environment.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
aws_region = (
    os.environ.get("AWS_REGION")
    or os.environ.get("AWS_DEFAULT_REGION")
    or "us-east-1"  # same default as EmbeddingService.__init__
)

# Global state shared by the Gradio callbacks.
video_data = []           # entries loaded by upload_json
embeddings = None         # array of description embeddings built by generate_embeddings
faiss_index = None        # FAISS index over `embeddings`
embedding_service = None  # EmbeddingService instance created by generate_embeddings

# Part 2: Embedding Service for Vector Generation
class EmbeddingService:
    """Generate text embeddings through the Amazon Bedrock runtime API.

    `get_embedding` always returns a 1-D float32 numpy array: the model's embedding
    on success, or a zero vector of length `EMBEDDING_DIM` for empty input or when
    the request fails.
    """

    # Length of the zero vector returned for empty input or on failure.
    EMBEDDING_DIM = 1536
    # Input text is truncated to this many characters before it is sent.
    MAX_INPUT_CHARS = 8000

    def __init__(self, aws_access_key_id, aws_secret_access_key, region_name='us-east-1'):
        """Initialize the embedding service with AWS credentials."""
        self.client = boto3.client(
            service_name='bedrock-runtime',
            region_name=region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )
        self.model_id = "amazon.titan-embed-text-v1"

    def _zero_vector(self):
        """Return the placeholder embedding used for empty input and failures."""
        return np.zeros(self.EMBEDDING_DIM, dtype=np.float32)

    def get_embedding(self, text):
        """Generate an embedding for the given text using Amazon Bedrock.

        Args:
            text: Text to embed. Only the first `MAX_INPUT_CHARS` characters are sent.

        Returns:
            A 1-D float32 numpy array. Empty or whitespace-only text yields a zero
            vector. Any failure (request error, non-string input, response without
            an embedding) is logged as a warning and also yields a zero vector, so
            callers never have to handle a different return type.
        """
        try:
            # Nothing to embed: skip the API call entirely.
            if not text or text.strip() == "":
                return self._zero_vector()

            body = json.dumps({
                "inputText": text[:self.MAX_INPUT_CHARS],
            })

            response = self.client.invoke_model(
                modelId=self.model_id,
                body=body
            )

            response_body = json.loads(response['body'].read())
            embedding = response_body.get('embedding')
            if not embedding:
                # An empty array here would later break stacking into a matrix.
                raise ValueError("response did not contain an embedding")

            return np.array(embedding, dtype=np.float32)

        except Exception as e:
            logging.getLogger(__name__).warning("Error generating embedding: %s", e)
            return self._zero_vector()

def rerank_search_results(query, results, negation_penalty=0.5):
    """Rerank search results by penalizing negated query terms.

    For every occurrence of a query term in a result's description, the up-to-five
    preceding words are checked for a negation ("no", "without", "lack of", ...).
    Each negated occurrence adds `negation_penalty` to the result's penalty, and
    the similarity is scaled by `(1 - penalty)`, clamped at 0.

    Words are compared case-insensitively with surrounding punctuation removed, so
    "crash," matches the query term "crash".

    Args:
        query: The user's search text.
        results: List of dicts, each with a 'similarity' value and optionally a
            'description'. The dicts are updated in place: 'original_similarity'
            is added and 'similarity' may be lowered.
        negation_penalty: Penalty added per negated occurrence of a query term.

    Returns:
        A new list with the same dicts, sorted by adjusted similarity, descending.
    """
    # Punctuation stripped from both ends of a word; inner apostrophes ("isn't") stay.
    edge_punctuation = ".,;:!?\"'()[]{}"

    def tokenize(text):
        stripped = (word.strip(edge_punctuation) for word in text.lower().split())
        return [word for word in stripped if word]

    query_terms = set(tokenize(str(query or "")))

    negation_words = ["no", "not", "none", "isn't", "aren't", "doesn't", "don't",
                      "never", "without", "absent", "lack of"]
    # Multi-word negations can never equal a single token, so they are matched
    # against the joined context instead.
    single_negations = {neg for neg in negation_words if " " not in neg}
    phrase_negations = [neg for neg in negation_words if " " in neg]

    # Number of words inspected before each matched query term.
    window_size = 5

    for result in results:
        if 'description' not in result:
            result['original_similarity'] = result['similarity']
            continue

        description_words = tokenize(str(result['description'] or ""))

        penalty = 0
        for i, word in enumerate(description_words):
            if word not in query_terms:
                continue

            context_before = description_words[max(0, i - window_size):i]
            # Padded with spaces so phrases only match on whole-word boundaries.
            context_text = " " + " ".join(context_before) + " "

            negated = bool(single_negations.intersection(context_before)) or any(
                f" {phrase} " in context_text for phrase in phrase_negations
            )
            if negated:
                penalty += negation_penalty

        result['original_similarity'] = result['similarity']
        result['similarity'] = max(0, result['similarity'] * (1 - penalty))

    return sorted(results, key=lambda x: x['similarity'], reverse=True)


def upload_json(file):
    """Load video descriptions from an uploaded JSON file into `video_data`.

    Args:
        file: The uploaded file, either a filesystem path (str / PathLike) or an
            object exposing the path as `.name`. None means nothing was uploaded.

    Returns:
        A human-readable status message. Errors are reported in the message
        rather than raised.

    Side effects:
        On success replaces the global `video_data` and clears `embeddings` and
        `faiss_index`, because an index built for the previous data would map
        search hits onto the wrong entries.
    """
    global video_data, embeddings, faiss_index

    if file is None:
        return "No file was uploaded."

    try:
        # Accept both a plain path and a file-like wrapper carrying `.name`.
        path = file if isinstance(file, (str, os.PathLike)) else file.name

        # utf-8-sig reads plain UTF-8 and also tolerates a leading byte-order mark.
        with open(path, 'r', encoding='utf-8-sig') as f:
            loaded = json.load(f)

        # Downstream code iterates the entries and calls .get() on each one.
        if not isinstance(loaded, list):
            return "Error uploading JSON: expected a top-level list of video entries."

        video_data = loaded
        embeddings = None
        faiss_index = None
        return f"Successfully loaded {len(video_data)} video descriptions."
    except Exception as e:
        return f"Error uploading JSON: {e}"

def generate_embeddings(status_text):
    """Embed every loaded video description and build the FAISS search index.

    Args:
        status_text: Existing status text to append progress lines to (the current
            content of the status textbox). None is treated as empty.

    Returns:
        The status text with progress lines appended, or an error message.

    Side effects:
        On success sets the globals `embedding_service`, `embeddings` and
        `faiss_index`. They are assigned together at the very end, so a failure
        part-way through leaves the previous state untouched.
    """
    global video_data, embeddings, faiss_index, embedding_service

    try:
        if not video_data:
            return "No video data available. Please upload JSON first."

        status_text = status_text or ""
        status_text += "Initializing embedding service...\n"
        service = EmbeddingService(aws_access_key_id, aws_secret_access_key, aws_region)

        total = len(video_data)
        status_text += f"Generating embeddings for {total} videos...\n"

        vectors = []
        unavailable = 0
        for i, video in enumerate(video_data):
            description_text = video.get('description', '')

            embedding = service.get_embedding(description_text)
            # A failed request may be reported as a (message, zero_vector) tuple;
            # keep the vector so the rows still line up with `video_data`.
            if isinstance(embedding, tuple):
                embedding = embedding[-1]
            vector = np.asarray(embedding, dtype=np.float32)

            # An all-zero vector for a non-empty description means the embedding
            # could not be generated.
            if str(description_text or "").strip() and not vector.any():
                unavailable += 1
            vectors.append(vector)

            # Report progress every 10 videos and after the last one.
            if (i + 1) % 10 == 0 or (i + 1) == total:
                status_text += f"Processed {i+1}/{total} videos\n"

        # Raises (and is reported below) if the vectors do not share one length.
        matrix = np.array(vectors, dtype=np.float32)

        dimension = matrix.shape[1]
        index = faiss.IndexFlatL2(dimension)
        index.add(matrix)

        embedding_service = service
        embeddings = matrix
        faiss_index = index

        if unavailable:
            status_text += (
                f"Warning: no embedding could be generated for {unavailable} "
                f"video(s); they were indexed as zero vectors.\n"
            )
        status_text += f"Successfully created FAISS index with {total} vectors of dimension {dimension}."
        return status_text

    except Exception as e:
        return f"Error generating embeddings: {e}"

def search_videos(query, num_results=5):
    """Search the indexed video descriptions and render the matches as HTML.

    The query is embedded, the FAISS index is asked for three times as many
    neighbours as requested (capped at the number of videos), the candidates are
    reranked with `rerank_search_results`, and the top `num_results` are rendered
    as a grid of video cards that play from pre-signed S3 URLs.

    Args:
        query: Natural-language search text.
        num_results: Number of results to display.

    Returns:
        An HTML string with the result grid, or a plain message when there is
        nothing to search, nothing was found, or an error occurred. Errors are
        reported in the message rather than raised.
    """
    global embedding_service, faiss_index, video_data

    try:
        if embedding_service is None or faiss_index is None or not video_data:
            return "Please upload data and generate embeddings first."

        if not query or not str(query).strip():
            return "Please enter a search query."

        num_results = max(1, int(num_results))

        # Generate embedding for the query.
        raw_embedding = embedding_service.get_embedding(query)
        if isinstance(raw_embedding, tuple):
            # A failed request may be reported as (message, zero_vector).
            return f"Error searching videos: {raw_embedding[0]}"
        query_embedding = np.asarray(raw_embedding, dtype=np.float32).reshape(1, -1)
        if not query_embedding.any():
            # A zero vector carries no information; searching with it is meaningless.
            return "Error searching videos: could not generate an embedding for the query."

        # Over-fetch so reranking has candidates to promote.
        k = min(num_results * 3, len(video_data))
        distances, indices = faiss_index.search(query_embedding, k)

        # Prepare results for reranking.
        results = []
        for distance, idx in zip(distances[0], indices[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with -1, which would otherwise index
            # the last video; also skip ids beyond the currently loaded data.
            if not 0 <= idx < len(video_data):
                continue

            entry = video_data[idx]
            # NOTE(review): heuristic mapping of the L2 distance to a percentage;
            # it is not bounded to 0..100 — confirm it suits the embedding scale.
            similarity = 100 * (1 - float(distance) / 100)

            result_item = {
                'video_id': entry['video_id'],
                'similarity': similarity,
                # Fall back to a path derived from the id when none is given.
                'video_path': entry.get('video_path', f"videos/{entry['video_id']}"),
            }
            if 'description' in entry:
                result_item['description'] = entry['description']

            results.append(result_item)

        reranked_results = rerank_search_results(query, results)
        if not reranked_results:
            return "No matching videos found."

        # Create a video grid layout with improved scrolling
        results_html = """
        <style>
        .results-container {
            width: 100%;
            padding: 10px 0;
        }
        .video-grid {
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(450px, 1fr));
            gap: 20px;
            width: 100%;
        }
        .video-card {
            background-color: #1a2233;
            border-radius: 8px;
            overflow: hidden;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
            border-top: 4px solid #58a6ff;
        }
        .video-info {
            padding: 12px;
        }
        .video-player {
            width: 100%;
            height: 240px;
            background-color: #000;
        }
        .similarity-badge {
            background-color: #388bfd;
            color: #ffffff;
            padding: 4px 8px;
            border-radius: 12px;
            font-size: 0.85em;
            font-weight: bold;
        }
        .video-title {
            color: #58a6ff;
            margin: 0 0 8px 0;
            font-weight: 600;
            font-size: 16px;
            white-space: nowrap;
            overflow: hidden;
            text-overflow: ellipsis;
        }
        </style>
        <div class="results-container">
            <div class="video-grid">
        """

        # S3 bucket info
        s3_bucket = "motiverse-2025-data"

        # Create S3 client for accessing videos
        s3_client = boto3.client(
            's3',
            region_name=aws_region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )

        for i, result in enumerate(reranked_results[:num_results]):
            similarity_percent = round(float(result['similarity']), 2)
            original_similarity = round(float(result.get('original_similarity', similarity_percent)), 2)

            # Add indication if result was reranked
            reranked_indicator = ""
            if 'original_similarity' in result and abs(result['original_similarity'] - result['similarity']) > 0.01:
                reranked_indicator = f"""<span style="color: #f85149; font-size: 0.8em; margin-left: 10px;">
                    (Reranked from {original_similarity}%)
                </span>"""

            # Generate a pre-signed URL for the video
            video_key = result.get('video_path', f"videos/{result['video_id']}")
            presigned_url = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': s3_bucket, 'Key': video_key},
                ExpiresIn=3600
            )

            # Values from the uploaded JSON and the URL's query string must be
            # escaped before being placed into markup.
            safe_url = html.escape(presigned_url, quote=True)
            safe_title = html.escape(str(result['video_id']))

            results_html += f"""
            <div class="video-card">
                <div class="video-player">
                    <video width="100%" height="100%" controls preload="metadata">
                        <source src="{safe_url}" type="video/mp4">
                        Your browser does not support the video tag.
                    </video>
                </div>
                <div class="video-info">
                    <h4 class="video-title">{i+1}. {safe_title}</h4>
                    <p><span class="similarity-badge">Similarity: {similarity_percent}%</span>{reranked_indicator}</p>
                </div>
            </div>
            """

        results_html += """
            </div>
        </div>
        """

        return results_html

    except Exception as e:
        return f"Error searching videos: {str(e)}"
# Gradio Interface
# Builds a three-tab workflow (upload -> generate embeddings -> search) and wires
# each button to the corresponding callback defined above.
# NOTE(review): in the CSS below only `h1` is scoped to `.gradio-container`;
# `h2, h3, h4` are separate, unscoped selectors — confirm that is intended.
# NOTE(review): confirm that Gradio actually emits a `.button-primary` class for
# variant="primary" buttons; otherwise the two button rules have no effect.
with gr.Blocks(theme=gr.themes.Monochrome(), css="""
    .gradio-container {background-color: #0d1117; color: #e6edf3;}
    .gradio-container h1, h2, h3, h4 {color: #58a6ff;}
    .button-primary {background-color: #238636 !important; border-color: #238636 !important;}
    .button-primary:hover {background-color: #2ea043 !important; border-color: #2ea043 !important;}
""") as app:
    gr.Markdown("# Video Description Search")
    gr.Markdown("Search for videos using natural language queries")

    with gr.Tabs():
        # Step 1: pick a JSON file and load it with upload_json.
        with gr.TabItem("Step 1: Upload Data"):
            file_input = gr.File(label="Upload JSON file with video descriptions")
            upload_status = gr.Textbox(label="Status", interactive=False)
            upload_btn = gr.Button("Upload JSON", variant="primary")

        # Step 2: embed the loaded descriptions and build the index.
        with gr.TabItem("Step 2: Generate Embeddings"):
            embedding_status = gr.Textbox(label="Status", interactive=False)
            generate_btn = gr.Button("Generate Embeddings", variant="primary")

        # Step 3: free-text query plus the number of results to show.
        with gr.TabItem("Step 3: Search Videos"):
            with gr.Row():
                search_input = gr.Textbox(label="Search Query", placeholder="Enter search query (e.g., car crash, mountain road)")
                results_dropdown = gr.Dropdown(choices=[5, 10, 20], value=5, label="Number of Results")
            search_btn = gr.Button("Search", variant="primary")
            results_output = gr.HTML(label="Search Results")

    # Event handlers
    upload_btn.click(fn=upload_json, inputs=file_input, outputs=upload_status)
    # The status textbox is both input and output: the callback receives its
    # current text and returns the text to display next.
    generate_btn.click(fn=generate_embeddings, inputs=embedding_status, outputs=embedding_status)
    search_btn.click(fn=search_videos, inputs=[search_input, results_dropdown], outputs=results_output)

    # Also trigger search on Enter key
    search_input.submit(fn=search_videos, inputs=[search_input, results_dropdown], outputs=results_output)

# Launch the app
# NOTE(review): per the captured output below, running this in Colab enables
# share=True automatically and exposes a public URL — confirm that is acceptable.
app.launch()

Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://778cc9df3e92684f18.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


