# In [0]:
# =============================================================================
# MODULE 3: MACHINE LEARNING PIPELINE & MODEL DEVELOPMENT
# Urban Green Space Management System 
# =============================================================================
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from mlflow.models.signature import infer_signature
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score, StratifiedKFold
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, 
    classification_report, confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, auc
)
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
# Silence every warning category for the remainder of the session.
warnings.filterwarnings('ignore')

print("=== URBAN GREEN SPACE MANAGEMENT SYSTEM ===")
print("Module 3: Machine Learning Pipeline & Model Development (Improved)")
print("=" * 50)

# =============================================================================
# 1. INITIALIZATION AND CONFIGURATION
# =============================================================================

# Get (or reuse) a Spark session with adaptive query execution and adaptive
# partition coalescing switched on.
spark = (
    SparkSession.builder
    .appName("UGSM_ML_Pipeline_Improved")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Catalog and schema names used to build fully qualified table names
# (input: <CATALOG>.<SILVER_SCHEMA>.*, output: <CATALOG>.<GOLD_SCHEMA>.*).
CATALOG, SILVER_SCHEMA, GOLD_SCHEMA = "ml_project", "silver", "gold"

# MLflow: registry location and the experiment that runs are recorded under.
mlflow.set_registry_uri("databricks")
experiment_name = "/Users/dilshanchanuka.bc@gmail.com/Urban Green Space Management"

# Experiment selection is best-effort: a failure is reported, not raised.
try:
    mlflow.set_experiment(experiment_name)
except Exception as e:
    print(f" MLflow experiment setup: {str(e)}")
else:
    print(f" MLflow experiment set: {experiment_name}")

# =============================================================================
# 2. DATA LOADING AND FEATURE ENGINEERING
# =============================================================================

def load_raw_data():
    """Read the integrated silver table and print a short overview of it.

    Prints the record count, the column count, the column names and the first
    five rows (untruncated).

    Returns:
        The Spark DataFrame read from ``{CATALOG}.{SILVER_SCHEMA}.integrated``,
        or ``None`` when reading or summarising the table raises.
    """
    print("\n📊 Loading raw dataset...")

    try:
        # Fully qualified source table (adjust the table name as needed)
        table_name = f"{CATALOG}.{SILVER_SCHEMA}.integrated"
        raw_df = spark.read.table(table_name)
        print(f"✅ Loaded data from {table_name}")

        # Basic shape information; count() triggers a Spark job
        print("📋 Dataset Info:")
        print(f"  • Total records: {raw_df.count()}")
        print(f"  • Total columns: {len(raw_df.columns)}")
        print(f"  • Columns: {raw_df.columns}")

        # Preview of the first rows
        print("\n📖 Sample Data:")
        raw_df.show(5, truncate=False)
    except Exception as load_error:
        print(f"❌ Error loading data: {str(load_error)}")
        return None

    return raw_df

def create_aggregated_features(df_spark):
    """Collapse the raw records into one feature row per park.

    Rows are grouped by park_id, name, city, area_sqm, latitude and longitude.
    For each of aqi, visitor_count and sentiment_score the max, min, average
    and standard deviation are computed, together with the number of event
    days, the number of records and the total visitor count. Four derived
    columns (aqi_range, sentiment_range, event_frequency, park_density) are
    then appended and remaining nulls are filled with 0.

    Args:
        df_spark: Spark DataFrame holding the raw records.

    Returns:
        The per-park feature DataFrame, or ``None`` if anything raises.
    """
    print("\n🔧 Creating aggregated features...")

    try:
        # (output prefix, Spark aggregate) applied to every measure below
        summary_stats = (
            ("max", F.max),
            ("min", F.min),
            ("avg", F.avg),
            ("std", F.stddev),
        )
        # (source column, output suffix): air quality, visitors, sentiment
        measures = (
            ("aqi", "aqi"),
            ("visitor_count", "visitors"),
            ("sentiment_score", "sentiment"),
        )

        aggregations = []
        for source_column, suffix in measures:
            for prefix, aggregate in summary_stats:
                aggregations.append(
                    aggregate(F.col(source_column)).alias(f"{prefix}_{suffix}")
                )

        # Event and volume features
        is_event_day = F.when(F.col("event_day") == True, 1).otherwise(0)
        aggregations.append(F.sum(is_event_day).alias("event_days_count"))
        aggregations.append(F.count("*").alias("total_records"))
        aggregations.append(F.sum(F.col("visitor_count")).alias("total_visitors"))

        grouped = df_spark.groupBy("park_id", "name", "city", "area_sqm", "latitude", "longitude")
        park_features = grouped.agg(*aggregations)

        # Derived features built from the aggregates
        with_ranges = park_features \
            .withColumn("aqi_range", F.col("max_aqi") - F.col("min_aqi")) \
            .withColumn("sentiment_range", F.col("max_sentiment") - F.col("min_sentiment"))
        with_ratios = with_ranges \
            .withColumn("event_frequency", F.col("event_days_count") / F.col("total_records")) \
            .withColumn("park_density", F.col("total_visitors") / F.col("area_sqm"))

        # Fill any null values with 0
        park_features_enhanced = with_ratios.fillna(0)

        print("✅ Created aggregated features")
        return park_features_enhanced

    except Exception as feature_error:
        print(f"❌ Error creating features: {str(feature_error)}")
        return None


def create_target_variable(df_spark):
    """Add the binary ``intervention_required`` label and print its distribution.

    A park is labelled 1 when at least one of these rules holds, else 0:
      1. High air pollution: max_aqi > 100
      2. Poor air quality AND negative sentiment:
         max_aqi > 75 and min_sentiment < 0
      3. Very negative sentiment: min_sentiment < -0.5
      4. Low visitor engagement on a large park:
         max_visitors < 50 and area_sqm > 10000

    Args:
        df_spark: Spark DataFrame with max_aqi, min_sentiment, max_visitors
            and area_sqm columns.

    Returns:
        The input DataFrame with the extra ``intervention_required`` column,
        or ``None`` if anything raises.
    """
    print("\n🎯 Creating target variable...")

    try:
        high_pollution = col("max_aqi") > 100
        polluted_and_disliked = (col("max_aqi") > 75) & (col("min_sentiment") < 0)
        very_negative_sentiment = col("min_sentiment") < -0.5
        underused_large_park = (col("max_visitors") < 50) & (col("area_sqm") > 10000)

        needs_intervention = (
            high_pollution
            | polluted_and_disliked
            | very_negative_sentiment
            | underused_large_park
        )

        df_with_target = df_spark.withColumn(
            "intervention_required",
            when(needs_intervention, 1).otherwise(0)
        )

        # One Spark action only: the total is the sum of the per-class counts,
        # so a separate count() over the whole plan is unnecessary.
        target_dist = sorted(
            df_with_target.groupBy("intervention_required").count().collect(),
            key=lambda row: row['intervention_required'],
        )
        total_count = sum(row['count'] for row in target_dist)

        print("📊 Target Variable Distribution:")
        for row in target_dist:
            label = "No Intervention" if row['intervention_required'] == 0 else "Intervention Required"
            percentage = (row['count'] / total_count) * 100
            print(f"  • {label}: {row['count']} parks ({percentage:.1f}%)")

        return df_with_target

    except Exception as e:
        print(f"❌ Error creating target variable: {str(e)}")
        return None

def prepare_ml_dataset(df_spark):
    """Convert the labelled Spark DataFrame into a pandas feature matrix and target.

    Features are every column with dtype int64/float64/int32/float32 other
    than park_id, name, city and intervention_required, kept in the
    DataFrame's own column order so the feature layout is identical on every
    run. Infinite values are treated as missing, and missing values are
    imputed per column with the median of the remaining finite values (0 when
    a column has no finite value at all).

    Args:
        df_spark: object exposing ``toPandas()`` that returns a pandas
            DataFrame containing an ``intervention_required`` column.

    Returns:
        ``(X, y, numeric_features, df_pandas)`` where ``X`` is the imputed
        feature DataFrame, ``y`` the target Series, ``numeric_features`` the
        ordered list of feature names and ``df_pandas`` the full converted
        frame. Returns ``(None, None, None, None)`` if anything raises.
    """
    print("\n🔧 Preparing ML dataset...")

    try:
        # Convert to Pandas for sklearn
        df_pandas = df_spark.toPandas()

        # Identifier / label columns that must not be used as features
        exclude_cols = {
            'park_id', 'name', 'city', 'intervention_required'
        }
        numeric_dtypes = ['int64', 'float64', 'int32', 'float32']

        # Walk the columns in DataFrame order: iterating a set of names would
        # give a different feature order from run to run.
        numeric_features = [
            name for name in df_pandas.columns
            if name not in exclude_cols and df_pandas[name].dtype in numeric_dtypes
        ]

        print(f"📋 Selected {len(numeric_features)} numeric features:")
        for i, feat in enumerate(sorted(numeric_features), 1):
            print(f"  {i:2d}. {feat}")

        # Prepare feature matrix and target
        X = df_pandas[numeric_features].copy()
        y = df_pandas['intervention_required']

        # Turn +/-inf into NaN *before* computing medians so that infinities
        # neither skew the imputed value nor survive the imputation.
        X = X.replace([np.inf, -np.inf], np.nan)

        missing_info = X.isnull().sum()
        if missing_info.sum() > 0:
            print("\n🔧 Handling missing values:")
            for name in X.columns:
                if missing_info[name] > 0:
                    median_val = X[name].median()
                    if pd.isna(median_val):
                        # No finite value in this column: the median is
                        # undefined, so fall back to 0 instead of leaving NaN.
                        median_val = 0.0
                    X[name] = X[name].fillna(median_val)
                    print(f"  • Filled {missing_info[name]} missing/infinite values in {name} with median: {median_val:.2f}")

        print("\n✅ Final dataset prepared:")
        print(f"  • Feature matrix shape: {X.shape}")
        print(f"  • Target vector shape: {y.shape}")
        print(f"  • Target distribution: {y.value_counts().to_dict()}")

        return X, y, numeric_features, df_pandas

    except Exception as e:
        print(f"❌ Error preparing ML dataset: {str(e)}")
        return None, None, None, None

# =============================================================================
# 3. MODEL DEVELOPMENT AND TRAINING
# =============================================================================

def create_ml_pipelines():
    """Build the candidate model pipelines and their hyperparameter grids.

    Every pipeline has two steps named ``scaler`` and ``classifier``; the grid
    keys therefore carry the ``classifier__`` prefix.

    Returns:
        ``(pipelines, param_grids)``: two dicts keyed by ``logistic_regression``,
        ``random_forest`` and ``gradient_boosting``.
    """
    print("\n🤖 Creating ML pipelines...")

    def _two_step(scaler, classifier):
        # Scaler followed by classifier, with the step names the grids rely on.
        return Pipeline([('scaler', scaler), ('classifier', classifier)])

    def _for_classifier(options):
        # Route each option to the pipeline's 'classifier' step.
        return {f"classifier__{option}": values for option, values in options.items()}

    pipelines = {}
    pipelines['logistic_regression'] = _two_step(
        StandardScaler(),
        LogisticRegression(random_state=42, max_iter=1000),
    )
    pipelines['random_forest'] = _two_step(
        RobustScaler(),
        RandomForestClassifier(random_state=42, n_jobs=-1),
    )
    pipelines['gradient_boosting'] = _two_step(
        StandardScaler(),
        GradientBoostingClassifier(random_state=42),
    )

    # Hyperparameter grids
    param_grids = {}
    param_grids['logistic_regression'] = _for_classifier({
        'C': [0.1, 1.0, 10.0],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear'],
    })
    param_grids['random_forest'] = _for_classifier({
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, None],
        'min_samples_split': [2, 5, 10],
    })
    param_grids['gradient_boosting'] = _for_classifier({
        'n_estimators': [50, 100],
        'learning_rate': [0.1, 0.2],
        'max_depth': [3, 5],
    })

    print(f"✅ Created {len(pipelines)} ML pipelines")
    return pipelines, param_grids

