# PyForge CLI Databricks Extension - Serverless Testing

This notebook specifically tests the PyForge CLI Databricks extension in serverless compute environments.

## Key Features Tested
- **Serverless Detection**: Verify serverless environment detection
- **PySpark Optimization**: Test PySpark-optimized converters
- **Memory Efficiency**: Test memory-efficient processing
- **Streaming Support**: Test streaming for large datasets
- **Delta Lake**: Test Delta Lake output format

## Serverless-Specific Optimizations
- Automatic PySpark converter selection
- Memory-aware processing modes
- Streaming for large datasets
- Unity Catalog Volume integration

## Prerequisites
1. Databricks workspace with serverless compute
2. PyForge CLI with Databricks extension installed
3. Unity Catalog Volume access
4. PySpark and Delta Lake available

In [None]:
# Configuration for Serverless Testing
# =============================================================================
# SERVERLESS CONFIGURATION SECTION
# =============================================================================
#
# Declares the notebook widgets, reads their values into module-level
# constants, and initialises the shared `serverless_test_results` dict that
# the following cells fill in.

# Create widgets for serverless-specific configuration
dbutils.widgets.text("pyforge_version", "latest", "PyForge Version")
dbutils.widgets.text("volume_path", "/Volumes/main/default/pyforge", "Volume Path")
dbutils.widgets.dropdown("test_dataset_size", "medium", ["small", "medium", "large"], "Test Dataset Size")

# The documented dbutils.widgets types are text, dropdown, combobox and
# multiselect -- there is no checkbox widget -- so on/off switches are modelled
# as "true"/"false" dropdowns. The values read back are the same strings the
# comparisons below expect.
BOOLEAN_CHOICES = ["true", "false"]
dbutils.widgets.dropdown("test_streaming", "true", BOOLEAN_CHOICES, "Test Streaming Mode")
dbutils.widgets.dropdown("test_delta_lake", "true", BOOLEAN_CHOICES, "Test Delta Lake")
dbutils.widgets.dropdown("test_memory_optimization", "true", BOOLEAN_CHOICES, "Test Memory Optimization")
dbutils.widgets.dropdown("verbose_logging", "true", BOOLEAN_CHOICES, "Verbose Logging")


def _widget_flag(name):
    """Return True when the named widget currently holds the text "true".

    The comparison ignores case and surrounding whitespace so that values
    such as "True" are also accepted.
    """
    return dbutils.widgets.get(name).strip().lower() == "true"


# Get configuration values
PYFORGE_VERSION = dbutils.widgets.get("pyforge_version")
VOLUME_PATH = dbutils.widgets.get("volume_path")
TEST_DATASET_SIZE = dbutils.widgets.get("test_dataset_size")
TEST_STREAMING = _widget_flag("test_streaming")
TEST_DELTA_LAKE = _widget_flag("test_delta_lake")
TEST_MEMORY_OPTIMIZATION = _widget_flag("test_memory_optimization")
VERBOSE_LOGGING = _widget_flag("verbose_logging")

# Dataset size configuration
DATASET_SIZES = {
    "small": {"rows": 1000, "columns": 5},
    "medium": {"rows": 10000, "columns": 10},
    "large": {"rows": 100000, "columns": 15},
}

# Fail with an actionable message instead of a bare KeyError if the widget
# value is not one of the configured sizes.
if TEST_DATASET_SIZE not in DATASET_SIZES:
    raise ValueError(
        f"Unsupported test_dataset_size {TEST_DATASET_SIZE!r}; "
        f"expected one of {sorted(DATASET_SIZES)}"
    )

dataset_config = DATASET_SIZES[TEST_DATASET_SIZE]

print("🔧 Serverless Testing Configuration:")
print(f"   PyForge Version: {PYFORGE_VERSION}")
print(f"   Volume Path: {VOLUME_PATH}")
print(f"   Dataset Size: {TEST_DATASET_SIZE} ({dataset_config['rows']:,} rows, {dataset_config['columns']} columns)")
print(f"   Test Streaming: {TEST_STREAMING}")
print(f"   Test Delta Lake: {TEST_DELTA_LAKE}")
print(f"   Test Memory Optimization: {TEST_MEMORY_OPTIMIZATION}")
print(f"   Verbose Logging: {VERBOSE_LOGGING}")

