# Batch Inference Job Configuration Guide

## Overview
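This configuration sets up a Databricks job for batch inference on wine quality data. The job reads a CSV file of wine features from a Unity Catalog volume and scores it with a registered model (`wine_quality_with_shap@champion`) that returns predictions and SHAP values. It then requests a natural-language explanation for each row from a serving endpoint via `ai_query`, and writes the combined results as CSV to a download volume.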



## Job Configuration

### Basic Information
- **Job Name**: `Batch-Inference-Job`
- **Source**: WORKSPACE

### Cluster Configuration
- **Spark Version**: 15.4.x-scala2.12
- **Node Type**: Standard_D4ds_v5
- **Runtime Engine**: STANDARD
- **Cluster Type**: CLASSIC_PREVIEW
- **ML Runtime**: Enabled
- **Single Node**: Yes
- **Data Security Mode**: DATA_SECURITY_MODE_DEDICATED

![Job Compute Configuration](screenshot/JobCompute.png)
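For reference, the same settings can be written as a `new_cluster` object for the Databricks Jobs/Clusters REST API. This sketch assumes the API field names `spark_version`, `node_type_id`, `runtime_engine`, `kind`, `use_ml_runtime`, `is_single_node` and `data_security_mode`. Check them against the Clusters API reference for your workspace before use.

```python
import json

# Cluster settings from the list above, expressed as a Jobs/Clusters API
# `new_cluster` object. Field names are assumptions; verify them against
# the Databricks Clusters API reference.
new_cluster = {
    "spark_version": "15.4.x-scala2.12",
    "node_type_id": "Standard_D4ds_v5",
    "runtime_engine": "STANDARD",
    "kind": "CLASSIC_PREVIEW",
    "use_ml_runtime": True,   # ML Runtime: Enabled
    "is_single_node": True,   # Single Node: Yes
    "data_security_mode": "DATA_SECURITY_MODE_DEDICATED",
}

print(json.dumps(new_cluster, indent=2))
```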

### Job Parameters
The job accepts the following parameters:
1. `catalog_name`: Target catalog for data storage
2. `schema_name`: Target schema within the catalog
3. `file_name`: Name of the input CSV file to process
4. `folder_name`: Name of the folder inside the upload volume that holds the input CSV (used to keep different users' files separate)

![Job Parameters Configuration](screenshot/JobParameters.png)
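The notebook combines these parameters with two fixed volume names, `batch_inference_upload` and `batch_inference_download`, to locate its input and output files. The following minimal sketch shows that mapping. The helper `volume_paths` and the example values are hypothetical.

```python
UPLOAD_VOLUME = "batch_inference_upload"
DOWNLOAD_VOLUME = "batch_inference_download"


def volume_paths(catalog_name, schema_name, folder_name, file_name):
    """Return (input_path, output_path) in the same form the notebook builds them."""
    upload_root = f"/Volumes/{catalog_name}/{schema_name}/{UPLOAD_VOLUME}"
    download_root = f"/Volumes/{catalog_name}/{schema_name}/{DOWNLOAD_VOLUME}"
    input_path = f"{upload_root}/{folder_name}/{file_name}"
    output_path = f"{download_root}/{folder_name}/results_with_explanations_{file_name}"
    return input_path, output_path


# Hypothetical example values
inp, out = volume_paths("main", "wine", "alice", "wines.csv")
print(inp)  # /Volumes/main/wine/batch_inference_upload/alice/wines.csv
print(out)  # /Volumes/main/wine/batch_inference_download/alice/results_with_explanations_wines.csv
```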

## Deployment Instructions

1. **Create Job**
   - Navigate to Databricks Workspace
   - Go to Jobs → Create Job
   - Select "Notebook" as the task type

2. **Configure Job Settings**
   - Set the job name to `Batch-Inference-Job`
   - Specify the notebook path

3. **Set Up Cluster**
   - Create a new cluster with the specified configuration
   - Ensure ML runtime is enabled
   - Set up the environment variables

4. **Configure Parameters**
   - Add the four required parameters:
     - `catalog_name`
     - `schema_name`
     - `file_name`
     - `folder_name`
   - Leave their default values empty; the actual values are supplied each time the job is run
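Because the parameter defaults are empty, values are supplied per run. As a sketch, a run can be triggered through the Jobs API `run-now` endpoint (`POST /api/2.1/jobs/run-now`) with a `job_parameters` map. The workspace host, token, job ID and parameter values below are placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_run_now_request(host, token, job_id, params):
    """Build (but do not send) a Jobs API run-now request with job parameters."""
    body = {"job_id": job_id, "job_parameters": params}
    return urllib.request.Request(
        url=f"{host}/api/2.1/jobs/run-now",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_run_now_request(
    host="https://<workspace-host>",  # placeholder
    token="<personal-access-token>",  # placeholder
    job_id=123,                       # hypothetical job ID
    params={
        "catalog_name": "main",
        "schema_name": "wine",
        "folder_name": "alice",
        "file_name": "wines.csv",
    },
)
# To actually trigger the run: urllib.request.urlopen(req)
```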

In [0]:
dbutils.widgets.text("catalog_name", "")
dbutils.widgets.text("schema_name", "")
dbutils.widgets.text("folder_name", "")
dbutils.widgets.text("file_name", "")
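Since every widget defaults to an empty string, a missing value would otherwise surface later as a confusing SQL or path error. The sketch below shows a minimal fail-fast check. It assumes the widget values are first collected into a dict, and the helper `require_params` is hypothetical.

```python
REQUIRED_PARAMS = ("catalog_name", "schema_name", "folder_name", "file_name")


def require_params(values):
    """Raise ValueError naming every required parameter that is missing or blank."""
    missing = [name for name in REQUIRED_PARAMS if not str(values.get(name, "")).strip()]
    if missing:
        raise ValueError(f"Missing required job parameters: {', '.join(missing)}")
    return {name: values[name].strip() for name in REQUIRED_PARAMS}


# In the notebook the values would come from the widgets, e.g.:
# params = require_params({n: dbutils.widgets.get(n) for n in REQUIRED_PARAMS})
params = require_params({"catalog_name": "main", "schema_name": "wine",
                         "folder_name": "alice", "file_name": "wines.csv"})
```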

In [0]:
# Batch inference setup: read job parameters and create the schema and volumes

catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")
folder_name = dbutils.widgets.get("folder_name")
volume_upload_path = "batch_inference_upload"
volume_download_path = "batch_inference_download"
file_name = dbutils.widgets.get("file_name")

# Create schema if it doesn't exist
query = f"""
CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}
"""
spark.sql(query)

# Create upload and download volumes within the catalog and schema
query_upload_volume = f"""
CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_upload_path}
"""
spark.sql(query_upload_volume)

query_download_volume = f"""
CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_download_path}
"""
spark.sql(query_download_volume)
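The DDL above interpolates the parameters directly. This works for plain names but fails for identifiers containing characters such as `-`. The sketch below wraps each name part in backticks, Databricks SQL's identifier quoting, with embedded backticks doubled. `quote_ident` and `ddl_statements` are hypothetical helpers.

```python
def quote_ident(name):
    """Backtick-quote a SQL identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def ddl_statements(catalog_name, schema_name, volumes):
    """Return CREATE SCHEMA / CREATE VOLUME statements with quoted identifiers."""
    schema = f"{quote_ident(catalog_name)}.{quote_ident(schema_name)}"
    stmts = [f"CREATE SCHEMA IF NOT EXISTS {schema}"]
    for volume in volumes:
        stmts.append(f"CREATE VOLUME IF NOT EXISTS {schema}.{quote_ident(volume)}")
    return stmts


for stmt in ddl_statements("main", "wine-dev",
                           ["batch_inference_upload", "batch_inference_download"]):
    print(stmt)  # in the notebook: spark.sql(stmt)
```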

In [0]:
import mlflow

mlflow.set_registry_uri("databricks-uc")

logged_model = f'models:/{catalog_name}.{schema_name}.wine_quality_with_shap@champion'

# Load the model version that holds the "champion" alias as a pyfunc model
loaded_model = mlflow.pyfunc.load_model(logged_model)
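The `logged_model` URI above loads whichever model version currently holds the `champion` alias in Unity Catalog. MLflow model URIs can also pin a specific version (`models:/<name>/<version>`). The small sketch below builds both forms. `model_uri` is a hypothetical helper, and the version number is illustrative.

```python
def model_uri(catalog_name, schema_name, model_name, alias=None, version=None):
    """Build an MLflow models:/ URI for a Unity Catalog model by alias or by version."""
    if (alias is None) == (version is None):
        raise ValueError("Pass exactly one of alias or version")
    full_name = f"{catalog_name}.{schema_name}.{model_name}"
    if alias is not None:
        return f"models:/{full_name}@{alias}"
    return f"models:/{full_name}/{version}"


print(model_uri("main", "wine", "wine_quality_with_shap", alias="champion"))
# models:/main.wine.wine_quality_with_shap@champion
print(model_uri("main", "wine", "wine_quality_with_shap", version=3))
# models:/main.wine.wine_quality_with_shap/3
```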

In [0]:
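# Read the uploaded CSV and convert it to pandas for the pyfunc model
input_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_upload_path}/{folder_name}/{file_name}"
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(input_path)
    .toPandas()
)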
display(df)
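The `ai_query` prompt later in this notebook references twelve feature columns by name, so the CSV header must use exactly those names. The stdlib sketch below checks a header before inference runs. The expected list is copied from that query, and `missing_columns` is a hypothetical helper. In the notebook you could pass `df.columns` instead of parsing the file.

```python
import csv
import io

# Feature names as referenced by the ai_query prompt in this notebook
EXPECTED_FEATURES = [
    "fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar",
    "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density",
    "pH", "sulphates", "alcohol", "is_red",
]


def missing_columns(header):
    """Return the expected feature names that are absent from a CSV header."""
    present = set(header)
    return [name for name in EXPECTED_FEATURES if name not in present]


# Hypothetical three-column file, for illustration only
sample = io.StringIO("fixed_acidity,volatile_acidity,alcohol\n7.4,0.7,9.4\n")
header = next(csv.reader(sample))
print(missing_columns(header))
```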

In [0]:
# Predict on the pandas DataFrame; the custom model returns predictions and SHAP values
model_output = loaded_model.predict(df)
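`process_batch_prediction` in the next cell assumes the custom pyfunc model returns a dict with `predictions`, `feature_names` and `shap_values`, where `shap_values` has one row per input row, ordered like `feature_names`. This contract is inferred from how the notebook consumes the output, not from a published model signature. The sketch below validates it before reshaping. `check_model_output` is a hypothetical helper.

```python
def check_model_output(model_output, n_rows):
    """Raise ValueError if model_output does not have the shape the notebook expects."""
    for key in ("predictions", "feature_names", "shap_values"):
        if key not in model_output:
            raise ValueError(f"model output is missing '{key}'")
    n_features = len(model_output["feature_names"])
    if len(model_output["predictions"]) != n_rows:
        raise ValueError("expected one prediction per input row")
    if len(model_output["shap_values"]) != n_rows:
        raise ValueError("expected one row of SHAP values per input row")
    for row in model_output["shap_values"]:
        if len(row) != n_features:
            raise ValueError("each SHAP row must have one value per feature")


# In the notebook: check_model_output(model_output, len(df))
check_model_output(
    {"predictions": [5.6], "feature_names": ["alcohol", "pH"], "shap_values": [[0.3, -0.1]]},
    n_rows=1,
)
```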

In [0]:
def process_batch_prediction(df, model_output):
    """Combine input features, predictions, and per-feature SHAP values into one DataFrame."""
    results = df.copy()

    # Add the prediction column
    results['prediction'] = model_output['predictions']

    # Add one shap_<feature> column per feature, taken from each row's SHAP vector
    for i, feature in enumerate(model_output['feature_names']):
        results[f'shap_{feature}'] = [row[i] for row in model_output['shap_values']]

    # Order columns as features, prediction, SHAP values to match the real-time format;
    # a row_id column, if present, is dropped
    feature_cols = [c for c in results.columns
                    if c not in ('row_id', 'prediction') and not c.startswith('shap_')]
    shap_cols = [c for c in results.columns if c.startswith('shap_')]

    return results[feature_cols + ['prediction'] + shap_cols]

results_df = process_batch_prediction(df, model_output)

In [0]:
spark.createDataFrame(results_df).createOrReplaceTempView("prediction_results")

explanation_query = f"""
SELECT 
    *,
    ai_query(
        '{catalog_name}-{schema_name}-wine_quality_interpreter',
        CONCAT(
            'Analyze these wine quality prediction features and SHAP values: ',
            to_json(named_struct(
                'features', named_struct(
                    'fixed_acidity', fixed_acidity,
                    'volatile_acidity', volatile_acidity,
                    'citric_acid', citric_acid,
                    'residual_sugar', residual_sugar,
                    'chlorides', chlorides,
                    'free_sulfur_dioxide', free_sulfur_dioxide,
                    'total_sulfur_dioxide', total_sulfur_dioxide,
                    'density', density,
                    'pH', pH,
                    'sulphates', sulphates,
                    'alcohol', alcohol,
                    'is_red', is_red
                ),
                'shap_values', named_struct(
                    'shap_fixed_acidity', shap_fixed_acidity,
                    'shap_volatile_acidity', shap_volatile_acidity,
                    'shap_citric_acid', shap_citric_acid,
                    'shap_residual_sugar', shap_residual_sugar,
                    'shap_chlorides', shap_chlorides,
                    'shap_free_sulfur_dioxide', shap_free_sulfur_dioxide,
                    'shap_total_sulfur_dioxide', shap_total_sulfur_dioxide,
                    'shap_density', shap_density,
                    'shap_pH', shap_pH,
                    'shap_sulphates', shap_sulphates,
                    'shap_alcohol', shap_alcohol,
                    'shap_is_red', shap_is_red
                ),
                'prediction', prediction
            ))
        )
    ) as explanation
FROM prediction_results
"""

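The two `named_struct` blocks in the query above list every feature twice: once as a raw value and once with the `shap_` prefix. As a result, they can drift out of sync with the model's feature list. The sketch below generates the same query text from a list of feature names, such as `model_output['feature_names']`. `build_explanation_query` is a hypothetical helper. It reproduces the structure above, including the `prediction` field and the `<catalog>-<schema>-wine_quality_interpreter` endpoint naming.

```python
def build_explanation_query(catalog_name, schema_name, features, view="prediction_results"):
    """Generate the ai_query SQL used in this notebook from a list of feature names."""
    feature_args = ",\n".join(f"'{f}', {f}" for f in features)
    shap_args = ",\n".join(f"'shap_{f}', shap_{f}" for f in features)
    endpoint = f"{catalog_name}-{schema_name}-wine_quality_interpreter"
    return f"""
SELECT
    *,
    ai_query(
        '{endpoint}',
        CONCAT(
            'Analyze these wine quality prediction features and SHAP values: ',
            to_json(named_struct(
                'features', named_struct({feature_args}),
                'shap_values', named_struct({shap_args}),
                'prediction', prediction
            ))
        )
    ) AS explanation
FROM {view}
"""


query = build_explanation_query("main", "wine", ["alcohol", "pH"])
print(query)  # in the notebook: spark.sql(query)
```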
In [0]:
# Execute the query
explanations_df = spark.sql(explanation_query)

# Save the final results
output_dir = f"/Volumes/{catalog_name}/{schema_name}/{volume_download_path}/{folder_name}"
dbutils.fs.mkdirs(output_dir)
output_path = f"{output_dir}/results_with_explanations_{file_name}"
explanations_df.toPandas().to_csv(output_path, index=False)