In [0]:
%sql

CREATE CATALOG IF NOT EXISTS dummy_catalog;


In [0]:
%sql

CREATE SCHEMA IF NOT EXISTS dummy_catalog.dummy_schema;


In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS dummy_schema;


In [0]:
%sql
USE CATALOG dummy_catalog;
USE SCHEMA dummy_schema;


In [0]:
%sql
-- Create 'orders' table
CREATE OR REPLACE TEMP VIEW orders AS
SELECT * FROM VALUES
  (101, 1, 1001, DATE('2025-06-01'), 650, 'shipped'),
  (102, 2, 1002, DATE('2025-05-20'), 250, 'shipped'),
  (103, 3, 1003, DATE('2025-05-10'), 80,  'pending'),
  (104, 1, 1002, DATE('2025-06-10'), 120, 'shipped')
AS orders(order_id, customer_id, product_id, order_date, total_amount, status);

-- Create 'customers' table
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
  (1, 'Alice Johnson'),
  (2, 'Bob Smith'),
  (3, 'Charlie Davis')
AS customers(customer_id, customer_name);

-- Create 'products' table
CREATE OR REPLACE TEMP VIEW products AS
SELECT * FROM VALUES
  (1001, 'Laptop'),
  (1002, 'Headphones'),
  (1003, 'Keyboard')
AS products(product_id, product_name);


In [0]:
import pandas as pd
import re

def clean_func_name(name):
    return re.sub(r'\(.*\)', '', str(name)).strip().upper()

# 1. Load mapping from CSV (robust to encoding)
csv_path = '/Volumes/workspace/default/adhyan/Redshift and Databricks functions(Sheet1).csv'
try:
    mapping_df = pd.read_csv(csv_path, encoding='utf-8-sig')
except UnicodeDecodeError:
    mapping_df = pd.read_csv(csv_path, encoding='latin1')
mapping_df.columns = [col.strip() for col in mapping_df.columns]

# 2. Build direct and manual mappings
direct_map = mapping_df[mapping_df['need to change'].str.contains('direct', case=False, na=False)]
manual_map = mapping_df[mapping_df['need to change'].str.contains('manual|needs to be done manually|needs to do manually|needs to pass', case=False, na=False)]

function_map = {
    clean_func_name(row['Redshift_function']): clean_func_name(row['Databricks_function'])
    for _, row in direct_map.iterrows()
}
manual_funcs = set([clean_func_name(f) for f in manual_map['Redshift_function']])

# 3. Read SQL file
sql_path = '/Volumes/workspace/default/adhyan/Redshift_conversion.sql'
with open(sql_path, 'r', encoding='utf-8') as f:
    sql_script = f.read()

queries = [q.strip() for q in sql_script.split(';') if q.strip()]

# 4. Function replacement logic
def replace_functions(query, function_map, manual_funcs):
    flagged = []
    converted = query

    # Replace direct mappings from CSV
    for redshift_func, databricks_func in function_map.items():
        pattern = re.compile(rf'\b{re.escape(redshift_func)}\s*\(', re.IGNORECASE)
        converted = pattern.sub(f'{databricks_func}(', converted)

    # Flag manual functions from CSV
    for func in manual_funcs:
        if re.search(rf'\b{re.escape(func)}\s*\(', converted, re.IGNORECASE):
            flagged.append(func)

    return converted, flagged

# 5. Process all queries
converted_queries = []
manual_flags = []
for q in queries:
    converted, flagged = replace_functions(q, function_map, manual_funcs)
    converted_queries.append(converted)
    if flagged:
        manual_flags.append((converted, flagged))

# 6. Output results
out_path = '/Volumes/workspace/default/adhyan/Redshift_conversion_converted.sql'
with open(out_path, 'w', encoding='utf-8') as f:
    for q in converted_queries:
        f.write(q.strip() + ";\n\n")

# Print summary
print("=== Converted Queries ===")
for i, q in enumerate(converted_queries, 1):
    print(f"Query {i}:")
    print(q)
    print('-' * 60)

if manual_flags:
    print('\n=== Queries Requiring Manual Intervention ===')
    for i, (query, funcs) in enumerate(manual_flags, 1):
        print(f"Query {i} - Functions needing manual review: {', '.join(funcs)}")
        print(query)
        print('-' * 60)

print('\n=== Conversion Summary ===')
print(f'Total queries processed: {len(queries)}')
print(f'Successfully converted: {len(queries) - len(manual_flags)}')
print(f'Requires manual review: {len(manual_flags)}')


=== Converted Queries ===
Query 1:
SELECT DISTINCT
    o.order_id,
    c.customer_name,
    p.product_name,
    o.order_date,
    CASE 
        WHEN o.total_amount > 500 THEN 'High'
        WHEN o.total_amount BETWEEN 100 AND 500 THEN 'Medium'
        ELSE 'Low'
    END AS order_value_category