# Initialize test tracking. Each test cell replaces its None placeholder with
# a result dict; 'performance_metrics' collects per-cell timings in seconds.
serverless_test_results = {
    'environment_verification': None,
    'pyspark_optimization': None,
    'memory_efficiency': None,
    'streaming_support': None,
    'delta_lake_support': None,
    'performance_metrics': {}
}

In [None]:
# Serverless Environment Verification
# =============================================================================
# ENVIRONMENT VERIFICATION SECTION
# =============================================================================
#
# Collects facts about the runtime (Databricks runtime version, PySpark,
# Delta Lake, Unity Catalog Volume access) into `env_verification`, stores
# that dict under serverless_test_results['environment_verification'], and
# exits the notebook if the verification as a whole raised.

import os
import sys
import time
from datetime import datetime

print("🔍 Verifying serverless environment...")

verification_start_time = time.time()
env_verification = {}

try:
    # Check if we're in serverless compute
    runtime_version = os.environ.get('DATABRICKS_RUNTIME_VERSION', 'unknown')
    is_serverless = 'serverless' in runtime_version.lower()

    print(f"✅ Runtime Version: {runtime_version}")
    print(f"✅ Serverless Environment: {is_serverless}")

    env_verification['runtime_version'] = runtime_version
    env_verification['is_serverless'] = is_serverless

    # Verify PySpark availability (critical for serverless)
    try:
        import pyspark
        pyspark_version = pyspark.__version__

        # Get Spark configuration. Depending on the PySpark version/session
        # type, `spark.conf.getAll` may be a method, a property, or absent, so
        # treat a failure here as "no configuration visible" rather than as a
        # failed environment.
        try:
            conf_entries = spark.conf.getAll
            if callable(conf_entries):
                conf_entries = conf_entries()
            spark_conf = dict(conf_entries)
        except Exception as conf_error:
            spark_conf = {}
            print(f"⚠️ Spark configuration not readable: {conf_error}")

        # Databricks lists spark.sparkContext as unsupported on serverless
        # compute; reading the master must therefore not abort the whole
        # verification.
        try:
            spark_master = spark.sparkContext.master
        except Exception as master_error:
            spark_master = 'unavailable'
            master_unavailable_reason = str(master_error)
        else:
            master_unavailable_reason = None

        print(f"✅ PySpark Version: {pyspark_version}")
        print(f"✅ Spark Version: {spark.version}")
        if master_unavailable_reason is None:
            print(f"✅ Spark Master: {spark_master}")
        else:
            print(f"⚠️ Spark Master unavailable: {master_unavailable_reason}")

        env_verification['pyspark_version'] = pyspark_version
        env_verification['spark_version'] = spark.version
        env_verification['spark_master'] = spark_master
        env_verification['pyspark_available'] = True

        # Check for serverless-specific Spark configurations
        serverless_indicators = [
            f"{key}: {value}"
            for key, value in spark_conf.items()
            if 'serverless' in key.lower() or 'serverless' in str(value).lower()
        ]

        if serverless_indicators:
            print(f"✅ Serverless Spark configurations found: {len(serverless_indicators)}")
            env_verification['serverless_spark_config'] = serverless_indicators

    except ImportError:
        print("❌ PySpark not available - this will significantly impact serverless performance")
        env_verification['pyspark_available'] = False

    # Verify Delta Lake availability (important for serverless)
    try:
        import delta
        # The delta package is not guaranteed to expose __version__; a missing
        # attribute must not be reported as a failed environment.
        delta_version = getattr(delta, '__version__', 'unknown')
        print(f"✅ Delta Lake Version: {delta_version}")
        env_verification['delta_available'] = True
        env_verification['delta_version'] = delta_version
    except ImportError:
        print("⚠️ Delta Lake not available")
        env_verification['delta_available'] = False

    # Check Unity Catalog Volume access
    try:
        volume_files = dbutils.fs.ls(VOLUME_PATH)
        print(f"✅ Unity Catalog Volume accessible: {len(volume_files)} items")
        env_verification['volume_accessible'] = True

        # Create test directories used by the conversion cells
        test_data_path = f"{VOLUME_PATH}/serverless-test-data"
        test_output_path = f"{VOLUME_PATH}/serverless-test-output"
        dbutils.fs.mkdirs(test_data_path)
        dbutils.fs.mkdirs(test_output_path)
        print("✅ Test directories created")

    except Exception as e:
        # A volume problem is recorded but does not fail verification itself.
        print(f"❌ Volume access failed: {e}")
        env_verification['volume_accessible'] = False

    env_verification['verification_success'] = True

