In [16]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession \
  .builder \
  .appName("multi key test") \
  .master("local[16]") \
  .getOrCreate()

sc= spark.sparkContext

In [4]:
# Sum of the first 100 Whole numbers
rdd = sc.parallelize(range(10000000000 +1))
rdd.sum()


50000000005000000000

In [10]:
import numpy as np
import pandas as pd

# load the boston data set
from sklearn.datasets import load_boston
boston = load_boston()

# convert to a Pandas Data Frame
boston_pd = pd.DataFrame(data= np.c_[boston['data'],boston['target']], 
              columns= np.append(boston['feature_names'], 'target')).sample(frac=1)
print(boston_pd.shape)
boston_pd.head(5)

(506, 14)



    The Boston housing prices dataset has an ethical problem. You can refer to
    the documentation of this function for further details.

    The scikit-learn maintainers therefore strongly discourage the use of this
    dataset unless the purpose of the code is to study and educate about
    ethical issues in data science and machine learning.

    In this special case, you can fetch the dataset from the original
    source::

        import pandas as pd
        import numpy as np

        data_url = "http://lib.stat.cmu.edu/datasets/boston"
        raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
        data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
        target = raw_df.values[1::2, 2]

    Alternative datasets include the California housing dataset (i.e.
    :func:`~sklearn.datasets.fetch_california_housing`) and the Ames housing
    dataset. You can load the datasets as follows::

        from sklearn.datasets import fetch_california_ho

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,target
505,0.04741,0.0,11.93,0.0,0.573,6.03,80.8,2.505,1.0,273.0,21.0,396.9,7.88,11.9
307,0.04932,33.0,2.18,0.0,0.472,6.849,70.3,3.1827,7.0,222.0,18.4,396.9,7.53,28.2
386,24.3938,0.0,18.1,0.0,0.7,4.652,100.0,1.4672,24.0,666.0,20.2,396.9,28.28,10.5
45,0.17142,0.0,6.91,0.0,0.448,5.682,33.8,5.1004,3.0,233.0,17.9,396.9,10.21,19.3
326,0.30347,0.0,7.38,0.0,0.493,6.312,28.9,5.4159,5.0,287.0,19.6,396.9,6.15,23.0


In [13]:
from sklearn.linear_model import LinearRegression
from scipy.stats import pearsonr

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))

R-sqaured: 0.7230573095749625
MAE: 3.6288318442391945


In [17]:
from pyspark.ml.feature import VectorAssembler

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)],  
                                                                        outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 

display(boston_train.take(5))

[Row(CRIM=0.04741, ZN=0.0, INDUS=11.93, CHAS=0.0, NOX=0.573, RM=6.03, AGE=80.8, DIS=2.505, RAD=1.0, TAX=273.0, PTRATIO=21.0, B=396.9, LSTAT=7.88, target=11.9),
 Row(CRIM=0.04932, ZN=33.0, INDUS=2.18, CHAS=0.0, NOX=0.472, RM=6.849, AGE=70.3, DIS=3.1827, RAD=7.0, TAX=222.0, PTRATIO=18.4, B=396.9, LSTAT=7.53, target=28.2),
 Row(CRIM=24.3938, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.7, RM=4.652, AGE=100.0, DIS=1.4672, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=396.9, LSTAT=28.28, target=10.5),
 Row(CRIM=0.17142, ZN=0.0, INDUS=6.91, CHAS=0.0, NOX=0.448, RM=5.682, AGE=33.8, DIS=5.1004, RAD=3.0, TAX=233.0, PTRATIO=17.9, B=396.9, LSTAT=10.21, target=19.3),
 Row(CRIM=0.30347, ZN=0.0, INDUS=7.38, CHAS=0.0, NOX=0.493, RM=6.312, AGE=28.9, DIS=5.4159, RAD=5.0, TAX=287.0, PTRATIO=19.6, B=396.9, LSTAT=6.15, target=23.0)]

