In [0]:
%pip install databricks-labs-lakebridge

In [0]:
%restart_python

In [0]:
import importlib
import importlib.metadata
import json
import os
import shutil
import subprocess
import sys
import tempfile
import uuid
from datetime import datetime
from typing import List, Dict, Optional, Tuple

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
class TeradataSparkConverter:
    """Databricks-optimized Teradata to Spark SQL converter using Lakebridge.

    Each conversion writes the Teradata SQL to a temporary file, runs
    ``databricks labs lakebridge transpile`` on it, reads back the ``.sql``
    files the CLI writes into a temporary output folder, and can append one
    row per conversion to ``<catalog>.<schema>.conversion_history``.
    """

    # SQL text stored in the result dict / history table is truncated to this many characters.
    MAX_STORED_SQL_CHARS = 5000

    def __init__(self, catalog: str = "az_adb_simbus_training", schema: str = "td2ss"):
        """Initialize converter with Databricks catalog and schema.

        Args:
            catalog: Unity Catalog catalog that holds the tracking table. It is
                not created here, so it must already exist.
            schema: Schema for the ``conversion_history`` table; created if missing.
        """
        self.catalog = catalog
        self.schema = schema
        self.spark = SparkSession.builder.getOrCreate()
        # Every result dict returned by convert_single_query, in call order.
        self.conversion_history = []

        # Create tracking table if not exists
        self._setup_tracking_table()

    @property
    def history_table(self) -> str:
        """Fully qualified name of the conversion history Delta table."""
        return f"{self.catalog}.{self.schema}.conversion_history"

    def _setup_tracking_table(self):
        """Create the conversion history tracking table in Unity Catalog if missing.

        Side effect: switches the Spark session's current catalog to ``self.catalog``.
        """
        self.spark.sql(f"USE CATALOG {self.catalog}")
        self.spark.sql(f"CREATE SCHEMA IF NOT EXISTS {self.catalog}.{self.schema}")

        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.history_table} (
                conversion_id STRING,
                timestamp TIMESTAMP,
                original_sql STRING,
                converted_sql STRING,
                status STRING,
                warnings ARRAY<STRING>,
                fixme_count INT,
                conversion_time_ms LONG,
                user STRING
            ) USING DELTA
        """)

    def convert_single_query(self, teradata_sql: str,
                             save_to_catalog: bool = True,
                             timeout: int = 60,
                             verbose: bool = False) -> Dict:
        """Convert a single Teradata SQL script to Spark SQL with Lakebridge.

        Args:
            teradata_sql: Teradata SQL text (may contain several statements).
            save_to_catalog: Append the result (successful or failed) to the
                history table, so failures show up in show_conversion_metrics.
            timeout: Seconds to wait for the Lakebridge CLI before giving up.
            verbose: Print the CLI command, return code, stdout and stderr.

        Returns:
            Dict keyed like the history table columns. ``status`` is "SUCCESS"
            when the CLI exited 0 and produced SQL, otherwise "FAILED" with the
            reason in ``warnings``. SQL text is truncated to MAX_STORED_SQL_CHARS.
        """
        start_time = datetime.now()
        # Second-resolution timestamps collide when many queries are converted
        # in a loop (convert_from_table); the random suffix keeps ids unique.
        conversion_id = f"conv_{start_time:%Y%m%d_%H%M%S}_{uuid.uuid4().hex[:8]}"

        with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False,
                                         encoding="utf-8") as f:
            f.write(teradata_sql)
            input_file = f.name
        output_dir = tempfile.mkdtemp()

        try:
            status, converted_sql, warning_lines, fixme_count = self._run_lakebridge(
                input_file, output_dir, timeout, verbose
            )
        finally:
            # Temp files are only needed while the CLI runs and output is read.
            os.unlink(input_file)
            shutil.rmtree(output_dir, ignore_errors=True)

        limit = self.MAX_STORED_SQL_CHARS
        result_dict = {
            "conversion_id": conversion_id,
            "timestamp": start_time,
            "original_sql": teradata_sql[:limit],
            "converted_sql": converted_sql[:limit] if converted_sql else None,
            "status": status,
            "warnings": warning_lines,
            "fixme_count": fixme_count,
            "conversion_time_ms": int((datetime.now() - start_time).total_seconds() * 1000),
            "user": self._current_user(),
        }
        self.conversion_history.append(result_dict)

        if save_to_catalog:
            self._save_to_catalog(result_dict)
        return result_dict

    def _run_lakebridge(self, input_file: str, output_dir: str,
                        timeout: int, verbose: bool) -> Tuple[str, str, List[str], int]:
        """Run the Lakebridge CLI; return (status, converted_sql, warnings, fixme_count).

        Launch errors, timeouts and non-zero exits are reported as "FAILED"
        with an explanatory warning instead of being raised, so batch
        conversions continue past a bad query.
        """
        cmd = [
            "databricks", "labs", "lakebridge", "transpile",
            "--source-dialect", "teradata",
            "--input-source", input_file,
            "--output-folder", output_dir,
            "--transpiler", "bladebridge",
        ]
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        except OSError as exc:  # e.g. `databricks` CLI not on PATH
            return "FAILED", "", [f"Lakebridge CLI could not be started: {exc}"], 0
        except subprocess.TimeoutExpired:
            return "FAILED", "", [f"Lakebridge did not finish within {timeout}s"], 0

        if verbose:
            print(f"Output folder: {output_dir}")
            print(f"Command: {' '.join(cmd)}")
            print(f"Return code: {proc.returncode}")
            print(f"STDOUT: {proc.stdout}")
            print(f"ERROR: {proc.stderr}")

        if proc.returncode != 0:
            return "FAILED", "", ([proc.stderr] if proc.stderr else ["Unknown error"]), 0

        converted_sql, warning_lines, fixme_count = self._read_converted_output(output_dir)
        if not converted_sql:
            return "FAILED", "", ["Lakebridge exited with code 0 but wrote no .sql output"], 0
        return "SUCCESS", converted_sql, warning_lines, fixme_count

    @staticmethod
    def _read_converted_output(output_dir: str) -> Tuple[str, List[str], int]:
        """Collect every ``.sql`` file under output_dir.

        Returns:
            (converted_sql, warning_lines, fixme_count). converted_sql joins all
            files (sorted by path) with newlines. warning_lines holds stripped
            lines mentioning WARNING or FIXME. fixme_count counts FIXME
            occurrences (case-insensitive).
        """
        chunks = []
        for root, dirs, files in os.walk(output_dir):
            dirs.sort()  # deterministic traversal order
            for name in sorted(files):
                if name.endswith(".sql"):
                    with open(os.path.join(root, name), "r", encoding="utf-8") as fh:
                        chunks.append(fh.read())

        converted_sql = "\n".join(chunks)
        fixme_count = converted_sql.upper().count("FIXME")
        warning_lines = [
            line.strip()
            for line in converted_sql.splitlines()
            if "WARNING" in line.upper() or "FIXME" in line.upper()
        ]
        return converted_sql, warning_lines, fixme_count

    def _current_user(self) -> str:
        """Return Spark's current_user(), queried once and then cached."""
        user = getattr(self, "_cached_user", None)
        if user is None:
            user = self.spark.sql("SELECT current_user()").collect()[0][0]
            self._cached_user = user
        return user

    def _save_to_catalog(self, result: Dict):
        """Append one conversion result to the Unity Catalog history table.

        Uses the existing table's schema explicitly. This way an empty
        ``warnings`` list or a Python int for the INT column cannot be
        mis-inferred and break the append.
        """
        schema = self.spark.table(self.history_table).schema
        row = tuple(result.get(field.name) for field in schema.fields)
        df = self.spark.createDataFrame([row], schema=schema)
        df.write.mode("append").saveAsTable(self.history_table)

    def convert_from_table(self, source_table: str,
                           query_column: str = "sql_query") -> pd.DataFrame:
        """Convert every non-blank query in ``source_table.query_column``.

        Null or whitespace-only values are skipped. Returns one row per
        converted query, with the keys of convert_single_query's result as columns.
        """
        rows = self.spark.table(source_table).select(query_column).collect()

        results = []
        for row in rows:
            query = row[0]  # single selected column
            if query is None or not query.strip():
                continue
            results.append(self.convert_single_query(query))

        return pd.DataFrame(results)

    def show_conversion_metrics(self):
        """Return aggregate conversion metrics from the history table as a one-row pandas DataFrame."""
        metrics_df = self.spark.sql(f"""
            SELECT
                COUNT(*) as total_conversions,
                SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) as successful,
                SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) as failed,
                AVG(conversion_time_ms) as avg_time_ms,
                SUM(fixme_count) as total_fixme_items,
                COUNT(DISTINCT user) as unique_users
            FROM {self.history_table}
        """)

        return metrics_df.toPandas()