except Exception as e:
    print(f"❌ Environment verification failed: {e}")
    env_verification['verification_success'] = False
    env_verification['error'] = str(e)

verification_time = time.time() - verification_start_time
env_verification['verification_time'] = verification_time
serverless_test_results['environment_verification'] = env_verification

print(f"⏱️ Environment verification time: {verification_time:.2f} seconds")

if not env_verification.get('verification_success', False):
    print("❌ Cannot proceed without successful environment verification")
    dbutils.notebook.exit("Environment verification failed")

In [None]:
# Install and Test PyForge Databricks Extension
# =============================================================================
# EXTENSION INSTALLATION AND TESTING SECTION
# =============================================================================
#
# Installs pyforge-cli with the `databricks` extra (optionally pinned to
# PYFORGE_VERSION), creates the `forge` object used by the later test cells,
# and records the outcome under serverless_test_results['extension_installation'].
# Relies on `time`, `spark`, `dbutils`, PYFORGE_VERSION and
# serverless_test_results being defined by earlier cells / the notebook runtime.

print("📦 Installing and testing PyForge Databricks extension...")

install_start_time = time.time()

try:
    # Install PyForge CLI with Databricks extension
    # NOTE(review): `%pip` is a notebook line magic, not Python syntax; this
    # cell only runs inside a notebook kernel that rewrites indented magics
    # and expands `{PYFORGE_VERSION}`. Databricks also documents that `%pip`
    # can reset notebook state -- confirm that variables from earlier cells
    # survive this install.
    if PYFORGE_VERSION == "latest":
        %pip install --no-cache-dir pyforge-cli[databricks]
    else:
        %pip install --no-cache-dir pyforge-cli[databricks]=={PYFORGE_VERSION}
    
    print("✅ PyForge CLI with Databricks extension installed")
    
    # Test PyForgeDatabricks API initialization
    from pyforge_cli.extensions.databricks.pyforge_databricks import PyForgeDatabricks
    
    # Initialize with Spark session
    # `forge` is reused by the conversion cells that follow.
    forge = PyForgeDatabricks(spark_session=spark, auto_init=True)
    print("✅ PyForgeDatabricks initialized with Spark session")
    
    # Verify serverless environment detection
    # The code indexes env_info with 'compute_type' and 'runtime_version';
    # a missing key would be reported through the except branch below.
    env_info = forge.get_environment_info()
    print(f"✅ Environment detected: {env_info['compute_type']} ({env_info['runtime_version']})")
    
    # A non-serverless compute type only produces a warning, not a failure.
    if env_info['compute_type'] != 'serverless':
        print("⚠️ Not running in serverless environment - some optimizations may not apply")
    
    serverless_test_results['extension_installation'] = {
        'installation_success': True,
        'api_initialization': True,
        'detected_compute_type': env_info['compute_type']
    }
    
except Exception as e:
    # Any failure above (install, import, initialisation, environment lookup)
    # is recorded as an installation failure and the notebook is exited.
    print(f"❌ Extension installation/testing failed: {e}")
    serverless_test_results['extension_installation'] = {
        'installation_success': False,
        'error': str(e)
    }
    dbutils.notebook.exit("Extension installation failed")

# Wall-clock duration of the whole cell, in seconds.
install_time = time.time() - install_start_time
serverless_test_results['performance_metrics']['install_time'] = install_time
print(f"⏱️ Installation time: {install_time:.2f} seconds")

In [None]:
# Test PySpark-Optimized Converters
# =============================================================================
# PYSPARK OPTIMIZATION TESTING SECTION
# =============================================================================
#
# Generates a synthetic DataFrame sized by `dataset_config`, writes it as a
# single CSV file under VOLUME_PATH, converts it to Parquet through
# `forge.convert(..., engine="spark")`, and checks that the row count survives.
# Results go to serverless_test_results['pyspark_optimization'].

print("⚡ Testing PySpark-optimized converters...")

pyspark_test_start = time.time()
pyspark_results = {}

# Defined before the try block so the streaming and Delta cells can test it
# even when this cell fails before the CSV file has been located.
actual_csv_path = None

