# Building the Data Analyst Agent with LangGraph

This notebook walks through building a complete Data Analyst Agent step by step.

## What We'll Build

A LangGraph-powered agent that can:
- Parse natural language queries about supply chain data
- Execute SQL queries on our datasets
- Detect anomalies and trends
- Generate visualizations
- Provide insights and recommendations

## Architecture

```
User Query → Query Parser → SQL Executor → Visualizer → Response Generator
                          ↓
                    Anomaly Detector
```

## Step 1: Install Dependencies

First, let's ensure all required packages are installed.

In [1]:
# Check if running in a notebook
import sys

# Uncomment to install dependencies (if needed)
# !pip install langchain langgraph langchain-openai duckdb plotly scikit-learn opik pandas openpyxl python-dotenv tiktoken

In [2]:
# Import all required libraries
import os
import pandas as pd
import duckdb
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
from typing import TypedDict, Annotated, List, Optional, Literal
import warnings
warnings.filterwarnings('ignore')

# LangChain & LangGraph
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# Opik for observability
from opik import track
from opik.integrations.langchain import OpikTracer

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

print("✅ All imports successful!")

✅ All imports successful!


## Step 2: Load and Prepare Data with DuckDB

We'll load all 4 datasets into an in-memory DuckDB database for fast SQL queries.

In [3]:
class SupplyChainDBManager:
    """Manages supply chain data in DuckDB"""
    
    def __init__(self, data_dir: str = '../data'):
        self.data_dir = data_dir
        self.conn = duckdb.connect(':memory:')
        self.tables = {}
        print("📦 Initializing DuckDB database...")
        
    def load_csv_with_encoding(self, file_path: str) -> pd.DataFrame:
        """Load CSV with automatic encoding detection"""
        encodings = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']
        for encoding in encodings:
            try:
                df = pd.read_csv(file_path, encoding=encoding)
                print(f"  ✓ Loaded {os.path.basename(file_path)} with {encoding} encoding")
                return df
            except UnicodeDecodeError:
                continue
        raise ValueError(f"Could not read {file_path} with any encoding")
    
    def load_all_datasets(self):
        """Load all supply chain datasets into DuckDB"""
        print("\n📊 Loading datasets...\n")
        
        # 1. DataCo Supply Chain Dataset
        dataco_df = self.load_csv_with_encoding(f"{self.data_dir}/DataCoSupplyChainDataset.csv")
        # Clean column names
        dataco_df.columns = [col.strip().lower().replace(' ', '_').replace('(', '').replace(')', '') for col in dataco_df.columns]
        self.conn.register('dataco_supply_chain', dataco_df)
        self.tables['dataco_supply_chain'] = dataco_df
        print(f"    Shape: {dataco_df.shape}")
        
        # 2. Dynamic Logistics Dataset
        logistics_df = self.load_csv_with_encoding(f"{self.data_dir}/dynamic_supply_chain_logistics_dataset.csv")
        logistics_df.columns = [col.strip().lower().replace(' ', '_') for col in logistics_df.columns]
        self.conn.register('logistics', logistics_df)
        self.tables['logistics'] = logistics_df
        print(f"    Shape: {logistics_df.shape}")
        
        # 3. Supply Chain Data
        supply_df = self.load_csv_with_encoding(f"{self.data_dir}/supply_chain_data.csv")
        supply_df.columns = [col.strip().lower().replace(' ', '_') for col in supply_df.columns]
        self.conn.register('supply_chain', supply_df)
        self.tables['supply_chain'] = supply_df
        print(f"    Shape: {supply_df.shape}")
        
        # 4. Retail Sales Dataset
        retail_df = pd.read_excel(f"{self.data_dir}/Retail-Supply-Chain-Sales-Dataset.xlsx")
        retail_df.columns = [col.strip().lower().replace(' ', '_') for col in retail_df.columns]
        self.conn.register('retail_sales', retail_df)
        self.tables['retail_sales'] = retail_df
        print("  ✓ Loaded Retail-Supply-Chain-Sales-Dataset.xlsx")
        print(f"    Shape: {retail_df.shape}")
        
        print(f"\n✅ Loaded {len(self.tables)} tables into DuckDB")
        
    def get_schema(self) -> str:
        """Get database schema for LLM context"""
        schema_info = []
        for table_name, df in self.tables.items():
            schema_info.append(f"\nTable: {table_name}")
            schema_info.append(f"Columns: {', '.join(df.columns.tolist()[:10])}...")
            schema_info.append(f"Rows: {len(df):,}")
        return "\n".join(schema_info)
    
    def execute_query(self, query: str) -> pd.DataFrame:
        """Execute SQL query safely"""
        try:
            # Add LIMIT if not present
            if 'LIMIT' not in query.upper():
                query = query.rstrip(';') + ' LIMIT 1000'
            result = self.conn.execute(query).df()
            return result
        except Exception as e:
            print(f"❌ SQL Error: {e}")
            return pd.DataFrame()

# Initialize database
db = SupplyChainDBManager()
db.load_all_datasets()

📦 Initializing DuckDB database...

📊 Loading datasets...

  ✓ Loaded DataCoSupplyChainDataset.csv with latin-1 encoding
    Shape: (180519, 53)
  ✓ Loaded dynamic_supply_chain_logistics_dataset.csv with utf-8 encoding
    Shape: (32065, 26)
  ✓ Loaded supply_chain_data.csv with utf-8 encoding
    Shape: (100, 24)
  ✓ Loaded Retail-Supply-Chain-Sales-Dataset.xlsx
    Shape: (9994, 23)

✅ Loaded 4 tables into DuckDB
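
The `'LIMIT' not in query.upper()` test in `execute_query` is a plain substring check, so a query that only mentions a column such as `credit_limit` would skip the row cap. Below is a minimal sketch of a stricter check, assuming a regular expression that looks for a trailing `LIMIT n` clause. `apply_row_limit` is a hypothetical helper and the `accounts` table is made up for the example; neither is part of the notebook.

```python
import re

# Matches a trailing "LIMIT <n>" clause, with an optional "OFFSET <m>".
_TRAILING_LIMIT = re.compile(r"\bLIMIT\s+\d+(\s+OFFSET\s+\d+)?\s*$", re.IGNORECASE)

def apply_row_limit(query: str, limit: int = 1000) -> str:
    """Append LIMIT only when the query does not already end with one."""
    cleaned = query.strip().rstrip(";").rstrip()
    if _TRAILING_LIMIT.search(cleaned):
        return cleaned
    return f"{cleaned} LIMIT {limit}"

# "accounts" and "credit_limit" are hypothetical names used for illustration.
print(apply_row_limit("SELECT credit_limit FROM accounts;"))
print(apply_row_limit("SELECT * FROM logistics limit 5"))
```

This sketch still ignores edge cases such as trailing SQL comments, so treat it as a starting point rather than a complete guard.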


In [4]:
# Test the database
print("\n📋 Database Schema:\n")
print(db.get_schema())

# Test query
test_result = db.execute_query("SELECT COUNT(*) as total_orders FROM dataco_supply_chain")
print(f"\n✅ Test query successful! Total orders: {test_result['total_orders'].iloc[0]:,}")


📋 Database Schema:


Table: dataco_supply_chain
Columns: type, days_for_shipping_real, days_for_shipment_scheduled, benefit_per_order, sales_per_customer, delivery_status, late_delivery_risk, category_id, category_name, customer_city...
Rows: 180,519

Table: logistics
Columns: timestamp, vehicle_gps_latitude, vehicle_gps_longitude, fuel_consumption_rate, eta_variation_hours, traffic_congestion_level, warehouse_inventory_level, loading_unloading_time, handling_equipment_availability, order_fulfillment_status...
Rows: 32,065

Table: supply_chain
Columns: product_type, sku, price, availability, number_of_products_sold, revenue_generated, customer_demographics, stock_levels, lead_times, order_quantities...
Rows: 100

Table: retail_sales
Columns: row_id, order_id, order_date, ship_date, ship_mode, customer_id, customer_name, segment, country, city...
Rows: 9,994

✅ Test query successful! Total orders: 180,519


## Step 3: Define Agent State

Every node receives the same `DataAnalystState` dictionary, fills in the fields it is responsible for, and returns it to the graph.

In [5]:
class DataAnalystState(TypedDict):
    """State for Data Analyst Agent"""
    # Input
    user_query: str
    messages: Annotated[List[BaseMessage], "Conversation messages"]
    
    # Query Analysis
    query_type: Optional[Literal['sql', 'anomaly', 'trend', 'visualization']]
    intent: Optional[str]
    
    # SQL Execution
    sql_query: Optional[str]
    query_results: Optional[pd.DataFrame]
    
    # Analysis
    insights: List[str]
    anomalies: List[dict]
    trends: List[dict]
    
    # Visualization
    charts: List[dict]
    
    # Output
    final_response: Optional[str]
    error: Optional[str]

print("✅ State schema defined")

✅ State schema defined
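
To make the idea of state moving through nodes concrete, here is a plain-Python sketch that does not use LangGraph at all: each node is a function that takes the state dictionary, writes its own fields, and returns it. `MiniState`, `classify`, `answer`, and `run_pipeline` are hypothetical names for illustration only, and the keyword check stands in for the LLM call.

```python
from typing import Callable, List, Optional, TypedDict

class MiniState(TypedDict):
    """A cut-down stand-in for DataAnalystState (illustration only)."""
    user_query: str
    query_type: Optional[str]
    final_response: Optional[str]

def classify(state: MiniState) -> MiniState:
    # Stand-in for the query parser: a keyword check instead of an LLM call.
    is_anomaly = "anomal" in state["user_query"].lower()
    state["query_type"] = "anomaly" if is_anomaly else "sql"
    return state

def answer(state: MiniState) -> MiniState:
    # Stand-in for the response generator: reads what the earlier node wrote.
    state["final_response"] = f"Handled as query type: {state['query_type']}"
    return state

def run_pipeline(state: MiniState,
                 nodes: List[Callable[[MiniState], MiniState]]) -> MiniState:
    # Pass the same state object through each node in order.
    for node in nodes:
        state = node(state)
    return state

initial: MiniState = {
    "user_query": "Find anomalies in delivery times",
    "query_type": None,
    "final_response": None,
}
result = run_pipeline(initial, [classify, answer])
print(result["final_response"])
```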


## Step 4: Initialize LLM with Opik Tracing

In [6]:
# Initialize OpenAI LLM
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    callbacks=[OpikTracer()]  # Enable Opik tracing
)

print("✅ LLM initialized with Opik tracing")

✅ LLM initialized with Opik tracing


## Step 5: Build Agent Nodes

Each node performs a specific task in the workflow.

### Node 1: Query Parser

Understands user intent and classifies the query type.

In [None]:
def parse_query_node(state: DataAnalystState) -> DataAnalystState:
    """Parse user query to determine intent and query type"""
    print("\n🔍 Node: Query Parser")
    
    user_query = state['user_query']
    
    prompt = f"""You are a query classifier for a supply chain analytics system.

    User Query: "{user_query}"

    Classify this query into ONE of these types:
    1. 'sql' - Needs database query (e.g., "What's average sales by region?")
    2. 'anomaly' - Needs anomaly detection (e.g., "Find unusual deliveries")
    3. 'trend' - Needs trend analysis (e.g., "Show sales over time")
    4. 'visualization' - Needs specific chart (e.g., "Plot profit by category")

    Respond with ONLY the type (sql/anomaly/trend/visualization) and a brief intent description.
    Format: TYPE | Intent description
    """
    
    response = llm.invoke(prompt)
    result = response.content.strip()
    
    # Parse response
    if '|' in result:
        query_type, intent = result.split('|', 1)
        query_type = query_type.strip().lower()
        intent = intent.strip()
    else:
        query_type = result.lower()
        intent = user_query
    
    print(f"  Query Type: {query_type}")
    print(f"  Intent: {intent}")
    
    state['query_type'] = query_type
    state['intent'] = intent
    state['messages'].append(HumanMessage(content=user_query))
    
    return state

print("✅ Query Parser node created")
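
The parser above stores whatever text precedes the `|` as `query_type`, so a reply such as `'SQL' | ...` (with quotes) or a free-form sentence would pass through unvalidated. The following is a minimal sketch of a stricter parse, assuming that unknown types should fall back to `'sql'` (the same default the router uses). `parse_classification` and `VALID_TYPES` are hypothetical names, not part of the notebook.

```python
from typing import Tuple

VALID_TYPES = ("sql", "anomaly", "trend", "visualization")

def parse_classification(raw: str, fallback_intent: str) -> Tuple[str, str]:
    """Split 'TYPE | intent' and fall back to 'sql' for unknown types."""
    head, sep, tail = raw.strip().partition("|")
    # Strip whitespace, quotes, backticks, and periods the model may add.
    query_type = head.strip().strip("'\"`.").lower()
    intent = tail.strip() if sep and tail.strip() else fallback_intent
    if query_type not in VALID_TYPES:
        query_type = "sql"
    return query_type, intent

print(parse_classification("'SQL' | Count orders per city", "original query"))
print(parse_classification("I think this is a chart request", "Plot profit"))
```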

### Node 2: SQL Executor

Converts natural language to SQL and executes it.

In [None]:
def execute_sql_node(state: DataAnalystState) -> DataAnalystState:
    """Generate and execute SQL query"""
    print("\n💾 Node: SQL Executor")
    
    intent = state['intent']
    schema = db.get_schema()
    
    prompt = f"""You are a SQL expert for a supply chain database.

    Database Schema:
    {schema}

    User wants: {intent}

    Generate a valid DuckDB SQL query. Rules:
    1. Use only tables in the schema
    2. Use clear column aliases
    3. Return ONLY the SQL query, no explanations or markdown
    4. Do not add LIMIT (it will be added automatically)

    SQL Query:
    """
    
    response = llm.invoke(prompt)
    sql_query = response.content.strip()
    
    # Clean SQL query
    sql_query = sql_query.replace('```sql', '').replace('```', '').strip()
    
    print(f"  Generated SQL:\n  {sql_query}")
    
    # Execute query
    results = db.execute_query(sql_query)
    
    if not results.empty:
        print(f"  ✓ Query returned {len(results)} rows")
        state['sql_query'] = sql_query
        state['query_results'] = results
    else:
        print("  ⚠️  Query returned no results")
        state['error'] = "SQL query returned no results"
    
    return state

print("✅ SQL Executor node created")
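
The cleanup step above removes fence markers with `str.replace`, which leaves any surrounding prose (for example "Here is the query:") inside the SQL string. As a sketch of an alternative, assuming the model wraps the query in a single fenced block, a regular expression can pull out only the block body. `extract_sql` is a hypothetical helper, and the sample reply is invented for the example.

```python
import re

# Built indirectly so this example does not contain a literal fence marker.
FENCE = "`" * 3

_FENCED_BLOCK = re.compile(
    re.escape(FENCE) + r"(?:sql)?\s*(.*?)" + re.escape(FENCE),
    re.IGNORECASE | re.DOTALL,
)

def extract_sql(response_text: str) -> str:
    """Return the SQL in the first fenced block, or the whole text if unfenced."""
    match = _FENCED_BLOCK.search(response_text)
    sql = match.group(1) if match else response_text
    return sql.strip().rstrip(";").strip()

# Invented model reply, for illustration only.
reply = (
    f"Here is the query:\n{FENCE}sql\n"
    f"SELECT COUNT(*) AS n FROM logistics;\n{FENCE}\nHope that helps."
)
print(extract_sql(reply))
```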

### Node 3: Anomaly Detector

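Flags late deliveries with a fixed rule: an order counts as an anomaly when its actual shipping time exceeds the scheduled time by more than 2 days. `IsolationForest` and `numpy` are imported in this cell but are not used by the demo node.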

In [None]:
from sklearn.ensemble import IsolationForest
import numpy as np

def detect_anomalies_node(state: DataAnalystState) -> DataAnalystState:
    """Detect anomalies in the data"""
    print("\n⚠️  Node: Anomaly Detector")
    
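    # For this demo, we'll detect late deliveries from the dataco dataset.
    # Note: db.execute_query appends LIMIT 1000 to this query, so only the
    # first 1,000 rows are scanned, not the full table.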
    query = """
    SELECT 
        order_id,
        customer_city,
        days_for_shipping_real,
        days_for_shipment_scheduled,
        (days_for_shipping_real - days_for_shipment_scheduled) as delay,
        delivery_status,
        late_delivery_risk
    FROM dataco_supply_chain
    WHERE days_for_shipping_real IS NOT NULL
    """
    
    results = db.execute_query(query)
    
    anomalies = []
    
    if not results.empty and 'delay' in results.columns:
        # Find delays > 2 days
        late_orders = results[results['delay'] > 2]
        
        if len(late_orders) > 0:
            anomalies.append({
                'type': 'late_delivery',
                'count': len(late_orders),
                'avg_delay': float(late_orders['delay'].mean()),
                'max_delay': float(late_orders['delay'].max())
            })
            
            print(f"  ⚠️  Found {len(late_orders)} late deliveries")
            print(f"     Average delay: {late_orders['delay'].mean():.1f} days")
    
    state['anomalies'] = anomalies
    state['query_results'] = results
    
    return state

print("✅ Anomaly Detector node created")
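
The node above uses a fixed cutoff (`delay > 2`). One common statistical alternative is the interquartile-range (IQR) rule, which derives the cutoff from the data itself. The sketch below uses only the standard library; `iqr_outliers` is a hypothetical helper, the delay values are made up, and this is not the method the notebook's node uses.

```python
import statistics
from typing import List

def iqr_outliers(values: List[float], k: float = 1.5) -> List[float]:
    """Return the values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _median, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lower or v > upper]

# Made-up delays in days, for illustration only.
delays = [1, 2, 2, 3, 2, 1, 2, 3, 2, 30]
print(iqr_outliers(delays))
```

With the real data, the same function could be applied to the `delay` column after converting it to a list.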

### Node 4: Visualizer

Creates charts based on the data.

In [None]:
def visualize_node(state: DataAnalystState) -> DataAnalystState:
    """Generate visualizations"""
    print("\n📊 Node: Visualizer")
    
    results = state.get('query_results')
    charts = []
    
    if results is not None and not results.empty:
        # Determine chart type based on data
        if len(results.columns) >= 2:
            # Create a simple chart
            first_col = results.columns[0]
            second_col = results.columns[1]
            
            # If second column is numeric, create bar chart
            if pd.api.types.is_numeric_dtype(results[second_col]):
                fig = px.bar(
                    results.head(10),
                    x=first_col,
                    y=second_col,
                    title=f"{second_col} by {first_col}"
                )
                charts.append({
                    'type': 'bar',
                    'title': f"{second_col} by {first_col}",
                    'figure': fig
                })
                print(f"  ✓ Created bar chart: {second_col} by {first_col}")
    
    state['charts'] = charts
    return state

print("✅ Visualizer node created")
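
The visualizer always plots column 0 against column 1, so it produces no chart when the numeric column happens to come first. A possible refinement, sketched here on plain dictionaries rather than a DataFrame, is to pick the first non-numeric column for the x-axis and the first numeric column for the y-axis. `pick_axes` is a hypothetical helper, the sample row is invented, and the function inspects only the first row, which is a simplification.

```python
from numbers import Number
from typing import Dict, List, Optional, Tuple

def pick_axes(rows: List[Dict[str, object]]) -> Optional[Tuple[str, str]]:
    """Pick (x, y): first non-numeric column for x, first numeric column for y."""
    if not rows:
        return None
    first = rows[0]

    def is_numeric(value: object) -> bool:
        # bool is a Number subclass, so exclude it explicitly.
        return isinstance(value, Number) and not isinstance(value, bool)

    x = next((col for col, val in first.items() if not is_numeric(val)), None)
    y = next((col for col, val in first.items() if is_numeric(val)), None)
    return (x, y) if x and y else None

# Invented result row, for illustration only.
rows = [{"avg_delay": 2.5, "shipping_mode": "Standard Class"}]
print(pick_axes(rows))
```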

### Node 5: Response Generator

Synthesizes all findings into a human-readable response.

In [None]:
def generate_response_node(state: DataAnalystState) -> DataAnalystState:
    """Generate final response with insights"""
    print("\n📝 Node: Response Generator")
    
    query_results = state.get('query_results')
    anomalies = state.get('anomalies', [])
    sql_query = state.get('sql_query')
    charts = state.get('charts', [])
    
    # Prepare context for LLM
    context_parts = []
    
    if sql_query:
        context_parts.append(f"SQL Query Executed:\n{sql_query}")
    
    if query_results is not None and not query_results.empty:
        context_parts.append(f"\nQuery Results Summary:")
        context_parts.append(f"- Rows: {len(query_results)}")
        context_parts.append(f"- Columns: {', '.join(query_results.columns.tolist())}")
        context_parts.append(f"\nFirst few rows:\n{query_results.head(5).to_string()}")
    
    if anomalies:
        context_parts.append(f"\nAnomalies Detected: {len(anomalies)}")
        for anomaly in anomalies:
            context_parts.append(f"- {anomaly}")
    
    if charts:
        context_parts.append(f"\nVisualizations Created: {len(charts)} chart(s)")
    
    context = "\n".join(context_parts)
    
    prompt = f"""You are a data analyst providing insights from supply chain data.

User Query: {state['user_query']}

Analysis Results:
{context}

Generate a concise analysis report:
1. Key Findings (2-3 bullet points)
2. Notable Patterns or Trends
3. Recommendations (if applicable)

Use markdown formatting. Be specific with numbers.
"""
    
    response = llm.invoke(prompt)
    final_response = response.content
    
    print("  ✓ Generated insights")
    
    state['final_response'] = final_response
    state['messages'].append(AIMessage(content=final_response))
    
    return state

print("✅ Response Generator node created")

## Step 6: Build the LangGraph Workflow

In [None]:
def route_query(state: DataAnalystState) -> str:
    """Route to appropriate node based on query type"""
    query_type = state.get('query_type', 'sql')
    
    if query_type == 'anomaly':
        return 'detect_anomalies'
    elif query_type in ['sql', 'trend']:
        return 'execute_sql'
    elif query_type == 'visualization':
        return 'execute_sql'  # Need data first
    else:
        return 'execute_sql'

# Create the graph
workflow = StateGraph(DataAnalystState)

# Add nodes
workflow.add_node("parse_query", parse_query_node)
workflow.add_node("execute_sql", execute_sql_node)
workflow.add_node("detect_anomalies", detect_anomalies_node)
workflow.add_node("visualize", visualize_node)
workflow.add_node("generate_response", generate_response_node)

# Set entry point
workflow.set_entry_point("parse_query")

# Add conditional routing
workflow.add_conditional_edges(
    "parse_query",
    route_query,
    {
        "execute_sql": "execute_sql",
        "detect_anomalies": "detect_anomalies"
    }
)

# Add edges to visualizer
workflow.add_edge("execute_sql", "visualize")
workflow.add_edge("detect_anomalies", "visualize")

# Visualizer goes to response generator
workflow.add_edge("visualize", "generate_response")

# Response generator is the end
workflow.add_edge("generate_response", END)

# Compile the graph
agent = workflow.compile()

print("✅ LangGraph workflow compiled!")
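
To check the routing without calling the LLM, the edges above can be mirrored in a small dictionary and walked by hand. This is a plain-Python stand-in, not LangGraph's API: `EDGES`, `route`, and `trace_path` are hypothetical names, and `None` plays the role of `END`.

```python
from typing import Dict, List, Optional

# Static edges, mirroring the add_edge calls in the workflow.
EDGES: Dict[str, Optional[str]] = {
    "execute_sql": "visualize",
    "detect_anomalies": "visualize",
    "visualize": "generate_response",
    "generate_response": None,  # None stands in for END
}

def route(query_type: Optional[str]) -> str:
    # Same decision as route_query: only 'anomaly' leaves the SQL path.
    return "detect_anomalies" if query_type == "anomaly" else "execute_sql"

def trace_path(query_type: Optional[str]) -> List[str]:
    """List the nodes visited for a given query type."""
    path = ["parse_query"]
    node: Optional[str] = route(query_type)
    while node is not None:
        path.append(node)
        node = EDGES[node]
    return path

for qt in ("sql", "anomaly", "trend", "visualization"):
    print(qt, "->", " > ".join(trace_path(qt)))
```

The trace makes one property of the graph easy to see: three of the four query types take the same path through `execute_sql`.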

## Step 7: Create the Agent Interface

In [None]:
class DataAnalystAgent:
    """Data Analyst Agent with Opik tracking"""
    
    def __init__(self, graph, db_manager):
        self.graph = graph
        self.db = db_manager
    
    @track(name="data_analyst_query", project_name="omnisupply-data-analyst")
    def analyze(self, query: str) -> dict:
        """Analyze a user query"""
        print("="*80)
        print(f"🤖 Data Analyst Agent")
        print(f"📝 Query: {query}")
        print("="*80)
        
        # Initialize state
        initial_state = {
            "user_query": query,
            "messages": [],
            "insights": [],
            "anomalies": [],
            "trends": [],
            "charts": [],
            "query_type": None,
            "intent": None,
            "sql_query": None,
            "query_results": None,
            "final_response": None,
            "error": None
        }
        
        # Run the graph
        result = self.graph.invoke(
            initial_state,
            config={"callbacks": [OpikTracer()]}
        )
        
        return result

# Initialize agent
data_analyst = DataAnalystAgent(agent, db)
print("\n✅ Data Analyst Agent ready!")

## Step 8: Test the Agent!

Let's try some example queries.

### Test 1: Simple SQL Query

In [None]:
result = data_analyst.analyze("What are the top 5 cities by number of orders?")

print("\n" + "="*80)
print("📊 RESULTS")
print("="*80)
print(result['final_response'])

if result.get('query_results') is not None:
    print("\n📋 Data:")
    display(result['query_results'].head(10))

if result.get('charts'):
    print("\n📈 Visualizations:")
    for chart in result['charts']:
        chart['figure'].show()

### Test 2: Anomaly Detection

In [None]:
result = data_analyst.analyze("Find anomalies in delivery times")

print("\n" + "="*80)
print("📊 RESULTS")
print("="*80)
print(result['final_response'])

if result.get('anomalies'):
    print("\n⚠️  Anomalies Detected:")
    for anomaly in result['anomalies']:
        print(f"  - {anomaly}")

### Test 3: Trend Analysis

In [None]:
result = data_analyst.analyze("Show me the average sales per customer by shipping mode")

print("\n" + "="*80)
print("📊 RESULTS")
print("="*80)
print(result['final_response'])

if result.get('query_results') is not None:
    print("\n📋 Data:")
    display(result['query_results'])

if result.get('charts'):
    for chart in result['charts']:
        chart['figure'].show()

### Test 4: Custom Query

In [None]:
# Try your own query!
custom_query = "What is the average benefit per order for each payment type?"

result = data_analyst.analyze(custom_query)

print("\n" + "="*80)
print("📊 RESULTS")
print("="*80)
print(result['final_response'])

if result.get('query_results') is not None:
    display(result['query_results'])

if result.get('charts'):
    for chart in result['charts']:
        chart['figure'].show()

## Step 9: View Traces in Opik

All your queries are being tracked in Opik! 

Visit your Comet workspace to see:
- Full execution traces
- LLM calls and responses
- Token usage and costs
- Latency metrics

🔗 [Open Comet Dashboard](https://www.comet.com)

## Step 10: Evaluate Agent Performance

In [None]:
from opik.evaluation import evaluate
from opik.evaluation.metrics import Hallucination, AnswerRelevance

# Create evaluation dataset
eval_dataset = [
    {
        'input': 'What are the top 5 cities by order count?',
        'expected_query_type': 'sql'
    },
    {
        'input': 'Find late deliveries',
        'expected_query_type': 'anomaly'
    },
    {
        'input': 'Show sales trends over time',
        'expected_query_type': 'trend'
    }
]

def evaluation_task(item):
    """Run agent on evaluation item"""
    result = data_analyst.analyze(item['input'])
    return {'output': result['final_response']}

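# Run evaluation
# opik's evaluate() expects an Opik Dataset rather than a plain list,
# so register the items first (the dataset name is an arbitrary choice).
from opik import Opik
opik_dataset = Opik().get_or_create_dataset(name="data_analyst_eval")
opik_dataset.insert(eval_dataset)

print("\n🔍 Running evaluation...")
evaluation_results = evaluate(
    dataset=opik_dataset,
    task=evaluation_task,
    scoring_metrics=[Hallucination(), AnswerRelevance()],
    experiment_name="data_analyst_evaluation"
)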

print("\n✅ Evaluation complete! Check Comet for results.")
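
The `expected_query_type` field in `eval_dataset` is not consumed by the two Opik metrics above. One simple way to use it is to compute classification accuracy in plain Python, sketched below. `classification_accuracy` is a hypothetical helper, and `keyword_classifier` is a stand-in so the example runs without an LLM; with the real agent, the prediction would presumably come from `data_analyst.analyze(query)["query_type"]`.

```python
from typing import Callable, Dict, List

def classification_accuracy(items: List[Dict[str, str]],
                            classify: Callable[[str], str]) -> float:
    """Fraction of items whose predicted type equals expected_query_type."""
    if not items:
        return 0.0
    correct = sum(
        classify(item["input"]) == item["expected_query_type"] for item in items
    )
    return correct / len(items)

def keyword_classifier(query: str) -> str:
    # Hypothetical stand-in for the agent's query parser.
    text = query.lower()
    if "late" in text or "anomal" in text:
        return "anomaly"
    if "trend" in text or "over time" in text:
        return "trend"
    return "sql"

eval_items = [
    {"input": "What are the top 5 cities by order count?", "expected_query_type": "sql"},
    {"input": "Find late deliveries", "expected_query_type": "anomaly"},
    {"input": "Show sales trends over time", "expected_query_type": "trend"},
]
print(classification_accuracy(eval_items, keyword_classifier))
```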

## Summary

### What We Built

✅ **Data Analyst Agent** with:
- Query parsing and intent classification
- SQL query generation and execution
- Anomaly detection
- Automatic visualizations
- Natural language insights
- Opik observability tracking

### Key Features

1. **Multi-step reasoning** with LangGraph
2. **SQL on DataFrames** with DuckDB
3. **Interactive charts** with Plotly
4. **Anomaly detection** with a threshold rule on delivery delay
5. **Full observability** with Opik

### Next Steps

1. **Export to Python modules**: Move code from notebook to `src/` directory
2. **Add more nodes**: Forecasting, recommendations, etc.
3. **Improve prompts**: Fine-tune for better SQL generation
4. **Build other agents**: Risk Agent, Finance Agent, etc.
5. **Create multi-agent orchestration**: Connect all agents with LangGraph

### Files to Create Next

```
src/
├── agents/
│   └── data_analyst/
│       ├── agent.py          (from this notebook)
│       ├── nodes.py          (all node functions)
│       └── graph.py          (workflow definition)
├── data/
│   └── db_manager.py         (SupplyChainDBManager)
└── utils/
    └── opik_tracker.py       (Opik integration)
```