### **PySpark Regression Pipeline Demo – Predicting Medical Charges**

#### **Overview**:

This demo walks through a complete **machine learning pipeline in PySpark** to predict `charges` (medical costs) based on features such as `age`, `sex`, `bmi`, `smoker`, `region`, and `children`. It includes **data preprocessing**, **feature engineering**, **model training**, and **evaluation** using multiple regression metrics.

#### **Key Steps**:

* Encode categorical features using `StringIndexer` and `OneHotEncoder`
* Assemble all features into a single vector
* Standardize the assembled feature vector with `StandardScaler` (optional but recommended)
* Train a `LinearRegression` model using Spark MLlib
* Evaluate the model using RMSE, MAE, MSE, and R²


# Import Required Libraries

In [2]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create SparkSession

In [3]:
spark = SparkSession.builder.appName('practice').getOrCreate()
spark

# Load and Inspect Dataset

In [4]:
# Load dataset
# Replace with actual path or use a Spark table in Databricks
df = spark.read.csv("TRAIN.csv", header=True, inferSchema=True)

In [5]:
df.printSchema()
df.show(truncate=False)

root
 |-- age: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- children: integer (nullable = true)
 |-- charges: double (nullable = true)

+-----------+------+-----------+------+---------+--------+-----------+
|age        |sex   |bmi        |smoker|region   |children|charges    |
+-----------+------+-----------+------+---------+--------+-----------+
|21.0       |male  |25.745     |no    |northeast|2       |3279.86855 |
|36.976978  |female|25.74416485|yes   |southeast|3       |21454.49424|
|18.0       |male  |30.03      |no    |southeast|1       |1720.3537  |
|37.0       |male  |30.67689127|no    |northeast|3       |6801.437542|
|58.0       |male  |32.01      |no    |southeast|1       |11946.6259 |
|46.0       |male  |26.62      |no    |southeast|1       |7742.1098  |
|25.22173065|male  |31.19264736|no    |northeast|4       |21736.32814|
|29.48644271|female|24.

In [6]:
df.columns

['age', 'sex', 'bmi', 'smoker', 'region', 'children', 'charges']

# Preprocess Data – Encode Categorical Features

In [7]:
from pyspark.ml.feature import StringIndexer

# Define categorical columns
categorical_cols = ['sex', 'smoker', 'region']
index_output_cols = [col + "_index" for col in categorical_cols]

# Create a single StringIndexer for multiple columns
indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="keep")

# Fit and transform the data
df = indexer.fit(df).transform(df)

# Show results
df.select(categorical_cols + index_output_cols).show(5)


+------+------+---------+---------+------------+------------+
|   sex|smoker|   region|sex_index|smoker_index|region_index|
+------+------+---------+---------+------------+------------+
|  male|    no|northeast|      0.0|         0.0|         3.0|
|female|   yes|southeast|      1.0|         1.0|         0.0|
|  male|    no|southeast|      0.0|         0.0|         0.0|
|  male|    no|northeast|      0.0|         0.0|         3.0|
|  male|    no|southeast|      0.0|         0.0|         0.0|
+------+------+---------+---------+------------+------------+
only showing top 5 rows
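
The index values above are not arbitrary: by default `StringIndexer` orders labels by descending frequency, so the most common label receives `0.0`. The plain-Python sketch below illustrates that ordering. The label counts are hypothetical, chosen only so that the result agrees with the `region_index` values shown, and the alphabetical tie-break is an assumption of this sketch rather than something the notebook demonstrates.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; break ties alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Hypothetical counts, for illustration only (not taken from TRAIN.csv).
regions = (
    ["southeast"] * 4
    + ["southwest"] * 3
    + ["northwest"] * 2
    + ["northeast"] * 1
)

mapping = frequency_desc_index(regions)
print(mapping)
```

With `handleInvalid="keep"`, labels not seen during fitting are assigned one additional index after the last known label instead of raising an error.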


# Preprocess Data – One-Hot Encode Indexed Features

In [8]:
from pyspark.ml.feature import OneHotEncoder

ohe_output_cols = [col + "_vec" for col in categorical_cols]
encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)
df = encoder.fit(df).transform(df)
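
The vector sizes that appear in later outputs, such as `(2,[1],[1.0])` for `sex_vec` and `(4,[3],[1.0])` for `region_vec`, follow from two settings: `handleInvalid="keep"` in the indexer adds one extra category for unseen labels, and `OneHotEncoder` drops the last category by default (`dropLast=True`). The plain-Python sketch below mimics that behaviour with dense lists. It is an illustration only, not Spark's implementation, and the category counts are inferred from the printed outputs.

```python
def one_hot_drop_last(index, num_categories):
    """One-hot encode `index`; the last category becomes the all-zeros vector."""
    size = num_categories - 1
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Known labels plus the extra bucket added by handleInvalid="keep"
# (counts inferred from the notebook's printed outputs).
num_categories = {"sex": 2 + 1, "smoker": 2 + 1, "region": 4 + 1}

female_vec = one_hot_drop_last(1, num_categories["sex"])        # [0.0, 1.0]
northeast_vec = one_hot_drop_last(3, num_categories["region"])  # [0.0, 0.0, 0.0, 1.0]
unseen_vec = one_hot_drop_last(2, num_categories["sex"])        # [0.0, 0.0]

# Three numeric columns plus the encoded vectors give the assembled size.
assembled_size = 3 + sum(n - 1 for n in num_categories.values())
print(female_vec, northeast_vec, unseen_vec, assembled_size)
```

The computed size of 11 agrees with the `(11,[...])` sparse vectors shown in the `features` column.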

# Assemble Features into a Single Vector

In [9]:
numeric_cols = ['age', 'bmi', 'children']
assembler_inputs = numeric_cols + ohe_output_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
df = assembler.transform(df)

# Standardize Features

In [10]:
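# Standardize every column of the assembled vector (numeric and one-hot alike).
# Note: the scaler is fit on the full DataFrame, before the train/test split,
# so test rows contribute to the mean and standard deviation used for scaling.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)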
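
For a single column, standardization with `withMean=True, withStd=True` amounts to subtracting the column mean and dividing by the standard deviation. The sketch below does this in plain Python for five `age` values taken from the rows displayed earlier. It assumes the sample (n - 1) standard deviation, which is what Spark's documentation describes for `StandardScaler`. Because only five values are used, the results will not match the `scaled_features` column.

```python
from statistics import mean, stdev


def standardize(column):
    """Center a column on its mean and scale it to unit sample standard deviation."""
    mu = mean(column)
    sigma = stdev(column)  # sample standard deviation (divides by n - 1)
    return [(value - mu) / sigma for value in column]


# Five age values copied from the first rows of df.show(); a tiny sample for illustration.
ages = [21.0, 18.0, 37.0, 58.0, 46.0]
scaled_ages = standardize(ages)

for raw, scaled in zip(ages, scaled_ages):
    print(f"{raw:5.1f} -> {scaled:+.4f}")
```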

# Split Data into Training and Test Sets

In [11]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train Linear Regression Model

In [12]:
lr = LinearRegression(featuresCol="scaled_features", labelCol="charges")
lr_model = lr.fit(train_df)

# Make Predictions on Test Data

In [13]:
predictions = lr_model.transform(test_df)
predictions.show()

+-----------+------+-----------+------+---------+--------+-----------+---------+------------+------------+-------------+-------------+-------------+--------------------+--------------------+------------------+
|        age|   sex|        bmi|smoker|   region|children|    charges|sex_index|smoker_index|region_index|      sex_vec|   smoker_vec|   region_vec|            features|     scaled_features|        prediction|
+-----------+------+-----------+------+---------+--------+-----------+---------+------------+------------+-------------+-------------+-------------+--------------------+--------------------+------------------+
|       18.0|female|      25.08|    no|northeast|       0|  2196.4732|      1.0|         0.0|         3.0|(2,[1],[1.0])|(2,[0],[1.0])|(4,[3],[1.0])|(11,[0,1,4,5,10],...|[-1.7189520097957...|2219.6308871349247|
|       18.0|female|      27.28|   yes|southeast|       3| 18223.4512|      1.0|         1.0|         0.0|(2,[1],[1.0])|(2,[1],[1.0])|(4,[0],[1.0])|(11,[0,1,2,4

# Evaluate Model Performance with Regression Metrics

In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define evaluators with different metrics
evaluator_rmse = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")
evaluator_mse = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="mse")
evaluator_mae = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="r2")

# Evaluate the model
rmse = evaluator_rmse.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# Print all results
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
print(f"Mean Squared Error (MSE): {mse:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")
print(f"R-squared (R²): {r2:.4f}")


Root Mean Squared Error (RMSE): 5844.4031
Mean Squared Error (MSE): 34157047.3721
Mean Absolute Error (MAE): 3843.7246
R-squared (R²): 0.6883
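
A quick consistency check on the printed metrics: RMSE is defined as the square root of MSE, so squaring the reported RMSE should recover the reported MSE up to the rounding introduced by printing four decimal places. The sketch below performs that check on the two values copied from the output above.

```python
# Values copied from the printed evaluation output.
reported_rmse = 5844.4031
reported_mse = 34157047.3721

# Squaring RMSE should give back MSE, apart from rounding in the printed digits.
recomputed_mse = reported_rmse ** 2
relative_gap = abs(recomputed_mse - reported_mse) / reported_mse

print(f"recomputed MSE: {recomputed_mse:.4f}")
print(f"relative gap:   {relative_gap:.2e}")
```

MAE (3843.72) is noticeably smaller than RMSE (5844.40). Since squaring weights large errors more heavily, this gap suggests that a minority of rows have comparatively large errors.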


In [15]:
predictions.select("age", "sex", "smoker", "region", "charges", "prediction").show(5)

+----+------+------+---------+-----------+------------------+
| age|   sex|smoker|   region|    charges|        prediction|
+----+------+------+---------+-----------+------------------+
|18.0|female|    no|northeast|  2196.4732|2219.6308871349247|
|18.0|female|   yes|southeast| 18223.4512|25518.202723848706|
|18.0|female|    no|northeast|7323.734819| 3372.945486812723|
|18.0|female|    no|northeast|  4561.1885| 6241.049343137458|
|18.0|female|    no|southeast|  1629.8335|3682.9422511236153|
+----+------+------+---------+-----------+------------------+
only showing top 5 rows
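
To make the metric definitions concrete, the sketch below recomputes MSE, RMSE, MAE, and R² in plain Python on the five rows displayed above. It uses the standard definitions (R² as one minus the ratio of residual to total sum of squares). Five rows are far too few to be representative, so these values are not expected to match the test-set metrics reported earlier.

```python
import math

# (charges, prediction) pairs copied from the five rows displayed above.
rows = [
    (2196.4732, 2219.6308871349247),
    (18223.4512, 25518.202723848706),
    (7323.734819, 3372.945486812723),
    (4561.1885, 6241.049343137458),
    (1629.8335, 3682.9422511236153),
]


def regression_metrics(pairs):
    """Return MSE, RMSE, MAE and R² for (label, prediction) pairs."""
    n = len(pairs)
    errors = [label - prediction for label, prediction in pairs]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(label for label, _ in pairs) / n
    ss_total = sum((label - mean_label) ** 2 for label, _ in pairs)
    ss_residual = mse * n
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": mae,
        "r2": 1.0 - ss_residual / ss_total,
    }


metrics = regression_metrics(rows)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```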