try:
    # Generate test dataset using Spark
    print(f"📊 Generating test dataset: {dataset_config['rows']:,} rows...")

    # Create large test dataset with Spark
    from pyspark.sql.functions import col, rand, when, concat, lit

    test_df = spark.range(dataset_config['rows']).toDF("id")

    # Add various column types for comprehensive testing. The column kind
    # cycles with i % 4, so e.g. category columns get indices 3, 7, 11, ...
    for i in range(dataset_config['columns'] - 1):
        if i % 4 == 0:
            # String column
            test_df = test_df.withColumn(f"string_col_{i}", concat(lit("test_"), col("id").cast("string")))
        elif i % 4 == 1:
            # Numeric column
            test_df = test_df.withColumn(f"numeric_col_{i}", rand() * 1000)
        elif i % 4 == 2:
            # Boolean column
            test_df = test_df.withColumn(f"boolean_col_{i}", when(rand() > 0.5, True).otherwise(False))
        else:
            # Category column
            test_df = test_df.withColumn(f"category_col_{i}", when(col("id") % 3 == 0, "A").when(col("id") % 3 == 1, "B").otherwise("C"))

    # Cache for performance. Databricks documents the DataFrame cache APIs as
    # unsupported (raising) on serverless compute, so caching is best-effort:
    # the test only depends on the row count, which is unaffected by
    # recomputation.
    try:
        test_df.cache()
    except Exception as cache_error:
        print(f"⚠️ DataFrame caching unavailable, continuing without cache: {cache_error}")
    actual_rows = test_df.count()

    print(f"✅ Test dataset created: {actual_rows:,} rows, {len(test_df.columns)} columns")

    # Save as CSV for conversion testing
    test_csv_path = f"{VOLUME_PATH}/serverless-test-data/large_test_data.csv"
    test_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(test_csv_path)

    # Get the actual CSV file path (Spark creates a directory): pick the first
    # entry whose name ends in .csv, or None when there is none.
    actual_csv_path = next(
        (file_info.path for file_info in dbutils.fs.ls(test_csv_path) if file_info.name.endswith('.csv')),
        None,
    )

    if actual_csv_path:
        print(f"✅ CSV test file created: {actual_csv_path}")

        # Test PySpark CSV converter
        print("Testing PySpark CSV converter...")

        csv_convert_start = time.time()

        # Test conversion with PyForge
        output_path = f"{VOLUME_PATH}/serverless-test-output/converted_data.parquet"

        conversion_result = forge.convert(
            input_path=actual_csv_path,
            output_path=output_path,
            format="parquet",
            engine="spark"  # Force Spark engine
        )

        csv_convert_time = time.time() - csv_convert_start

        print(f"✅ PySpark CSV conversion completed in {csv_convert_time:.2f} seconds")
        print(f"   Rows processed: {conversion_result['rows_processed']:,}")
        print(f"   Engine used: {conversion_result['engine_used']}")
        print(f"   Output size: {conversion_result.get('output_size_mb', 'unknown')} MB")

        # Verify output by reading the Parquet back and comparing row counts
        verify_df = spark.read.parquet(output_path)
        verify_count = verify_df.count()

        data_integrity = verify_count == actual_rows
        print(f"✅ Data integrity check: {data_integrity} ({verify_count:,} rows)")

        pyspark_results['csv_conversion'] = {
            'success': True,
            'conversion_time': csv_convert_time,
            'rows_processed': conversion_result['rows_processed'],
            'engine_used': conversion_result['engine_used'],
            'data_integrity': data_integrity,
            'throughput_rows_per_sec': actual_rows / csv_convert_time
        }

    else:
        print("❌ CSV test file creation failed")
        pyspark_results['csv_conversion'] = {'success': False, 'error': 'CSV file creation failed'}

except Exception as e:
    print(f"❌ PySpark optimization testing failed: {e}")
    pyspark_results['error'] = str(e)

pyspark_test_time = time.time() - pyspark_test_start
pyspark_results['total_test_time'] = pyspark_test_time
serverless_test_results['pyspark_optimization'] = pyspark_results
serverless_test_results['performance_metrics']['pyspark_test_time'] = pyspark_test_time

print(f"⏱️ PySpark optimization testing time: {pyspark_test_time:.2f} seconds")