FROM
    orders o
JOIN
    customers c ON o.customer_id = c.customer_id
JOIN
    products p ON o.product_id = p.product_id
WHERE
    o.order_date >= DATE_SUB(CURRENT_DATE, 30)
    AND o.status = 'shipped'
ORDER BY
    o.order_date DESC
------------------------------------------------------------

=== Conversion Summary ===
Total queries processed: 1
Successfully converted: 1
Requires manual review: 0


In [0]:
%sql

SELECT DISTINCT
    o.order_id,
    c.customer_name,
    p.product_name,
    o.order_date,
    CASE 
        WHEN o.total_amount > 500 THEN 'High'
        WHEN o.total_amount BETWEEN 100 AND 500 THEN 'Medium'
        ELSE 'Low'
    END AS order_value_category
FROM
    orders o
JOIN
    customers c ON o.customer_id = c.customer_id
JOIN
    products p ON o.product_id = p.product_id
WHERE
    o.order_date >= DATE_SUB(CURRENT_DATE, 30)
    AND o.status = 'shipped'
ORDER BY
    o.order_date DESC


order_id,customer_name,product_name,order_date,order_value_category
104,Alice Johnson,Headphones,2025-06-10,Medium
101,Alice Johnson,Laptop,2025-06-01,High
102,Bob Smith,Headphones,2025-05-20,Medium


testing

In [0]:
import pandas as pd
import re
from typing import Dict, List, Tuple, Set

def clean_func_name(name: str) -> str:
    """Clean function name by removing parameters and normalizing case."""
    if pd.isna(name):
        return ''
    # Remove everything after first parenthesis and clean whitespace
    return re.sub(r'\(.*\)', '', str(name)).strip().upper()

def load_mappings(csv_path: str) -> Tuple[Dict[str, dict], Dict[str, dict], pd.DataFrame]:
    """
    Load and validate function mappings from CSV.
    Returns:
        - function_map: Dictionary of direct replacements
        - manual_funcs: Dictionary of functions needing manual conversion
        - full_mapping: Complete DataFrame for reference
    """
    try:
        mapping_df = pd.read_csv(csv_path, encoding='utf-8-sig')
    except UnicodeDecodeError:
        mapping_df = pd.read_csv(csv_path, encoding='latin1')
    
    # Clean column names and fill empty values
    mapping_df.columns = [col.strip() for col in mapping_df.columns]
    mapping_df.fillna('', inplace=True)
    
    # Validate critical mappings
    required_mappings = {
        'DATE_SUB': 'DATE_SUB',
        'DATE_TRUNC': 'DATE_TRUNC',
        'NVL': 'COALESCE'
    }
    
    # Create direct mapping (functions that can be automatically converted)
    direct_map = mapping_df[
        mapping_df['need to change'].str.contains(
            'direct|no need|can be directly replaced', 
            case=False, 
            na=False
        )
    ]
    
    # Create manual mapping (functions needing special handling)
    manual_map = mapping_df[
        mapping_df['need to change'].str.contains(
            'manual|needs to be done|needs to pass|needs to replace',
            case=False,
            na=False
        )
    ]
    
    # Build function dictionaries with validation
    function_map = {}
    for _, row in direct_map.iterrows():
        func_name = clean_func_name(row['Redshift_function'])
        # Skip empty or invalid mappings
        if not func_name or func_name.startswith('EXAMPLE'):
            continue
        function_map[func_name] = {
            'target': clean_func_name(row['Databricks_function']),
            'description': row['Description'],
            'example': row['Use in redshift']
        }
    
    # Add required mappings if missing
    for func, target in required_mappings.items():
        if func not in function_map:
            function_map[func] = {
                'target': target,
                'description': f'Automatically added required mapping for {func}',
                'example': f'{func}(...)'
            }
    
    manual_funcs = {}
    for _, row in manual_map.iterrows():
        func_name = clean_func_name(row['Redshift_function'])
        if not func_name:
            continue
        manual_funcs[func_name] = {
            'suggestion': row['Databricks_function'],
            'description': row['Description'],
            'example': row['Use in redshift']
        }
    
    return function_map, manual_funcs, mapping_df

def preprocess_sql(sql_content: str) -> List[str]:
    """Split SQL into individual queries while handling edge cases."""
    # Remove comments (both single-line and multi-line)
    sql_content = re.sub(r'--.*?$|/\*.*?\*/', '', sql_content, flags=re.MULTILINE|re.DOTALL)
    
    # Split on semicolons that aren't within strings
    queries = [
        q.strip() 
        for q in re.split(r';(?=(?:[^\'"]|\'[^\']*\'|"[^"]*")*$)', sql_content)
        if q.strip()
    ]
    return queries

