**Install dependency libraries**

In [1]:
# Use the %pip magic (not !pip) so the packages are installed into the
# environment of the running kernel rather than whichever pip is first on PATH.
%pip install -q google-adk google-genai


[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


**Configure Gemini API Key**

In [1]:
import os

# Prefer a key that is already exported in the environment; only fall back to
# the placeholder below (which must be replaced with a real key before running
# the agents) when nothing is set. This avoids clobbering a real key with the
# placeholder and keeps real credentials out of the notebook.
os.environ.setdefault("GOOGLE_API_KEY", "YOUR_API_KEY")

**Import required libraries**

In [3]:
import json
import logging
from typing import Dict, List
import asyncio

from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

**Schema Registry** *(tool backend)*

In [4]:
# Fake SQL-style schemas served by the `get_collection_schema` tool, keyed by
# MongoDB collection name. Each value maps a column name to its SQL type.
SCHEMA_REGISTRY: Dict[str, Dict[str, str]] = {
    "users": {
        "name": "TEXT",
        "age": "INT",
        "email": "TEXT",
        # The evaluation queries against `users` filter and project on these
        # two fields, so the schema has to expose them as columns as well.
        "status": "TEXT",
        "country": "TEXT",
    },
    "orders": {
        "order_id": "INT",
        "status": "TEXT",
        "total": "DECIMAL(10,2)",
        "user_id": "INT",
    },
}

In [5]:
def get_collection_schema(collection_name: str) -> Dict[str, Dict[str, str]]:
    """
    Tool: returns a fake SQL-style schema for a MongoDB collection.

    The collection name is stripped of surrounding whitespace and looked up in
    SCHEMA_REGISTRY. Unknown collections get a generic placeholder schema so
    the tool always returns a non-empty column mapping.

    Args:
        collection_name: e.g. "users", "orders"

    Returns:
        dict with keys:
          - collection: the stripped collection name
          - suggested_table: same value as `collection`
          - columns (name -> sql_type)
    """
    name = collection_name.strip()

    # Copy the registry entry: handing out the registry's own dict would let a
    # caller that mutates the result silently corrupt the shared registry.
    columns = dict(SCHEMA_REGISTRY.get(name, {}))

    # Fallback if collection not in registry (or registered with no columns)
    if not columns:
        columns = {
            "_id": "TEXT",
            "field1": "TEXT",
            "field2": "TEXT",
        }

    return {
        "collection": name,
        "suggested_table": name,
        "columns": columns,
    }

**Mongo Analysis Agent** - Analyze Mongo query and call schema tool

In [6]:
# Prompt for the first pipeline stage: look up the schema via the tool, then
# emit a normalized JSON plan describing the Mongo query.
_ANALYSIS_AGENT_INSTRUCTION = """
You are a MongoDB query analyzer.

- The user will give you a single MongoDB query (JavaScript style).
- FIRST, call the `get_collection_schema` tool to infer the SQL table and columns.
- THEN, produce a JSON object with:

  {
    "collection": "<collection name>",
    "suggested_table": "<sql table name>",
    "operation": "find" | "insert" | "update" | "delete" | "aggregate",
    "filter": { ... },          // Mongo-style filter
    "projection": { ... } | null,
    "sort": { ... } | null,
    "limit": <int> | null,
    "other_clauses": "<text for joins, group by, etc>"
  }

Rules:
- Output ONLY that JSON (no explanation text).
- If some part is unknown, set it to null or an empty object {}.
"""

# Stage 1 agent. Its reply is stored under the "analysis_json" output key so
# the converter stage can refer to it.
analysis_agent = LlmAgent(
    model="gemini-2.5-flash-lite",
    name="mongo_analysis_agent",
    description="Analyzes MongoDB queries and normalizes them to a structured JSON plan.",
    instruction=_ANALYSIS_AGENT_INSTRUCTION,
    output_key="analysis_json",
    tools=[get_collection_schema],
)

**SQL Converter Agent** - Convert the structured analysis to final SQL

In [7]:
# Prompt for the second pipeline stage: turn the analysis JSON into one SQL
# statement.
# NOTE(review): the {{analysis_json}} placeholder is presumably substituted by
# ADK with the value the analysis agent stored under that key - confirm against
# the ADK instruction-templating docs.
_CONVERTER_AGENT_INSTRUCTION = """
You are a MongoDB-to-SQL converter.

You are given a JSON string in the session state under key {{analysis_json}}.
This JSON describes the Mongo query in a normalized way.

Steps:
1. Read the value of {{analysis_json}} from the session state.
2. Parse it as JSON.
3. Generate a single ANSI SQL query that best matches the intent.
   - Handle operations: find, insert, update, delete, aggregate (as SELECT ... GROUP BY).
   - Use suggested_table as the FROM table.
   - Map filter to WHERE.
   - Map projection to SELECT columns (or * if missing).
   - Map sort to ORDER BY.
   - Map limit to LIMIT.

4. Return ONLY the SQL query as plain text (no explanations, no JSON).
"""

# Stage 2 agent. No tools; its reply (the final SQL) is also stored under the
# "sql_query" output key.
converter_agent = LlmAgent(
    model="gemini-2.5-flash-lite",
    name="mongo_to_sql_converter",
    description="Converts analyzed MongoDB queries to ANSI SQL.",
    instruction=_CONVERTER_AGENT_INSTRUCTION,
    output_key="sql_query",
)

**Sequential Agent** - for ordered execution of Analyzer -> Converter

In [8]:
# Two-stage pipeline: the analysis agent is listed first, the converter second.
mongo_to_sql_pipeline = SequentialAgent(
    sub_agents=[analysis_agent, converter_agent],
    name="mongo_to_sql_pipeline",
)

**Configuring Constants**

In [9]:
# Identifiers shared by the session service, the Runner and every pipeline call.
APP_NAME, USER_ID, SESSION_ID = (
    "mongo_to_sql_app",
    "demo_user",
    "demo_session",
)

**Initialize Session Service**

In [10]:
# Create the session service (ADK's InMemorySessionService). The same instance
# is used below to create the demo session and is passed to the Runner.
session_service = InMemorySessionService()

In [11]:
session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
)