In [None]:
# Test Streaming Support for Large Datasets
# =============================================================================
# STREAMING SUPPORT TESTING SECTION
# =============================================================================
#
# When TEST_STREAMING is set, converts the CSV produced by the PySpark cell
# with `forge.convert(..., streaming=True, chunk_size=1000)`, compares its
# duration with the batch conversion, and optionally samples process memory.
# Results go to serverless_test_results['streaming_support'].

if TEST_STREAMING:
    print("🌊 Testing streaming support for large datasets...")

    streaming_test_start = time.time()
    streaming_results = {}

    # Reset explicitly on every run: the cell executes in the notebook's
    # global namespace, so a value left over from a previous run must not be
    # reported for this one.
    streaming_efficiency = None

    # The CSV path and batch results come from the PySpark cell; look them up
    # defensively so a failed or skipped earlier cell yields a clean "skipped"
    # result instead of a NameError.
    csv_input_path = globals().get('actual_csv_path')
    batch_results = globals().get('pyspark_results') or {}

    try:
        # Test streaming mode conversion
        print("Testing streaming mode conversion...")

        if csv_input_path:
            # Test streaming conversion
            streaming_output_path = f"{VOLUME_PATH}/serverless-test-output/streaming_converted.parquet"

            streaming_convert_start = time.time()

            streaming_result = forge.convert(
                input_path=csv_input_path,
                output_path=streaming_output_path,
                format="parquet",
                streaming=True,  # Enable streaming mode
                chunk_size=1000  # Process in chunks
            )

            streaming_convert_time = time.time() - streaming_convert_start

            print(f"✅ Streaming conversion completed in {streaming_convert_time:.2f} seconds")
            print(f"   Rows processed: {streaming_result['rows_processed']:,}")
            print(f"   Chunks processed: {streaming_result.get('chunks_processed', 'unknown')}")

            # Compare streaming vs batch performance (ratio > 1 means the
            # streaming run was faster). Skipped when either timing is
            # unavailable or zero.
            batch_time = (batch_results.get('csv_conversion') or {}).get('conversion_time', 0)
            if batch_time > 0 and streaming_convert_time > 0:
                streaming_efficiency = batch_time / streaming_convert_time
                print(f"✅ Streaming efficiency: {streaming_efficiency:.2f}x {'faster' if streaming_efficiency > 1 else 'slower'} than batch")

            streaming_results['streaming_conversion'] = {
                'success': True,
                'conversion_time': streaming_convert_time,
                'rows_processed': streaming_result['rows_processed'],
                'streaming_efficiency': streaming_efficiency
            }

        else:
            print("⚠️ No test file available for streaming test")
            streaming_results['streaming_conversion'] = {'skipped': 'no_test_file'}

        # Test memory efficiency during streaming
        if TEST_MEMORY_OPTIMIZATION:
            print("Testing memory efficiency...")

            try:
                import psutil

                # Get memory usage before and after streaming
                process = psutil.Process()
                memory_before = process.memory_info().rss / 1024 / 1024  # MB

                # Process a dataset in streaming mode
                # (This would be implementation-specific)
                # NOTE(review): no workload runs between the two samples, so
                # the reported delta currently reflects only measurement noise.

                memory_after = process.memory_info().rss / 1024 / 1024  # MB
                memory_usage = memory_after - memory_before

                print(f"✅ Memory usage during streaming: {memory_usage:.1f} MB")

                streaming_results['memory_efficiency'] = {
                    'memory_before_mb': memory_before,
                    'memory_after_mb': memory_after,
                    'memory_used_mb': memory_usage
                }

            except ImportError:
                print("⚠️ psutil not available - skipping memory efficiency test")
                streaming_results['memory_efficiency'] = {'skipped': 'psutil_unavailable'}

    except Exception as e:
        print(f"❌ Streaming support testing failed: {e}")
        streaming_results['error'] = str(e)

    streaming_test_time = time.time() - streaming_test_start
    streaming_results['total_test_time'] = streaming_test_time
    serverless_test_results['streaming_support'] = streaming_results
    serverless_test_results['performance_metrics']['streaming_test_time'] = streaming_test_time

    print(f"⏱️ Streaming support testing time: {streaming_test_time:.2f} seconds")

else:
    print("⏭️ Streaming support testing skipped (disabled in configuration)")
    serverless_test_results['streaming_support'] = {'skipped': True}

