# Finance SQL Database Interrogation in Natural Language

This project is an exercise prompted by the NLP class of Professor E. Cabrio at Université Côte d'Azur. We aim to provide a symbolic approach (and an attempted machine-learning one) for translating a natural language request into a well-formed SQL query, tailored to our specific finance-oriented database.

**Authors:** Nathan Amoussou, Tristan Patout\
**Course:** Natural Language Processing of Master 1 Informatique of Université Côte d'Azur\
**Date:** 04/20/2025

## 1. Project Setup

Import necessary libraries and define global configurations like file paths.

In [None]:
# Use %%capture to hide installation outputs if desired
# %%capture
# %pip install pandas spacy -q
# !python -m spacy download en_core_web_sm -q

In [None]:
import pandas as pd
import sqlite3
import spacy
from spacy.matcher import PhraseMatcher
from spacy.pipeline import EntityRuler
from spacy import displacy # For optional visualizations
import re
import json
import os
import warnings 

# Filter specific warnings if needed
# warnings.filterwarnings("ignore", message=r".*\[W036\].*") 

print("Libraries imported.")

In [None]:
# --- Configuration ---
# Input CSV files; the paths are relative, so they resolve against the
# current working directory of the process running the notebook.
sp500_file = 'Database_ressources/sp_500_companies_with_financial_information.csv'
marketcap_file = 'Database_ressources/top_global_companies_by_market_cap.csv'
# SQLite database file and the single table the cells below create and query.
db_file = 'companies_database.db'
table_name = 'companies'

print(f"Database file set to: {db_file}")

## 2. Database Creation & Population

Load raw data from CSV files into pandas DataFrames.

In [None]:
# Load both raw CSV files into DataFrames.
try:
    df_sp500 = pd.read_csv(sp500_file)
    df_marketcap = pd.read_csv(marketcap_file)
except FileNotFoundError as e:
    print(f"Error loading files: {e}")
    # Re-raise after reporting: the following cells read df_sp500 and
    # df_marketcap, so swallowing the error here would only surface later
    # as an unrelated NameError.
    raise
else:
    print("CSV files loaded successfully.")
    # Optional: Display head/info
    # print("S&P 500 Head:\n", df_sp500.head())
    # print("\nMarket Cap Head:\n", df_marketcap.head())

Clean and prepare the DataFrames: select relevant columns, rename columns for consistency, clean numeric/date values.

In [None]:
# S&P 500 frame (df1): keep the descriptive columns and shorten the GICS names.
df1 = (
    df_sp500[['Symbol', 'Security', 'GICS Sector', 'GICS Sub-Industry', 'Founded']]
    .copy()
    .rename(columns={'GICS Sector': 'Sector', 'GICS Sub-Industry': 'Industry'})
)
# Keep only the first 4-digit run of the 'Founded' text (NaN when there is none).
df1['Founded'] = df1['Founded'].astype(str).str.extract(r'(\d{4})', expand=False)
print("S&P 500 DataFrame prepared.")

# Market-cap frame (df2): keep the numeric columns and align the key name
# ('Company Code' -> 'Symbol') with df1.
df2 = (
    df_marketcap[['Company Code', 'Marketcap', 'Stock Price', 'Country']]
    .copy()
    .rename(columns={'Company Code': 'Symbol', 'Stock Price': 'Stockprice'})
)
# --- Cleaning functions (define or copy here) ---
def clean_marketcap(value):
    """Convert a market-cap cell such as '$3.033 T' or '500M' to a number.

    Args:
        value: Raw cell value. Ints/floats are returned unchanged; strings
            are parsed; anything else yields None.

    Returns:
        The numeric value (scaled by 1e12 / 1e9 / 1e6 when the text contains
        an upper-case 'T' / 'B' / 'M'), or None when the text cannot be
        converted to a float.
    """
    if isinstance(value, (int, float)):
        return value
    if not isinstance(value, str):
        return None

    cleaned = value.replace('$', '').replace(',', '').strip()
    multiplier = 1.0
    # Same precedence as before: T is checked first, then B, then M.
    for suffix, factor in (('T', 1e12), ('B', 1e9), ('M', 1e6)):
        if suffix in cleaned:
            # Strip again to cope with spaced forms like '3.033 T'.
            cleaned = cleaned.replace(suffix, '').strip()
            multiplier = factor
            break

    # The conversion is guarded for every branch, so non-numeric text that
    # happens to contain T/B/M (e.g. 'TBD') yields None instead of raising.
    try:
        number = float(cleaned)
    except ValueError:
        return None
    return number * multiplier
def clean_stockprice(value):
    """Convert a stock-price cell such as '$1,234.50' to a float.

    Numbers pass through untouched; strings have '$' and ',' removed before
    conversion. Returns None for unparsable text and for any other type.
    """
    if isinstance(value, str):
        stripped = value.replace('$', '').replace(',', '').strip()
        try:
            return float(stripped)
        except ValueError:
            # Text that is not a number after cleaning.
            return None
    return value if isinstance(value, (int, float)) else None
# Run each numeric column of df2 through its cleaning function, element-wise.
for _column, _cleaner in (('Marketcap', clean_marketcap), ('Stockprice', clean_stockprice)):
    df2[_column] = df2[_column].apply(_cleaner)
print("Market Cap DataFrame prepared and cleaned.")

Merge the prepared DataFrames and select the final columns for the database table.

In [None]:
# Inner-join the two sources on the ticker symbol, then keep one row per
# symbol: the one with the largest market cap (rows are sorted descending
# before de-duplication keeps the first occurrence).
merged_df = (
    pd.merge(df1, df2, on='Symbol', how='inner')
    .sort_values('Marketcap', ascending=False)
    .drop_duplicates('Symbol', keep='first')
)
print(f"DataFrames merged. Result shape: {merged_df.shape}")

# Columns (and their order) of the SQL table.
final_columns = ['Symbol', 'Security', 'Sector', 'Industry', 'Founded', 'Marketcap', 'Stockprice', 'Country']
# Report any expected column that the merge did not produce ...
missing_cols = [col for col in final_columns if col not in merged_df.columns]
if missing_cols:
    print(f"Warning: Missing columns: {missing_cols}")
# ... and select only the ones that are present, in the order listed above.
final_df = merged_df[[col for col in final_columns if col in merged_df.columns]].copy()
print("Final DataFrame prepared for SQL.")
# print(final_df.head())
# final_df.info()

