
 Natural Language to SQL Multi-Agent Workflow System - Improved Version

 Goal is :
   - Parse a user's natural language query
   - Map it into SQL conditions
   - Generate an optimized SQL query
   - Execute it on a SQLite database
   - Rank and return the most relevant results

 Key Components:
   - Query Parsing
   - SQL Query Building
   - Result Ranking (TF-IDF similarity)
   - End-to-End Search Pipeline
   

In [1]:
!pip install openai requests sqlite3 pandas numpy scikit-learn sentence-transformers

[31mERROR: Could not find a version that satisfies the requirement sqlite3 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for sqlite3[0m[31m
[0m

In [13]:
import sqlite3
import json
import re
import numpy as np
import time
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

AI system that turns natural language queries into SQL, executes them, and ranks result

In [14]:
class SystemConfig:
    def __init__(self):
        self.database_name = "companies.db"
        self.max_records = 100
        self.min_results = 5

@dataclass
class ParsedQuery:
    city: str = ""
    state: str = ""
    country: str = ""
    main_industry: str = ""
    specific_industry: str = ""
    search_terms: List[str] = None
    company_size: str = ""
    revenue_level: str = ""
    employee_count: str = ""
    detail_keywords: List[str] = None

    def __post_init__(self):
        if self.search_terms is None:
            self.search_terms = []
        if self.detail_keywords is None:
            self.detail_keywords = []

@dataclass
class DatabaseQuery:
    conditions: List[str]
    keywords: List[str]
    confidence: float
    expected_count: int

@dataclass
class ScoredResult:
    company_info: Dict
    score: float
    reasons: List[str]

class CompanyDatabase:
    def __init__(self, db_name):
        self.db_name = db_name
        self.setup_database()

    def setup_database(self):
        connection = sqlite3.connect(self.db_name)
        cursor = connection.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS companies (
                id INTEGER PRIMARY KEY,
                name TEXT,
                description TEXT,
                industry TEXT,
                revenue INTEGER,
                headcount INTEGER,
                address TEXT,
                city TEXT,
                state TEXT,
                country TEXT,
                zipcode TEXT
            )
        """)

        cursor.execute("SELECT COUNT(*) FROM companies")
        if cursor.fetchone()[0] == 0:
            self.add_sample_data(cursor)

        connection.commit()
        connection.close()


CompanyDatabase: SQLite Manager for Creating, Populating, and Querying Company Data

In [16]:
class CompanyDatabase:
    def __init__(self, db_name):
        self.db_name = db_name
        self.setup_database()

    def setup_database(self):
        connection = sqlite3.connect(self.db_name)
        cursor = connection.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS companies (
                id INTEGER PRIMARY KEY,
                name TEXT,
                description TEXT,
                industry TEXT,
                revenue INTEGER,
                headcount INTEGER,
                address TEXT,
                city TEXT,
                state TEXT,
                country TEXT,
                zipcode TEXT
            )
        """)

        cursor.execute("SELECT COUNT(*) FROM companies")
        if cursor.fetchone()[0] == 0:
            self.add_sample_data(cursor)

        connection.commit()
        connection.close()

    def add_sample_data(self, cursor):

        companies = [
            (1, "TechFlow Solutions", "Leading POS software provider for restaurants and coffee shops. Cloud-based point of sale system with inventory management.", "Software", 5000000, 50, "123 Tech St", "Boston", "MA", "USA", "02101"),
            (2, "CloudCafe Systems", "Specialized POS and ordering solutions for cafes and coffee shops. Mobile payment integration and customer loyalty programs.", "Technology", 2000000, 25, "456 Innovation Ave", "Boston", "MA", "USA", "02102"),
            (3, "RetailMaster Pro", "Enterprise retail management software including POS, inventory, and analytics for large retail chains.", "Software", 15000000, 150, "789 Business Blvd", "San Francisco", "CA", "USA", "94105"),
            (4, "Local Brew Coffee", "Artisanal coffee roaster and cafe chain specializing in organic, fair-trade coffee beans.", "Food & Beverage", 800000, 15, "321 Coffee Lane", "Boston", "MA", "USA", "02103"),
            (5, "MegaTech Corp", "Large technology corporation specializing in cloud infrastructure and enterprise software solutions.", "Technology", 500000000, 2500, "100 Silicon Valley", "San Jose", "CA", "USA", "95101"),
            (6, "StartupLab Inc", "Early-stage software development company focusing on mobile apps and web development.", "Software", 300000, 8, "555 Startup St", "Austin", "TX", "USA", "73301"),
            (7, "DataAnalytics Plus", "Business intelligence and data analytics software for enterprise clients. Machine learning and AI solutions.", "Software", 12000000, 80, "777 Data Drive", "Seattle", "WA", "USA", "98101"),
            (8, "GreenTech Innovations", "Renewable energy solutions and smart grid technology. Solar panel installation and maintenance.", "Clean Energy", 3000000, 35, "888 Green Ave", "Portland", "OR", "USA", "97201"),
            (9, "HealthCare Systems Ltd", "Healthcare management software including patient records, scheduling, and billing systems.", "Healthcare Technology", 8000000, 60, "999 Medical Plaza", "Boston", "MA", "USA", "02104"),
            (10, "GlobalSoft Enterprise", "Large software company providing enterprise solutions and cloud services worldwide.", "Technology", 800000000, 1200, "200 Tech Campus", "Los Angeles", "CA", "USA", "90210"),
            (11, "CodeCraft Studios", "Small software startup developing mobile games and educational apps.", "Software", 150000, 12, "303 Indie Lane", "Portland", "OR", "USA", "97202"),
            (12, "InnovateTech", "Mid-size technology company specializing in AI and machine learning solutions.", "Technology", 25000000, 180, "404 AI Boulevard", "San Francisco", "CA", "USA", "94106"),
            (13, "ByteSize Solutions", "Small software consultancy providing custom development services.", "Software", 400000, 18, "505 Code Street", "Austin", "TX", "USA", "73302"),
            (14, "MegaCorp Tech", "Fortune 500 technology company with global presence in cloud computing.", "Technology", 2000000000, 15000, "600 Enterprise Way", "Mountain View", "CA", "USA", "94041"),
            (15, "Boston Bites Cafe", "Local restaurant chain serving artisanal coffee, pastries and light meals.", "Food & Beverage", 1200000, 45, "707 Foodie Ave", "Boston", "MA", "USA", "02105")
        ]

        cursor.executemany("""
            INSERT INTO companies (id, name, description, industry, revenue, headcount, address, city, state, country, zipcode)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, companies)

    def run_query(self, sql_query):
        connection = sqlite3.connect(self.db_name)
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()

        cursor.execute(sql_query)
        results = [dict(row) for row in cursor.fetchall()]

        connection.close()
        return results


QueryParser: Extracts city, state, industry, size, and keywords from natural language queries

In [17]:
class QueryParser:
    def manual_parse(self, query):
        parsed = ParsedQuery()
        text = query.lower()


        city_state_pattern = re.search(r'([a-zA-Z\s]+),?\s*([A-Z]{2})\b', query)
        if city_state_pattern:
            parsed.city = city_state_pattern.group(1).strip()
            parsed.state = city_state_pattern.group(2)


        states = {'california': 'CA', 'texas': 'TX', 'new york': 'NY',
                 'massachusetts': 'MA', 'washington': 'WA', 'oregon': 'OR'}
        for state_name, code in states.items():
            if state_name in text:
                parsed.state = code
                break


        cities = ['boston', 'san francisco', 'new york', 'seattle', 'austin', 'portland',
                 'san jose', 'los angeles', 'mountain view']
        for city in cities:
            if city in text:
                parsed.city = city.title()
                break


        if 'software' in text:
            parsed.main_industry = 'Software'
        elif any(word in text for word in ['tech', 'technology']):
            parsed.main_industry = 'Technology'
        elif any(word in text for word in ['restaurant', 'food', 'coffee', 'cafe']):
            parsed.main_industry = 'Food & Beverage'
        elif 'healthcare' in text:
            parsed.main_industry = 'Healthcare Technology'
        elif 'manufacturing' in text:
            parsed.main_industry = 'Manufacturing'


        if any(word in text for word in ['large', 'big', 'major', 'enterprise']):
            parsed.company_size = 'large'
        elif any(word in text for word in ['small', 'startup', 'early-stage']):
            parsed.company_size = 'small'
        elif any(word in text for word in ['mid-size', 'medium']):
            parsed.company_size = 'medium'


        employee_patterns = [
            (r'over (\d+) employees', 'over {}'),
            (r'more than (\d+) employees', 'over {}'),
            (r'(\d+)\+ employees', 'over {}'),
            (r'with (\d+) or more employees', 'over {}')
        ]

        for pattern, format_str in employee_patterns:
            match = re.search(pattern, text)
            if match:
                parsed.employee_count = format_str.format(match.group(1))
                break


        if any(phrase in text for phrase in ['good revenue', 'high revenue', 'profitable', 'well-funded']):
            parsed.revenue_level = 'good'
        elif any(phrase in text for phrase in ['low revenue', 'small revenue']):
            parsed.revenue_level = 'low'


        words = re.findall(r'\b[a-zA-Z]{3,}\b', text)
        stopwords = {'the', 'and', 'for', 'with', 'that', 'are', 'have', 'looking',
                    'find', 'show', 'companies', 'company', 'business', 'businesses'}
        parsed.search_terms = [word for word in words if word.lower() not in stopwords]
        parsed.detail_keywords = parsed.search_terms

        return parsed

QueryBuilder, SQLGenerator & ResultRanker: Convert parsed queries into SQL, optimize execution, and rank results by relevance.

In [19]:
class QueryBuilder:
    def __init__(self, database):
        self.db = database

    def build_sql_conditions(self, parsed_query):
        conditions = []
        confidence = 1.0

        # Location conditions
        if parsed_query.city:
            clean_city = parsed_query.city.replace("'", "''")
            conditions.append(f"city LIKE '%{clean_city}%'")

        if parsed_query.state:
            conditions.append(f"state = '{parsed_query.state}'")

        if parsed_query.country:
            conditions.append(f"country = '{parsed_query.country}'")

        # industry conditions
        if parsed_query.main_industry:
            conditions.append(f"industry LIKE '%{parsed_query.main_industry}%'")

        # Size based conditions
        headcount_conditions = []

        if parsed_query.company_size == 'large':
            headcount_conditions.append(100)
        elif parsed_query.company_size == 'medium':
            headcount_conditions.append(50)
            conditions.append("headcount < 500")
        elif parsed_query.company_size == 'small':
            headcount_conditions.append(0)
            conditions.append("headcount < 50")

        # employee count
        if parsed_query.employee_count:
            if 'over' in parsed_query.employee_count:
                match = re.search(r'over (\d+)', parsed_query.employee_count)
                if match:
                    number = int(match.group(1))
                    headcount_conditions.append(number)


        if headcount_conditions:
            max_headcount = max(headcount_conditions)
            conditions.append(f"headcount > {max_headcount}")

        # Revenue conditions
        if parsed_query.revenue_level:
            if 'good' in parsed_query.revenue_level.lower():
                conditions.append("revenue > 5000000")
            elif 'high' in parsed_query.revenue_level.lower():
                conditions.append("revenue > 10000000")
            elif 'low' in parsed_query.revenue_level.lower():
                conditions.append("revenue < 1000000")

        estimated = self.estimate_results(conditions)

        return DatabaseQuery(
            conditions=conditions,
            keywords=parsed_query.detail_keywords or parsed_query.search_terms,
            confidence=confidence,
            expected_count=estimated
        )

    def estimate_results(self, conditions):
        if not conditions:
            return 100
        return max(5, 50 // len(conditions))

class SQLGenerator:
    def __init__(self, config):
        self.config = config

    def create_optimized_query(self, db_query):
        select_part = """SELECT id, name, description, industry, revenue, headcount,
                          address, city, state, country, zipcode"""
        from_part = "FROM companies"

        where_part = ""
        if db_query.conditions:
            where_part = "WHERE " + " AND ".join(db_query.conditions)

        # better limit calculation
        expected = db_query.expected_count
        if expected > self.config.max_records:
            limit_part = f"LIMIT {self.config.max_records}"
        else:
            limit_part = f"LIMIT {max(expected * 2, 10)}"

        full_query = f"{select_part} {from_part}"
        if where_part:
            full_query += f" {where_part}"
        full_query += f" {limit_part}"

        return full_query.strip()

class ResultRanker:
    def __init__(self):
        self.text_analyzer = TfidfVectorizer(stop_words='english', max_features=1000)

    def rank_by_relevance(self, raw_results, keywords, original_query):
        if not raw_results:
            return []

        documents = []
        for company in raw_results:
            text = f"{company['name']} {company['description']} {company['industry']}"
            documents.append(text)

        query_text = " ".join(keywords) if keywords else original_query
        documents.append(query_text)

        try:
            tfidf_matrix = self.text_analyzer.fit_transform(documents)
            query_vector = tfidf_matrix[-1]
            company_vectors = tfidf_matrix[:-1]

            similarity_scores = cosine_similarity(query_vector, company_vectors).flatten()
        except:
            similarity_scores = self.simple_scoring(raw_results, keywords)

        scored_results = []
        for i, company in enumerate(raw_results):
            matching_reasons = self.find_matches(company, keywords)

            result = ScoredResult(
                company_info=company,
                score=similarity_scores[i] if i < len(similarity_scores) else 0.0,
                reasons=matching_reasons
            )
            scored_results.append(result)

        scored_results.sort(key=lambda x: x.score, reverse=True)
        return scored_results

    def simple_scoring(self, results, keywords):
        scores = []
        for result in results:
            company_text = f"{result['name']} {result['description']} {result['industry']}".lower()
            matches = sum(1 for keyword in keywords if keyword.lower() in company_text)
            scores.append(matches / max(len(keywords), 1))
        return np.array(scores)

    def find_matches(self, company, keywords):
        matches = []
        company_text = f"{company['name']} {company['description']} {company['industry']}".lower()

        for keyword in keywords:
            if keyword.lower() in company_text:
                matches.append(f"Contains '{keyword}'")

        return matches

NaturalLanguageToSQL: End-to-End Pipeline for Parsing Queries, Generating SQL, Executing, and Ranking Company Search Results

In [20]:
class NaturalLanguageToSQL:
    def __init__(self):
        self.config = SystemConfig()
        self.database = CompanyDatabase(self.config.database_name)
        self.parser = QueryParser()
        self.query_builder = QueryBuilder(self.database)
        self.sql_generator = SQLGenerator(self.config)
        self.ranker = ResultRanker()

    def search(self, natural_language_query):
        print(f"Processing query: '{natural_language_query}'\n")

        # Parse the query
        print("Step 1: Understanding the query...")
        parsed = self.parser.manual_parse(natural_language_query)
        print(f"   Found: {parsed}\n")

        # build database conditions
        print("Step 2: Converting to database conditions...")
        db_query = self.query_builder.build_sql_conditions(parsed)
        print(f"   Conditions: {db_query.conditions}")
        print(f"   Keywords: {db_query.keywords}")
        print(f"   Expected results: {db_query.expected_count}\n")

        # generate SQL
        print("Step 3: Creating optimized SQL...")
        sql = self.sql_generator.create_optimized_query(db_query)
        print(f"   SQL: {sql}\n")

        # Execute query
        print("Step 4: Running database query...")
        raw_results = self.database.run_query(sql)
        print(f"   Got {len(raw_results)} results\n")

        # rank results
        print("Step 5: Ranking by relevance...")
        ranked_results = self.ranker.rank_by_relevance(
            raw_results,
            db_query.keywords,
            natural_language_query
        )
        print(f"   Ranked {len(ranked_results)} companies\n")

        metadata = {
            "parsed_query": parsed,
            "generated_sql": sql,
            "total_results": len(raw_results),
            "steps": ["Parse", "Map", "Optimize", "Execute", "Rank"]
        }

        return ranked_results, metadata

    def show_results(self, results, metadata, show_count=5):
        print("="*80)
        print("SEARCH RESULTS")
        print("="*80)

        print(f"\nQuery Analysis:")
        print(f"   Generated SQL: {metadata['generated_sql']}")
        print(f"   Total records found: {metadata['total_results']}")
        print(f"   Processing steps: {' -> '.join(metadata['steps'])}")

        print(f"\nTop {min(show_count, len(results))} Results:")
        print("-" * 80)

        for i, result in enumerate(results[:show_count]):
            company = result.company_info
            print(f"\n{i+1}. {company['name']} (Relevance: {result.score:.3f})")
            print(f"   Industry: {company['industry']}")
            print(f"   Location: {company['city']}, {company['state']}")
            print(f"   Size: {company['headcount']} employees, ${company['revenue']:,} revenue")
            print(f"   About: {company['description']}")
            if result.reasons:
                print(f"   Why it matches: {', '.join(result.reasons)}")


In [21]:
def run_tests():
    print("Starting Improved Natural Language to SQL System...\n")

    system = NaturalLanguageToSQL()

    test_queries = [
        "I'm looking for software companies in Boston, MA that offer POS Software for coffee shops.",
        "Find large technology companies in California with over 1000 employees",
        "Show me restaurants in Boston that serve coffee",
        "Looking for small startups in the software industry",
        "Find healthcare technology companies with good revenue"
    ]

    print(f"Testing with {len(test_queries)} queries...\n")

    for i, query in enumerate(test_queries):
        print(f"\n{'='*20} TEST {i+1} {'='*20}")

        try:
            results, metadata = system.search(query)
            system.show_results(results, metadata, show_count=3)

        except Exception as error:
            print(f"Error processing query: {error}")

    print(f"\n{'='*80}")
    print("Testing completed successfully!")
    print(f"{'='*80}")

if __name__ == "__main__":
    run_tests()

Starting Improved Natural Language to SQL System...

Testing with 5 queries...


Processing query: 'I'm looking for software companies in Boston, MA that offer POS Software for coffee shops.'

Step 1: Understanding the query...
   Found: ParsedQuery(city='Boston', state='MA', country='', main_industry='Software', specific_industry='', search_terms=['software', 'boston', 'offer', 'pos', 'software', 'coffee', 'shops'], company_size='', revenue_level='', employee_count='', detail_keywords=['software', 'boston', 'offer', 'pos', 'software', 'coffee', 'shops'])

Step 2: Converting to database conditions...
   Conditions: ["city LIKE '%Boston%'", "state = 'MA'", "industry LIKE '%Software%'"]
   Keywords: ['software', 'boston', 'offer', 'pos', 'software', 'coffee', 'shops']
   Expected results: 16

Step 3: Creating optimized SQL...
   SQL: SELECT id, name, description, industry, revenue, headcount, 
                          address, city, state, country, zipcode FROM companies WHERE city LIKE

In [27]:
class InteractiveSQLAgent:
    def __init__(self, pipeline: NaturalLanguageToSQL):
        self.pipeline = pipeline

    def run(self):
        print("Welcome to Natural Language → SQL Agent")
        print("Type your query (or 'exit' to quit). Example inputs:")
        print("  • software companies in California with more than 1000 employees")
        print("  • restaurants in Texas with revenue over 10M")
        print("  • healthcare technology companies in New York with fewer than 500 employees")
        print("-------------------------------------------------------------")

        while True:
            user_query = input("\nYour Query: ")
            if user_query.lower() in ["exit", "quit", "q"]:
                print("👋 Exiting agent.")
                break

            try:
                results, metadata = self.pipeline.search(user_query)

                print("\n📌 Parsed & Optimized SQL Query:")
                print(metadata["generated_sql"])

                print("\n🏢 Ranked Results (top matches):")
                if results:
                    for i, r in enumerate(results[:5], start=1):  # show top 5
                        company = r.company_info
                        print(f"{i}. {company['name']} | {company['industry']} | "
                              f"{company['city']}, {company['state']} | "
                              f"Headcount: {company['headcount']} | Revenue: ${company['revenue']:,}")
                else:
                    print("No results found.")

            except Exception as e:
                print(f" Error: {e}")


In [29]:
if __name__ == "__main__":
    pipeline = NaturalLanguageToSQL()
    agent = InteractiveSQLAgent(pipeline)
    agent.run()


Welcome to Natural Language → SQL Agent
Type your query (or 'exit' to quit). Example inputs:
  • software companies in California with more than 1000 employees
  • restaurants in Texas with revenue over 10M
  • healthcare technology companies in New York with fewer than 500 employees
-------------------------------------------------------------

Your Query: software companies in Boston with over 20 employees
Processing query: 'software companies in Boston with over 20 employees'

Step 1: Understanding the query...
   Found: ParsedQuery(city='Boston', state='', country='', main_industry='Software', specific_industry='', search_terms=['software', 'boston', 'over', 'employees'], company_size='', revenue_level='', employee_count='over 20', detail_keywords=['software', 'boston', 'over', 'employees'])

Step 2: Converting to database conditions...
   Conditions: ["city LIKE '%Boston%'", "industry LIKE '%Software%'", 'headcount > 20']
   Keywords: ['software', 'boston', 'over', 'employees']
  