In [None]:
# Test Delta Lake Support
# =============================================================================
# DELTA LAKE TESTING SECTION
# =============================================================================
#
# When TEST_DELTA_LAKE is set and the environment check found the delta
# package, converts the test CSV to a Delta table through `forge.convert`,
# reads it back, and inspects the table history. Results go to
# serverless_test_results['delta_lake_support'].

if TEST_DELTA_LAKE and env_verification.get('delta_available', False):
    print("🔺 Testing Delta Lake support...")

    delta_test_start = time.time()
    delta_results = {}

    # The CSV path comes from the PySpark cell; look it up defensively so a
    # failed or skipped earlier cell yields a clean "skipped" result instead
    # of a NameError.
    csv_input_path = globals().get('actual_csv_path')

    try:
        # Test Delta Lake conversion
        print("Testing Delta Lake format conversion...")

        if csv_input_path:
            delta_output_path = f"{VOLUME_PATH}/serverless-test-output/delta_table"

            delta_convert_start = time.time()

            # The dataset generator names columns by `i % 4`, so category
            # columns exist only at indices 3, 7, 11, ... (index 6 is
            # boolean_col_6). category_col_3 is present in every dataset with
            # more than 7 columns, which is the case partitioning is used for.
            partition_column = "category_col_3" if dataset_config['columns'] > 7 else None

            # Convert to Delta Lake format
            delta_result = forge.convert(
                input_path=csv_input_path,
                output_path=delta_output_path,
                format="delta",
                partition_by=partition_column
            )

            delta_convert_time = time.time() - delta_convert_start

            print(f"✅ Delta Lake conversion completed in {delta_convert_time:.2f} seconds")
            print(f"   Rows processed: {delta_result['rows_processed']:,}")
            print(f"   Engine used: {delta_result['engine_used']}")

            # Verify Delta table
            delta_df = spark.read.format("delta").load(delta_output_path)
            delta_count = delta_df.count()

            print(f"✅ Delta table verification: {delta_count:,} rows")

            # Test Delta table operations
            try:
                from delta.tables import DeltaTable

                delta_table = DeltaTable.forPath(spark, delta_output_path)

                # Get table history
                history = delta_table.history().select("version", "timestamp", "operation").collect()
                print(f"✅ Delta table history: {len(history)} versions")

                delta_results['delta_conversion'] = {
                    'success': True,
                    'conversion_time': delta_convert_time,
                    'rows_processed': delta_result['rows_processed'],
                    'delta_count': delta_count,
                    'versions': len(history),
                    'acid_support': True
                }

            except Exception as history_error:
                # The conversion itself succeeded; only the history probe
                # failed, so keep 'success' True and record the probe error.
                print(f"⚠️ Delta table operations test failed: {history_error}")
                delta_results['delta_conversion'] = {
                    'success': True,
                    'conversion_time': delta_convert_time,
                    'rows_processed': delta_result['rows_processed'],
                    'delta_count': delta_count,
                    'acid_support': False,
                    'acid_error': str(history_error)
                }

        else:
            print("⚠️ No test file available for Delta Lake test")
            delta_results['delta_conversion'] = {'skipped': 'no_test_file'}

    except Exception as e:
        print(f"❌ Delta Lake testing failed: {e}")
        delta_results['error'] = str(e)

    delta_test_time = time.time() - delta_test_start
    delta_results['total_test_time'] = delta_test_time
    serverless_test_results['delta_lake_support'] = delta_results
    serverless_test_results['performance_metrics']['delta_test_time'] = delta_test_time

    print(f"⏱️ Delta Lake testing time: {delta_test_time:.2f} seconds")

else:
    skip_reason = "disabled" if not TEST_DELTA_LAKE else "delta_unavailable"
    print(f"⏭️ Delta Lake testing skipped ({skip_reason})")
    serverless_test_results['delta_lake_support'] = {'skipped': skip_reason}

In [None]:
# Serverless Test Results Summary
# =============================================================================
# RESULTS SUMMARY SECTION
# =============================================================================
#
# Adds run metadata to `serverless_test_results`, derives the overall
# pass/fail status from the three critical sections, prints a human-readable
# summary, and serialises the results to `results_json`.

print("📊 Generating serverless test results summary...")

import json
from datetime import datetime

