# Spark Default Feature Engineering

This notebook demonstrates:
- Spark as the default engine for feature engineering
- Automatic engine detection
- Enhanced features with Spark (temporal, categorical, validation)
- Pipeline with Spark DataFrames
- Performance comparisons

In [None]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Probe for the optional PySpark dependency once; later cells check
# PYSPARK_AVAILABLE before running anything that needs Spark.
try:
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame as SparkDataFrame
except ImportError:
    # PySpark is missing: record that and keep going with the pandas examples.
    PYSPARK_AVAILABLE = False
    print("⚠ PySpark not available - some examples will be skipped")
else:
    PYSPARK_AVAILABLE = True
    print("✓ PySpark is available")

from cr_score.features import (
    create_feature_engineer,
    create_feature_engineer_auto,
    FeatureEngineer,
    FeatureEngineeringConfig,
    FeatureRecipe,
    AggregationType,
    TemporalTrendFeatures,
    CategoricalEncoder,
    FeatureValidator,
)

from cr_score.pipeline import ScorecardPipeline

# None removes the column limit, so printed DataFrames show every column.
pd.options.display.max_columns = None
print("\n✓ All imports successful")

## 1. Spark as Default Engine

### 1.1 Factory Function Defaults to Spark

The `create_feature_engineer()` function now defaults to Spark for large-scale processing.

In [None]:
# The factory is called without an `engine` argument so the printed class name
# shows which implementation it picks by default.
if not PYSPARK_AVAILABLE:
    print("⚠ PySpark not available - skipping Spark examples")
else:
    config = FeatureEngineeringConfig(
        recipes=[
            # Maximum balance, restricted to the "last_3_months" window.
            FeatureRecipe("max_balance_3m", "balance", AggregationType.MAX, window="last_3_months"),
            # Mean utilization (no window argument given).
            FeatureRecipe("avg_util", "utilization", AggregationType.MEAN),
        ],
        id_col="customer_id"
    )

    engineer = create_feature_engineer(config)
    default_engine_name = type(engineer).__name__
    print(f"Default engine: {default_engine_name}")
    print("✓ Factory function defaults to Spark")

### 1.2 Explicit Engine Selection

You can still explicitly choose pandas if needed:

In [None]:
# Single-recipe configuration: maximum balance over the "last_3_months" window.
max_balance_recipe = FeatureRecipe(
    "max_balance_3m", "balance", AggregationType.MAX, window="last_3_months"
)
config = FeatureEngineeringConfig(recipes=[max_balance_recipe], id_col="customer_id")

# Passing engine="pandas" overrides whatever the factory would choose by default.
engineer_pandas = create_feature_engineer(config, engine="pandas")
pandas_engine_name = type(engineer_pandas).__name__
print(f"Explicit pandas engine: {pandas_engine_name}")
print("✓ Backward compatibility maintained")

## 2. Automatic Engine Detection

### 2.1 Auto-Detection Function

The `create_feature_engineer_auto()` function automatically detects DataFrame type.

In [None]:
# Create sample data: six consecutive month-end dates shared across two customers.
# The dates are built from the stable 'MS' (month-start) alias rolled forward with
# MonthEnd(0) instead of freq='M': the 'M' offset alias is deprecated since
# pandas 2.2 (replaced by 'ME'), while 'ME' is not understood by older releases.
# The resulting dates are the same month-ends that freq='M' produced.
sample_dates = pd.date_range('2024-01-01', periods=6, freq='MS') + pd.offsets.MonthEnd(0)

df_pandas = pd.DataFrame({
    'customer_id': [1, 1, 1, 2, 2, 2],
    'date': sample_dates,
    'balance': [1000, 1200, 1100, 2000, 2100, 2050],
    'utilization': [0.3, 0.35, 0.32, 0.5, 0.55, 0.52],
})

print("Sample pandas DataFrame:")
print(df_pandas.head())

if PYSPARK_AVAILABLE:
    # `spark` and `df_spark` are reused by the Spark examples in later cells.
    spark = SparkSession.builder.appName("CR_Score_Playbook").getOrCreate()
    df_spark = spark.createDataFrame(df_pandas)

    print("\n✓ Spark DataFrame created")

    config = FeatureEngineeringConfig(
        recipes=[FeatureRecipe("max_balance", "balance", AggregationType.MAX)],
        id_col="customer_id"
    )

    # Spark input: the engine is chosen from the DataFrame type alone.
    engineer_spark = create_feature_engineer_auto(df_spark, config)
    print(f"Auto-detected engine for Spark DataFrame: {type(engineer_spark).__name__}")

    # pandas input: the same call is made with prefer_spark=False and then
    # prefer_spark=True so the two printed class names can be compared.
    engineer_pandas_auto = create_feature_engineer_auto(df_pandas, config, prefer_spark=False)
    print(f"Auto-detected engine for pandas DataFrame (prefer_spark=False): {type(engineer_pandas_auto).__name__}")

    engineer_pandas_spark = create_feature_engineer_auto(df_pandas, config, prefer_spark=True)
    print(f"Auto-detected engine for pandas DataFrame (prefer_spark=True): {type(engineer_pandas_spark).__name__}")

### 2.2 Unified FeatureEngineer Class

The `FeatureEngineer` class provides a unified interface with automatic detection.

In [None]:
max_balance_recipe = FeatureRecipe("max_balance", "balance", AggregationType.MAX)
config = FeatureEngineeringConfig(recipes=[max_balance_recipe], id_col="customer_id")

# One FeatureEngineer instance is fed a Spark DataFrame and then a pandas one;
# `detected_engine` is printed after each call to show what it detected.
engineer = FeatureEngineer(config)

if PYSPARK_AVAILABLE:
    # Spark input (df_spark comes from the auto-detection cell above).
    result_spark = engineer.fit_transform(df_spark)
    spark_result_type = type(result_spark).__name__
    print(f"Detected engine: {engineer.detected_engine}")
    print(f"Result type: {spark_result_type}")
    print("\n✓ Auto-detection working with Spark")

# pandas input, run regardless of PySpark availability.
result_pandas = engineer.fit_transform(df_pandas)
pandas_result_type = type(result_pandas).__name__
print(f"Detected engine: {engineer.detected_engine}")
print(f"Result type: {pandas_result_type}")
print("\n✓ Auto-detection working with pandas")

## 3. Enhanced Features with Spark

### 3.1 Temporal Trend Features

Temporal features automatically detect Spark DataFrames and use Spark implementations.

In [None]:
if not PYSPARK_AVAILABLE:
    print("⚠ PySpark not available - skipping Spark temporal features")
else:
    trend = TemporalTrendFeatures()

    # Each feature is computed on "balance", ordered by "date" within each customer.
    df_delta = trend.delta(
        df_spark,
        "balance",
        time_col="date",
        group_cols=["customer_id"],
    )
    print("✓ Delta feature created with Spark")
    print(f"Result type: {type(df_delta).__name__}")

    # Momentum over a window of 3.
    df_momentum = trend.momentum(
        df_spark,
        "balance",
        time_col="date",
        group_cols=["customer_id"],
        window=3,
    )
    print("✓ Momentum feature created with Spark")

    # Volatility over the same window of 3.
    df_volatility = trend.volatility(
        df_spark,
        "balance",
        time_col="date",
        group_cols=["customer_id"],
        window=3,
    )
    print("✓ Volatility feature created with Spark")

    # Display the delta column next to its inputs.
    print("\nSample results:")
    delta_preview = df_delta.select("customer_id", "date", "balance", "balance_delta")
    delta_preview.show(10)

### 3.2 Categorical Encoding

Categorical encoding automatically uses Spark with broadcast joins for efficiency.

In [None]:
if not PYSPARK_AVAILABLE:
    print("⚠ PySpark not available - skipping Spark categorical encoding")
else:
    from pyspark.sql import functions as F

    # Derive a two-level categorical column: customer 1 is "Premium",
    # every other customer is "Standard".
    is_first_customer = F.col("customer_id") == 1
    account_type_expr = F.when(is_first_customer, "Premium").otherwise("Standard")
    df_cat = df_spark.withColumn("account_type", account_type_expr)

    encoder = CategoricalEncoder()

    # Frequency-encode the new column.
    df_freq = encoder.freq_encoding(df_cat, "account_type")
    print("✓ Frequency encoding with Spark")
    print(f"Result type: {type(df_freq).__name__}")

    # One row per category with its encoded value.
    freq_lookup = df_freq.select("account_type", "account_type_freq").distinct()
    freq_lookup.show()

### 3.3 Feature Validation

Feature validation uses Spark aggregations for efficient computation on large datasets.

In [None]:
if not PYSPARK_AVAILABLE:
    print("⚠ PySpark not available - skipping Spark validation")
else:
    # Two missing-rate thresholds: 0.05 for a warning, 0.20 for a hard failure.
    warn_limits = {'missing_rate': 0.05}
    fail_limits = {'missing_rate': 0.20}
    validator = FeatureValidator(
        warning_thresholds=warn_limits,
        hard_fail_thresholds=fail_limits
    )

    features_to_check = ["balance", "utilization"]
    results = validator.validate_features(df_spark, feature_list=features_to_check)
    print("✓ Feature validation with Spark")

    # Tabular view of the validation outcome for display.
    results_df = validator.to_dataframe()
    print("\nValidation Results:")
    print(results_df)

## 4. Pipeline with Spark

### 4.1 Pipeline Auto-Detection

The `ScorecardPipeline` now supports Spark DataFrames with automatic detection.

In [None]:
if PYSPARK_AVAILABLE:
    # Seeded, cell-local generator: the synthetic data is identical on every
    # Restart & Run All instead of depending on the global NumPy RNG state.
    rng = np.random.default_rng(42)
    n_train, n_test = 1000, 100

    # Synthetic training data with a binary target (10% positive rate).
    train_data = pd.DataFrame({
        'age': rng.integers(18, 70, n_train),
        'income': rng.integers(20000, 150000, n_train),
        'credit_score': rng.integers(300, 850, n_train),
        'default': rng.binomial(1, 0.1, n_train),
    })

    # `spark` is the session created in the auto-detection cell.
    train_spark = spark.createDataFrame(train_data)

    # Pipeline with Spark DataFrame
    pipeline = ScorecardPipeline(max_n_bins=5, prefer_spark=True)

    print("Fitting pipeline with Spark DataFrame...")
    pipeline.fit(train_spark, target_col="default")

    print(f"\n✓ Pipeline fitted with engine: {pipeline.engine_}")

    # Test data has the same feature columns but no target.
    test_data = pd.DataFrame({
        'age': rng.integers(18, 70, n_test),
        'income': rng.integers(20000, 150000, n_test),
        'credit_score': rng.integers(300, 850, n_test),
    })

    test_spark = spark.createDataFrame(test_data)

    # Predict with Spark DataFrame
    # NOTE(review): len()/min()/max() below assume predict() returns an
    # array-like of scores rather than a Spark DataFrame - confirm.
    scores = pipeline.predict(test_spark)
    print(f"\n✓ Predictions generated: {len(scores)} scores")
    print(f"Score range: {scores.min():.0f} - {scores.max():.0f}")
else:
    print("⚠ PySpark not available - skipping Spark pipeline example")

## 5. Performance Comparison

### 5.1 Small Dataset (Pandas Advantage)

For small datasets, pandas may be faster because Spark's fixed overhead (job scheduling, serialization, JVM round-trips) outweighs the actual computation.

In [None]:
import time

# Small dataset: 100 customers x 12 month-end observations each = 1,200 rows.
# The 12 dates are tiled per customer so every customer covers the same year,
# instead of one 1,200-month range running a century into the future.
# 'MS' + MonthEnd(0) yields month-end dates without the 'M' offset alias,
# which is deprecated since pandas 2.2.
n_customers, n_months = 100, 12
month_ends = pd.date_range('2024-01-01', periods=n_months, freq='MS') + pd.offsets.MonthEnd(0)

df_small = pd.DataFrame({
    'customer_id': np.repeat(range(n_customers), n_months),
    'date': np.tile(month_ends, n_customers),
    'balance': np.random.randn(n_customers * n_months) * 1000 + 5000,
})

config = FeatureEngineeringConfig(
    recipes=[FeatureRecipe("max_balance", "balance", AggregationType.MAX)],
    id_col="customer_id"
)

# Pandas. perf_counter() is monotonic and high-resolution, so very short runs
# do not collapse to 0.0 the way time.time() can.
start = time.perf_counter()
engineer_pandas = create_feature_engineer(config, engine="pandas")
result_pandas = engineer_pandas.fit_transform(df_small)
time_pandas = time.perf_counter() - start

print(f"Pandas execution time: {time_pandas:.3f}s")

if PYSPARK_AVAILABLE:
    df_small_spark = spark.createDataFrame(df_small)

    # NOTE(review): if the Spark engineer returns a lazily evaluated DataFrame,
    # this measures plan construction rather than execution - confirm whether
    # fit_transform triggers a Spark action.
    start = time.perf_counter()
    engineer_spark = create_feature_engineer(config, engine="spark")
    result_spark = engineer_spark.fit_transform(df_small_spark)
    time_spark = time.perf_counter() - start

    print(f"Spark execution time: {time_spark:.3f}s")

    # Always report a ratio >= 1 together with the matching direction; the
    # guard avoids a ZeroDivisionError on degenerate timings.
    if time_pandas > 0 and time_spark > 0:
        if time_pandas <= time_spark:
            print(f"\nFor small datasets, pandas is {time_spark / time_pandas:.1f}x faster")
        else:
            print(f"\nFor small datasets, pandas is {time_pandas / time_spark:.1f}x slower")
    else:
        print("\nTimings too small to compare reliably")

### 5.2 Large Dataset (Spark Advantage)

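For large datasets, Spark can provide significant performance improvements. The 120K-row example below is kept small so the notebook runs quickly; on a single machine pandas may still be faster at this size.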

In [None]:
if PYSPARK_AVAILABLE:
    # Larger dataset (10K customers, 12 months each = 120K rows)
    print("Creating large dataset (120K rows)...")

    # The 12 month-end dates are tiled per customer. A single
    # pd.date_range(..., periods=120000) of monthly dates would span roughly
    # 10,000 years and raise OutOfBoundsDatetime (pandas Timestamps end in 2262).
    # 'MS' + MonthEnd(0) yields month-end dates without the 'M' offset alias,
    # which is deprecated since pandas 2.2.
    n_customers, n_months = 10000, 12
    month_ends = pd.date_range('2024-01-01', periods=n_months, freq='MS') + pd.offsets.MonthEnd(0)

    df_large = pd.DataFrame({
        'customer_id': np.repeat(range(n_customers), n_months),
        'date': np.tile(month_ends, n_customers),
        'balance': np.random.randn(n_customers * n_months) * 1000 + 5000,
    })

    config = FeatureEngineeringConfig(
        recipes=[FeatureRecipe("max_balance", "balance", AggregationType.MAX)],
        id_col="customer_id"
    )

    # Pandas (may be slow or OOM)
    try:
        start = time.perf_counter()
        engineer_pandas = create_feature_engineer(config, engine="pandas")
        result_pandas = engineer_pandas.fit_transform(df_large)
        time_pandas = time.perf_counter() - start
        print(f"Pandas execution time: {time_pandas:.3f}s")
    except MemoryError:
        print("⚠ Pandas ran out of memory (expected for large datasets)")
        # None (not 0.0) marks "pandas did not finish" for the summary below.
        time_pandas = None

    # Spark
    df_large_spark = spark.createDataFrame(df_large)

    # NOTE(review): if the Spark engineer returns a lazily evaluated DataFrame,
    # this measures plan construction rather than execution - confirm whether
    # fit_transform triggers a Spark action.
    start = time.perf_counter()
    engineer_spark = create_feature_engineer(config, engine="spark")
    result_spark = engineer_spark.fit_transform(df_large_spark)
    time_spark = time.perf_counter() - start

    print(f"Spark execution time: {time_spark:.3f}s")

    # Compare against None explicitly so a very fast pandas run is not mistaken
    # for an out-of-memory failure, and report the direction that matches the ratio.
    if time_pandas is None:
        print("\n✓ Spark handles large datasets that pandas cannot")
    elif time_pandas > 0 and time_spark > 0:
        if time_spark <= time_pandas:
            print(f"\nFor large datasets, Spark is {time_pandas / time_spark:.1f}x faster")
        else:
            print(f"\nFor large datasets, Spark is {time_spark / time_pandas:.1f}x slower")
    else:
        print("\nTimings too small to compare reliably")
else:
    print("⚠ PySpark not available - skipping large dataset comparison")

## 6. Summary

### Key Takeaways:

1. **Spark is Default**: Factory function defaults to Spark for large-scale processing
2. **Auto-Detection**: Unified interface automatically detects DataFrame type
3. **Enhanced Features**: All enhanced features work seamlessly with Spark
4. **Pipeline Support**: ScorecardPipeline supports Spark DataFrames end-to-end
5. **Backward Compatible**: Existing pandas code continues to work

### When to Use:

- **Spark (Default)**: Large datasets (>100K rows), distributed processing, production pipelines
- **Pandas (Explicit)**: Small datasets (<10K rows), quick prototyping, single-machine development

In [None]:
# Closing banner: a framed title followed by the list of takeaways.
banner = "=" * 80
closing_lines = [
    "\n" + banner,
    "Spark Default Feature Engineering - Complete!",
    banner,
    "\n✓ Spark is now the default engine for feature engineering",
    "✓ Auto-detection works seamlessly",
    "✓ Enhanced features support Spark",
    "✓ Pipeline supports Spark DataFrames",
    "\nAll feature engineering processes can now be done in Spark by default!",
]
for closing_line in closing_lines:
    print(closing_line)