def convert_function_call(match: re.Match, function_map: Dict[str, dict]) -> str:
    """Convert a single function call using the mapping rules with exact matching."""
    func_name = match.group(1).upper()  # Force uppercase comparison
    params = match.group(2)
    
    # Get mapping info with exact match
    mapping = function_map.get(func_name)
    
    if mapping:
        return f"{mapping['target']}({params})"
    return match.group(0)  # Return original if no mapping found

def convert_query(query: str, function_map: Dict[str, dict], manual_funcs: Dict[str, dict]) -> Tuple[str, List[dict]]:
    """Convert a single SQL query from Redshift to Databricks with validation."""
    flagged = []
    converted = query
    
    # First pass: Check for unmapped functions
    all_known_funcs = set(function_map.keys()).union(set(manual_funcs.keys()))
    found_funcs = set(re.findall(r'\b([A-Z_][A-Z0-9_]*)\s*\(', query, re.IGNORECASE))
    
    for func in found_funcs:
        func_upper = func.upper()
        if func_upper not in all_known_funcs:
            flagged.append({
                'function': func_upper,
                'suggestion': 'UNMAPPED_FUNCTION',
                'description': 'Function not found in mapping file',
                'example': f'{func_upper}(...)'
            })
    
    # Second pass: Convert directly mappable functions
    for func_name, mapping in function_map.items():
        # Use word boundaries and exact matching
        pattern = re.compile(rf'\b({re.escape(func_name)})\b\s*\((.*?)\)', re.IGNORECASE)
        converted = pattern.sub(
            lambda m: convert_function_call(m, function_map),
            converted
        )
    
    # Third pass: Flag manual functions
    for func_name, details in manual_funcs.items():
        if re.search(rf'\b{re.escape(func_name)}\b\s*\(', converted, re.IGNORECASE):
            flagged.append({
                'function': func_name,
                'suggestion': details['suggestion'],
                'description': details['description'],
                'example': details['example']
            })
    
    return converted, flagged

def process_conversion(input_sql_path: str, output_sql_path: str, mapping_csv_path: str):
    """Main conversion workflow with enhanced validation."""
    # Load mappings
    function_map, manual_funcs, full_mapping = load_mappings(mapping_csv_path)
    
    # Diagnostic output
    print("\n=== FUNCTION MAPPING VALIDATION ===")
    print(f"Direct mappings loaded: {len(function_map)}")
    print(f"Functions requiring manual conversion: {len(manual_funcs)}")
    
    # Verify critical mappings
    critical_funcs = ['DATE_SUB', 'DATE_TRUNC', 'NVL', 'LISTAGG']
    print("\nCritical function mappings:")
    for func in critical_funcs:
        status = "✓" if func in function_map else "✗"
        target = function_map.get(func, {}).get('target', 'MISSING')
        print(f"  {status} {func.ljust(15)} → {target.ljust(15)}")
    
    # Read and preprocess SQL
    with open(input_sql_path, 'r', encoding='utf-8') as f:
        sql_content = f.read()
    
    queries = preprocess_sql(sql_content)
    converted_queries = []
    manual_reviews = []
    
    # Process each query
    for i, query in enumerate(queries, 1):
        converted, flagged = convert_query(query, function_map, manual_funcs)
        converted_queries.append(converted)
        if flagged:
            manual_reviews.append({
                'query_number': i,
                'converted_query': converted,
                'flagged_functions': flagged
            })
    
    # Write output
    with open(output_sql_path, 'w', encoding='utf-8') as f:
        f.write("\n\n".join(converted_queries))
    
    # Generate comprehensive report
    print("\n=== CONVERSION REPORT ===")
    print(f"Total queries processed: {len(queries)}")
    print(f"Fully converted queries: {len(queries) - len(manual_reviews)}")
    print(f"Queries needing manual review: {len(manual_reviews)}")
    
    # Display all converted queries
    print("\n=== FULL CONVERTED OUTPUT ===")
    for i, query in enumerate(converted_queries, 1):
        print(f"\n-- Query {i} --")
        print(query)
        print("-" * 80)
    
    # Display manual review items
    if manual_reviews:
        print("\n=== MANUAL REVIEW REQUIRED ===")
        for review in manual_reviews:
            print(f"\nQuery #{review['query_number']}:")
            print(review['converted_query'])
            print("\nFunctions needing attention:")
            for func in review['flagged_functions']:
                print(f"\n* {func['function']}:")
                print(f"  Description: {func['description']}")
                print(f"  Suggestion: {func['suggestion']}")
                print(f"  Example: {func['example']}")
            print("=" * 80)
    
    # Verify output file
    print(f"\nOutput file created at: {output_sql_path}")
    try:
        with open(output_sql_path, 'r') as f:
            print("\nFirst 300 characters of output file:")
            print(f.read(300))
    except Exception as e:
        print(f"\nError verifying output file: {str(e)}")