In [0]:
# Build the converter; its constructor also creates the conversion_history tracking table if missing.
converter_settings = {"catalog": "az_adb_simbus_training", "schema": "td2ss"}
converter = TeradataSparkConverter(**converter_settings)

In [0]:
# Show the tracking table for the converter built above, newest conversions first.
# The table name is derived from the converter so it cannot drift from its catalog/schema.
spark.table(f"{converter.catalog}.{converter.schema}.conversion_history").orderBy("timestamp", ascending=False).display()

In [0]:
# Example 1: Convert single Teradata query
# Sample input with Teradata-specific DDL (SET table, NO FALLBACK, PRIMARY INDEX)
# followed by an aggregate query that uses QUALIFY.
# NOTE(review): ROW_NUMBER() is partitioned by product_id while the query also
# groups by product_id, so each partition holds a single row and the "<= 10"
# filter keeps every row. Drop the PARTITION BY if a top-10 by revenue was intended.
teradata_query = """
CREATE SET TABLE sales.monthly_summary ,NO FALLBACK (
    month_id INTEGER,
    product_id INTEGER,
    total_sales DECIMAL(12,2),
    units_sold INTEGER
) PRIMARY INDEX (month_id, product_id);

SELECT 
    product_id,
    SUM(total_sales) as revenue,
    COUNT(*) as transactions
FROM sales.monthly_summary
QUALIFY ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY revenue DESC) <= 10
GROUP BY product_id;
"""