Create the SQLite database and table, then populate it with the final DataFrame.

In [None]:
# Write the final DataFrame to SQLite, replacing any existing table of the
# same name, then read the row count back as a sanity check.
conn = None
try:
    conn = sqlite3.connect(db_file)
    # NOTE(review): `cursor` is not used anywhere in this cell.
    cursor = conn.cursor()
    # Use pandas to_sql to create/replace table and insert data
    final_df.to_sql(table_name, conn, if_exists='replace', index=False)
    print(f"Data successfully imported into table '{table_name}' in '{db_file}'")
    
    # Verification query
    verify_df = pd.read_sql(f"SELECT COUNT(*) as count FROM {table_name}", conn)
    print(f"Verification: Found {verify_df['count'][0]} rows in the SQL table.")
    
except sqlite3.Error as e:
    print(f"SQLite error during import: {e}")
except Exception as e:
    print(f"An unexpected error occurred during SQL import: {e}")
finally:
    # Runs on success and on failure alike; the commit is therefore also
    # issued after an error was reported above.
    if conn:
        conn.commit()
        conn.close()
        print("Database connection closed.")

## 3. NLP Pipeline Setup & Definitions

### 3.1. Setup: NER Enhancement Helpers & Configuration
Define functions for loading gazetteers, mappings, and setup for PhraseMatcher and EntityRuler patterns.

In [None]:
# --- Gazetteer Loading Function ---
def load_terms_from_db(db_path, table, column_name):
    """Collect the distinct non-empty values of one column as gazetteer terms.

    Args:
        db_path: Path of the SQLite database file.
        table: Table to read from.
        column_name: Column whose distinct values are collected.

    Returns:
        A pair of lists ``(lowercase_terms, original_casing_terms)``; both
        are stripped of surrounding whitespace and de-duplicated. On any
        error a message is printed and whatever was collected so far
        (normally two empty lists) is returned.
    """
    lowered = set()
    cased = set()
    connection = None
    try:
        connection = sqlite3.connect(db_path)
        sql = f'SELECT DISTINCT "{column_name}" FROM "{table}" WHERE "{column_name}" IS NOT NULL AND "{column_name}" != \'\''
        frame = pd.read_sql_query(sql, connection)
        if column_name not in frame.columns:
            print(f"Warning: Column '{column_name}' not found in table '{table}'.")
        else:
            for raw in frame[column_name].astype(str):
                # Whitespace-only values pass the SQL filter but are dropped here.
                if raw.strip():
                    cased.add(raw.strip())
                    lowered.add(raw.lower().strip())
    except Exception as e:
        print(f"Error loading terms for '{column_name}': {e}")
    finally:
        if connection:
            connection.close()
    lowered.discard('')
    cased.discard('')
    print(f"Loaded {len(lowered)} unique lowercase terms for '{column_name}'.")
    return list(lowered), list(cased)

In [None]:
# --- Mappings ---
# Lowercase alias / demonym -> lowercase canonical country name.
country_mapping = {
    "american": "usa",
    "us": "usa",
    "u.s.": "usa",
    "u.s.a": "usa",
    "uk": "united kingdom",
    "u.k.": "united kingdom",
    "french": "france",
    "german": "germany",
    "spanish": "spain",
    "indian": "india",
}
# Lowercase shorthand -> lowercase canonical sector name.
sector_alias_mapping = {
    "it": "information technology",
    "info tech": "information technology",
    "health": "health care",
}

In [None]:
# --- Entity Ruler Pattern Definitions (Define patterns list) ---
# `patterns` is the combined list; it is created empty here and this cell does
# not add anything to it. `custom_patterns` holds the hand-written token
# patterns (monetary values, numbers, ranking, comparison, column keywords).
patterns = []

# Custom patterns (Monetary, Ranking, Ops, Keywords, Cardinal) - Revised v4
custom_patterns = [
    # Monetary values: a number followed by a magnitude word, with an
    # optional leading '$' and optional trailing punctuation.
    {"label": "MONEY_VALUE", "pattern": [{"LIKE_NUM": True}, {"LOWER": {"IN": ["b", "bn", "billion", "m", "mn", "million", "t", "tn", "trillion"]}}, {"IS_PUNCT": True, "OP": "?"}]},
    {"label": "MONEY_VALUE", "pattern": [{"TEXT": "$", "OP": "?"}, {"LIKE_NUM": True}, {"LOWER": {"IN": ["b", "bn", "billion", "m", "mn", "million", "t", "tn", "trillion"]}}, {"IS_PUNCT": True, "OP": "?"}]},
    # Plain amounts like $500 (no magnitude word) get their own label.
    {"label": "PRICE_VALUE", "pattern": [{"TEXT": "$", "OP": "?"}, {"LIKE_NUM": True}]},

    # Cardinal number (for limits); 4-digit shapes are excluded so they can
    # be labelled as years by the next pattern.
    {"label": "CARDINAL", "pattern": [{"POS": "NUM", "ENT_TYPE": "", "SHAPE": {"NOT_IN": ["dddd"]}}]},
    {"label": "YEAR_NUMBER", "pattern": [{"SHAPE": "dddd", "POS": "NUM"}]},

    # Ranking
    {"label": "RANKING_MODIFIER", "pattern": [{"LOWER": {"IN": ["top", "most", "highest", "largest", "biggest", "least", "lowest", "smallest"]}}]},

    # Comparison operators (including before/after for dates)
    {"label": "COMPARISON_OP", "pattern": [{"LOWER": {"IN": ["over", "above", "under", "below", "after", "before"]}}]},
    {"label": "COMPARISON_OP", "pattern": [{"LOWER": "more"}, {"LOWER": "than"}]},
    {"label": "COMPARISON_OP", "pattern": [{"LOWER": "greater"}, {"LOWER": "than"}]},
    {"label": "COMPARISON_OP", "pattern": [{"LOWER": "less"}, {"LOWER": "than"}]},
    {"label": "COMPARISON_OP", "pattern": [{"TEXT": {"IN": [">", "<"]}}]},

    # Value keywords
    {"label": "VALUE_KEYWORD", "pattern": [{"LOWER": {"IN": ["valued", "value", "marketcap", "worth"]}}]},
    {"label": "VALUE_KEYWORD", "pattern": [{"LOWER": "market"}, {"LOWER": "cap"}]},

    # Column keywords (with variations)
    {"label": "COLUMN_SECTOR", "pattern": [{"LOWER": {"IN": ["sector", "sectors"]}}]},
    {"label": "COLUMN_INDUSTRY", "pattern": [{"LOWER": {"IN": ["industry", "industries"]}}]},
    {"label": "COLUMN_COUNTRY", "pattern": [{"LOWER": {"IN": ["country", "countries", "location", "region"]}}]},
    {"label": "COLUMN_FOUNDED", "pattern": [{"LOWER": "founded"}]},
    {"label": "COLUMN_FOUNDED", "pattern": [{"LOWER": "founding"}, {"LOWER": "date"}]},
    {"label": "COLUMN_STOCKPRICE", "pattern": [{"LOWER": {"IN": ["stock", "price", "stockprice", "stockprices"]}}]},
    {"label": "COLUMN_STOCKPRICE", "pattern": [{"LOWER": "stock"}, {"LOWER": "price"}]},
    {"label": "COLUMN_STOCKPRICE", "pattern": [{"LOWER": "stock"}, {"LOWER": "prices"}]},

    # Ordering keywords
    {"label": "ORDERING_KEYWORD", "pattern": [{"LOWER": "ordered"}, {"LOWER": "by"}]},
    {"label": "ORDERING_KEYWORD", "pattern": [{"LOWER": "sorted"}, {"LOWER": "by"}]},
]
# Report the list that was actually populated in this cell; `patterns` is
# still empty at this point, so counting it would always print 0.
print(f"Defined {len(custom_patterns)} patterns for EntityRuler.")