if __name__ == "__main__":
    # Configuration
    CSV_PATH = '/Volumes/workspace/default/adhyan/Redshift and Databricks functions(Sheet1).csv'
    INPUT_SQL = '/Volumes/workspace/default/adhyan/Redshift_conversion.sql'
    OUTPUT_SQL = '/Volumes/workspace/default/adhyan/Redshift_conversion_converted.sql'
    
    # Run conversion
    process_conversion(INPUT_SQL, OUTPUT_SQL, CSV_PATH)


=== FUNCTION MAPPING VALIDATION ===
Direct mappings loaded: 150
Functions requiring manual conversion: 30

Critical function mappings:
  ✓ DATE_SUB        → DATE_SUB       
  ✓ DATE_TRUNC      → TRUNC          
  ✓ NVL             → COALESCE       
  ✗ LISTAGG         → MISSING        

=== CONVERSION REPORT ===
Total queries processed: 1
Fully converted queries: 1
Queries needing manual review: 0

=== FULL CONVERTED OUTPUT ===

-- Query 1 --
SELECT DISTINCT
    o.order_id,
    c.customer_name,
    p.product_name,
    o.order_date,
    CASE 
        WHEN o.total_amount > 500 THEN 'High'
        WHEN o.total_amount BETWEEN 100 AND 500 THEN 'Medium'
        ELSE 'Low'
    END AS order_value_category
FROM
    orders o
JOIN
    customers c ON o.customer_id = c.customer_id
JOIN
    products p ON o.product_id = p.product_id
WHERE
    o.order_date >= DATE_SUB(CURRENT_DATE, 30)
    AND o.status = 'shipped'
ORDER BY
    o.order_date DESC
--------------------------------------------------------

In [0]:
%sql

SELECT DISTINCT
    o.order_id,
    c.customer_name,
    p.product_name,
    o.order_date,
    CASE 
        WHEN o.total_amount > 500 THEN 'High'
        WHEN o.total_amount BETWEEN 100 AND 500 THEN 'Medium'
        ELSE 'Low'
    END AS order_value_category
FROM
    orders o
JOIN
    customers c ON o.customer_id = c.customer_id
JOIN
    products p ON o.product_id = p.product_id
WHERE
    o.order_date >= DATE_SUB(CURRENT_DATE, 30)
    AND o.status = 'shipped'
ORDER BY
    o.order_date DESC

order_id,customer_name,product_name,order_date,order_value_category
104,Alice Johnson,Headphones,2025-06-10,Medium
101,Alice Johnson,Laptop,2025-06-01,High
102,Bob Smith,Headphones,2025-05-20,Medium


clear sepration between csv and CSV-driven mappings and critical fallbacks   
  # Define critical function fallbacks (only used if missing in CSV)
  code is primarily taking mappings from the CSV file, but it also has some automatic fallbacks for critical functions.
  It processes two main categories from the CSV:
direct_map = mapping_df[mapping_df['need to change'].str.contains('direct|no need|can be directly replaced', ...)]
manual_map = mapping_df[mapping_df['need to change'].str.contains('manual|needs to be done|needs to pass|needs to replace', ...)]
2. Automatic Fallback Mappings
The code includes required_mappings as a safety net for critical functions:

python
required_mappings = {
    'DATE_SUB': 'DATE_SUB',       # Ensures DATE_SUB is always mapped
    'DATE_TRUNC': 'DATE_TRUNC',   # Ensures DATE_TRUNC is always mapped
    'NVL': 'COALESCE'             # Ensures NVL maps to COALESCE
}


Clearly reports what's missing (like LISTAGG)




In [0]:
import pandas as pd
import re
from typing import Dict, List, Tuple, Set, Optional, Pattern, Match

def clean_func_name(name: str) -> str:
    """Clean function name by removing parameters and normalizing case."""
    if pd.isna(name) or not isinstance(name, str):
        return ''
    return re.sub(r'\(.*\)', '', name).strip().upper()

