# Spark with Python

05-08-2023
by Thomas Johnson III

This notebook is a practice exercise in using PySpark for personal coding development. Much of what appears here extends existing work or uses methods that already exist in the Python libraries loaded below.

In [1]:
# Loading in necessary modules and functions.
import numpy as np
import pandas as pd
from sklearn.datasets import make_regression
from pyspark.context import SparkContext
from pyspark.mllib.stat import Statistics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA
import time

In [2]:
# Generate some regression data to be used for fitting and evaluation.
def gen_data():
    np.random.seed(12)  # Set a random seed for reproducibility
    # Obtain the simulated data for regression.
    dat_x, dat_y = make_regression(n_samples = 5000,  # 5000 cases
                                   n_features = 25,  # 25 variables; by default 10 are informative
                                   noise = 1)
    # Names of the X columns: X1, ..., X25
    x_names = ["X" + str(i + 1) for i in range(25)]
    # Convert the arrays to pandas dataframes
    dat_x = pd.DataFrame(dat_x, columns = x_names)
    dat_y = pd.DataFrame(dat_y, columns = ["y"])
    return dat_x, dat_y  # Return X and y

Now we obtain the $\boldsymbol{X}$ and $\boldsymbol{y}$ data.

In [3]:
X_dat, y_dat = gen_data() 

Combine $\boldsymbol{X}$ and $\boldsymbol{y}$ into one dataframe, starting in pandas and then moving the dataframe into PySpark.

In [4]:
pop_data = pd.concat([X_dat,y_dat], axis = 1)

Now that we have the population data, we can load it into PySpark.

In [5]:
# Configure Spark with 8 GB of executor memory and create a Spark session to work in.
# SparkContext.setSystemProperty("spark.executor.memory", "4g").setSystemProperty("spark.cores.max", "6")
sp_context = SparkSession.builder.master("local[1]").config("spark.executor.memory", "8g").config("spark.cores.max", "6") \
    .appName("ml_spark").getOrCreate()
    


24/06/09 20:11:51 WARN Utils: Your hostname, LAPTOP-9T87KVBO resolves to a loopback address: 127.0.1.1; using 192.168.56.1 instead (on interface eth1)
24/06/09 20:11:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/09 20:11:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Move pandas dataframe to Spark in memory
pop_data_sp = sp_context.createDataFrame(pop_data)

In [7]:
# Calculate brief summary statistics for columns X2 through X5.
pop_data_sp.select(pop_data_sp.columns[1:5]).summary().show()

24/06/09 20:11:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/06/09 20:11:57 WARN TaskSetManager: Stage 0 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+
|summary|                  X2|                  X3|                  X4|                  X5|
+-------+--------------------+--------------------+--------------------+--------------------+
|  count|                5000|                5000|                5000|                5000|
|   mean|-0.01210631494364...|-8.85369555971085...|-0.00268963742477...|-0.02023292955390...|
| stddev|  0.9957392741360688|   1.006254205754425|  1.0083556524718054|  1.0015132191307523|
|    min| -3.4157248243589216|  -3.654432921988915| -3.5675804306862626| -3.8897380263174064|
|    25%|  -0.676277675641535| -0.6825850313187773| -0.6808199601427417| -0.6880649901638419|
|    50%|-0.01392845608911...|4.529808944312057E-4|-0.01630454755484...|-0.01341902585471932|
|    75%|  0.6391356777594667|  0.6768263942589848|  0.6704488113096037|  0.6501401096039792|
|    max|   3.407902648447736|  3.8102699875673443|   3.5708

In [8]:
# Calculate brief summary statistics for the last three columns (X24, X25, y).
pop_data_sp.select(pop_data_sp.columns[23:]).summary().show()

24/06/09 20:12:00 WARN TaskSetManager: Stage 3 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.


+-------+--------------------+--------------------+-------------------+
|summary|                 X24|                 X25|                  y|
+-------+--------------------+--------------------+-------------------+
|  count|                5000|                5000|               5000|
|   mean|-0.00498021898128...|-0.02020631205235034| -3.798446493633077|
| stddev|  0.9877367012402092|  1.0201279396655665| 234.20786791921896|
|    min|  -3.367035150216294| -3.4850827883691182| -884.2724915998847|
|    25%| -0.6889496650184208| -0.7074029537948439| -156.7858165392303|
|    50%|-0.01406311451878...|-0.01767028251457...|-1.7103445920178768|
|    75%|  0.6693079349083157|  0.6525044474585894| 158.57970199205099|
|    max|   3.499212619752443|  3.8085663482139127|  972.4118959873122|
+-------+--------------------+--------------------+-------------------+
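The rows of the summary tables above (count, mean, stddev, min, quartiles, max) can be reproduced on a small scale with the standard library. This is only a sketch on made-up values: the list `values` is hypothetical, and the quartiles here are interpolated, whereas Spark reports approximate percentiles, so the two need not agree exactly on real data.

```python
import statistics

# Toy data for illustration only; these are not values from the notebook.
values = [2.0, 4.0, 4.0, 5.0, 7.0, 9.0]

# Interpolated quartiles; Spark's summary() uses approximate percentiles instead.
q1, median, q3 = statistics.quantiles(values, n=4, method="inclusive")

summary = {
    "count": len(values),
    "mean": statistics.mean(values),
    "stddev": statistics.stdev(values),  # sample standard deviation (divides by n - 1)
    "min": min(values),
    "25%": q1,
    "50%": median,
    "75%": q3,
    "max": max(values),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```

For the simulated predictors, a mean near 0 and a standard deviation near 1 are what the tables show, while the response y has a much larger spread.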



In [None]:
pop_data_sp = pop_data_sp.repartition(10)

In [1]:
# Split the data into 70% training and 30% testing sets.
train_dat, test_dat = pop_data_sp.randomSplit(weights = [0.7, 0.3], seed = 424)

NameError: name 'pop_data_sp' is not defined
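A weighted random split assigns rows to splits at random, so the resulting sizes are only approximately 70% and 30%. The plain-Python sketch below illustrates that idea. The function `random_split` is a hypothetical helper written for this illustration, not Spark's implementation, and it will not reproduce the rows Spark selects for the same seed.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative upper bounds of each split on [0, 1], e.g. [0.7, 1.0].
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # Pick the first split whose upper bound exceeds the draw;
        # the last split catches any floating-point shortfall.
        for idx, upper in enumerate(bounds):
            if u < upper or idx == len(bounds) - 1:
                splits[idx].append(row)
                break
    return splits


# Row indices stand in for the 5000 rows of the dataframe.
train_rows, test_rows = random_split(range(5000), weights=[0.7, 0.3], seed=424)
print(len(train_rows), len(test_rows))  # close to, but not exactly, 3500 and 1500
```

Because the split is random per row, fixing the seed matters for reproducibility, which is why the cell above passes one.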

# Random Forest

Adding a random forest as well, for comparison with the GBT and linear regression predictive models.

In [25]:
# Random forest regressor model
rfRegress = RandomForestRegressor(labelCol = "y", seed = 9)

In [26]:
# Construct a pipeline
rf_pipeline_1 = Pipeline(stages = [vec_assembler, rfRegress])

In [30]:
# Construct the parameter grid
rf_reg_params = ParamGridBuilder()\
    .addGrid(rfRegress.maxDepth, [5, 10])\
    .addGrid(rfRegress.maxBins, [16, 32])\
    .addGrid(rfRegress.numTrees, [10, 20, 30]).build()
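To get a feel for how much work this grid implies, the combinations can be enumerated in plain Python. The sketch below only mirrors the candidate values listed in the grid and the five folds used in the cross-validation cell; the dictionary `grid_values` is an illustrative stand-in, not an object produced by ParamGridBuilder.

```python
from itertools import product

# Candidate values copied from the parameter grid.
grid_values = {
    "maxDepth": [5, 10],
    "maxBins": [16, 32],
    "numTrees": [10, 20, 30],
}

# Every combination of one value per hyperparameter.
combinations = [
    dict(zip(grid_values, combo)) for combo in product(*grid_values.values())
]

num_folds = 5  # matches numFolds in the cross-validation cell
cv_fits = len(combinations) * num_folds

print(len(combinations))  # 12
print(cv_fits)            # 60
```

So the grid has 2 × 2 × 3 = 12 candidates, and 5-fold cross-validation fits each of them five times, for 60 pipeline fits before the best candidate is refit on the full training data. This is why the fit below takes a while on a single local core.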

In [31]:
# Build the cross-validation object
rf_cross_val = CrossValidator(estimator = rf_pipeline_1,
                              estimatorParamMaps = rf_reg_params,
                              evaluator = RegressionEvaluator(labelCol = "y"),
                              numFolds = 5)

Measure how long it takes to fit the cross-validated random forest pipeline, covering every grid combination across the five folds.

In [32]:
start_time_rf = time.time()
rf_model_fitted = rf_cross_val.fit(train_dat)
end_time_rf = time.time() - start_time_rf

24/06/09 20:17:07 WARN TaskSetManager: Stage 425 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:07 WARN TaskSetManager: Stage 426 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:07 WARN TaskSetManager: Stage 427 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:08 WARN TaskSetManager: Stage 428 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:08 WARN TaskSetManager: Stage 430 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:08 WARN TaskSetManager: Stage 432 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
24/06/09 20:17:08 WARN TaskSetManager: Stage 434 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.
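The timing pattern used above (record a start time, run the fit, subtract) can be wrapped in a small reusable helper. The sketch below is one possible way to do it, not part of the original notebook: `timed` is a hypothetical helper, and it uses `time.perf_counter()`, a monotonic clock intended for measuring intervals, in place of `time.time()`.

```python
import time


def timed(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload; in the notebook this would be rf_cross_val.fit(train_dat).
result, elapsed = timed(sum, range(1_000_000))
print(f"elapsed: {elapsed:.3f} s")
```

Note that in the cell above, `end_time_rf` already holds the elapsed time in seconds rather than an end timestamp.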

Get the RMSE (the default metric of RegressionEvaluator) on the training data.

In [33]:
RegressionEvaluator(labelCol = "y").evaluate(rf_model_fitted.transform(train_dat))

24/06/09 20:19:27 WARN TaskSetManager: Stage 1659 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.


63.89853290683766

Get the RMSE on the testing data, using the same default metric.

In [34]:
RegressionEvaluator(labelCol = "y").evaluate(rf_model_fitted.transform(test_dat))

24/06/09 20:19:29 WARN TaskSetManager: Stage 1660 contains a task of very large size (1164 KiB). The maximum recommended task size is 1000 KiB.


118.01437994180446
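For reference, RMSE is the square root of the mean squared difference between the observed and predicted values. The sketch below computes it in plain Python on made-up numbers and then compares the two RMSE values reported above. The function `rmse` is a hypothetical helper for illustration, not the evaluator's implementation.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Toy values for illustration only; not taken from the notebook's data.
toy_rmse = rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(toy_rmse)  # equals sqrt(4 / 3)

# RMSE values reported above for the cross-validated random forest.
train_rmse = 63.89853290683766
test_rmse = 118.01437994180446
print(round(test_rmse / train_rmse, 2))  # 1.85
```

The test RMSE is roughly 1.85 times the training RMSE, which suggests the random forest fits the training data noticeably better than unseen data. Both values are still well below the standard deviation of y (about 234) shown in the summary table.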