In [None]:
!wget -q https://raw.githubusercontent.com/alikarami76/genai-workshop/refs/heads/main/db_loader.py
!wget -q https://raw.githubusercontent.com/alikarami76/genai-workshop/refs/heads/main/llm_connector.py
!wget -q https://raw.githubusercontent.com/alikarami76/genai-workshop/refs/heads/main/requirements.txt

In [None]:
%pip install -r requirements.txt

# Custom SQL Agent Workshop

We are extending the Citi Bike commuter-pass case study by showing how to scaffold a LangChain SQL agent with guardrailed custom tools. The goal is to help workshop learners see how LLM-driven analytics can surface insights that inspire a targeted commuter offering for current "Customer" riders.

We'll walk through the full stack—from imports to agent execution—so you can narrate why each component exists and how it supports Retrieval-Augmented Generation (RAG) over the Citi Bike database.


## Load core libraries

This cell pulls in everything the agent relies on: standard Python utilities, pandas for lightweight data wrangling, and the LangChain components that wire prompts, tools, and the SQL agent together. Call out how the optional schema helpers (`ListSQLDatabaseTool`, `InfoSQLDatabaseTool`) let the agent inspect tables before composing queries—an important safety habit.


In [None]:
# Standard libs
import os, re, glob, json, sqlite3, textwrap
from datetime import datetime
from IPython.display import Markdown, display
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv

# Data
import pandas as pd

# LangChain core + tools
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.tools import tool

# Optional schema helpers (list/info) if you want the agent to peek at tables/columns
from langchain_community.utilities import SQLDatabase
from langchain_community.tools.sql_database.tool import ListSQLDatabaseTool, InfoSQLDatabaseTool

## Configure the Groq LLM and sampling controls

We connect to the Groq-backed `qwen3-32b` model using our helper, then bind runtime settings that govern how the model reasons:
- `tools=[{"type": "browser_search"}, {"type": "code_interpreter"}]` exposes built-in Groq capabilities alongside the SQL tools we'll register later.
- `tool_choice="auto"` lets the agent decide when to reach for each tool based on the conversation.
- `reasoning_effort="medium"` balances depth and latency—great for live demos.
- `top_p=1` keeps nucleus sampling wide open. In practice you can dial this down (or pair with `temperature`) to reduce creative variance during production runs.
- `max_completion_tokens=8192` caps how long responses may be. Adjust this if you're expecting especially long summaries.

**About other knobs:**
- `top_k` (not set here) limits how many candidate tokens the sampler considers; lowering it forces the model to stay closer to the most probable continuations.
- `temperature` defaults to the connector's setting. Lower values (≈0-0.3) make outputs deterministic—ideal when you want reproducible SQL plans.

Encourage learners to treat these parameters as their primary levers for controlling creativity, determinism, and cost.


In [None]:
from llm_connector import langchain_groq_llm_connector

llm = langchain_groq_llm_connector("Insert your API Key here","openai/gpt-oss-20b")
llm = llm.bind(
    tools=[{"type":"browser_search"},{"type":"code_interpreter"}],
    tool_choice="auto",
    reasoning_effort="medium",
    top_p=1,
    max_completion_tokens=8192,
)


# ---- Safety knobs for SQL and results ----------------------------------------
SQL_TIMEOUT_SECONDS = 300      # wall clock per query
SQL_MAX_RETURN_ROWS = 100000   # cap result rows to avoid huge pulls

## Point to the Citi Bike database

We reuse the pre-staged SQLite file so every participant queries the same snapshot. Framing this through the RAG lens: the database is our **retrieval store**. The agent will decide which SQL it needs, fetch structured rows, and then the LLM will **generate** the narrative explanation grounded in those results.


In [None]:
from db_loader import prepare_citibike_database

DB_PATH, conn, run_query = prepare_citibike_database()
SQLITE_URI = f"sqlite:///{DB_PATH}"

## Prepare an output workspace

Saving result artifacts (CSVs, JSON, Markdown) into `outputs/` makes the toolchain composable. Downstream tools—and learners exploring past runs—can pick up the cached files without rerunning expensive queries.


In [None]:
os.makedirs("outputs", exist_ok=True)

## Understand the helper functions and tools

This section defines the custom tooling our agent will call. Walk through the structure with learners:
- `_is_read_only` and `_add_limit_if_missing` are safety rails that filter out destructive SQL and enforce row limits.
- `run_sqlite_query` is a LangChain tool that executes vetted SQL, writes the results to disk, and returns a JSON payload with previews. Emphasize how it preserves provenance by echoing the actual SQL used.
- `pandas_aggregate` chains on the saved CSV to perform higher-level group-bys, filters, or descriptive stats, then publishes tidy outputs for later steps.

