In [1]:
import clickhouse_connect
import os
import pandas as pd
from dotenv import load_dotenv
load_dotenv()

# Open the ClickHouse session; every credential is read from the environment
# (populated by load_dotenv above), so nothing sensitive lives in the notebook.
client = clickhouse_connect.get_client(
    secure=True,
    host=os.environ.get("ch_host"),
    username=os.environ.get("ch_username"),
    password=os.environ.get("ch_password"),
)

# Name of the database whose tables and columns are profiled below.
database = os.environ.get("ch_database")


def _quote_identifier(name):
    """Return ``name`` as a backtick-quoted SQL identifier with backslashes and backticks escaped."""
    escaped = str(name).replace('\\', '\\\\').replace('`', '\\`')
    return f"`{escaped}`"


def get_comprehensive_database_metadata(client, database, output_file='outputs/database_metadata.csv',
                                        cardinality_threshold=30, low_cardinality_sample=3,
                                        high_cardinality_sample=30):
    """
    Extract comprehensive metadata for all tables and columns in the database.

    For every column listed in ``system.columns`` for ``database`` this records
    the data type, primary-key membership, the distinct-value count and a small
    sample of distinct values, then writes all records to ``output_file`` as CSV.

    Args:
        client: Object exposing ``query(sql, parameters=...)`` whose return value
            has a ``result_rows`` attribute (e.g. a clickhouse_connect client).
        database: Name of the database to inspect. Must be non-empty.
        output_file: Destination CSV path. Missing parent directories are created.
        cardinality_threshold: Columns with fewer distinct values than this use
            ``low_cardinality_sample``; all others use ``high_cardinality_sample``.
        low_cardinality_sample: ``LIMIT`` for the sample of low-cardinality columns.
        high_cardinality_sample: ``LIMIT`` for the sample of high-cardinality columns.

    Returns:
        pandas.DataFrame with the columns ``table_name``, ``column_name``,
        ``data_type``, ``cardinality``, ``primary_key`` and ``distinct_values``
        (always present, even when the database has no tables).

    Raises:
        ValueError: If ``database`` is empty or ``None``.
        Exception: Whatever ``client.query`` raises for the two catalogue queries.
            Failures while profiling a single column are caught and written into
            that column's record instead of aborting the run.
    """
    if not database:
        raise ValueError("database must be a non-empty name (is the ch_database variable set?)")

    record_fields = ['table_name', 'column_name', 'data_type',
                     'cardinality', 'primary_key', 'distinct_values']

    # 1. Get all table names in the database. The database name is sent as a
    #    bound parameter rather than spliced into the SQL text.
    table_rows = client.query(
        "SELECT name FROM system.tables WHERE database = {db:String}",
        parameters={'db': database},
    ).result_rows
    table_names = [row[0] for row in table_rows]

    metadata = []

    for table in table_names:
        print(f"Processing table: {table}")

        # 2. Get column information including data types and primary key status
        columns = client.query(
            "SELECT name, type, is_in_primary_key "
            "FROM system.columns "
            "WHERE database = {db:String} AND table = {tbl:String}",
            parameters={'db': database, 'tbl': table},
        ).result_rows

        # Identifiers cannot be bound as values, so they are quoted and escaped.
        qualified_table = f"{_quote_identifier(database)}.{_quote_identifier(table)}"

        for col_name, col_type, is_pk in columns:
            record = {
                'table_name': table,
                'column_name': col_name,
                'data_type': col_type,
                'primary_key': 'Yes' if is_pk else 'No',
            }
            try:
                quoted_col = _quote_identifier(col_name)

                # 3. Calculate cardinality (distinct count)
                cardinality = client.query(
                    f"SELECT count(DISTINCT {quoted_col}) FROM {qualified_table}"
                ).result_rows[0][0]

                # 4. Pick the sample size from the cardinality.
                # NOTE(review): with the defaults, low-cardinality columns get the
                # *smaller* sample (3) and high-cardinality ones the larger (30).
                # That looks inverted; the defaults preserve the historical output,
                # so confirm the intent before changing them.
                if cardinality < cardinality_threshold:
                    sample_limit = low_cardinality_sample
                else:
                    sample_limit = high_cardinality_sample

                distinct_rows = client.query(
                    f"SELECT DISTINCT {quoted_col} FROM {qualified_table} LIMIT {int(sample_limit)}"
                ).result_rows

                record['cardinality'] = cardinality
                record['distinct_values'] = ', '.join(str(row[0]) for row in distinct_rows)

            except Exception as e:
                # Best effort: one unreadable column must not abort the whole scan,
                # so the failure is recorded in that column's row instead.
                print(f"Error processing {table}.{col_name}: {e}")
                record['cardinality'] = 'Error'
                record['distinct_values'] = f'Error: {str(e)}'

            metadata.append(record)

    # Create DataFrame (fixed column set, so an empty database still yields a
    # well-formed CSV header) and save to CSV.
    df = pd.DataFrame(metadata, columns=record_fields)

    # Create the destination directory so a custom output_file works without
    # the caller having to prepare it.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    df.to_csv(output_file, index=False)
    print(f"Metadata saved to {output_file}")
    return df

