# Module 6: Machine Learning Pipelines

## Business Context: TechCorp Salary Prediction Model

**The Goal:**
TechCorp HR wants to deploy a **Salary Prediction Model** that:
1. Suggests fair compensation for new hires based on experience and location
2. Identifies underpaid employees for salary adjustments
3. Supports budget planning for new positions

**What we're building:**
A complete ML Pipeline that predicts `salary` based on:
- `age` (proxy for experience)
- `country` (market rate differences)
- `source` (recruitment channel quality)
- `tenure_days` (days since `registration_date`)

**Why Pipeline?** In production, HR will upload a CSV of candidates. The pipeline must apply the EXACT same transformations (imputation, encoding, scaling) as during training.

---

**Training Objective:** Master Spark ML Pipelines to create reproducible, production-ready ML workflows with MLflow tracking.

**Scope:**
- Pipeline Concepts: Why use Pipelines?
- Defining Stages: Chaining Imputers, Encoders, Scalers, and Models
- MLflow Tracking: Logging experiments, parameters, and metrics
- Hyperparameter Tuning: Using CrossValidator for automatic model selection
- Model Persistence: Saving the pipeline for production use

## Context and Requirements

- **Training day:** Day 1 - Data Preparation Fundamentals
- **Notebook type:** Demo
- **Technical requirements:**
  - Databricks Runtime 14.x LTS or newer
  - Unity Catalog enabled
  - MLflow enabled (default in Databricks)
  - Permissions: CREATE TABLE, SELECT, MODIFY
- **Dependencies:** `02_Data_Splitting.ipynb` (creates `customer_train`, `customer_test` tables)
- **Execution time:** ~30 minutes

> **Note:** This module brings together all previous concepts into a production-ready workflow.

## Theoretical Background

**Spark ML Pipelines:**
A Pipeline is a sequence of stages (Transformers and Estimators) executed in order. This ensures:
1. Consistent data transformation between training and inference
2. No data leakage (transformers fit only on training data)
3. Easy saving/loading of the entire workflow
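Points 2 and 3 rest on the Estimator/Transformer split. An estimator learns statistics in `fit()`, and a fitted model only applies them in `transform()`. The stdlib-only sketch below is a toy analogy, not Spark's API: the `MedianImputer` class is hypothetical. It shows why fitting on the training data alone matters. Spark's `Imputer` computes the median approximately, so its fill value can differ slightly from the exact one computed here.

```python
from statistics import median
from typing import List, Optional


class MedianImputer:
    """Hypothetical toy estimator: fit() learns a fill value, transform() only applies it."""

    def __init__(self) -> None:
        self.median_: Optional[float] = None

    def fit(self, values: List[Optional[float]]) -> "MedianImputer":
        # Learn the fill value from the non-missing values of the data passed to fit()
        self.median_ = median(v for v in values if v is not None)
        return self

    def transform(self, values: List[Optional[float]]) -> List[float]:
        if self.median_ is None:
            raise RuntimeError("call fit() before transform()")
        # Reuse the learned statistic; nothing is recomputed from the data being transformed
        return [self.median_ if v is None else v for v in values]


train_ages = [25.0, 30.0, None, 41.0, 35.0]
test_ages = [None, 60.0, 65.0]

imputer = MedianImputer().fit(train_ages)   # fit on TRAIN only
print(imputer.median_)                       # 32.5
print(imputer.transform(test_ages))          # [32.5, 60.0, 65.0]

# Leaky variant: fitting on train + test lets the test values shift the fill value
leaky = MedianImputer().fit(train_ages + test_ages)
print(leaky.median_)                         # 38.0
```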

**CrossValidator:**
Performs k-fold cross-validation with hyperparameter tuning:
- Splits training data into k folds
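- Trains on k-1 folds and validates on the remaining fold, rotating until every fold has served once as the validation fold
- Repeats this for every hyperparameter combination and keeps the one with the best average validation metric
- Total fits: `len(paramGrid) × numFolds`, plus one final refit of the best combination on the full training data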
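The k-fold procedure can be sketched in plain Python. This is a minimal illustration under stated assumptions. The "model" is a hypothetical one-parameter shrunken mean. Folds are assigned round-robin so the sketch stays deterministic, whereas Spark assigns rows to folds randomly, controlled by `seed`. The fit counter shows where `len(grid) × k + 1` comes from.

```python
from typing import Dict, List, Tuple


def shrunken_mean(train: List[float], lam: float) -> float:
    # Hypothetical toy "model" with one hyperparameter: a mean shrunk toward 0 by lam
    return sum(train) / (len(train) + lam)


def k_fold_cv(y: List[float], grid: List[float], k: int) -> Tuple[float, Dict[float, float], int, float]:
    # Round-robin fold assignment keeps the sketch deterministic;
    # Spark's CrossValidator assigns rows to folds randomly (controlled by seed)
    folds = [[v for i, v in enumerate(y) if i % k == f] for f in range(k)]
    avg_mse: Dict[float, float] = {}
    fits = 0
    for lam in grid:                      # every hyperparameter combination ...
        fold_mse = []
        for f in range(k):                # ... times every fold
            train = [v for g in range(k) if g != f for v in folds[g]]  # k-1 folds
            pred = shrunken_mean(train, lam)
            fits += 1
            val = folds[f]                # the held-out fold
            fold_mse.append(sum((v - pred) ** 2 for v in val) / len(val))
        avg_mse[lam] = sum(fold_mse) / k
    best = min(avg_mse, key=avg_mse.get)  # lower MSE is better
    final = shrunken_mean(y, best)        # refit the winner on ALL training data
    fits += 1
    return best, avg_mse, fits, final


y = [9.0, 11.0, 10.0, 12.0, 8.0, 10.0]
best, avg_mse, fits, final = k_fold_cv(y, grid=[0.0, 1.0, 5.0], k=3)
print(best, fits, final)  # 0.0 10 10.0
```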

**MLflow with Unity Catalog Models:**

| Feature | Description |
|---------|-------------|
| **Model Registry** | `catalog.schema.model_name` format |
| **Versioning** | Automatic version tracking (v1, v2, ...) |
| **Aliases** | `@champion`, `@challenger` for deployment stages |
| **Governance** | Unity Catalog permissions apply |
| **Lineage** | Track data and model dependencies |

**Unity Catalog requires a Model Signature:**

Unity Catalog models **MUST** include a signature (input/output schema). Without it, registration fails.

```python
import mlflow
from mlflow.models.signature import infer_signature

# `model` (fitted PipelineModel), `train_df`, and `predictions` are created in Section 2
# Infer signature from training data and predictions
signature = infer_signature(train_df.toPandas(), predictions.select("prediction").toPandas())

# Log and register with signature
mlflow.spark.log_model(
    model,
    "model",
    signature=signature,
    input_example=train_df.limit(5).toPandas(),
    registered_model_name="catalog.schema.model"
)
```

## Per-User Isolation

Run the initialization script (`%run ./00_Setup`, the first code cell below) for per-user catalog and schema isolation.

## Preprocessing Methods by Model Type

Different ML models have different requirements for data preprocessing. Choosing the wrong method can hurt performance!

### Scaling Requirements

| Model Type | Scaling Required? | Recommended Scaler | Why? |
|------------|-------------------|-------------------|------|
| **Linear Regression** | Yes | StandardScaler / RobustScaler | Coefficients sensitive to feature magnitude |
| **Logistic Regression** | Yes | StandardScaler | Gradient descent converges faster |
| **SVM** | Yes | StandardScaler | Distance-based, needs normalized features |
| **k-NN** | Yes | MinMaxScaler / StandardScaler | Distance-based algorithm |
| **Neural Networks** | Yes | StandardScaler / MinMaxScaler | Activation functions work best in [-1,1] or [0,1] |
| **Decision Tree** | No | None | Splits based on thresholds, scale-invariant |
| **Random Forest** | No | None | Ensemble of trees, scale-invariant |
| **Gradient Boosting (XGBoost, LightGBM)** | No | None | Tree-based, scale-invariant |
| **Naive Bayes** | No | None | Probability-based, not affected by scale |
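The linear-model rows recommend RobustScaler for a reason. A single outlier inflates the mean and standard deviation that StandardScaler relies on, while the median and interquartile range (IQR) barely move. The stdlib-only sketch below uses made-up salaries (in thousands) and hypothetical helper names. It both centers and scales, for illustration only. Spark's `RobustScaler` by default only divides by the IQR (`withCentering=False`) and estimates quantiles approximately.

```python
from statistics import mean, median, pstdev, quantiles
from typing import List


def standard_scale(xs: List[float]) -> List[float]:
    # (x - mean) / std: both statistics are pulled toward outliers
    mu, sd = mean(xs), pstdev(xs)
    return [(x - mu) / sd for x in xs]


def robust_scale(xs: List[float]) -> List[float]:
    # (x - median) / IQR: the quartiles ignore how extreme the outlier is
    q1, _, q3 = quantiles(xs, n=4, method="inclusive")
    med, iqr = median(xs), q3 - q1
    return [(x - med) / iqr for x in xs]


salaries = [40.0, 42.0, 45.0, 47.0, 50.0, 1000.0]  # one extreme outlier
s = standard_scale(salaries)
r = robust_scale(salaries)

# Gap between the inliers 40 and 50 after scaling
print(round(s[4] - s[0], 3))  # 0.028: inliers squashed together
print(round(r[4] - r[0], 3))  # 1.538: inliers stay distinguishable
```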

### Encoding Categorical Variables

| Encoding Method | When to Use | Models | Spark MLlib Class |
|----------------|-------------|--------|-------------------|
| **Label Encoding** (0, 1, 2...) | Ordinal categories (Low/Medium/High) | Tree-based models | `StringIndexer` |
| **One-Hot Encoding** | Nominal categories (Country, Color) | Linear models, NN | `OneHotEncoder` |
| **Target Encoding** | High cardinality (1000+ categories) | All models | Custom / Feature Store |
| **Binary Encoding** | Medium cardinality (10-100 categories) | All models | Custom |
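To make label and one-hot encoding concrete, the stdlib-only sketch below mimics, as we understand them, Spark's defaults. `StringIndexer` with `stringOrderType="frequencyDesc"` gives index 0 to the most frequent label and breaks ties alphabetically. `handleInvalid="keep"` sends unseen labels to an extra last index. `OneHotEncoder` with `dropLast=True` encodes that last index as all zeros. The helper names are hypothetical, and Spark emits sparse vectors rather than Python lists.

```python
from collections import Counter
from typing import Dict, List


def fit_string_indexer(values: List[str]) -> Dict[str, int]:
    # Most frequent label -> index 0; ties broken alphabetically
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def index(label: str, mapping: Dict[str, int]) -> int:
    # "keep" behaviour: unseen labels go to an extra bucket at index len(mapping)
    return mapping.get(label, len(mapping))


def one_hot(idx: int, num_categories: int, drop_last: bool = True) -> List[float]:
    # dropLast=True: the last category is represented by the all-zeros vector
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == idx else 0.0 for i in range(size)]


countries = ["FR", "DE", "FR", "PL", "DE", "FR"]
mapping = fit_string_indexer(countries)
n = len(mapping) + 1  # +1 for the "keep" bucket

print(mapping)                           # {'FR': 0, 'DE': 1, 'PL': 2}
print(one_hot(index("DE", mapping), n))  # [0.0, 1.0, 0.0]
print(one_hot(index("US", mapping), n))  # [0.0, 0.0, 0.0] (unseen category)
```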

### Common Mistakes

| Mistake | Problem | Solution |
|---------|---------|----------|
| OneHot + Tree models | Unnecessary, creates sparse features | Use `StringIndexer` only |
| Scaling OneHot features | Destroys 0/1 meaning | Scale only numeric features |
| Label encoding for Linear models | Implies false ordering (France=1 < Germany=2) | Use OneHot instead |
| Scaling before Split | Data leakage from test set | Always fit scaler on train only |

### Quick Reference: Pipeline by Model

**For Linear/Logistic Regression:**
```
Imputer -> StringIndexer -> OneHotEncoder -> VectorAssembler(numeric) -> Scaler -> VectorAssembler(all) -> Model
```

**For Tree-based Models (RF, GBT, XGBoost):**
```
Imputer -> StringIndexer -> VectorAssembler -> Model
```
*No scaling, no OneHot needed!*

**For Neural Networks:**
```
Imputer -> StringIndexer -> OneHotEncoder -> VectorAssembler(numeric) -> StandardScaler -> VectorAssembler(all) -> Model
```

In [0]:
%run ./00_Setup

**Import Libraries and Load Data:**

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, RobustScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Load Raw Split Data (We start from scratch in the pipeline!)
train_df = spark.table("customer_train")
test_df = spark.table("customer_test")

## Section 1: Defining Pipeline Stages

**Why use a Pipeline?**
1.  **Prevention of Data Leakage:** Statistics such as the median for imputation or the quartiles for scaling must be computed **only on the Training set** and then applied to the Test set. A Pipeline makes this the natural workflow: call `fit()` on Train, then `transform()` on Test.
2.  **Reproducibility:** It bundles all preprocessing steps and the model into a single artifact.
3.  **Simplicity:** You can save/load the entire workflow as one object.

We will reconstruct our manual steps into a reusable Pipeline.

**Transformation and Data Cleaning:**

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace, upper, datediff, current_date, col, coalesce, lit

# IMPORTANT: Apply the same transformations to BOTH train and test data.
# These steps run OUTSIDE the ML Pipeline, so any new data must go through
# the same function before it is passed to the fitted model.
def clean(df: DataFrame) -> DataFrame:
    # 1. Basic cleaning: drop duplicate rows, upper-case country, strip backslashes
    df = df.dropDuplicates()
    df = df.withColumn("country", regexp_replace(upper("country"), r"\\", ""))

    # 2. Feature Engineering (matching Module 05)
    # tenure_days = days since registration_date (depends on the run date via current_date())
    df = df.withColumn("tenure_days", datediff(current_date(), col("registration_date")))

    # Fill missing source with "UNKNOWN"
    df = df.withColumn("source", coalesce(col("source"), lit("UNKNOWN")))
    return df

train_df = clean(train_df)
test_df = clean(test_df)

# Show available features
print("Available columns for modeling:")
print(train_df.columns)
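The two column expressions in the cleaning step can be checked in plain Python. This stdlib-only sketch uses hypothetical helper names and mirrors their semantics. The Python string `r"\\"` is the regex `\\`, which matches one literal backslash. `datediff(end, start)` counts whole days from `start` to `end`. Because `current_date()` changes every day, `tenure_days` drifts between runs, so the sketch pins the reference date to keep results reproducible.

```python
import re
from datetime import date


def clean_country(raw: str) -> str:
    # Same semantics as regexp_replace(upper("country"), r"\\", ""): upper-case, then
    # remove every literal backslash (the regex \\ matches a single backslash)
    return re.sub(r"\\", "", raw.upper())


def tenure_days(registration: date, today: date) -> int:
    # Same semantics as datediff(current_date(), registration_date), with "today" pinned
    return (today - registration).days


print(clean_country("de\\"))                            # DE
print(tenure_days(date(2024, 1, 1), date(2024, 3, 1)))  # 60 (2024 is a leap year)
```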

In [0]:
display(train_df)

In [0]:
# ============================================
# PIPELINE - SALARY PREDICTION MODEL
# ============================================
# Target: salary_imputed
# Features for predicting salary:
#   - age_imputed: base age
#   - experience_years: age - 22 (years since graduation)
#   - tenure_days: time with company
#   - ltv_proxy: age * tenure (interaction)
#   - country, source: categorical features

from pyspark.ml.feature import SQLTransformer

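# 1. Imputation (age, salary, tenure_days) with the median
# salary is imputed because it is the label; data passed to the fitted pipeline
# must therefore still contain a salary column (its values may be null)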
imputer = Imputer(
    inputCols=["age", "salary", "tenure_days"], 
    outputCols=["age_imputed", "salary_imputed", "tenure_days_imputed"]
).setStrategy("median")

# 2. Feature Engineering
# Create experience_years and ltv_proxy from the imputed columns.
# Note: experience_years is age_imputed shifted by a constant, so a linear model gets
# no extra information from it (the two columns are perfectly collinear).
# Never derive features from the label (salary_imputed): that leaks the target into
# the inputs, and the label is not available for new candidates at prediction time.
feature_engineer = SQLTransformer(
    statement="""
    SELECT *,
           age_imputed - 22 AS experience_years,
           age_imputed * tenure_days_imputed AS ltv_proxy
    FROM __THIS__
    """
)

# 3. Encoding (country and source)
indexer_country = StringIndexer(inputCol="country", outputCol="country_idx", handleInvalid="keep")
indexer_source = StringIndexer(inputCol="source", outputCol="source_idx", handleInvalid="keep")
encoder = OneHotEncoder(inputCols=["country_idx", "source_idx"], outputCols=["country_vec", "source_vec"])

# 4. Assembly - Features for salary prediction
assembler = VectorAssembler(
    inputCols=[
        "age_imputed",          # Base age
        "experience_years",     # Years since graduation (age - 22)
        "tenure_days_imputed",  # Time with company
        "ltv_proxy",            # Age * Tenure interaction
        "country_vec",          # Country one-hot encoded
        "source_vec"            # Source one-hot encoded
    ],
    outputCol="features_raw",
    handleInvalid="skip"  # rows with null/NaN features are dropped (no prediction for them)
)

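# 5. Scaling
# Note: this scales the whole vector, one-hot columns included. The Quick Reference
# above recommends scaling numeric features only and assembling one-hot vectors afterwards.
scaler = RobustScaler(inputCol="features_raw", outputCol="features")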

# 6. Model
lr = LinearRegression(labelCol="salary_imputed", featuresCol="features")

# --- The Pipeline ---
pipeline = Pipeline(stages=[
    imputer, 
    feature_engineer,
    indexer_country, 
    indexer_source, 
    encoder, 
    assembler, 
    scaler, 
    lr
])

print("Salary Prediction Pipeline created with features:")
print("  - age_imputed")
print("  - experience_years (age - 22)")
print("  - tenure_days_imputed")
print("  - ltv_proxy (age * tenure_days)")
print("  - country_vec (one-hot)")
print("  - source_vec (one-hot)")
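In this linear model, `experience_years = age_imputed - 22` is an exact linear combination of `age_imputed` and the intercept. It therefore adds no information beyond what `age_imputed` already carries, and per-column scaling does not change that. The stdlib-only sketch below checks this on made-up ages and tenures, using hypothetical helpers. The Gram matrix `X^T X` of [intercept, age, age - 22] is singular (determinant 0), while [intercept, age, tenure] is not. Spark may still return a fit (for example when `regParam` > 0), but how the weight is split between the two collinear columns is arbitrary, so their individual coefficients should not be interpreted.

```python
from fractions import Fraction
from typing import List

Matrix = List[List[Fraction]]


def gram(X: Matrix) -> Matrix:
    # X^T X for a design matrix given as a list of rows (exact arithmetic via Fraction)
    cols = len(X[0])
    return [[sum((r[i] * r[j] for r in X), Fraction(0)) for j in range(cols)] for i in range(cols)]


def det3(m: Matrix) -> Fraction:
    (a, b, c), (d, e, f), (g, h, i) = m
    return a * (e * i - f * h) - b * (d * i - f * g) + c * (d * h - e * g)


ages = [Fraction(a) for a in (25, 31, 40, 52)]
tenure = [Fraction(t) for t in (100, 900, 300, 2000)]  # made-up values

with_experience = [[Fraction(1), a, a - 22] for a in ages]  # [intercept, age, age - 22]
with_tenure = [[Fraction(1), a, t] for a, t in zip(ages, tenure)]

print(det3(gram(with_experience)))   # 0 -> singular: experience_years is redundant
print(det3(gram(with_tenure)) > 0)   # True -> tenure adds independent information
```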

In [0]:
# Create a pipeline WITHOUT the model (without LinearRegression)
# stages[:-1] means "all stages from the beginning, excluding the last one"
preprocessing_pipeline = Pipeline(stages=pipeline.getStages()[:-1])

# A Pipeline is always an Estimator, so fit() is still required: it learns the
# imputation medians, category indexes, and scaling quantiles, then returns a
# PipelineModel. Without the regression stage this is fast.
processed_data = preprocessing_pipeline.fit(train_df).transform(train_df)

# Preview what goes into the model
display(processed_data)

## Section 2: Training with MLflow

We use `mlflow.start_run()` to record this training run (parameters, metrics, and model) in an MLflow experiment.

### What is a Model Signature?

A **model signature** defines the schema (column names and data types) of the inputs a machine learning model expects and the outputs it produces. In MLflow and Databricks, model signatures:

- Act as a contract for how to interact with the model.
- Enable automatic validation of input/output data formats.
- Are required for registering models in Unity Catalog.
- Help document and enforce correct usage during deployment.

**Example:**  
A model that predicts salary might have this signature:
- **Inputs:** `age` (integer), `country` (string), `source` (string)
- **Output:** `salary` (double)

Model signatures ensure consistency, safety, and reproducibility in production ML workflows.
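The "contract" idea can be illustrated without MLflow. The stdlib-only sketch below is a conceptual analogy, not MLflow's `ModelSignature` API. `INPUT_SCHEMA` and `validate` are hypothetical names, and the schema mirrors the example inputs listed above. A request that breaks the contract is reported before it ever reaches the model.

```python
from typing import Any, Dict, List

# Hypothetical input schema (a plain dict, NOT MLflow's ModelSignature class)
INPUT_SCHEMA: Dict[str, type] = {"age": int, "country": str, "source": str}


def validate(row: Dict[str, Any], schema: Dict[str, type]) -> List[str]:
    """Return the list of contract violations (an empty list means the row is valid)."""
    errors = [f"missing column: {c}" for c in schema if c not in row]
    errors += [
        f"{c}: expected {t.__name__}, got {type(row[c]).__name__}"
        for c, t in schema.items()
        if c in row and not isinstance(row[c], t)
    ]
    return errors


print(validate({"age": 34, "country": "FR", "source": "referral"}, INPUT_SCHEMA))  # []
print(validate({"age": "34", "country": "FR"}, INPUT_SCHEMA))
# ['missing column: source', 'age: expected int, got str']
```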

In [0]:
# Set MLflow to use Unity Catalog for model registry
mlflow.set_registry_uri("databricks-uc")

# Import signature inference for Unity Catalog (REQUIRED!)
from mlflow.models.signature import infer_signature

# Set Experiment
username = spark.sql("SELECT current_user()").collect()[0][0]
experiment_path = f"/Users/{username}/dp4ml_pipeline_demo"
mlflow.set_experiment(experiment_path)

# Model name for Unity Catalog
model_name = f"{catalog_name}.{schema_name}.salary_prediction_model"

**RegressionEvaluator** is a PySpark class used to evaluate regression models. It computes metrics like RMSE, MAE, and R² by comparing the model's predictions to the true labels.

**evaluate** is a method of RegressionEvaluator that calculates the chosen metric on a given DataFrame containing prediction and label columns.
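For reference, the three metrics used in this notebook follow the standard definitions. The stdlib-only sketch below computes them by hand, using hypothetical helper names and made-up numbers. `RegressionEvaluator` computes the same quantities over the `prediction` and label columns of a DataFrame.

```python
import math
from typing import List


def rmse(y: List[float], p: List[float]) -> float:
    # Root mean squared error: large errors weigh more
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(y, p)) / len(y))


def mae(y: List[float], p: List[float]) -> float:
    # Mean absolute error, in the label's units
    return sum(abs(a - b) for a, b in zip(y, p)) / len(y)


def r2(y: List[float], p: List[float]) -> float:
    # 1 - (residual sum of squares / total sum of squares around the mean)
    y_mean = sum(y) / len(y)
    ss_res = sum((a - b) ** 2 for a, b in zip(y, p))
    ss_tot = sum((a - y_mean) ** 2 for a in y)
    return 1 - ss_res / ss_tot


labels = [3.0, 5.0, 7.0]
preds = [2.5, 5.0, 8.0]
print(round(rmse(labels, preds), 4), mae(labels, preds), r2(labels, preds))  # 0.6455 0.5 0.84375
```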

In [0]:
with mlflow.start_run(run_name="salary_prediction_v1"):

    # Log Parameters
    mlflow.log_param("model", "LinearRegression")
    mlflow.log_param("scaler", "RobustScaler")

    # Fit the whole Pipeline on TRAIN only
    print("Training Pipeline...")
    model = pipeline.fit(train_df)

    # Evaluate on TEST
    predictions = model.transform(test_df)
    rmse = RegressionEvaluator(labelCol="salary_imputed", metricName="rmse").evaluate(predictions)
    r2 = RegressionEvaluator(labelCol="salary_imputed", metricName="r2").evaluate(predictions)

    # Log Metrics
    mlflow.log_metrics({"rmse": rmse, "r2": r2})

    # Infer the model signature from input data and the model's predictions.
    # Unity Catalog REQUIRES a signature for model registration.
    # A bounded sample avoids collecting the full training set to the driver.
    sample_pdf = train_df.limit(1000).toPandas()
    input_example = sample_pdf.head(5)
    signature = infer_signature(
        sample_pdf,
        model.transform(train_df.limit(1000)).select("prediction").toPandas()
    )

    # Log Model and Register to Unity Catalog with signature
    mlflow.spark.log_model(
        model,
        "model",
        signature=signature,
        input_example=input_example,
        registered_model_name=model_name  # Auto-register to UC
    )

    print(f"Model registered to Unity Catalog: {model_name}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R2: {r2:.4f}")

## Section 3: Hyperparameter Tuning with CrossValidator

**Why tune hyperparameters?**
In the previous example, we used default settings for `LinearRegression`. But models have "knobs" (hyperparameters) that can drastically change performance (e.g., `regParam` for regularization).

**CrossValidator** automates this:
1.  Define a **ParamGrid** (list of hyperparameters to try).
2.  CrossValidator trains $k$ models for each combination (k-fold).
3.  It picks the combination with the best average metric across folds and refits it on the full training set (`bestModel`).

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# We reuse our ENHANCED pipeline with feature engineering
lr_tune = LinearRegression(labelCol="salary_imputed", featuresCol="features")

# Use the same enhanced pipeline with all features
pipeline_tune = Pipeline(stages=[
    imputer, 
    feature_engineer,      # Feature engineering step
    indexer_country, 
    indexer_source, 
    encoder, 
    assembler, 
    scaler, 
    lr_tune
])

# Define the Parameter Grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr_tune.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr_tune.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

print(f"Number of hyperparameter combinations to test: {len(paramGrid)}")
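`ParamGridBuilder` builds the Cartesian product of the value lists, so 3 × 3 = 9 combinations. The stdlib-only sketch below enumerates the same grid with `itertools.product` and labels what each `elasticNetParam` value means. Its iteration order may differ from `ParamGridBuilder`'s.

```python
from itertools import product

reg_params = [0.01, 0.1, 0.5]
elastic_net_params = [0.0, 0.5, 1.0]

# Every (regParam, elasticNetParam) pair: the Cartesian product of the two lists
grid = list(product(reg_params, elastic_net_params))

penalty = {0.0: "L2 (ridge)", 0.5: "L1 + L2 (elastic net)", 1.0: "L1 (lasso)"}
for reg, alpha in grid:
    print(f"regParam={reg:<5} elasticNetParam={alpha:<4} -> {penalty[alpha]}")

num_folds = 3
print(len(grid), "combinations,", len(grid) * num_folds + 1, "fits including the final refit")
```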

In [0]:
# Create CrossValidator
# numFolds=3 means 3-fold cross-validation for each param combination
# Total fits = len(paramGrid) * numFolds = 9 * 3 = 27, plus 1 final refit of the
# best combination on the full training set

crossval = CrossValidator(
    estimator=pipeline_tune,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(labelCol="salary_imputed", metricName="r2"),  # higher R2 wins
    numFolds=3,
    parallelism=4,  # evaluate up to 4 models in parallel (faster on clusters)
    seed=42         # fixes the random fold assignment for reproducibility
)

print(f"CrossValidator configured. Training will fit {len(paramGrid) * 3} models plus 1 final refit...")

# Fit CrossValidator (This takes longer!)
cv_model = crossval.fit(train_df)

# Get best model
best_model = cv_model.bestModel
print("Best model found!")

In [0]:
print(best_model.stages)  # fitted stages of the best PipelineModel


**Explanation of the Code:**

- **Evaluate the best model on TEST set:**  
  We use the trained pipeline (`best_model`) to generate predictions on the test dataset (`test_df`). The `evaluator` (typically a RegressionEvaluator) computes the Root Mean Squared Error (RMSE) on these predictions, giving us a measure of model accuracy on unseen data.

- **Extract the best hyperparameters from the LinearRegression stage:**  
  The pipeline's last stage is the fitted `LinearRegressionModel`. We access it via `best_model.stages[-1]` to inspect which hyperparameters (like `regParam`, `elasticNetParam`) were chosen as best during cross-validation.

- **Compute R2 on test set:**  
  We create a new `RegressionEvaluator` set to the R2 metric, then evaluate the predictions to see how much variance in salary is explained by the model (R2 score).

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the best model on TEST set
predictions_cv = best_model.transform(test_df)
evaluator = RegressionEvaluator(labelCol="salary_imputed", metricName="rmse")
rmse_cv = evaluator.evaluate(predictions_cv)

# Extract the best hyperparameters from the LinearRegression stage
# The last stage in the pipeline is the LinearRegressionModel
lr_model_stage = best_model.stages[-1]

# Compute R2 on test set
evaluator_r2 = RegressionEvaluator(labelCol="salary_imputed", predictionCol="prediction", metricName="r2")
r2_cv = evaluator_r2.evaluate(predictions_cv)

# Compute MAE for additional insight
evaluator_mae = RegressionEvaluator(labelCol="salary_imputed", predictionCol="prediction", metricName="mae")
mae_cv = evaluator_mae.evaluate(predictions_cv)

print(f"Best Model RMSE: {rmse_cv:.4f}")
print(f"Best Model R2: {r2_cv:.4f}")
print(f"Best Model MAE: {mae_cv:.4f}")
print(f"Best regParam: {lr_model_stage.getRegParam()}")
print(f"Best elasticNetParam: {lr_model_stage.getElasticNetParam()}")

# Show cross-validation average metrics for all parameter combinations.
# avgMetrics uses the CrossValidator's evaluator metric (R2 here), averaged over folds.
print("\n--- Cross-Validation Results (avg R2 for each param combo) ---")
avg_metrics = cv_model.avgMetrics
for params, metric in zip(paramGrid, avg_metrics):
    reg = params[lr_tune.regParam]
    elastic = params[lr_tune.elasticNetParam]
    print(f"  regParam={reg}, elasticNetParam={elastic} -> avg R2: {metric:.4f}")

These lines print the evaluation metrics and hyperparameters of the best model found by cross-validation:

- **Best Model RMSE: `{rmse_cv}`**  
  Shows the Root Mean Squared Error (RMSE) of the best model on the test set. Lower RMSE means better predictive accuracy.

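- **Best Model R2: `{r2_cv}`**  
  Shows the R-squared (coefficient of determination) of the best model on the test set. Higher R2 (closer to 1) means the model explains more variance in the target variable.

- **Best Model MAE: `{mae_cv}`**  
  Shows the Mean Absolute Error on the test set: the average absolute gap between predicted and actual salary, in salary units. Unlike RMSE, it does not weight large errors more heavily.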

- **Best regParam: `{lr_model_stage.getRegParam()}`**  
  Displays the value of the regularization parameter (regParam) chosen by cross-validation. This controls the amount of regularization applied to the model to prevent overfitting.

- **Best elasticNetParam: `{lr_model_stage.getElasticNetParam()}`**  
  Displays the value of the elasticNet mixing parameter (elasticNetParam) chosen by cross-validation. This controls the mix between L1 (lasso) and L2 (ridge) regularization.
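A sketch of the objective these two parameters control, in the elastic-net form described in Spark's ML documentation (notation ours). Here $\lambda$ is `regParam` and $\alpha$ is `elasticNetParam`:

$$
\min_{w,\,b}\;\frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 \;+\; \lambda\left[\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right]
$$

So $\alpha = 0$ gives pure L2 (ridge), $\alpha = 1$ gives pure L1 (lasso), and values in between mix the two. By default (`standardization=True`), Spark applies the penalty to the coefficients of internally standardized features, so the effective penalty scale is an implementation detail.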

In [0]:
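import mlflow
from mlflow.models.signature import infer_signature

model_name = f"{catalog_name}.{schema_name}.salary_prediction_best_model_tuned"

# Log inside an explicit run so the tuned model, its params, and its metrics stay together
with mlflow.start_run(run_name="salary_prediction_cv_best"):
    # Log the winning hyperparameters and the TEST metrics computed above
    mlflow.log_params({
        "regParam": lr_model_stage.getRegParam(),
        "elasticNetParam": lr_model_stage.getElasticNetParam(),
        "numFolds": 3,
    })
    mlflow.log_metrics({"rmse": rmse_cv, "r2": r2_cv, "mae": mae_cv})

    # Infer signature from a bounded sample of the input data and its predictions
    sample_pdf = train_df.limit(1000).toPandas()
    signature = infer_signature(
        sample_pdf,
        best_model.transform(train_df.limit(1000)).select("prediction").toPandas()
    )

    # Register the best model to Unity Catalog
    mlflow.spark.log_model(
        best_model,
        "best_model",
        signature=signature,
        input_example=sample_pdf.head(5),
        registered_model_name=model_name
    )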

## Best Practices

### Pipeline Strategy Guide:

| Component | Best Practice | Why |
|-----------|--------------|-----|
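| **Order of stages** | Impute → Engineer features → Encode → Assemble → Scale → Model | Each stage consumes columns produced by earlier stages |
| **handleInvalid** | Use "keep" for StringIndexer | Handle new categories |
| **Scaler choice** | RobustScaler when numeric features have outliers | Median and IQR are barely affected by outliers |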
| **CrossValidator folds** | 3-5 for large data, 5-10 for small | Balance bias/variance |
| **MLflow logging** | Log params, metrics, AND model | Full reproducibility |

### Common Mistakes to Avoid:

1. **Fitting pipeline on all data** → Data leakage
2. **Too many CV folds** → Slower training for little gain
3. **Not logging to MLflow** → Lost experiments
4. **Huge param grids** → Combinatorial explosion
5. **Not saving the best model** → Can't reproduce

### Pro Tips:

- Use `parallelism` parameter in CrossValidator for faster training
- Start with small param grid, expand based on results
- Report final performance on the held-out TEST set, not on the CV validation folds
- Use MLflow Model Registry for production deployment
- Save the fitted PipelineModel (preprocessing + model), not just the final regression stage

## Summary

### What we achieved:

- **Pipeline Definition**: Created end-to-end workflow (Impute → Encode → Scale → Model)
- **MLflow Tracking**: Logged parameters, metrics, and model artifacts
- **Unity Catalog Models**: Registered model with governance and versioning
- **CrossValidator**: Automated hyperparameter search with 3-fold CV

### Key Takeaways:

| # | Principle |
|---|-----------|
| 1 | **Pipelines prevent data leakage** - fit on train, transform on test |
| 2 | **MLflow is essential** - track all experiments |
| 3 | **Unity Catalog Models** - governance, versioning, lineage |
| 4 | **CrossValidator automates tuning** - finds best hyperparameters |
| 5 | **Evaluate on TEST only once** - final unbiased estimate |

### Unity Catalog Artifacts Created:

| Artifact | Location | Purpose |
|----------|----------|---------|
| Experiment | `/Users/{user}/dp4ml_pipeline_demo` | Group runs |
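| Model | `{catalog}.{schema}.salary_prediction_model` | Baseline model (Section 2) |
| Model | `{catalog}.{schema}.salary_prediction_best_model_tuned` | Tuned model from CrossValidator (Section 3) |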
| Versions | v1, v2, ... | Track model iterations |

### Next Steps:

**Next Module:** Module 7 - Feature Store & MLflow (production ML)

## Cleanup

Optionally remove demo artifacts created during exercises:

In [0]:
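# Cleanup - remove demo artifacts created in this notebook
# Uncomment the lines below to delete the registered models (all versions) and the experiment:

# from mlflow.tracking import MlflowClient
# client = MlflowClient()
# for name in [
#     f"{catalog_name}.{schema_name}.salary_prediction_model",
#     f"{catalog_name}.{schema_name}.salary_prediction_best_model_tuned",
# ]:
#     for mv in client.search_model_versions(f"name='{name}'"):
#         client.delete_model_version(name, mv.version)
#     client.delete_registered_model(name)
# mlflow.delete_experiment(mlflow.get_experiment_by_name(experiment_path).experiment_id)

# print("All demo artifacts removed")

print("Cleanup disabled (uncomment code to remove demo artifacts)")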