Legal Documents AI Search

In [1]:
!pip install google-genai



Generate 10 dummy JSON files for legal documents, each containing fields like 'title', 'case_id', 'date', 'parties', 'summary', and 'document_type', and save them in the "my_docs" folder.

In [None]:
import os
import json
import random
from datetime import datetime, timedelta

# Ensure the 'my_docs' directory exists. exist_ok=True makes this one call
# that is safe to re-run when the folder is already there, with no separate
# existence check that could race with the creation.
os.makedirs('my_docs', exist_ok=True)

def generate_dummy_legal_document(doc_id):
    """Build one randomized, self-consistent dummy legal-document record.

    Args:
        doc_id: Identifier embedded in the title and appended to the case id.

    Returns:
        dict with the keys 'title', 'case_id', 'date', 'parties', 'summary'
        and 'document_type' (in that order). 'date' is a "YYYY-MM-DD" string
        between 1 and 365 days in the past; 'parties' is a two-item list of
        the form [plaintiff, defendant].
    """
    parties = [
        f"Plaintiff_{random.randint(1, 100)}",
        f"Defendant_{random.randint(1, 100)}"
    ]
    document_types = ["Complaint", "Motion", "Order", "Brief", "Judgment"]

    # Draw the type exactly once so that the title, the summary text and the
    # 'document_type' field all describe the same kind of document.
    document_type = random.choice(document_types)

    # Generate a random date within the last year
    random_days = random.randint(1, 365)
    date = (datetime.now() - timedelta(days=random_days)).strftime("%Y-%m-%d")

    summary_templates = [
        "This document pertains to a dispute over {item} between {party1} and {party2}. The court is reviewing the evidence.",
        "A summary judgment was requested in the case of {party1} v. {party2} concerning {issue}. The outcome is pending.",
        "An appeal was filed by {party1} against {party2} regarding the previous ruling on {topic}.",
        "This {document_type} outlines the terms of a settlement reached between {party1} and {party2} concerning {matter}.",
        "The court issued an {document_type} in favor of {party1} regarding {subject} following extensive hearings."
    ]

    # Every placeholder used by ANY template must be supplied here: str.format
    # raises KeyError for a missing name, while unused names are ignored.
    summary = random.choice(summary_templates).format(
        item=random.choice(["contract terms", "property rights", "copyright infringement", "patent dispute", "personal injury"]),
        party1=parties[0],
        party2=parties[1],
        issue=random.choice(["breach of contract", "negligence claim", "unfair competition", "custody battle", "fraudulent misrepresentation"]),
        topic=random.choice(["asset division", "corporate governance", "environmental regulations"]),
        document_type=document_type,
        matter=random.choice(["a real estate transaction", "employment dispute", "a product liability claim"]),
        subject=random.choice(["intellectual property ownership", "lease termination", "insurance coverage"])
    )

    return {
        "title": f"{document_type} - Case {doc_id}",
        "case_id": f"CASE-{random.randint(10000, 99999)}-{doc_id}",
        "date": date,
        "parties": parties,
        "summary": summary,
        "document_type": document_type
    }

# Write ten dummy legal documents into my_docs/ as indented JSON, one file
# per document, numbered 1 through 10.
for doc_number in range(1, 11):
    record = generate_dummy_legal_document(doc_number)
    output_path = f"my_docs/legal_document_{doc_number}.json"
    with open(output_path, 'w') as out_file:
        json.dump(record, out_file, indent=4)
    print(f"Generated: {output_path}")

# Final confirmation, showing the absolute location of the output folder.
print(f"\nSuccessfully generated 10 dummy legal document JSON files in the '{os.path.abspath('my_docs')}' folder.")

In [None]:
import os
import time
import glob
from google import genai
from google.genai import types

# --- CONFIGURATION ---
# Read the Gemini API key from the environment instead of hardcoding it here:
# assigning a literal to os.environ would overwrite a real key that is already
# exported, and keeps a secret-shaped value in the notebook source.
API_KEY = os.environ.get("GEMINI_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "GEMINI_API_KEY is not set. Export it in the environment that "
        "launches this notebook and run this cell again."
    )