def _plot_model_diagnostics(cm, y_test, y_pred_proba, model_name):
    """Show the confusion-matrix heatmap plus ROC and precision-recall curves.

    The two curve panels show a placeholder message when no probabilities are
    available or ``y_test`` holds a single class. The figure is always closed
    afterwards so repeated evaluations do not accumulate open figures.
    """
    display_name = model_name.replace("_", " ").title()
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    try:
        # Plot 1: Confusion Matrix
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
        axes[0].set_title(f'Confusion Matrix - {display_name}')
        axes[0].set_xlabel('Predicted Labels')
        axes[0].set_ylabel('True Labels')

        if y_pred_proba is not None and len(np.unique(y_test)) > 1:
            # Plot 2: ROC Curve
            fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
            roc_auc = auc(fpr, tpr)
            axes[1].plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC Curve (AUC = {roc_auc:.2f})')
            axes[1].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
            axes[1].set_xlim([0.0, 1.0])
            axes[1].set_ylim([0.0, 1.05])
            axes[1].set_xlabel('False Positive Rate')
            axes[1].set_ylabel('True Positive Rate')
            axes[1].set_title(f'ROC Curve - {display_name}')
            axes[1].legend()
            axes[1].grid(True, alpha=0.3)

            # Plot 3: Precision-Recall Curve
            precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
            pr_auc = auc(recall, precision)
            axes[2].plot(recall, precision, color='darkorange', lw=2, label=f'PR Curve (AUC = {pr_auc:.2f})')
            axes[2].set_xlim([0.0, 1.0])
            axes[2].set_ylim([0.0, 1.05])
            axes[2].set_xlabel('Recall')
            axes[2].set_ylabel('Precision')
            axes[2].set_title(f'Precision-Recall Curve - {display_name}')
            axes[2].legend()
            axes[2].grid(True, alpha=0.3)
        else:
            axes[1].text(0.5, 0.5, 'ROC Curve not available\n(insufficient class variation)',
                         ha='center', va='center', transform=axes[1].transAxes)
            axes[2].text(0.5, 0.5, 'PR Curve not available\n(insufficient class variation)',
                         ha='center', va='center', transform=axes[2].transAxes)

        plt.tight_layout()
        plt.show()
    finally:
        # Release the figure even if drawing failed part-way through.
        plt.close(fig)


