In [1]:
import pandas as pd
import numpy as np

This demo covers the basics of MLLib with an example Regression problem on the California housing dataset [found here](https://www.kaggle.com/camnugent/california-housing-prices).

References in the PySpark MLLib Library can be found at:
[pyspark mllib api](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

Note for below: Spark integrates with libsvm format seamlessly which is a text format in which each line represents a labeled sparse feature vector using the following format: label index1:value1 index2:value2 via spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

In [2]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing(as_frame=True)
df = pd.concat([housing['data'], housing['target']], axis=1)

# other formats
# df = spark.read.load("examples/src/main/resources/people.json", format="json")
# df = spark.read.load("examples/src/main/resources/people.csv", format="csv", sep=":", inferSchema="true", header="true")


SparkSession is the entrypoint for all Python based Spark programs, provides the functionality and access to RDD's. It is very similar to opening the Spark CMD line Interface and utilizing the SparkSession object automatically provided to you.

Parameters:<br> 
master => Refers to the main/master resource in the cluster depending on if your using yarn or mesos for resource management. We use local\[x\] below for standalone mode, the * represents number of partitions to use when creating an RDD/DataFrame/Dataset - better utilizes with # of cores u have. <br>
appname => name of app <br>
configs => any configs necessary (we add port/driver address to utilize the spark built in UI to see jobs) <br>
getOrCreate => creates app if not already active (make sure to spark.stop() at end of session <br>


In [3]:
from pyspark.sql import SparkSession

# Create PySpark SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ML Lib Example") \
    .config("spark.driver.bindAddress","localhost") \
    .config("spark.ui.port","8080") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# spark.sparkContext.getConf().getAll()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 17:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Create PySpark DataFrame from Pandas

df = spark.createDataFrame(df) # can also use toDF() function
df.printSchema()
df.show()

root
 |-- MedInc: double (nullable = true)
 |-- HouseAge: double (nullable = true)
 |-- AveRooms: double (nullable = true)
 |-- AveBedrms: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- AveOccup: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- MedHouseVal: double (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|MedHouseVal|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|      4.526|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|      3.585|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|      3.521|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|      3.413|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|      3.422|
|4.0368|    52.0| 4.761658031088083|1.1036269430051813|     413.

                                                                                

Dense Vectors represent value arrays similar to the underlying type - numpy arrays (.toArray() gives you underlying numpy array).

In [5]:
from pyspark.ml.linalg import DenseVector

# x[:1-1] are the features and x[-1] is the target variable
data = df.rdd.map(lambda x: (x[-1], DenseVector(x[:-1]))) 

# creating a dataframe with columns ‘label’ (target variable) and #‘features’(dense vector of independent variables)
df2 = spark.createDataFrame(data, ["label", "features"])
df2.printSchema()
df2.show()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[8.3252,41.0,6.98...|
|3.585|[8.3014,21.0,6.23...|
|3.521|[7.2574,52.0,8.28...|
|3.413|[5.6431,52.0,5.81...|
|3.422|[3.8462,52.0,6.28...|
|2.697|[4.0368,52.0,4.76...|
|2.992|[3.6591,52.0,4.93...|
|2.414|[3.12,52.0,4.7975...|
|2.267|[2.0804,42.0,4.29...|
|2.611|[3.6912,52.0,4.97...|
|2.815|[3.2031,52.0,5.47...|
|2.418|[3.2705,52.0,4.77...|
|2.135|[3.075,52.0,5.322...|
|1.913|[2.6736,52.0,4.0,...|
|1.592|[1.9167,52.0,4.26...|
|  1.4|[2.125,50.0,4.242...|
|1.525|[2.775,52.0,5.939...|
|1.555|[2.1202,52.0,4.05...|
|1.587|[1.9911,50.0,5.34...|
|1.629|[2.6033,52.0,5.46...|
+-----+--------------------+
only showing top 20 rows



Scalers :<br> 
pyspark.ml.feature.MaxAbsScaler (Python class, in MaxAbsScaler)<br>
pyspark.ml.feature.MaxAbsScalerModel (Python class, in MaxAbsScalerModel)<br>
pyspark.ml.feature.MinMaxScaler (Python class, in MinMaxScaler)<br>
pyspark.ml.feature.MinMaxScalerModel (Python class, in MinMaxScalerModel)<br>
pyspark.ml.feature.RobustScaler (Python class, in RobustScaler)<br>

In [6]:
from pyspark.ml.feature import StandardScaler

ss = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler = ss.fit(df2)
scaled_df = scaler.transform(df2)
scaled_df.printSchema()
scaled_df.show()

                                                                                

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)

+-----+--------------------+--------------------+
|label|            features|     scaled_features|
+-----+--------------------+--------------------+
|4.526|[8.3252,41.0,6.98...|[4.38209539419521...|
|3.585|[8.3014,21.0,6.23...|[4.36956790291790...|
|3.521|[7.2574,52.0,8.28...|[3.82004265529144...|
|3.413|[5.6431,52.0,5.81...|[2.97033134567133...|
|3.422|[3.8462,52.0,6.28...|[2.02450575423456...|
|2.697|[4.0368,52.0,4.76...|[2.12483095748897...|
|2.992|[3.6591,52.0,4.93...|[1.92602282910917...|
|2.414|[3.12,52.0,4.7975...|[1.64225936072275...|
|2.267|[2.0804,42.0,4.29...|[1.09505011988705...|
|2.611|[3.6912,52.0,4.97...|[1.94291915137814...|
|2.815|[3.2031,52.0,5.47...|[1.68600030715738...|
|2.418|[3.2705,52.0,4.77...|[1.72147732027043...|
|2.135|[3.075,52.0,5.322...|[1.61857292763540...|
|1.913|[2.6736,52.0,4.0,...|[1.40728994449626...|
|1.592|[1.9167,52.0,

In [7]:
# Splitting the data
train, test = scaled_df.randomSplit([.8,.2],seed=1)
("Train Shape: {}, Test Shape: {}").format((train.count(), len(train.columns)), (test.count(), len(test.columns)))

                                                                                

'Train Shape: (16432, 3), Test Shape: (4208, 3)'

In [8]:
from pyspark.ml.regression import LinearRegression

linearReg = LinearRegression(featuresCol = "scaled_features", labelCol = "label") # fit the model to the the training data
model = linearReg.fit(train) # make predictions on the test set

                                                                                

In [10]:
# Predictions
predictions = model.transform(test) # show the predicted values and the actual values
predictions.select("prediction" , "label").show()

+-----+--------------------+--------------------+------------------+
|label|            features|     scaled_features|        prediction|
+-----+--------------------+--------------------+------------------+
| 0.72|[2.57,52.0,4.2021...|[1.35275851187739...| 2.129807202999551|
|0.764|[2.5192,43.0,4.88...|[1.32601916074767...|1.8919277100759402|
|0.784|[1.1667,52.0,5.40...|[0.61411025517796...|1.4160110704872366|
|0.797|[2.1111,44.0,4.02...|[1.11120953090442...|1.7189627294627527|
|  0.8|[2.9063,42.0,4.59...|[1.52977512181683...|  2.11349438758441|
|0.824|[2.455,42.0,5.223...|[1.29222651620973...| 1.863731439139336|
|0.831|[1.4113,52.0,4.29...|[0.74285917813718...| 1.612754622773167|
|0.833|[2.3869,16.0,4.79...|[1.25638104747087...|1.5877001472916135|
|0.851|[1.6196,38.0,3.83...|[0.85250104507261...|1.5812261158514005|
|0.857|[1.75,47.0,4.0,0....|[0.92113906450795...| 1.621465991567753|
|0.875|[1.6098,52.0,5.02...|[0.84734266631137...|1.5651671736329646|
|0.889|[2.2467,46.0,5.94...|[1.182

In [11]:
# Evaluations
r2_test = model.evaluate(test).r2
print("R2 score on test set: ", r2_test)

R2 score on test set:  0.6122841723689958


In [12]:
from pyspark.ml.evaluation import RegressionEvaluator # RMSE on test set

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label")

rmse_test = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae_test = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
mse_test = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

print("RMSE on test set: {},\nMAE on test set: {}, \nMSE on test set: {}".format(rmse_test, mae_test, mse_test))

RMSE on test set: 0.7132556397149299,
MAE on test set: 0.5259975078213824, 
MSE on test set: 0.5087336075851538


## Classification & Pipeline Example

Uses the iris dataset for classifying iris plants given petal length, petal width, sepal length, sepal width.

In [13]:
from pyspark.sql.types import *
from pyspark.ml import Pipeline 
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression 

iris_schema = StructType([
    StructField("sepal_length", DoubleType(), False),
    StructField("sepal_width", DoubleType(), False),
    StructField("petal_length", DoubleType(), False),
    StructField("petal_width", DoubleType(), False),
    StructField("class", StringType(), False)
])

iris_df = spark.read.schema(iris_schema).load("iris.csv", format="csv")
iris_df.show()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      class|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

In [14]:
data = iris_df.rdd.map(lambda x: (x[-1], DenseVector(x[:-1]))) 
# creating a dataframe with columns ‘label’ (target variable) and #‘features’(dense vector of independent variables)
df2 = spark.createDataFrame(data, schema=['label', 'features'])

In [15]:
df2.show()

+-----------+-----------------+
|      label|         features|
+-----------+-----------------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|
|Iris-setosa|[4.9,3.0,1.4,0.2]|
|Iris-setosa|[4.7,3.2,1.3,0.2]|
|Iris-setosa|[4.6,3.1,1.5,0.2]|
|Iris-setosa|[5.0,3.6,1.4,0.2]|
|Iris-setosa|[5.4,3.9,1.7,0.4]|
|Iris-setosa|[4.6,3.4,1.4,0.3]|
|Iris-setosa|[5.0,3.4,1.5,0.2]|
|Iris-setosa|[4.4,2.9,1.4,0.2]|
|Iris-setosa|[4.9,3.1,1.5,0.1]|
|Iris-setosa|[5.4,3.7,1.5,0.2]|
|Iris-setosa|[4.8,3.4,1.6,0.2]|
|Iris-setosa|[4.8,3.0,1.4,0.1]|
|Iris-setosa|[4.3,3.0,1.1,0.1]|
|Iris-setosa|[5.8,4.0,1.2,0.2]|
|Iris-setosa|[5.7,4.4,1.5,0.4]|
|Iris-setosa|[5.4,3.9,1.3,0.4]|
|Iris-setosa|[5.1,3.5,1.4,0.3]|
|Iris-setosa|[5.7,3.8,1.7,0.3]|
|Iris-setosa|[5.1,3.8,1.5,0.3]|
+-----------+-----------------+
only showing top 20 rows



In [16]:
label_encoder = StringIndexer(inputCol='label', outputCol='label_class')
scaling = StandardScaler(inputCol="features", outputCol="scaled_features")
multi_lr = LogisticRegression(maxIter=2, regParam=0.3, elasticNetParam=0.8, featuresCol = "scaled_features", labelCol = "label_class")

pipeline = Pipeline(stages=[label_encoder, scaling, multi_lr]) 

In [17]:
model = pipeline.fit(df2) 
prediction = model.transform(df2)

prediction.show()

+-----------+-----------------+-----------+--------------------+--------------------+--------------------+----------+
|      label|         features|label_class|     scaled_features|       rawPrediction|         probability|prediction|
+-----------+-----------------+-----------+--------------------+--------------------+--------------------+----------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|        0.0|[6.15892840883878...|[0.50415620379330...|[0.55187226587420...|       0.0|
|Iris-setosa|[4.9,3.0,1.4,0.2]|        0.0|[5.9174018045706,...|[0.49309454006432...|[0.54960500935144...|       0.0|
|Iris-setosa|[4.7,3.2,1.3,0.2]|        0.0|[5.67587520030241...|[0.53860721388051...|[0.56183407326107...|       0.0|
|Iris-setosa|[4.6,3.1,1.5,0.2]|        0.0|[5.55511189816831...|[0.51191378171187...|[0.55443358078581...|       0.0|
|Iris-setosa|[5.0,3.6,1.4,0.2]|        0.0|[6.03816510670469...|[0.51939642107407...|[0.55587274798294...|       0.0|
|Iris-setosa|[5.4,3.9,1.7,0.4]|        0.0|[6.5212183152

In [18]:
spark.stop()