# File Search Store Management for Rickbot

A notebook to experiment with the FileSearchStore and how it can be used to manage file search in the Rickbot Agent.

The best way to run this notebook is from Google Colab.

<a target="_blank" href="https://colab.research.google.com/github/derailed-dash/rickbot-adk/blob/main/notebooks/file_search_store.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" height=30/></a>

## Pre-Reqs and Notes

- File Search stores (`file_search_stores`) are exclusive to the Gemini Developer API.
  - They do not work with the Vertex AI API or with the Gen AI SDK in Vertex AI mode.
  - Therefore, do not set the `GOOGLE_CLOUD_LOCATION` or `GOOGLE_GENAI_USE_VERTEXAI` environment variables, and do not initialise Vertex AI.
- Make sure you have an up-to-date version of the `google-genai` package installed. 
  - Versions older than 1.49.0 do not support the File Search Tool.
  - You can upgrade all packages using `uv sync --upgrade`.
  - Or just `google-genai` using `uv sync --upgrade-package google-genai`
  - Or, if using `pip`: `pip install --upgrade google-genai`.
  - You can add to your `pyproject.toml` file; since we don't explicitly need it outside 
- Add your Gemini API Key to Colab as a secret. Then you can retrieve it using `userdata.get("GEMINI_API_KEY")`

**Remember that your File Search Store is linked to your project, and this is linked to your GEMINI_API_KEY.**
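
The version and environment notes above can be checked before running anything else. The following is a minimal sketch, not part of the original notebook: the helper names `parse_version`, `supports_file_search` and `vertex_env_conflicts` are hypothetical, and the check treats any non-empty value for the Vertex AI variables as a conflict.

```python
import os
from importlib import metadata

# Minimum google-genai version with File Search support, per the notes above
MIN_GENAI_VERSION = (1, 49, 0)
# Variables the notes above say should not be set
VERTEX_ENV_VARS = ("GOOGLE_CLOUD_LOCATION", "GOOGLE_GENAI_USE_VERTEXAI")


def parse_version(version: str) -> tuple:
    """Convert '1.49.0' to (1, 49, 0); non-numeric suffixes such as 'rc1' are dropped."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # pad so '1.50' compares as (1, 50, 0)
    return tuple(parts)


def supports_file_search(version: str) -> bool:
    """True if the given google-genai version meets the minimum."""
    return parse_version(version) >= MIN_GENAI_VERSION


def vertex_env_conflicts(env) -> list:
    """Return the Vertex AI variables that are set to a non-empty value."""
    return [name for name in VERTEX_ENV_VARS if env.get(name)]


try:
    installed = metadata.version("google-genai")
    print(f"google-genai {installed}: File Search supported = {supports_file_search(installed)}")
except metadata.PackageNotFoundError:
    print("google-genai is not installed.")

print(f"Conflicting Vertex AI variables: {vertex_env_conflicts(os.environ)}")
```

If the last line prints a non-empty list, unset those variables before creating the client.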

## Setup

In [None]:
import asyncio
import glob
import os

from dotenv import load_dotenv
from google import genai
from google.genai import types
from pydantic import BaseModel


### Local Only

If running locally, set up the Google Cloud environment:

```bash
source scripts/setup-env.sh
```

Then to install the package dependencies into the virtual environment, use the `uv` tool:

1. From your agent's root directory, run `make install` to set up the virtual environment (`.venv`).
2. In this Jupyter notebook, select the kernel from the `.venv` folder to ensure all dependencies are available.

In [None]:
# Load env vars
if load_dotenv(override=True):
    print("Successfully loaded environment variables.")
else:
    print("Failed to load environment variables.")

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set.")
else:
    print(f"Successfully loaded Gemini API key, ending {GEMINI_API_KEY[-3:]}")

MODEL = os.getenv("MODEL")
if not MODEL:
    print("Warning: MODEL environment variable not set.")
else:
    print(f"Successfully loaded model: {MODEL}")

### Or In Colab

In [None]:
%pip install -q -U "google-genai>=1.49.0"

In [None]:
from google.colab import userdata

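os.environ["GEMINI_API_KEY"] = userdata.get("GEMINI_API_KEY")
os.environ["MODEL"] = userdata.get("MODEL")
# Later cells reference the MODEL variable, which is otherwise only defined in the local-only cell
MODEL = os.environ["MODEL"]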

### Client Initialisation

In [None]:
client = genai.Client()
STORE_NAME = "rickbot-dazbo-ref" # as per personalities.yaml

## Store Management

### View All Stores

In [None]:
try:
    for a_store in client.file_search_stores.list():
        print(a_store)
except Exception as e:
    print(f"Error listing stores (check creds?): {e}")

### Retrieve the Store

Here's a utility function to retrieve a store by its display name. Display names are not unique, so if several stores share the same name, the function returns only the first match.

In [None]:
def get_store(store_name: str):
    """Retrieve a store by its display name"""
    try:
        for a_store in client.file_search_stores.list():
            if a_store.display_name == store_name:
                return a_store
    except Exception as e:
        print(f"Error in get_store path: {e}")
    return None
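
Because display names are not unique, it can be useful to see every match rather than only the first. The sketch below assumes a hypothetical helper, `find_stores`, that filters any iterable of objects exposing a `display_name` attribute. The sample objects and their `name` values are made up for illustration; in the notebook the iterable would be `client.file_search_stores.list()`.

```python
from types import SimpleNamespace


def find_stores(stores, display_name):
    """Return every store whose display_name matches, preserving order."""
    return [s for s in stores if s.display_name == display_name]


# Stand-in objects for illustration only; the names below are invented
sample_stores = [
    SimpleNamespace(name="fileSearchStores/example-1", display_name="rickbot-dazbo-ref"),
    SimpleNamespace(name="fileSearchStores/example-2", display_name="other-store"),
    SimpleNamespace(name="fileSearchStores/example-3", display_name="rickbot-dazbo-ref"),
]

matches = find_stores(sample_stores, "rickbot-dazbo-ref")
if len(matches) > 1:
    print(f"Warning: {len(matches)} stores share this display name:")
for store in matches:
    print(f"  {store.name}")
```

More than one match usually means a store was created twice, in which case the resource name, not the display name, is the reliable identifier.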

### Create the Store (One Time)

Once you've created the store, save the store ID for use in your application.

In [None]:
if not get_store(STORE_NAME):
    file_search_store = client.file_search_stores.create(config={"display_name": STORE_NAME})
    print(f"Created store: {file_search_store.name}")
else:
    print(f"Store {STORE_NAME} already exists.")

### View the Store

We can interrogate a store and see what files have been uploaded to it.

In [None]:
file_search_store = get_store(STORE_NAME)
if not file_search_store:
    print(f"Store {STORE_NAME} not found.")
else:
    print(file_search_store)

    # List all documents in the store
    # The 'parent' argument is the resource name of the store
    docs = client.file_search_stores.documents.list(parent=file_search_store.name)
    try:
        doc_list = list(docs)
        print(f"Docs in {STORE_NAME}: {len(doc_list)}")

        if not doc_list:
            print("No documents found in the store.")
        else:
            for i, doc in enumerate(doc_list):
                section_heading = f"Document {i}:"
                print("-" * len(section_heading))
                print(section_heading)
                print("-" * len(section_heading))
                print(f"  Display name: {doc.display_name}")
                print(f"  ID: {doc.name}")
                print(f"  Metadata: {doc.custom_metadata}")
    except Exception as e:
        print(f"Error listing docs (might be empty): {e}")
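
The metadata printed above is a list of key/value entries, which is awkward to query directly. As a rough sketch, a hypothetical helper `metadata_to_dict` could flatten it into a dictionary. It assumes each entry exposes `key` and `string_value` attributes, as the upload code later in this notebook does, and it treats missing metadata as empty.

```python
from types import SimpleNamespace


def metadata_to_dict(custom_metadata):
    """Flatten key/string_value entries into a dict; None or an empty list gives {}."""
    return {m.key: m.string_value for m in (custom_metadata or [])}


# Stand-in for a document's custom_metadata, for illustration only
sample_metadata = [
    SimpleNamespace(key="file_name", string_value="landing-zones.md"),
    SimpleNamespace(key="last_modified", string_value="1700000000.0"),
]

meta = metadata_to_dict(sample_metadata)
print(meta.get("file_name"))
print(meta.get("author", "(no author recorded)"))
```

With a dictionary, lookups such as `meta.get("last_modified", "0")` replace the repeated `next(...)` generator expressions.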

### Delete Store(s)


In [None]:
# First, point to the right store. For example:
store_to_delete = get_store(STORE_NAME)

# Delete the store
if store_to_delete:
    print(f"Deleting: {store_to_delete.name}")
    # Uncomment to delete
    # client.file_search_stores.delete(name=store_to_delete.name, config={'force': True})
else:
    print("Store not found.")

## Upload and Process Files

Now we need to place the files in a suitable local folder to upload to the store.

In [None]:
# UPLOAD_PATH = "/content/upload-files/"
UPLOAD_PATH = "../scratch/"

Create some utility classes and functions:

In [None]:
class DocumentMetadata(BaseModel):
    """Metadata for a document"""    
    title: str
    author: str
    abstract: str


#### Async Implementation

Processing multiple input documents sequentially is slow. So let's process them concurrently using `asyncio`...
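
The cells that follow cap concurrency with an `asyncio.Semaphore`. To show the idea in isolation, here is a small sketch that swaps the real upload for a short sleep. The names `fake_upload` and `run_bounded` are hypothetical, and the `stats` dictionary exists only to record how many tasks were active at once.

```python
import asyncio


async def fake_upload(name, semaphore, stats):
    """Pretend to upload a file; the semaphore limits how many run at once."""
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stands in for network I/O
        stats["active"] -= 1
        return name


async def run_bounded(names, limit):
    """Start all uploads together, but allow at most `limit` inside the semaphore."""
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_upload(n, semaphore, stats) for n in names))
    return results, stats["peak"]


file_names = [f"doc-{i}.md" for i in range(10)]
results, peak = asyncio.run(run_bounded(file_names, limit=3))
print(f"Processed {len(results)} files; peak concurrency was {peak}")
```

`asyncio.run` is used so the sketch works as a standalone script. Inside a notebook cell, where an event loop is already running, use `results, peak = await run_bounded(file_names, limit=3)` instead.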

In [None]:
async def async_delete_doc(doc):
    """Async delete document from its file search store."""
    print(f"♻️ DELETING DUPLICATE: '{doc.display_name}' (ID: {doc.name})")
    await client.aio.file_search_stores.documents.delete(name=doc.name, config={"force": True})
    await asyncio.sleep(1)

async def async_generate_metadata(file_name: str, temp_file) -> DocumentMetadata:
    """Async generate metadata for a document."""
    print(f"EXTRACTING METADATA FROM {file_name}...")
    response = await client.aio.models.generate_content(
        model=MODEL,
        contents=[
            """Please extract title, author, and short abstract from this document. 
            Each value should be under 200 characters.

            Abstracts should be succinct and NOT include preamble text like `This document describes...`

            Example bad abstract: 
            Now I want to cover a key consideration that can potentially 
            save you more in future IT spend than any other decision you can make: 
            embracing open source as a core element of your cloud strategy.

            Example good abstract:
            How you can significantly reduce IT spend by embracing open source
            as a core component of your cloud strategy.

            Example bad abstract:
            This article discusses how you can design your cloud landing zone.

            Example good abstract:
            How to design your cloud landing zone according to best practices.
            """,
            temp_file,
        ],
        config={"response_mime_type": "application/json", "response_schema": DocumentMetadata},
    )
    metadata: DocumentMetadata = response.parsed
    print(f"Title: {metadata.title}")
    print(f"Author: {metadata.author}")
    print(f"Abstract: {metadata.abstract}")

    return metadata

async def async_upload_doc(file_path, file_search_store, semaphore, last_modified):
    """Simple async upload with last_modified metadata."""
    async with semaphore:
        file_name = os.path.basename(file_path)
        temp_file = await client.aio.files.upload(file=file_path)

        while temp_file.state.name == "PROCESSING":
            await asyncio.sleep(2)
            temp_file = await client.aio.files.get(name=temp_file.name)

        # This is the expensive part we want to skip if possible!
        metadata = await async_generate_metadata(file_name, temp_file)

        operation = await client.aio.file_search_stores.upload_to_file_search_store(
            file_search_store_name=file_search_store.name,
            file=file_path,
            config={
                "display_name": metadata.title,
                "custom_metadata": [
                    {"key": "title", "string_value": metadata.title},
                    {"key": "file_name", "string_value": file_name},
                    {"key": "author", "string_value": metadata.author},
                    {"key": "abstract", "string_value": metadata.abstract},
                    {"key": "last_modified", "string_value": str(last_modified)},
                ],
            },
        )
        while not operation.done:
            await asyncio.sleep(5)
            operation = await client.aio.operations.get(operation)
        print(f"✅ {file_name} UPLOADED")
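
Both polling loops above run until the remote state changes, with no upper bound on the wait. One option, sketched here under stated assumptions, is a generic helper that polls with a timeout. `wait_until` is a hypothetical name and is not part of the `google-genai` SDK; the fake check below stands in for refetching the file or operation.

```python
import asyncio
import time


async def wait_until(check, interval=0.01, timeout=1.0):
    """Poll the async callable `check` until it returns a truthy value.

    Raises TimeoutError if `timeout` seconds pass without a truthy result.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = await check()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout} seconds")
        await asyncio.sleep(interval)


# Stand-in for a remote operation that reports done on the third poll
poll_count = {"n": 0}


async def fake_operation_done():
    poll_count["n"] += 1
    return poll_count["n"] >= 3


asyncio.run(wait_until(fake_operation_done))
print(f"Finished after {poll_count['n']} polls")
```

In the notebook, the check would refetch the operation and return it once `operation.done` is true, and the call would be awaited directly rather than passed to `asyncio.run`.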

Now actually **upload and process our documents**.

We'll use an async function that uploads documents concurrently, with `batch_size` capping how many uploads run at once.

In [None]:
async def robust_batch_upload(upload_path, store_name, batch_size=10):
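    store = get_store(store_name)
    if not store:
        print(f"Store {store_name} not found.")
        return

    # Only regular files can be uploaded, so skip any subdirectories
    files = [f for f in glob.glob(os.path.join(upload_path, "*")) if os.path.isfile(f)]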
    if not files:
        print("No files found.")
        return

    # Phase 1: Scan Store and Evaluate
    print(f"🔍 Scanning store {store.display_name} for existing files...")
    existing_docs = {}
    async for doc in await client.aio.file_search_stores.documents.list(parent=store.name):
        # Extract the original file name from metadata
        fname = next((m.string_value for m in (doc.custom_metadata or []) if m.key == 'file_name'), None)
        if fname:
            existing_docs.setdefault(fname, []).append(doc)

    delete_tasks = []
    files_to_upload = []

    for fp in files:
        fname = os.path.basename(fp)
        local_mtime = os.path.getmtime(fp)

        should_upload = True
        if fname in existing_docs:
            # Check if any existing version is up-to-date
            up_to_date = False
            for doc in existing_docs[fname]:
                stored_mtime_str = next((m.string_value for m in (doc.custom_metadata or []) if m.key == 'last_modified'), "0")
                try:
                    stored_mtime = float(stored_mtime_str)
                except ValueError:
                    stored_mtime = 0

                # If we have a version that is as new or newer, we don't need to do anything
                if stored_mtime >= local_mtime:
                    up_to_date = True
                    break

            if up_to_date:
                print(f"⏭️  SKIPPING: '{fname}' is already up-to-date.")
                should_upload = False
            else:
                # Local version is newer (or no timestamp found), mark existing for deletion
                for doc in existing_docs[fname]:
                    delete_tasks.append(async_delete_doc(doc))

        if should_upload:
            files_to_upload.append((fp, local_mtime))

    # Phase 2: Cleanup Outdated Docs
    if delete_tasks:
        print(f"🗑️  Deleting {len(delete_tasks)} outdated documents...")
        await asyncio.gather(*delete_tasks)

    # Phase 3: Parallel Upload New/Updated Files
    if files_to_upload:
        print(f"🚀 Starting parallel upload of {len(files_to_upload)} files...")
        semaphore = asyncio.Semaphore(batch_size)
        upload_tasks = [async_upload_doc(fp, store, semaphore, mtime) for fp, mtime in files_to_upload]
        await asyncio.gather(*upload_tasks)
        print("🎉 Robust batch upload complete.")
    else:
        print("✅ All files are already up-to-date in the filestore.")

await robust_batch_upload(UPLOAD_PATH, STORE_NAME)
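
The skip-or-upload decision in Phase 1 can be isolated as a pure function, which makes it easy to check without touching the store. This is a sketch of the same rule, not a replacement for the cell above: `parse_mtime` and `needs_upload` are hypothetical names, and unparseable timestamps are treated as 0, as in the original loop.

```python
def parse_mtime(value):
    """Parse a stored timestamp string; missing or malformed values count as 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def needs_upload(local_mtime, stored_mtimes):
    """True unless at least one stored copy is as new as, or newer than, the local file."""
    return not any(parse_mtime(s) >= local_mtime for s in stored_mtimes)


local = 1_700_000_000.0
print(needs_upload(local, []))                          # no stored copy
print(needs_upload(local, ["1700000000.0"]))            # same timestamp
print(needs_upload(local, ["1600000000.0", "n/a"]))     # only older or malformed copies
```

A file with no stored copy always needs uploading, and a single up-to-date copy is enough to skip it, even if older duplicates also exist.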

## Verify with Query

Now that the data is uploaded, let's verify we can retrieve it using the File Search Tool.

In [None]:
# Retrieve the store again to be sure
store = get_store(STORE_NAME)
question = """Give me a brief 4 step plan to optimise migration to cloud, 
achieving the fastest ROI and lowest overall TCO"""

if store:
    print(f"Querying store: {store.name} ({store.display_name})")

    try:
        # Use the File Search Tool
        if hasattr(types, "FileSearch"):
            print("FileSearch tool config...")
            response = client.models.generate_content(
                model=MODEL,
                contents=question,
                config=types.GenerateContentConfig(
                    tools=[types.Tool(file_search=types.FileSearch(file_search_store_names=[store.name]))]
                ),
            )
            print("\nResponse:")
            print(response.text)
        else:
            print("types.FileSearch not found. Skipping in-notebook query verification.")

    except Exception as e:
        print(f"Query failed: {e}")

else:
    print("Store not found, cannot verify.")