#Create Runner
runner = Runner(
    agent=mongo_to_sql_pipeline,
    app_name=APP_NAME,
    session_service=session_service,
)

**Helper:** Run the agent for a Mongo query

In [12]:
def convert_mongo_to_sql(mongo_query: str) -> str:
    """
    Sends a MongoDB query string to the SequentialAgent pipeline and
    returns the final SQL query (from the last agent).

    Args:
        mongo_query: the MongoDB query text, sent as a single user message.

    Returns:
        The text of the last final-response event that carried content, or
        "" if no such event carried any text.

    Note:
        Every call reuses the same USER_ID / SESSION_ID.
        NOTE(review): this presumably means earlier queries stay in the
        session history seen by later calls - confirm whether the evaluation
        should use a fresh session per query.
    """
    content = types.Content(
        role="user",
        parts=[types.Part(text=mongo_query)],
    )

    final_text = ""

    for event in runner.run(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=content,
    ):
        # For debugging: you can uncomment this to see all events
        #print("EVENT:", event)

        if not event.is_final_response():
            continue
        if not (event.content and event.content.parts):
            continue

        # Each sub-agent can emit its own final response; the last one wins.
        # Join every text part instead of reading only parts[0], whose text is
        # None when the first part is not a text part.
        final_text = "".join(
            part.text for part in event.content.parts if part.text
        )

    return final_text

In [13]:
def normalize_sql(sql: str) -> str:
    """Lowercase the SQL text and collapse every whitespace run to one space.

    Leading and trailing whitespace is removed as a side effect of the split.
    This is a naive, purely textual normalization.
    """
    lowered = sql.lower()
    tokens = lowered.split()
    return " ".join(tokens)

**Test Cases** for evaluation