def evaluate_model_performance(model, X_test, y_test, model_name):
    """Score a fitted binary classifier on the test set, print and plot the results.

    Computes accuracy, precision, recall and F1 (binary averaging), and
    ``auc_roc`` when the model exposes ``predict_proba`` and ``y_test``
    contains more than one class. Prints the metrics and the confusion-matrix
    cells, then renders diagnostic plots. Plotting is best-effort: a plotting
    failure is reported but does not discard the computed metrics.

    Args:
        model: fitted estimator with ``predict`` (and optionally
            ``predict_proba``).
        X_test: test feature matrix.
        y_test: true test labels.
        model_name: name used in log messages and plot titles.

    Returns:
        ``(metrics, y_pred, y_pred_proba)``; ``y_pred_proba`` is ``None`` when
        the model has no ``predict_proba``. Returns ``({}, None, None)`` if
        prediction or metric computation raises.
    """
    print(f"\n📊 Evaluating {model_name} Performance...")

    try:
        # Make predictions
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None

        # Calculate metrics
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='binary', zero_division=0),
            'recall': recall_score(y_test, y_pred, average='binary', zero_division=0),
            'f1': f1_score(y_test, y_pred, average='binary', zero_division=0)
        }

        if y_pred_proba is not None and len(np.unique(y_test)) > 1:
            try:
                metrics['auc_roc'] = roc_auc_score(y_test, y_pred_proba)
            except Exception as auc_error:
                # Keep the historical 0.5 fallback, but say why it was used
                # instead of swallowing the failure silently.
                print(f"⚠️ AUC-ROC could not be computed ({str(auc_error)}); reporting 0.5")
                metrics['auc_roc'] = 0.5

        # Print results
        print("📈 Performance Metrics:")
        for metric, value in metrics.items():
            print(f"  • {metric.replace('_', ' ').title()}: {value:.4f}")

        # Confusion Matrix (can be smaller than 2x2 when only one class occurs,
        # hence the shape guards below)
        cm = confusion_matrix(y_test, y_pred)
        print("📊 Confusion Matrix:")
        print(f"  • True Negatives: {cm[0,0]}")
        print(f"  • False Positives: {cm[0,1] if cm.shape[1] > 1 else 0}")
        print(f"  • False Negatives: {cm[1,0] if cm.shape[0] > 1 else 0}")
        print(f"  • True Positives: {cm[1,1] if cm.shape == (2,2) else 0}")

        # Visualizations must not cost us the metrics computed above.
        try:
            _plot_model_diagnostics(cm, y_test, y_pred_proba, model_name)
        except Exception as plot_error:
            print(f"⚠️ Could not render evaluation plots: {str(plot_error)}")

        return metrics, y_pred, y_pred_proba

    except Exception as e:
        print(f"❌ Error in model evaluation: {str(e)}")
        return {}, None, None