Highlight that each tool returns machine-readable JSON—this is crucial for predictable tool chaining and is a best practice students should emulate.


In [None]:
# -- 2) HELPERS ----------------------------------------------------------------
def _is_read_only(sql: str) -> bool:
    """Allow only SELECT and PRAGMA table_info (read-only)."""
    forbidden = r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|REPLACE|TRUNCATE|ATTACH|DETACH|VACUUM|BEGIN|COMMIT)\b"
    if re.search(forbidden, sql, flags=re.IGNORECASE):
        return False
    if re.search(r"\bPRAGMA\b", sql, re.IGNORECASE) and "table_info" not in sql.lower():
        return False
    return True

def _add_limit_if_missing(sql: str, limit: int) -> str:
    """Inject a LIMIT if the user didn't specify one on SELECT queries."""
    q = sql.strip().rstrip(";")
    if not q.lower().startswith("select"):
        return q + ";"
    if re.search(r"\blimit\b", q, flags=re.IGNORECASE):
        return q + ";"
    return f"SELECT * FROM (\n{q}\n) LIMIT {limit};"

# -- 2a) SQL tool: run a safe read-only query and save to CSV/JSON --------------
@tool("run_sqlite_query", return_direct=False)
def run_sqlite_query(query: str, limit: int = SQL_MAX_RETURN_ROWS) -> str:
    """
    Run a **read-only** SQL query against the local SQLite DB.
    Safety:
      - Only SELECT or PRAGMA table_info allowed.
      - LIMIT injected if missing (for SELECT).
    Returns a small JSON string holding paths & preview.
    """
    if not _is_read_only(query):
        return json.dumps({"error": "Only read-only queries allowed (SELECT/PRAGMA table_info)."})

    # Ensure limit for safety
    if query.strip().lower().startswith("select") and "limit" not in query.lower():
        query = _add_limit_if_missing(query, limit)

    # Execute with sqlite3; pandas makes it easy to load the result
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.execute(f"PRAGMA busy_timeout = {SQL_TIMEOUT_SECONDS*1000};")
    try:
        df = pd.read_sql_query(query, conn)
    except Exception as e:
        return json.dumps({"error": f"SQLite error: {e}"})
    finally:
        conn.close()

    # Persist to disk for the next tool
    csv_path  = os.path.join("outputs", "sql_result.csv")
    json_path = os.path.join("outputs", "sql_result.json")
    df.to_csv(csv_path, index=False)
    df.to_json(json_path, orient="records")

    preview_md = df.head(10).to_markdown(index=False) if not df.empty else "(no rows)"
    return json.dumps({
        "rows": len(df),
        "columns": list(df.columns),
        "csv_path": csv_path,
        "json_path": json_path,
        "preview": preview_md,
        "sql_used": query
    })

# -- 2b) Pandas tool: simple groupby/aggregate pipeline -------------------------
@tool("pandas_aggregate", return_direct=False)
def pandas_aggregate(csv_path: str,
                     analysis_type: str = "groupby_aggregate",
                     groupby: Optional[List[str]] = None,
                     metrics: Optional[List[Dict[str, str]]] = None,
                     filters: Optional[List[Dict[str, Any]]] = None) -> str:
    """
    Load CSV produced by the SQL tool and compute a final table.
    Supports:
      - analysis_type="groupby_aggregate" (default) with:
        * groupby: list of columns
        * metrics: list of {"column":..., "agg": one of sum|mean|count|max|min|median|std|nunique}
        * filters: optional list of {"column":..,"op":..,"value":..}
      - analysis_type="describe" as a fallback summary
    """
    if not os.path.exists(csv_path):
        return json.dumps({"error": f"CSV not found: {csv_path}"})

    df = pd.read_csv(csv_path)

    # Quick filter helper
    def _apply_filters(df, filters):
        if not filters: return df
        out = df.copy()
        for f in filters:
            col, op, val = f.get("column"), f.get("op"), f.get("value")
            if col not in out.columns:  # skip unknown columns
                continue
            if op in ("==","=","eq"):
                out = out[out[col] == val]
            elif op in (">",">=","<","<="):
                try:
                    out = out.query(f"{col} {op} @val")
                except Exception:
                    pass
            elif op == "in":
                val = val if isinstance(val, list) else [val]
                out = out[out[col].isin(val)]
            elif op == "contains":
                out = out[out[col].astype(str).str.contains(str(val), na=False)]
        return out

    if analysis_type == "describe":
        final_df = df.describe(include="all").reset_index()
    else:
        # Default: groupby_aggregate
        df2 = _apply_filters(df, filters)
        if not groupby or not metrics:
            final_df = df2.head(50)  # if underspecified, just show sample
        else:
            agg_map = {}
            for m in metrics:
                col = m.get("column")
                agg = m.get("agg","sum").lower()
                if col in df2.columns:
                    agg_map.setdefault(col, []).append(agg)
            if not agg_map:
                final_df = df2.head(50)
            else:
                out = df2.groupby(groupby).agg(agg_map)
                out.columns = ["__".join(x if isinstance(x, tuple) else (x,)) if isinstance(x, tuple) else x
                               for x in out.columns.values]
                final_df = out.reset_index()

    result_csv  = os.path.join("outputs", "final_result.csv")
    result_json = os.path.join("outputs", "final_result.json")
    final_df.to_csv(result_csv, index=False)
    final_df.to_json(result_json, orient="records")

    return json.dumps({
        "final_csv": result_csv,
        "final_json": result_json,
        "rows": len(final_df),
        "columns": list(final_df.columns),
        "preview": final_df.head(15).to_markdown(index=False)
    })


