<a href="https://www.kaggle.com/code/entelpi/smartsql-ai-powered-database-analyst?scriptVersionId=292533535" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

This script acts as an autonomous Data Analyst agent. It connects to a PostgreSQL database, accepts natural language questions (in Turkish), generates SQL queries using a local LLM (Ollama), executes them, and provides reports with visualizations.

FEATURES:
    - Text-to-SQL conversion using local LLMs (via Ollama).
    - Self-Correction Mechanism: Retries and fixes SQL queries if execution fails.
    - Dynamic Schema Extraction: Automatically understands database structure.
    - Few-Shot Learning: Remembers successful queries to improve future accuracy.
    - Dual Interface: Works via Console (CLI) or Telegram Bot.
    - Automated Visualization: Generates Line, Bar, or Pie charts based on data type.

Note: Large Language Models were assisted in refactoring the code structure and enriching the documentation comments for better readability.

In [1]:
import subprocess
import time
import os

!sudo apt-get update && sudo apt-get install -y zstd

!curl -fsSL https://ollama.com/install.sh | sh

ollama_path = "/usr/local/bin/ollama"

if not os.path.exists(ollama_path):
    ollama_path = "/usr/bin/ollama"

process = subprocess.Popen([ollama_path, "serve"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

time.sleep(5)

!ollama pull qwen3-coder:30b

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://cli.github.com/packages stable InRelease [3,917 B]               
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]           
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease                         
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]      
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]        
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,302 kB]
Get:9 https://cli.github.com/packages stable/main amd64 Packages [354 B]       
Get:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,619 kB] 
Get:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ub

In [2]:
!pip install ollama

Collecting ollama
  Downloading ollama-0.6.1-py3-none-any.whl.metadata (4.3 kB)
Downloading ollama-0.6.1-py3-none-any.whl (14 kB)
Installing collected packages: ollama
Successfully installed ollama-0.6.1


In [3]:
import logging
import io
import sys
import pandas as pd
import ollama
from sqlalchemy import create_engine, text, inspect
from matplotlib.figure import Figure
import matplotlib


matplotlib.use('Agg')

In [4]:
import subprocess
import time
from sqlalchemy import create_engine, text

print("🛠️ Installing and starting PostgreSQL...")
!sudo apt-get update > /dev/null
!sudo apt-get install -y postgresql postgresql-contrib > /dev/null
!service postgresql start

print("🔑 Setting user password...")
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'PASSWORD';"

print("📊 Loading mock data (Sales, Products, Users)...")

DB_URI = 'postgresql+psycopg2://postgres:PASSWORD@localhost/postgres'
setup_engine = create_engine(DB_URI)

with setup_engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS sales;"))
    conn.execute(text("DROP TABLE IF EXISTS products;"))
    conn.execute(text("DROP TABLE IF EXISTS users;"))
    
    # Table: Users
    conn.execute(text("""
        CREATE TABLE users (
            user_id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            signup_date DATE,
            country VARCHAR(50)
        );
    """))
    
    # Table: Products
    conn.execute(text("""
        CREATE TABLE products (
            product_id SERIAL PRIMARY KEY,
            product_name VARCHAR(100),
            category VARCHAR(50),
            price NUMERIC(10, 2)
        );
    """))
    
    # Table: Sales
    conn.execute(text("""
        CREATE TABLE sales (
            sale_id SERIAL PRIMARY KEY,
            user_id INT,
            product_id INT,
            sale_date DATE,
            quantity INT,
            total_amount NUMERIC(10, 2),
            FOREIGN KEY (user_id) REFERENCES users(user_id),
            FOREIGN KEY (product_id) REFERENCES products(product_id)
        );
    """))
    
    # Insert Data (Users) - Internationalized Names
    conn.execute(text("""
        INSERT INTO users (name, signup_date, country) VALUES 
        ('Alice Smith', '2023-01-15', 'USA'),
        ('John Doe', '2023-02-10', 'UK'),
        ('Marco Polo', '2023-03-05', 'Italy'),
        ('Hans Muller', '2023-03-20', 'Germany');
    """))
    
    # Insert Data (Products)
    conn.execute(text("""
        INSERT INTO products (product_name, category, price) VALUES 
        ('Laptop Pro', 'Electronics', 1500.00),
        ('Wireless Mouse', 'Electronics', 25.50),
        ('Coffee Maker', 'Home', 85.00),
        ('Gaming Chair', 'Furniture', 200.00);
    """))
    
    # Insert Data (Sales)
    conn.execute(text("""
        INSERT INTO sales (user_id, product_id, sale_date, quantity, total_amount) VALUES 
        (1, 1, '2023-06-01', 1, 1500.00),
        (1, 2, '2023-06-01', 2, 51.00),
        (2, 3, '2023-06-05', 1, 85.00),
        (3, 4, '2023-07-10', 1, 200.00),
        (4, 1, '2023-08-15', 2, 3000.00);
    """))