In [None]:
# --- Placeholder for loaded gazetteers (will be loaded before testing) ---
# Each gazetteer has a lowercase list (lc_*) and an original-casing list
# (unique_*_orig); all start out as separate empty lists.
lc_sectors = []
unique_sectors_orig = []
lc_countries = []
unique_countries_orig = []
lc_industries = []
unique_industries_orig = []
lc_companies = []
unique_companies_orig = []

In [None]:
# --- Load spaCy Model ---
# `nlp` stays None when loading fails; the setup cells below test it before
# building the PhraseMatcher and the EntityRuler.
nlp = None
try:
    # Consider loading a potentially larger model if accuracy is needed later
    nlp = spacy.load("en_core_web_sm") 
    print("Loaded 'en_core_web_sm' spaCy model.")
    # Clean ruler if it exists from previous runs
    if "entity_ruler" in nlp.pipe_names: nlp.remove_pipe("entity_ruler")
except Exception as e:
    # Any failure (including one raised after the model was loaded) is only
    # reported; execution continues with whatever `nlp` holds at that point.
    print(f"Error loading spaCy model: {e}")

In [None]:
# --- Setup PhraseMatcher ---
# The matcher compares on the LOWER attribute; it is only built when a
# spaCy pipeline is available.
matcher = None
if not nlp:
    print("Warning: spaCy model not loaded, PhraseMatcher not initialized.")
else:
    matcher = PhraseMatcher(nlp.vocab, attr='LOWER')
    print("PhraseMatcher initialized.")

In [None]:
# --- Setup EntityRuler ---
# The ruler is inserted before the "ner" component with overwrite_ents=True;
# any ruler left over from a previous run is removed first.
ruler = None
if not nlp:
    print("Warning: spaCy model not loaded, EntityRuler not added.")
else:
    if "entity_ruler" in nlp.pipe_names:
        nlp.remove_pipe("entity_ruler")
    ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True}, before="ner")
    print("EntityRuler added to pipeline.")

### 3.2. Query Parsing Function & Helpers
Define helper functions for parsing values and the main `parse_query_structure_refactored` function.

In [None]:
# --- Helper Functions ---
def parse_monetary_value(text_value):
    """Convert text like '1B', '$500 million', '1.5T' or '$500' to a float.

    The value must start with a number (after '$' and ',' are removed). A
    magnitude is applied only when the word immediately following the number
    is a known suffix; any other trailing text is ignored.

    Args:
        text_value: Text (or any object convertible with ``str``) to parse.

    Returns:
        The numeric value as a float, or None when ``text_value`` is None or
        does not start with a number.
    """
    if text_value is None:
        return None

    cleaned = str(text_value).lower().replace('$', '').replace(',', '').strip()

    # Number first, then an optional alphabetic word directly after it.
    # Matching the suffix as a whole word (instead of testing for the letters
    # 't' / 'b' / 'm' anywhere in the text) keeps inputs such as '5 percent'
    # from being scaled to trillions.
    match = re.match(r"([-+]?(?:\d+\.?\d*|\.\d+))\s*([a-z]+)?", cleaned)
    if match is None:
        return None  # No leading number found

    number_text, suffix = match.groups()
    multipliers = {
        "t": 1e12, "tn": 1e12, "trillion": 1e12,
        "b": 1e9, "bn": 1e9, "billion": 1e9,
        "m": 1e6, "mn": 1e6, "million": 1e6,
        "k": 1e3, "thousand": 1e3,
    }
    # Unknown or missing suffix -> plain number.
    return float(number_text) * multipliers.get(suffix, 1.0)

def map_comparison_operator(op_text):
    """Translate a textual comparison phrase into its SQL operator.

    Matching is case-insensitive and ignores surrounding whitespace.
    Returns None when the phrase is not recognised.
    """
    sql_by_phrase = {
        # strictly greater
        "over": ">", "above": ">", "greater than": ">",
        "more than": ">", ">": ">", "after": ">",
        # strictly less
        "under": "<", "below": "<", "less than": "<",
        "<": "<", "before": "<",
        # equality
        "equal to": "=", "equals": "=", "is equal to": "=",
        # inclusive bounds
        "at least": ">=", "at most": "<=",
    }
    return sql_by_phrase.get(op_text.lower().strip())
    
def normalize_term(term, original_list, alias_mapping=None):
    """Map a found term onto its canonical, originally-cased form.

    Lookup order:
      1. If the lowercased, stripped term is a key of ``alias_mapping``, the
         alias target is looked up in ``original_list`` (case-insensitively);
         the target itself is returned when the list has no such entry.
      2. Otherwise the term is matched case-insensitively against
         ``original_list`` and the list entry is returned.
      3. Failing both, ``term`` is returned exactly as it was passed in.
    """
    needle = term.lower().strip()

    def _original_casing(lowered):
        # First entry of original_list equal to `lowered` ignoring case.
        return next((entry for entry in original_list if entry.lower() == lowered), None)

    if alias_mapping and needle in alias_mapping:
        canonical = alias_mapping[needle]
        cased = _original_casing(canonical)
        return canonical if cased is None else cased

    cased = _original_casing(needle)
    return term if cased is None else cased