if __name__ == "__main__":
    # Ensure the default CSV destination exists, then profile the configured database.
    os.makedirs(name='outputs', exist_ok=True)
    df_metadata = get_comprehensive_database_metadata(client=client, database=database)

Processing table: customers
Processing table: geolocation
Processing table: geolocation
Processing table: items
Processing table: items
Processing table: order_payments
Processing table: order_payments
Processing table: order_reviews
Processing table: order_reviews
Processing table: orders
Processing table: orders
Processing table: product_category_name_translation
Processing table: product_category_name_translation
Processing table: sellers
Processing table: sellers
Metadata saved to outputs/database_metadata.csv
Metadata saved to outputs/database_metadata.csv


In [None]:
import os
import dspy
from dspy import InputField, OutputField, Signature

# Configure Gemini as the language model in DSPy
# Read the key once and fail with an explicit message when it is missing;
# assigning os.getenv(...) straight back into os.environ raises an opaque
# TypeError in that case and is a no-op otherwise.
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    raise EnvironmentError(
        "GOOGLE_API_KEY is not set; export it or add it to the .env file before configuring DSPy."
    )

if hasattr(dspy, "LM"):
    # DSPy releases that provide dspy.LM route providers through a single
    # LiteLLM-style model string; the key is passed explicitly.
    gemini = dspy.LM("gemini/gemini-1.5-flash", api_key=google_api_key)  # Using Gemini 1.5 Flash for efficiency
else:
    # Legacy DSPy releases expose a dedicated Google client instead.
    gemini = dspy.Google("models/gemini-1.5-flash", api_key=google_api_key)  # Using Gemini 1.5 Flash for efficiency
dspy.settings.configure(lm=gemini)

# DSPy Signature for Query Decomposition
class DecomposeQuery(Signature):
    """Decompose a business question into atomic sub-queries for RAG retrieval."""
    # NOTE(review): DSPy presumably uses this docstring and each field's `desc`
    # as prompt text, so treat wording edits as prompt changes — confirm against
    # the installed DSPy version.
    
    # Input field; enhance_and_decompose_query passes the user's question as `query=`.
    query: str = InputField(desc="The original user business question.")
    # Output field; read back as `.sub_queries` and iterated one sub-query at a time.
    sub_queries: list[str] = OutputField(desc="List of 3-5 decomposed sub-queries.")

# DSPy Signature for Query Enrichment
class EnrichSubQuery(Signature):
    """Enrich a sub-query with synonyms, related terms, and schema metadata."""
    # NOTE(review): DSPy presumably uses this docstring and each field's `desc`
    # as prompt text, so treat wording edits as prompt changes — confirm against
    # the installed DSPy version.
    
    # Input field; one element of the decomposed sub-query list per call.
    sub_query: str = InputField(desc="A single decomposed sub-query.")
    # Input field declared with an empty-string default; the caller in this
    # notebook always passes it explicitly.
    schema_metadata: str = InputField(desc="Optional schema metadata (e.g., table/column descriptions).", default="")
    # Output field; read back as `.enriched_query` by enhance_and_decompose_query.
    enriched_query: str = OutputField(desc="Enriched version of the sub-query with expansions.")

# Function to perform decomposition and enrichment
def enhance_and_decompose_query(user_query, schema_metadata=""):
    """Split a question into sub-queries and return the enriched form of each.

    Args:
        user_query: The question handed to the DecomposeQuery predictor.
        schema_metadata: Text forwarded unchanged to every EnrichSubQuery call.

    Returns:
        A list with one enriched query per decomposed sub-query, in the order
        the decomposition produced them.
    """
    # Stage 1: chain-of-thought decomposition of the original question.
    decomposer = dspy.ChainOfThought(DecomposeQuery)
    pieces = decomposer(query=user_query).sub_queries

    # Stage 2: chain-of-thought enrichment, applied to each piece in turn.
    enricher = dspy.ChainOfThought(EnrichSubQuery)
    return [
        enricher(sub_query=piece, schema_metadata=schema_metadata).enriched_query
        for piece in pieces
    ]

# Example usage
if __name__ == "__main__":
    # Sample question and a hand-written schema summary used to drive the demo.
    user_query = "What are the sales trends for top products in Q2 across regions?"
    schema_metadata = (
        "Tables include: sales_data (columns: prod_id, sales_amt, region), "
        "product_inventory (columns: prod_name, stock_level), "
        "quarterly_metrics (columns: quarter, trend_score)."
    )

    result = enhance_and_decompose_query(user_query, schema_metadata=schema_metadata)

    # One bullet line per enriched sub-query.
    print("Enriched Sub-Queries:")
    for eq in result:
        print(f"- {eq}")

ModuleNotFoundError: No module named 'dspy.functional'