# Retail Analytics Copilot - Self-Contained Run
This notebook creates all necessary project files on the Colab instance and runs the agent using the Tesla T4 GPU.


## 1. Setup Environment & Install Dependencies


In [42]:
!curl -fsSL https://ollama.com/install.sh | sh
!pip install dspy-ai langgraph langchain-core pydantic click rich numpy pandas scikit-learn rank-bm25


>>> Cleaning up old version at /usr/local/lib/ollama
>>> Installing ollama to /usr/local
>>> Downloading Linux amd64 bundle
######################################################################## 100.0%
>>> Adding ollama user to video group...
>>> Adding current user to ollama group...
>>> Creating ollama systemd service...
>>> The Ollama API is now available at 127.0.0.1:11434.
>>> Install complete. Run "ollama" from the command line.


## 2. Start Ollama & Pull Model


In [43]:
import subprocess
import time

# Start Ollama server in the background
subprocess.Popen("ollama serve", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("Starting Ollama server...")
time.sleep(5)

!ollama pull phi3.5:3.8b-mini-instruct-q4_K_M


Starting Ollama server...
[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l[?2026h[?25l[1G[?25h[?2026l


## 3. Create Project Files (Scaffolding)


In [44]:
import os

# Create directories
os.makedirs("agent/rag", exist_ok=True)
os.makedirs("agent/tools", exist_ok=True)
os.makedirs("data", exist_ok=True)
os.makedirs("docs", exist_ok=True)

# Create __init__.py files
for path in ["agent/__init__.py", "agent/rag/__init__.py", "agent/tools/__init__.py"]:
    with open(path, "w") as f: pass


## 4. Create Data & Docs


In [45]:
# Download Database
import urllib.request
if not os.path.exists("data/northwind.sqlite"):
    urllib.request.urlretrieve('https://raw.githubusercontent.com/jpwhite3/northwind-SQLite3/main/dist/northwind.db', 'data/northwind.sqlite')

# Create Docs
files = {
    "docs/marketing_calendar.md": """# Northwind Marketing Calendar (1997)
## Summer Beverages 1997
- Dates: 1997-06-01 to 1997-06-30
- Notes: Focus on Beverages and Condiments.
## Winter Classics 1997
- Dates: 1997-12-01 to 1997-12-31
- Notes: Push Dairy Products and Confections for holiday gifting.""",
    
    "docs/kpi_definitions.md": """# KPI Definitions
## Average Order Value (AOV)
- AOV = SUM(UnitPrice * Quantity * (1 - Discount)) / COUNT(DISTINCT OrderID)
## Gross Margin
- GM = SUM((UnitPrice - CostOfGoods) * Quantity * (1 - Discount))
- If cost is missing, approximate with category-level average (document your approach).""",
    
    "docs/catalog.md": """# Catalog Snapshot
- Categories include Beverages, Condiments, Confections, Dairy Products, Grains/Cereals, Meat/Poultry, Produce, Seafood.
- Products map to categories as in the Northwind DB.""",
    
    "docs/product_policy.md": """# Returns & Policy
- Perishables (Produce, Seafood, Dairy): 3 7 days.
- Beverages unopened: 14 days; opened: no returns.
- Non-perishables: 30 days."""
}

for path, content in files.items():
    with open(path, "w") as f: f.write(content)


## 5. Create Code Modules


In [46]:
# agent/tools/sqlite_tool.py
with open("agent/tools/sqlite_tool.py", "w") as f:
    f.write('''import sqlite3
from typing import List, Dict, Any, Tuple, Optional

class SQLiteTool:
    def __init__(self, db_path: str = "data/northwind.sqlite"):
        self.db_path = db_path

    def _get_connection(self):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def list_tables(self) -> List[str]:
        conn = self._get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
        tables = [row[0] for row in cursor.fetchall()]
        conn.close()
        return tables

    def get_schema(self, table_names: Optional[List[str]] = None) -> str:
        conn = self._get_connection()
        cursor = conn.cursor()
        if not table_names:
            table_names = self.list_tables()
        schema_str = []
        for table in table_names:
            cursor.execute(f"SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,))
            res = cursor.fetchone()
            if res:
                schema_str.append(res[0] + ";")
        conn.close()
        sep = chr(10) + chr(10)
        return sep.join(schema_str)

    def execute_sql(self, query: str) -> Tuple[List[Dict[str, Any]], List[str], Optional[str]]:
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            if query.strip().lower().startswith("select"):
                results = [dict(row) for row in cursor.fetchall()]
                columns = [description[0] for description in cursor.description] if cursor.description else []
                conn.close()
                return results, columns, None
            else:
                conn.commit()
                conn.close()
                return [], [], None
        except Exception as e:
            conn.close()
            return [], [], str(e)
''')

# agent/rag/retrieval.py
with open("agent/rag/retrieval.py", "w") as f:
    f.write('''import os
import re
from typing import List, Dict
from rank_bm25 import BM25Okapi

class SimpleRetriever:
    def __init__(self, docs_dir: str = "docs"):
        self.docs_dir = docs_dir
        self.chunks = []
        self.bm25 = None
        self._load_and_index()

    def _load_and_index(self):
        self.chunks = []
        tokenized_corpus = []
        for filename in os.listdir(self.docs_dir):
            if filename.endswith(".md"):
                filepath = os.path.join(self.docs_dir, filename)
                with open(filepath, "r", encoding="utf-8") as f:
                    content = f.read()
                newline = chr(10)
                pattern = newline + r'#{1,3} |' + newline + newline
                raw_chunks = re.split(pattern, content)
                for i, text in enumerate(raw_chunks):
                    if text.strip():
                        chunk_id = f"{filename}::chunk{i}"
                        self.chunks.append({
                            "id": chunk_id,
                            "content": text.strip(),
                            "source": filename
                        })
                        tokenized_corpus.append(text.lower().split())
        if tokenized_corpus:
            self.bm25 = BM25Okapi(tokenized_corpus)

    def search(self, query: str, k: int = 3) -> List[Dict]:
        if not self.bm25:
            return []
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        top_n_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
        results = []
        for idx in top_n_indices:
            results.append({
                **self.chunks[idx],
                "score": scores[idx]
            })
        return results
''')

# agent/dspy_signatures.py
with open("agent/dspy_signatures.py", "w") as f:
    f.write('''import dspy
from typing import List, Optional

class RouterSignature(dspy.Signature):
    question = dspy.InputField(desc="The user's retail analytics question")
    classification = dspy.OutputField(desc="One of: 'rag', 'sql', 'hybrid'")

class Router(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(RouterSignature)
    def forward(self, question):
        return self.prog(question=question)

class PlannerSignature(dspy.Signature):
    question = dspy.InputField()
    context = dspy.InputField(desc="Relevant chunks from documentation")
    date_range_start = dspy.OutputField(desc="Start date YYYY-MM-DD or None")
    date_range_end = dspy.OutputField(desc="End date YYYY-MM-DD or None")
    kpi_formula = dspy.OutputField(desc="Relevant KPI formula text or None")
    entities = dspy.OutputField(desc="List of relevant entities (products, categories, customers)")

class Planner(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(PlannerSignature)
    def forward(self, question, context):
        return self.prog(question=question, context=context)

class TextToSQLSignature(dspy.Signature):
    question = dspy.InputField()
    schema = dspy.InputField(desc="SQLite CREATE TABLE statements")
    constraints = dspy.InputField(desc="Specific constraints (dates, formulas) from docs")
    sql_query = dspy.OutputField(desc="Valid SQLite query")
    explanation = dspy.OutputField(desc="Brief explanation of the logic")

class TextToSQL(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(TextToSQLSignature)
    def forward(self, question, schema, constraints):
        return self.prog(question=question, schema=schema, constraints=constraints)

class SynthesizerSignature(dspy.Signature):
    question = dspy.InputField()
    sql_query = dspy.InputField(desc="Executed SQL query (if any)")
    sql_result = dspy.InputField(desc="Result rows from SQL execution")
    retrieved_context = dspy.InputField(desc="Text chunks from documentation")
    format_hint = dspy.InputField(desc="Expected output format (e.g., 'int', 'float', 'json')")
    final_answer = dspy.OutputField(desc="The typed answer matching format_hint exactly")
    citations = dspy.OutputField(desc="List of strings: table names and doc chunk IDs used")

class Synthesizer(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(SynthesizerSignature)
    def forward(self, question, sql_query, sql_result, retrieved_context, format_hint):
        return self.prog(question=question, sql_query=sql_query, sql_result=sql_result, retrieved_context=retrieved_context, format_hint=format_hint)
''')

# agent/graph_hybrid.py
with open("agent/graph_hybrid.py", "w") as f:
    f.write('''import dspy
import sqlite3
import os
from typing import Annotated, TypedDict, List, Dict, Any, Optional, Union
from langgraph.graph import StateGraph, END
from agent.dspy_signatures import Router, Planner, TextToSQL, Synthesizer
from agent.rag.retrieval import SimpleRetriever
from agent.tools.sqlite_tool import SQLiteTool

def setup_dspy():
    lm = dspy.LM(model="ollama_chat/phi3.5:3.8b-mini-instruct-q4_K_M", api_base="http://localhost:11434", api_key="")
    dspy.settings.configure(lm=lm)

class AgentState(TypedDict):
    question: str
    format_hint: str
    classification: Optional[str]
    retrieved_docs: List[Dict]
    constraints: Dict[str, Any]
    schema: str
    sql_query: Optional[str]
    sql_results: Optional[List[Dict]]
    sql_columns: Optional[List[str]]
    sql_error: Optional[str]
    final_answer: Any
    citations: List[str]
    repair_count: int
    repair_feedback: Optional[str]

def router_node(state: AgentState):
    router = Router()
    pred = router(question=state["question"])
    cls = pred.classification.lower().strip()
    if "hybrid" in cls: return {"classification": "hybrid"}
    if "sql" in cls: return {"classification": "sql"}
    return {"classification": "rag"}

def retriever_node(state: AgentState):
    retriever = SimpleRetriever()
    docs = retriever.search(state["question"], k=3)
    return {"retrieved_docs": docs}

def planner_node(state: AgentState):
    planner = Planner()
    sep = chr(10) + chr(10)
    context_str = sep.join([d['content'] for d in state["retrieved_docs"]])
    try:
        pred = planner(question=state["question"], context=context_str)
        constraints = {
            "date_range_start": getattr(pred, 'date_range_start', None),
            "date_range_end": getattr(pred, 'date_range_end', None),
            "kpi_formula": getattr(pred, 'kpi_formula', None),
            "entities": getattr(pred, 'entities', [])
        }
    except Exception as e:
        print(f"Planner parsing error: {e}, using default constraints")
        constraints = {"date_range_start": None, "date_range_end": None, "kpi_formula": None, "entities": []}
    return {"constraints": constraints}

def sql_generator_node(state: AgentState):
    tool = SQLiteTool()
    schema = tool.get_schema()
    constraints_str = str(state.get("constraints", "None"))
    if state.get("repair_feedback"):
        constraints_str += chr(10) + f"PREVIOUS ERROR: {state['repair_feedback']}. FIX THIS."
    generator = TextToSQL()
    opt_path = os.path.join(os.getcwd(), "agent", "optimized_sql_module.json")
    if os.path.exists(opt_path):
        try:
            generator.load(opt_path)
            print(f"Loaded optimized SQL module from {opt_path}")
        except Exception as e:
            print(f"Warning: Could not load optimized module: {e}")
    pred = generator(question=state["question"], schema=schema, constraints=constraints_str)
    raw_sql = pred.sql_query.strip()
    clean_sql = raw_sql.replace("```sql", "").replace("```", "").strip()
    return {"sql_query": clean_sql, "schema": schema}

def executor_node(state: AgentState):
    tool = SQLiteTool()
    results, cols, error = tool.execute_sql(state["sql_query"])
    return {"sql_results": results, "sql_columns": cols, "sql_error": error}

def repair_check_node(state: AgentState):
    if state["sql_error"] and state["repair_count"] < 2:
        return "repair"
    return "synthesize"

def repair_node(state: AgentState):
    return {"repair_count": state["repair_count"] + 1, "repair_feedback": state["sql_error"] or "Invalid format or empty result"}

def synthesizer_node(state: AgentState):
    synthesizer = Synthesizer()
    sep = chr(10) + chr(10)
    context_str = sep.join([d['content'] for d in state.get("retrieved_docs", [])])
    pred = synthesizer(question=state["question"], sql_query=state.get("sql_query", ""), sql_result=str(state.get("sql_results", [])), retrieved_context=context_str, format_hint=state["format_hint"])
    return {"final_answer": pred.final_answer, "citations": pred.citations}

def build_graph():
    workflow = StateGraph(AgentState)
    workflow.add_node("router", router_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("planner", planner_node)
    workflow.add_node("sql_generator", sql_generator_node)
    workflow.add_node("executor", executor_node)
    workflow.add_node("repair", repair_node)
    workflow.add_node("synthesizer", synthesizer_node)
    workflow.set_entry_point("router")
    workflow.add_conditional_edges("router", lambda x: x["classification"], {"rag": "retriever", "sql": "sql_generator", "hybrid": "retriever"})
    workflow.add_conditional_edges("retriever", lambda x: x["classification"], {"rag": "synthesizer", "hybrid": "planner"})
    workflow.add_edge("planner", "sql_generator")
    workflow.add_edge("sql_generator", "executor")
    workflow.add_conditional_edges("executor", repair_check_node, {"repair": "repair", "synthesize": "synthesizer"})
    workflow.add_edge("repair", "sql_generator")
    workflow.add_edge("synthesizer", END)
    return workflow.compile()
''')

# agent/optimize_sql.py
with open("agent/optimize_sql.py", "w") as f:
    f.write('''import sys, os, dspy
sys.path.append(os.getcwd())
from agent.dspy_signatures import TextToSQL
from agent.tools.sqlite_tool import SQLiteTool

def sql_metric(example, pred, trace=None):
    tool = SQLiteTool()
    sql = pred.sql_query.replace("```sql", "").replace("```", "").strip()
    results, cols, error = tool.execute_sql(sql)
    return error is None

def optimize_sql_module():
    lm = dspy.LM(model="ollama_chat/phi3.5:3.8b-mini-instruct-q4_K_M", api_base="http://localhost:11434", api_key="")
    dspy.settings.configure(lm=lm)
    tool = SQLiteTool()
    schema = tool.get_schema(["Orders", "Order Details", "Products"])
    train_examples = [
        dspy.Example(question="How many products are there?", schema=schema, constraints="None", sql_query="SELECT COUNT(*) FROM Products;").with_inputs("question", "schema", "constraints"),
        dspy.Example(question="What is the total revenue from Order 10248?", schema=schema, constraints="Revenue = UnitPrice * Quantity * (1-Discount)", sql_query='SELECT SUM(UnitPrice * Quantity * (1 - Discount)) FROM "Order Details" WHERE OrderID = 10248;').with_inputs("question", "schema", "constraints"),
        dspy.Example(question="List all products in CategoryID 1.", schema=schema, constraints="None", sql_query="SELECT ProductName FROM Products WHERE CategoryID = 1;").with_inputs("question", "schema", "constraints")
    ]
    print("Starting optimization with BootstrapFewShot...")
    from dspy.teleprompt import BootstrapFewShot
    teleprompter = BootstrapFewShot(metric=sql_metric, max_bootstrapped_demos=2)
    compiled_sql = teleprompter.compile(TextToSQL(), trainset=train_examples)
    output_path = os.path.join("agent", "optimized_sql_module.json")
    compiled_sql.save(output_path)
    print(f"Optimization complete! Saved to {output_path}")

if __name__ == "__main__":
    optimize_sql_module()
''')

# sample_questions_hybrid_eval.jsonl
with open("sample_questions_hybrid_eval.jsonl", "w") as f:
    f.write('''{"id":"rag_policy_beverages_return_days","question":"According to the product policy, what is the return window (days) for unopened Beverages? Return an integer.","format_hint":"int"}
{"id":"hybrid_top_category_qty_summer_1997","question":"During 'Summer Beverages 1997' as defined in the marketing calendar, which product category had the highest total quantity sold? Return {category:str, quantity:int} .","format_hint":"{category:str, quantity:int}"}
{"id":"hybrid_aov_winter_1997","question":"Using the AOV definition from the KPI docs, what was the Average Order Value during 'Winter Classics 1997'? Return a float rounded to 2 decimals.","format_hint":"float"}
{"id":"sql_top3_products_by_revenue_alltime","question":"Top 3 products by total revenue all-time. Revenue uses Order Details: SUM(UnitPrice*Quantity*(1-Discount)). Return list[{product:str, revenue:float}] .","format_hint":"list[{product:str, revenue:float}]"}
{"id":"hybrid_revenue_beverages_summer_1997","question":"Total revenue from the 'Beverages' category during 'Summer Beverages 1997' dates. Return a float rounded to 2 decimals.","format_hint":"float"}
{"id":"hybrid_best_customer_margin_1997","question":"Per the KPI definition of gross margin, who was the top customer by gross margin in 1997? Assume CostOfGoods is approximated by 70% of UnitPrice if not available. Return {customer:str, margin:float} .","format_hint":"{customer:str, margin:float}"}
''')


In [47]:
!python agent/optimize_sql.py


  cls = super().__new__(mcs, signature_name, bases, namespace, **kwargs)
Starting optimization with BootstrapFewShot...
  cls = super().__new__(mcs, signature_name, bases, namespace, **kwargs)
100% 3/3 [00:00<00:00, 16.72it/s]
Bootstrapped 2 full traces after 2 examples for up to 1 rounds, amounting to 3 attempts.
Optimization complete! Saved to agent/optimized_sql_module.json


## 7. Run Agent


In [50]:
# run_agent_hybrid.py script logic inlined here for simplicity or we can write it to file too
import json
import click
from tqdm import tqdm
from agent.graph_hybrid import build_graph, setup_dspy

setup_dspy()
questions = []
with open("sample_questions_hybrid_eval.jsonl", 'r') as f:
    for line in f:
        if line.strip():
            questions.append(json.loads(line))

app = build_graph()
results = []
print(f"Processing {len(questions)} questions...")

for q in tqdm(questions):
    initial_state = {
        "question": q["question"],
        "format_hint": q.get("format_hint", "str"),
        "repair_count": 0,
        "retrieved_docs": [],
        "constraints": {},
        "sql_query": "",
        "sql_results": [],
        "sql_error": None,
        "citations": []
    }
    try:
        final_state = app.invoke(initial_state)
        output = {
            "id": q["id"],
            "final_answer": final_state.get("final_answer"),
            "sql": final_state.get("sql_query", ""),
            "confidence": 0.8 if not final_state.get("sql_error") else 0.2,
            "explanation": "Generated based on hybrid analysis of docs and DB.",
            "citations": final_state.get("citations", [])
        }
        results.append(output)
    except Exception as e:
        print(f"Error processing {q['id']}: {e}")
        results.append({"id": q["id"], "final_answer": None, "sql": "", "confidence": 0.0, "explanation": f"Error: {str(e)}", "citations": []})

with open("outputs_hybrid_colab.jsonl", 'w') as f:
    for res in results:
        f.write(json.dumps(res) + "\n")

print("Done! Results written to outputs_hybrid_colab.jsonl")


Processing 6 questions...


 17%|█▋        | 1/6 [00:00<00:00,  5.26it/s]

Loaded optimized SQL module from /content/agent/optimized_sql_module.json
Error processing hybrid_top_category_qty_summer_1997: Adapter JSONAdapter failed to parse the LM response. 

LM Response: {
 "reasoning": "The provided text does not contain enough information to determine which product category had the highest total sales for 'Summer Beverages' during that particular year (1997). To provide an accurate answer, specific data or statistics regarding monthly/quarterly sales of different categories within beverages sold would need to be provided. Without this detailed dataset information on how many units were sold in each category throughout the summer season of 1997 and a comparison among these quantities for 'Summer Beverages', it's impossible to discern which product had been outperforming others or determine if there was any significant shift from one beverage type over another. Therefore, an accurate classification is not feasible with only the given information."
 
  
   
 



 83%|████████▎ | 5/6 [00:00<00:00, 12.50it/s]

Error processing hybrid_aov_winter_1997: Adapter JSONAdapter failed to parse the LM response. 

LM Response: {
 "reasoning": "The instruction provided does not present an actual question but rather asks for information or data analysis based on AOV (Average Order Value) definition from KPIs. The task requires extracting the average value of a financial metric, which can be found in sales and business analytics context typically through analyzing numerical datasets related to orders during that specific period 'Winter Classics 1997'. Since there are no data provided within this instruction for an actual calculation or analysis (no numbers given), it's not possible to generate a real-world answer. However, if you were seeking the methodology on how one might extract such information from available datasets: The reasoning would be as follows -  'The Average Order Value is calculated by dividing total revenue for all orders during the specified period ('Winter Classics 1997') with the numb

100%|██████████| 6/6 [00:00<00:00, 11.64it/s]

Error processing hybrid_best_customer_margin_1997: Adapter JSONAdapter failed to parse the LM response. 

LM Response: { "question": {"user`:*tell me an example where you explain why it's difficult and understandable way. You are given two statements involving mathematical problem to solve this puzzle with a step-by-step explanation, we needlesenvolved: AI: To answer the following algorithmication; 问题 أن quizzical Q&nbsp;":"`answer"








 

   


 


  

 

 



 

Expected to find output fields in the LM response: [reasoning, classification] 

Actual output fields parsed from the LM response: [] 


Done! Results written to outputs_hybrid_colab.jsonl





In [49]:
!cat outputs_hybrid_colab.jsonl


{"id": "rag_policy_beverages_return_days", "final_answer": "None", "sql": "SELECT UnitsInStock FROM Products WHERE CategoryID = (SELECT CategoryID FROM Categories WHERE CategoryName='Beverages');", "confidence": 0.8, "explanation": "Generated based on hybrid analysis of docs and DB.", "citations": "['No direct reference or data in provided SQL Query about product returns', \"The question seems misinterpreted as there are no indicators of time frames like 'days' relating to Beverages and their stock. The UnitsInStock values given may suggest a count but not the return window.\", \"'UnitsInStock': 39, {'UnitsInStock': 17}, {'UnitsInStock': 20},{'UnitsInStock': 111}, {...}\", \"The question asks for 'return_window', which is not evident from the provided SQL query and result. There might be a misunderstanding as this information isn't available in either field\", \"'No applicable citations are found.\"]"}
{"id": "hybrid_top_category_qty_summer_1997", "final_answer": null, "sql": "", "conf