In [1]:
# !pip install pyspark
# !pip install numpy
# !pip install pandas
# !pip install sklearn
#!pip install pyarrow
#!pip install mlflow

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

import numpy as np
import pandas as pd
from sklearn.datasets import load_boston #load boston data
import pickle

In [3]:
sparksession = SparkSession.builder.appName("SparkRegression").getOrCreate()

22/09/25 21:44:25 WARN Utils: Your hostname, Marios-MacBook-Pro-M1.local resolves to a loopback address: 127.0.0.1, but we couldn't find any external IP address!
22/09/25 21:44:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/25 21:44:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/25 21:44:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# load data
boston_df = load_boston ()


    The Boston housing prices dataset has an ethical problem. You can refer to
    the documentation of this function for further details.

    The scikit-learn maintainers therefore strongly discourage the use of this
    dataset unless the purpose of the code is to study and educate about
    ethical issues in data science and machine learning.

    In this special case, you can fetch the dataset from the original
    source::

        import pandas as pd
        import numpy as np


        data_url = "http://lib.stat.cmu.edu/datasets/boston"
        raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
        data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
        target = raw_df.values[1::2, 2]

    Alternative datasets include the California housing dataset (i.e.
    :func:`~sklearn.datasets.fetch_california_housing`) and the Ames housing
    dataset. You can load the datasets as follows::

        from sklearn.datasets import fetch_california_h

In [5]:

boston_df.keys()

dict_keys(['data', 'target', 'feature_names', 'DESCR', 'filename', 'data_module'])

In [6]:
#Check the description of the dataset
print(boston_df.DESCR)

.. _boston_dataset:

Boston house prices dataset
---------------------------

**Data Set Characteristics:**  

    :Number of Instances: 506 

    :Number of Attributes: 13 numeric/categorical predictive. Median Value (attribute 14) is usually the target.

    :Attribute Information (in order):
        - CRIM     per capita crime rate by town
        - ZN       proportion of residential land zoned for lots over 25,000 sq.ft.
        - INDUS    proportion of non-retail business acres per town
        - CHAS     Charles River dummy variable (= 1 if tract bounds river; 0 otherwise)
        - NOX      nitric oxides concentration (parts per 10 million)
        - RM       average number of rooms per dwelling
        - AGE      proportion of owner-occupied units built prior to 1940
        - DIS      weighted distances to five Boston employment centres
        - RAD      index of accessibility to radial highways
        - TAX      full-value property-tax rate per $10,000
        - PTRATIO  pu

In [7]:
# Gathering the features of the dataset with column names
df = pd.DataFrame(boston_df.data,columns=boston_df.feature_names)
#Creating a new column for the Target variable
df['PRICE'] = boston_df.target

df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,PRICE
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98,24.0
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14,21.6
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03,34.7
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94,33.4
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33,36.2


In [8]:
#Converting DataFrame into a SparkDataframe
spark_df = sparksession.createDataFrame(df)

In [9]:
#Viewing the first 10 records of the SparkDataframe
spark_df.show(5)


[Stage 0:>                                                          (0 + 1) / 1]

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|PRICE|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575|65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185|61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998|45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147|54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
only showing top 5 rows




                                                                                

In [10]:
#Understading the description of the Spark DataFrame
spark_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PRICE: double (nullable = true)



In [11]:
#Quick summary of the stats of the data
spark_df.describe().show()

                                                                                

+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+
|summary|              CRIM|                ZN|             INDUS|               CHAS|                NOX|                RM|               AGE|               DIS|              RAD|               TAX|          PTRATIO|                B|             LSTAT|             PRICE|
+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+
|  count|               506|               506|               506|                506|                506|               506|               506|               506|            

## Data Checks and Cleaning

In [12]:
from pyspark.sql.functions import *

col_null_cnt_df =  spark_df.select([count(when(col(c).isNull(),c)).alias(c) for c in spark_df.columns])

print(col_null_cnt_df)

DataFrame[CRIM: bigint, ZN: bigint, INDUS: bigint, CHAS: bigint, NOX: bigint, RM: bigint, AGE: bigint, DIS: bigint, RAD: bigint, TAX: bigint, PTRATIO: bigint, B: bigint, LSTAT: bigint, PRICE: bigint]


## Data Transformation into Vectors