def train_and_evaluate_models(X, y, feature_names):
    """Tune, evaluate and log every candidate pipeline; report the best one.

    The data is split 80/20 (stratified when ``y`` has more than one class).
    Each pipeline from ``create_ml_pipelines()`` is tuned with a 3-fold
    stratified grid search scored on F1, evaluated on the held-out split with
    ``evaluate_model_performance`` and logged to MLflow. MLflow logging
    (including signature inference) is best-effort and never discards a model
    that trained successfully. A model whose training raises is skipped.

    Args:
        X: feature matrix (pandas DataFrame).
        y: target Series.
        feature_names: accepted for interface compatibility; not used here.

    Returns:
        ``(model_results, best_model, X_test, y_test)`` where ``model_results``
        maps model name to a dict with keys model, params, cv_score, metrics,
        predictions and probabilities, and ``best_model`` is the name with the
        highest test F1 (``None`` only when no model trained). Returns
        ``({}, None, None, None)`` if the split or setup raises.
    """
    print("\n🚀 Starting Model Training and Evaluation...")

    try:
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42,
            stratify=y if y.nunique() > 1 else None
        )

        print("📊 Data Split:")
        print(f"  • Training: {len(X_train)} samples")
        print(f"  • Testing: {len(X_test)} samples")
        print(f"  • Training target distribution: {y_train.value_counts().to_dict()}")
        print(f"  • Testing target distribution: {y_test.value_counts().to_dict()}")

        # Create pipelines
        pipelines, param_grids = create_ml_pipelines()

        # Same folds for every model: the splitter has a fixed integer seed,
        # so it can be built once outside the loop.
        cv_strategy = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

        # Store results
        model_results = {}
        best_model = None
        # Start below any achievable F1 so that a model is still selected when
        # every candidate scores 0 on the test split.
        best_score = float("-inf")

        # Train and evaluate each model
        for model_name, pipeline in pipelines.items():
            print(f"\n🔄 Training {model_name.replace('_', ' ').title()}...")

            try:
                # Hyperparameter tuning with GridSearchCV
                grid_search = GridSearchCV(
                    pipeline,
                    param_grids[model_name],
                    cv=cv_strategy,
                    scoring='f1',
                    n_jobs=1,
                    verbose=0
                )

                # Fit the model
                grid_search.fit(X_train, y_train)
                best_pipeline = grid_search.best_estimator_

                print(f"✅ Best parameters: {grid_search.best_params_}")
                print(f"✅ Best CV score: {grid_search.best_score_:.4f}")

                # Evaluate on test set
                metrics, y_pred, y_pred_proba = evaluate_model_performance(
                    best_pipeline, X_test, y_test, model_name
                )

                # Store results
                model_results[model_name] = {
                    'model': best_pipeline,
                    'params': grid_search.best_params_,
                    'cv_score': grid_search.best_score_,
                    'metrics': metrics,
                    'predictions': y_pred,
                    'probabilities': y_pred_proba
                }

                # Track best model.
                # NOTE(review): the winner is chosen on test-set F1, so the
                # winner's reported test score is optimistically biased;
                # consider selecting on cv_score instead.
                test_f1 = metrics.get('f1', 0)
                if test_f1 > best_score:
                    best_score = test_f1
                    best_model = model_name

                # Log to MLflow (best-effort). Signature inference lives inside
                # this guard so a signature failure cannot drop the model.
                try:
                    signature = infer_signature(X_train, best_pipeline.predict(X_train))

                    with mlflow.start_run(run_name=f"{model_name}_rewritten"):
                        # Log parameters
                        mlflow.log_params(grid_search.best_params_)

                        # Log metrics
                        mlflow.log_metrics(metrics)
                        mlflow.log_metric("cv_score", grid_search.best_score_)

                        # Log model
                        mlflow.sklearn.log_model(best_pipeline, f"{model_name}_pipeline", signature=signature)
                        print("✅ Logged to MLflow")

                except Exception as mlflow_error:
                    print(f"⚠️ MLflow logging failed: {str(mlflow_error)}")

            except Exception as model_error:
                print(f"❌ Error training {model_name}: {str(model_error)}")

        print("\n🏆 Model Training Summary:")
        print(f"  • Trained {len(model_results)} models successfully")
        if best_model:
            print(f"  • Best model: {best_model} (F1 Score: {best_score:.4f})")

        return model_results, best_model, X_test, y_test

    except Exception as e:
        print(f"❌ Error in model training: {str(e)}")
        return {}, None, None, None