In [14]:
# Evaluation fixtures. Each entry has:
#   - "id":           label printed in the evaluation report
#   - "mongo":        the MongoDB query text sent to the pipeline
#   - "expected_sql": the reference SQL the prediction is compared against
TEST_CASES: List[Dict[str, str]] = [
    # Single comparison filter, no projection.
    {
        "id": "case_1_simple_find",
        "mongo": "db.users.find({ age: { $gt: 30 } })",
        "expected_sql": "SELECT * FROM users WHERE age > 30;",
    },
    # Equality filter with an explicit projection and a limit.
    {
        "id": "case_2_projection_limit",
        "mongo": (
            'db.orders.find({ status: "SHIPPED" }, '
            '{ _id: 0, order_id: 1, total: 1 }).limit(10)'
        ),
        "expected_sql": (
            "SELECT order_id, total FROM orders "
            "WHERE status = 'SHIPPED' LIMIT 10;"
        ),
    },
    # $and of a range and an equality, with projection and a two-key sort.
    {
        "id": "case_3_range_and_sort",
        "mongo": (
            "db.users.find("
            "  { $and: ["
            "      { age: { $gte: 18 } },"
            "      { age: { $lte: 60 } },"
            "      { status: 'ACTIVE' }"
            "    ]"
            "  },"
            "  { _id: 0, name: 1, email: 1 }"
            ").sort({ age: 1, name: 1 })"
        ),
        "expected_sql": (
            "SELECT name, email "
            "FROM users "
            "WHERE age >= 18 AND age <= 60 AND status = 'ACTIVE' "
            "ORDER BY age ASC, name ASC;"
        ),
    },
    # $or combining an $in list with a comparison, plus a limit.
    {
        "id": "case_4_or_in_limit",
        "mongo": (
            "db.orders.find("
            "  { $or: ["
            "      { status: { $in: ['SHIPPED', 'DELIVERED'] } },"
            "      { total: { $gt: 1000 } }"
            "    ]"
            "  }"
            ").limit(20)"
        ),
        "expected_sql": (
            "SELECT * "
            "FROM orders "
            "WHERE (status IN ('SHIPPED', 'DELIVERED') OR total > 1000) "
            "LIMIT 20;"
        ),
    },
    # Implicit AND of $in and $ne, projection, mixed ascending/descending sort.
    {
        "id": "case_5_projection_ne_in_sort",
        "mongo": (
            "db.users.find("
            "  {"
            "    country: { $in: ['US', 'UK', 'IN'] },"
            "    status: { $ne: 'BLOCKED' }"
            "  },"
            "  { _id: 0, name: 1, email: 1, country: 1 }"
            ").sort({ country: 1, name: -1 })"
        ),
        "expected_sql": (
            "SELECT name, email, country "
            "FROM users "
            "WHERE country IN ('US', 'UK', 'IN') "
            "AND status <> 'BLOCKED' "
            "ORDER BY country ASC, name DESC;"
        ),
    },
    # Aggregation: $match before $group (WHERE), $match after $group (HAVING),
    # then sort and limit.
    {
        "id": "case_6_aggregate_group_by_having",
        "mongo": (
            "db.orders.aggregate(["
            "  { $match: { status: { $in: ['SHIPPED', 'DELIVERED'] } } },"
            "  { $group: { "
            "      _id: '$user_id', "
            "      total_spent: { $sum: '$total' }, "
            "      order_count: { $sum: 1 }"
            "  }},"
            "  { $match: { total_spent: { $gt: 500 } } },"
            "  { $sort: { total_spent: -1 } },"
            "  { $limit: 10 }"
            "])"
        ),
        "expected_sql": (
            "SELECT user_id AS _id, "
            "SUM(total) AS total_spent, "
            "COUNT(*) AS order_count "
            "FROM orders "
            "WHERE status IN ('SHIPPED', 'DELIVERED') "
            "GROUP BY user_id "
            "HAVING SUM(total) > 500 "
            "ORDER BY total_spent DESC "
            "LIMIT 10;"
        ),
    },
    # Aggregation: count per status, sorted by the count descending.
    {
        "id": "case_7_aggregate_status_counts",
        "mongo": (
            "db.orders.aggregate(["
            "  { $group: { "
            "      _id: '$status', "
            "      count: { $sum: 1 }"
            "  }},"
            "  { $sort: { count: -1 } }"
            "])"
        ),
        "expected_sql": (
            "SELECT status AS _id, COUNT(*) AS count "
            "FROM orders "
            "GROUP BY status "
            "ORDER BY count DESC;"
        ),
    },
]