In [None]:
# --- Column Map ---
# Entity label -> column of the `companies` table it refers to.
COLUMN_MAP = {
    # Entities that name a company
    "COMPANY_NAME": "Security",
    "ORG": "Security",
    # Gazetteer terms
    "SECTOR_TERM": "Sector",
    "INDUSTRY_TERM": "Industry",
    "COUNTRY_TERM": "Country",
    "GPE": "Country",
    # Keywords that mention a column
    "VALUE_KEYWORD": "Marketcap",
    "COLUMN_SECTOR": "Sector",
    "COLUMN_INDUSTRY": "Industry",
    "COLUMN_COUNTRY": "Country",
    "COLUMN_FOUNDED": "Founded",
    "COLUMN_STOCKPRICE": "Stockprice",
    # Monetary values filter on market cap by default
    "MONEY_VALUE": "Marketcap",
    # Cardinal numbers usually represent LIMIT, not a filter column
    "CARDINAL": None,
}

In [None]:
# --- Main Parsing Function ---
def parse_query_structure_refactored(doc, identified_items, 
                                     orig_sectors, orig_industries, orig_countries): 
    """
    Build a structured description of a query from its tokens and entities.

    The function picks one intent ('list_values', 'find_top',
    'lookup_specific_column', 'lookup_details' or 'filter_list'), then
    collects filters, ordering, limit and the columns to select.

    Args:
        doc: Iterable of tokens exposing `lemma_`, `is_punct`, `is_stop`,
            `dep_` and `pos_`, and offering `char_span(start, end, label=...)`
            (presumably a spaCy Doc - confirm against the caller).
        identified_items: Iterable of dicts with at least 'label' and 'text'.
            'start_char' / 'end_char' are read for column, ranking and
            ordering items; an optional 'mapped_value' is honoured for
            country/sector/industry filter terms.
        orig_sectors: Known sector names; only used (lowercased) to validate
            sector filter values.
        orig_industries: Known industry names; used the same way.
        orig_countries: Known country names; used the same way.

    Returns:
        dict with the keys 'intent', 'select_cols', 'filters' (list of
        {'column', 'operator', 'value'} dicts), 'limit', 'order_by'
        ({'column', 'direction'} or None), 'distinct' and 'errors'
        (list of human-readable messages).

    NOTE(review): value normalisation reads the module-level names
    `unique_companies_orig`, `unique_sectors_orig`, `unique_industries_orig`,
    `unique_countries_orig`, `sector_alias_mapping` and `country_mapping`
    rather than the `orig_*` parameters; the two must hold the same data for
    the "is known" validation below to agree with the normalisation.
    """
    parsed_structure = {
        'intent': None, 'select_cols': [], 'filters': [], 'limit': None,
        'order_by': None, 'distinct': False, 'errors': []
    }
    # Lowercase lookup sets used by the "is known value" validation.
    lc_sectors_set = {s.lower() for s in orig_sectors}
    lc_industries_set = {i.lower() for i in orig_industries}
    lc_countries_set = {c.lower() for c in orig_countries}

    # Group the identified items by label, keeping their input order.
    items_by_label = {}
    for item in identified_items:
        label = item['label']
        if label not in items_by_label: items_by_label[label] = []
        items_by_label[label].append(item)
        
    # `|` binds tighter than `not`, so this keeps tokens that are neither
    # punctuation nor stop words.
    lemmas = [token.lemma_.lower() for token in doc if not token.is_punct | token.is_stop]
    # Only a ROOT token tagged VERB counts; otherwise root_lemma is None.
    root_verb = next((token for token in doc if token.dep_ == "ROOT" and token.pos_ == "VERB"), None)
    root_lemma = root_verb.lemma_.lower() if root_verb else None

    # --- Define Flags ---
    has_ranking_modifier = bool(items_by_label.get("RANKING_MODIFIER"))
    has_cardinal = bool(items_by_label.get("CARDINAL"))
    has_company = bool(items_by_label.get("COMPANY_NAME") or items_by_label.get("ORG"))
    has_comparison = bool(items_by_label.get("COMPARISON_OP"))
    has_money = bool(items_by_label.get("MONEY_VALUE"))
    has_price = bool(items_by_label.get("PRICE_VALUE")) 
    has_year = bool(items_by_label.get("YEAR_NUMBER")) 
    has_date_keyword = bool(items_by_label.get("COLUMN_FOUNDED")) 
    has_price_keyword = bool(items_by_label.get("COLUMN_STOCKPRICE")) 
    has_ordering_keyword = bool(items_by_label.get("ORDERING_KEYWORD")) 
    
    mentioned_columns_labels = [lbl for lbl in items_by_label if lbl.startswith("COLUMN_")]
    # Specific column request: a verb root from the list below, exactly one
    # COLUMN_* label, a company, and no ranking modifier.
    is_specific_col_req = (root_lemma in ["what", "show", "list", "tell", "give"] and \
                           len(mentioned_columns_labels) == 1 and has_company and \
                           not has_ranking_modifier) # Added check to avoid conflict with ranking
                           
    # NOTE(review): stop words were removed from `lemmas` above. If the
    # pipeline's stop list contains "all" (spaCy's default English list
    # appears to), only "available" can satisfy this check - confirm.
    is_list_all_keyword = (root_lemma in ["list", "show"] and any(l in lemmas for l in ["all", "available"]) and \
                           any(lbl in mentioned_columns_labels for lbl in ["COLUMN_SECTOR", "COLUMN_INDUSTRY", "COLUMN_COUNTRY"]))

    # --- Refined Intent Prioritization ---
    
    # 1. List distinct column values (Specific intent)
    if is_list_all_keyword:
        parsed_structure['intent'] = 'list_values'
        parsed_structure['distinct'] = True
        # Fixed precedence when several column keywords appear: sector, industry, country.
        col_label = next((lbl for lbl in ["COLUMN_SECTOR", "COLUMN_INDUSTRY", "COLUMN_COUNTRY"] if lbl in items_by_label), None)
        if col_label and col_label in COLUMN_MAP:
            parsed_structure['select_cols'] = [COLUMN_MAP[col_label]]
        else:
            parsed_structure['errors'].append("Could not determine column for listing values.")
            parsed_structure['select_cols'] = ['*'] 

    # 2. Top/Bottom N queries (Specific intent - HIGHER PRIORITY NOW)
    elif has_ranking_modifier and has_cardinal: # Needs both modifier and number
        parsed_structure['intent'] = 'find_top'
        # The first CARDINAL item is taken as the LIMIT; non-digit text
        # (e.g. a spelled-out number) is reported as an error instead.
        limit_item = items_by_label["CARDINAL"][0]
        try: parsed_structure['limit'] = int(limit_item['text'])
        except ValueError: parsed_structure['errors'].append(f"Could not parse limit value: {limit_item['text']}")
        order_col = "Marketcap"; order_dir = "DESC" # Defaults      
        rank_item = items_by_label["RANKING_MODIFIER"][0] 
        if has_ordering_keyword: # Check for explicit ordering column
             # Order by the first column keyword that starts after the ordering keyword ends.
             order_kw_item = items_by_label["ORDERING_KEYWORD"][0]
             following_cols = [item for lbl in mentioned_columns_labels for item in items_by_label[lbl] if item['start_char'] > order_kw_item['end_char']]
             following_cols.sort(key=lambda x: x['start_char'])
             if following_cols and following_cols[0]['label'] in COLUMN_MAP: order_col = COLUMN_MAP[following_cols[0]['label']]
        rank_token = doc.char_span(rank_item['start_char'], rank_item['end_char'], label=rank_item['label']) 
        # Ascending order when the ranking word's lemma is least/low/small.
        if rank_token and len(rank_token) > 0 and rank_token[0].lemma_.lower() in ["least", "low", "small"]: order_dir = "ASC"
        parsed_structure['order_by'] = {'column': order_col, 'direction': order_dir}
        parsed_structure['select_cols'] = ['Security', order_col] 
            
    # 3. Specific Column Lookup (Specific intent)
    elif is_specific_col_req:
         parsed_structure['intent'] = 'lookup_specific_column'
         # COMPANY_NAME items take precedence over ORG items.
         company_item = items_by_label.get("COMPANY_NAME", items_by_label.get("ORG", []))[0]
         # Normalise against the module-level company gazetteer.
         company_name_norm = normalize_term(company_item['text'], unique_companies_orig) 
         parsed_structure['filters'].append({ 'column': 'Security', 'operator': '=', 'value': company_name_norm })
         col_label = mentioned_columns_labels[0]
         if col_label in COLUMN_MAP:
              parsed_structure['select_cols'] = [COLUMN_MAP[col_label]]
         else:
              parsed_structure['errors'].append(f"Could not map requested column: {col_label}")
              parsed_structure['select_cols'] = ['*'] 
              
    # 4. Simple Company Details Lookup (Specific intent)
    elif has_company and not has_ranking_modifier and not has_comparison and not mentioned_columns_labels and len(identified_items) < 5: 
        parsed_structure['intent'] = 'lookup_details'
        parsed_structure['select_cols'] = ['*'] 
        company_item = items_by_label.get("COMPANY_NAME", items_by_label.get("ORG", []))[0]
        company_name_norm = normalize_term(company_item['text'], unique_companies_orig) 
        parsed_structure['filters'].append({ 'column': 'Security', 'operator': '=', 'value': company_name_norm })
        
    # 5. Default to Filtered List / General Ordering
    else:
        parsed_structure['intent'] = 'filter_list'
        parsed_structure['select_cols'] = ['Security', 'Marketcap'] # Default columns
        if has_ordering_keyword:
             order_kw_item = items_by_label["ORDERING_KEYWORD"][0]
             following_cols = [item for lbl in mentioned_columns_labels for item in items_by_label[lbl] if item['start_char'] > order_kw_item['end_char']]
             following_cols.sort(key=lambda x: x['start_char'])
             if following_cols and following_cols[0]['label'] in COLUMN_MAP:
                  # Explicit "ordered by <column>" in a plain list is always ascending.
                  order_col = COLUMN_MAP[following_cols[0]['label']]; order_dir = 'ASC' 
                  parsed_structure['order_by'] = {'column': order_col, 'direction': order_dir}

    # --- Extract Filters (Common Logic - Ensure correct execution order) ---
    # (column, operator, str(value)) triples already emitted, to avoid duplicates.
    added_filter_tuples = set() 
    # Initialize with filters potentially added by specific intents (like lookup_details)
    for f in parsed_structure['filters']: 
        added_filter_tuples.add( (f['column'], f['operator'], str(f['value'])) )

    # Only add MORE filters if the intent isn't a simple specific lookup
    if parsed_structure['intent'] not in ['list_values', 'lookup_specific_column', 'lookup_details']:
        
        # Basic Filters (Country, Sector, Industry)
        filter_labels_priority = ["COUNTRY_TERM", "SECTOR_TERM", "INDUSTRY_TERM"]
        filter_labels_fallback = ["GPE"] 
        for item_label in filter_labels_priority + filter_labels_fallback:
            if item_label in items_by_label and item_label in COLUMN_MAP:
                db_column = COLUMN_MAP[item_label]
                if not db_column: continue 
                for item in items_by_label[item_label]:
                    # Prefer a pre-mapped value when the item carries one.
                    filter_value_raw = item.get('mapped_value', item['text'])
                    # Normalise to the canonical casing of the matching gazetteer.
                    # Only the four labels iterated above can reach this point,
                    # so the COLUMN_* / COMPANY_NAME / ORG alternatives below
                    # never match here.
                    normalized_value = filter_value_raw 
                    if item_label in ["SECTOR_TERM", "COLUMN_SECTOR"]: normalized_value = normalize_term(filter_value_raw, unique_sectors_orig, sector_alias_mapping)
                    elif item_label in ["INDUSTRY_TERM", "COLUMN_INDUSTRY"]: normalized_value = normalize_term(filter_value_raw, unique_industries_orig)
                    elif item_label in ["COUNTRY_TERM", "GPE", "COLUMN_COUNTRY"]:
                         if item.get('mapped_value') and item['mapped_value'].lower() in lc_countries_set: normalized_value = normalize_term(item['mapped_value'], unique_countries_orig)
                         else: normalized_value = normalize_term(filter_value_raw, unique_countries_orig, country_mapping)
                    elif item_label in ["COMPANY_NAME", "ORG"]: normalized_value = normalize_term(filter_value_raw, unique_companies_orig)
                    
                    # Drop values that are not present in the known-value sets
                    # built from the orig_* parameters.
                    is_known = True
                    if db_column == 'Sector' and normalized_value.lower() not in lc_sectors_set: is_known = False
                    if db_column == 'Industry' and normalized_value.lower() not in lc_industries_set: is_known = False
                    if db_column == 'Country' and normalized_value.lower() not in lc_countries_set: is_known = False
                    if not is_known:
                         parsed_structure['errors'].append(f"Ignoring potential filter term '{item['text']}' for column '{db_column}' as it's not a known value.")
                         continue 
                         
                    filter_tuple = (db_column, '=', str(normalized_value))
                    if filter_tuple not in added_filter_tuples:
                        parsed_structure['filters'].append({'column': db_column, 'operator': '=', 'value': normalized_value})
                        added_filter_tuples.add(filter_tuple)

    # --- Filter blocks that run for every intent ---
    # Date, Price and MarketCap filters are added AFTER the basic filters.
    # All three use the FIRST COMPARISON_OP item and the first value item of
    # their kind, so a query with several comparisons reuses one operator.
    
    # Date Filters 
    if has_comparison and has_year and has_date_keyword:
        op_item = items_by_label["COMPARISON_OP"][0]
        year_item = items_by_label["YEAR_NUMBER"][0]
        sql_op = map_comparison_operator(op_item['text'])
        # The year is kept as text (not converted to int).
        year_val = year_item['text'] 
        if sql_op and year_val:
             filter_column = "Founded" 
             filter_tuple = (filter_column, sql_op, str(year_val))
             if filter_tuple not in added_filter_tuples:
                  parsed_structure['filters'].append({'column': filter_column, 'operator': sql_op, 'value': year_val})
                  added_filter_tuples.add(filter_tuple)
             
    # Price Filters 
    if has_comparison and has_price and has_price_keyword:
        op_item = items_by_label["COMPARISON_OP"][0]
        price_item = items_by_label["PRICE_VALUE"][0]
        sql_op = map_comparison_operator(op_item['text'])
        numeric_val = parse_monetary_value(price_item['text']) 
        if sql_op and numeric_val is not None:
             filter_column = "Stockprice"
             filter_tuple = (filter_column, sql_op, str(numeric_val))
             if filter_tuple not in added_filter_tuples:
                 parsed_structure['filters'].append({'column': filter_column, 'operator': sql_op, 'value': numeric_val})
                 added_filter_tuples.add(filter_tuple)

    # MarketCap Threshold Filters (skipped when a stock-price keyword is present)
    if has_comparison and has_money and not has_price_keyword: 
         op_item = items_by_label["COMPARISON_OP"][0]
         val_item = items_by_label["MONEY_VALUE"][0]
         filter_column = "Marketcap" 
         sql_op = map_comparison_operator(op_item['text'])
         numeric_val = parse_monetary_value(val_item['text'])
         if sql_op and numeric_val is not None:
              filter_tuple = (filter_column, sql_op, str(numeric_val))
              if filter_tuple not in added_filter_tuples:
                   parsed_structure['filters'].append({'column': filter_column, 'operator': sql_op, 'value': numeric_val})
                   added_filter_tuples.add(filter_tuple)
         # Note: We still don't have a fix for "1B." -> MONEY_VALUE, so this block won't catch that.

    # --- Refine Select Columns (Enhanced) ---
    # For list-style intents, explicitly mentioned columns replace the
    # defaults: 'Security' first, then the mentioned columns alphabetically.
    if parsed_structure['intent'] in ['filter_list', 'find_top']:
        # NOTE(review): `current_cols` is assigned but never read.
        current_cols = set(parsed_structure['select_cols']) 
        explicit_cols = set()
        explicit_col_request = False
        for col_label in mentioned_columns_labels:
             if col_label in COLUMN_MAP and COLUMN_MAP[col_label]:
                  explicit_cols.add(COLUMN_MAP[col_label])
                  explicit_col_request = True
        if explicit_col_request:
             final_cols = ['Security'] + sorted(list(explicit_cols))
             # dict.fromkeys removes duplicates while preserving order.
             parsed_structure['select_cols'] = list(dict.fromkeys(final_cols)) 
             
    return parsed_structure