def load_mappings(csv_path: str) -> Tuple[Dict[str, dict], Dict[str, dict], pd.DataFrame]:
    """
    Load function mappings from CSV with strict validation.
    Returns:
        - function_map: Verified direct replacements
        - manual_funcs: Functions needing manual handling
        - full_mapping: Original DataFrame
    """
    try:
        mapping_df = pd.read_csv(csv_path, encoding='utf-8-sig').fillna('')
    except (UnicodeDecodeError, FileNotFoundError) as e:
        try:
            mapping_df = pd.read_csv(csv_path, encoding='latin1').fillna('')
        except Exception as e:
            raise ValueError(f"Failed to load CSV: {str(e)}")
    
    mapping_df.columns = [col.strip() for col in mapping_df.columns]

    # Define critical function fallbacks
    CRITICAL_FALLBACKS = {
        'DATE_SUB': {'target': 'DATE_SUB', 'description': 'Date subtraction', 'example': 'DATE_SUB(date, days)'},
        'NVL': {'target': 'COALESCE', 'description': 'Null value replacement', 'example': 'NVL(expr, default)'},
        'LISTAGG': {'target': 'CONCAT_WS', 'description': 'String aggregation', 'example': 'LISTAGG(col, delimiter)'}
    }

    # Process direct mappings with more robust filtering
    direct_map = mapping_df[
        mapping_df['need to change'].str.contains(
            r'direct|no need|can be directly replaced', 
            case=False, 
            na=False, regex=True
        )
    ]
    
    function_map = {}
    for _, row in direct_map.iterrows():
        try:
            rs_func = clean_func_name(row['Redshift_function'])
            db_func = clean_func_name(row['Databricks_function'])
            if rs_func and db_func and not rs_func.startswith(('EXAMPLE', 'TEST')):
                function_map[rs_func] = {
                    'target': db_func,
                    'description': row.get('Description', ''),
                    'example': row.get('Use in redshift', '')
                }
        except Exception as e:
            print(f"Warning: Skipping row due to error - {str(e)}")

    # Add critical fallbacks only if missing
    for func, mapping in CRITICAL_FALLBACKS.items():
        if func not in function_map:
            function_map[func] = mapping

    # Process manual mappings with error handling
    manual_map = mapping_df[
        mapping_df['need to change'].str.contains(
            r'manual|needs to be done|needs to pass|needs to replace',
            case=False,
            na=False, regex=True
        )
    ]
    
    manual_funcs = {}
    for _, row in manual_map.iterrows():
        try:
            rs_func = clean_func_name(row['Redshift_function'])
            if rs_func:
                manual_funcs[rs_func] = {
                    'suggestion': clean_func_name(row['Databricks_function']),
                    'description': row.get('Description', ''),
                    'example': row.get('Use in redshift', '')
                }
        except Exception as e:
            print(f"Warning: Skipping manual mapping row - {str(e)}")

    return function_map, manual_funcs, mapping_df

def preprocess_sql(sql_content: str) -> List[str]:
    """Split SQL into clean, individual queries."""
    # Remove all comments more robustly
    sql_content = re.sub(r'--[^\n]*|/\*.*?\*/', '', sql_content, flags=re.DOTALL)
    # Split on semicolons not in strings with better handling
    return [
        q.strip() for q in re.split(r';(?=(?:[^\'"]|\'[^\']*\'|"[^"]*")*$)', sql_content)
        if q.strip()
    ]

def convert_function_call(match: re.Match, function_map: Dict[str, dict]) -> str:
    """Convert function call with exact matching and parameter preservation."""
    func_name = match.group(1).upper()
    params = match.group(2)
    if func_name in function_map:
        return f"{function_map[func_name]['target']}({params})"
    return match.group(0)

def convert_query(query: str, function_map: Dict[str, dict], manual_funcs: Dict[str, dict]) -> Tuple[str, List[dict]]:
    """Convert query with comprehensive validation."""
    flagged = []
    converted = query
    
    # More robust function detection
    func_pattern: Pattern = re.compile(r'\b([A-Z_][A-Z0-9_]*)\s*\((?![^()]*\))', re.IGNORECASE)
    found_funcs = {f.upper() for f in func_pattern.findall(query)}
    
    all_mapped = set(function_map.keys()).union(manual_funcs.keys())
    
    # Check for unmapped functions
    for func in found_funcs - all_mapped:
        flagged.append({
            'function': func,
            'suggestion': 'UNMAPPED_FUNCTION',
            'description': 'No mapping found in CSV or fallbacks',
            'example': f'{func}(...)'
        })

    # Convert direct mappings
    for func_name in found_funcs & set(function_map.keys()):
        pattern = re.compile(rf'\b({re.escape(func_name)})\b\s*\((.*?)\)', re.IGNORECASE)
        converted = pattern.sub(lambda m: convert_function_call(m, function_map), converted)

    # Flag manual functions
    for func_name in found_funcs & set(manual_funcs.keys()):
        flagged.append(manual_funcs[func_name])

    return converted, flagged

