# 🚀 Spark Tune - Enhanced Features Demo

This notebook demonstrates all the new features added to Spark Tune:

1. **Data Quality Checking** - PySpark-native quality analysis
2. **YData Profiling** - Comprehensive data profiling
3. **Time Series Detection** - Automatic temporal structure detection
4. **Enhanced Preprocessing** - Missing values, outliers, scaling, rare categories
5. **Baseline Models** - Simple model comparison
6. **LightAutoML** - Automated machine learning
7. **Model Comparison** - Impact analysis framework

In [0]:
# %restart_python

In [0]:
# !pip install -r /Workspace/Users/yadvendra@aidetic.in/spark_beyond/requirements.txt

In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings('ignore')

## 0. Initialize MLflow Tracking

MLflow tracks every model training run (baselines, XGBoost, AutoML) so you can compare experiments later.
This notebook uses local file-based tracking (the `./mlruns/` directory).

In [None]:
from backend.services.mlflow_service import MLflowTracker

# Initialize MLflow tracker (uses local file-based tracking in ./mlruns/)
tracker = MLflowTracker(session_id="hdfc_demo")

if tracker.is_enabled:
    exp_info = tracker.get_experiment_info()
    print("MLflow tracking enabled")
    print(f"  Experiment: {exp_info['name']}")
    print(f"  Experiment ID: {exp_info['experiment_id']}")
    print(f"  Artifact location: {exp_info['artifact_location']}")
else:
    print("MLflow tracking is disabled (mlflow not installed or init failed)")

## 1. Initialize Spark and Load Data

In [2]:
# from pyspark.sql import SparkSession

# # Initialize Spark
# spark = SparkSession.builder \
#     .master("local[*]") \
#     .appName("Spark Tune Enhanced Demo") \
#     .config("spark.driver.memory", "4g") \
#     .getOrCreate()

# spark.sparkContext.setLogLevel("ERROR")

# print(f"Spark version: {spark.version}")

In [0]:
from backend.core.utils import process_col_names

# Earlier data source (bank marketing dataset), kept for reference:
# df = spark.read.options(
#     header=True,
#     inferSchema='True',
#     delimiter=','
# ).csv("dbfs:/Workspace/Users/yadvendra@aidetic.in/spark_beyond/backend/data/bank-additional-full.csv")

# Load the credit card transactions dataset
df = spark.read.csv(
    "/Volumes/aidetic_databricks/default/credit_card_transactions/credit_card_transactions.csv",
    header=True,
    inferSchema=True
)

df = df.drop("Unnamed: 0")

# Process column names
df = process_col_names(df)

print(f"Dataset shape: {df.count():,} rows x {len(df.columns)} columns")
df.printSchema()

---
## 2. 📋 Data Quality Checking (NEW)

The `DataQualityChecker` performs PySpark-native quality analysis without converting to Pandas,
making it suitable for very large datasets.
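
To make the idea of a quality check concrete, here is a minimal pure-Python sketch of two common checks: missing-value percentage per column and exact duplicate rows. It is only an illustration on a handful of toy rows. The helper names `missing_pct` and `duplicate_count` are hypothetical and are not part of `DataQualityChecker`, whose internals are not shown in this notebook.

```python
from collections import Counter

# Hypothetical toy rows standing in for a Spark DataFrame.
rows = [
    {"amt": 10.0, "category": "grocery"},
    {"amt": None, "category": "travel"},
    {"amt": 10.0, "category": "grocery"},  # exact duplicate of the first row
    {"amt": 55.5, "category": None},
]


def missing_pct(rows, column):
    """Percentage of rows where `column` is None."""
    missing = sum(1 for r in rows if r[column] is None)
    return 100.0 * missing / len(rows)


def duplicate_count(rows):
    """Number of rows that repeat an earlier row exactly."""
    # Sort by column name only, so None values are never compared.
    keys = (tuple(sorted(r.items(), key=lambda kv: kv[0])) for r in rows)
    counts = Counter(keys)
    return sum(c - 1 for c in counts.values())


missing_by_column = {col: missing_pct(rows, col) for col in rows[0]}
n_duplicates = duplicate_count(rows)
print(missing_by_column, n_duplicates)
```

On a real dataset the same quantities would be computed with Spark aggregations so that no rows are collected to the driver.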

In [0]:
# from backend.core.profiling.data_quality import DataQualityChecker

# # Run data quality checks
# quality_checker = DataQualityChecker(df)
# quality_report = quality_checker.run_all_checks()

# print("=" * 60)
# print("DATA QUALITY REPORT")
# print("=" * 60)
# print(f"\n📊 Quality Score: {quality_report.quality_score}/100")
# print(f"📈 Row Count: {quality_report.row_count:,}")
# print(f"📋 Column Count: {quality_report.column_count}")
# print(f"🔄 Duplicate Rows: {quality_report.duplicate_count:,}")

In [0]:
# # Display quality issues
# print("\n⚠️ DATA QUALITY ISSUES:")
# print("-" * 40)
# if quality_report.issues:
#     for issue in quality_report.issues[:10]:
#         severity_icon = "🔴" if issue['severity'] == 'high' else "🟡" if issue['severity'] == 'medium' else "🟢"
#         print(f"{severity_icon} [{issue['severity'].upper()}] {issue['column']}: {issue['issue']}")
# else:
#     print("✅ No major quality issues detected!")

In [0]:
# # Display recommendations
# print("\n💡 PREPROCESSING RECOMMENDATIONS:")
# print("-" * 40)
# if quality_report.recommendations:
#     for rec in quality_report.recommendations[:10]:
#         priority_icon = "🔴" if rec['priority'] == 'high' else "🟡" if rec['priority'] == 'medium' else "🟢"
#         print(f"{priority_icon} [{rec['priority'].upper()}] {rec['column']}: {rec['action']}")
# else:
#     print("✅ No preprocessing recommendations needed!")

In [0]:
# # Detect outliers
# print("\n📊 OUTLIER DETECTION (IQR Method):")
# print("-" * 40)
# outliers = quality_checker.detect_outliers(method='iqr', threshold=1.5)

# for col, stats in outliers.items():
#     if stats['outlier_pct'] > 0:
#         print(f"  {col}: {stats['outlier_count']:,} outliers ({stats['outlier_pct']:.2f}%)")
#         print(f"    Bounds: [{stats['lower_bound']:.2f}, {stats['upper_bound']:.2f}]")

---
## 3. 📊 YData Profiling (NEW)

The `DataProfiler` generates comprehensive data profiles using ydata-profiling,
with automatic sampling for large datasets.

In [0]:
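from ydata_profiling import ProfileReport
from backend.core.utils.spark_pandas_bridge import spark_to_pandas_safe

# Profile a bounded sample (the first 10,000 rows): calling toPandas() on the
# full Spark DataFrame collects every row onto the driver and can exhaust its memory.
sample_pdf = df.limit(10000).toPandas()

profile = ProfileReport(sample_pdf, title="Profiling Report")

profile.to_file("data_profiling_report.html")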

In [0]:
from backend.core.profiling.ydata_profiler import DataProfiler, quick_profile

# Quick profile (faster, minimal report)
print("Generating quick profile...")
quick_stats = quick_profile(df, max_rows=10000)

print("\n📈 QUICK PROFILE SUMMARY:")
print("-" * 40)
summary = quick_stats['summary']
print(f"  Rows: {summary.get('n_rows', 0):,}")
print(f"  Columns: {summary.get('n_columns', 0)}")
print(f"  Missing Cells: {summary.get('missing_cells_pct', 0):.2f}%")
print(f"  Duplicate Rows: {summary.get('duplicate_rows_pct', 0):.2f}%")

In [0]:
# Display alerts
print("\n⚠️ DATA ALERTS:")
print("-" * 40)
if quick_stats['alerts']:
    for alert in quick_stats['alerts'][:10]:
        print(f"  - {alert['column']}: {alert['type']}")
else:
    print("  ✅ No alerts!")

In [0]:
# Display profiling recommendations
print("\n💡 PROFILING RECOMMENDATIONS:")
print("-" * 40)
if quick_stats['recommendations']:
    for rec in quick_stats['recommendations'][:10]:
        priority_icon = "🔴" if rec['priority'] == 'high' else "🟡" if rec['priority'] == 'medium' else "🟢"
        print(f"{priority_icon} {rec['column']}: {rec['action']}")

---
## 4. 🎯 Problem Definition & Schema Validation

In [0]:
from backend.core.discovery import Problem, SchemaChecks

# Define the ML problem
problem = Problem(
    target="is_fraud",
    type="classification",
    desired_result=1,
    date_column="trans_date_trans_time"
)

print(f"Problem Type: {problem.type}")
print(f"Target Column: {problem.target}")
print(f"Desired Result: {problem.desired_result}")

In [0]:
# Validate schema
schema_checker = SchemaChecks(dataframe=df, problem=problem)
schema_info = schema_checker.check()

print(f"\n📋 SCHEMA SUMMARY:")
print(f"  Categorical columns: {len(schema_info['categorical'])}")
print(f"  Numerical columns: {len(schema_info['numerical'])}")
print(f"  Boolean columns: {len(schema_info['boolean'])}")

---
## 5. ⏱️ Time Series Detection (NEW)

The `detect_time_series_structure` function automatically identifies temporal patterns
and recommends appropriate time-series features.
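
One simple way to think about frequency detection is to look at the typical gap between consecutive timestamps. The sketch below is a hypothetical heuristic for illustration only. The function `infer_frequency` and its labels are assumptions and do not describe how `detect_time_series_structure` or `TimeSeriesFrequency` work internally.

```python
from datetime import datetime, timedelta
from statistics import median


def infer_frequency(timestamps):
    """Label the typical spacing between sorted timestamps (illustrative thresholds)."""
    ordered = sorted(timestamps)
    gaps = [(b - a).total_seconds() for a, b in zip(ordered, ordered[1:])]
    if not gaps:
        return "unknown"  # fewer than two timestamps
    typical = median(gaps)  # median is robust to a few long pauses
    if typical < 60:
        return "sub-minute"
    if typical < 3600:
        return "minutely"
    if typical < 86400:
        return "hourly"
    if typical < 7 * 86400:
        return "daily"
    return "weekly or coarser"


start = datetime(2024, 1, 1)
hourly = [start + timedelta(hours=i) for i in range(48)]
daily = [start + timedelta(days=i) for i in range(30)]
print(infer_frequency(hourly), infer_frequency(daily))
```

Event-style data such as card transactions usually has irregular gaps, so a heuristic like this would be applied per entity (for example per card) rather than across the whole table.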

In [0]:
from backend.core.utils.time_series_detector import detect_time_series_structure, TimeSeriesFrequency

# Detect time-series structure
ts_info = detect_time_series_structure(df, schema_checker)

print("\n⏱️ TIME SERIES DETECTION RESULTS:")
print("-" * 40)
print(f"  Is Time Series: {ts_info.is_time_series}")
print(f"  Time Column: {ts_info.time_column or 'N/A'}")
print(f"  Frequency: {ts_info.frequency.value if ts_info.frequency else 'N/A'}")
print(f"  Entity Columns: {ts_info.entity_columns or 'N/A'}")

if ts_info.warnings:
    print("\n⚠️ Warnings:")
    for warning in ts_info.warnings:
        print(f"    - {warning}")

if ts_info.recommended_features:
    print("\n💡 Recommended Time-Series Features:")
    for feature in ts_info.recommended_features:
        print(f"    - {feature}")

---
## 6. 🔧 Enhanced Preprocessing (NEW)

The `EnhancedPreprocessor` provides feature-engine inspired transformations:
- Missing value imputation (mean, median, mode, constant)
- Outlier handling (IQR cap/remove, Z-score cap/remove)
- Feature scaling (standard, minmax, robust)
- Rare category grouping
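
The transformations listed above can be sketched in plain Python to show the arithmetic behind them. This is a minimal illustration under stated assumptions: `iqr_bounds`, `iqr_cap`, `robust_scale` and `group_rare` are hypothetical helpers, the quartiles use Python's `statistics.quantiles` with the inclusive method, and `EnhancedPreprocessor` may compute its quantiles differently (for example with Spark's approximate quantiles).

```python
from collections import Counter
from statistics import median, quantiles


def iqr_bounds(values, threshold=1.5):
    """Lower and upper fences: Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - threshold * iqr, q3 + threshold * iqr


def iqr_cap(values, threshold=1.5):
    """Clip values to the IQR fences instead of dropping them."""
    lower, upper = iqr_bounds(values, threshold)
    return [min(max(v, lower), upper) for v in values]


def robust_scale(values):
    """(x - median) / IQR; assumes the IQR is non-zero."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    med = median(values)
    return [(v - med) / (q3 - q1) for v in values]


def group_rare(categories, threshold=0.01, replacement="RARE"):
    """Replace categories whose share of rows is below `threshold`."""
    counts = Counter(categories)
    total = len(categories)
    return [c if counts[c] / total >= threshold else replacement for c in categories]


values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]
bounds = iqr_bounds(values)
capped = iqr_cap(values)
scaled = robust_scale(values)
grouped = group_rare(["a"] * 150 + ["b"] * 49 + ["c"])
print(bounds, capped[-1], scaled[0], grouped.count("RARE"))
```

Capping keeps the row count unchanged, which is why it is often preferred over removal when the target is rare, as it is for fraud labels.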

In [0]:
from backend.core.features.preprocessing_enhanced import (
    EnhancedPreprocessor, PreprocessingConfig,
    ImputationStrategy, OutlierStrategy, ScalingStrategy
)

# Configure preprocessing
config = PreprocessingConfig(
    imputation_strategy=ImputationStrategy.MEDIAN,
    outlier_strategy=OutlierStrategy.IQR_CAP,
    outlier_threshold=1.5,
    scaling_strategy=ScalingStrategy.ROBUST,
    rare_category_threshold=0.01,  # 1%
    rare_category_replacement="RARE"
)

print("📋 PREPROCESSING CONFIGURATION:")
print(f"  Imputation: {config.imputation_strategy.value}")
print(f"  Outlier Handling: {config.outlier_strategy.value if config.outlier_strategy else 'None'}")
print(f"  Scaling: {config.scaling_strategy.value}")
print(f"  Rare Category Threshold: {config.rare_category_threshold*100}%")

In [0]:
# Get column lists
numerical_cols = schema_checker.get_typed_col("numerical")
categorical_cols = schema_checker.get_typed_col("categorical")

print(f"Numerical columns ({len(numerical_cols)}): {numerical_cols[:5]}...")
print(f"Categorical columns ({len(categorical_cols)}): {categorical_cols[:5]}...")

In [0]:
# Apply enhanced preprocessing
preprocessor = EnhancedPreprocessor(df, config)

# 1. Impute missing values
print("\n1️⃣ Imputing missing values...")
df_imputed = preprocessor.impute_missing_values(numerical_cols, strategy=ImputationStrategy.MEDIAN)
print(f"   ✅ Imputed {len(numerical_cols)} numerical columns")

In [0]:
# 2. Handle outliers
print("\n2️⃣ Handling outliers (IQR capping)...")
preprocessor_outliers = EnhancedPreprocessor(df_imputed, config)
df_no_outliers = preprocessor_outliers.handle_outliers(numerical_cols)

# Show outlier handling stats
outlier_params = preprocessor_outliers.get_fitted_params().get('outliers', {})
print(f"   ✅ Capped outliers in {len(outlier_params)} columns")
for col, params in list(outlier_params.items())[:3]:
    print(f"      {col}: bounds [{params['lower_bound']:.2f}, {params['upper_bound']:.2f}]")

In [0]:
# 3. Group rare categories
# print("\n3️⃣ Grouping rare categories...")
# preprocessor_rare = EnhancedPreprocessor(df_no_outliers, config)
# df_grouped = preprocessor_rare.group_rare_categories(categorical_cols)

# # Show rare category grouping stats
# rare_params = preprocessor_rare.get_fitted_params().get('rare_categories', {})
# for col, params in rare_params.items():
#     if params['count'] > 0:
#         print(f"   {col}: grouped {params['count']} rare categories into '{params['replacement']}'")

In [0]:
# 4. Scale features
print("\n4️⃣ Scaling features (Robust scaling)...")
# preprocessor_scale = EnhancedPreprocessor(df_grouped, config)
preprocessor_scale = EnhancedPreprocessor(df_no_outliers, config)
df_scaled = preprocessor_scale.scale_features(numerical_cols[:3])  # Just first 3 for demo

scaling_params = preprocessor_scale.get_fitted_params().get('scaling', {})
print(f"   ✅ Scaled {len(scaling_params)} columns")
for col, params in scaling_params.items():
    print(f"      {col}: median={params['median']:.2f}, IQR={params['iqr']:.2f}")

In [0]:
# 5. Cyclical encoding (bonus feature)
print("\n5️⃣ Cyclical encoding example...")

# Create a sample hour column for demo
from pyspark.sql import functions as F
df_with_hour = df_scaled.withColumn("hour", F.lit(12))  # Mock hour column

preprocessor_cyclical = EnhancedPreprocessor(df_with_hour, config)
df_cyclical = preprocessor_cyclical.encode_cyclical_features("hour", period=24)

print("   Created: hour_sin, hour_cos columns")
df_cyclical.select("hour", "hour_sin", "hour_cos").show(3)
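
For reference, cyclical encoding usually maps a periodic value onto the unit circle so that the end of the cycle sits next to its start. The sketch below assumes the standard sine/cosine formula. Whether `encode_cyclical_features` uses exactly this formula is an assumption, and `encode_cyclical` is a hypothetical helper.

```python
import math


def encode_cyclical(value, period):
    """Map a periodic value to (sin, cos) coordinates on the unit circle."""
    angle = 2 * math.pi * value / period
    return math.sin(angle), math.cos(angle)


def encoded_distance(a, b, period=24):
    """Euclidean distance between two encoded values."""
    return math.dist(encode_cyclical(a, period), encode_cyclical(b, period))


noon_sin, noon_cos = encode_cyclical(12, 24)
wrap_gap = encoded_distance(23, 0)   # across midnight
plain_gap = encoded_distance(0, 1)   # one ordinary hour
print(noon_sin, noon_cos, wrap_gap, plain_gap)
```

Under this formula the mock column above (every row set to hour 12) would encode to a sine near 0 and a cosine of -1 in every row, so a real hour column is needed to see any variation.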

In [0]:
display(df_with_hour.limit(5))

In [0]:
# break

In [0]:
# from tsfresh.examples import load_robot_execution_failures
# from tsfresh import extract_relevant_features
# df, y = load_robot_execution_failures()
# X = extract_relevant_features(df, y, column_id='id', column_sort='time')
# X.head()

In [0]:
# # from tsfresh import extract_features, extract_relevant_features, select_features
# # from tsfresh.utilities.dataframe_functions import impute
# # from tsfresh.feature_extraction import ComprehensiveFCParameters

# from tsfresh.convenience.bindings import extract_features_with_spark


# extraction_settings = ComprehensiveFCParameters()

# X = extract_features(df_with_hour, column_id='trans_num', column_sort='trans_date_trans_time',
#                      default_fc_parameters=extraction_settings,
#                      # we impute = remove all NaN features automatically
#                      impute_function=impute)


In [0]:
# X.sample(5)

In [0]:
# from backend.core.features.process import PreProcessVariables

# # Disable ANSI mode to prevent integer overflow on large feature values
# # spark.conf.set("spark.sql.ansi.enabled", "false")

# # # Create a new schema checker for the feature-engineered dataframe
# # schema_checker_features = SchemaChecks(dataframe=df_with_features, problem=problem)
# # schema_checker_features.check()

# # # Re-enable ANSI mode
# # spark.conf.set("spark.sql.ansi.enabled", "true")

# # Apply Spark ML preprocessing
# pre_process_variables = PreProcessVariables(
#     dataframe=df_with_features,
#     problem=problem,
#     schema_checks=schema_checker_features
# )

# transformed_df, feature_names, feature_output_col, feature_map = pre_process_variables.process()

# print(f"\n📊 PREPROCESSING SUMMARY:")
# print(f"  Encoded categorical features: {len(feature_names)}")
# print(f"  Feature vector column: {feature_output_col}")
# print(f"  Total columns after preprocessing: {len(transformed_df.columns)}")

---
## 7. 🔧 Auto Feature Generation

In [0]:
from backend.core.features.auto_feature_generator import AutoFeatureGenerator

# Initialize feature generator
feature_gen = AutoFeatureGenerator(
    schema_checks=schema_checker,
    problem=problem
)

# Get column lists and exclude target to avoid removal error
numerical_cols = schema_checker.get_typed_col(col_type="numerical")
categorical_cols = schema_checker.get_typed_col(col_type="categorical")
datetime_cols = schema_checker.get_typed_col(col_type="datetime")

# Remove target from all lists if present
target = problem.target
if target in numerical_cols:
    numerical_cols.remove(target)
if target in categorical_cols:
    categorical_cols.remove(target)
if target in datetime_cols:
    datetime_cols.remove(target)

# Generate all features with explicit column lists
df_with_features = feature_gen.generate_all_features(
    include_numerical=True,
    include_interactions=True,
    include_binning=True,
    include_datetime=True,
    include_string=False,
    numerical_columns=numerical_cols,
    categorical_columns=categorical_cols,
    datetime_columns=datetime_cols
)

print(f"\n📊 FEATURE GENERATION SUMMARY:")
print(f"  Original features: {len(df.columns)}")
print(f"  Total features: {len(df_with_features.columns)}")
print(f"  New features generated: {len(df_with_features.columns) - len(df.columns)}")

---
## 8. 🔄 Feature Preprocessing (Spark ML)

In [0]:
from backend.core.features.process import PreProcessVariables
from pyspark.sql import functions as F
from backend.core.discovery import SchemaChecks

# IMPORTANT: Restore original method first to avoid recursion from previous runs
schema_checker.get_typed_col = SchemaChecks.get_typed_col.__get__(schema_checker, SchemaChecks)

# Get categorical columns and filter by cardinality to avoid model size overflow
categorical_cols = schema_checker.get_typed_col(col_type="categorical")

# Remove target if present
if problem.target in categorical_cols:
    categorical_cols.remove(problem.target)

# Filter out high-cardinality columns to keep model size under 268MB limit
# Only keep columns with < 20 unique values (reduced from 100 to exclude state column)
print("Filtering categorical columns by cardinality...")
low_cardinality_cols = []
for col in categorical_cols:
    distinct_count = df_with_features.select(F.countDistinct(col)).collect()[0][0]
    if distinct_count < 20:
        low_cardinality_cols.append(col)
        print(f"  ✓ {col}: {distinct_count} unique values")
    else:
        print(f"  ✗ {col}: {distinct_count} unique values (excluded)")

print(f"\nUsing {len(low_cardinality_cols)} of {len(categorical_cols)} categorical columns")

# Reduce numerical features to avoid model size overflow
# Keep only original numerical features + a few key engineered ones
print("\nReducing numerical features to fit model size limit...")
all_numerical_cols = schema_checker.get_typed_col(col_type="numerical")
if problem.target in all_numerical_cols:
    all_numerical_cols.remove(problem.target)

# Keep only first 15 numerical features (reduced from 30 to fit under 268MB limit)
reduced_numerical_cols = all_numerical_cols[:15]
print(f"  Using {len(reduced_numerical_cols)} of {len(all_numerical_cols)} numerical features")

# Drop unused numerical columns from dataframe
cols_to_keep = [problem.target] + low_cardinality_cols + reduced_numerical_cols + [problem.date_column]
cols_to_keep = [c for c in cols_to_keep if c in df_with_features.columns]
df_reduced = df_with_features.select(*cols_to_keep)

print(f"  Reduced from {len(df_with_features.columns)} to {len(df_reduced.columns)} columns")

# Sample data to reduce model size (fraction=0.00025 fits on 0.025% of the data to stay under the 268MB limit)
print("\nSampling data to reduce model size...")
df_sample = df_reduced.sample(fraction=0.00025, seed=42)
sample_count = df_sample.count()
print(f"  Sample size: {sample_count:,} rows ({sample_count/df_reduced.count()*100:.2f}%)")

# Patch schema_checker to return filtered columns
original_get_typed_col = SchemaChecks.get_typed_col
def patched_get_typed_col(self, col_type):
    if col_type == "categorical":
        return low_cardinality_cols
    elif col_type == "numerical":
        return reduced_numerical_cols
    return original_get_typed_col(self, col_type)

schema_checker.get_typed_col = patched_get_typed_col.__get__(schema_checker, SchemaChecks)

# Apply Spark ML preprocessing (fit on sample)
train_dataset = df_sample

try:
    pre_process_variables = PreProcessVariables(
        dataframe=df_with_features,
        problem=problem,
        schema_checks=schema_checker,
        train_dataframe=train_dataset
    )

    transformed_df, feature_names, feature_output_col, feature_map = pre_process_variables.process()
finally:
    # Restore the original method even if preprocessing fails
    schema_checker.get_typed_col = original_get_typed_col.__get__(schema_checker, SchemaChecks)

print(f"\n📊 PREPROCESSING SUMMARY:")
print(f"  Encoded categorical features: {len(feature_names)}")
print(f"  Feature vector column: {feature_output_col}")
print(f"  Total columns after preprocessing: {len(transformed_df.columns)}")
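
The cell above temporarily swaps `get_typed_col` for a filtered version and then restores it. One way to package that pattern is a context manager, so the restore step runs even when an exception interrupts the work in between. The sketch below is an illustration only: `SchemaStub` is a hypothetical stand-in for `SchemaChecks`, and `override_typed_cols` is not part of the backend package.

```python
from contextlib import contextmanager


class SchemaStub:
    """Hypothetical stand-in for SchemaChecks, used only in this sketch."""

    def get_typed_col(self, col_type):
        columns = {
            "categorical": ["merchant", "category", "state"],
            "numerical": ["amt", "lat", "long"],
        }
        return columns.get(col_type, [])


@contextmanager
def override_typed_cols(checker, overrides):
    """Temporarily make checker.get_typed_col return the lists in `overrides`."""
    original = checker.get_typed_col  # bound method looked up before patching

    def patched(col_type):
        if col_type in overrides:
            return overrides[col_type]
        return original(col_type)

    checker.get_typed_col = patched  # instance attribute shadows the class method
    try:
        yield checker
    finally:
        # Assumes there was no earlier instance-level override to preserve.
        del checker.get_typed_col


checker = SchemaStub()
try:
    with override_typed_cols(checker, {"categorical": ["category"]}):
        inside = checker.get_typed_col("categorical")
        raise ValueError("simulated preprocessing failure")
except ValueError:
    pass
after = checker.get_typed_col("categorical")
print(inside, after)
```

Because the restore happens in `finally`, a failed run leaves the checker in its original state and a later re-run does not need a manual reset first.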

In [0]:
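# Deliberate stop so that "Run All" halts before model training.
# Delete this cell to continue with the sections below.
raise RuntimeError("Intentional stop before model training; remove this cell to continue.")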

---
## 9. 📊 Baseline Models (NEW)

The `BaselineModels` class trains simple models to establish performance baselines.
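
A baseline matters most when the target is imbalanced, as fraud labels typically are. The sketch below uses made-up label counts (a 0.5% positive rate, chosen only for illustration) to show why a majority-class predictor can report very high accuracy while detecting nothing. The helper names are hypothetical and are not the `BaselineModels` API.

```python
from collections import Counter


def majority_class(train_labels):
    """Most frequent label in the training data."""
    return Counter(train_labels).most_common(1)[0][0]


def accuracy(y_true, y_pred):
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def recall(y_true, y_pred, positive=1):
    """Share of actual positives that were predicted positive."""
    positives = [p for t, p in zip(y_true, y_pred) if t == positive]
    return sum(p == positive for p in positives) / len(positives)


# Hypothetical labels: 0.5% positives in both splits.
train_labels = [0] * 995 + [1] * 5
test_labels = [0] * 199 + [1] * 1

majority = majority_class(train_labels)
predictions = [majority] * len(test_labels)
naive_accuracy = accuracy(test_labels, predictions)
naive_recall = recall(test_labels, predictions)
print(majority, naive_accuracy, naive_recall)
```

For this reason, F1 and AUC are usually more informative than accuracy when comparing the models trained below against the naive baseline.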

In [0]:
from backend.core.models.baseline_models import BaselineModels

# Split data
train_df, test_df = transformed_df.randomSplit([0.8, 0.2], seed=42)

print(f"Train set: {train_df.count():,} rows")
print(f"Test set: {test_df.count():,} rows")

In [0]:
# Initialize baseline models
baselines = BaselineModels(
    problem=problem,
    train_df=train_df,
    test_df=test_df,
    feature_col=feature_output_col,
    label_col=problem.target
)

print("\n📊 TRAINING BASELINE MODELS...")
print("-" * 50)

In [0]:
# Train naive baseline (always predicts majority class)
print("\n1️⃣ Naive Baseline...")
naive_result = baselines.train_naive_baseline()
print(f"   Accuracy: {naive_result.metrics.get('accuracy', 0):.4f}")
print(f"   Training time: {naive_result.training_time:.2f}s")

In [0]:
# Train Decision Tree
print("\n2️⃣ Decision Tree...")
dt_result = baselines.train_decision_tree(max_depth=5)
print(f"   Accuracy: {dt_result.metrics.get('accuracy', 0):.4f}")
print(f"   F1: {dt_result.metrics.get('f1', 0):.4f}")
print(f"   Training time: {dt_result.training_time:.2f}s")

In [0]:
# Train Logistic Regression
print("\n3️⃣ Logistic Regression...")
lr_result = baselines.train_logistic_regression()
print(f"   Accuracy: {lr_result.metrics.get('accuracy', 0):.4f}")
print(f"   F1: {lr_result.metrics.get('f1', 0):.4f}")
print(f"   AUC: {lr_result.metrics.get('auc', 0):.4f}")
print(f"   Training time: {lr_result.training_time:.2f}s")

In [0]:
# Get all baseline results
import pandas as pd

all_results = baselines.get_results()

print("\n📊 BASELINE MODELS SUMMARY:")
print("-" * 60)

results_data = []
for result in all_results:
    results_data.append({
        'Model': result.model_name,
        'Accuracy': result.metrics.get('accuracy', 0),
        'F1': result.metrics.get('f1', 0),
        'AUC': result.metrics.get('auc', 0),
        'Time (s)': result.training_time
    })

results_df = pd.DataFrame(results_data)
print(results_df.to_string(index=False))

In [None]:
# Log all baseline models to MLflow
print("\n📡 Logging baseline models to MLflow...")
for result in all_results:
    run_id = tracker.log_baseline_run(
        model_name=result.model_name,
        metrics=result.metrics,
        training_time=result.training_time,
        params={"model_name": result.model_name},
    )
    status = f"run_id={run_id}" if run_id else "skipped"
    print(f"  {result.model_name}: {status}")

---
## 10. 🎯 XGBoost Training

In [0]:
from backend.core.features.feature_selector import FeatureSelector

# Train XGBoost model
feature_selector = FeatureSelector(
    problem=problem,
    transformed_df=transformed_df,
    feature_names=feature_names,
    feature_col=feature_output_col,
    feature_idx_name_mapping=feature_map,
    train_split=0.8
)

print("Training XGBoost model...")
feature_selector.train_model()
print("✅ XGBoost training complete!")

In [0]:
# Evaluate XGBoost
print("\n📊 XGBOOST EVALUATION:")
print("-" * 40)

print("\nTrain Set:")
train_metrics = feature_selector.evaluate(train=True)

print("\nTest Set:")
test_metrics = feature_selector.evaluate(train=False)

In [None]:
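# Log XGBoost training run to MLflow
print("\n📡 Logging XGBoost run to MLflow...")

# These values are entered by hand for logging; keep them in sync with the
# settings FeatureSelector actually trains with.
xgb_params = {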
    "num_round": 100,
    "max_depth": 4,
    "learning_rate": 0.1,
    "eval_metric": "logloss",
    "num_features": len(feature_names),
    "train_split": 0.8,
}

run_id = tracker.log_xgboost_run(
    params=xgb_params,
    train_metrics=train_metrics,
    test_metrics=test_metrics,
)
print(f"  XGBoost: run_id={run_id}" if run_id else "  XGBoost: skipped")

In [0]:
# Feature importance
print("\n🎯 TOP 20 FEATURES BY IMPORTANCE:")
print("-" * 40)

importance_list = feature_selector.get_feature_importances()
for i, (feature, importance) in enumerate(importance_list[:20]):
    print(f"  {i+1:2d}. {feature}: {importance:.4f}")

---
## 10.1 🔍 SHAP Analysis (NEW)

SHAP (SHapley Additive exPlanations) provides detailed insights into how each feature contributes to predictions.
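
To show what a Shapley value is, the sketch below computes exact values for a tiny hand-written scoring function by enumerating every feature coalition. This is the brute-force definition, feasible only for a few features. SHAP libraries use much faster model-specific algorithms, and nothing here describes the internals of `get_shap_analysis`. The model, instance and baseline are hypothetical.

```python
from itertools import combinations
from math import factorial


def shapley_values(model, instance, baseline):
    """Exact Shapley values; features outside a coalition take their baseline value."""
    n = len(instance)

    def value(coalition):
        x = [instance[i] if i in coalition else baseline[i] for i in range(n)]
        return model(x)

    phis = []
    for i in range(n):
        others = [j for j in range(n) if j != i]
        phi = 0.0
        for size in range(n):
            # Weight of a coalition of this size in the Shapley average.
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                members = set(subset)
                phi += weight * (value(members | {i}) - value(members))
        phis.append(phi)
    return phis


def toy_model(x):
    # Hypothetical score: two additive terms plus one interaction.
    return 2.0 * x[0] + 1.0 * x[1] + 0.5 * x[0] * x[2]


instance = [1.0, 3.0, 4.0]
baseline = [0.0, 0.0, 0.0]
phis = shapley_values(toy_model, instance, baseline)
base_value = toy_model(baseline)
print(phis, base_value + sum(phis), toy_model(instance))
```

The interaction term is split evenly between the two features involved, and the base value plus the sum of all contributions equals the prediction. That additivity is what the waterfall plot later in this section visualizes.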

In [0]:
# SHAP Analysis - Feature importance based on Shapley values
print("\n🔍 SHAP ANALYSIS:")
print("=" * 50)

# Run SHAP analysis (this may take a minute)
shap_results = feature_selector.get_shap_analysis(
    sample_size=1000,  # Use 1000 samples for faster analysis
    plot=True,
    plot_type='all'  # Generate both summary and bar plots
)

print("\n📊 SHAP Feature Importance (Top 15):")
print("-" * 40)
print(shap_results['feature_importance'].head(15).to_string(index=False))

In [0]:
# Get detailed feature value impacts from tree splits
print("\n📊 FEATURE VALUE IMPACTS (Aggregated from Tree Splits):")
print("-" * 60)

feature_impacts = feature_selector.get_feature_value_impacts(top_n=15)
print(feature_impacts.to_string(index=False))

In [0]:
# Explain a single prediction
print("\n🔎 EXPLAINING A SINGLE PREDICTION:")
print("=" * 50)

# Explain the first test instance
explanation = feature_selector.explain_prediction(instance_idx=0, use_test=True)

print(f"\nInstance Index: {explanation['instance_idx']}")
print(f"Prediction: {explanation['prediction']}")
print(f"Probability: {explanation['probability']}")
print(f"Actual Label: {explanation['actual_label']}")
print(f"Base Value: {explanation['base_value']:.4f}")

print("\n📈 Top 5 Positive Contributors (pushing toward positive class):")
print(explanation['top_positive'][['Feature', 'Value', 'SHAP_Value']].to_string(index=False))

print("\n📉 Top 5 Negative Contributors (pushing toward negative class):")
print(explanation['top_negative'][['Feature', 'Value', 'SHAP_Value']].to_string(index=False))

In [0]:
# Generate waterfall plot for the prediction
print("\n📊 Generating SHAP Waterfall Plot...")
feature_selector.plot_shap_waterfall(instance_idx=0, use_test=True)

# Display the saved plot
from IPython.display import Image, display
display(Image(filename='shap_waterfall_instance_0.png'))

---
## 10.2 📊 Feature Insight Analysis (NEW)

Feature Insight Analysis provides SparkBeyond-style discovery of feature conditions that have high **Lift** (how much better than baseline), **Support** (coverage), and **RIG** (Relative Information Gain).

This helps identify:
- Which feature values are most predictive of the target
- Microsegments (combinations of features) that perform even better
- Optimal trade-offs between lift and support
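
The three metrics can be made concrete with a small worked example. The sketch below uses common textbook definitions: support is the share of rows matching a condition, lift is the class rate inside the condition divided by the baseline rate, and RIG is the reduction in target entropy from splitting on the condition, divided by the original entropy. These definitions are an assumption, since the exact formulas used by `FeatureInsightAnalyzer` are not shown here, and the data is hypothetical.

```python
from math import log2


def binary_entropy(p):
    """Entropy in bits of a Bernoulli(p) variable."""
    if p <= 0.0 or p >= 1.0:
        return 0.0
    return -(p * log2(p) + (1 - p) * log2(1 - p))


def insight_metrics(labels, in_segment):
    """Lift, support and RIG of a boolean condition against 0/1 labels.

    Assumes the segment is non-empty and both classes occur in `labels`.
    """
    n = len(labels)
    inside = [y for y, m in zip(labels, in_segment) if m]
    outside = [y for y, m in zip(labels, in_segment) if not m]
    baseline_rate = sum(labels) / n
    support = len(inside) / n
    class_rate = sum(inside) / len(inside)
    outside_rate = sum(outside) / len(outside) if outside else 0.0
    h_target = binary_entropy(baseline_rate)
    h_given_condition = (support * binary_entropy(class_rate)
                         + (1 - support) * binary_entropy(outside_rate))
    return {
        "support": support,
        "class_rate": class_rate,
        "lift": class_rate / baseline_rate,
        "rig": (h_target - h_given_condition) / h_target,
    }


# Hypothetical data: 100 transactions with 10 frauds; the condition
# covers 20 rows and contains 8 of the frauds.
labels = [1] * 8 + [0] * 12 + [1] * 2 + [0] * 78
in_segment = [True] * 20 + [False] * 80
metrics = insight_metrics(labels, in_segment)
print(metrics)
```

Here the condition has a lift of 4 at 20% support. A narrower condition could reach a higher lift but would cover fewer rows, which is the trade-off that the `min_support` and `min_lift` thresholds control.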

In [0]:
from backend.core.features.insight_analyzer import FeatureInsightAnalyzer, quick_insight_analysis

print("📊 FEATURE INSIGHT ANALYSIS")
print("=" * 60)
print("\nAnalyzing features for lift, support, and RIG...")

# Initialize the analyzer
insight_analyzer = FeatureInsightAnalyzer(
    df=df_with_features,  # Feature-engineered data, before Spark ML encoding and vector assembly
    problem=problem,
    schema_checks=schema_checker,
    n_bins=10,          # Number of bins for numeric features
    min_support=0.01,   # Minimum 1% support
    min_lift=1.1        # Minimum 10% lift over baseline
)

# Run analysis
result = insight_analyzer.get_analysis_result(discover_microsegments=True)

print(f"\n📈 Analysis Summary:")
print(f"  Target Class: {result.target_class}")
print(f"  Baseline Rate: {result.baseline_rate*100:.2f}%")
print(f"  Total Records: {result.total_count:,}")
print(f"  Total Insights Found: {result.summary['total_insights']}")
print(f"  Microsegments Found: {result.summary['total_microsegments']}")

In [0]:
result

In [0]:
# Display top insights
print("\n🎯 TOP 20 FEATURE INSIGHTS (Sorted by Lift):")
print("-" * 70)

insights_df = insight_analyzer.to_dataframe(top_n=20)
display_cols = ['Condition', 'Lift', 'Support', 'Support_Count', 'RIG', 'Class_Rate']
print(insights_df[display_cols].to_string(index=False))

In [0]:
# Display microsegments (feature combinations)
print("\n🔗 TOP MICROSEGMENTS (Feature Combinations):")
print("-" * 70)

if result.microsegments:
    for i, micro in enumerate(result.microsegments[:10], 1):
        print(f"\n{i}. {micro.name}")
        print(f"   Lift: x{micro.lift:.2f} | Support: {micro.support*100:.1f}% ({micro.support_count:,}) | RIG: {micro.rig:.3f}")
        print(f"   Class Rate: {micro.class_rate*100:.1f}% vs Baseline: {micro.baseline_rate*100:.1f}%")
else:
    print("No microsegments found that improve over individual features.")

In [0]:
# Generate Lift vs Support scatter plot (like SparkBeyond UI)
print("\n📊 Generating Lift vs Support Scatter Plot...")
insight_analyzer.plot_lift_support_scatter(
    top_n=50,
    highlight_microsegments=True,
    save_path='insight_lift_support.png'
)

# Display the plot
from IPython.display import Image, display
display(Image(filename='insight_lift_support.png'))

In [0]:
# Generate top insights bar chart
print("\n📊 Generating Top Insights Bar Chart...")
insight_analyzer.plot_top_insights(
    top_n=15,
    metric='lift',
    save_path='insight_top_features.png'
)

# Display the plot
display(Image(filename='insight_top_features.png'))

In [0]:
# You can also use the quick function for rapid analysis
# insights_df, result = quick_insight_analysis(df, problem, schema_checker, top_n=20, plot=True)

# Or access insights through FeatureSelector after training
# insight_result = feature_selector.get_insight_analysis(schema_checker, plot=True)

print("\n✅ Feature Insight Analysis complete!")

---
## 11. 🤖 LightAutoML (NEW)

The `AutoMLRunner` provides automated machine learning using LightAutoML,
which automatically tries multiple algorithms and finds the best model.

In [0]:
from backend.core.models.evalml_runner import AutoMLRunner, quick_automl

print("🤖 LIGHTAUTOML SEARCH")
print("=" * 50)
print("\nThis may take a few minutes...\n")

# Initialize AutoML runner
automl_runner = AutoMLRunner(
    spark=spark,
    problem=problem,
    max_rows_for_pandas=50000,  # Sample for faster demo
    verbose=True
)

# Run AutoML search: Quick Mode
automl_result = automl_runner.run_automl(
    spark_df=df_with_features,
    timeout=120,  # 2 minutes for demo
    cpu_limit=4,
    quick_mode=True
)

# # Run AutoML search: Full Mode
# automl_result = automl_runner.run_automl(
#     spark_df=df_with_features,
#     timeout=600,  # 10 minutes
#     cpu_limit=4,
# )

print("\n✅ AutoML search complete!")

In [0]:
# Display AutoML results
print("\n📊 AUTOML RESULTS:")
print("-" * 40)
print(f"  Best Score: {automl_result.best_score:.4f}")
print(f"  Problem Type: {automl_result.problem_type}")
print(f"  Metric: {automl_result.metric}")
print(f"  Search Time: {automl_result.search_time:.1f}s")

if automl_result.model_summary:
    print(f"\n  Models in ensemble: {automl_result.model_summary.get('n_models', 0)}")
    print(f"  Levels: {automl_result.model_summary.get('levels', 0)}")

In [0]:
# AutoML feature importance
if automl_result.feature_importance is not None:
    print("\n🎯 AUTOML TOP FEATURES:")
    print("-" * 40)
    print(automl_result.feature_importance.head(15).to_string(index=False))

In [0]:
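# Evaluate AutoML.
# Note: this passes the full feature-engineered DataFrame, which includes the rows
# AutoML was trained on, so treat the metrics as optimistic unless
# AutoMLRunner.evaluate holds out its own test split.
print("\n📊 AUTOML EVALUATION ON TEST DATA:")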
print("-" * 40)

automl_metrics = automl_runner.evaluate(df_with_features)
for metric, value in automl_metrics.items():
    print(f"  {metric}: {value:.4f}")

In [None]:
# Log AutoML run to MLflow
print("\n📡 Logging AutoML run to MLflow...")

run_id = tracker.log_automl_run(
    config={
        "timeout": 120,
        "cpu_limit": 4,
        "quick_mode": True,
        "max_rows_for_pandas": 50000,
    },
    best_score=automl_result.best_score,
    search_time=automl_result.search_time,
    problem_type=automl_result.problem_type,
    metric=automl_result.metric,
)
print(f"  AutoML: run_id={run_id}" if run_id else "  AutoML: skipped")

---
## 12. 🔬 Model Comparison (NEW)

The `ModelComparison` framework tracks experiments and measures the impact
of feature engineering and model selection.
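
As a rough illustration of what an impact measurement involves, the sketch below computes absolute and percentage improvement of the best model over a baseline on one metric. The scores are invented for the example, and the `improvement` helper is hypothetical. How `ModelComparison` derives its own `absolute_improvement` and `percentage_improvement` fields is not shown in this notebook.

```python
def improvement(baseline_score, candidate_score):
    """Absolute and relative gain of a candidate over a baseline score."""
    absolute = candidate_score - baseline_score
    if baseline_score:
        percentage = 100.0 * absolute / baseline_score
    else:
        percentage = float("nan")  # relative gain is undefined for a zero baseline
    return {
        "absolute_improvement": absolute,
        "percentage_improvement": percentage,
    }


# Hypothetical scores on a single metric; not results from this notebook.
scores = {"naive": 0.50, "decision_tree": 0.70, "xgboost": 0.80}

best_model = max(scores, key=scores.get)
impact = improvement(scores["naive"], scores[best_model])
print(best_model, impact)
```

The choice of primary metric drives which model is reported as best, so it is worth choosing one that reflects the problem, especially when the classes are imbalanced.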

In [0]:
from backend.core.models.model_comparison import ModelComparison

# Initialize comparison framework
comparison = ModelComparison(primary_metric='accuracy')

# Add baseline results
for result in all_results:
    comparison.add_baseline_result(result)

# Add XGBoost result
comparison.add_experiment(
    name="XGBoost + Engineered Features",
    model_name="XGBoost",
    feature_set="engineered",
    metrics=test_metrics,
    training_time=0  # placeholder: XGBoost training time is not measured in this notebook
)

# Add AutoML result
comparison.add_experiment(
    name="LightAutoML",
    model_name="LightAutoML Ensemble",
    feature_set="engineered",
    metrics=automl_metrics,
    training_time=automl_result.search_time
)

print("✅ Added all experiments to comparison framework")

In [None]:
# Log feature comparison runs to MLflow (XGBoost + AutoML with engineered features)
print("\n📡 Logging feature comparison runs to MLflow...")

run_id = tracker.log_feature_comparison_run(
    model_name="XGBoost",
    feature_set="engineered",
    metrics=test_metrics,
    training_time=0,  # placeholder: not measured in this notebook
    num_features=len(feature_names),
)
print(f"  XGBoost (engineered): run_id={run_id}" if run_id else "  XGBoost (engineered): skipped")

run_id = tracker.log_feature_comparison_run(
    model_name="LightAutoML Ensemble",
    feature_set="engineered",
    metrics=automl_metrics,
    training_time=automl_result.search_time,
    num_features=len(df_with_features.columns),
)
print(f"  LightAutoML (engineered): run_id={run_id}" if run_id else "  LightAutoML (engineered): skipped")

In [0]:
# Get comparison results
comparison_result = comparison.get_comparison()

print("\n🔬 MODEL COMPARISON:")
print("=" * 70)
print(comparison_result.comparison_table.to_string())

In [0]:
# Display improvements
print("\n📈 IMPACT ANALYSIS:")
print("-" * 50)

if comparison_result.improvements:
    for impact_name, impact_data in comparison_result.improvements.items():
        print(f"\n{impact_name.replace('_', ' ').title()}:")
        print(f"  Absolute Improvement: {impact_data.get('absolute_improvement', 0):.4f}")
        print(f"  Percentage Improvement: {impact_data.get('percentage_improvement', 0):.2f}%")
        print(f"  Best Model: {impact_data.get('best_model', 'N/A')}")

In [0]:
# Best model
print("\n🏆 BEST MODEL:")
print("-" * 50)
best = comparison_result.best_experiment
print(f"  Name: {best.name}")
print(f"  Model: {best.model_name}")
print(f"  Feature Set: {best.feature_set}")
print(f"  {comparison.primary_metric.title()}: {best.metrics.get(comparison.primary_metric, 0):.4f}")

---
## 13. MLflow Experiment Summary

View all tracked runs from this session. These runs are stored locally in `./mlruns/` and can also be viewed with the MLflow UI by running `mlflow ui` from the project root.

In [None]:
# Retrieve and display all MLflow runs for this experiment
print("MLflow EXPERIMENT RUNS")
print("=" * 80)

runs = tracker.get_runs(max_results=50)

if runs:
    import pandas as pd

    runs_data = []
    for run in runs:
        row = {
            "Run Name": run["run_name"],
            "Status": run["status"],
            "Model Type": run["tags"].get("model_type", ""),
        }
        # Add key metrics
        for metric_name in ["accuracy", "f1", "auc", "best_score",
                            "test_accuracy", "test_f1", "test_auc",
                            "training_time_seconds"]:
            if metric_name in run["metrics"]:
                row[metric_name] = round(run["metrics"][metric_name], 4)
        runs_data.append(row)

    runs_df = pd.DataFrame(runs_data)
    print(f"\nTotal runs: {len(runs)}")
    print(f"Experiment: {tracker.get_experiment_info()['name']}\n")
    display(runs_df)
else:
    print("No MLflow runs found for this experiment.")

# Plain string: there are no placeholders, so no f-prefix is needed
print("\nTo launch the MLflow UI, run: mlflow ui --backend-store-uri file://<path-to-mlruns>")

---
## Summary

This notebook demonstrated all the features in Spark Tune:

| Feature | Module | Description |
|---------|--------|-------------|
| **MLflow Tracking** | `services.mlflow_service` | Experiment tracking for all model runs |
| Data Quality Checking | `core.profiling.data_quality` | PySpark-native quality analysis |
| YData Profiling | `core.profiling.ydata_profiler` | Comprehensive data profiling |
| Time Series Detection | `core.utils.time_series_detector` | Automatic temporal pattern detection |
| Enhanced Preprocessing | `core.features.preprocessing_enhanced` | Missing values, outliers, scaling |
| Baseline Models | `core.models.baseline_models` | Simple model comparison |
| **SHAP Analysis** | `core.features.feature_selector` | Model interpretability with SHAP |
| **Feature Insights** | `core.features.insight_analyzer` | Lift, Support, RIG analysis (SparkBeyond-style) |
| LightAutoML | `core.models.evalml_runner` | Automated machine learning |
| Model Comparison | `core.models.model_comparison` | Impact analysis framework |

### Key Metrics in Feature Insight Analysis:
- **Lift**: How much more often the target occurs when a feature condition holds, relative to the baseline rate (e.g., x3.09 is roughly 3x the baseline)
- **Support**: Percentage of data covered by the condition (e.g., 25% = 10,234 records)
- **RIG**: Relative Information Gain, i.e. how much of the target's uncertainty the feature removes
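
The three metrics above can be sketched for a single boolean condition against a binary target. This is a minimal example using the common textbook definitions: lift as the target rate under the condition divided by the overall rate, support as the fraction of rows covered, and RIG as `(H(Y) - H(Y|X)) / H(Y)`. The `insight_metrics` helper is a hypothetical name, and the `insight_analyzer` module may compute these differently.

```python
import math


def entropy(p):
    """Binary entropy in bits for a positive-class rate p."""
    if p <= 0.0 or p >= 1.0:
        return 0.0
    return -(p * math.log2(p) + (1 - p) * math.log2(1 - p))


def insight_metrics(condition, target):
    """Lift, support, and RIG for a 0/1 condition against a 0/1 target.

    Hypothetical helper for illustration; both inputs are equal-length lists.
    """
    n = len(target)
    n_cond = sum(1 for c in condition if c)
    total_pos = sum(target)
    pos_in = sum(t for c, t in zip(condition, target) if c)
    pos_out = total_pos - pos_in

    base_rate = total_pos / n
    support = n_cond / n
    rate_in = pos_in / n_cond if n_cond else 0.0
    rate_out = pos_out / (n - n_cond) if n_cond < n else 0.0

    # Lift: target rate where the condition holds vs. the overall rate.
    lift = rate_in / base_rate if base_rate else float("nan")

    # RIG: share of the target's entropy removed by knowing the condition.
    h_y = entropy(base_rate)
    h_y_given_x = support * entropy(rate_in) + (1 - support) * entropy(rate_out)
    rig = (h_y - h_y_given_x) / h_y if h_y else 0.0
    return {"lift": lift, "support": support, "rig": rig}


# Toy data: 8 rows, the condition holds for the first 2 rows.
condition = [1, 1, 0, 0, 0, 0, 0, 0]
target = [1, 1, 0, 0, 0, 0, 1, 0]
result = insight_metrics(condition, target)
print(result)
```

In this toy data the condition covers 25% of rows and every covered row is positive, so the lift is the reciprocal of the 37.5% base rate, about 2.67.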

In [0]:
# Cleanup
# spark.stop()
# print("\n✅ Spark session stopped. Demo complete!")