In [1]:
import os
import re
import pandas as pd
import duckdb
import json
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Annotated, Optional
from typing_extensions import TypedDict

# LangGraph imports
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode # If multiple tool calls are requested, they will be run in parallel.
from langgraph.checkpoint.memory import InMemorySaver

# LangChain imports
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.tools import Tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain.schema import SystemMessage, HumanMessage

# Load environment variables
load_dotenv()

TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
GOOGLE_API_KEY = os.getenv("GEMINI_API_KEY")

llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", api_key=GOOGLE_API_KEY)
memory = InMemorySaver()

# Data file
TRANSACTION_DATA_FILE_PATH = 'C:\Langgraph\Store_Sales_Price_Elasticity_Promotions_Data.parquet'

  TRANSACTION_DATA_FILE_PATH = 'C:\Langgraph\Store_Sales_Price_Elasticity_Promotions_Data.parquet'


In [2]:
llm.invoke("Hello, world!")  # Test LLM connection

KeyboardInterrupt: 

In [1]:
# pip install google-genai
from google import genai
from google.genai import types

client = genai.Client()   # picks up GEMINI_API_KEY from env by default

# Disable "thinking" (speeds response)
resp = client.models.generate_content(
    model="gemini-2.5-flash",
    contents="Explain in one sentence what AI is.",
    config=types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0)  # disable thinking
    ),
)
print("response text:", resp.text)


ConnectError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1000)

In [2]:
import time
print(time.ctime())


Mon Sep 15 16:27:01 2025


In [3]:
import httpx
resp = httpx.get("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
                 verify=False, timeout=20.0)
print(resp.status_code, resp.text[:200])


404 


In [2]:
# Global variable to store our database connection
# This makes it easy to access from any function
db_connection = None

def initialize_database():
    """
    Create a DuckDB connection that we'll use throughout our program
    This is like opening a notebook where we'll store all our data
    """
    global db_connection
    
    print("🚀 Starting database...")
    
    # Create a connection to DuckDB (in-memory database)
    # Think of this like creating a super-fast Excel in computer memory
    db_connection = duckdb.connect(':memory:')
    
    print("✅ Database ready!")
    return db_connection

def load_file_to_duckdb(file_path: str, table_name: str = "sales_data"):
    """
    Load a parquet or excel file into DuckDB
    
    Args:
        file_path: Where the file is located on your computer
        table_name: What name to give the table in our database
    
    Returns:
        Dictionary with information about what happened
    """
    global db_connection
    
    # Check if database is initialized
    if db_connection is None:
        return {"success": False, "error": "Database not initialized! Call initialize_database() first"}
    
    # Check if file exists
    if not os.path.exists(file_path):
        return {"success": False, "error": f"File not found: {file_path}"}
    
    try:
        print(f"📁 Loading file: {file_path}")
        
        # Get file extension to know how to read it
        file_extension = file_path.lower().split('.')[-1]
        
        if file_extension == 'parquet':
            # For parquet files, DuckDB can read directly - super fast!
            print("📊 Detected Parquet file")
            
            # Create table directly from parquet file
            db_connection.execute(f"""
                CREATE OR REPLACE TABLE {table_name} AS 
                SELECT * FROM read_parquet('{file_path}')
            """)
            
        elif file_extension in ['xlsx', 'xls']:
            # For Excel files, we need pandas to help us
            print("📊 Detected Excel file")
            
            # Read Excel file using pandas
            df = pd.read_excel(file_path)
            
            # Register the dataframe as a table in DuckDB
            db_connection.register('temp_df', df)
            
            # Create permanent table from the temporary dataframe
            db_connection.execute(f"""
                CREATE OR REPLACE TABLE {table_name} AS 
                SELECT * FROM temp_df
            """)
            
            # Clean up temporary registration
            db_connection.unregister('temp_df')
            
        else:
            return {"success": False, "error": f"Unsupported file type: {file_extension}"}
        
        # Get information about the loaded table
        table_info = analyze_table_schema(table_name)
        
        print(f"✅ Successfully loaded data into table: {table_name}")
        
        return {
            "success": True,
            "table_name": table_name,
            "file_path": file_path,
            "schema": table_info
        }
        
    except Exception as e:
        return {"success": False, "error": f"Error loading file: {str(e)}"}