In [15]:
def evaluate_agent(test_cases: List[Dict[str, str]]) -> None:
    """
    Simple evaluation:
    - runs the agent for each Mongo query
    - prints expected vs predicted
    - uses a lenient string match to decide 'match'

    Args:
        test_cases: dicts with the keys "id", "mongo" and "expected_sql".

    Returns:
        None. The accuracy summary and a per-case report are printed.

    Matching is done on normalized SQL (lowercased, whitespace collapsed,
    trailing semicolons/spaces removed). A case matches when the two strings
    are equal or one contains the other. An empty prediction never matches a
    non-empty expectation.
    """
    total = len(test_cases)
    correct = 0
    results = []

    for case in test_cases:
        mongo = case["mongo"]
        expected = case["expected_sql"]

        predicted = convert_mongo_to_sql(mongo)

        # Strip trailing ";" and any space left in front of it ("... ;").
        norm_expected = normalize_sql(expected).rstrip("; ")
        norm_pred = normalize_sql(predicted).rstrip("; ")

        # --- MATCHING ---
        if not norm_pred or not norm_expected:
            # "" is a substring of every string, so the containment checks
            # below would count an empty response as correct. With an empty
            # side, only accept exact equality.
            ok = norm_pred == norm_expected
        else:
            # 1) exact (also covers a difference in trailing semicolon only)
            # 2) one is a substring of the other
            # NOTE: the `norm_pred in norm_expected` direction also accepts a
            # truncated prediction; that leniency is kept as designed.
            ok = (
                norm_pred == norm_expected
                or norm_expected in norm_pred
                or norm_pred in norm_expected
            )

        if ok:
            correct += 1

        results.append(
            {
                "id": case["id"],
                "mongo": mongo,
                "expected": expected,
                "predicted": predicted,
                "match": ok,
            }
        )

    # max(total, 1) avoids a ZeroDivisionError for an empty test list.
    print(f"✅ Evaluation complete: {correct}/{total} correct "
          f"({100 * correct / max(total, 1):.1f}% accuracy)\n")

    for r in results:
        print(f"--- {r['id']} ---")
        print("MongoDB :", r["mongo"])
        print("Expected:", r["expected"])
        print("Predicted:")
        print(r["predicted"] if r["predicted"] else "[EMPTY RESPONSE]")
        print("Match   :", r["match"])
        print()

**Manual Testing**

In [16]:
# Smoke-test the pipeline on one ad-hoc query
example_query = "db.users.find({ age: { $gt: 25 }, email: { $exists: true } })"
sql_out = convert_mongo_to_sql(example_query)
print(f"Mongo: {example_query}")
print(f"SQL  : {sql_out}")

# Then score the pipeline against the canned test cases
print("\nRunning small evaluation set...\n")
evaluate_agent(TEST_CASES)



Mongo: db.users.find({ age: { $gt: 25 }, email: { $exists: true } })
SQL  : SELECT * FROM users WHERE age > 25 AND email IS NOT NULL

Running small evaluation set...

✅ Evaluation complete: 4/7 correct (57.1% accuracy)

--- case_1_simple_find ---
MongoDB : db.users.find({ age: { $gt: 30 } })
Expected: SELECT * FROM users WHERE age > 30;
Predicted:
SELECT * FROM users WHERE age > 30
Match   : True

--- case_2_projection_limit ---
MongoDB : db.orders.find({ status: "SHIPPED" }, { _id: 0, order_id: 1, total: 1 }).limit(10)
Expected: SELECT order_id, total FROM orders WHERE status = 'SHIPPED' LIMIT 10;
Predicted:
SELECT order_id, total FROM orders WHERE status = 'SHIPPED' LIMIT 10
Match   : True

--- case_3_range_and_sort ---
MongoDB : db.users.find(  { $and: [      { age: { $gte: 18 } },      { age: { $lte: 60 } },      { status: 'ACTIVE' }    ]  },  { _id: 0, name: 1, email: 1 }).sort({ age: 1, name: 1 })
Expected: SELECT name, email FROM users WHERE age >= 18 AND age <= 60 AND status = 