## Assemble the prompt, tools, and agent

Here we wire everything together:
- `SYSTEM_RULES` is the **system prompt**—effectively the agent's prefix message. It encodes durable behavior (safety, workflow order, required output sections). Because it persists across turns, remind learners that system prompts trump user prompts when conflicts arise.
- The human template (`"User analysis request: {analysis_request}"`) is the **user prompt** for this run. It conveys the immediate question while inheriting all guardrails from the system message.
- The `OUTPUT FORMAT` portion inside `SYSTEM_RULES` acts like explicit **format instructions**, guiding the agent to emit PLAN / SQL / RESULTS / SUMMARY blocks. Keeping formatting directions separate from business logic is a best practice.
- `MessagesPlaceholder("agent_scratchpad")` is where LangChain tucks intermediate Thought/Action/Observation steps so the agent can reason iteratively.
- `tools = [...]` collects both LangChain-native schema helpers and our custom SQL+pandas utilities. This is the toolbox the agent may activate.
- `AgentExecutor(..., max_iterations=8)` limits how many tool invocations the agent can make before giving up—a safeguard against runaway loops. Encourage tuning this alongside `temperature`/`top_p` when debugging stubborn prompts.

When discussing best practices for writing system prompts, stress clarity (bullet lists beat prose), sequencing (spell out the workflow), and explicit failure modes (what to do when data is missing, how to keep outputs concise).


In [None]:
# -- 3) PROMPT + AGENT ---------------------------------------------------------
# Optional: give the agent quick access to schema overview and table info.
db = SQLDatabase.from_uri(SQLITE_URI, sample_rows_in_table_info=2)
list_tables_tool = ListSQLDatabaseTool(db=db)
info_table_tool  = InfoSQLDatabaseTool(db=db)

SYSTEM_RULES = textwrap.dedent("""
You are a careful data analyst working with a local SQLite database.
You MUST follow these rules:

- SAFETY: Only run read-only SQL (SELECT or PRAGMA table_info). No writes or DDL.
- EFFICIENCY: Avoid huge pulls. If the user didn't specify limits/time windows, start small.
- CHOOSE METHOD: Decide the simplest correct analysis (groupby aggregate, describe, or basic stats).
- PIPELINE:
  1) In case any metrics are requested to be calculated, clarify your formula for that metrics first.
  2) For the metric's only, try to search online for refernces of those metrics and clarify your references.
  3) (Optional) Inspect schema with list_tables/info if unsure.
  4) Write a SELECT query with the minimal set of columns and filters needed.
  5) Call `run_sqlite_query` to execute and save results.
  6) If aggregation is needed, call `pandas_aggregate` with a structured spec.
  7) Create output according to exact guidelines in the OUTPUT FORMAT section below.
- OUTPUT FORMAT:
  - PLAN SECTION: Start with a short PLAN (bullets). It should outline the assumptions you have made, as well as definition of any metrics you've created.
  - SQL CODE SECTION: Show the SQL used (fenced code). It should be clear what was the SQL queries you've used to reach to the results.
  - RESULTS SECTION: Show the of your final result and output in a table section.
  - SUMMARY SECTION: End with a crisp, non-verbose paragraph describing the data to the user.
""").strip()