In [13]:
# Combining the features into a single Vector 
# In Sci-kit Learn each feature has its own column, and we don’t need to worry about applying 
# any more transformations to the dataframe. However, in MLlib, we cannot have individual columns for each feature. 
# MLlib requires all values of features in a row to be contained in an array, 
# and they all sit in one column called ‘features’.

# ********** IMPORTANT NOTE: Do not include any categorical column names whose values 
#     have not been StringIndexed or OneHotEncoded  **************

from pyspark.ml.feature import VectorAssembler

In [14]:
# Creating an instance of the VectorAssembler
assembler = VectorAssembler(inputCols=['CRIM','ZN','INDUS','CHAS',
                                       'NOX','RM','AGE','DIS','RAD',
                                       'TAX','PTRATIO','B','LSTAT'],
                            outputCol='features')

In [15]:
# transforming our spark dataframe
sparkdf_vector_out = assembler.transform(spark_df)
# Viewing the first 5 rows
sparkdf_vector_out.show(5)

22/09/25 21:44:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|PRICE|            features|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+--------------------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575|65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185|61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998|45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147|54.2|6.0622|3.0|222.0|   18

In [16]:
# While we could say something like this...
sparkdf_final_df = sparkdf_vector_out.select(['features', 'PRICE'])
# I want to rename the 'PRICE' column to 'label' as well
sparkdf_final_df = sparkdf_vector_out.select(['features', col('PRICE').alias('label')])
sparkdf_final_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
|[0.03237,0.0,2.18...| 33.4|
|[0.06905,0.0,2.18...| 36.2|
+--------------------+-----+
only showing top 5 rows



### Saving the Scaler Model

In [18]:
#Correlation between the features (Vectored) 
Correlation.corr(sparkdf_final_df,'features').collect()

22/09/25 21:44:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/09/25 21:44:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

[Row(pearson(features)=DenseMatrix(13, 13, [1.0, -0.2005, 0.4066, -0.0559, 0.421, -0.2192, 0.3527, -0.3797, ..., -0.6138, 0.6023, -0.497, 0.4887, 0.544, 0.374, -0.3661, 1.0], False))]

## Creating the Train and Test Datasets

In [19]:
# Creating our train and test sets
train, test = spark_df.randomSplit([0.7, 0.3], seed=42)

In [26]:
train.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PRICE: double (nullable = true)



## Regression Model

In [27]:
from pyspark.ml.regression import LinearRegression

# creating an instance of a linear regression model
lr_model = LinearRegression(featuresCol='features',labelCol='PRICE')

### Creating Pipeline

In [28]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, lr_model])
pipeline

Pipeline_128c20ec4d94

In [29]:
# fitting the model to the train set
reg_model_pipeline = pipeline.fit(train)

22/09/25 21:44:45 WARN Instrumentation: [c9c55d35] regParam is zero, which might cause numerical instability and overfitting.
22/09/25 21:44:45 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


## Test Results

In [32]:
test_results = reg_model_pipeline.transform(test)

In [65]:
test_results.select('features','PRICE','prediction').show()