def process_conversion(input_path: str, output_path: str, mapping_path: str) -> None:
    """End-to-end conversion workflow with error handling."""
    try:
        function_map, manual_funcs, _ = load_mappings(mapping_path)
    except Exception as e:
        print(f"Error loading mappings: {str(e)}")
        return

    # Validation report
    print("=== MAPPING VALIDATION ===")
    print(f"Direct mappings: {len(function_map)}")
    print(f"Manual functions: {len(manual_funcs)}")
    print("\nCritical function status:")
    for func in ['DATE_SUB', 'NVL', 'LISTAGG', 'DATE_TRUNC']:
        status = "✓" if func in function_map else "✗"
        target = function_map.get(func, {}).get('target', 'MISSING')
        print(f"  {status} {func.ljust(10)} → {target.ljust(15)}")

    try:
        with open(input_path, 'r', encoding='utf-8') as f:
            queries = preprocess_sql(f.read())
    except Exception as e:
        print(f"Error reading input SQL: {str(e)}")
        return

    results = []
    manual_reviews = []
    
    print("\n=== CONVERSION PROCESS ===")
    for i, query in enumerate(queries, 1):
        try:
            print(f"\nProcessing Query {i}:")
            print("Original query:")
            print(query)
            
            converted, flags = convert_query(query, function_map, manual_funcs)
            results.append(converted)
            
            print("\nConverted query:")
            print(converted)
            
            if flags:
                manual_reviews.append({'query_num': i, 'query': converted, 'flags': flags})
                print("\nFlags raised:")
                for flag in flags:
                    print(f"  - {flag['function']}: {flag.get('description', '')}")
                    print(f"    Suggested fix: {flag.get('suggestion', 'Add to CSV')}")
        except Exception as e:
            print(f"Error converting query {i}: {str(e)}")
            results.append(query)  # Keep original if conversion fails

    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write("\n\n".join(results))
    except Exception as e:
        print(f"Error writing output: {str(e)}")
        return

    # Generate final report
    print("\n=== FINAL CONVERSION REPORT ===")
    print(f"Total queries processed: {len(queries)}")
    print(f"Queries requiring manual review: {len(manual_reviews)}")
    print(f"\nOutput saved to: {output_path}")

    if manual_reviews:
        print("\n=== QUERIES NEEDING MANUAL REVIEW ===")
        for review in manual_reviews:
            print(f"\nQuery {review['query_num']}:")
            print(review['query'])
            print("\nIssues:")
            for flag in review['flags']:
                print(f"  - {flag['function']}: {flag.get('description', '')}")
                print(f"    Suggested fix: {flag.get('suggestion', 'Add to CSV')}")

if __name__ == "__main__":
    # Configuration with single output path
    process_conversion(
        input_path='/Volumes/workspace/default/adhyan/Sanjivani.sql',
        output_path='/Volumes/workspace/default/adhyan/databricks_converted.sql',
        mapping_path='/Volumes/workspace/default/adhyan/Redshift and Databricks functions(Sheet1).csv'
    )

=== MAPPING VALIDATION ===
Direct mappings: 151
Manual functions: 30

Critical function status:
  ✓ DATE_SUB   → DATE_SUB       
  ✓ NVL        → COALESCE       
  ✓ LISTAGG    → CONCAT_WS      
  ✓ DATE_TRUNC → TRUNC          

=== CONVERSION PROCESS ===

Processing Query 1:
Original query:
WITH user_stats AS (
  SELECT 
    user_id,
    COUNT(DISTINCT order_id) AS order_count,
    SUM(CASE WHEN refunded = FALSE THEN amount ELSE 0 END) AS net_revenue,
    APPROXIMATE PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY amount) AS median_order
  FROM orders
  WHERE order_date BETWEEN DATEADD(month, -3, CURRENT_DATE) AND CURRENT_DATE
  GROUP BY user_id
)
SELECT
  u.user_id,
  u.username,
  us.order_count,
  us.net_revenue,
  TO_CHAR(us.net_revenue/us.order_count, '999.99') AS avg_order_value,
  DATE_DIFF('day', u.signup_date, CURRENT_DATE) AS days_since_signup
FROM users u
JOIN user_stats us ON u.user_id = us.user_id
WHERE us.order_count > 1
ORDER BY us.net_revenue DESC
LIMIT 100

Converted quer

concern about having to manually maintain fallback functions. You'd like the script to work more like an LLM-based transpiler that can handle conversions more intelligently without predefined mappings. Here's how we can enhance your current approach:

Key Improvements
Dynamic Function Mapping: Instead of hardcoded fallbacks, implement pattern-based conversions

Context-Aware Conversion: Analyze function parameters to make smarter conversions

Syntax Tree Parsing: Move beyond regex to proper SQL parsing

Learning Mechanism: Cache successful conversions for future use

In [0]:
import pandas as pd
import re
from typing import Dict, List, Tuple, Set, Optional, Pattern, Match, Any
from collections import defaultdict
import sqlparse
from sqlparse.sql import Function, Identifier, Token
from sqlparse.tokens import Name, Punctuation