def analyze_table_schema(table_name: str):
    """
    Analyze the table to understand its structure and data types
    This helps us know what columns we have and what kind of data is in them
    
    Args:
        table_name: Name of the table to analyze
    
    Returns:
        Dictionary with table information
    """
    global db_connection
    
    try:
        # Get column information
        schema_query = f"""
            SELECT 
                column_name,
                data_type,
                is_nullable
            FROM information_schema.columns
            WHERE table_name = '{table_name}'
        """
        
        schema_df = db_connection.execute(schema_query).df()
        
        # Get row count
        row_count = db_connection.execute(f"SELECT COUNT(*) as count FROM {table_name}").fetchone()[0]
        
        # Get sample data (first 5 rows)
        sample_data = db_connection.execute(f"SELECT * FROM {table_name} LIMIT 5").df()
        
        # Get basic statistics for numeric columns
        numeric_columns = schema_df[schema_df['data_type'].str.contains('INT|FLOAT|DOUBLE|DECIMAL', case=False)]['column_name'].tolist()
        
        stats = {}
        for col in numeric_columns:
            try:
                stat_query = f"""
                    SELECT 
                        MIN({col}) as min_val,
                        MAX({col}) as max_val,
                        AVG({col}) as avg_val,
                        COUNT(DISTINCT {col}) as unique_count
                    FROM {table_name}
                    WHERE {col} IS NOT NULL
                """
                stat_result = db_connection.execute(stat_query).fetchone()
                stats[col] = {
                    "min": stat_result[0],
                    "max": stat_result[1],
                    "average": stat_result[2],
                    "unique_values": stat_result[3]
                }
            except:
                pass
        
        return {
            "columns": schema_df.to_dict('records'),
            "row_count": row_count,
            "numeric_stats": stats,
            "sample_data": sample_data.head().to_dict('records')
        }
        
    except Exception as e:
        return {"error": f"Error analyzing schema: {str(e)}"}

def test_database_operations():
    """
    Test function to make sure everything is working
    """
    print("\n" + "="*50)
    print("🧪 TESTING DATABASE OPERATIONS")
    print("="*50 + "\n")
    
    # Step 1: Initialize database
    db = initialize_database()
    
    # Step 2: Load the file
    result = load_file_to_duckdb(TRANSACTION_DATA_FILE_PATH)
    
    if result["success"]:
        print("\n✅ File loaded successfully!")
        print(f"\n📊 Table Schema:")
        print("-" * 30)
        
        # Show column information
        for col in result["schema"]["columns"]:
            print(f"  • {col['column_name']}: {col['data_type']}")
        
        print(f"\n📈 Total rows: {result['schema']['row_count']:,}")
        
        # Show statistics for numeric columns
        if result["schema"]["numeric_stats"]:
            print(f"\n📊 Numeric Column Statistics:")
            print("-" * 30)
            for col, stats in result["schema"]["numeric_stats"].items():
                print(f"\n  {col}:")
                print(f"    - Min: {stats['min']}")
                print(f"    - Max: {stats['max']}")
                print(f"    - Average: {stats['average']:.2f}")
                print(f"    - Unique values: {stats['unique_values']}")
        
        # Test a simple query
        print("\n🔍 Testing a simple query...")
        test_query = "SELECT COUNT(*) as total_records FROM sales_data"
        result = db_connection.execute(test_query).fetchone()
        print(f"✅ Query successful! Total records: {result[0]:,}")
        
    else:
        print(f"\n❌ Error: {result['error']}")

# Run the test
if __name__ == "__main__":
    test_database_operations()


🧪 TESTING DATABASE OPERATIONS

🚀 Starting database...
✅ Database ready!
📁 Loading file: C:\Langgraph\Store_Sales_Price_Elasticity_Promotions_Data.parquet
📊 Detected Parquet file
✅ Successfully loaded data into table: sales_data

✅ File loaded successfully!

📊 Table Schema:
------------------------------
  • Store_Number: SMALLINT
  • SKU_Coded: INTEGER
  • Product_Class_Code: SMALLINT
  • Sold_Date: DATE
  • Qty_Sold: SMALLINT
  • Total_Sale_Value: FLOAT
  • On_Promo: TINYINT

📈 Total rows: 697,894

📊 Numeric Column Statistics:
------------------------------

  Store_Number:
    - Min: 330
    - Max: 4840
    - Average: 2322.06
    - Unique values: 35

  SKU_Coded:
    - Min: 6172800
    - Max: 6205900
    - Average: 6188239.92
    - Unique values: 659

  Product_Class_Code:
    - Min: 22800
    - Max: 24425
    - Average: 23284.50
    - Unique values: 11

  Qty_Sold:
    - Min: 1
    - Max: 239
    - Average: 1.48
    - Unique values: 128

  Total_Sale_Value:
    - Min: 0.00999999977