def create_model_comparison_visualization(model_results):
    """Plot a side-by-side comparison of the trained models.

    Left panel: grouped bars of accuracy, precision, recall and F1 per model.
    Right panel: cross-validation F1 against test F1, one annotated point per
    model, with a dashed diagonal marking perfect agreement.

    Args:
        model_results: dict mapping model name to a dict holding at least
            ``metrics`` (dict) and ``cv_score``.

    Returns:
        None. Any plotting error is printed rather than raised.
    """
    print("\n📊 Creating model comparison visualization...")

    try:
        if not model_results:
            print("⚠️ No model results to visualize")
            return

        model_names = list(model_results.keys())
        metrics_to_plot = ['accuracy', 'precision', 'recall', 'f1']

        # Wide figure so the rotated tick labels and annotations stay readable
        fig, axes = plt.subplots(1, 2, figsize=(18, 7))
        bar_ax, scatter_ax = axes[0], axes[1]

        # ---- Panel 1: grouped bar chart of the test metrics ----
        metric_data = {
            metric: [model_results[name]['metrics'].get(metric, 0) for name in model_names]
            for metric in metrics_to_plot
        }

        x = np.arange(len(model_names))
        width = 0.2
        # Shift that centres each group of bars on its tick
        centering = (len(metrics_to_plot)/2 - 0.5) * width

        for i, metric in enumerate(metrics_to_plot):
            bar_ax.bar(x + i*width - centering, metric_data[metric], width, label=metric.title())

        bar_ax.set_xlabel('Models')
        bar_ax.set_ylabel('Score')
        bar_ax.set_title('Model Performance Comparison')
        bar_ax.set_xticks(x)
        bar_ax.set_xticklabels([name.replace('_', ' ').title() for name in model_names], rotation=45, ha='right')
        bar_ax.legend()
        bar_ax.grid(True, alpha=0.3)
        bar_ax.set_ylim(0, 1)  # scores are assumed to lie in [0, 1]

        # ---- Panel 2: CV F1 versus test F1 ----
        cv_scores = [model_results[name]['cv_score'] for name in model_names]
        test_f1_scores = [model_results[name]['metrics'].get('f1', 0) for name in model_names]

        scatter_ax.scatter(cv_scores, test_f1_scores, s=150, alpha=0.8, edgecolors='w', linewidth=0.5, zorder=2)

        # Hand-tuned label placement per known model name, checked in order
        # (first match wins). This is a simple strategy; heavier overlap would
        # need something like `adjustText` or real collision detection.
        # Each entry: (name fragment, (x offset, y offset, horizontal alignment))
        label_placements = (
            ('random_forest', (-0.015, 0.005, 'right')),       # left, slightly up
            ('gradient_boosting', (0.005, -0.015, 'left')),    # right, slightly down
            ('logistic_regression', (-0.01, -0.01, 'right')),  # left and down
        )
        default_placement = (0.005, 0.005, 'left')             # right and up

        for name, x_coord, y_coord in zip(model_names, cv_scores, test_f1_scores):
            lowered = name.lower()
            offset_x, offset_y, ha_align = next(
                (placement for fragment, placement in label_placements if fragment in lowered),
                default_placement,
            )

            scatter_ax.annotate(name.replace('_', ' ').title(),
                                (x_coord, y_coord),
                                xytext=(x_coord + offset_x, y_coord + offset_y),
                                textcoords='data',  # xytext is given in data coordinates
                                arrowprops=dict(facecolor='black', shrink=0.05, width=0.5, headwidth=5, alpha=0.7),
                                horizontalalignment=ha_align,
                                verticalalignment='bottom',
                                fontsize=9,
                                bbox=dict(boxstyle="round,pad=0.3", fc="yellow", ec="b", lw=0.5, alpha=0.7))

        scatter_ax.set_xlabel('Cross-Validation F1 Score')
        scatter_ax.set_ylabel('Test F1 Score')
        scatter_ax.set_title('CV Score vs Test Performance')
        scatter_ax.grid(True, alpha=0.3)

        # Reference diagonal stretched slightly beyond the observed score range
        if cv_scores and test_f1_scores:
            min_score = min(min(cv_scores), min(test_f1_scores)) * 0.95
            max_score = max(max(cv_scores), max(test_f1_scores)) * 1.05

            scatter_ax.plot([min_score, max_score], [min_score, max_score], 'r--', alpha=0.5, label='Perfect Agreement', zorder=1)
            scatter_ax.legend()

        plt.tight_layout()
        plt.show()

        print("✅ Model comparison visualization created")

    except Exception as viz_error:
        print(f"❌ Visualization error: {str(viz_error)}")