[Row(features=DenseVector([0.0474, 0.0, 11.93, 0.0, 0.573, 6.03, 80.8, 2.505, 1.0, 273.0, 21.0, 396.9, 7.88]), target=11.9),
 Row(features=DenseVector([0.0493, 33.0, 2.18, 0.0, 0.472, 6.849, 70.3, 3.1827, 7.0, 222.0, 18.4, 396.9, 7.53]), target=28.2),
 Row(features=DenseVector([24.3938, 0.0, 18.1, 0.0, 0.7, 4.652, 100.0, 1.4672, 24.0, 666.0, 20.2, 396.9, 28.28]), target=10.5),
 Row(features=DenseVector([0.1714, 0.0, 6.91, 0.0, 0.448, 5.682, 33.8, 5.1004, 3.0, 233.0, 17.9, 396.9, 10.21]), target=19.3),
 Row(features=DenseVector([0.3035, 0.0, 7.38, 0.0, 0.493, 6.312, 28.9, 5.4159, 5.0, 287.0, 19.6, 396.9, 6.15]), target=23.0)]

In [18]:
# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=10, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

R-sqaured: 0.7115010821501254


In [19]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

R-sqaured: 0.7230573095749673


In [20]:
# sklearn version 
from sklearn.ensemble import RandomForestRegressor as RFR
from multiprocessing.pool import ThreadPool

# allow up to 5 concurrent threads
pool = ThreadPool(5)

# hyperparameters to test out (n_trees)
parameters = [ 10, 20, 50]

# define a function to train a RF model and return metrics 
def sklearn_random_forest(trees, X_train, X_test, y_train, y_test):

    # train a random forest regressor with the specified number of trees
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)

    # return the number of trees, and the R value 
    return [trees, r[0]**2]  

# run the tasks 
pool.map(lambda trees: sklearn_random_forest(trees, X_train,
                                           X_test, y_train, y_test), parameters)


[[10, 0.8539412099834258], [20, 0.8533219084658166], [50, 0.8720090447050126]]

In [21]:
# spark version
from pyspark.ml.regression import RandomForestRegressor

# define a function to train a RF model and return metrics 
def mllib_random_forest(trees, boston_train, boston_test):

    # train a random forest regressor with the specified number of trees
    rf = RandomForestRegressor(numTrees = trees, labelCol="target")
    model = rf.fit(boston_train)

    # make predictions
    boston_pred = model.transform(boston_test)
    r = boston_pred.stat.corr("prediction", "target")

    # return the number of trees, and the R value 
    return [trees, r**2]
  
# run the tasks 
pool.map(lambda trees: mllib_random_forest(trees, boston_train, boston_test), parameters)
 

[[10, 0.8436836892201441], [20, 0.8911474300093474], [50, 0.8769789726473258]]

In [37]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")

# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
  select *
  from (
    select *, case when rand() < 0.8 then 1 else 0 end as training 
    from boston
  ) b
  cross join (
      select 11 as trees union all select 20 as trees union all select 50 as trees)
""")

schema = StructType([StructField('trees', LongType(), True),
                     StructField('r_squared', DoubleType(), True)])  

#@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
    trees = boston_pd['trees'].unique()[0]

    # get the train and test groups 
    boston_train = boston_pd[boston_pd['training'] == 1]
    boston_test = boston_pd[boston_pd['training'] == 0] 
        
    # create data and label groups 
    y_train = boston_train['target']
    X_train = boston_train.drop(['target'], axis=1)
    y_test = boston_test['target']
    X_test = boston_test.drop(['target'], axis=1)
   
    # train a classifier 
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)
    
    # return the number of trees, and the R value 
    return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
  
# use the Pandas UDF
# results = full_df.groupby('trees').apply(train_RF)

# print the results 
# print(results.take(3))

full_df.groupby('trees').applyInPandas(
    train_RF, schema="trees long, r_squared double").show()  

+-----+------------------+
|trees|         r_squared|
+-----+------------------+
|   11|0.8426812014785933|
|   20|0.8553074478093554|
|   50|0.8650469974108973|
+-----+------------------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 58162)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/

In [26]:
import pandas as pd  
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))  
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").applyInPandas(
    normalize, schema="id long, v double").show()  

+---+-------------------+
| id|                  v|
+---+-------------------+
|  1|-0.7071067811865475|
|  1| 0.7071067811865475|
|  2|-0.8320502943378437|
|  2|-0.2773500981126146|
|  2| 1.1094003924504583|
+---+-------------------+