In [5]:
# Add these imports to your existing imports
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
import json
from datetime import datetime

# ===== STEP 2: ROUTER AND TOOLS SETUP =====

# Define the state that will be passed between nodes in our graph
class AgentState(TypedDict):
    """
    This is like a notebook that gets passed around between different parts of our agent
    Each part can read from it and write to it
    """
    messages: Annotated[List, add_messages]  # Stores all conversation messages
    current_tool: Optional[str]  # Which tool should be used
    query_result: Optional[Dict]  # Results from database queries
    analysis_result: Optional[Dict]  # Results from analysis
    visualization_code: Optional[str]  # Generated visualization code
    error: Optional[str]  # Any errors that occur

# ===== TOOL 1: SQL QUERY TOOL =====
@tool
def sql_query_tool(query: str) -> str:
    """
    Execute SQL queries on the sales data
    
    Args:
        query: SQL query to run on the sales_data table
        
    Returns:
        Query results as a formatted string
    """
    global db_connection
    
    print(f"\n🔍 Executing SQL Query: {query}")
    
    try:
        # Execute the query
        result = db_connection.execute(query).df()
        
        # If result is too large, show only first 100 rows
        if len(result) > 100:
            print(f"⚠️ Large result set ({len(result)} rows). Showing first 100 rows.")
            result = result.head(100)
        
        # Convert to string format for display
        result_str = f"Query returned {len(result)} rows:\n\n"
        result_str += result.to_string()
        
        return result_str
        
    except Exception as e:
        error_msg = f"❌ SQL Error: {str(e)}"
        print(error_msg)
        return error_msg

# ===== TOOL 2: DATA ANALYSIS TOOL =====
@tool
def data_analysis_tool(analysis_type: str, column: str = None, groupby: str = None) -> str:
    """
    Perform various data analyses on the sales data
    
    Args:
        analysis_type: Type of analysis - 'summary', 'top_products', 'sales_trend', 'promotion_impact'
        column: Column to analyze (optional)
        groupby: Column to group by (optional)
        
    Returns:
        Analysis results as a formatted string
    """
    global db_connection
    
    print(f"\n📊 Performing analysis: {analysis_type}")
    
    try:
        if analysis_type == "summary":
            # Overall sales summary
            query = """
                SELECT 
                    COUNT(*) as total_transactions,
                    COUNT(DISTINCT Store_Number) as total_stores,
                    COUNT(DISTINCT SKU_Coded) as total_products,
                    SUM(Qty_Sold) as total_quantity,
                    SUM(Total_Sale_Value) as total_revenue,
                    AVG(Total_Sale_Value) as avg_sale_value,
                    MIN(Sold_Date) as first_date,
                    MAX(Sold_Date) as last_date
                FROM sales_data
            """
            result = db_connection.execute(query).df()
            
            return f"""
📈 SALES SUMMARY:
- Total Transactions: {result['total_transactions'][0]:,}
- Total Stores: {result['total_stores'][0]}
- Total Products: {result['total_products'][0]:,}
- Total Quantity Sold: {result['total_quantity'][0]:,}
- Total Revenue: ${result['total_revenue'][0]:,.2f}
- Average Sale Value: ${result['avg_sale_value'][0]:.2f}
- Date Range: {result['first_date'][0]} to {result['last_date'][0]}
"""
        
        elif analysis_type == "top_products":
            # Top selling products
            query = """
                SELECT 
                    SKU_Coded,
                    SUM(Qty_Sold) as total_quantity,
                    SUM(Total_Sale_Value) as total_revenue,
                    COUNT(*) as transaction_count
                FROM sales_data
                GROUP BY SKU_Coded
                ORDER BY total_revenue DESC
                LIMIT 10
            """
            result = db_connection.execute(query).df()
            
            output = "🏆 TOP 10 PRODUCTS BY REVENUE:\n"
            output += "-" * 50 + "\n"
            for i, row in result.iterrows():
                output += f"{i+1}. SKU {row['SKU_Coded']}: ${row['total_revenue']:,.2f} ({row['total_quantity']:,} units)\n"
            
            return output
        
        elif analysis_type == "sales_trend":
            # Monthly sales trend
            query = """
                SELECT 
                    DATE_TRUNC('month', Sold_Date) as month,
                    SUM(Total_Sale_Value) as monthly_revenue,
                    SUM(Qty_Sold) as monthly_quantity,
                    COUNT(*) as transaction_count
                FROM sales_data
                GROUP BY month
                ORDER BY month
            """
            result = db_connection.execute(query).df()
            
            output = "📅 MONTHLY SALES TREND:\n"
            output += "-" * 50 + "\n"
            for _, row in result.iterrows():
                month_str = row['month'].strftime('%Y-%m')
                output += f"{month_str}: ${row['monthly_revenue']:,.2f} ({row['monthly_quantity']:,} units)\n"
            
            return output
        
        elif analysis_type == "promotion_impact":
            # Analyze promotion effectiveness
            query = """
                SELECT 
                    On_Promo,
                    COUNT(*) as transaction_count,
                    SUM(Total_Sale_Value) as total_revenue,
                    AVG(Total_Sale_Value) as avg_sale_value,
                    SUM(Qty_Sold) as total_quantity
                FROM sales_data
                GROUP BY On_Promo
            """
            result = db_connection.execute(query).df()
            
            promo_data = result[result['On_Promo'] == 1].iloc[0]
            no_promo_data = result[result['On_Promo'] == 0].iloc[0]
            
            revenue_lift = ((promo_data['avg_sale_value'] - no_promo_data['avg_sale_value']) / no_promo_data['avg_sale_value']) * 100
            
            return f"""
🎯 PROMOTION IMPACT ANALYSIS:

WITH PROMOTION:
- Transactions: {promo_data['transaction_count']:,}
- Total Revenue: ${promo_data['total_revenue']:,.2f}
- Average Sale: ${promo_data['avg_sale_value']:.2f}

WITHOUT PROMOTION:
- Transactions: {no_promo_data['transaction_count']:,}
- Total Revenue: ${no_promo_data['total_revenue']:,.2f}
- Average Sale: ${no_promo_data['avg_sale_value']:.2f}

📊 Revenue Lift from Promotions: {revenue_lift:.1f}%
"""
        
        else:
            return f"❌ Unknown analysis type: {analysis_type}. Available types: summary, top_products, sales_trend, promotion_impact"
            
    except Exception as e:
        error_msg = f"❌ Analysis Error: {str(e)}"
        print(error_msg)
        return error_msg

