In [1]:
import pandas as pd
import duckdb
import os
from typing import List, Dict, Optional
import re
import logging
from datetime import datetime

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='sql_generator.log'
)

In [2]:
class SQLQueryGenerator:
    def __init__(self, csv_path: str, chunk_size: int = 100000):
        """
        Initialize SQL Query Generator
        :param csv_path: Path to input CSV file
        :param chunk_size: Size of chunks for processing big data
        """
        self.csv_path = csv_path
        self.chunk_size = chunk_size
        self.table_name = "data_table"
        self.column_types = {}
        self.con = duckdb.connect(':memory:')  # In-memory database for speed

In [3]:
    def load_data(self) -> None:
        """
        Load CSV data into DuckDB with chunking for big data
        """
        try:
            logging.info(f"Loading CSV data from {self.csv_path}")
            
            # Create table directly from CSV
            self.con.execute(f"""
                CREATE TABLE {self.table_name} AS 
                SELECT * FROM read_csv_auto('{self.csv_path}', delim=',', header=True)
            """)
            
            # Get column info
            result = self.con.execute(f"DESCRIBE {self.table_name}").fetchall()
            self.column_types = {row[0]: row[1] for row in result}
            
            logging.info("Data loaded successfully")
        except Exception as e:
            logging.error(f"Error loading data: {str(e)}")
            raise

In [4]:
    def clean_query(self, query: str) -> str:
        """
        Clean and validate SQL query
        """
        # Remove extra spaces and normalize
        query = re.sub(r'\s+', ' ', query.strip())
        
        # Basic validation
        if not query.lower().startswith('select') and not query.lower().startswith('with'):
            raise ValueError("Only SELECT queries are supported for safety")
        
        return query

In [5]:
    def generate_basic_query(self, columns: List[str], conditions: Dict = None, 
                           limit: int = None) -> str:
        """
        Generate basic SELECT query
        """
        try:
            # Validate columns
            invalid_cols = [col for col in columns if col not in self.column_types]
            if invalid_cols:
                raise ValueError(f"Invalid columns: {invalid_cols}")

            # Build query
            select_clause = ", ".join(columns)
            query = f"SELECT {select_clause} FROM {self.table_name}"

            # Add conditions if any
            if conditions:
                where_clauses = []
                for col, condition in conditions.items():
                    if isinstance(condition, str):
                        where_clauses.append(f"{col} = '{condition}'")
                    elif isinstance(condition, (int, float)):
                        where_clauses.append(f"{col} = {condition}")
                    else:
                        raise ValueError(f"Unsupported condition type for {col}")
                query += " WHERE " + " AND ".join(where_clauses)

            # Add limit if specified
            if limit is not None:
                query += f" LIMIT {limit}"

            return query

        except Exception as e:
            logging.error(f"Error generating query: {str(e)}")
            raise

In [6]:
    def execute_query(self, query: str) -> pd.DataFrame:
        """
        Execute SQL query and return results
        """
        try:
            logging.info(f"Executing query: {query}")
            query = self.clean_query(query)
            
            # Use DuckDB to execute query
            result = self.con.execute(query).fetchdf()
            
            logging.info("Query executed successfully")
            return result
            
        except Exception as e:
            logging.error(f"Error executing query: {str(e)}")
            raise

In [7]:
    def optimize_for_big_data(self, query: str) -> str:
        """
        Optimize query for big data processing
        """
        try:
            # Add basic optimization hints
            optimized_query = query
            
            # If there's no LIMIT, add a reasonable default for preview
            if 'LIMIT' not in query.upper():
                optimized_query += " LIMIT 10000"
                
            # Suggest sampling for very large datasets
            if 'WHERE' not in query.upper():
                optimized_query += " SAMPLE 0.1"  # 10% sampling
                
            return optimized_query
            
        except Exception as e:
            logging.error(f"Error optimizing query: {str(e)}")
            raise

In [8]:
    def get_metadata(self) -> Dict:
        """
        Return metadata about the dataset
        """
        try:
            row_count = self.con.execute(f"SELECT COUNT(*) FROM {self.table_name}").fetchone()[0]
            return {
                'columns': self.column_types,
                'row_count': row_count,
                'table_name': self.table_name
            }
        except Exception as e:
            logging.error(f"Error getting metadata: {str(e)}")
            raise

In [9]:
    def close(self):
        """
        Close database connection
        """
        self.con.close()
        logging.info("Database connection closed")

In [10]:
def main():
    try:
        # Example usage
        csv_path = input("Enter path to CSV file: ")
        if not os.path.exists(csv_path):
            raise FileNotFoundError("CSV file not found")

        # Initialize generator
        generator = SQLQueryGenerator(csv_path)
        
        # Load data
        generator.load_data()
        
        # Show metadata
        metadata = generator.get_metadata()
        print("Dataset Metadata:")
        print(f"Table Name: {metadata['table_name']}")
        print(f"Row Count: {metadata['row_count']}")
        print("Columns:")
        for col, dtype in metadata['columns'].items():
            print(f"  {col}: {dtype}")

        while True:
            print("\nOptions:")
            print("1. Generate basic query")
            print("2. Execute custom query")
            print("3. Exit")
            
            choice = input("Enter choice (1-3): ")
            
            if choice == '1':
                columns = input("Enter columns to select (comma-separated): ").split(',')
                columns = [col.strip() for col in columns]
                
                conditions_input = input("Enter conditions (col=value, comma-separated, or leave empty): ")
                conditions = {}
                if conditions_input:
                    for cond in conditions_input.split(','):
                        col, val = cond.split('=')
                        conditions[col.strip()] = val.strip()
                
                limit = input("Enter limit (or leave empty): ")
                limit = int(limit) if limit else None
                
                query = generator.generate_basic_query(columns, conditions, limit)
                print("\nGenerated Query:")
                print(query)
                
                optimized_query = generator.optimize_for_big_data(query)
                print("\nOptimized Query:")
                print(optimized_query)
                
                result = generator.execute_query(optimized_query)
                print("\nResults:")
                print(result)

            elif choice == '2':
                custom_query = input("Enter custom SQL query: ")
                result = generator.execute_query(custom_query)
                print("\nResults:")
                print(result)

            elif choice == '3':
                break

            else:
                print("Invalid choice")

    except Exception as e:
        logging.error(f"Error in main: {str(e)}")
        print(f"An error occurred: {str(e)}")

    finally:
        generator.close()

if __name__ == "__main__":
    main()

Enter path to CSV file:  loan-train.csv


An error occurred: 'SQLQueryGenerator' object has no attribute 'load_data'


AttributeError: 'SQLQueryGenerator' object has no attribute 'close'