### 3.3. Definition: SQL Generation Function & Helpers
Define the `generate_sql_from_structure` function and its `format_sql_value` helper.

In [None]:
# --- SQL Formatting Helper ---
def format_sql_value(value):
    """Format a Python value as an SQL literal for inclusion in a query.

    Args:
        value: The value to render.

    Returns:
        str: ``NULL`` for None, ``1`` / ``0`` for booleans, the plain
        ``str()`` form for ints and floats, and a single-quoted string (with
        embedded single quotes doubled) for strings and any other type.
    """
    if value is None:
        return "NULL"
    # bool must be tested before int: bool is a subclass of int, so the
    # numeric branch would otherwise render True/False as 'True'/'False'.
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return str(value)
    # Strings and unexpected types alike are quoted, doubling any single
    # quote so the literal cannot terminate early.
    escaped_value = str(value).replace("'", "''")
    return f"'{escaped_value}'"

In [None]:
# --- SQL Generation Function ---
def generate_sql_from_structure(parsed_structure):
    """
    Translate a parsed query structure into an SQLite SELECT statement.

    Args:
        parsed_structure (dict): The dictionary produced by the query-parsing
            step. Keys read here: 'intent', 'errors', 'distinct',
            'select_cols', 'filters' (dicts with 'column', 'operator',
            'value'), 'order_by' (dict with 'column', 'direction') and
            'limit'. ``None`` or an empty dict is reported as invalid input.

    Returns:
        tuple: (sql_query_string, list_of_parsing_errors). The list is a copy
        of the structure's 'errors' plus any error raised here; the input
        structure is never modified. When no SQL can be produced the query
        string is an SQL comment ("-- ...") describing why.
    """
    def quote_identifier(name):
        # Double any embedded double quote so a column name cannot terminate
        # the quoted identifier early.
        return '"' + str(name).replace('"', '""') + '"'

    sql_query = "-- No SQL generated due to invalid input." # Default SQL if structure is bad

    # Only call .get() once we know there is a structure (None used to raise
    # AttributeError here), and copy the list so appending below does not
    # leak into the caller's parsed structure.
    parsing_errors = list(parsed_structure.get('errors') or []) if parsed_structure else []

    if not parsed_structure or not parsed_structure.get('intent'):
        parsing_errors.append("Invalid or unparsed query structure provided.")
        return (sql_query, parsing_errors)

    try:
        # --- SELECT ---
        select_prefix = "SELECT DISTINCT" if parsed_structure.get('distinct', False) else "SELECT"
        select_cols = parsed_structure.get('select_cols')
        if not select_cols or select_cols == ['*']:
            select_parts = ["*"]
        else:
            select_parts = [quote_identifier(col) for col in select_cols]
        select_clause = f"{select_prefix} {', '.join(select_parts)}"

        # --- FROM ---
        from_clause = 'FROM "companies"'

        # --- WHERE ---
        where_clause = None
        conditions = []
        for f in parsed_structure.get('filters') or []:
            col, op, val = f.get('column'), f.get('operator', '='), f.get('value')
            if not col:
                continue
            quoted_col = quote_identifier(col)
            if val is None:
                # NULL needs IS / IS NOT; any other operator against NULL is dropped.
                if op == '=':
                    conditions.append(f"{quoted_col} IS NULL")
                elif op in ('!=', '<>'):
                    conditions.append(f"{quoted_col} IS NOT NULL")
            else:
                # NOTE(review): `op` is interpolated verbatim; it must only ever
                # come from the parser's fixed operator set, never from raw user text.
                conditions.append(f"{quoted_col} {op} {format_sql_value(val)}")
        if conditions:
            where_clause = "WHERE " + " AND ".join(conditions)

        # --- ORDER BY ---
        orderby_clause = None
        order_by = parsed_structure.get('order_by')
        if order_by:
            col = order_by.get('column')
            # A missing or None direction falls back to ASC instead of failing on .upper().
            direction = str(order_by.get('direction') or 'ASC').upper()
            if col and direction in ('ASC', 'DESC'):
                orderby_clause = f'ORDER BY {quote_identifier(col)} {direction}'

        # --- LIMIT ---
        limit_clause = None
        limit = parsed_structure.get('limit')
        if limit is not None:
            try:
                limit_val = int(limit)
            except (ValueError, TypeError):
                limit_val = 0 # Ignore invalid limit
            if limit_val > 0:
                limit_clause = f"LIMIT {limit_val}"

        # Clauses left as None are dropped by filter(None, ...).
        query_parts = [select_clause, from_clause, where_clause, orderby_clause, limit_clause]
        sql_query = " ".join(filter(None, query_parts)).strip() + ";"

    except Exception as e:
        # Catch potential errors during SQL string construction itself
        parsing_errors.append(f"Error during SQL generation: {e}")
        sql_query = "-- Error occurred during SQL generation."

    # --- Return both the generated SQL and the list of parsing errors ---
    return (sql_query, parsing_errors)