FOLDER_PATH = "my_docs"               # local folder whose files are uploaded
STORE_NAME = "rooms_reference_store"  # display name used when creating the store
MODEL_ID = "gemini-2.5-flash"         # model passed to generate_content

client = genai.Client(api_key=API_KEY)

def _get_metadata_logic(filename):
    """
    Returns a simple dictionary.
    """
    metadata = {"status": "active", "uploaded_via": "script"}

    if "invoice" in filename.lower():
        metadata["category"] = "finance"
    elif "manual" in filename.lower():
        metadata["category"] = "technical"
    else:
        metadata["category"] = "general"

    return metadata

def upload_folder_and_get_ids(store_name, folder_path):
    """Upload every supported file in a folder to a File Search store.

    Each file is uploaded with its basename as the display name and with the
    key/value pairs from ``_get_metadata_logic`` attached as custom metadata.
    After each upload the returned operation is polled once per second until
    it reports done. A failure on one file is printed and does not stop the
    remaining uploads.

    Args:
        store_name: Name of the target File Search store.
        folder_path: Directory scanned (non-recursively) for files ending in
            .txt, .pdf, .csv, .md or .json (matched case-insensitively).

    Returns:
        dict mapping each successfully indexed filename to the identifier
        read from the finished operation. Empty when no file was found or
        none could be indexed.
    """
    supported_extensions = ('.txt', '.pdf', '.csv', '.md', '.json')

    # Sorted for a reproducible upload order (glob order is arbitrary).
    # Extensions are compared in lower case so "REPORT.PDF" is not skipped,
    # and directories whose names merely look like files are ignored.
    valid_files = sorted(
        path
        for path in glob.glob(os.path.join(folder_path, "*.*"))
        if os.path.isfile(path) and path.lower().endswith(supported_extensions)
    )

    database_records = {}

    if not valid_files:
        print("No files found.")
        return {}

    print(f"Uploading {len(valid_files)} files with metadata...")

    for file_path in valid_files:
        filename = os.path.basename(file_path)

        # 1. Get the simple dict
        raw_meta = _get_metadata_logic(filename)

        # 2. Convert the dict to a list of CustomMetadata objects; the text
        #    goes in 'string_value' (not 'value').
        formatted_metadata = [
            types.CustomMetadata(key=k, string_value=str(v))
            for k, v in raw_meta.items()
        ]

        # flush so the progress line is visible while the upload is polled
        print(f" > Processing: {filename} | Meta: {raw_meta}...", end="", flush=True)

        try:
            operation = client.file_search_stores.upload_to_file_search_store(
                file=file_path,
                file_search_store_name=store_name,
                config={
                    'display_name': filename,
                    'custom_metadata': formatted_metadata
                }
            )

            # NOTE(review): this wait has no upper bound; consider a timeout
            # if uploads can stall.
            while not operation.done:
                time.sleep(1)
                operation = client.operations.get(operation)

            # NOTE(review): assumes a finished operation exposes the document
            # as `result.name` - confirm against the SDK version in use.
            if hasattr(operation, 'result') and operation.result:
                doc_id = operation.result.name
                database_records[filename] = doc_id
                print(f" [Indexed] -> ID: {doc_id}")
            else:
                print(" [Error: No ID returned]")

        except Exception as e:
            # Deliberate best-effort: report and continue with the next file.
            print(f" [Upload Failed]: {e}")

    return database_records

def delete_store_completely(store_name):
    """
    Remove a File Search store together with everything stored in it.

    The delete request is sent with force=True so that a store which still
    holds documents/chunks is removed in one call instead of being rejected
    with 'FAILED_PRECONDITION'. Errors are reported on stdout, not raised.
    """
    print(f"\n--- Cleanup: Deleting Store {store_name} ---")

    # 'force' asks Gemini to cascade the delete to all files/chunks
    cascade_config = {'force': True}

    try:
        client.file_search_stores.delete(name=store_name, config=cascade_config)
        print("   Store and all contained files deleted successfully.")
    except Exception as exc:
        print(f"   Error during deletion: {exc}")