# Add metadata
serverless_test_results['metadata'] = {
    'test_type': 'serverless_functional',
    'notebook_name': '03-databricks-extension-serverless-test',
    'timestamp': datetime.now().isoformat(),
    'dataset_config': dataset_config,
    'configuration': {
        'pyforge_version': PYFORGE_VERSION,
        'volume_path': VOLUME_PATH,
        'test_dataset_size': TEST_DATASET_SIZE,
        'test_streaming': TEST_STREAMING,
        'test_delta_lake': TEST_DELTA_LAKE,
        'test_memory_optimization': TEST_MEMORY_OPTIMIZATION
    }
}

# Several sections are initialised to None and only become dicts once their
# cell has run. `dict.get(key, {})` returns that None rather than the default,
# so normalise every section to a dict up front; a section whose cell never
# ran is then simply reported as not successful.
env_section = serverless_test_results.get('environment_verification') or {}
install_section = serverless_test_results.get('extension_installation') or {}
pyspark_section = serverless_test_results.get('pyspark_optimization') or {}
streaming_section = serverless_test_results.get('streaming_support') or {}
delta_section = serverless_test_results.get('delta_lake_support') or {}
performance_metrics = serverless_test_results.get('performance_metrics') or {}

pyspark_csv = pyspark_section.get('csv_conversion') or {}
streaming_conv = streaming_section.get('streaming_conversion') or {}
delta_conv = delta_section.get('delta_conversion') or {}

# Calculate overall success
critical_tests = [
    env_section.get('verification_success', False),
    install_section.get('installation_success', False),
    pyspark_csv.get('success', False)
]

overall_success = all(critical_tests)
serverless_test_results['overall_success'] = overall_success

# Calculate total test time (sum of the per-cell timings in performance_metrics)
total_test_time = sum(performance_metrics.values())
serverless_test_results['total_test_time'] = total_test_time

# Display summary
print("\n" + "="*70)
print("🎯 SERVERLESS DATABRICKS EXTENSION TEST SUMMARY")
print("="*70)

status_icon = "✅" if overall_success else "❌"
print(f"{status_icon} Overall Test Status: {'PASSED' if overall_success else 'FAILED'}")
print(f"🕐 Total Test Duration: {total_test_time:.2f} seconds")
print(f"🌐 Environment: {env_section.get('runtime_version', 'unknown')}")
print(f"📦 Dataset Size: {TEST_DATASET_SIZE} ({dataset_config['rows']:,} rows)")

print("\n📋 Test Results:")
print(f"   Environment Verification: {'✅' if env_section.get('verification_success') else '❌'}")
print(f"   Extension Installation: {'✅' if install_section.get('installation_success') else '❌'}")
print(f"   PySpark Optimization: {'✅' if pyspark_csv.get('success') else '❌'}")

# Optional sections are only listed when they were not skipped.
if not streaming_section.get('skipped'):
    print(f"   Streaming Support: {'✅' if streaming_conv.get('success') else '❌'}")

if not delta_section.get('skipped'):
    print(f"   Delta Lake Support: {'✅' if delta_conv.get('success') else '❌'}")

print("\n⚡ Performance Metrics:")
if pyspark_csv.get('success'):
    print(f"   CSV Conversion Throughput: {pyspark_csv.get('throughput_rows_per_sec', 0):,.0f} rows/sec")
    print(f"   Conversion Time: {pyspark_csv.get('conversion_time', 0):.2f} seconds")
    print(f"   Engine Used: {pyspark_csv.get('engine_used', 'unknown')}")

if streaming_conv.get('success'):
    efficiency = streaming_conv.get('streaming_efficiency')
    if efficiency:
        print(f"   Streaming Efficiency: {efficiency:.2f}x {'faster' if efficiency > 1 else 'slower'} than batch")

print("\n⏱️ Time Breakdown:")
for metric, time_val in performance_metrics.items():
    print(f"   {metric.replace('_', ' ').title()}: {time_val:.2f}s")

# Save results. default=str stringifies any value json cannot encode natively.
results_json = json.dumps(serverless_test_results, indent=2, default=str)
print("\n💾 Serverless test results saved to serverless_test_results variable")

if VERBOSE_LOGGING:
    print("\n📝 Detailed Results:")
    # Only the first 1500 characters are shown to keep the cell output short.
    print(results_json[:1500] + "..." if len(results_json) > 1500 else results_json)

print("\n" + "="*70)
print("🏁 Serverless testing completed!")
print("="*70)