<a href="https://colab.research.google.com/github/Yousif-A2/Multimodal_RAG_Systems_Image_Search_and_Q-A_with_Cohere_and_Gemini/blob/main/Visual_Memory_Assistant.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🧠 Visual Memory Assistant

Welcome to the **Visual Memory Assistant**, an AI-powered tool that helps you search your personal photo collection using natural language. Upload your photos and ask questions like:

- *"Show me the photo from that cafe in Istanbul."*
- *"Which one has my red jacket?"*
- *"Find the picture from our last winter trip."*

### 💡 Powered By
- **Cohere Embed v4.0** to embed each photo (and each query) for retrieval
- **Gemini 2.0 Flash** to answer natural-language questions about the retrieved image

Let's get started!

In [None]:
!pip install cohere

Collecting cohere
  Downloading cohere-5.16.1-py3-none-any.whl.metadata (3.4 kB)
Collecting fastavro<2.0.0,>=1.9.4 (from cohere)
  Downloading fastavro-1.11.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.7 kB)
Collecting httpx-sse==0.4.0 (from cohere)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting types-requests<3.0.0,>=2.0.0 (from cohere)
  Downloading types_requests-2.32.4.20250611-py3-none-any.whl.metadata (2.1 kB)
Downloading cohere-5.16.1-py3-none-any.whl (291 kB)
Downloading httpx_sse-0.4.0-py3-none-any.whl (7.8 kB)
Downloading fastavro-1.11.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
Downloading types_requests-2.32.4.20250611-py3-none-any.whl (20 kB)
Inst

In [None]:
from google.colab import userdata


In [None]:
import cohere
cohere_api_key = userdata.get('coher')  # Colab secret name; must match the name in the Secrets panel
co = cohere.ClientV2(api_key=cohere_api_key)

import google.generativeai as genai
gemini_api_key = userdata.get('gemini')
genai.configure(api_key=gemini_api_key)
client = genai.GenerativeModel('gemini-2.0-flash')

In [None]:
import PIL.Image  # `import PIL` alone does not guarantee PIL.Image is loaded
import io
import base64

max_pixels = 1568*1568  # Pixel budget (width * height) before an image is downscaled

# Downscale oversized images in place, preserving the aspect ratio
def resize_image(pil_image):
    org_width, org_height = pil_image.size
    if org_width * org_height > max_pixels:
        scale_factor = (max_pixels / (org_width * org_height)) ** 0.5
        new_width = int(org_width * scale_factor)
        new_height = int(org_height * scale_factor)
        pil_image.thumbnail((new_width, new_height))

# Convert images to a base64 string
def base64_from_image(img_path):
    pil_image = PIL.Image.open(img_path)
    img_format = pil_image.format if pil_image.format else "PNG"

    resize_image(pil_image)

    with io.BytesIO() as img_buffer:
        pil_image.save(img_buffer, format=img_format)
        img_buffer.seek(0)
        img_data = f"data:image/{img_format.lower()};base64,"+base64.b64encode(img_buffer.read()).decode("utf-8")

    return img_data
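The resize rule in `resize_image` scales both sides by the same factor √(max_pixels / (w·h)), so the aspect ratio is preserved and the area lands just under the 1568×1568 budget. Below is a minimal pure-Python sketch of that arithmetic plus the data-URI wrapping, so it can be checked without PIL or an API key. The helper names `target_size` and `to_data_uri` are hypothetical, not part of the notebook, and PIL's `thumbnail` may round the final size slightly differently.

```python
import base64
import math


def target_size(width: int, height: int, max_pixels: int = 1568 * 1568) -> tuple:
    """Return the (width, height) the resize step would aim for.

    Images already within the pixel budget are returned unchanged; larger ones
    are scaled by one common factor so the aspect ratio is preserved.
    """
    if width * height <= max_pixels:
        return width, height
    scale = math.sqrt(max_pixels / (width * height))
    # int() truncates, so the result never exceeds the budget
    return int(width * scale), int(height * scale)


def to_data_uri(raw: bytes, img_format: str) -> str:
    """Wrap encoded image bytes in a data URI, as base64_from_image does."""
    return f"data:image/{img_format.lower()};base64," + base64.b64encode(raw).decode("utf-8")


print(target_size(400, 300))    # the 400x300 picsum samples are left alone
print(target_size(4000, 3000))  # a 12 MP photo is scaled down
print(to_data_uri(b"\x89PNG", "PNG"))
```

The 400×300 sample images used in this notebook are far below the budget, so for them the resize step is a no-op.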


In [None]:
import numpy as np
import os
import requests
import tqdm

# Define the image folder
img_folder = '/content/images'
os.makedirs(img_folder, exist_ok=True)

# Add a sample image dictionary
images = {
    "image1.png": "https://picsum.photos/seed/seed1/400/300",
    "image2.png": "https://picsum.photos/seed/seed2/400/300",
    "image3.png": "https://picsum.photos/seed/seed3/400/300",
    "image4.png": "https://picsum.photos/seed/seed4/400/300",
    "image5.png": "https://picsum.photos/seed/seed5/400/300",
    "image6.png": "https://picsum.photos/seed/seed6/400/300",
    "image7.png": "https://picsum.photos/seed/seed7/400/300",
    "image8.png": "https://picsum.photos/seed/seed8/400/300",
    "image9.png": "https://picsum.photos/seed/seed9/400/300",
    "image10.png": "https://picsum.photos/seed/seed10/400/300"
}


img_paths = []
doc_embeddings = []

for name, url in tqdm.tqdm(images.items()):
    img_path = os.path.join(img_folder, name)
    img_paths.append(img_path)

    # Download the image if needed
    if not os.path.exists(img_path):
        response = requests.get(url)
        response.raise_for_status()

        with open(img_path, "wb") as fOut:
            fOut.write(response.content)

    # Get the base64 representation of the image
    api_input_document = {
        "content": [
            {"type": "image", "image": base64_from_image(img_path)},
        ]
    }

    # Call the Embed v4.0 model
    api_response = co.embed(
        model="embed-v4.0",
        input_type="search_document",
        embedding_types=["float"],
        inputs=[api_input_document],
    )

    # Store embedding
    emb = np.asarray(api_response.embeddings.float[0])
    doc_embeddings.append(emb)

doc_embeddings = np.vstack(doc_embeddings)

100%|██████████| 10/10 [00:39<00:00,  3.95s/it]


In [None]:
def search(question):
    # Compute the embedding for the query
    api_response = co.embed(
        model="embed-v4.0",
        input_type="search_query",
        embedding_types=["float"],
        texts=[question],
    )

    query_emb = np.asarray(api_response.embeddings.float[0])

    # Cosine similarity: normalize both sides so the dot product does not depend on vector length
    doc_norm = doc_embeddings / np.linalg.norm(doc_embeddings, axis=1, keepdims=True)
    cos_sim_scores = doc_norm @ (query_emb / np.linalg.norm(query_emb))

    # Get the most relevant image
    top_idx = np.argmax(cos_sim_scores)
    hit_img_path = img_paths[top_idx]

    return hit_img_path
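`search` keeps only the single best match via `np.argmax`. When several photos could answer a question, a top-k ranking is often more useful. A minimal NumPy sketch, assuming the embeddings are stacked row-wise as in `doc_embeddings`; the helper name `top_k_cosine` is hypothetical, and the vectors are normalized explicitly so the score is a true cosine similarity whether or not the model already returns unit-length embeddings.

```python
import numpy as np


def top_k_cosine(query_emb: np.ndarray, doc_matrix: np.ndarray, k: int = 3) -> list:
    """Return (row index, cosine similarity) pairs for the k most similar rows."""
    q = query_emb / np.linalg.norm(query_emb)
    docs = doc_matrix / np.linalg.norm(doc_matrix, axis=1, keepdims=True)
    scores = docs @ q                     # one cosine score per document row
    order = np.argsort(scores)[::-1][:k]  # highest scores first
    return [(int(i), float(scores[i])) for i in order]


# Toy 3-dimensional vectors standing in for Embed v4.0 embeddings
toy_docs = np.array([
    [1.0, 0.0, 0.0],
    [0.0, 2.0, 0.0],  # same direction as the query, but a larger norm
    [1.0, 1.0, 0.0],
])
toy_query = np.array([0.0, 1.0, 0.0])
print(top_k_cosine(toy_query, toy_docs, k=2))
```

With the notebook's data, `top_k_cosine(query_emb, doc_embeddings, k=3)` would return indices into `img_paths` together with their scores.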


In [None]:
def answer(question, img_path):
    prompt = [f"""Answer the question based solely on the information from the image.
               Question: {question}""", PIL.Image.open(img_path)]

    response = client.generate_content(
        contents=prompt
    )

    return response.text
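The next cell chains the two steps: `search` retrieves one image path, and `answer` asks Gemini about that image only. The sketch below shows the same retrieve-then-answer shape with the model calls passed in as functions, so the control flow can be exercised offline. `retrieve_then_answer`, `fake_search`, and `fake_answer` are hypothetical stand-ins, not Cohere or Gemini APIs.

```python
from typing import Callable, Tuple


def retrieve_then_answer(
    question: str,
    search_fn: Callable[[str], str],
    answer_fn: Callable[[str, str], str],
) -> Tuple[str, str]:
    """Retrieve the best-matching image path, then answer using only that image."""
    img_path = search_fn(question)
    return img_path, answer_fn(question, img_path)


# Offline stand-ins for the notebook's search() and answer()
def fake_search(question: str) -> str:
    return "/content/images/image7.png" if "seed7" in question else "/content/images/image1.png"


def fake_answer(question: str, img_path: str) -> str:
    return f"Answer to {question!r} based on {img_path}"


path, text = retrieve_then_answer("Which one is seed7?", fake_search, fake_answer)
print(path)
print(text)
```

In the notebook, `retrieve_then_answer(question, search, answer)` is equivalent to the next cell. Note that neither the image embeddings nor Gemini ever see the file name or the picsum seed, only the pixels, which is why the "seed7" question below cannot be answered from the image.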

In [None]:
question = "Which one is seed7?"
top_image_path = search(question)
answer_text = answer(question, top_image_path)
print(answer_text)

Based on the information provided in the image, it is impossible to identify which one is seed7. The image only shows rows of crops.



# Receipt & Document Scanner

In [None]:
# Run the next cell first: it defines extract_document_info
extract_document_info('/content/Receipt-template-example.jpg')

Error extracting document info: HTTPConnectionPool(host='localhost', port=42687): Read timed out. (read timeout=600.0)


{'document_type': 'unknown',
 'vendor_name': None,
 'date': None,
 'amount': None,
 'items': [],
 'bill_type': None,
 'account_number': None,
 'warranty_period': None,
 'product_name': None,
 'key_text': '',
 'expiry_date': None}

In [None]:
from typing import Dict, List, Tuple
import json
from datetime import datetime, timedelta

def extract_document_info(img_path: str) -> Dict:
    """Extract structured information from document/receipt using Gemini"""

    prompt = """Analyze this document/receipt and extract the following information in JSON format:
    {
        "document_type": "receipt/bill/warranty/invoice/contract/other",
        "vendor_name": "store/company name",
        "date": "YYYY-MM-DD format if found",
        "amount": "total amount if found",
        "items": ["list of items/services if receipt"],
        "bill_type": "electricity/water/gas/internet/phone/other if utility bill",
        "account_number": "account/reference number if found",
        "warranty_period": "warranty duration if warranty document",
        "product_name": "product name if warranty/purchase",
        "key_text": "important text snippets for search",
        "expiry_date": "YYYY-MM-DD if found (warranties, subscriptions, etc)"
    }

    If information is not found, use null. Be precise with dates and amounts."""

    try:
        pil_image = PIL.Image.open(img_path)
        response = client.generate_content([prompt, pil_image])

        # Clean up the response and parse JSON
        response_text = response.text.strip()
        if response_text.startswith('```json'):
            response_text = response_text[7:-3]
        elif response_text.startswith('```'):
            response_text = response_text[3:-3]

        return json.loads(response_text)

    except Exception as e:
        print(f"Error extracting document info: {e}")
        return {
            "document_type": "unknown",
            "vendor_name": None,
            "date": None,
            "amount": None,
            "items": [],
            "bill_type": None,
            "account_number": None,
            "warranty_period": None,
            "product_name": None,
            "key_text": "",
            "expiry_date": None
        }

def process_document(img_path: str):
    """Process a single document - extract info and create embedding"""

    # Extract structured information
    doc_info = extract_document_info(img_path)

    # Create search text from extracted info
    search_text_parts = []
    if doc_info.get('vendor_name'):
        search_text_parts.append(doc_info['vendor_name'])
    if doc_info.get('document_type'):
        search_text_parts.append(doc_info['document_type'])
    if doc_info.get('bill_type'):
        search_text_parts.append(doc_info['bill_type'])
    if doc_info.get('product_name'):
        search_text_parts.append(doc_info['product_name'])
    if doc_info.get('items'):
        search_text_parts.extend(doc_info['items'])
    if doc_info.get('key_text'):
        search_text_parts.append(doc_info['key_text'])

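    # Skip empty parts; str() guards against non-string items from the model
    search_text = " ".join(str(part) for part in search_text_parts if part)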

    # Create embedding using text + image
    api_input_document = {
        "content": [
            {"type": "text", "text": search_text},
            {"type": "image", "image": base64_from_image(img_path)},
        ]
    }

    # Get embedding
    api_response = co.embed(
        model="embed-v4.0",
        input_type="search_document",
        embedding_types=["float"],
        inputs=[api_input_document],
    )

    emb = np.asarray(api_response.embeddings.float[0])

    # This part needs to be adapted to your data storage strategy
    # For now, let's assume you have lists to store this data
    # self.doc_embeddings.append(emb)
    # self.doc_metadata.append(doc_info)
    # self.doc_paths.append(img_path)

    return doc_info

def base64_from_image(img_path: str) -> str:
    """Convert image to base64 string"""
    pil_image = PIL.Image.open(img_path)
    img_format = pil_image.format if pil_image.format else "PNG"

    # Resize if too large
    max_pixels = 1568 * 1568
    org_width, org_height = pil_image.size
    if org_width * org_height > max_pixels:
        scale_factor = (max_pixels / (org_width * org_height)) ** 0.5
        new_width = int(org_width * scale_factor)
        new_height = int(org_height * scale_factor)
        pil_image.thumbnail((new_width, new_height))

    with io.BytesIO() as img_buffer:
        pil_image.save(img_buffer, format=img_format)
        img_buffer.seek(0)
        img_data = f"data:image/{img_format.lower()};base64," + base64.b64encode(img_buffer.read()).decode("utf-8")

    return img_data

def search_documents(query: str, top_k: int = 5) -> List[Tuple[str, Dict, float]]:
    """Search documents by natural language query"""

    # This part needs to be adapted to your data storage strategy
    # if not self.doc_embeddings:
    #     return []

    # Create query embedding
    api_response = co.embed(
        model="embed-v4.0",
        input_type="search_query",
        embedding_types=["float"],
        texts=[query],
    )

    query_emb = np.asarray(api_response.embeddings.float[0])
    # doc_embeddings_matrix = np.vstack(self.doc_embeddings)

    # Calculate similarities
    # cos_sim_scores = np.dot(query_emb, doc_embeddings_matrix.T)

    # Get top results
    # top_indices = np.argsort(cos_sim_scores)[::-1][:top_k]

    results = []
    # for idx in top_indices:
    #     results.append((
    #         self.doc_paths[idx],
    #         self.doc_metadata[idx],
    #         float(cos_sim_scores[idx])
    #     ))

    return results

def search_by_date_range(start_date: str, end_date: str = None) -> List[Tuple[str, Dict]]:
    """Search documents by date range"""
    if end_date is None:
        end_date = start_date

    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")

    results = []
    # for i, metadata in enumerate(self.doc_metadata):
    #     if metadata.get('date'):
    #         try:
    #             doc_date = datetime.strptime(metadata['date'], "%Y-%m-%d")
    #             if start_dt <= doc_date <= end_dt:
    #                 results.append((self.doc_paths[i], metadata))
    #         except:
    #             continue

    return results

def search_by_vendor(vendor_name: str) -> List[Tuple[str, Dict]]:
    """Search documents by vendor/company name"""
    results = []
    vendor_lower = vendor_name.lower()

    # for i, metadata in enumerate(self.doc_metadata):
    #     if metadata.get('vendor_name'):
    #         if vendor_lower in metadata['vendor_name'].lower():
    #             results.append((self.doc_paths[i], metadata))

    return results

def get_expiring_warranties(days_ahead: int = 30) -> List[Tuple[str, Dict]]:
    """Find warranties expiring within specified days"""
    cutoff_date = datetime.now() + timedelta(days=days_ahead)
    results = []

    # for i, metadata in enumerate(self.doc_metadata):
    #     if metadata.get('expiry_date'):
    #         try:
    #             expiry_dt = datetime.strptime(metadata['expiry_date'], "%Y-%m-%d")
    #             if expiry_dt <= cutoff_date:
    #                 results.append((self.doc_paths[i], metadata))
    #         except:
    #             continue

    return results

def get_monthly_bills(year: int, month: int) -> Dict[str, List[Tuple[str, Dict]]]:
    """Get all bills for a specific month organized by type"""
    target_date = f"{year}-{month:02d}"
    bills_by_type = {}

    # for i, metadata in enumerate(self.doc_metadata):
    #     if (metadata.get('document_type') == 'bill' and
    #         metadata.get('date') and
    #         metadata['date'].startswith(target_date)):

    #         bill_type = metadata.get('bill_type', 'other')
    #         if bill_type not in bills_by_type:
    #             bills_by_type[bill_type] = []

    #         bills_by_type[bill_type].append((self.doc_paths[i], metadata))

    return bills_by_type
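`extract_document_info` strips a Markdown fence by slicing (`[7:-3]`), which assumes the reply ends exactly with the closing fence; any extra text after it makes `json.loads` fail and the function falls back to the all-`null` record. A slightly more tolerant parser, as a sketch: it pulls the outermost `{...}` span out of the reply and fills any missing keys from the same default schema. `parse_document_json` and `DEFAULT_INFO` are hypothetical names; the keys are copied from the prompt above. Recent versions of the `google.generativeai` SDK can also be asked for JSON directly with `generation_config=genai.GenerationConfig(response_mime_type="application/json")` on supported models, which may make fence stripping unnecessary.

```python
import copy
import json
import re

# Default record mirroring the fallback returned by extract_document_info
DEFAULT_INFO = {
    "document_type": "unknown",
    "vendor_name": None,
    "date": None,
    "amount": None,
    "items": [],
    "bill_type": None,
    "account_number": None,
    "warranty_period": None,
    "product_name": None,
    "key_text": "",
    "expiry_date": None,
}


def parse_document_json(response_text: str) -> dict:
    """Extract the JSON object from a model reply, tolerating code fences and extra text."""
    info = copy.deepcopy(DEFAULT_INFO)
    # Greedy match from the first "{" to the last "}" skips fences and surrounding prose
    match = re.search(r"\{.*\}", response_text, flags=re.DOTALL)
    if match is None:
        return info
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return info
    if isinstance(parsed, dict):
        info.update(parsed)  # keys the model omitted keep their defaults
    return info


reply = '```json\n{"document_type": "receipt", "vendor_name": "Target", "amount": "23.50"}\n```\nLet me know if you need more.'
print(parse_document_json(reply)["vendor_name"])
print(parse_document_json("no json here")["document_type"])
```

The greedy match assumes the reply contains a single JSON object; trailing prose that itself contains `}` would need a stricter extractor.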

In [None]:
# Initialize (assuming you have your API keys set up)
# cohere_api_key = "your_key"
# gemini_api_key = "your_key"
# co = cohere.ClientV2(api_key=cohere_api_key)
# genai.configure(api_key=gemini_api_key)
# gemini_client = genai.GenerativeModel('gemini-2.0-flash')

# scanner = DocumentScanner(co, gemini_client)

print("🧾 Receipt & Document Scanner Demo")
print("=" * 50)

# Example usage scenarios:
scenarios = [
    "Find that warranty photo",
    "Show me my electricity bill from March",
    "All receipts from Target",
    "What warranties expire this month?",
    "Find my phone bill from last month",
    "Show me all grocery receipts",
    "Find receipts over $100",
    "What bills do I have for January 2024?"
]

for scenario in scenarios:
    s = scenario.lower()
    print(f"\n📋 Scenario: {scenario}")
    print("💡 Implementation:")

    # "warrant" matches both "warranty" and "warranties"
    if "warrant" in s and "expire" in s:
        print("   scanner.get_expiring_warranties(30)")
    elif "electricity bill" in s:
        print("   scanner.search_documents('electricity bill March')")
    elif "phone bill" in s:
        print("   scanner.search_documents('phone bill last month')")
    elif "from" in s and any(store in s for store in ['target', 'walmart']):
        vendor = scenario.split('from ')[-1].strip()
        print(f"   scanner.search_by_vendor('{vendor}')")
    elif "bills" in s and "january" in s:
        print("   scanner.get_monthly_bills(2024, 1)")
    else:
        print(f"   scanner.search_documents('{scenario}')")

🧾 Receipt & Document Scanner Demo
==================================================

📋 Scenario: Find that warranty photo
💡 Implementation:
   scanner.search_documents('Find that warranty photo')

📋 Scenario: Show me my electricity bill from March
💡 Implementation:
   scanner.search_documents('electricity bill March')

📋 Scenario: All receipts from Target
💡 Implementation:
   scanner.search_by_vendor('Target')

📋 Scenario: What warranties expire this month?
💡 Implementation:
   scanner.get_expiring_warranties(30)

📋 Scenario: Find my phone bill from last month
💡 Implementation:
   scanner.search_documents('phone bill last month')

📋 Scenario: Show me all grocery receipts
💡 Implementation:
   scanner.search_documents('Show me all grocery receipts')

📋 Scenario: Find receipts over $100
💡 Implementation:
   scanner.search_documents('Find receipts over $100')

📋 Scenario: What bills do I have for January 2024?
💡 Implementation:
   scanner.get_monthly_bills(2024, 1)
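The scenario router above hard-codes `get_monthly_bills(2024, 1)` for the January query. A sketch of pulling the month and year out of the query text instead; `parse_month_year` and `MONTHS` are hypothetical, the helper only recognizes full English month names followed by an optional four-digit year, and it assumes the current year when none is given.

```python
import re
from datetime import date
from typing import Optional, Tuple

# English month names; "may" can also match the verb, so results are heuristic
MONTHS = ("january", "february", "march", "april", "may", "june", "july",
          "august", "september", "october", "november", "december")

# A month name, optionally followed by a four-digit year
_MONTH_PATTERN = re.compile(r"\b(" + "|".join(MONTHS) + r")\b(?:\s+(\d{4}))?", re.IGNORECASE)


def parse_month_year(query: str, today: Optional[date] = None) -> Optional[Tuple[int, int]]:
    """Return (year, month) for queries like 'bills for January 2024', else None."""
    match = _MONTH_PATTERN.search(query)
    if match is None:
        return None
    month = MONTHS.index(match.group(1).lower()) + 1
    # Fall back to the current year when the query names only a month
    year = int(match.group(2)) if match.group(2) else (today or date.today()).year
    return year, month


print(parse_month_year("What bills do I have for January 2024?"))
print(parse_month_year("Show me my electricity bill from March", today=date(2025, 6, 1)))
print(parse_month_year("Find receipts over $100"))
```

When the result is not `None`, the January scenario could call `scanner.get_monthly_bills(*parse_month_year(scenario))`. Relative phrases such as "last month" are not handled by this sketch.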


In [None]:
class DocumentScanner:
    def __init__(self, cohere_client, gemini_client):
        self.co = cohere_client
        self.client = gemini_client
        self.doc_embeddings = []
        self.doc_metadata = []
        self.doc_paths = []

    def process_document(self, img_path: str):
        """Process a single document - extract info and create embedding"""

        # Extract structured information
        doc_info = self.extract_document_info(img_path)

        # Create search text from extracted info
        search_text_parts = []
        if doc_info.get('vendor_name'):
            search_text_parts.append(doc_info['vendor_name'])
        if doc_info.get('document_type'):
            search_text_parts.append(doc_info['document_type'])
        if doc_info.get('bill_type'):
            search_text_parts.append(doc_info['bill_type'])
        if doc_info.get('product_name'):
            search_text_parts.append(doc_info['product_name'])
        if doc_info.get('items'):
            search_text_parts.extend(doc_info['items'])
        if doc_info.get('key_text'):
            search_text_parts.append(doc_info['key_text'])

        # Skip empty parts; str() guards against non-string items from the model
        search_text = " ".join(str(part) for part in search_text_parts if part)

        # Create embedding using text + image
        api_input_document = {
            "content": [
                {"type": "text", "text": search_text},
                {"type": "image", "image": self.base64_from_image(img_path)},
            ]
        }

        # Get embedding
        api_response = self.co.embed(
            model="embed-v4.0",
            input_type="search_document",
            embedding_types=["float"],
            inputs=[api_input_document],
        )

        emb = np.asarray(api_response.embeddings.float[0])

        self.doc_embeddings.append(emb)
        self.doc_metadata.append(doc_info)
        self.doc_paths.append(img_path)

        return doc_info

    def extract_document_info(self, img_path: str) -> Dict:
        """Extract structured information from document/receipt using Gemini"""

        prompt = """Analyze this document/receipt and extract the following information in JSON format:
        {
            "document_type": "receipt/bill/warranty/invoice/contract/other",
            "vendor_name": "store/company name",
            "date": "YYYY-MM-DD format if found",
            "amount": "total amount if found",
            "items": ["list of items/services if receipt"],
            "bill_type": "electricity/water/gas/internet/phone/other if utility bill",
            "account_number": "account/reference number if found",
            "warranty_period": "warranty duration if warranty document",
            "product_name": "product name if warranty/purchase",
            "key_text": "important text snippets for search",
            "expiry_date": "YYYY-MM-DD if found (warranties, subscriptions, etc)"
        }

        If information is not found, use null. Be precise with dates and amounts."""

        try:
            pil_image = PIL.Image.open(img_path)
            response = self.client.generate_content([prompt, pil_image])

            # Clean up the response and parse JSON
            response_text = response.text.strip()
            if response_text.startswith('```json'):
                response_text = response_text[7:-3]
            elif response_text.startswith('```'):
                response_text = response_text[3:-3]

            return json.loads(response_text)

        except Exception as e:
            print(f"Error extracting document info: {e}")
            return {
                "document_type": "unknown",
                "vendor_name": None,
                "date": None,
                "amount": None,
                "items": [],
                "bill_type": None,
                "account_number": None,
                "warranty_period": None,
                "product_name": None,
                "key_text": "",
                "expiry_date": None
            }

    def base64_from_image(self, img_path: str) -> str:
        """Convert image to base64 string"""
        pil_image = PIL.Image.open(img_path)
        img_format = pil_image.format if pil_image.format else "PNG"

        # Resize if too large
        max_pixels = 1568 * 1568
        org_width, org_height = pil_image.size
        if org_width * org_height > max_pixels:
            scale_factor = (max_pixels / (org_width * org_height)) ** 0.5
            new_width = int(org_width * scale_factor)
            new_height = int(org_height * scale_factor)
            pil_image.thumbnail((new_width, new_height))

        with io.BytesIO() as img_buffer:
            pil_image.save(img_buffer, format=img_format)
            img_buffer.seek(0)
            img_data = f"data:image/{img_format.lower()};base64," + base64.b64encode(img_buffer.read()).decode("utf-8")

        return img_data

    def search_documents(self, query: str, top_k: int = 5) -> List[Tuple[str, Dict, float]]:
        """Search documents by natural language query"""

        if not self.doc_embeddings:
            return []

        # Create query embedding
        api_response = self.co.embed(
            model="embed-v4.0",
            input_type="search_query",
            embedding_types=["float"],
            texts=[query],
        )

        query_emb = np.asarray(api_response.embeddings.float[0])
        doc_embeddings_matrix = np.vstack(self.doc_embeddings)

        # Calculate similarities
        cos_sim_scores = np.dot(query_emb, doc_embeddings_matrix.T)

        # Get top results
        top_indices = np.argsort(cos_sim_scores)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append((
                self.doc_paths[idx],
                self.doc_metadata[idx],
                float(cos_sim_scores[idx])
            ))

        return results

    def search_by_date_range(self, start_date: str, end_date: str = None) -> List[Tuple[str, Dict]]:
        """Search documents by date range"""
        if end_date is None:
            end_date = start_date

        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.strptime(end_date, "%Y-%m-%d")

        results = []
        for i, metadata in enumerate(self.doc_metadata):
            if metadata.get('date'):
                try:
                    doc_date = datetime.strptime(metadata['date'], "%Y-%m-%d")
                    if start_dt <= doc_date <= end_dt:
                        results.append((self.doc_paths[i], metadata))
                except (ValueError, TypeError):  # skip malformed or non-string dates
                    continue

        return results

    def search_by_vendor(self, vendor_name: str) -> List[Tuple[str, Dict]]:
        """Search documents by vendor/company name"""
        results = []
        vendor_lower = vendor_name.lower()

        for i, metadata in enumerate(self.doc_metadata):
            if metadata.get('vendor_name'):
                if vendor_lower in metadata['vendor_name'].lower():
                    results.append((self.doc_paths[i], metadata))

        return results

    def get_expiring_warranties(self, days_ahead: int = 30) -> List[Tuple[str, Dict]]:
        """Find warranties expiring between today and `days_ahead` days from now"""
        today = datetime.now().date()
        cutoff_date = today + timedelta(days=days_ahead)
        results = []

        for i, metadata in enumerate(self.doc_metadata):
            if metadata.get('expiry_date'):
                try:
                    expiry_dt = datetime.strptime(metadata['expiry_date'], "%Y-%m-%d").date()
                    # Skip warranties that have already expired
                    if today <= expiry_dt <= cutoff_date:
                        results.append((self.doc_paths[i], metadata))
                except (ValueError, TypeError):
                    continue

        return results

    def get_monthly_bills(self, year: int, month: int) -> Dict[str, List[Tuple[str, Dict]]]:
        """Get all bills for a specific month organized by type"""
        target_date = f"{year}-{month:02d}"
        bills_by_type = {}

        for i, metadata in enumerate(self.doc_metadata):
            if (metadata.get('document_type') == 'bill' and
                metadata.get('date') and
                metadata['date'].startswith(target_date)):

                # `or` also covers a bill_type that is present but null
                bill_type = metadata.get('bill_type') or 'other'
                if bill_type not in bills_by_type:
                    bills_by_type[bill_type] = []

                bills_by_type[bill_type].append((self.doc_paths[i], metadata))

        return bills_by_type

# Instantiate the scanner
scanner = DocumentScanner(co, client)

# Now you can use the scanner object to process your documents, for example:
# scanner.process_document('path/to/your/receipt.jpg')
# search_results = scanner.search_documents("groceries from last week")
# print(search_results)
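The metadata filters in `DocumentScanner` depend only on the dates and fields Gemini extracted, so their logic can be checked offline. A sketch of the same expiry-window and monthly-bill filtering over plain `(path, metadata)` pairs, with `today` passed in so results are reproducible; `expiring_within`, `bills_by_month`, and `_parse` are hypothetical helpers, and the sample records are made up for illustration.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional, Tuple


def _parse(value) -> Optional[date]:
    """Parse a YYYY-MM-DD string; return None for missing or malformed values."""
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        return None


def expiring_within(docs: List[Tuple[str, Dict]], today: date, days_ahead: int = 30) -> List[str]:
    """Paths whose expiry_date falls between today and today + days_ahead, inclusive."""
    cutoff = today + timedelta(days=days_ahead)
    results = []
    for path, meta in docs:
        expiry = _parse(meta.get("expiry_date"))
        if expiry is not None and today <= expiry <= cutoff:
            results.append(path)
    return results


def bills_by_month(docs: List[Tuple[str, Dict]], year: int, month: int) -> Dict[str, List[str]]:
    """Group bill paths dated in the given month by bill_type."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for path, meta in docs:
        bill_date = _parse(meta.get("date"))
        if (meta.get("document_type") == "bill" and bill_date is not None
                and (bill_date.year, bill_date.month) == (year, month)):
            grouped[meta.get("bill_type") or "other"].append(path)
    return dict(grouped)


# Toy records in the shape extract_document_info returns (values made up)
sample_docs = [
    ("tv_warranty.jpg", {"document_type": "warranty", "expiry_date": "2024-07-10"}),
    ("old_warranty.jpg", {"document_type": "warranty", "expiry_date": "2024-05-01"}),
    ("power_jan.jpg", {"document_type": "bill", "bill_type": "electricity", "date": "2024-01-15"}),
    ("phone_jan.jpg", {"document_type": "bill", "bill_type": None, "date": "2024-01-20"}),
    ("receipt.jpg", {"document_type": "receipt", "date": "2024-01-05"}),
]
print(expiring_within(sample_docs, today=date(2024, 7, 1)))
print(bills_by_month(sample_docs, 2024, 1))
```

This sketch excludes warranties that have already expired; drop the `today <=` bound to include them. The `or "other"` fallback matters because `dict.get('bill_type', 'other')` only uses its default when the key is absent, so a JSON `null` would otherwise produce a `None` group.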

In [None]:
def process_and_display_no_gradio(image_path):
    """Processes a document and prints the extracted information."""
    if image_path is None:
        print("Please provide an image path.")
        return
    doc_info = scanner.process_document(image_path)
    print(f"Processed document: {image_path}")
    print(json.dumps(doc_info, indent=2))

def search_and_display_no_gradio(query):
    """Searches for documents and prints the results."""
    search_results = scanner.search_documents(query)
    if not search_results:
        print("No results found.")
        return

    for path, metadata, score in search_results:
        print(f"Score: {score:.4f}")
        print(f"Metadata: {json.dumps(metadata, indent=2)}")
        print(f"Image Path: {path}")
        print("-" * 20)

# --- Example Usage ---

# 1. Process a document
#    (replace with the actual path to your image)
# process_and_display_no_gradio('/content/images/image1.png')

# 2. Search for documents
# search_and_display_no_gradio("your search query")

In [None]:
import gradio as gr

def process_and_display(image_path):
    if image_path is None:
        return "Please upload an image.", "{}"
    doc_info = scanner.process_document(image_path)
    return f"Processed document: {image_path}", json.dumps(doc_info, indent=2)

def search_and_display(query):
    search_results = scanner.search_documents(query)
    if not search_results:
        return "No results found.", []

    results_html = ""
    images = []
    for path, metadata, score in search_results:
        results_html += f"<b>Score:</b> {score:.4f}<br>"
        results_html += f"<b>Metadata:</b><pre>{json.dumps(metadata, indent=2)}</pre><hr>"
        images.append(path)

    return results_html, images


with gr.Blocks() as demo:
    gr.Markdown("# Receipt & Document Scanner")

    with gr.Tab("Upload & Process"):
        with gr.Row():
            image_input = gr.Image(type="filepath", label="Upload Document")
            json_output = gr.JSON(label="Extracted Information")
        process_button = gr.Button("Process Document")
        status_output = gr.Textbox(label="Status")
        process_button.click(process_and_display, inputs=image_input, outputs=[status_output, json_output])


    with gr.Tab("Search"):
        search_input = gr.Textbox(label="Search Query")
        search_button = gr.Button("Search")
        search_results_html = gr.HTML()
        search_results_images = gr.Gallery(label="Search Results")
        search_button.click(search_and_display, inputs=search_input, outputs=[search_results_html, search_results_images])

demo.launch(debug=True)

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://f9ab4846fee6335b3d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7863 <> https://f9ab4846fee6335b3d.gradio.live




In [None]:
# Process the first sample image
process_and_display_no_gradio('/content/images/image1.png')

# Now, let's try searching for it
search_and_display_no_gradio("seed1")

Error extracting document info: HTTPConnectionPool(host='localhost', port=42687): Read timed out. (read timeout=600.0)
Error extracting document info: HTTPConnectionPool(host='localhost', port=42687): Read timed out. (read timeout=600.0)
Error extracting document info: HTTPConnectionPool(host='localhost', port=42687): Read timed out. (read timeout=600.0)
Processed document: /content/images/image1.png
{
  "document_type": "unknown",
  "vendor_name": null,
  "date": null,
  "amount": null,
  "items": [],
  "bill_type": null,
  "account_number": null,
  "warranty_period": null,
  "product_name": null,
  "key_text": "",
  "expiry_date": null
}
Score: 0.0776
Metadata: {
  "document_type": "unknown",
  "vendor_name": null,
  "date": null,
  "amount": null,
  "items": [],
  "bill_type": null,
  "account_number": null,
  "warranty_period": null,
  "product_name": null,
  "key_text": "",
  "expiry_date": null
}
Image Path: /tmp/gradio/10970daf0c7f5ff4f30a7a92275006db915f1be89c3bb1f29e3ce51c8061