# ===== TOOL 3: VISUALIZATION CODE GENERATOR =====
@tool
def visualization_generator(chart_type: str, query: str, title: str = "Sales Analysis") -> str:
    """
    Generate Python code for creating visualizations
    
    Args:
        chart_type: Type of chart - 'bar', 'line', 'pie', 'scatter'
        query: SQL query to get the data
        title: Title for the chart
        
    Returns:
        Python code to create the visualization
    """
    print(f"\n🎨 Generating {chart_type} chart code...")
    
    # Template for different chart types
    code_templates = {
        "bar": """
import pandas as pd
import matplotlib.pyplot as plt
import duckdb

# Connect to database and get data
conn = duckdb.connect(':memory:')
# Note: In real usage, load your data first

# Execute query
query = "{query}"
df = conn.execute(query).df()

# Create bar chart
plt.figure(figsize=(10, 6))
plt.bar(df.iloc[:, 0], df.iloc[:, 1])
plt.title("{title}")
plt.xlabel(df.columns[0])
plt.ylabel(df.columns[1])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
""",
        "line": """
import pandas as pd
import matplotlib.pyplot as plt
import duckdb

# Connect to database and get data
conn = duckdb.connect(':memory:')
# Note: In real usage, load your data first

# Execute query
query = "{query}"
df = conn.execute(query).df()

# Create line chart
plt.figure(figsize=(12, 6))
plt.plot(df.iloc[:, 0], df.iloc[:, 1], marker='o')
plt.title("{title}")
plt.xlabel(df.columns[0])
plt.ylabel(df.columns[1])
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
""",
        "pie": """
import pandas as pd
import matplotlib.pyplot as plt
import duckdb

# Connect to database and get data
conn = duckdb.connect(':memory:')
# Note: In real usage, load your data first

# Execute query
query = "{query}"
df = conn.execute(query).df()

# Create pie chart
plt.figure(figsize=(10, 8))
plt.pie(df.iloc[:, 1], labels=df.iloc[:, 0], autopct='%1.1f%%')
plt.title("{title}")
plt.axis('equal')
plt.tight_layout()
plt.show()
"""
    }
    
    if chart_type not in code_templates:
        return f"❌ Unknown chart type: {chart_type}. Available types: bar, line, pie"
    
    # Generate the code
    code = code_templates[chart_type].format(query=query, title=title)
    
    return f"📊 Generated {chart_type} chart code:\n\n```python\n{code}\n```"