## 4. NLP Pipeline Execution & Testing

### 4.1. Load Gazetteers & Apply NER Pipeline
Load terms from the database, add patterns to the NLP components, and process test queries to extract entities. Store results for subsequent steps.

In [None]:
# Build the gazetteers and NER patterns, then run every test query through the
# pipeline. Outputs of this cell, read by later cells:
#   processed_docs         -- query string -> spaCy Doc
#   identified_items_store -- query string -> list of item dicts, sorted by start_char
#   test_queries           -- the example queries analysed below
# Relies on names defined in earlier cells: nlp, matcher, ruler, patterns,
# custom_patterns, load_terms_from_db, country_mapping, sector_alias_mapping.
processed_docs = {}
identified_items_store = {}
test_queries = [] # Define the final list of test queries here

# Compare against None instead of relying on truthiness: spaCy's PhraseMatcher
# and EntityRuler both define __len__, so an instance that holds no patterns
# yet is falsy and `if nlp and matcher and ruler` would skip the whole cell.
if nlp is not None and matcher is not None and ruler is not None:
    # 1. Load Gazetteers
    print("\n--- Loading Gazetteers from Database ---")
    # load_terms_from_db returns a pair per column; presumably (lower-cased
    # terms, original-case terms) -- confirm against its definition.
    lc_companies, unique_companies_orig = load_terms_from_db(db_file, table_name, 'Security')
    lc_sectors, unique_sectors_orig = load_terms_from_db(db_file, table_name, 'Sector')
    lc_countries, unique_countries_orig = load_terms_from_db(db_file, table_name, 'Country')
    lc_industries, unique_industries_orig = load_terms_from_db(db_file, table_name, 'Industry')
    
    # Combine terms for matching (needed if PhraseMatcher uses combined lists)
    # NOTE(review): only the company and sector sets are used below in this
    # cell; the country and industry sets are built but not added to the matcher.
    all_country_terms_lc = set(lc_countries) | set(country_mapping.keys())
    all_sector_terms_lc = set(lc_sectors) | set(sector_alias_mapping.keys())
    all_industry_terms_lc = set(lc_industries) 
    all_company_terms_lc = set(lc_companies)

    # 2. Add Patterns to Matcher
    print("\n--- Adding PhraseMatcher Patterns ---")
    # Only multi-token terms go to the PhraseMatcher; single-token terms are
    # added as EntityRuler token patterns further down.
    if all_company_terms_lc: matcher.add("COMPANY_NAME", [doc for doc in nlp.pipe(all_company_terms_lc) if len(doc) > 1])
    if all_sector_terms_lc: matcher.add("SECTOR_TERM", [doc for doc in nlp.pipe(all_sector_terms_lc) if len(doc) > 1])
    # Country patterns
    for term in lc_countries:
        if len(nlp(term)) == 1:  # Single token countries
            patterns.append({"label": "COUNTRY_TERM", "pattern": [{"LOWER": term}], "id": term})

    # Country aliases: the pattern id carries the canonical name the alias maps to.
    # NOTE(review): each alias becomes a one-token LOWER pattern, so an alias
    # that spaCy splits into several tokens cannot match through the ruler.
    for alias, canonical in country_mapping.items():
        patterns.append({"label": "COUNTRY_TERM", "pattern": [{"LOWER": alias}], "id": canonical})

    # Sector patterns - Carefully add single tokens if needed
    # for term in lc_sectors:
    #     if len(nlp(term)) == 1:
    #         patterns.append({"label": "SECTOR_TERM", "pattern": [{"LOWER": term}], "id": term})

    # Sector aliases (same one-token shape as the country aliases above)
    for alias, canonical in sector_alias_mapping.items():
        patterns.append({"label": "SECTOR_TERM", "pattern": [{"LOWER": alias}], "id": canonical})
        
    # Add multi-word sector pattern for "info tech" explicitly
    patterns.append({"label": "SECTOR_TERM", "pattern": [{"LOWER": "info"}, {"LOWER": "tech"}], "id": "information technology"})

    # Industry patterns
    for term in lc_industries:
        if len(nlp(term)) == 1:
            patterns.append({"label": "INDUSTRY_TERM", "pattern": [{"LOWER": term}], "id": term})

    # Company patterns
    for term in lc_companies:
        if len(nlp(term)) == 1:
            patterns.append({"label": "COMPANY_NAME", "pattern": [{"LOWER": term}], "id": term})

    # Money value: a number-like token, a magnitude word/abbreviation, and an
    # optional trailing period (intended to cover sentence-final "1B." cases).
    patterns.append({"label": "MONEY_VALUE", "pattern": [
        {"LIKE_NUM": True}, 
        {"LOWER": {"IN": ["b", "bn", "billion", "m", "mn", "million", "t", "tn", "trillion"]}},
        {"TEXT": ".", "OP": "?"} # Optional period
    ]})
    
    patterns.extend(custom_patterns)
    
    # Add all patterns to the ruler
    ruler.add_patterns(patterns)

    # --- Process Example Queries ---
    test_queries = [
        "Top 10 most valuable American IT companies.",
        "Show details for Apple Inc.",
        "Every French company valued over 1B.",
        "List all sectors available.",
        "What is the market cap of Microsoft?",
        "Which German companies are in the Health Care sector?", 
        "Find companies worth more than 500 billion dollars", 
        "Show financials for Tesla", 
        "list info tech companies in the US", 
        "Lowest 5 market cap companies in France", 
        "Show country and sector for companies in Spain",
        "What sector is Apple in?",
        "Show the founding date for Microsoft.",
        "List stock prices for IT companies.",
        "Companies founded after 1990.",
        "Companies with stock price over $500.",
        "USA companies in IT founded before 2000.",
        "List companies ordered by founding date."
    ]

    print("\n--- Analyzing Queries with Refactored Enhanced NER ---")
    # Reset the result stores (they were already initialised at the top of the cell).
    processed_docs = {}
    identified_items_store = {}

    for query in test_queries:
        print(f"\n--- Analyzing Query: '{query}' ---")
        doc = nlp(query)
        processed_docs[query] = doc 
        phrase_matches = matcher(doc)

        # --- Collect Entities/Matches (Revised Logic v2) ---
        # Pipeline entities (doc.ents) take priority over PhraseMatcher hits.
        # NOTE(review): presumably the EntityRuler runs before the statistical
        # NER with overwrite enabled -- confirm in the pipeline setup cell.
        found_items = []
        pipeline_ents = {} 
        
        for ent in doc.ents:
            span_key = (ent.start_char, ent.end_char)
            ent_id = ent.ent_id_ if ent.ent_id_ else None 
            
            # Attempt normalization based on label and ID/text
            mapped_value = ent.text # Default
            try:
                if ent.label_ == "COUNTRY_TERM":
                     # Use ID if it's a known canonical name, else try mapping the text
                    if ent_id and ent_id in lc_countries: mapped_value = ent_id 
                    elif ent.text.lower() in country_mapping: mapped_value = country_mapping[ent.text.lower()]
                elif ent.label_ == "SECTOR_TERM":
                    if ent_id and ent_id in lc_sectors: mapped_value = ent_id
                    elif ent.text.lower() in sector_alias_mapping: mapped_value = sector_alias_mapping[ent.text.lower()]
                # Add Industry/Company normalization if needed using ent_id and lc_ lists
                
            except Exception as e:
                 print(f"  Warning: Error during mapping/normalization for '{ent.text}' ({ent.label_}): {e}")

            # Keyed by character span, so there is at most one entry per span.
            pipeline_ents[span_key] = {
                "text": ent.text, "label": ent.label_, "ent_id": ent_id,
                "mapped_value": mapped_value, 
                "start_char": ent.start_char, "end_char": ent.end_char,
                "source": "Pipeline (Ruler/NER)" 
            }

        found_items.extend(pipeline_ents.values())

        # Add PhraseMatcher results whose exact character span is not already
        # taken by a pipeline entity (partially overlapping spans are still added).
        pipeline_spans = {(item['start_char'], item['end_char']) for item in found_items}
        for match_id, start, end in phrase_matches:
            span = doc[start:end]
            span_chars = (span.start_char, span.end_char)
            if span_chars not in pipeline_spans:
                # match_id is the hash of the key given to matcher.add(); look up its string.
                label = nlp.vocab.strings[match_id]
                term_text = span.text
                mapped_value = term_text # Basic mapping for PhraseMatcher results if needed
                if label == "COUNTRY_TERM" and span.text.lower() in country_mapping: mapped_value = country_mapping[span.text.lower()]
                elif label == "SECTOR_TERM" and span.text.lower() in sector_alias_mapping: mapped_value = sector_alias_mapping[span.text.lower()]
                
                found_items.append({
                    "text": term_text, "label": label, "mapped_value": mapped_value,
                    "ent_id": None, 
                    "start_char": span.start_char, "end_char": span.end_char,
                    "source": "PhraseMatcher"
                })

        # Keep items in reading order for the later parsing step.
        found_items.sort(key=lambda item: item['start_char'])
        identified_items_store[query] = found_items 

        # Display Results
        print("\nIdentified Entities & Terms:")
        if found_items:
            for item in found_items:
                details = f"- Text: '{item['text']}', Label: {item['label']}, Source: {item['source']}"
                if item['mapped_value'] != item['text']: details += f" (Mapped: '{item['mapped_value']}')"
                if item['ent_id']: details += f" (ID: {item['ent_id']})"
                print(details)
        else: print("- No specific entities or terms identified.")
        print("="*80)
else:
    # Reached when nlp, matcher or ruler is None.
    print("\nSkipping analysis because the spaCy model could not be loaded.")