class SQLTranspiler:
    def __init__(self, mapping_path: Optional[str] = None):
        self.function_map = defaultdict(dict)
        self.manual_funcs = defaultdict(dict)
        self.learned_conversions = {}
        self.sql_keywords = {
            'AS', 'SELECT', 'FROM', 'WHERE', 'GROUP', 'BY', 'ORDER', 'LIMIT',
            'WITH', 'JOIN', 'ON', 'AND', 'OR', 'NOT', 'CASE', 'WHEN', 'THEN', 'END',
            'BETWEEN', 'TRUE', 'FALSE', 'NULL', 'IS', 'IN', 'LIKE', 'DISTINCT', 'HAVING'
        }
        
        if mapping_path:
            self.load_mappings(mapping_path)
            
        # Initialize with intelligent pattern-based conversions
        self.init_pattern_based_conversions()
    
    def init_pattern_based_conversions(self):
        """Initialize common conversion patterns that don't need explicit mappings"""
        # Date/time functions
        self.add_conversion_pattern(
            pattern=r'DATEADD\s*\(\s*(day|days)\s*,\s*(.+?)\s*,\s*(.+?)\s*\)',
            replacement=r'DATE_ADD(\3, \2)',
            description='Convert day-based DATEADD to DATE_ADD'
        )
        self.add_conversion_pattern(
            pattern=r'DATEADD\s*\(\s*(month|months)\s*,\s*(.+?)\s*,\s*(.+?)\s*\)',
            replacement=r'DATE_ADD(\3, \2*30)',
            description='Convert month-based DATEADD to approximate days'
        )
        
        # String functions
        self.add_conversion_pattern(
            pattern=r'LISTAGG\s*\(\s*(.+?)\s*,\s*(.+?)\s*\)',
            replacement=r'ARRAY_JOIN(COLLECT_LIST(\1), \2)',
            description='Convert LISTAGG to Databricks array functions'
        )
        
        # Conditional functions
        self.add_conversion_pattern(
            pattern=r'NVL\s*\(\s*(.+?)\s*,\s*(.+?)\s*\)',
            replacement=r'COALESCE(\1, \2)',
            description='Convert NVL to COALESCE'
        )
    
    def add_conversion_pattern(self, pattern: str, replacement: str, description: str):
        """Add a regex-based conversion pattern"""
        self.function_map['PATTERNS'][pattern] = {
            'replacement': replacement,
            'description': description,
            'compiled': re.compile(pattern, re.IGNORECASE)
        }
    
    def load_mappings(self, csv_path: str):
        """Load function mappings from CSV"""
        try:
            mapping_df = pd.read_csv(csv_path, encoding='utf-8-sig').fillna('')
        except (UnicodeDecodeError, FileNotFoundError) as e:
            try:
                mapping_df = pd.read_csv(csv_path, encoding='latin1').fillna('')
            except Exception as e:
                raise ValueError(f"Failed to load CSV: {str(e)}")
        
        # Process direct mappings
        direct_map = mapping_df[
            mapping_df['need to change'].str.contains(
                r'direct|no need|can be directly replaced', 
                case=False, 
                na=False, regex=True
            )
        ]
        
        for _, row in direct_map.iterrows():
            rs_func = self.clean_func_name(row['Redshift_function'])
            db_func = self.clean_func_name(row['Databricks_function'])
            if rs_func and db_func and rs_func not in self.sql_keywords:
                self.function_map[rs_func] = {
                    'target': db_func,
                    'description': row.get('Description', ''),
                    'example': row.get('Use in redshift', '')
                }
        
        # Process manual mappings
        manual_map = mapping_df[
            mapping_df['need to change'].str.contains(
                r'manual|needs to be done|needs to pass|needs to replace',
                case=False,
                na=False, regex=True
            )
        ]
        
        for _, row in manual_map.iterrows():
            rs_func = self.clean_func_name(row['Redshift_function'])
            if rs_func and rs_func not in self.sql_keywords:
                self.manual_funcs[rs_func] = {
                    'suggestion': self.clean_func_name(row['Databricks_function']),
                    'description': row.get('Description', ''),
                    'example': row.get('Use in redshift', '')
                }
    
    def clean_func_name(self, name: str) -> str:
        """Clean function name by removing parameters and normalizing case."""
        if pd.isna(name) or not isinstance(name, str):
            return ''
        return re.sub(r'\(.*\)', '', name).strip().upper()
    
    def parse_sql(self, sql_content: str) -> List[str]:
        """Parse SQL into statements using sqlparse"""
        statements = sqlparse.parse(sql_content)
        return [str(stmt) for stmt in statements if str(stmt).strip()]
    
    def convert_statement(self, statement: str) -> Tuple[str, List[dict]]:
        """Convert a single SQL statement using parse tree"""
        parsed = sqlparse.parse(statement)[0]
        converted_tokens = []
        flagged = []
        
        for token in parsed.tokens:
            if token.is_group and isinstance(token, Function):
                # Handle function calls
                func_name = self.clean_func_name(token.get_name())
                params = token.get_parameters()
                
                if func_name in self.function_map:
                    # Apply direct mapping
                    converted = self.apply_function_mapping(func_name, params)
                    converted_tokens.append(converted)
                elif func_name in self.manual_funcs:
                    # Flag for manual review
                    flagged.append(self.manual_funcs[func_name])
                    converted_tokens.append(str(token))
                else:
                    # Try pattern-based conversion
                    converted, pattern_used = self.try_pattern_conversion(str(token))
                    if pattern_used:
                        converted_tokens.append(converted)
                    else:
                        # Unknown function
                        flagged.append({
                            'function': func_name,
                            'suggestion': 'UNMAPPED_FUNCTION',
                            'description': 'No mapping found',
                            'example': str(token)
                        })
                        converted_tokens.append(str(token))
            else:
                converted_tokens.append(str(token))
        
        return ' '.join(converted_tokens), flagged
    
    def apply_function_mapping(self, func_name: str, params: str) -> str:
        """Apply function mapping with parameter handling"""
        target = self.function_map[func_name]['target']
        
        # Special handling for known function patterns
        if func_name == 'DATEADD' and len(params.split(',')) == 3:
            unit, value, date = [p.strip() for p in params.split(',', 2)]
            if 'month' in unit.lower():
                try:
                    value = str(int(value) * 30)  # Approximate month→day conversion
                except ValueError:
                    pass
            return f"{target}({date}, {value})"
        
        return f"{target}({params})"
    
    def try_pattern_conversion(self, func_call: str) -> Tuple[str, bool]:
        """Attempt pattern-based conversion"""
        for pattern, data in self.function_map['PATTERNS'].items():
            match = data['compiled'].search(func_call)
            if match:
                converted = data['compiled'].sub(data['replacement'], func_call)
                return converted, True
        return func_call, False
    
    def process_conversion(self, input_path: str, output_path: str) -> None:
        """End-to-end conversion workflow"""
        try:
            with open(input_path, 'r', encoding='utf-8') as f:
                statements = self.parse_sql(f.read())
        except Exception as e:
            print(f"❌ Error reading input SQL: {str(e)}")
            return

        results = []
        manual_reviews = []
        
        print("\n=== CONVERSION PROCESS ===")
        for i, stmt in enumerate(statements, 1):
            try:
                print(f"\n🔧 Processing Statement {i}:")
                print("📜 Original:")
                print(stmt)
                
                converted, flags = self.convert_statement(stmt)
                results.append(converted)
                
                print("\n🔄 Converted:")
                print(converted)
                
                if flags:
                    manual_reviews.append({'stmt_num': i, 'stmt': converted, 'flags': flags})
                    print("\n⚠️  Flags raised:")
                    for flag in flags:
                        print(f"  - {flag['function']}: {flag.get('description', '')}")
            except Exception as e:
                print(f"❌ Error converting statement {i}: {str(e)}")
                results.append(stmt)

        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write("\n\n".join(results))
        except Exception as e:
            print(f"❌ Error writing output: {str(e)}")
            return

        # Final report
        print("\n=== CONVERSION SUMMARY ===")
        print(f"📊 Total statements processed: {len(statements)}")
        print(f"⚠️  Statements needing manual review: {len(manual_reviews)}")
        print(f"\n💾 Output saved to: {output_path}")