# ===== ROUTER FUNCTION =====
def route_user_request(state: AgentState) -> str:
    """
    Decide which tool to use based on the user's message
    This is like a smart receptionist that knows which department to send you to
    """
    messages = state["messages"]
    last_message = messages[-1].content if messages else ""
    
    print(f"\n🤖 Router analyzing request: {last_message[:100]}...")
    
    # Create a prompt for the LLM to classify the intent
    router_prompt = f"""
    You are a router that decides which tool to use for a data analysis request.
    
    Available tools:
    1. sql_query - For direct SQL queries or when user wants specific data
    2. data_analysis - For analysis like summaries, trends, top products, promotion analysis
    3. visualization - For creating charts and graphs
    
    User request: {last_message}
    
    Respond with ONLY the tool name (sql_query, data_analysis, or visualization).
    """
    
    # Get LLM decision
    response = llm.invoke([SystemMessage(content=router_prompt)])
    tool_choice = response.content.strip().lower()
    
    print(f"✅ Router decision: {tool_choice}")
    
    # Map the response to actual tool names
    if "sql" in tool_choice:
        return "sql_query"
    elif "analysis" in tool_choice or "analyze" in tool_choice:
        return "data_analysis"
    elif "visual" in tool_choice or "chart" in tool_choice or "graph" in tool_choice:
        return "visualization"
    else:
        # Default to data analysis
        return "data_analysis"

# ===== TEST THE ROUTER AND TOOLS =====
def test_tools_and_router():
    """
    Test our tools and router to make sure they work
    """
    print("\n" + "="*50)
    print("🧪 TESTING ROUTER AND TOOLS")
    print("="*50 + "\n")
    
    # Make sure database is initialized
    if db_connection is None:
        initialize_database()
        load_file_to_duckdb(TRANSACTION_DATA_FILE_PATH)
    
    # Test 1: SQL Query Tool
    print("\n1️⃣ Testing SQL Query Tool:")
    result = sql_query_tool.invoke("SELECT COUNT(*) as count, SUM(Total_Sale_Value) as revenue FROM sales_data")
    print(result)
    
    # Test 2: Data Analysis Tool
    print("\n2️⃣ Testing Data Analysis Tool:")
    result = data_analysis_tool.invoke({
        "analysis_type": "summary"
    })
    print(result)
    
    # Test 3: Visualization Generator
    print("\n3️⃣ Testing Visualization Generator:")
    result = visualization_generator.invoke({
        "chart_type": "bar",
        "query": "SELECT Store_Number, SUM(Total_Sale_Value) as revenue FROM sales_data GROUP BY Store_Number ORDER BY revenue DESC LIMIT 10",
        "title": "Top 10 Stores by Revenue"
    })
    print(result)
    
    # Test 4: Router
    print("\n4️⃣ Testing Router:")
    
    # Create test state with different requests
    test_requests = [
        "Show me total sales by store",
        "Analyze the promotion effectiveness",
        "Create a line chart of monthly sales"
    ]
    
    for request in test_requests:
        test_state = AgentState(
            messages=[HumanMessage(content=request)],
            current_tool=None,
            query_result=None,
            analysis_result=None,
            visualization_code=None,
            error=None
        )
        
        tool = route_user_request(test_state)
        print(f"Request: '{request}' → Tool: {tool}")

# Run the test
if __name__ == "__main__":
    test_tools_and_router()


🧪 TESTING ROUTER AND TOOLS


1️⃣ Testing SQL Query Tool:

🔍 Executing SQL Query: SELECT COUNT(*) as count, SUM(Total_Sale_Value) as revenue FROM sales_data
Query returned 1 rows:

    count       revenue
0  697894  1.327264e+07

2️⃣ Testing Data Analysis Tool:

📊 Performing analysis: summary

📈 SALES SUMMARY:
- Total Transactions: 697,894
- Total Stores: 35
- Total Products: 659
- Total Quantity Sold: 1,036,049.0
- Total Revenue: $13,272,640.41
- Average Sale Value: $19.02
- Date Range: 2021-11-01 00:00:00 to 2024-03-31 00:00:00


3️⃣ Testing Visualization Generator:

🎨 Generating bar chart code...
📊 Generated bar chart code:

```python

import pandas as pd
import matplotlib.pyplot as plt
import duckdb

# Connect to database and get data
conn = duckdb.connect(':memory:')
# Note: In real usage, load your data first

# Execute query
query = "SELECT Store_Number, SUM(Total_Sale_Value) as revenue FROM sales_data GROUP BY Store_Number ORDER BY revenue DESC LIMIT 10"
df = conn.execute(query

KeyboardInterrupt: 