In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.datasets import fetch_california_housing
import pandas as pd

# Start (or reuse) the Spark session used by the rest of this notebook.
spark = (
    SparkSession.builder
    .appName("RegressionPipelineExample")
    .getOrCreate()
)

# Fetch the California housing data from scikit-learn and build a pandas frame:
# one column per feature name, with the regression target appended as "label".
housing = fetch_california_housing()
df = pd.DataFrame(data=housing.data, columns=housing.feature_names).assign(
    label=housing.target
)

# Convert the pandas frame to a Spark DataFrame and preview the first five rows.
df_data = spark.createDataFrame(df)
df_data.show(5)



+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|3.521|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|3.413|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|3.422|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+
o

In [0]:
# Prepare the features column.
# Inputs are selected by name rather than by position: this cell overwrites
# df_data, so on a re-run the last column is "features" and a positional
# columns[:-1] slice would pull "label" into the feature vector.
feature_cols = [c for c in df_data.columns if c not in ("label", "features")]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Drop a "features" column left over from a previous run (a no-op the first
# time) so the assembler's output column never collides with an existing one.
df_data = assembler.transform(df_data.drop("features"))
df_data.show(5, 100)

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+-----------------------------------------------------------------------------------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|                                                                                 features|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+-----------------------------------------------------------------------------------------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|4.526|[8.3252,41.0,6.984126984126984,1.0238095238095237,322.0,2.5555555555555554,37.88,-122.23]|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|3.585|[8.3014,21.0,6.238137082601054,0.9718804920913884,2401.0,2.109841827768014,37.86,-122.

In [0]:
# Scale and normalize features.
# The scaler reads the assembled "features" vector (the only vector column this
# notebook creates) and writes a centred (withMean) and unit-std (withStd)
# version to "scaled_features".
# NOTE: the scaler is only configured here. None of the pipelines visible in
# this notebook include it, so the models are trained on the unscaled
# "features" column.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

In [0]:
# Split the dataset into training and testing sets (weights 0.8 / 0.2).
# A fixed seed keeps the split, and every metric computed from it, the same
# across kernel restarts.
(trainingData, testData) = df_data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Baseline comparison: train five regressors with their default settings on the
# training split and report each one's RMSE on the test split.

# Every regressor reads the "features" vector and predicts "label".
column_args = {"featuresCol": "features", "labelCol": "label"}

lr = LinearRegression(**column_args)
glr = GeneralizedLinearRegression(**column_args)
dt = DecisionTreeRegressor(**column_args)
rf = RandomForestRegressor(**column_args)
gbt = GBTRegressor(**column_args)

# One single-stage pipeline per algorithm.
pipeline_lr, pipeline_glr, pipeline_dt, pipeline_rf, pipeline_gbt = (
    Pipeline(stages=[estimator]) for estimator in (lr, glr, dt, rf, gbt)
)

# Fit every pipeline on the training split.
model_lr, model_glr, model_dt, model_rf, model_gbt = (
    pipeline.fit(trainingData)
    for pipeline in (pipeline_lr, pipeline_glr, pipeline_dt, pipeline_rf, pipeline_gbt)
)

# Score the held-out split with each fitted model.
predictions_lr, predictions_glr, predictions_dt, predictions_rf, predictions_gbt = (
    fitted.transform(testData)
    for fitted in (model_lr, model_glr, model_dt, model_rf, model_gbt)
)

# RMSE between the "label" and "prediction" columns.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)

rmse_lr, rmse_glr, rmse_dt, rmse_rf, rmse_gbt = (
    evaluator.evaluate(scored)
    for scored in (predictions_lr, predictions_glr, predictions_dt, predictions_rf, predictions_gbt)
)

# Report the scores in the same order the models were defined.
for caption, score in (
    ("Linear Regression RMSE:", rmse_lr),
    ("General Linear Regression RMSE:", rmse_glr),
    ("Decision Tree Regression RMSE:", rmse_dt),
    ("Random Forest Regression RMSE:", rmse_rf),
    ("Gradient Boosted Tree Regression RMSE:", rmse_gbt),
):
    print(caption, score)

Linear Regression RMSE: 0.7154279207799202
General Linear Regression RMSE: 0.7154279207799202
Decision Tree Regression RMSE: 0.7062000747791418
Random Forest Regression RMSE: 0.6589518171055013
Gradient Boosted Tree Regression RMSE: 0.5691318056290677


In [0]:
# Preview five linear-regression predictions, truncating cell text to 60 characters.
predictions_lr.show(n=5, truncate=60)

+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------------------------------------------------------+------------------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|label|                                                    features|        prediction|
+------+--------+------------------+------------------+----------+------------------+--------+---------+-----+------------------------------------------------------------+------------------+
|0.7403|    37.0| 4.491428571428571|1.1485714285714286|    1046.0|2.9885714285714284|   37.96|  -122.37|0.686|[0.7403,37.0,4.491428571428571,1.1485714285714286,1046.0,...| 1.143277544256371|
|  0.75|    52.0| 2.823529411764706|0.9117647058823529|     191.0| 5.617647058823529|    37.8|  -122.28|1.625|[0.75,52.0,2.823529411764706,0.9117647058823529,191.0,5.6...| 1.356556401944033|
|0.8075|    52.0| 2.490322580645161|1.0580645

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Hyperparameter search spaces, one grid per regressor. Each grid is the
# cross product of the value lists given for that estimator's params.

# Linear regression: regularisation strength x elastic-net mixing.
paramGrid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Generalized linear regression: regularisation strength x iteration cap.
paramGrid_glr = (
    ParamGridBuilder()
    .addGrid(glr.regParam, [0.1, 0.01])
    .addGrid(glr.maxIter, [10, 20])
    .build()
)

# Decision tree: tree depth x number of bins.
paramGrid_dt = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 20])
    .addGrid(dt.maxBins, [16, 32, 64])
    .build()
)

# Random forest: number of trees x tree depth.
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100, 200])
    .addGrid(rf.maxDepth, [5, 10, 20])
    .build()
)