result = converter.convert_single_query(teradata_query)

# Display result. Guard against a converter that returns None instead of a
# result dict, so this cell never fails by subscripting None.
if result is None:
    print("convert_single_query returned no result; nothing to display.")
else:
    print(f"Status: {result['status']}")
    print(f"Conversion Time: {result['conversion_time_ms']}ms")
    print(f"FIXME Items: {result['fixme_count']}")
    if result["warnings"]:
        print("\nWarnings:")
        for warning in result["warnings"]:
            print(f"  - {warning}")
    print("\nConverted SQL:")
    print(result["converted_sql"])

In [0]:
# Probe candidate import paths for the Lakebridge Python package. Each candidate
# is tried independently so a failure of one does not hide the others.
# databricks.labs.<project> is the namespace layout Databricks Labs packages
# normally use; the other two names are kept as fallbacks.
lakebridge_module = None
for module_name in ("databricks.labs.lakebridge", "databricks_labs_lakebridge", "lakebridge"):
    try:
        lakebridge_module = importlib.import_module(module_name)
    except ImportError as e:
        print(f"Cannot import {module_name}: {e}")
    else:
        print(f"Imported {module_name}")
        break

In [0]:
def test_lakebridge_direct(
    module_names: Tuple[str, ...] = (
        "databricks.labs.lakebridge",
        "databricks_labs_lakebridge",
        "lakebridge",
    ),
) -> bool:
    """Test if we can use lakebridge as a library instead of CLI.

    Tries each candidate module name in order. For the first one that
    imports, prints its public (non-underscore) attributes.

    Args:
        module_names: Import paths to try, in priority order.

    Returns:
        True if any candidate could be imported, False otherwise.
    """
    for module_name in module_names:
        try:
            module = importlib.import_module(module_name)
        except ImportError as e:
            print(f"❌ Cannot import {module_name}: {e}")
            continue

        print(f"✅ {module_name} module found")
        print("\nPublic attributes:")
        for item in dir(module):
            if not item.startswith('_'):
                print(f"  - {item}")
        return True

    print(f"❌ None of {', '.join(module_names)} can be imported")
    return False

