In [1]:
import numpy as np
import pandas as pd
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
# load the boston data set
from sklearn.datasets import load_boston
boston = load_boston()

In [2]:
# convert to a Pandas Data Frame
boston_pd = pd.DataFrame(data= np.c_[boston['data'],boston['target']],
                         columns= np.append(boston['feature_names'],'target')
                        ).sample(frac=1)
boston_pd.head(5)

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
209,0.43571,0.0,10.59,1.0,0.489,5.344,100.0,3.875,4.0,277.0,18.6,396.9,23.09,20.0
311,0.79041,0.0,9.9,0.0,0.544,6.122,52.8,2.6403,4.0,304.0,18.4,396.9,5.98,22.1
215,0.19802,0.0,10.59,0.0,0.489,6.182,42.4,3.9454,4.0,277.0,18.6,393.63,9.47,25.0
419,11.8123,0.0,18.1,0.0,0.718,6.824,76.5,1.794,24.0,666.0,20.2,48.45,22.74,8.4
331,0.05023,35.0,6.06,0.0,0.4379,5.706,28.4,6.6407,1.0,304.0,16.9,394.02,12.43,17.1


In [3]:
from sklearn.linear_model import LinearRegression
from scipy.stats import pearsonr
#split data
from sklearn.model_selection import train_test_split
# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)
X_train,X_test, y_train,y_test = train_test_split(X,y,
                                                  train_size =0.80,
                                                  test_size=0.2,
                                                  shuffle=False,
                                                  random_state=0)


# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))

R-sqaured: 0.7522181881375871
MAE: 3.157415008267265


In [4]:
# Import PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

[Row(CRIM=0.43571, ZN=0.0, INDUS=10.59, CHAS=1.0, NOX=0.489, RM=5.344, AGE=100.0, DIS=3.875, RAD=4.0, TAX=277.0, PTRATIO=18.6, B=396.9, LSTAT=23.09, target=20.0),
 Row(CRIM=0.79041, ZN=0.0, INDUS=9.9, CHAS=0.0, NOX=0.544, RM=6.122, AGE=52.8, DIS=2.6403, RAD=4.0, TAX=304.0, PTRATIO=18.4, B=396.9, LSTAT=5.98, target=22.1),
 Row(CRIM=0.19802, ZN=0.0, INDUS=10.59, CHAS=0.0, NOX=0.489, RM=6.182, AGE=42.4, DIS=3.9454, RAD=4.0, TAX=277.0, PTRATIO=18.6, B=393.63, LSTAT=9.47, target=25.0),
 Row(CRIM=11.8123, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.718, RM=6.824, AGE=76.5, DIS=1.794, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=48.45, LSTAT=22.74, target=8.4),
 Row(CRIM=0.05023, ZN=35.0, INDUS=6.06, CHAS=0.0, NOX=0.4379, RM=5.706, AGE=28.4, DIS=6.6407, RAD=1.0, TAX=304.0, PTRATIO=16.9, B=394.02, LSTAT=12.43, target=17.1)]

In [5]:
from pyspark.ml.feature import VectorAssembler

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)],  outputCol="features" )

In [6]:
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 
display(boston_train.take(5))

[Row(features=DenseVector([0.4357, 0.0, 10.59, 1.0, 0.489, 5.344, 100.0, 3.875, 4.0, 277.0, 18.6, 396.9, 23.09]), target=20.0),
 Row(features=DenseVector([0.7904, 0.0, 9.9, 0.0, 0.544, 6.122, 52.8, 2.6403, 4.0, 304.0, 18.4, 396.9, 5.98]), target=22.1),
 Row(features=DenseVector([0.198, 0.0, 10.59, 0.0, 0.489, 6.182, 42.4, 3.9454, 4.0, 277.0, 18.6, 393.63, 9.47]), target=25.0),
 Row(features=DenseVector([11.8123, 0.0, 18.1, 0.0, 0.718, 6.824, 76.5, 1.794, 24.0, 666.0, 20.2, 48.45, 22.74]), target=8.4),
 Row(features=DenseVector([0.0502, 35.0, 6.06, 0.0, 0.4379, 5.706, 28.4, 6.6407, 1.0, 304.0, 16.9, 394.02, 12.43]), target=17.1)]

In [7]:
# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

R-sqaured: 0.7494373900890622


In [16]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

R-sqaured: 0.7371479310298303


AttributeError: 'CrossValidatorModel' object has no attribute 'best_params_'

In [8]:
# sklearn version 
from sklearn.ensemble import RandomForestRegressor as RFR
from multiprocessing.pool import ThreadPool

# allow up to 5 concurrent threads
pool = ThreadPool(5)

# hyperparameters to test out (n_trees)
parameters = [ 10, 20, 50]

# define a function to train a RF model and return metrics 
def sklearn_random_forest(trees, X_train, X_test, y_train, y_test):

    # train a random forest regressor with the specified number of trees
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)

    # return the number of trees, and the R value 
    return [trees, r[0]**2]  

# run the tasks 
pool.map(lambda trees: sklearn_random_forest(trees, X_train,
                                           X_test, y_train, y_test), parameters)


[[10, 0.9003868500739229], [20, 0.8968407869468661], [50, 0.8963772265804383]]

In [36]:
# spark version
from pyspark.ml.regression import RandomForestRegressor

# define a function to train a RF model and return metrics 
def mllib_random_forest(trees, boston_train, boston_test):

    # train a random forest regressor with the specified number of trees
    rf = RandomForestRegressor(numTrees = trees, labelCol="target")
    model = rf.fit(boston_train)

    # make predictions
    boston_pred = model.transform(boston_test)
    r = boston_pred.stat.corr("prediction", "target")

    # return the number of trees, and the R value 
    return [trees, r**2]
  
# run the tasks 
pool.map(lambda trees: mllib_random_forest(trees, boston_train, boston_test), parameters)

[[10, 0.8498726041898135], [20, 0.8284686226121398], [50, 0.8249331388684156]]

In [10]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")

# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
  select *
  from (
    select *, case when rand() < 0.8 then 1 else 0 end as training 
    from boston
  ) b
  cross join (
      select 11 as trees union all select 20 as trees union all select 50 as trees)
""")

schema = StructType([StructField('trees', LongType(), True),
                     StructField('r_squared', DoubleType(), True)])  

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
    trees = boston_pd['trees'].unique()[0]

    # get the train and test groups 
    boston_train = boston_pd[boston_pd['training'] == 1]
    boston_test = boston_pd[boston_pd['training'] == 0] 
        
    # create data and label groups 
    y_train = boston_train['target']
    X_train = boston_train.drop(['target'], axis=1)
    y_test = boston_test['target']
    X_test = boston_test.drop(['target'], axis=1)
   
    # train a classifier 
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)
    
    # return the number of trees, and the R value 
    return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
  
# use the Pandas UDF
results = full_df.groupby('trees').apply(train_RF)

# print the results 
print(results.take(3))



[Row(trees=11, r_squared=0.9266122436888398), Row(trees=20, r_squared=0.926764044487551), Row(trees=50, r_squared=0.9346669196092909)]
