In [None]:
# notebook code : 
# https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473

In [0]:
# single thread
import numpy as np
import pandas as pd

# load the boston data set
from sklearn.datasets import load_boston
boston = load_boston()

# convert to a Pandas Data Frame
boston_pd = pd.DataFrame(data= np.c_[boston['data'],boston['target']], 
              columns= np.append(boston['feature_names'], 'target')).sample(frac=1)
print(boston_pd.shape)
boston_pd.head(5)


Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
170,1.20742,0.0,19.58,0.0,0.605,5.875,94.6,2.4259,5.0,403.0,14.7,292.29,14.43,17.4
56,0.02055,85.0,0.74,0.0,0.41,6.383,35.7,9.1876,2.0,313.0,17.3,396.9,5.77,24.7
407,11.9511,0.0,18.1,0.0,0.659,5.608,100.0,1.2852,24.0,666.0,20.2,332.09,12.13,27.9
131,1.19294,0.0,21.89,0.0,0.624,6.326,97.7,2.271,4.0,437.0,21.2,396.9,12.26,19.6
92,0.04203,28.0,15.04,0.0,0.464,6.442,53.6,3.6659,4.0,270.0,18.2,395.01,8.16,22.9


In [0]:
# split dataset and run LinearRegression 
from sklearn.linear_model import LinearRegression
from scipy.stats.stats import pearsonr

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .master("local") \
    .appName("Spark ML") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

In [0]:
# Native spark
from pyspark.ml.feature import VectorAssembler

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)],  
                                                                        outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 

display(boston_train.take(5))


CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
1.20742,0.0,19.58,0.0,0.605,5.875,94.6,2.4259,5.0,403.0,14.7,292.29,14.43,17.4
0.02055,85.0,0.74,0.0,0.41,6.383,35.7,9.1876,2.0,313.0,17.3,396.9,5.77,24.7
11.9511,0.0,18.1,0.0,0.659,5.608,100.0,1.2852,24.0,666.0,20.2,332.09,12.13,27.9
1.19294,0.0,21.89,0.0,0.624,6.326,97.7,2.271,4.0,437.0,21.2,396.9,12.26,19.6
0.04203,28.0,15.04,0.0,0.464,6.442,53.6,3.6659,4.0,270.0,18.2,395.01,8.16,22.9


features,target
"Map(vectorType -> dense, length -> 13, values -> List(1.20742, 0.0, 19.58, 0.0, 0.605, 5.875, 94.6, 2.4259, 5.0, 403.0, 14.7, 292.29, 14.43))",17.4
"Map(vectorType -> dense, length -> 13, values -> List(0.02055, 85.0, 0.74, 0.0, 0.41, 6.383, 35.7, 9.1876, 2.0, 313.0, 17.3, 396.9, 5.77))",24.7
"Map(vectorType -> dense, length -> 13, values -> List(11.9511, 0.0, 18.1, 0.0, 0.659, 5.608, 100.0, 1.2852, 24.0, 666.0, 20.2, 332.09, 12.13))",27.9
"Map(vectorType -> dense, length -> 13, values -> List(1.19294, 0.0, 21.89, 0.0, 0.624, 6.326, 97.7, 2.271, 4.0, 437.0, 21.2, 396.9, 12.26))",19.6
"Map(vectorType -> dense, length -> 13, values -> List(0.04203, 28.0, 15.04, 0.0, 0.464, 6.442, 53.6, 3.6659, 4.0, 270.0, 18.2, 395.01, 8.16))",22.9


In [0]:
# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=100, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

In [0]:
# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=100, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))