def analyze_feature_importance(model_results, feature_names, best_model_name):
    """Rank and plot the features of the best model by importance.

    Only works when the pipeline's ``classifier`` step exposes
    ``feature_importances_`` (tree-based models).

    Args:
        model_results: dict mapping model name to a dict with a ``model`` entry
            (a pipeline exposing ``named_steps``).
        feature_names: feature names, in the order the model was trained on.
        best_model_name: key of the model to analyse.

    Returns:
        A DataFrame with ``feature`` and ``importance`` columns sorted by
        descending importance, or ``None`` when no valid model is given, the
        classifier has no importances, or the analysis raises.
    """
    has_model = bool(best_model_name) and best_model_name in model_results
    if not has_model:
        print("⚠️ No valid model available for feature importance analysis")
        return None

    print(f"\n🎯 Analyzing Feature Importance for {best_model_name}...")

    try:
        classifier = model_results[best_model_name]['model'].named_steps['classifier']

        # Guard: non tree-based classifiers carry no importances
        if not hasattr(classifier, 'feature_importances_'):
            print("⚠️ Selected model doesn't support feature importance")
            return None

        ranking = pd.DataFrame({
            'feature': feature_names,
            'importance': classifier.feature_importances_
        })
        feature_importance_df = ranking.sort_values('importance', ascending=False)

        print("📊 Top 10 Most Important Features:")
        top_ten = feature_importance_df.head(10)
        for rank, (feature, importance) in enumerate(zip(top_ten['feature'], top_ten['importance']), 1):
            print(f"  {rank:2d}. {feature}: {importance:.4f}")

        # Horizontal bar chart of the 15 strongest features
        plt.figure(figsize=(12, 8))
        sns.barplot(data=feature_importance_df.head(15), x='importance', y='feature', palette='viridis')
        plt.title(f'Feature Importance - {best_model_name.replace("_", " ").title()}')
        plt.xlabel('Importance Score')
        plt.tight_layout()
        plt.show()

        print("✅ Feature importance analysis completed")
        return feature_importance_df

    except Exception as fi_error:
        print(f"❌ Feature importance analysis failed: {str(fi_error)}")
        return None