if __name__ == "__main__":
    transpiler = SQLTranspiler(
        mapping_path='/Volumes/workspace/default/adhyan/Redshift and Databricks functions(Sheet1).csv'
    )
    
    transpiler.process_conversion(
        input_path='/Volumes/workspace/default/adhyan/transpiler.sql',
        output_path='/Volumes/workspace/default/adhyan/databricks_converted.sql'
    )


=== CONVERSION PROCESS ===

🔧 Processing Statement 1:
📜 Original:
-- Sample Redshift query with various functions
SELECT 
    user_id,
    NVL(username, 'anonymous') AS safe_username,
    DATE_TRUNC('month', signup_date) AS signup_month,
    LISTAGG(product_name, '|') WITHIN GROUP (ORDER BY purchase_date) AS products_ordered,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY order_amount) AS median_order,
    DECODE(status, 1, 'active', 2, 'inactive', 'unknown') AS status_text,
    DATEADD(day, 30, last_login_date) AS renewal_date
FROM 
    users u
JOIN 
    orders o ON u.user_id = o.customer_id
WHERE 
    signup_date >= DATEADD(year, -1, CURRENT_DATE)
    AND is_verified = TRUE
GROUP BY 
    user_id, username, signup_date, status, last_login_date
HAVING 
    COUNT(*) > 1
ORDER BY 
    signup_month DESC;

🔄 Converted:
-- Sample Redshift query with various functions
 SELECT   
         user_id,
    NVL(username, 'anonymous') AS safe_username,
    DATE_TRUNC('month', signup_date) AS signu