def print_citations(response):
    """Print which source files back each grounded segment of an answer.

    Args:
        response: A generate_content response. Only
            ``candidates[0].grounding_metadata`` is read: its
            ``grounding_chunks`` supply source titles (through
            ``retrieved_context.title``) and its ``grounding_supports`` map
            answer segments to chunk indices.

    Returns:
        None. Nothing is printed when there are no candidates, no grounding
        metadata, or no grounding supports.
    """
    if not response.candidates or not response.candidates[0].grounding_metadata:
        return

    metadata = response.candidates[0].grounding_metadata

    # 1. Map each chunk index to the title of the file it was retrieved from.
    #    For File Search the data lives in 'retrieved_context'.
    titles_by_index = {}
    for index, chunk in enumerate(metadata.grounding_chunks or []):
        if chunk.retrieved_context:
            titles_by_index[index] = chunk.retrieved_context.title or "Unknown File"

    if not metadata.grounding_supports:
        return

    # 2. Show which answer segment is supported by which file(s).
    print("\n" + "="*20 + " CITATIONS " + "="*20)
    for support in metadata.grounding_supports:
        # Fall back to a generic label when the segment or its text is missing,
        # so .strip() below never runs on None.
        segment_text = (support.segment.text if support.segment else None) or "Answer"

        indices = support.grounding_chunk_indices or []

        # dict.fromkeys removes duplicates while keeping first-seen order, so
        # the printed source list is stable from run to run.
        files = list(dict.fromkeys(
            titles_by_index[idx] for idx in indices if idx in titles_by_index
        ))

        if files:
            # Escapes (memo emoji, down-right arrow) keep these symbols intact
            # regardless of the encoding the notebook file is saved in.
            print(f"\U0001F4DD CLAIM: \"...{segment_text.strip()[:50]}...\"")
            print(f"   \u21b3 SOURCE: {', '.join(files)}")
            print("-" * 40)

In [None]:
# --- MAIN EXECUTION ---
if __name__ == "__main__":
    # The documents folder must exist before anything can be uploaded.
    if not os.path.exists(FOLDER_PATH):
        os.makedirs(FOLDER_PATH)
        print(f"Created '{FOLDER_PATH}'. Put files there and run again.")
        # Raise SystemExit instead of calling exit(): exit() is an interactive
        # helper and, inside a Jupyter kernel, shuts the kernel down. Raising
        # SystemExit just stops this cell.
        raise SystemExit

    # Stays None until a store exists, so the cleanup cell can tell whether
    # there is anything to delete.
    store_id = None

    # 1. Create Store
    store = client.file_search_stores.create(config={'display_name': STORE_NAME})
    store_id = store.name
    print(f"Store Created: {store_id}")

    # 2. Upload the folder and capture the returned IDs, keyed by filename
    db_references = upload_folder_and_get_ids(store_id, FOLDER_PATH)

    print("\n--- IDs for your Database ---")
    for fname, fid in db_references.items():
        print(f"File: {fname} | Ref_ID: {fid}")

In [None]:
# --- 3. Generate Content (RAG) ---
# Replace the placeholder below with the search keyword or question to ask.
question = "<search keyword or user query>"
print(f"\nAsking: '{question}'...")

# File Search tool limited to the store created earlier; the metadata filter
# restricts retrieval by the 'status' custom-metadata key.
# store_id is used (rather than store.name) because it is the handle the
# cleanup cell also relies on.
file_search_tool = types.Tool(
    file_search=types.FileSearch(
        file_search_store_names=[store_id],
        metadata_filter="status=active",
    )
)

response = client.models.generate_content(
    model=MODEL_ID,
    contents=question,
    config=types.GenerateContentConfig(tools=[file_search_tool]),
)

# 4. Print Answer
print("GEMINI ANSWER:")
print(response.text)

# 5. Print Citations
print_citations(response)

In [None]:
# Cleanup: delete the File Search store created earlier. store_id is falsy
# when no store was created, in which case there is nothing to remove.
if store_id:
    delete_store_completely(store_id)