🛠️ Installing and starting PostgreSQL...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 78, <> line 16.)
debconf: falling back to frontend: Readline
 * Starting PostgreSQL 14 database server
   ...done.
🔑 Setting user password...
ALTER ROLE
📊 Loading mock data (Sales, Products, Users)...


# CONFIGURATION

In [5]:
BOT_TOKEN = "TELEGRAM_BOT_TOKEN_HERE"
ALLOWED_USER_ID = [123456789]  # List of Telegram User IDs allowed to interact
DB_URI = 'postgresql+psycopg2://postgres:PASSWORD@localhost/postgres'
MODEL_NAME = "qwen3-coder:30b"

In [6]:
# Whitelist specific tables to restrict LLM access
ALLOWED_TABLES = [] 

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Initialize Database Engine
engine = create_engine(DB_URI, pool_pre_ping=True)

In [7]:
def init_memory_db():
    try:
        with engine.begin() as conn:
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS sql_examples (
                    id SERIAL PRIMARY KEY,
                    question TEXT UNIQUE,
                    sql_query TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """))
    except Exception as e:
        logging.error(f"Memory DB Init Error: {e}")

In [8]:
def get_available_tables():
    """
    Scans the database and returns a list of all tables in non-system schemas.
    Returns:
        list: List of 'schema.table_name' strings.
    """
    try:
        inspector = inspect(engine)
        all_tables = []
        system_schemas = ['information_schema', 'pg_catalog', 'pg_toast']
        schemas = inspector.get_schema_names()
        for schema in schemas:
            if schema in system_schemas:
                continue
            tables = inspector.get_table_names(schema=schema)
            for table in tables:
                full_name = f"{schema}.{table}"
                all_tables.append(full_name)
        return all_tables
    except Exception as e:
        logging.error(f"Error fetching table list: {e}")
        return []

In [9]:
def get_dynamic_schema():
    """
    Constructs a text representation of the database schema (Tables & Columns).
    This text is injected into the LLM prompt context.
    
    Returns:
        str: Formatted string describing table structures.
    """
    schema_text = ""
    inspector = inspect(engine)
    
    available = get_available_tables()
    
    # Filter tables if whitelist is provided
    tables_to_scan = available
    if ALLOWED_TABLES:
        filtered = []
        for t in available:
            for allowed in ALLOWED_TABLES:
                if t == allowed or t.endswith(f".{allowed}"):
                    filtered.append(t)
        tables_to_scan = list(set(filtered))
    for full_table_name in tables_to_scan:
        if 'sql_examples' in full_table_name: continue
        try:
            if '.' in full_table_name:
                schema_name, table_name = full_table_name.split('.', 1)
            else:
                schema_name, table_name = 'public', full_table_name
        except:
            continue
        try:
            columns = inspector.get_columns(table_name, schema=schema_name)
            col_list = []
            for col in columns:
                col_def = f"{col['name']} ({col['type']})"
                if col.get('comment'):
                    col_def += f" -> {col['comment']}"
                col_list.append(col_def)
            schema_text += f"Table: {full_table_name} (Schema: {schema_name})\n  Columns:\n    - " + "\n    - ".join(col_list) + "\n\n"
        except Exception as e:
            logging.error(f"Could not read {full_table_name}: {e}")
    return schema_text

In [10]:
def get_few_shot_examples(limit=3):
    try:
        query = "SELECT question, sql_query FROM sql_examples ORDER BY created_at DESC LIMIT :limit"
        
        with engine.connect() as conn:
            result = conn.execute(text(query), {"limit": limit})
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
            
        if df.empty: return ""
        
        examples = "Correct Examples (Reference):\n"
        for _, row in df.iterrows():
            examples += f"Question: {row['question']}\nSQL: {row['sql_query']}\n---\n"
        return examples
    except Exception as e:
        return ""

In [11]:
def generate_sql_with_ollama(user_question):
    """
    Main logic for Text-to-SQL generation.
    1. Prepares context (Schema + Examples).
    2. Sends prompt to Ollama.
    3. Validates SQL using 'EXPLAIN'.
    4. Implements Retry/Self-Correction mechanism on error.
    """
    schema = get_dynamic_schema()
    examples = get_few_shot_examples()
    
    # System Prompt (Turkish instructions for the LLM)
    system_prompt = f"""You are a PostgreSQL expert Data Analyst.
    
    TASK:
    Write a single valid SQL query based on the given schema and question.
    
    RULES:
    1. Return ONLY the SQL code. No Markdown (```sql), no explanations.
    2. ALWAYS use schema names with table names (e.g., "public.users", "sales.orders").
    3. Strictly use table/column names from the SCHEMA below.
    
    CURRENT SCHEMA:
    {schema}
    
    {examples}
    """
    
    conversation = [{'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': user_question}]
    
    # Retry loop (Max 3 attempts)
    for attempt in range(3):
        try:
            response = ollama.chat(model=MODEL_NAME, messages=conversation)
            sql = response['message']['content'].strip().replace("```sql", "").replace("```", "").strip()
            
            # Validation step: Try to explain the query without executing
            with engine.connect() as conn:
                conn.execute(text(f"EXPLAIN {sql}"))
            
            return sql
            
        except Exception as e:
            error_msg = str(e).split('\n')[0]
            logging.warning(f"Attempt {attempt+1} Failed. Error: {error_msg}")
            
            # Feed the error back to LLM for self-correction
            conversation.append({'role': 'assistant', 'content': sql})
            conversation.append({'role': 'user', 'content': f"This SQL threw an error: {error_msg}. Please fix it paying attention to schema names."})
            
    return None

In [12]:
def determine_chart_type(df):
    """
    Heuristic to decide the best visualization type based on data shape/types.
    Returns: 'line', 'pie', 'bar', or None.
    """
    if df.empty or len(df.columns) < 2: return None
    
    x_col = df.columns[0]
    
    # If X-axis is datetime -> Line Chart
    if pd.api.types.is_datetime64_any_dtype(df[x_col]): return 'line'
    try:
        pd.to_datetime(df[x_col], format='%Y-%m-%d', errors='raise')
        return 'line'
    except: pass

    # If few categories -> Pie Chart
    if len(df) <= 5: return 'pie'
    
    # Default -> Bar Chart
    return 'bar'

In [13]:
def create_smart_chart(df):
    """
    Generates a matplotlib chart in memory (BytesIO) for sending via Telegram.
    """
    if df.empty or len(df.columns) < 2: return None
    
    chart_type = determine_chart_type(df)
    if not chart_type: return None

    fig = Figure(figsize=(10, 6))
    ax = fig.subplots()
    
    x_col = df.columns[0]
    numeric_cols = df.select_dtypes(include=['number']).columns
    if len(numeric_cols) == 0: return None
    y_col = numeric_cols[0]

    try:
        if chart_type == 'line':
            df_plot = df.copy()
            if not pd.api.types.is_datetime64_any_dtype(df_plot[x_col]):
                try: df_plot[x_col] = pd.to_datetime(df_plot[x_col])
                except: pass
            ax.plot(df_plot[x_col], df_plot[y_col], marker='o', linestyle='-', color='tab:blue')
            ax.set_title(f"Trend Analysis: {y_col}")
            fig.autofmt_xdate()
        elif chart_type == 'pie':
            ax.pie(df[y_col], labels=df[x_col], autopct='%1.1f%%', startangle=90)
            ax.set_title(f"Distribution: {y_col}")
        else:
            plot_df = df.head(15) # Limit to top 15 for readability
            ax.bar(plot_df[x_col].astype(str), plot_df[y_col], color='steelblue')
            ax.set_title(f"Comparison: {y_col}")
            for tick in ax.get_xticklabels():
                tick.set_rotation(45)
                tick.set_ha('right')

        img_buffer = io.BytesIO()
        fig.tight_layout()
        fig.savefig(img_buffer, format='png')
        img_buffer.seek(0)
        return img_buffer

    except Exception as e:
        logging.error(f"Chart Generation Error: {e}")
        return None

In [14]:
def process_query_pipeline(question):
    result = {"sql": None, "df": None, "summary": "", "chart": None, "excel": None, "error": None}
    
    try:
        sql = generate_sql_with_ollama(question)
        if not sql:
            result["error"] = "Could not generate valid SQL. Please try simplifying the question."
            return result
        
        result["sql"] = sql
        
        try:
            with engine.connect() as conn:
                cursor_result = conn.execute(text(sql))
                rows = cursor_result.fetchall()
                columns = cursor_result.keys()
                
                df = pd.DataFrame(rows, columns=columns)
            result["df"] = df
        except Exception as e:
            result["error"] = f"SQL Execution Error: {str(e)}"
            return result
        
        try:
            with engine.begin() as conn:
                conn.execute(
                    text("INSERT INTO sql_examples (question, sql_query) VALUES (:q, :s) ON CONFLICT DO NOTHING"), 
                    {"q": question, "s": sql}
                )
        except Exception as mem_e: 
            logging.warning(f"Could not save to memory: {mem_e}")

        if df.empty:
            result["summary"] = f"✅ Query executed successfully but returned no data.\nSQL: `{sql}`"
            return result
            
        summary = f"📊 **Analysis Result**\n\nSQL: `{sql}`\nRow Count: {len(df)}\n\n{df.head().to_string(index=False)}"
        result["summary"] = summary
        result["chart"] = create_smart_chart(df)
        
        excel_buffer = io.BytesIO()
        with pd.ExcelWriter(excel_buffer, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='Report Data')
            
            info_data = {
                'Parameter': ['User Question', 'Executed SQL', 'Row Count', 'Model'],
                'Value': [question, sql, len(df), MODEL_NAME]
            }
            df_info = pd.DataFrame(info_data)
            df_info.to_excel(writer, index=False, sheet_name='Query Info')
            
        excel_buffer.seek(0)
        result["excel"] = excel_buffer
        
    except Exception as e:
        result["error"] = str(e)
        
    return result

In [15]:
def run_console_mode():
    print(f"\n--- CONSOLE MODE ({MODEL_NAME}) ---")
    print("Type 'exit' to quit.\n")
    
    while True:
        try:
            q = input("Question: ")
            if q.lower() in ['exit', 'q', 'quit']: break
            if not q.strip(): continue
            
            print("⏳ Analyzing...")
            res = process_query_pipeline(q)
            
            if res["error"]:
                print(f"❌ ERROR: {res['error']}")
            else:
                print("\n" + "="*40)
                print(res["summary"])
                print("="*40)
                
                if res["excel"]:
                    with open("report.xlsx", "wb") as f:
                        f.write(res["excel"].getbuffer())
                    print("✅ 'report.xlsx' saved.")
                
                if res["chart"]:
                    with open("chart.png", "wb") as f:
                        f.write(res["chart"].getbuffer())
                    print("✅ 'chart.png' saved.")
                    
        except KeyboardInterrupt: break
    print("Exited.")

In [16]:
async def telegram_handle_message(update, context):
    """Handles incoming Telegram messages."""
    user_id = update.effective_user.id
    if user_id not in ALLOWED_USER_ID: return
    
    msg = await update.message.reply_text("⏳ Analyzing data...")
    q = update.message.text
    
    res = process_query_pipeline(q)
    
    if res["error"]:
        await msg.edit_text(f"⚠️ Error: {res['error']}")
    else:
        clean_summary = res["summary"][:4000] # Telegram limit
        await msg.edit_text(clean_summary, parse_mode='Markdown')
        
        if res["excel"]:
            await update.message.reply_document(document=res["excel"], filename="report.xlsx", caption="Excel Report")
        if res["chart"]:
            await update.message.reply_photo(photo=res["chart"], caption="Data Visualization")

In [17]:
def run_telegram_mode():
    from telegram import Update
    from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
    
    print("\n--- TELEGRAM MODE STARTED ---")
    app = ApplicationBuilder().token(BOT_TOKEN).build()
    app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), telegram_handle_message))
    
    print("Bot is running, waiting for messages...")
    app.run_polling()

In [18]:
if __name__ == '__main__':
    init_memory_db()
    
    print("🔍 Scanning Database...")
    available_tables = get_available_tables()
    
    if not available_tables:
        print("⚠️ CRITICAL ERROR: No tables found. Check your DB connection.")
        sys.exit(1)
        
    print(f"\n📂 Found Tables ({len(available_tables)}):")
    print("-" * 50)
    for i, tbl in enumerate(available_tables):
        if i < 20: print(f"  • {tbl}")
    if len(available_tables) > 20: print(f"  ... and {len(available_tables)-20} more.")
    print("-" * 50)

    # 2. Whitelist Selection
    print("\n[OPTIONAL] Filter tables to work with.")
    print("Press ENTER for full access.")
    print("Example: public.users, sales.orders")
    table_input = input("Selection: ").strip()
    
    if table_input:
        ALLOWED_TABLES = [t.strip() for t in table_input.split(',')]
        print(f"✅ Filter Active: {ALLOWED_TABLES}")
    else:
        print("✅ Full Access: Using all schemas and tables.")
        
    # 3. Mode Selection
    print("\nSelect Mode:")
    print("[1] Console (Terminal)")
    print("[2] Telegram Bot")
    
    while True:
        choice = input("Choice (1/2): ").strip()
        if choice == '1':
            run_console_mode()
            break
        elif choice == '2':
            run_telegram_mode()
            break
        else:
            print("Invalid choice.")

🔍 Scanning Database...

📂 Found Tables (4):
--------------------------------------------------
  • public.users
  • public.sales
  • public.products
  • public.sql_examples
--------------------------------------------------

[OPTIONAL] Filter tables to work with.
Press ENTER for full access.
Example: public.users, sales.orders


Selection:  


✅ Full Access: Using all schemas and tables.

Select Mode:
[1] Console (Terminal)
[2] Telegram Bot


Choice (1/2):  1



--- CONSOLE MODE (qwen3-coder:30b) ---
Type 'exit' to quit.



Question:  Show me the annual sales.


⏳ Analyzing...


2026-01-18 11:23:41,081 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"



📊 **Analysis Result**

SQL: `SELECT 
    EXTRACT(YEAR FROM sale_date) AS year,
    SUM(total_amount) AS annual_sales
FROM public.sales
GROUP BY EXTRACT(YEAR FROM sale_date)
ORDER BY year;`
Row Count: 1

year annual_sales
2023      4836.00
✅ 'report.xlsx' saved.


Question:  exit


Exited.
