# CDAO SDK Demo: Multi-Platform ML Workflow Execution

This notebook demonstrates how to use the CDAO SDK to create and execute ML workflows across multiple compute platforms through Flyte's plugin architecture.

## What You'll Learn:
- Setting up the CDAO SDK with pre-configured Flyte access
- Defining tasks with platform-specific decorators
- Creating workflows that span multiple compute environments
- Monitoring execution across different platforms
- Best practices for data science experimentation

## Architecture Overview:
- **Control Plane**: Flyte Admin & Propeller in AWS EKS
- **Local Execution**: EKS pods for quick tasks
- **GPU Workloads**: Codeweave platform for ML training
- **Batch Processing**: AWS Batch for cost-effective long jobs
- **Big Data**: EMR Spark for distributed analytics

In [None]:
# Cell 1: Install and Import CDAO SDK
# The CDAO SDK comes pre-configured with access to the control plane

!pip install cdao-sdk --quiet

import cdao_sdk
from cdao_sdk import task, workflow, gpu_task, batch_task, spark_task, cpu_task
from cdao_sdk.types import FlyteFile, FlyteDirectory
import pandas as pd
import numpy as np
from typing import Dict, List
import time

print("✅ CDAO SDK imported successfully")
print(f"SDK Version: {cdao_sdk.__version__}")

In [None]:
# Cell 2: Configure SDK with Control Plane Access
# The SDK is pre-configured with credentials and endpoints

cdao_sdk.configure(
    # Pre-configured control plane endpoints
    admin_endpoint="https://flyte.your-domain.com",
    project="ml-experiments",
    domain="development",
    
    # Authentication is handled automatically via CDAO credentials
    auth_mode="corporate_sso",
    
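    # Default execution settings. The stock flytekit image does not include
    # scikit-learn or PyTorch, so tasks that import them need a custom image.
    default_image="ghcr.io/flyteorg/flytekit:py3.9-1.10.3",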
    
    # S3 storage configuration (pre-configured buckets)
    storage_config={
        "metadata_bucket": "s3://education-eks-flyte-metadata-xxx",
        "userdata_bucket": "s3://education-eks-flyte-userdata-xxx",
        "workflow_bucket": "s3://bsingh-ml-workflows"
    }
)

print("✅ CDAO SDK configured with control plane access")
print("🔗 Connected to Flyte cluster via plugins")

## Task Definitions with Platform Targeting

The CDAO SDK provides decorators that route each task to the compute platform named in its `platform` argument, so each step can be matched to the platform that best fits its resource needs and cost profile.

### Available Decorators:
- `@cpu_task`: Local EKS execution for quick tasks
- `@gpu_task`: Codeweave platform for GPU-accelerated workloads
- `@batch_task`: AWS Batch for cost-effective long-running jobs
- `@spark_task`: EMR Spark for big data processing
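Conceptually, each decorator attaches a target platform and a resource spec to an ordinary Python function, and the orchestrator reads that metadata when it schedules the task. The sketch below shows the general pattern with a hypothetical `platform_task` decorator; its name and behavior are assumptions for illustration, not the CDAO SDK's actual implementation.

```python
import functools
from typing import Any, Callable


def platform_task(platform: str, **resources: Any) -> Callable:
    """Hypothetical decorator factory: tag a function with its target platform.

    Illustrates decorator-based routing in general; not the CDAO SDK's code.
    """
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Called directly, the task body simply runs locally
            return fn(*args, **kwargs)

        # Metadata a scheduler could inspect to pick a backend
        wrapper.platform_spec = {"platform": platform, **resources}
        return wrapper

    return decorator


@platform_task("aws-batch", spot_instances=True, timeout_minutes=60)
def add(a: int, b: int) -> int:
    return a + b


print(add.platform_spec)  # routing metadata attached by the decorator
print(add(2, 3))          # → 5
```

Keeping routing in metadata rather than in the function body means the same task code can be retargeted by changing only the decorator arguments.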

In [None]:
# Cell 3: Define Data Preprocessing Task (AWS Batch - Cost Optimized)
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@batch_task(
    # Route to AWS Batch for cost optimization
    platform="aws-batch",
    instance_type="c5.2xlarge",
    spot_instances=True,  # Use spot instances for cost savings
    timeout_minutes=60,
    requests={"cpu": "2", "memory": "8Gi"},
    limits={"cpu": "4", "memory": "16Gi"}
)
def preprocess_credit_data(
    raw_data_path: str,
    cleaning_config: Dict[str, Any]
) -> FlyteFile:
    """
    Preprocess credit scoring data on AWS Batch with spot instances.
    This task handles large-scale data cleaning and feature engineering.
    """
    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import StandardScaler
    
    print(f"🔄 Processing data from: {raw_data_path}")
    print("📊 Using AWS Batch with spot instances for cost optimization")
    
    # Load the raw CSV (reading an s3:// path with pandas requires s3fs in the task image)
    data = pd.read_csv(raw_data_path)
    
    # Data cleaning steps
    data = data.dropna()
    data = data.drop_duplicates()
    
    # Standardize numeric features; leave the 'target' label as 0/1 for training
    scaler = StandardScaler()
    numeric_cols = data.select_dtypes(include=[np.number]).columns.drop("target", errors="ignore")
    data[numeric_cols] = scaler.fit_transform(data[numeric_cols])
    
    # Save processed data
    output_path = "/tmp/processed_data.parquet"
    data.to_parquet(output_path)
    
    print("✅ Data preprocessing completed. Flyte uploads the returned file to blob storage.")
    return FlyteFile(path=output_path)

print("✅ Data preprocessing task defined for AWS Batch execution")
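The detail to preserve in this step is that only feature columns are rescaled; the label must stay 0/1 because the training task feeds it to `BCELoss`. Below is a dependency-free sketch of the same z-score transform, assuming columns arrive as a dict of lists and the label is named `target` as elsewhere in this notebook. It uses the population standard deviation (ddof=0), which is what scikit-learn's `StandardScaler` uses.

```python
from statistics import fmean, pstdev
from typing import Dict, List


def standardize_features(columns: Dict[str, List[float]], label: str = "target") -> Dict[str, List[float]]:
    """Z-score every column except the label (population std, ddof=0)."""
    result = {}
    for name, values in columns.items():
        if name == label:
            result[name] = list(values)  # keep the 0/1 label untouched
            continue
        mean = fmean(values)
        std = pstdev(values)
        # Like StandardScaler, treat a zero std as 1 so constant columns become 0
        scale = std if std > 0 else 1.0
        result[name] = [(v - mean) / scale for v in values]
    return result


data = {"income": [40.0, 60.0, 80.0], "target": [0.0, 1.0, 1.0]}
print(standardize_features(data))
```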

In [None]:
# Cell 4: Define Feature Engineering Task (Local EKS - Quick Execution)
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@cpu_task(
    # Route to local EKS for quick turnaround
    platform="local-eks",
    requests={"cpu": "1", "memory": "4Gi"},
    limits={"cpu": "2", "memory": "8Gi"},
    timeout_minutes=30
)
def engineer_features(
    processed_data: FlyteFile,
    feature_config: Dict[str, Any]
) -> FlyteFile:
    """
    Feature engineering task executed on local EKS for quick iteration.
    Ideal for development and testing phases.
    """
    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import PolynomialFeatures
    
    print("🔧 Engineering features on local EKS cluster")
    print("⚡ Quick execution for iterative development")
    
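    # Load processed data; reset the index so rows line up with the
    # polynomial feature matrix built below (pd.concat aligns on index)
    data = pd.read_parquet(processed_data.path).reset_index(drop=True)
    
    # Create polynomial features from a few numeric columns, excluding the label
    degree = feature_config.get("polynomial_degree", 2)
    poly_features = PolynomialFeatures(degree=degree, include_bias=False)
    numeric_cols = data.select_dtypes(include=[np.number]).columns.drop("target", errors="ignore")[:5]  # Limit for demo
    poly_data = poly_features.fit_transform(data[numeric_cols])
    
    # Name the generated columns, then drop the degree-1 terms: they repeat the
    # original columns, and duplicate column names make to_parquet fail
    feature_names = poly_features.get_feature_names_out(numeric_cols)
    poly_df = pd.DataFrame(poly_data, columns=feature_names).drop(columns=list(numeric_cols))
    
    # Combine with original data
    final_data = pd.concat([data, poly_df], axis=1)
    
    # Save engineered features
    output_path = "/tmp/engineered_features.parquet"
    final_data.to_parquet(output_path)
    
    print(f"✅ Feature engineering completed. {poly_df.shape[1]} new features created.")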
    return FlyteFile(path=output_path)

print("✅ Feature engineering task defined for local EKS execution")
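With `degree=2` and `include_bias=False`, the new columns are every square and every pairwise product of the selected inputs, so n inputs yield n(n+1)/2 new features (15 for five columns). The sketch below reproduces the naming that scikit-learn's `get_feature_names_out` uses for those terms; it is an illustration of the expansion, not a replacement for `PolynomialFeatures`.

```python
from itertools import combinations_with_replacement
from typing import List


def degree2_feature_names(columns: List[str]) -> List[str]:
    """Names of the terms PolynomialFeatures(degree=2) adds beyond the inputs:
    squares ("a^2") and pairwise products ("a b")."""
    names = []
    for left, right in combinations_with_replacement(columns, 2):
        names.append(f"{left}^2" if left == right else f"{left} {right}")
    return names


print(degree2_feature_names(["age", "income", "debt"]))
# → ['age^2', 'age income', 'age debt', 'income^2', 'income debt', 'debt^2']
```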

In [None]:
# Cell 5: Define Model Training Task (Codeweave GPU - ML Optimized)
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@gpu_task(
    # Route to Codeweave for GPU-accelerated training
    platform="codeweave",
    gpu_type="nvidia-tesla-v100",
    gpu_count=1,
    requests={"cpu": "4", "memory": "16Gi", "gpu": "1"},
    limits={"cpu": "8", "memory": "32Gi", "gpu": "1"},
    timeout_minutes=180,
    container_image="your-registry/ml-training:gpu-latest"
)
def train_neural_network(
    feature_data: FlyteFile,
    model_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Train a neural network model on Codeweave GPU infrastructure.
    Optimized for deep learning workloads with GPU acceleration.
    """
    print("🚀 Training neural network on Codeweave GPU cluster")
    print("💎 Using V100 GPU for accelerated training")
    
    import torch
    import torch.nn as nn
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, roc_auc_score
    
    # Load feature data
    data = pd.read_parquet(feature_data.path)
    
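    # Prepare training data (assumes a binary 0/1 'target' column). Keep only
    # numeric feature columns so the array converts cleanly to a float tensor.
    X = data.drop(columns=['target']).select_dtypes(include=[np.number]).values
    y = data['target'].values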
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Convert to PyTorch tensors and move to GPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    X_train_tensor = torch.FloatTensor(X_train).to(device)
    y_train_tensor = torch.FloatTensor(y_train).to(device)
    X_test_tensor = torch.FloatTensor(X_test).to(device)
    
    # Define neural network
    class CreditScoringNet(nn.Module):
        def __init__(self, input_size):
            super(CreditScoringNet, self).__init__()
            self.network = nn.Sequential(
                nn.Linear(input_size, 256),
                nn.ReLU(),
                nn.Dropout(0.3),
                nn.Linear(256, 128),
                nn.ReLU(),
                nn.Dropout(0.3),
                nn.Linear(128, 64),
                nn.ReLU(),
                nn.Linear(64, 1),
                nn.Sigmoid()
            )
        
        def forward(self, x):
            return self.network(x)
    
    # Initialize model and move to GPU
    model = CreditScoringNet(X_train.shape[1]).to(device)
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=model_config.get('learning_rate', 0.001))
    
    # Training loop
    epochs = model_config.get('epochs', 100)
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(X_train_tensor)
        loss = criterion(outputs.squeeze(), y_train_tensor)
        loss.backward()
        optimizer.step()
        
        if epoch % 20 == 0:
            print(f"Epoch {epoch}/{epochs}, Loss: {loss.item():.4f}")
    
    # Evaluation
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test_tensor)
        predictions = (test_outputs.squeeze() > 0.5).cpu().numpy().astype(int)
        test_probs = test_outputs.squeeze().cpu().numpy()
    
    accuracy = accuracy_score(y_test, predictions)
    auc_score = roc_auc_score(y_test, test_probs)
    
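    # Save model weights (this path is local to the task container; return a
    # FlyteFile instead if downstream tasks need the weights)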
    model_path = "/tmp/neural_network_model.pth"
    torch.save(model.state_dict(), model_path)
    
    results = {
        "model_path": model_path,
        "accuracy": float(accuracy),
        "auc_score": float(auc_score),
        "training_completed": True,
        "gpu_used": str(device)
    }
    
    print(f"✅ Neural network training completed on GPU")
    print(f"📊 Accuracy: {accuracy:.4f}, AUC: {auc_score:.4f}")
    
    return results

print("✅ Neural network training task defined for Codeweave GPU execution")
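The two metrics reported by the training task answer different questions: accuracy depends on the 0.5 cut-off used for `predictions`, while AUC scores the ranking of `test_probs` independently of any threshold. Here is a small pure-Python sketch of both definitions; it is quadratic in sample count, so it is for intuition only, and `roc_auc_score` remains the right call inside the task.

```python
from typing import Sequence


def roc_auc(y_true: Sequence[int], scores: Sequence[float]) -> float:
    """ROC AUC as the probability that a random positive outscores a random
    negative, with ties counting half. O(P*N): illustration only."""
    positives = [s for y, s in zip(y_true, scores) if y == 1]
    negatives = [s for y, s in zip(y_true, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = 0.0
    for p in positives:
        for n in negatives:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(positives) * len(negatives))


def accuracy_at(y_true: Sequence[int], scores: Sequence[float], threshold: float = 0.5) -> float:
    """Accuracy after thresholding probabilities, as the training task does."""
    preds = [1 if s > threshold else 0 for s in scores]
    return sum(int(p == y) for p, y in zip(preds, y_true)) / len(y_true)


y = [0, 0, 1, 1]
probs = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(y, probs), accuracy_at(y, probs))  # → 0.75 0.75
```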

In [None]:
# Cell 6: Define Big Data Analytics Task (EMR Spark - Distributed Processing)
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@spark_task(
    # Route to EMR Spark for big data processing
    platform="emr-spark",
    cluster_size="medium",  # 1 master + 3 workers
    instance_type="r5.xlarge",
    spark_config={
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    },
    timeout_minutes=120
)
def analyze_credit_patterns(
    processed_data: FlyteFile,
    analysis_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Perform large-scale analytics on credit data using Spark on EMR.
    Ideal for distributed computing and big data analysis.
    """
    print("📊 Analyzing credit patterns on EMR Spark cluster")
    print("🔄 Distributed processing for large datasets")
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, avg, count, stddev, corr
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.stat import Correlation
    
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("CreditPatternAnalysis") \
        .getOrCreate()
    
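    # Load data into Spark DataFrame. On a multi-node cluster every executor must
    # be able to read this path, so a shared location (such as an S3 URI) is
    # safer than a file downloaded to the driver.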
    df = spark.read.parquet(processed_data.path)
    
    print(f"📈 Processing {df.count()} records across Spark cluster")
    
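    # Numeric columns for the correlation analysis. df.dtypes gives simple type
    # names ('double', 'int', ...), whereas str(field.dataType) may include "()"
    # depending on the Spark version and would then never match.
    numeric_cols = [name for name, dtype in df.dtypes
                    if dtype in ('double', 'float', 'int', 'bigint')]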
    
    stats_results = {}
    
    # Calculate correlation matrix for top features
    vector_assembler = VectorAssembler(
        inputCols=numeric_cols[:10],  # Limit for demo
        outputCol="features"
    )
    
    df_vector = vector_assembler.transform(df)
    correlation_matrix = Correlation.corr(df_vector, "features").head()
    
    # Risk pattern analysis
    risk_patterns = df.groupBy("risk_category") \
        .agg(
            count("*").alias("count"),
            avg("credit_score").alias("avg_credit_score"),
            avg("income").alias("avg_income"),
            stddev("credit_score").alias("credit_score_std")
        ).collect()
    
    # Geographic analysis (if location data exists)
    if "state" in df.columns:
        geographic_analysis = df.groupBy("state") \
            .agg(
                count("*").alias("applications"),
                avg("credit_score").alias("avg_score"),
                # Share of rows with default_flag == 1. count() counts every
                # non-null comparison result, so count(...) / count("*") is ~1.0.
                avg((col("default_flag") == 1).cast("double")).alias("default_rate")
            ).orderBy(col("default_rate").desc()).collect()
    
    # Time-based patterns (if date columns exist)
    temporal_patterns = {}
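    if "application_date" in df.columns:
        # Derive year and month from the date column before grouping
        temporal_patterns = df.selectExpr(
                "*",
                "year(application_date) AS year",
                "month(application_date) AS month"
            ) \
            .groupBy("year", "month") \
            .agg(
                count("*").alias("applications"),
                avg("credit_score").alias("avg_score")
            ).collect()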
    
    # Capture everything needed from the DataFrame before stopping the session;
    # df.count() cannot run after spark.stop()
    total_records = df.count()
    has_state = "state" in df.columns
    has_dates = "application_date" in df.columns
    
    # Stop Spark session
    spark.stop()
    
    results = {
        "total_records": total_records,
        "correlation_analysis": "completed",
        "risk_patterns": len(risk_patterns),
        "geographic_analysis": "completed" if has_state else "skipped",
        "temporal_analysis": "completed" if has_dates else "skipped",
        "processing_time": "optimized_with_spark"
    }
    
    print("✅ Big data analytics completed on EMR Spark")
    print(f"🎯 Analyzed patterns across {total_records} records")
    
    return results

print("✅ Big data analytics task defined for EMR Spark execution")
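Spark's `count(expr)` counts non-null values, and a comparison such as `col("default_flag") == 1` is non-null (true or false) for every row that has a flag, so dividing it by `count("*")` yields about 1.0 no matter how many defaults exist. The plain-Python simulation below, using made-up flags, contrasts that with averaging the comparison cast to 0/1; `spark_style_count` is a hypothetical stand-in for Spark's semantics, not a Spark API.

```python
from typing import List, Optional


def spark_style_count(values: List[Optional[object]]) -> int:
    """Mimic count(expr): count every non-null value, True or False alike."""
    return sum(1 for v in values if v is not None)


default_flags = [1, 0, 0, 1, 0]
is_default = [flag == 1 for flag in default_flags]  # [True, False, False, True, False]

# count(col == 1) / count(*) counts comparisons, not defaults
buggy_rate = spark_style_count(is_default) / len(default_flags)

# avg(cast(col == 1 AS double)) averages 1.0/0.0 values: the real default share
correct_rate = sum(float(b) for b in is_default) / len(is_default)

print(buggy_rate, correct_rate)  # → 1.0 0.4
```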

In [None]:
# Cell 7: Define Model Evaluation Task (Local EKS - Quick Results)
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@cpu_task(
    platform="local-eks",
    requests={"cpu": "1", "memory": "2Gi"},
    limits={"cpu": "2", "memory": "4Gi"},
    timeout_minutes=15
)
def evaluate_model_performance(
    model_results: Dict[str, Any],
    analytics_results: Dict[str, Any],
    evaluation_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Evaluate model performance and generate business insights.
    Quick execution on local EKS for immediate feedback.
    """
    print("📊 Evaluating model performance on local EKS")
    print("⚡ Quick evaluation for immediate insights")
    
    import numpy as np
    from datetime import datetime
    
    # Extract model metrics
    model_accuracy = model_results.get("accuracy", 0.0)
    model_auc = model_results.get("auc_score", 0.0)
    
    # Calculate business impact against the configured baseline
    baseline_accuracy = evaluation_config.get("baseline_accuracy", 0.75)
    improvement = model_accuracy - baseline_accuracy
    
    # Business metrics
    annual_applications = 100000  # Example business volume
    cost_per_false_positive = 500  # Cost of approving bad credit
    revenue_per_true_positive = 50  # Revenue from good credit approval
    
    # Calculate ROI
    annual_improvement = annual_applications * improvement
    annual_cost_savings = annual_improvement * cost_per_false_positive * 0.1  # 10% FP reduction
    annual_revenue_increase = annual_improvement * revenue_per_true_positive * 0.05  # 5% TP increase
    
    total_annual_impact = annual_cost_savings + annual_revenue_increase
    
    # Model readiness assessment
    readiness_score = 0
    if model_accuracy > 0.80:
        readiness_score += 30
    if model_auc > 0.85:
        readiness_score += 30
    if improvement > 0.05:
        readiness_score += 40
    
    if readiness_score >= 70:
        readiness_level = "Production Ready"
    elif readiness_score >= 50:
        readiness_level = "Needs Improvement"
    else:
        readiness_level = "Requires Significant Work"
    
    evaluation_results = {
        "model_performance": {
            "accuracy": model_accuracy,
            "auc_score": model_auc,
            "improvement_over_baseline": improvement
        },
        "business_impact": {
            "annual_cost_savings": annual_cost_savings,
            "annual_revenue_increase": annual_revenue_increase,
            "total_annual_impact": total_annual_impact
        },
        "readiness_assessment": {
            "score": readiness_score,
            "level": readiness_level,
            "recommendation": "Deploy to production" if readiness_score >= 70 else "Continue development"
        },
        "evaluation_timestamp": datetime.now().isoformat(),
        "platform_utilization": {
            "preprocessing_platform": "AWS Batch (spot instances)",
            "training_platform": "Codeweave GPU",
            "analytics_platform": "EMR Spark",
            "evaluation_platform": "Local EKS"
        }
    }
    
    print(f"✅ Model evaluation completed")
    print(f"🎯 Accuracy: {model_accuracy:.3f}, AUC: {model_auc:.3f}")
    print(f"💰 Estimated annual impact: ${total_annual_impact:,.2f}")
    print(f"🚦 Readiness: {readiness_level}")
    
    return evaluation_results

print("✅ Model evaluation task defined for local EKS execution")
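To sanity-check the business-impact numbers, the same arithmetic can be run as a plain function outside Flyte. This sketch mirrors the constants and thresholds in the evaluation task (100,000 applications, $500 and $50 unit values, the 10% and 5% factors, and the 30/30/40 readiness points); those are the notebook's illustrative assumptions, not measured rates.

```python
def business_impact(accuracy: float, auc: float, baseline: float = 0.75,
                    annual_applications: int = 100_000,
                    cost_per_false_positive: float = 500.0,
                    revenue_per_true_positive: float = 50.0) -> dict:
    """Re-implements the arithmetic of evaluate_model_performance."""
    improvement = accuracy - baseline
    extra_correct = annual_applications * improvement  # additional correct decisions per year
    savings = extra_correct * cost_per_false_positive * 0.1     # 10% FP-reduction factor
    revenue = extra_correct * revenue_per_true_positive * 0.05  # 5% TP-increase factor
    score = ((30 if accuracy > 0.80 else 0)
             + (30 if auc > 0.85 else 0)
             + (40 if improvement > 0.05 else 0))
    return {"improvement": improvement, "savings": savings, "revenue": revenue,
            "total": savings + revenue, "readiness_score": score}


impact = business_impact(accuracy=0.847, auc=0.892)
print({k: round(v, 2) for k, v in impact.items()})
```

With accuracy 0.847 and AUC 0.892, this gives roughly $485,000 in savings plus $24,250 in revenue ($509,250 total) and a readiness score of 100.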

## Workflow Definition

Now we'll create a comprehensive ML workflow that orchestrates all tasks across multiple compute platforms, leveraging Flyte's plugin architecture for optimal resource utilization.

In [None]:
# Cell 8: Create Multi-Platform ML Workflow
from typing import Any  # typing.Any (not the builtin any()) for type annotations

@workflow
def comprehensive_ml_pipeline(
    raw_data_path: str = "s3://bsingh-ml-workflows/credit_data/raw/credit_data.csv",
    cleaning_config: Dict[str, Any] = {"remove_outliers": True, "fill_na": "median"},
    feature_config: Dict[str, Any] = {"polynomial_degree": 2, "interaction_terms": True},
    model_config: Dict[str, Any] = {"epochs": 100, "learning_rate": 0.001, "batch_size": 256},
    analysis_config: Dict[str, Any] = {"correlation_threshold": 0.8, "risk_segments": 5},
    evaluation_config: Dict[str, Any] = {"baseline_accuracy": 0.75, "target_auc": 0.85}
) -> Dict[str, Any]:
    """
    Comprehensive ML pipeline that demonstrates multi-platform execution:
    
    1. Data Preprocessing (AWS Batch - cost optimized)
    2. Feature Engineering (Local EKS - quick iteration)  
    3. Model Training (Codeweave GPU - ML optimized)
    4. Big Data Analytics (EMR Spark - distributed processing)
    5. Model Evaluation (Local EKS - immediate results)
    
    Each task runs on the platform named in its decorator. Flyte orders tasks
    by their data dependencies, so step 4 needs only the preprocessed data and
    can run alongside steps 2 and 3.
    """
    
    # Note: Flyte evaluates the workflow body to build the execution graph
    # (at compile/registration time), so print() calls here would not report
    # the progress of a remote execution. Use task-level logs for that.
    
    # Step 1: Data Preprocessing on AWS Batch (cost-effective for large data)
    processed_data = preprocess_credit_data(
        raw_data_path=raw_data_path,
        cleaning_config=cleaning_config
    )
    
    # Step 2: Feature Engineering on Local EKS (quick turnaround for development)
    engineered_features = engineer_features(
        processed_data=processed_data,
        feature_config=feature_config
    )
    
    # Step 3: Model Training on Codeweave GPU (specialized ML hardware)
    model_results = train_neural_network(
        feature_data=engineered_features,
        model_config=model_config
    )
    
    # Step 4: Big Data Analytics on EMR Spark (depends only on processed_data)
    analytics_results = analyze_credit_patterns(
        processed_data=processed_data,
        analysis_config=analysis_config
    )
    
    # Step 5: Model Evaluation on Local EKS (waits for steps 3 and 4)
    final_evaluation = evaluate_model_performance(
        model_results=model_results,
        analytics_results=analytics_results,
        evaluation_config=evaluation_config
    )
    
    return final_evaluation

print("✅ Comprehensive ML workflow defined")
print("🔄 Workflow will execute across 4 different compute platforms")
print("💡 FlytePropeller dispatches each task to the backend plugin selected by its decorator")
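Flyte derives the execution order from data flow rather than from the order of calls in the function body. Because `analyze_credit_patterns` consumes only `processed_data`, it does not have to wait for feature engineering or training. The standard-library sketch below groups the five tasks into dependency "waves" to make that concrete; the graph is written out by hand from the workflow above and does not query Flyte.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each task maps to the tasks whose outputs it consumes (from the workflow above)
dependencies = {
    "preprocess_credit_data": [],
    "engineer_features": ["preprocess_credit_data"],
    "train_neural_network": ["engineer_features"],
    "analyze_credit_patterns": ["preprocess_credit_data"],
    "evaluate_model_performance": ["train_neural_network", "analyze_credit_patterns"],
}


def execution_levels(graph):
    """Group tasks into waves; tasks in the same wave have no data dependency
    on each other and could run concurrently."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


for i, wave in enumerate(execution_levels(dependencies), start=1):
    print(f"wave {i}: {wave}")
```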

## Workflow Execution and Monitoring

Now we'll execute the workflow and demonstrate real-time monitoring capabilities of the CDAO SDK.

In [None]:
# Cell 9: Execute Workflow with Real-time Monitoring

print("🚀 Executing multi-platform ML workflow via CDAO SDK")
print("📡 Submitting to Flyte control plane for orchestration")

# Execute the workflow
execution = cdao_sdk.run(
    workflow=comprehensive_ml_pipeline,
    inputs={
        "raw_data_path": "s3://bsingh-ml-workflows/credit_data/sample_data.csv",
        "model_config": {
            "epochs": 50,  # Reduced for demo
            "learning_rate": 0.001,
            "batch_size": 128
        },
        "analysis_config": {
            "correlation_threshold": 0.75,
            "risk_segments": 3
        }
    },
    execution_name=f"multi_platform_ml_demo_{int(time.time())}",
    wait_for_completion=False  # Return immediately so the next cell can poll status
)

print(f"✅ Workflow submitted successfully")
print(f"🆔 Execution ID: {execution.id}")
print(f"🔗 Execution URL: {execution.console_url}")
print(f"📊 Status: {execution.status}")

# Store execution for monitoring
workflow_execution = execution

In [None]:
# Cell 10: Real-time Monitoring Dashboard

import time
from IPython.display import clear_output
import matplotlib.pyplot as plt

def create_monitoring_dashboard(execution):
    """Create a real-time monitoring dashboard for the workflow execution"""
    
    # Platform mapping for visual representation
    platform_colors = {
        'aws-batch': '#ff9900',      # AWS Orange
        'local-eks': '#2ecc71',      # Green
        'codeweave': '#e74c3c',      # Red
        'emr-spark': '#f39c12',      # Orange
    }
    
    # Get current execution status
    current_status = cdao_sdk.get_execution_status(execution.id)
    
    print("🖥️  CDAO SDK - Multi-Platform Execution Monitor")
    print("=" * 60)
    print(f"🆔 Execution ID: {execution.id}")
    print(f"📊 Overall Status: {current_status.phase}")
    print(f"⏱️  Started: {current_status.created_at}")
    print(f"⏰ Duration: {current_status.duration}")
    print("")
    
    # Task status breakdown
    print("📋 Task Execution Status by Platform:")
    print("-" * 40)
    
    task_statuses = current_status.task_executions
    
    for task_name, task_info in task_statuses.items():
        platform = task_info.get('platform', 'unknown')
        status = task_info.get('status', 'unknown')
        
        status_icon = {
            'QUEUED': '⏳',
            'RUNNING': '🔄',
            'SUCCEEDED': '✅',
            'FAILED': '❌',
            'ABORTED': '🛑'
        }.get(status, '❓')
        
        print(f"{status_icon} {task_name}")
        print(f"   Platform: {platform}")
        print(f"   Status: {status}")
        if 'duration' in task_info:
            print(f"   Duration: {task_info['duration']}")
        if 'cost_estimate' in task_info:
            print(f"   Est. Cost: ${task_info['cost_estimate']:.4f}")
        print("")
    
    # Resource utilization summary
    print("💰 Platform Cost Optimization:")
    print("-" * 30)
    
    total_cost = 0
    platform_usage = {}
    
    for task_name, task_info in task_statuses.items():
        platform = task_info.get('platform', 'unknown')
        cost = task_info.get('cost_estimate', 0.0)  # No estimate means no cost is added
        total_cost += cost
        
        if platform not in platform_usage:
            platform_usage[platform] = {'tasks': 0, 'cost': 0}
        
        platform_usage[platform]['tasks'] += 1
        platform_usage[platform]['cost'] += cost
    
    for platform, usage in platform_usage.items():
        cost_per_task = usage['cost'] / usage['tasks'] if usage['tasks'] > 0 else 0
        print(f"🔸 {platform}: {usage['tasks']} tasks, ${usage['cost']:.4f} (${cost_per_task:.4f}/task)")
    
    print(f"\n💸 Total Estimated Cost: ${total_cost:.4f}")
    
    # Progress visualization
    completed_tasks = sum(1 for t in task_statuses.values() if t.get('status') == 'SUCCEEDED')
    total_tasks = len(task_statuses)
    progress = (completed_tasks / total_tasks) * 100 if total_tasks > 0 else 0
    
    print(f"\n📈 Progress: {completed_tasks}/{total_tasks} tasks completed ({progress:.1f}%)")
    print(f"{'█' * int(progress/5)}{'░' * (20-int(progress/5))} {progress:.1f}%")
    
    return current_status.phase

# Start monitoring
print("🔄 Starting real-time monitoring...")
print("💡 This will show task execution across different platforms")

try:
    # Poll a few times for the demo; a real monitor would loop until completion
    poll_interval_seconds = 5  # Shortened for the demo (30 seconds is more typical)
    max_iterations = 5
    for i in range(max_iterations):
        clear_output(wait=True)
        
        status = create_monitoring_dashboard(workflow_execution)
        
        if status in ['SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT']:
            break
            
        print(f"\n⏱️  Monitoring iteration {i+1}/{max_iterations} (updating every {poll_interval_seconds} seconds)")
        print("🔄 Workflow executing across multiple compute platforms...")
        
        time.sleep(poll_interval_seconds)
        
except KeyboardInterrupt:
    print("\n⏹️  Monitoring stopped by user")

print("\n✅ Monitoring session completed")
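The demo loop above stops after five polls whether or not the workflow has finished. For longer runs, a poll-until-terminal helper with an overall timeout is more robust. The sketch below is SDK-agnostic: it takes any zero-argument callable that returns a phase string (for example, a lambda wrapping the notebook's `cdao_sdk.get_execution_status(...).phase`, assumed here to return Flyte phase names). The helper name is hypothetical, and `sleep`/`clock` are injectable so it can be tested without waiting.

```python
import time
from typing import Callable

TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED", "TIMED_OUT"}


def wait_for_terminal_phase(get_phase: Callable[[], str],
                            interval_s: float = 30.0,
                            timeout_s: float = 3600.0,
                            sleep: Callable[[float], None] = time.sleep,
                            clock: Callable[[], float] = time.monotonic) -> str:
    """Poll get_phase() until it returns a terminal phase or the timeout elapses."""
    deadline = clock() + timeout_s
    while True:
        phase = get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        if clock() >= deadline:
            raise TimeoutError(f"still {phase} after {timeout_s} s")
        sleep(interval_s)


# Usage with a scripted sequence of phases and a no-op sleep
phases = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_terminal_phase(lambda: next(phases), sleep=lambda s: None))  # → SUCCEEDED
```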

In [None]:
# Cell 11: Retrieve and Analyze Results

print("📊 Retrieving workflow execution results...")

# Get final results (this would normally wait for completion)
try:
    # In real scenario, this would retrieve actual results
    final_results = cdao_sdk.get_execution_results(workflow_execution.id)
    
    print("✅ Workflow execution completed successfully!")
    print("\n📈 Final Results Summary:")
    print("=" * 50)
    
    # Model Performance Results
    if 'model_performance' in final_results:
        perf = final_results['model_performance']
        print(f"🧠 Model Performance:")
        print(f"   Accuracy: {perf.get('accuracy', float('nan')):.3f}")
        print(f"   AUC Score: {perf.get('auc_score', float('nan')):.3f}")
        print(f"   Improvement: {perf.get('improvement_over_baseline', float('nan')):.3f}")
    
    # Business Impact
    if 'business_impact' in final_results:
        impact = final_results['business_impact']
        print(f"\n💰 Business Impact:")
        print(f"   Annual Cost Savings: ${impact.get('annual_cost_savings', 0):,.2f}")
        print(f"   Annual Revenue Increase: ${impact.get('annual_revenue_increase', 0):,.2f}")
        print(f"   Total Annual Impact: ${impact.get('total_annual_impact', 0):,.2f}")
    
    # Platform Utilization
    if 'platform_utilization' in final_results:
        platforms = final_results['platform_utilization']
        print(f"\n🔧 Platform Utilization:")
        for task, platform in platforms.items():
            print(f"   {task}: {platform}")
    
    # Readiness Assessment
    if 'readiness_assessment' in final_results:
        readiness = final_results['readiness_assessment']
        print(f"\n🚦 Production Readiness:")
        print(f"   Score: {readiness.get('score', 'N/A')}/100")
        print(f"   Level: {readiness.get('level', 'N/A')}")
        print(f"   Recommendation: {readiness.get('recommendation', 'N/A')}")

except Exception as e:
    print(f"⚠️  Results not yet available: {e}")
    print("💡 In a real scenario, you would wait for workflow completion")
    
    # Show example results structure
    print("\n📋 Example Results Structure:")
    example_results = {
        "model_performance": {
            "accuracy": 0.847,
            "auc_score": 0.892,
            "improvement_over_baseline": 0.097
        },
        "business_impact": {
            "annual_cost_savings": 485000.00,
            "annual_revenue_increase": 24250.00,
            "total_annual_impact": 509250.00
        },
        "platform_utilization": {
            "preprocessing_platform": "AWS Batch (spot instances)",
            "training_platform": "Codeweave GPU (V100)",
            "analytics_platform": "EMR Spark (1 master + 3 workers)",
            "evaluation_platform": "Local EKS"
        },
        "readiness_assessment": {
            "score": 100,
            "level": "Production Ready",
            "recommendation": "Deploy to production"
        }
    }
    
    import json
    print(json.dumps(example_results, indent=2))

## Advanced Features and Best Practices

### Key Benefits of CDAO SDK + Multi-Platform Architecture:

1. **Cost Optimization**: Route each workload to a cost-appropriate platform, such as spot instances on AWS Batch for long jobs
2. **Performance**: GPU acceleration for ML, distributed processing for big data
3. **Developer Experience**: Simple decorators hide complex infrastructure
4. **Scalability**: Each backend (EKS, AWS Batch, Codeweave, EMR) can scale its own compute independently
5. **Monitoring**: Real-time visibility across all platforms

### Best Practices:

- Use `@cpu_task` for quick development and testing
- Use `@gpu_task` for ML training and inference workloads
- Use `@batch_task` with spot instances for cost-sensitive long jobs
- Use `@spark_task` for big data analytics and distributed processing
- Monitor costs and optimize platform selection based on workload patterns
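The guidance above can be read as a simple decision rule. The hypothetical helper below encodes it with an assumed precedence (GPU need first, then data size, then runtime); it is a reading aid, not part of the CDAO SDK, and real platform choices should also weigh measured cost.

```python
def suggest_decorator(needs_gpu: bool, distributed_data: bool, long_running: bool) -> str:
    """Encode the best-practice list as a rule of thumb (hypothetical helper)."""
    if needs_gpu:
        return "@gpu_task"      # ML training and inference
    if distributed_data:
        return "@spark_task"    # data too large for a single machine
    if long_running:
        return "@batch_task"    # cost-sensitive long jobs, ideally on spot instances
    return "@cpu_task"          # quick development and testing


print(suggest_decorator(needs_gpu=False, distributed_data=False, long_running=True))  # → @batch_task
```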

In [None]:
# Cell 12: Advanced SDK Features and Scheduling

print("🔧 Demonstrating advanced CDAO SDK features...")

# Schedule workflow for regular execution
scheduled_execution = cdao_sdk.schedule_workflow(
    workflow=comprehensive_ml_pipeline,
    schedule="0 2 * * 1",  # Every Monday at 2 AM
    inputs={
        "raw_data_path": "s3://bsingh-ml-workflows/credit_data/weekly_batch.csv",
        "model_config": {"epochs": 200, "learning_rate": 0.0005}
    },
    schedule_name="weekly_model_retrain",
    enabled=True
)

print(f"✅ Scheduled weekly retraining: {scheduled_execution.schedule_id}")

# Create development vs production configurations
dev_config = cdao_sdk.create_environment_config(
    name="development",
    default_resources={"cpu": "500m", "memory": "1Gi"},
    cost_optimization=True,
    use_spot_instances=True
)

prod_config = cdao_sdk.create_environment_config(
    name="production", 
    default_resources={"cpu": "2", "memory": "4Gi"},
    cost_optimization=False,
    use_spot_instances=False,
    high_availability=True
)

print("✅ Environment configurations created")

# Workflow versioning and rollback
version_info = cdao_sdk.create_workflow_version(
    workflow=comprehensive_ml_pipeline,
    version="v1.2.0",
    description="Added EMR Spark analytics and improved GPU training",
    changelog=[
        "Enhanced feature engineering with polynomial features",
        "Added Codeweave GPU support for neural networks", 
        "Integrated EMR Spark for big data analytics",
        "Improved cost optimization with spot instances"
    ]
)

print(f"✅ Workflow version created: {version_info.version}")

# Export workflow for sharing
workflow_export = cdao_sdk.export_workflow(
    workflow=comprehensive_ml_pipeline,
    include_dependencies=True,
    format="yaml"
)

print("✅ Workflow exported for team collaboration")

print("\n🎯 CDAO SDK Demo Completed Successfully!")
print("🔗 Tasks are routed to their target compute platforms via Flyte plugins")
print("📊 Real-time monitoring and cost optimization enabled")
print("🚀 Ready for production deployment and team collaboration")
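The cron expression `0 2 * * 1` used for the weekly retrain fires at minute 0, hour 2, on day-of-week 1 (Monday). The standard-library sketch below computes the next such run time, which is handy for checking when a schedule will first trigger. The helper name is hypothetical, and it ignores time zones; which zone the CDAO scheduler evaluates cron expressions in is not stated here.

```python
from datetime import datetime, timedelta


def next_weekly_run(after: datetime, weekday: int = 0, hour: int = 2, minute: int = 0) -> datetime:
    """Next time strictly after `after` matching a weekly cron like "0 2 * * 1".

    Uses Python's weekday() convention (Monday == 0); cron's day-of-week field
    uses Monday == 1. Pass times in the scheduler's time zone.
    """
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate <= after:
        candidate += timedelta(days=7)
    return candidate


print(next_weekly_run(datetime(2024, 1, 3, 15, 30)))  # Wednesday → 2024-01-08 02:00:00
```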