# Test the import
test_lakebridge_direct()

In [0]:
# Diagnose the installation: which Lakebridge distributions are installed, and
# whether the Databricks CLI (which hosts `databricks labs ...`) is on PATH.
# Uses importlib.metadata instead of the deprecated pkg_resources, and distinct
# variable names so the notebook's `result` from the conversion example is not clobbered.
print("Checking installed packages related to lakebridge:")
lakebridge_dists = sorted(
    (
        dist for dist in importlib.metadata.distributions()
        if "lakebridge" in (dist.metadata["Name"] or "").lower()
    ),
    key=lambda dist: dist.metadata["Name"].lower(),
)
for dist in lakebridge_dists:
    print(f"  - {dist.metadata['Name']} version {dist.version} at {dist.locate_file('')}")

print("\nChecking available CLI commands:")
databricks_cli = shutil.which("databricks")
if databricks_cli:
    print(f"  - databricks CLI found at: {databricks_cli}")

    labs_list_proc = subprocess.run(["databricks", "labs", "list"], capture_output=True, text=True)
    if labs_list_proc.returncode == 0:
        print("  - Available labs:")
        print(labs_list_proc.stdout)
    else:
        print(f"  - `databricks labs list` failed (exit {labs_list_proc.returncode}): "
              f"{labs_list_proc.stderr.strip()}")
else:
    print("  - databricks CLI not found in PATH")

In [0]:
# List installed Databricks Labs projects. Report, rather than raise, when the
# CLI is missing or hangs, so Run All can continue.
try:
    labs_list = subprocess.run(["databricks", "labs", "list"], capture_output=True, text=True, timeout=60)
except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
    labs_list = None
    print(f"Could not run `databricks labs list`: {exc}")
labs_list

In [0]:
# Reuse the Example 1 sample script rather than keeping a second, byte-identical
# copy that could silently drift out of sync.
teradata_sql = teradata_query

In [0]:
# Write the sample SQL to a temporary .sql file (kept on disk for the next cell)
# and create an empty folder for Lakebridge to write its output into.
fd, input_file = tempfile.mkstemp(suffix='.sql')
with os.fdopen(fd, 'w') as f:
    f.write(teradata_sql)

output_dir = tempfile.mkdtemp()

In [0]:
# Run Lakebridge through the Databricks CLI extension (`databricks labs lakebridge`).
# `python -m "databricks labs lakebridge"` is not a valid module name and always fails.
cmd = [
    "databricks", "labs", "lakebridge", "transpile",
    "--source-dialect", "teradata",
    "--input-source", input_file,
    "--output-folder", output_dir,
    "--transpiler", "bladebridge",
]
try:
    transpile_result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
    transpile_result = None
    print(f"Lakebridge did not run to completion: {exc}")
else:
    print(f"Return code: {transpile_result.returncode}")
    if transpile_result.stderr:
        print(f"STDERR: {transpile_result.stderr}")
    # Show whatever SQL Lakebridge wrote into the output folder.
    for root, _dirs, files in os.walk(output_dir):
        for name in sorted(files):
            if name.endswith(".sql"):
                with open(os.path.join(root, name), encoding="utf-8") as fh:
                    print(f"--- {name} ---\n{fh.read()}")

transpile_result