# Machine Learning with Apache MLlib via PySpark

This notebook demonstrates the use of **Apache MLlib**, the scalable machine learning library built on **Apache Spark**, to perform end-to-end regression modeling. Using **PySpark**, we load the California housing dataset, clean it, engineer features, and train a linear regression model.

The workflow uses Spark-native transformers such as `Imputer`, `VectorAssembler`, `StandardScaler`, `StringIndexer`, and `OneHotEncoder` to build the preprocessing steps, so the same code can run unchanged on a local session or a distributed cluster.

Finally, we evaluate the model on the held-out test data with `RegressionMetrics` and visualize the results with Plotly.


### Install PySpark

This installs the PySpark library, which provides the Python API for Apache Spark.

In [2]:
%pip install pyspark

Collecting pyspark
  Using cached pyspark-4.0.0.tar.gz (434.1 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.9 (from pyspark)
  Using cached py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Using cached py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): still running...
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741340 sha256=98867c7e53a093aff70487b0dd40dd0206329b5937f4e5e6c228ac1992d2f9f5
  Stored in directory: c:\users\jihit\appdata\local\pip\cache\wheels\2d\77\9b\12660be70f7f447940a0caede37ae208b2e0d1c8487dce52a6
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.9 pyspark-4.0.0
Note: you may need to restart the kernel to use updated packages.

### Initialize Spark Session

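Creates a local Spark session named **MRTB1163** that runs with a single worker thread (`master("local")`) and serves the Spark web UI on port 4050 instead of the default 4040.

The session is the entry point for every Spark operation in this notebook (here run inside VS Code).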

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("MRTB1163")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is not supported
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:277)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:588)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2446)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2446)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:339)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1447)
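The error above most likely comes from the Java runtime rather than from the notebook code. The failing frame is Hadoop's `UserGroupInformation.getCurrentUser`, which calls `javax.security.auth.Subject.getSubject`; starting with JDK 23 that method throws `UnsupportedOperationException` unless the Security Manager is allowed, and Spark 4.0 documents Java 17 and 21 as its supported runtimes. The sketch below uses hypothetical helper names; it looks for `java` under `JAVA_HOME` (falling back to the `PATH`), parses the `java -version` banner, and reports whether the major version is one of those two.

```python
import os
import re
import shutil
import subprocess
from typing import Optional

# Java major versions that Spark 4.0 lists as supported.
SUPPORTED_JAVA_MAJORS = {17, 21}


def parse_java_major(version_text: str) -> int:
    """Extract the major version from `java -version` output (hypothetical helper)."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_text)
    if match is None:
        raise ValueError(f"Unrecognised java -version output: {version_text!r}")
    major = int(match.group(1))
    # Java 8 and older report themselves as "1.8.0_xxx".
    if major == 1 and match.group(2) is not None:
        major = int(match.group(2))
    return major


def find_java() -> Optional[str]:
    """Prefer $JAVA_HOME/bin/java, which Spark's launcher uses when set, else `java` on PATH."""
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        found = shutil.which("java", path=os.path.join(java_home, "bin"))
        if found:
            return found
    return shutil.which("java")


def check_java_for_spark() -> bool:
    """Return True if the detected Java is a version Spark 4.0 supports."""
    java = find_java()
    if java is None:
        print("No `java` executable found; set JAVA_HOME or PATH first.")
        return False
    # `java -version` prints its banner to stderr, not stdout.
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    major = parse_java_major(result.stderr)
    ok = major in SUPPORTED_JAVA_MAJORS
    print(f"Detected Java {major}: {'supported' if ok else 'unsupported'} for Spark 4.0")
    return ok
```

If `check_java_for_spark()` reports an unsupported version, point `JAVA_HOME` at a JDK 17 or 21 installation (your own install path) and restart the kernel before building the session, because PySpark reads it only when it first launches the JVM.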


### Load and Preview Dataset

Reads the `housing.csv` file into a Spark DataFrame with headers and inferred data types.

Displays the schema and shows the first 5 rows for a quick preview.

In [None]:
df = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)

df.printSchema()

df.show(5)

### Add ID Column and Basic Aggregation

Adds a unique `id` column to each row using `monotonically_increasing_id()`. The generated IDs are unique and increasing, but they are not guaranteed to be consecutive.

Reorders columns to place `id` first.  

Displays a sample of 3 rows, counts total records, and computes the average of `total_rooms`.
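The Spark documentation describes the current implementation of `monotonically_increasing_id()` as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below (hypothetical function name; the bit layout is an implementation detail that could change) reproduces that arithmetic to show why IDs jump between partitions.

```python
def sketch_monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic the documented layout: partition ID shifted left by 33 bits, plus row index."""
    return (partition_id << 33) + row_in_partition


# Three rows in each of two partitions.
ids = [sketch_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With `master("local")` and a small CSV the data may sit in a single partition, so the IDs can happen to look like 0, 1, 2, ...; the sketch shows why that should not be relied on.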


In [None]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

# `id` was appended as the last column, so df.columns[:-1] holds every original column
df = df[['id'] + df.columns[:-1]]

df.show(3)

In [None]:
df.count()

In [None]:
df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

### Summary Statistics and Grouped Aggregation

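Calculates the mean of every numerical column (the string column `ocean_proximity` has no meaningful mean, so it is skipped).

Then groups the data by `ocean_proximity` and computes the average of each numerical column from `housing_median_age` through `median_house_value`.


In [None]:
from pyspark.sql.functions import mean

# Skip string columns such as ocean_proximity, whose mean is undefined
numeric_cols = [c for c, dtype in df.dtypes if dtype != 'string']
df.select(*[mean(c) for c in numeric_cols]).show()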

In [None]:
df.groupby('ocean_proximity').agg({col: 'avg' for col in df.columns[3:-1]}).show()
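Spark's `avg` ignores nulls instead of counting them as zero, so each group's average is taken over its non-missing values only. This matters for any column with missing entries, such as `total_bedrooms` in the commonly distributed version of this dataset. The plain-Python sketch below (hypothetical function name and toy values) mirrors that behavior for one column.

```python
from typing import Dict, Iterable, Optional, Tuple


def grouped_avg(rows: Iterable[Tuple[str, Optional[float]]]) -> Dict[str, Optional[float]]:
    """Per-key mean that skips None values, like Spark's avg()."""
    sums: Dict[str, float] = {}
    counts: Dict[str, int] = {}
    for key, value in rows:
        sums.setdefault(key, 0.0)
        counts.setdefault(key, 0)
        if value is not None:
            sums[key] += value
            counts[key] += 1
    # A group whose values are all missing averages to None (null in Spark).
    return {k: (sums[k] / counts[k] if counts[k] else None) for k in sums}


# Hypothetical toy rows, not taken from housing.csv.
toy_rows = [("NEAR BAY", 100.0), ("NEAR BAY", None), ("NEAR BAY", 300.0), ("INLAND", None)]
print(grouped_avg(toy_rows))  # {'NEAR BAY': 200.0, 'INLAND': None}
```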

### Custom UDF for Feature Transformation

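Defines a user-defined function (UDF) that squares the `total_rooms` column.

Applies the UDF to create a new column `total_rooms_squared` and displays the first 5 rows. Because the result is only displayed and never assigned back to `df`, the following `df.show(5)` confirms that the original DataFrame is unchanged.

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def squared(value):
    # Return None for missing values instead of raising a TypeError
    return None if value is None else value * value

# DoubleType matches the inferred type of total_rooms and avoids 32-bit rounding
squared_udf = udf(squared, DoubleType())

# For simple arithmetic, the built-in expression df['total_rooms'] ** 2 avoids Python UDF overhead
df.withColumn('total_rooms_squared', squared_udf('total_rooms')).show(5)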

In [None]:
df.show(5)
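Declaring a UDF's return type as `FloatType` stores its results as 32-bit floats, while Python computes the square as a 64-bit float. Squares of large room counts can exceed single precision's 24-bit significand and are then rounded when stored; `DoubleType` keeps the full value. The stdlib sketch below round-trips a square through single precision with `struct`; the room count 39320.0 is an illustrative value and `to_float32` is a hypothetical helper.

```python
import struct


def to_float32(value: float) -> float:
    """Round-trip a Python float (64-bit) through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", value))[0]


rooms = 39320.0                 # illustrative room count
exact_square = rooms * rooms    # computed in 64-bit precision by Python
stored = to_float32(exact_square)
print(exact_square, stored, exact_square - stored)  # the two values differ
```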

### Train-Test Split and Feature Selection

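Randomly splits the dataset into training (about 70%) and testing (about 30%) sets; each row is assigned independently, so the exact proportions vary slightly.

Builds the list of numerical feature columns by removing the label (`median_house_value`), the `id` column, and the categorical `ocean_proximity` column.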

In [None]:
# A fixed seed makes the split reproducible across runs
train, test = df.randomSplit([0.7, 0.3], seed=42)

train, test
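`randomSplit` normalizes its weights and assigns each row to a split at random, so a 70/30 request yields only roughly 70/30, and without a `seed` the split changes on every run. The plain-Python sketch below mimics that per-row assignment with a seeded `random.Random`; the function name is hypothetical, and Spark's own sampler works partition by partition rather than like this loop.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def sketch_random_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each row to a split by comparing one uniform draw to cumulative weights."""
    total = sum(weights)
    bounds: List[float] = []
    acc = 0.0
    for w in weights:            # e.g. [0.7, 0.3] -> bounds [0.7, 1.0]
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point leftovers.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = sketch_random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # close to 7000 / 3000, not exact
```

Because weights are normalized, `[7, 3]` behaves exactly like `[0.7, 0.3]`, and reusing the same seed reproduces the same split.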

In [None]:
numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')

numerical_features_lst

### Handle Missing Values with Imputer

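Uses `Imputer` from `pyspark.ml` to fill missing values in the selected numerical columns, replacing them in place because the output columns have the same names as the inputs. With the default `strategy='mean'`, each column's fill value is the mean learned from the training set.

The fitted imputer is then applied to both the training and testing datasets, so no statistics are computed from the test data.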

In [None]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features_lst,
                  outputCols=numerical_features_lst)

imputer = imputer.fit(train)

train = imputer.transform(train)
test = imputer.transform(test)

train.show(3)
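The fit/transform split that `Imputer` follows with its default mean strategy can be sketched in plain Python for a single column. The function names are hypothetical; following the Spark documentation's description of the defaults, both `None` (null) and `NaN` are treated as missing.

```python
import math
from typing import List, Optional


def _is_missing(v: Optional[float]) -> bool:
    return v is None or (isinstance(v, float) and math.isnan(v))


def fit_mean(train_values: List[Optional[float]]) -> float:
    """'fit': mean of the observed (non-missing) training values."""
    observed = [v for v in train_values if not _is_missing(v)]
    return sum(observed) / len(observed)


def transform(values: List[Optional[float]], fill: float) -> List[float]:
    """'transform': replace missing entries with the learned fill value."""
    return [fill if _is_missing(v) else v for v in values]


train_col = [2.0, None, 4.0, float("nan"), 6.0]
test_col = [None, 10.0]
fill = fit_mean(train_col)                 # 4.0, learned from training data only
print(transform(train_col, fill))          # [2.0, 4.0, 4.0, 4.0, 6.0]
print(transform(test_col, fill))           # [4.0, 10.0]
```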

### Assemble Numerical Features

Combines all selected numerical columns into a single vector column named `numerical_feature_vector`.

Prepares the data for scaling and model input.

In [None]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,
                                             outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(2)

In [None]:
train.select('numerical_feature_vector').take(2)

### Standardize Features

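Applies `StandardScaler` to standardize the numerical feature vector: `withMean=True` subtracts each feature's mean and `withStd=True` divides by its standard deviation, giving zero mean and unit variance.

The statistics are learned from the training set only, and the result is written to `scaled_numerical_feature_vector` for both the training and testing sets.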

In [None]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='numerical_feature_vector',
                        outputCol='scaled_numerical_feature_vector',
                        withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

train.show(3)
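Per feature, the scaling step computes (x - mean) / std, with both statistics learned from the training data; Spark's documentation states that the standard deviation is the corrected sample standard deviation (dividing by n - 1). A plain-Python sketch for one feature, with hypothetical function names:

```python
import math
from typing import List, Tuple


def fit_scaler(train_values: List[float]) -> Tuple[float, float]:
    """Learn (mean, corrected sample standard deviation) from training values."""
    n = len(train_values)
    mean = sum(train_values) / n
    variance = sum((v - mean) ** 2 for v in train_values) / (n - 1)
    return mean, math.sqrt(variance)


def transform_scaler(values: List[float], mean: float, std: float) -> List[float]:
    """Apply (x - mean) / std; zero-variance features are mapped to 0.0 here."""
    scale = 0.0 if std == 0.0 else 1.0 / std
    return [(v - mean) * scale for v in values]


mean, std = fit_scaler([1.0, 2.0, 3.0])              # learned on "training" data
print(mean, std)                                     # 2.0 1.0
print(transform_scaler([1.0, 2.0, 3.0], mean, std))  # [-1.0, 0.0, 1.0]
print(transform_scaler([4.0], mean, std))            # [2.0], a "test" value
```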

In [None]:
train.select('scaled_numerical_feature_vector').take(3)

### Encode Categorical Feature

Uses `StringIndexer` to convert the categorical `ocean_proximity` column into a numerical index named `ocean_category_index`.

Applies the transformation to both training and testing datasets.

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='ocean_proximity',
                        outputCol='ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)
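With the default `stringOrderType='frequencyDesc'`, `StringIndexer` gives index 0.0 to the most frequent label in the data it was fit on, 1.0 to the next, and so on; the Spark documentation states that ties are broken alphabetically. Under the default `handleInvalid='error'`, a label that appears only in `test` makes `transform` fail, which is a real risk for rare categories (the commonly distributed dataset has only a handful of `ISLAND` rows); `handleInvalid='keep'` assigns unseen labels an extra index instead. The sketch below reproduces the ordering in plain Python on hypothetical labels.

```python
from collections import Counter
from typing import Dict, Iterable


def fit_string_indexer(labels: Iterable[str]) -> Dict[str, float]:
    """Map labels to indices by descending frequency, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


labels = ["INLAND", "NEAR BAY", "INLAND", "ISLAND", "NEAR BAY", "INLAND", "<1H OCEAN"]
print(fit_string_indexer(labels))
# {'INLAND': 0.0, 'NEAR BAY': 1.0, '<1H OCEAN': 2.0, 'ISLAND': 3.0}
```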

In [None]:
# Check Unique Encoded Categories

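# distinct() deduplicates on the cluster, so only the unique indices reach the driver
sorted(row.ocean_category_index for row in train.select('ocean_category_index').distinct().collect())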

### One-Hot Encode Categorical Feature

Applies `OneHotEncoder` to convert the indexed categorical column into a sparse binary vector named `ocean_category_one_hot`.

This stops the model from treating the category index as an ordered quantity (for example, assuming that category 2 is "larger" than category 1).

In [None]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                                outputCol='ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)
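`OneHotEncoder` defaults to `dropLast=True`, so k categories become vectors of length k - 1 and the last index is encoded as all zeros. One reason for this default is that a full set of k indicator columns always sums to 1 and would be perfectly collinear with the model's intercept. A plain-Python sketch with a hypothetical function name:

```python
from typing import List


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    """Encode a category index as a 0/1 vector, optionally dropping the last slot."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # with drop_last, the final category becomes all zeros
        vector[index] = 1.0
    return vector


for idx in range(3):
    print(idx, one_hot(idx, 3))
# 0 [1.0, 0.0]
# 1 [0.0, 1.0]
# 2 [0.0, 0.0]
```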

### Combine All Features

Merges scaled numerical features and one-hot encoded categorical features into a single column `final_feature_vector` for model training.

In [None]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector',
                                       'ocean_category_one_hot'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)
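`VectorAssembler` concatenates its input columns in the order given, flattening vector inputs, so the final vector holds the scaled numerical features followed by the one-hot slots. The plain-Python sketch below expands a sparse vector written as a hypothetical `(size, indices, values)` tuple and appends it to a dense list; Spark itself may store the assembled result as either a dense or a sparse vector, but the values are the same.

```python
from typing import List, Sequence, Tuple, Union

Sparse = Tuple[int, Sequence[int], Sequence[float]]


def sparse_to_dense(size: int, indices: Sequence[int], values: Sequence[float]) -> List[float]:
    out = [0.0] * size
    for i, v in zip(indices, values):
        out[i] = v
    return out


def assemble(*parts: Union[Sequence[float], Sparse]) -> List[float]:
    """Concatenate dense lists and (size, indices, values) sparse tuples in order."""
    result: List[float] = []
    for part in parts:
        if isinstance(part, tuple):
            result.extend(sparse_to_dense(*part))
        else:
            result.extend(float(x) for x in part)
    return result


scaled = [0.5, -1.2, 0.3]   # hypothetical scaled numerical features
one_hot = (4, [1], [1.0])   # hypothetical one-hot vector: slot 1 of 4 is set
print(assemble(scaled, one_hot))  # [0.5, -1.2, 0.3, 0.0, 1.0, 0.0, 0.0]
```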

In [None]:
train.select('final_feature_vector').take(2)

### Train Linear Regression Model and Predict

1. Initialize a linear regression model with input features and target label.
2. Fit the model on the training dataset.
3. Apply the trained model to the training data to generate predictions and rename the prediction column for clarity.
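With its defaults (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits ordinary least squares: it chooses the coefficients and intercept that minimize the sum of squared differences between `median_house_value` and the prediction. For a single feature that minimizer has a closed form, shown below as a plain-Python sketch on made-up points; Spark's own solvers handle many features at once and are not implemented this way.

```python
from typing import List, Tuple


def simple_ols(xs: List[float], ys: List[float]) -> Tuple[float, float]:
    """Return (slope, intercept) minimizing sum((y - (slope * x + intercept)) ** 2)."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / sxx
    return slope, y_mean - slope * x_mean


xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]  # exactly y = 2x + 1
print(simple_ols(xs, ys))  # (2.0, 1.0)
```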

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol='median_house_value')

lr

In [None]:
lr = lr.fit(train)

lr

In [None]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                      'predicted_median_house_value')

pred_train_df.show(5)

### Predict on Test Data

Applies the trained linear regression model to the test dataset and renames the prediction column to `predicted_median_house_value` for easier interpretation.

In [None]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')

pred_test_df.show(5)

### Convert to Pandas DataFrame

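Converts the Spark test predictions to a pandas DataFrame for inspection and for plotting with Python libraries such as Plotly (used below). `toPandas()` collects every row onto the driver, which is fine for a test set of this size but should be avoided for very large DataFrames.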

In [None]:
pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(2)

### Prepare Data for Regression Evaluation

Selects only the predicted and actual values, converts the Spark DataFrame to an RDD, and maps each `Row` to a plain tuple. `RegressionMetrics` from MLlib expects an RDD of `(prediction, observation)` pairs, so the prediction column must come first.

In [None]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]
                                    
predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)

In [None]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)

predictions_and_actuals_rdd.take(2)

### Evaluate Model Performance

Uses `RegressionMetrics` from `pyspark.mllib.evaluation` to calculate and display evaluation metrics for the linear regression model:
- Mean Squared Error (MSE)
- Root Mean Squared Error (RMSE)
- Mean Absolute Error (MAE)
- R-squared (R²)

In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )

print(s)
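The four metrics printed above follow standard formulas. The plain-Python sketch below (hypothetical function name, toy numbers) recomputes them from `(prediction, observation)` pairs in the same order that `RegressionMetrics` expects; R² compares the squared error with the spread of the observations around their mean.

```python
import math
from typing import Dict, List, Tuple


def regression_metrics(pairs: List[Tuple[float, float]]) -> Dict[str, float]:
    """pairs are (prediction, observation), matching RegressionMetrics' input order."""
    n = len(pairs)
    errors = [obs - pred for pred, obs in pairs]
    mse = sum(e * e for e in errors) / n           # mean of squared errors
    mae = sum(abs(e) for e in errors) / n          # mean of absolute errors
    obs_mean = sum(obs for _, obs in pairs) / n
    ss_tot = sum((obs - obs_mean) ** 2 for _, obs in pairs)
    r2 = 1.0 - (mse * n) / ss_tot                  # 1 - SS_err / SS_tot
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae, "r2": r2}


pairs = [(2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)]
print(regression_metrics(pairs))
```

Swapping the tuple order leaves MSE, RMSE, and MAE unchanged but changes R², because the denominator uses the mean of whichever value is treated as the observation.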

### Visualize Actual vs Predicted (Plotly)

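This interactive scatter plot compares the model's predictions with the actual values; a perfect model would place every point on the diagonal line y = x.

The `trendline='ols'` option overlays an ordinary least-squares fit of predicted on actual values as a visual indicator of the model's fit. Plotly computes this trendline with `statsmodels`, which must be installed separately.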

In [None]:
import plotly.express as px

fig = px.scatter(
    pred_test_pd_df,
    x='median_house_value',
    y='predicted_median_house_value',
    title='Actual vs Predicted House Values',
    labels={'median_house_value': 'Actual', 'predicted_median_house_value': 'Predicted'},
    opacity=0.7,
    trendline='ols',
    color='predicted_median_house_value'
)

fig.update_layout(showlegend=False)
fig.show()


### Plotly Bar Chart of Evaluation Metrics

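This chart summarizes the model's error metrics (MSE, RMSE, MAE) and its R² score. Because MSE is measured in squared dollars while R² is unitless and at most 1, the values differ by orders of magnitude, so on a shared linear axis the smaller bars may be barely visible.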

In [None]:
import plotly.graph_objects as go

fig = go.Figure(data=[
    go.Bar(name='Regression Metrics', x=['MSE', 'RMSE', 'MAE', 'R²'],
           y=[metrics.meanSquaredError, metrics.rootMeanSquaredError, metrics.meanAbsoluteError, metrics.r2],
           marker_color='indianred')
])

fig.update_layout(title='Model Evaluation Metrics',
                  yaxis_title='Score',
                  xaxis_title='Metric',
                  template='plotly_white')
fig.show()