# The prompt ensures the agent builds a simple plan + uses tools in-order
PROMPT = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_RULES),
    ("human", "User analysis request: {analysis_request}\n\nProceed."),
    MessagesPlaceholder("agent_scratchpad"),
])

# Attach both tools to the same agent
tools = [list_tables_tool, info_table_tool, run_sqlite_query, pandas_aggregate]

# Create a tool-calling agent with our prompt
agent_runnable = create_tool_calling_agent(llm, tools, PROMPT)

# Wrap it in an executor so we can capture intermediate steps (the “action trace”)
executor = AgentExecutor(
    agent=agent_runnable,
    tools=tools,
    verbose=True,                     # prints a readable trace in the cell output
    return_intermediate_steps=True,   # lets us programmatically inspect tool calls
    max_iterations=16,
    handle_parsing_errors=True,
)
print("Agent ready.")


## Invoke the agent and inspect activations

We set `analysis_request` to a commuter-focused utilization question and call `executor.invoke`. LangChain then:
1. Injects the system prompt (our prefix) and user prompt into the LLM.
2. Lets the model draft a plan, choose tools, and issue SQL—this is the agent "activation" loop you can narrate from the verbose trace.
3. Bridges retrieval and generation: `run_sqlite_query` pulls structured data, `pandas_aggregate` can refine it, and the LLM synthesizes the final explanation.

Encourage learners to swap in their own commuter-pass hypotheses (e.g., subscriber share on 7–10 AM rides) to see how the tooling adapts.


In [None]:
# -- 4) RUN THE AGENT ----------------------------------------------------------
# Try your own request here. A few examples:
# "What is the average utilization rate of the bikes for each station in Palo Alto? Meaning that how much each station's bikes are utilized by time?"
# "What are the top 10 stations by departures on weekdays 7–10 AM?"
# "Which routes between stations have the highest subscriber share compared to customers?"
# "What share of rides are done by commuters (7-10 AM weekdays) vs non-commuters?"

analysis_request = "WRITE YOUR REQUEST HERE"

result = executor.invoke({"analysis_request": analysis_request})

# The AgentExecutor returns:
# - 'output' (final text answer)
# - 'intermediate_steps' ([(AgentAction, observation), ...]) = our action trace
print("\n=== FINAL ANSWER ===")
print(result["output"])
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
markdown_path = os.path.join("outputs", f"agent_output_{timestamp}.md")
with open(markdown_path, "w", encoding="utf-8") as md_file:
    md_file.write(result["output"])
print(f"\nSaved markdown output to {markdown_path}")

print("\n=== ACTION TRACE (tools & observations) ===")
for i, step in enumerate(result["intermediate_steps"], 1):
    action, observation = step
    print(f"\n-- Step {i}:")
    print("Tool:", getattr(action, "tool", "<unknown>"))
    print("Tool Input:", getattr(action, "tool_input", {}))
    # Observations can be large JSON strings; truncate for readability
    obs_str = str(observation)
    print("Observation:", (obs_str[:800] + "...") if len(obs_str) > 800 else obs_str)
    print("Final Answer:", result["output"])


## Present the polished answer

Rendering the final Markdown output keeps the workshop recap clean. It's also a reminder that agents can return structured artifacts—not just plain text—which you can embed in reports or dashboards.


In [None]:
display(Markdown(result["output"]))

## Experiment sandbox

Use the empty cell below for live tinkering—try new prompts, inspect saved CSVs, or prototype additional helper tools without touching the core workflow above.


## Assignment: Add visualization tools and guide the agent

For the next exercise, extend this notebook so the agent can produce charts and interactive heatmaps that support the commuter-pass pitch.

**Procedure for learners:**
1. Share this script with ChatGPT (or your assistant of choice) along with a clear prompt describing the visualization capability you need—graphing plus heatmaps for Citi Bike data.
2. State firm guardrails in that prompt so ChatGPT only adds the necessary tooling and leaves existing logic intact. Remind it to follow best practices for LangChain tool design (typed arguments, structured JSON responses, helpful docstrings).
3. Provide a couple of example tool definitions in the prompt so the model mimics the structure used here (`run_sqlite_query`, `pandas_aggregate`).
4. Review the returned script, plug the new visualization tools into this notebook, and update the system prompt so it instructs the agent when to call them.
5. Finally, run the enhanced agent and ask it to generate an interactive heatmap based on the commuter-time routes—then evaluate how well the output supports the pass hypothesis.
