# Part 0: Single Threaded

In [2]:
import numpy as np
import pandas as pd

# load the boston data set
from sklearn.datasets import load_boston
boston = load_boston()

# convert to a Pandas Data Frame
boston_pd = pd.DataFrame(data= np.c_[boston['data'],boston['target']], columns= np.append(boston['feature_names'], 'target')).sample(frac=1)
print(boston_pd.shape)
boston_pd.head(5)


In [3]:
from sklearn.linear_model import LinearRegression
from scipy.stats.stats import pearsonr

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))


# Part 1: Native Spark

In [5]:
# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))


CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
0.03537,34.0,6.09,0.0,0.433,6.59,40.4,5.4917,7.0,329.0,16.1,395.75,9.5,22.0
0.12329,0.0,10.01,0.0,0.547,5.913,92.9,2.3534,6.0,432.0,17.8,394.95,16.21,18.8
23.6482,0.0,18.1,0.0,0.671,6.38,96.2,1.3861,24.0,666.0,20.2,396.9,23.69,13.1
4.42228,0.0,18.1,0.0,0.584,6.003,94.5,2.5403,24.0,666.0,20.2,331.29,21.32,19.1
0.27957,0.0,9.69,0.0,0.585,5.926,42.6,2.3817,6.0,391.0,19.2,396.9,13.59,24.5


In [6]:
from pyspark.ml.feature import VectorAssembler

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)],  outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 

display(boston_train.take(5))

features,target
"List(1, 13, List(), List(0.03537, 34.0, 6.09, 0.0, 0.433, 6.59, 40.4, 5.4917, 7.0, 329.0, 16.1, 395.75, 9.5))",22.0
"List(1, 13, List(), List(0.12329, 0.0, 10.01, 0.0, 0.547, 5.913, 92.9, 2.3534, 6.0, 432.0, 17.8, 394.95, 16.21))",18.8
"List(1, 13, List(), List(23.6482, 0.0, 18.1, 0.0, 0.671, 6.38, 96.2, 1.3861, 24.0, 666.0, 20.2, 396.9, 23.69))",13.1
"List(1, 13, List(), List(4.42228, 0.0, 18.1, 0.0, 0.584, 6.003, 94.5, 2.5403, 24.0, 666.0, 20.2, 331.29, 21.32))",19.1
"List(1, 13, List(), List(0.27957, 0.0, 9.69, 0.0, 0.585, 5.926, 42.6, 2.3817, 6.0, 391.0, 19.2, 396.9, 13.59))",24.5


In [7]:
# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))


In [8]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                           estimatorParamMaps=ParamGridBuilder().addGrid(LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                           evaluator=RegressionEvaluator(labelCol = "target", metricName = "r2"),
                           numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))


# Part 2: Thread Pools

In [10]:
# sklearn version 
from sklearn.ensemble import RandomForestRegressor as RFR
from multiprocessing.pool import ThreadPool

# allow up to 5 concurrent threads
pool = ThreadPool(5)

# hyperparameters to test out (n_trees)
parameters = [ 10, 20, 50]

# define a function to train a RF model and return metrics 
def sklearn_random_forest(trees, X_train, X_test, y_train, y_test):

    # train a random forest regressor with the specified number of trees
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)

    # return the number of trees, and the R value 
    return [trees, r[0]**2]  

# run the tasks 
pool.map(lambda trees: sklearn_random_forest(trees, X_train, X_test, y_train, y_test), parameters)

  

In [11]:
# spark version
from pyspark.ml.regression import RandomForestRegressor

# define a function to train a RF model and return metrics 
def mllib_random_forest(trees, boston_train, boston_test):

    # train a random forest regressor with the specified number of trees
    rf = RandomForestRegressor(numTrees = trees, labelCol="target")
    model = rf.fit(boston_train)

    # make predictions
    boston_pred = model.transform(boston_test)
    r = boston_pred.stat.corr("prediction", "target")

    # return the number of trees, and the R value 
    return [trees, r**2]
  
# run the tasks 
pool.map(lambda trees: mllib_random_forest(trees, boston_train, boston_test), parameters)
  

# Part 3: Pandas UDF

In [13]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")

# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
  select *
  from (
    select *, case when rand() < 0.8 then 1 else 0 end as training 
    from boston
  ) b
  cross join (
      select 11 as trees union all select 20 as trees union all select 50 as trees)
""")

schema = StructType([StructField('trees', LongType(), True),
                     StructField('r_squared', DoubleType(), True)])  

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
    trees = boston_pd['trees'].unique()[0]

    # get the train and test groups 
    boston_train = boston_pd[boston_pd['training'] == 1]
    boston_test = boston_pd[boston_pd['training'] == 0] 
        
    # create data and label groups 
    y_train = boston_train['target']
    X_train = boston_train.drop(['target'], axis=1)
    y_test = boston_test['target']
    X_test = boston_test.drop(['target'], axis=1)
   
    # train a classifier 
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)
    
    # return the number of trees, and the R value 
    return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
  
# use the Pandas UDF
results = full_df.groupby('trees').apply(train_RF)

# print the results 
print(results.take(3))