def save_predictions_to_spark(df_pandas, model_results, best_model_name, X, spark):
    """Score every park with the best model and overwrite the predictions table.

    Writes to ``{CATALOG}.{GOLD_SCHEMA}.urban_green_space_predictions`` in
    overwrite mode, then prints ten sample rows and the count per predicted
    class.

    Args:
        df_pandas: pandas DataFrame with park_id, name, city and
            intervention_required columns, row-aligned with ``X``.
        model_results: dict mapping model name to a dict with a ``model`` entry.
        best_model_name: key of the model used for scoring.
        X: feature matrix to score.
        spark: Spark session used to create and write the DataFrame.

    Returns:
        The Spark DataFrame of predictions, or ``None`` when no valid model is
        given or saving raises.
    """
    print("\n💾 Saving predictions to Spark tables...")

    try:
        model_available = bool(best_model_name) and best_model_name in model_results
        if not model_available:
            print("❌ No valid model available for predictions")
            return None

        scorer = model_results[best_model_name]['model']

        # Score the full dataset: hard labels plus positive-class probability
        predicted_labels = scorer.predict(X)
        positive_probability = scorer.predict_proba(X)[:, 1]

        output_columns = {
            'park_id': df_pandas['park_id'],
            'park_name': df_pandas['name'],
            'city': df_pandas['city'],
            'intervention_actual': df_pandas['intervention_required'],
            'intervention_pred': predicted_labels,
            'intervention_probability': positive_probability,
            'model_name': best_model_name,
            'prediction_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        predictions_df = pd.DataFrame(output_columns)

        # Convert to Spark and replace the contents of the gold table
        predictions_spark = spark.createDataFrame(predictions_df)
        writer = predictions_spark.write.mode("overwrite")
        writer.saveAsTable(f"{CATALOG}.{GOLD_SCHEMA}.urban_green_space_predictions")

        print(f"✅ Predictions saved to {CATALOG}.{GOLD_SCHEMA}.urban_green_space_predictions")

        print("\n📋 Sample Predictions:")
        predictions_spark.show(10, truncate=False)

        # Count of parks per predicted class
        print("\n📊 Prediction Summary:")
        for row in predictions_spark.groupBy("intervention_pred").count().collect():
            if row.intervention_pred == 0:
                label = "No Intervention"
            else:
                label = "Intervention Recommended"
            print(f"  • {label}: {row['count']} parks")

        return predictions_spark

    except Exception as save_error:
        print(f"❌ Error saving predictions: {str(save_error)}")
        return None

# =============================================================================
# 4. EXECUTE COMPLETE ML PIPELINE
# =============================================================================

def run_complete_ml_pipeline():
    """Run every pipeline stage in order, stopping at the first failed stage.

    Stages: load data, aggregate features, create the target, prepare the ML
    dataset, train and evaluate models, compare models, analyse feature
    importance, save predictions, print the final summary.

    Returns:
        A dict with keys ``model_results``, ``best_model``,
        ``feature_importance`` and ``predictions``, or ``None`` when a required
        stage fails or an unexpected error is raised.
    """
    print("\n🚀 EXECUTING COMPLETE ML PIPELINE")
    print("=" * 50)

    def _terminate(message):
        # Report why the pipeline stopped; evaluates to the None result.
        print(message)
        return None

    try:
        # Step 1: Load raw data
        raw_df = load_raw_data()
        if raw_df is None:
            return _terminate("❌ Failed to load data. Pipeline terminated.")

        # Step 2: Create aggregated features
        feature_df = create_aggregated_features(raw_df)
        if feature_df is None:
            return _terminate("❌ Failed to create features. Pipeline terminated.")

        # Step 3: Create target variable
        labelled_df = create_target_variable(feature_df)
        if labelled_df is None:
            return _terminate("❌ Failed to create target variable. Pipeline terminated.")

        # Step 4: Prepare ML dataset
        X, y, feature_names, df_pandas = prepare_ml_dataset(labelled_df)
        if X is None:
            return _terminate("❌ Failed to prepare ML dataset. Pipeline terminated.")

        # Step 5: Train and evaluate models
        model_results, best_model_name, X_test, y_test = train_and_evaluate_models(X, y, feature_names)
        if not model_results:
            return _terminate("❌ No models were successfully trained. Pipeline terminated.")

        # Step 6: Create model comparison visualization
        create_model_comparison_visualization(model_results)

        # Step 7: Analyze feature importance
        feature_importance_df = analyze_feature_importance(model_results, feature_names, best_model_name)

        # Step 8: Save predictions
        predictions_spark = save_predictions_to_spark(df_pandas, model_results, best_model_name, X, spark)

        # Step 9: Final summary
        print_final_summary(model_results, best_model_name, feature_importance_df)

        outputs = {}
        outputs['model_results'] = model_results
        outputs['best_model'] = best_model_name
        outputs['feature_importance'] = feature_importance_df
        outputs['predictions'] = predictions_spark
        return outputs

    except Exception as pipeline_error:
        print(f"❌ Pipeline execution failed: {str(pipeline_error)}")
        return None

def _format_metric(value, width=0):
    """Render a metric with four decimals, or ``N/A`` when it is not numeric.

    The text is left-justified to ``width`` characters so the same helper can
    fill fixed-width table cells and free-form lines (``width=0``).
    """
    try:
        text = f"{float(value):.4f}"
    except (TypeError, ValueError):
        # A None / non-numeric metric must not abort the whole summary.
        text = "N/A"
    return text.ljust(width)


def print_final_summary(model_results, best_model_name, feature_importance_df):
    """Print comprehensive final summary of the ML pipeline.

    Args:
        model_results: Mapping of model name to a dict holding a ``'metrics'``
            dict. The table reads the keys ``accuracy``, ``precision``,
            ``recall`` and ``f1`` (missing keys are shown as 0).
        best_model_name: Key of the best model in ``model_results``; a falsy
            value skips the best-model section.
        feature_importance_df: Object exposing ``head(5).iterrows()`` whose
            rows have ``'feature'`` and ``'importance'`` entries (presumably a
            pandas DataFrame - confirm against the caller), or ``None`` to
            skip the feature section.

    Returns:
        None. Everything is written to stdout.
    """

    print("\n🎯 MACHINE LEARNING PIPELINE SUMMARY")
    print("=" * 60)

    # Guard clause: nothing was trained, so only print troubleshooting hints.
    if not model_results:
        print("⚠️ No models were successfully trained")
        print("   • Check data quality and feature engineering")
        print("   • Review target variable creation logic")
        print("   • Ensure sufficient data volume")
        return

    print(f"✅ Successfully trained {len(model_results)} models")

    # Model performance summary
    print("\n📊 MODEL PERFORMANCE SUMMARY:")
    print(f"{'Model':<20} {'Accuracy':<10} {'Precision':<10} {'Recall':<10} {'F1-Score':<10}")
    print("-" * 60)

    table_keys = ('accuracy', 'precision', 'recall', 'f1')
    for model_name, results in model_results.items():
        metrics = results['metrics']
        cells = [f"{model_name.replace('_', ' ').title():<20}"]
        # Non-numeric values are rendered as "N/A" instead of raising, so one
        # bad metric cannot discard an otherwise successful pipeline run.
        cells.extend(_format_metric(metrics.get(key, 0), 10) for key in table_keys)
        print(" ".join(cells))

    if best_model_name:
        if best_model_name in model_results:
            print(f"\n🏆 BEST PERFORMING MODEL: {best_model_name.replace('_', ' ').title()}")
            best_metrics = model_results[best_model_name]['metrics']
            print("   📈 Performance Metrics:")
            for metric, value in best_metrics.items():
                print(f"      • {metric.replace('_', ' ').title()}: {_format_metric(value)}")
        else:
            # Report the inconsistency instead of failing with a KeyError.
            print(f"\n⚠️ Best model '{best_model_name}' not found in model results")

    # Feature importance summary
    if feature_importance_df is not None:
        print("\n🎯 TOP 5 MOST IMPORTANT FEATURES:")
        for i, (_, row) in enumerate(feature_importance_df.head(5).iterrows(), 1):
            print(f"   {i}. {row['feature']}: {_format_metric(row['importance'])}")

    print("\n📋 OUTPUTS GENERATED:")
    print("   • Model artifacts saved to MLflow")
    print(f"   • Predictions saved to: {CATALOG}.{GOLD_SCHEMA}.urban_green_space_predictions")
    print("   • Performance visualizations generated")

    print("\n💡 BUSINESS INSIGHTS:")
    print("   • Model can predict which parks need intervention")
    print("   • Air quality and visitor patterns are key factors")
    print("   • Can be used for proactive park management")

    print("\n🚀 NEXT STEPS:")
    print("   • Deploy model to production environment")
    print("   • Set up automated retraining pipeline")
    print("   • Create monitoring dashboard for predictions")
    print("   • Integrate with park management systems")

# =============================================================================
# 5. DATA QUALITY CHECKS AND VALIDATION
# =============================================================================

def perform_data_quality_checks(df_spark):
    """Perform comprehensive data quality checks.

    Prints record/park counts, per-column null counts, the min/max/mean of
    the ``aqi``, ``visitor_count`` and ``sentiment_score`` columns, the
    covered ``date`` range and the record distribution across ``city``.

    Args:
        df_spark: Spark DataFrame that must contain the columns ``park_id``,
            ``aqi``, ``visitor_count``, ``sentiment_score``, ``date`` and
            ``city``.

    Returns:
        None. Findings are printed; any exception raised by the checks is
        caught and reported on stdout instead of being propagated.
    """

    print("\n🔍 PERFORMING DATA QUALITY CHECKS")
    print("=" * 40)

    try:
        # Basic statistics
        total_records = df_spark.count()
        total_parks = df_spark.select("park_id").distinct().count()

        print("📊 Basic Statistics:")
        print(f"   • Total records: {total_records:,}")
        print(f"   • Unique parks: {total_parks:,}")

        # Every ratio below divides by these counts; an empty table would
        # otherwise surface only as an opaque "division by zero" error.
        if total_records == 0:
            print("⚠️ Dataset is empty - skipping remaining data quality checks")
            return

        print(f"   • Average records per park: {total_records/total_parks:.1f}")

        # Check for missing values
        print("\n🔍 Missing Value Analysis:")
        column_names = df_spark.columns
        # One aggregation for all columns instead of one Spark job per column.
        # when() without otherwise() yields NULL for non-matching rows and
        # count() skips NULLs, so each expression counts the null cells.
        null_exprs = [
            F.count(F.when(df_spark[name].isNull(), 1)).alias(f"nulls_{idx}")
            for idx, name in enumerate(column_names)
        ]
        null_counts = df_spark.select(null_exprs).collect()[0]
        # Results are read positionally so unusual column names cannot break
        # the lookup.
        for column_name, null_count in zip(column_names, null_counts):
            null_percentage = (null_count / total_records) * 100
            if null_percentage > 0:
                print(f"   • {column_name}: {null_count:,} ({null_percentage:.1f}%)")

        # Data range analysis for key columns
        print("\n📈 Data Range Analysis:")

        # (column, printed label, float format) for each analysed column.
        range_columns = (
            ("aqi", "AQI Range", ".2f"),
            ("visitor_count", "Visitor Count Range", ".2f"),
            ("sentiment_score", "Sentiment Score Range", ".3f"),
        )
        # A single describe() call covers all three columns.
        summary_rows = df_spark.select(
            *[name for name, _, _ in range_columns]
        ).describe().collect()

        for column_name, label, float_spec in range_columns:
            print(f"   • {label}:")
            for stat in summary_rows:
                if stat.summary in ('min', 'max', 'mean'):
                    raw_value = stat[column_name]
                    # describe() reports statistics as strings; a column with
                    # no non-null values yields None rather than a number.
                    shown = "N/A" if raw_value is None else format(float(raw_value), float_spec)
                    print(f"     - {stat.summary.title()}: {shown}")

        # Temporal coverage
        date_range = df_spark.select("date").agg(
            F.min("date").alias("min_date"),
            F.max("date").alias("max_date"),
        ).collect()[0]
        print("\n📅 Temporal Coverage:")
        print(f"   • Date range: {date_range['min_date']} to {date_range['max_date']}")

        # Geographic coverage
        city_count = df_spark.select("city").distinct().count()
        print("\n🌍 Geographic Coverage:")
        print(f"   • Number of cities: {city_count}")

        top_cities = df_spark.groupBy("city").count().orderBy("count", ascending=False).limit(5).collect()
        print("   • Top 5 cities by record count:")
        for i, row in enumerate(top_cities, 1):
            print(f"     {i}. {row.city}: {row['count']:,} records")

        print("\n✅ Data quality checks completed")

    except Exception as e:
        # Best-effort diagnostics: report the failure but never raise.
        print(f"❌ Error in data quality checks: {str(e)}")

# =============================================================================
# 6. EXECUTE COMPLETE PIPELINE
# =============================================================================

# Run the complete ML pipeline
print("Starting Urban Green Space ML Pipeline...")
# NOTE(review): this column list is hard-coded, not read from the table -
# confirm it still matches the silver schema.
print("Dataset columns available:", ['park_id', 'timestamp', 'date', 'day_of_week', 'visitor_count', 
                                   'event_day', 'hour', 'name', 'city', 'area_sqm', 'latitude', 
                                   'longitude', 'aqi', 'no2_level', 'pm25_level', 'o3_level', 
                                   'tweet_text', 'sentiment_label', 'sentiment_score'])

# Execute the pipeline
pipeline_results = run_complete_ml_pipeline()

# Additional analysis if pipeline was successful
if pipeline_results:
    print(f"\n🎉 PIPELINE EXECUTION COMPLETED SUCCESSFULLY!")
    
    # Perform data quality checks if raw data is available
    try:
        raw_data = spark.read.table(f"{CATALOG}.{SILVER_SCHEMA}.integrated")
    except Exception as exc:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit still propagate, and show the cause.
        print("⚠️ Could not perform data quality checks - raw data table not accessible")
        print(f"   • Reason: {exc}")
    else:
        # Run only after a successful read; the checks report their own
        # errors, so they stay outside the "table not accessible" handler.
        perform_data_quality_checks(raw_data)
else:
    print(f"\n❌ PIPELINE EXECUTION FAILED")
    print("Please check the error messages above and ensure:")
    print("   • Data table exists and is accessible")
    print("   • Required columns are present in the dataset")
    print("   • Data has sufficient quality and volume")