# Gradient-boosted trees: tree depth x boosting iterations.
paramGrid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10, 20])
    .addGrid(gbt.maxIter, [10, 20, 30])
    .build()
)

In [0]:
# Evaluator used to score candidates: RMSE between "label" and "prediction".
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)

In [0]:
# Define CrossValidator for each algorithm
def _make_cross_validator(pipeline, param_grid, num_folds=3, seed=42):
    """Build a k-fold CrossValidator that searches ``param_grid`` for ``pipeline``.

    Args:
        pipeline: The estimator (here a single-stage Pipeline) to tune.
        param_grid: Param maps produced by ``ParamGridBuilder.build()``.
        num_folds: Number of cross-validation folds.
        seed: Seed for the fold assignment, fixed so repeated runs use the
            same folds.

    Returns:
        An unfitted CrossValidator scored with the notebook-level ``evaluator``.
    """
    return CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=num_folds,
        seed=seed,
    )


cv_lr = _make_cross_validator(pipeline_lr, paramGrid_lr)
# paramGrid_glr had no validator; define one so every grid has a counterpart.
cv_glr = _make_cross_validator(pipeline_glr, paramGrid_glr)
cv_dt = _make_cross_validator(pipeline_dt, paramGrid_dt)
cv_rf = _make_cross_validator(pipeline_rf, paramGrid_rf)
cv_gbt = _make_cross_validator(pipeline_gbt, paramGrid_gbt)

In [0]:
# Run the cross-validated search. Only the random-forest validator is fitted
# here; the linear-regression, decision-tree and gradient-boosted validators
# (cv_lr, cv_dt, cv_gbt) are left disabled and can be run with the same
# fit -> transform -> evaluate -> print sequence.

# Search the random-forest grid on the training split.
cvModel_rf = cv_rf.fit(trainingData)

# Score the held-out split with the fitted cross-validation model.
predictions_rf = cvModel_rf.transform(testData)

# RMSE of those predictions.
rmse_rf = evaluator.evaluate(predictions_rf)

# Report the tuned random-forest score.
print("Random Forest Regression RMSE:", rmse_rf)

In [0]:
# Re-print the random-forest RMSE. The linear-regression, decision-tree and
# gradient-boosted scores are not printed here.
print("Random Forest Regression RMSE:", rmse_rf)

Linear Regression RMSE: 0.7180876814041877