+--------------------+-----+------------------+
|            features|PRICE|        prediction|
+--------------------+-----+------------------+
|[0.0136,75.0,4.0,...| 18.9|14.742108698462062|
|[0.02731,0.0,7.07...| 21.6|24.615392886028477|
|[0.02985,0.0,2.18...| 28.7| 24.63571998287687|
|[0.03237,0.0,2.18...| 33.4|28.144303164575852|
|[0.0536,21.0,5.64...| 25.0| 27.53311279936195|
|[0.06417,0.0,5.96...| 18.9| 23.27510518126308|
|[0.06905,0.0,2.18...| 36.2| 27.52970561388826|
|[0.09378,12.5,7.8...| 21.7|21.119973433162617|
|[0.10328,25.0,5.1...| 19.6| 20.47913393902804|
|[0.11747,12.5,7.8...| 18.9|21.228024413391385|
|[0.12269,0.0,6.91...| 21.2|22.932262951843672|
|[0.14932,25.0,5.1...| 18.7| 17.19749628558582|
|[0.15445,25.0,5.1...| 23.3| 21.30625186093249|
|[0.15936,0.0,6.91...| 24.7|24.908943804132363|
|[0.17142,0.0,6.91...| 19.3| 22.10847644062102|
|[0.17505,0.0,5.96...| 24.7|22.826851138563015|
|[0.18836,0.0,6.91...| 20.0| 20.64422439844359|
|[0.22927,0.0,6.91...| 16.6|17.940530654

In [64]:
test.show(10)

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|PRICE|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+
| 0.0136|75.0|  4.0| 0.0| 0.41|5.888|47.6|7.3197|3.0|469.0|   21.1| 396.9| 14.8| 18.9|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43|58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998|45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|
| 0.0536|21.0| 5.64| 0.0|0.439|6.511|21.1|6.8147|4.0|243.0|   16.8| 396.9| 5.28| 25.0|
|0.06417| 0.0| 5.96| 0.0|0.499|5.933|68.2|3.3603|5.0|279.0|   19.2| 396.9| 9.68| 18.9|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147|54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|
|0.09378|12.5| 7.87| 0.0|0.524|5.889|39.0|5.4509|5.0|311.0|   15.2| 390.5|15.71| 21.7|
|0.10328|25.0| 5.13| 0.0|0.453|5.927|47.2| 

### Model Serve - Using MLflow

In [None]:
###
## Used the wrapper from !pip install "https://github.com/amesar/mlflow-tools" due to the error from the Pyspark 

# ************ PROBLEM *************
# Currently you cannot load a SparkML model as a UDF with MLflow due to named column bug.
# Error message: pyspark.sql.utils.IllegalArgumentException: sepal_length does not exist. Available: 0, 1, 2, 3
# mlflow git issue 4131 - Spark UDF throws IllegalArgumentException

# ***********  SOLUTION **************
# There is a workaround that leverages a custom PythonModel wrapper.
# Wrapper: sparkml_udf_workaround.py
# Usage: test_sparkml_udf_workaround.py
#


In [35]:
import mlflow

#Import python script / library 
#Original link from "https://github.com/amesar/mlflow-tools"

import sparkml_udf_workaround
from sparkml_udf_workaround import log_udf_model

In [57]:
model_name = "sparkML-regmodel_pipeline"
columns = ['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT']

with mlflow.start_run() as run: 
    mlflow.spark.log_model(reg_model_pipeline, model_name)
    log_udf_model(run.info.run_id, model_name, columns)


                                                                                

### Model Serving

In [58]:
#Feature names
columns = ['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT']

#Get WRAPPER model name and model path - Wrapper (log_udf_model) created a copy of the main 
# directory prefrixed with 'udf'

model_name = f"udf-{model_name}"
model_path = f"runs:/{run.info.run_id}/{model_name}"
print("model_path:",model_path)

model_path: runs:/bba8ec7412dc43f49cba60f10ba9153b/udf-sparkML-regmodel_pipeline


In [60]:
boston_udf = mlflow.pyfunc.spark_udf(spark=sparksession, model_uri=model_path)
boston_udf



<function mlflow.pyfunc.spark_udf.<locals>.udf(iterator: Iterator[Tuple[Union[pandas.core.series.Series, pandas.core.frame.DataFrame], ...]]) -> Iterator[pandas.core.series.Series]>

In [61]:
predictions = spark_df.withColumn("prediction", boston_udf(*columns))
print("Predictions:")
predictions.show()

Predictions:


22/09/25 21:59:52 WARN Utils: Your hostname, Marios-MacBook-Pro-M1.local resolves to a loopback address: 127.0.0.1, but we couldn't find any external IP address!
22/09/25 21:59:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/25 21:59:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/25 21:59:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/09/25 21:59:54 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2022/09/25 21:59:56 INFO mlflow.spark: File '/var/folders/56/tj7c5hxx3cv__yr0f0359m3w0000gn/T/tmp6kodcdit/models/tmp7lphatr1/udf-sparkML-regmodel_pipeline/artifacts/sparkML-regmodel_pipeline/sparkml' is already on DFS, copy is not necessary.
22/09/25 22:00:05 WARN package: Trunc

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|PRICE|        prediction|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------------------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|29.242518738731157|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|24.615392886028477|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7| 30.27746982633743|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|28.144303164575852|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2| 27.52970561388826|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7| 24.63571998287687|
|0.08829|12.5| 7.87| 0.0|0.524|6.012|


                                                                                

In [None]:
mlflow.end_run()