In [0]:
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,rand
from pyspark.ml.feature import MinMaxScaler,PCA,VectorAssembler,PolynomialExpansion,StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor,LinearRegression,RandomForestRegressor,GeneralizedLinearRegression,DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.stat import Correlation

In [0]:
spark = SparkSession.builder.appName("model") \
    .config("spark.executor.memory", "100g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.executor.cores", 16) \
    .config("spark.executor.instances", 16) \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()
spark

In [0]:
path = r"/FileStore/tables/data_preprocessed.parquet"
df = spark.read.parquet(path)
df.show()

+--------+----+-----+---+-------------------+-------------+---------------+----------+------------------+--------------+------------+------------------+----------------+---------------+------------------+-------------------+--------------+------------------+------------------+-------------+------------------+-----------------+------------------+
|   price|year|month|day|propertyType_onehot|old/new_index|duration_onehot|PAON_index|       PAON_target|PAON_frequency|street_index|     street_target|street_frequency|town/city_index|  town/city_target|town/city_frequency|district_index|   district_target|district_frequency|country_index|    country_target|country_frequency|categoryType_index|
+--------+----+-----+---+-------------------+-------------+---------------+----------+------------------+--------------+------------+------------------+----------------+---------------+------------------+-------------------+--------------+------------------+------------------+-------------+-------------

In [0]:
# train test split
df_train, df_test = df.randomSplit([0.9,0.1], seed=123)

In [0]:
# normalize the data
cols_normalization = df.columns
cols_normalization.remove('price')

# assemble the data
vector_assembler = VectorAssembler(inputCols=cols_normalization, outputCol='features')
# normalize the data
scaler = StandardScaler(inputCol='features', outputCol='normalized_features', withStd=True, withMean=True)

pipeline = Pipeline(stages=[vector_assembler, scaler]).fit(df_train)
df_train_normalized = pipeline.transform(df_train)
pipeline = Pipeline(stages=[vector_assembler, scaler]).fit(df_test)
df_test_normalized = pipeline.transform(df_test)

In [0]:
pca = PCA(k=17,inputCol='normalized_features',outputCol='pca_features').fit(df_train_normalized)
df_train_pca = pca.transform(df_train_normalized)
explained_variance = pca.explainedVariance.toArray()
explained_variance,explained_variance.sum()
# the first 17 principal components out of 22 contain over 90% information

(array([0.18819301, 0.08919733, 0.08469763, 0.06052186, 0.05449273,
        0.05210558, 0.05084731, 0.04342856, 0.04053386, 0.03698966,
        0.03578353, 0.03353135, 0.03225762, 0.0317014 , 0.02967015,
        0.02875484, 0.02556534]),
 0.9182717423630258)

In [0]:
pca = PCA(k=17,inputCol='normalized_features',outputCol='pca_features').fit(df_test_normalized)
df_test_pca = pca.transform(df_test_normalized)
explained_variance = pca.explainedVariance.toArray()
explained_variance,explained_variance.sum()
# the first 17 principal components out of 22 contain over 90% information

(array([0.18810856, 0.08972676, 0.08535389, 0.06237577, 0.05520278,
        0.05234509, 0.05127277, 0.04556111, 0.04072001, 0.03695659,
        0.03584839, 0.03336381, 0.03216601, 0.03122089, 0.02894134,
        0.02562891, 0.02393203]),
 0.9187247128486264)

In [0]:
df_train_pca.write.parquet(r'/FileStore/tables/data_train_pca.parquet',mode="overwrite")
df_test_pca.write.parquet(r'/FileStore/tables/data_test_pca.parquet',mode="overwrite")

In [0]:
df_train_pca = spark.read.parquet(r'/FileStore/tables/data_train_pca.parquet')
df_test_pca = spark.read.parquet(r'/FileStore/tables/data_test_pca.parquet')

In [0]:
# build a Gradient Boosting regressor, find the hyperparamters using grid search
gbtr = GBTRegressor(featuresCol="pca_features", labelCol="price")
pipeline = Pipeline(stages=[gbtr])

param_grid = (ParamGridBuilder() \
              .addGrid(gbtr.maxDepth, [5]) \
              .addGrid(gbtr.maxBins, [64]) \
              .addGrid(gbtr.maxIter, [100]) \
              .build())
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
# cross validataion
cross_validator = CrossValidator(estimator=pipeline, \
                                estimatorParamMaps=param_grid, \
                                evaluator=evaluator, \
                                numFolds=3) 

cv_model = cross_validator.fit(df_train_pca)
df_train_pred = cv_model.transform(df_train_pca)
df_test_pred = cv_model.transform(df_test_pca)
df_rmse = evaluator.evaluate(df_train_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_train_pred, {evaluator.metricName: "r2"})
print(f"Train Root Mean Squared Error: {df_rmse}")
print(f"Train R Squared: {df_r2}")

df_rmse = evaluator.evaluate(df_test_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_test_pred, {evaluator.metricName: "r2"})
print(f"Test Root Mean Squared Error: {df_rmse}")
print(f"Test R Squared: {df_r2}")

# Get the best model from the cross-validator
best_gbtr = cv_model.bestModel
print("Best Max Depth:", best_gbtr.stages[-1].getMaxDepth())
print("Best Max Bins:", best_gbtr.stages[-1].getMaxBins())
print("Best Max Iterations:", best_gbtr.stages[-1].getMaxIter())

Train Root Mean Squared Error: 493851.4898890147
Train R Squared: 0.6808534259932697
Test Root Mean Squared Error: 1349397.5874574592
Test R Squared: -3.818364439279069
Best Max Depth: 5
Best Max Bins: 64
Best Max Iterations: 100


In [0]:
# randomly choose 10 samples from training dataset and test dataset to make prediction
sampled_data = df_train_pca.orderBy(rand()).limit(10)
predictions = best_gbtr.transform(sampled_data)
predictions.select("pca_features","price", "prediction").show()

+--------------------+--------+------------------+
|        pca_features|   price|        prediction|
+--------------------+--------+------------------+
|[2.33706181922505...| 15250.0|19336.800098973683|
|[-2.1727021900695...| 86000.0|130273.03679550433|
|[-0.0049376361103...| 45000.0| 57932.43084916571|
|[0.07044625187204...| 20000.0|122277.17692150448|
|[0.16472961856901...|235000.0| 223299.3144634328|
|[0.80831803657633...|249950.0|242463.07377544144|
|[2.50525030485581...| 92000.0|  75392.5545963743|
|[0.16713696204551...|375000.0| 332251.2722124717|
|[1.32628139380721...|120000.0| 88059.36654714942|
|[1.08155441097422...|391500.0| 437926.7266181784|
+--------------------+--------+------------------+



In [0]:
# build a Random Forest regressor, find the hyperparamters using grid search
rfr = RandomForestRegressor(featuresCol="pca_features", labelCol="price")
pipeline = Pipeline(stages=[rfr])

param_grid = (ParamGridBuilder() \
              .addGrid(rfr.maxDepth, [9]) \
              .addGrid(rfr.maxBins, [64]) \
              .addGrid(rfr.numTrees, [200]) \
              .addGrid(rfr.subsamplingRate, [1]) \
              .build())
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
# cross validataion
cross_validator = CrossValidator(estimator=pipeline, \
                                estimatorParamMaps=param_grid, \
                                evaluator=evaluator, \
                                numFolds=3) 

cv_model = cross_validator.fit(df_train_pca)
df_train_pred = cv_model.transform(df_train_pca)
df_test_pred = cv_model.transform(df_test_pca)
df_rmse = evaluator.evaluate(df_train_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_train_pred, {evaluator.metricName: "r2"})
print(f"Train Root Mean Squared Error: {df_rmse}")
print(f"Train R Squared: {df_r2}")

df_rmse = evaluator.evaluate(df_test_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_test_pred, {evaluator.metricName: "r2"})
print(f"Test Root Mean Squared Error: {df_rmse}")
print(f"Test R Squared: {df_r2}")

# Get the best model from the cross-validator
best_rfr = cv_model.bestModel
print("Best Max Depth:", best_rfr.stages[-1].getMaxDepth())
print("Best Max Bins:", best_rfr.stages[-1].getMaxBins())
print("Best Number of Trees:", best_rfr.stages[-1].getNumTrees)

Train Root Mean Squared Error: 471571.75152378186
Train R Squared: 0.708999983546762
Test Root Mean Squared Error: 476664.1322745305
Test R Squared: 0.3987643367780349
Best Max Depth: 9
Best Max Bins: 64
Best Number of Trees: 200


In [0]:
# build a linear regression model, using polynomial expansion to increase the feature dimensions
poly_expansion = PolynomialExpansion(inputCol='pca_features', outputCol='polynomial_features', degree=3)
df_train_polyexpansion = poly_expansion.transform(df_train_pca)
df_test_polyexpansion = poly_expansion.transform(df_test_pca)

lr = GeneralizedLinearRegression(featuresCol='polynomial_features', labelCol='price',maxIter=100, regParam=0.1)
pipeline = Pipeline(stages=[lr])
param_grid = (ParamGridBuilder() \
              .build())
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
# cross validataion
cross_validator = CrossValidator(estimator=pipeline, \
                                estimatorParamMaps=param_grid, \
                                evaluator=evaluator, \
                                numFolds=3) 

cv_model = cross_validator.fit(df_train_polyexpansion)
df_train_pred = cv_model.transform(df_train_polyexpansion)
df_test_pred = cv_model.transform(df_test_polyexpansion)
df_rmse = evaluator.evaluate(df_train_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_train_pred, {evaluator.metricName: "r2"})
print(f"Train Root Mean Squared Error: {df_rmse}")
print(f"Train R Squared: {df_r2}")
df_rmse = evaluator.evaluate(df_test_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_test_pred, {evaluator.metricName: "r2"})
print(f"Test Root Mean Squared Error: {df_rmse}")
print(f"Test R Squared: {df_r2}")

# Get the best model from the cross-validator
best_lr = cv_model.bestModel

Train Root Mean Squared Error: 468162.2818000351
Train R Squared: 0.7131926396726731
Test Root Mean Squared Error: 97768842.3381753
Test R Squared: -25293.187402912303


In [0]:
# build a Decision Tree regressor, find the hyperparamters using grid search
dtr = DecisionTreeRegressor(featuresCol="pca_features", labelCol="price")
pipeline = Pipeline(stages=[dtr])

param_grid = (ParamGridBuilder() \
              .addGrid(dtr.maxDepth, [20]) \
              .addGrid(dtr.maxBins, [64]) \
              .build())
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
# cross validataion
cross_validator = CrossValidator(estimator=pipeline, \
                                estimatorParamMaps=param_grid, \
                                evaluator=evaluator, \
                                numFolds=3) 

cv_model = cross_validator.fit(df_train_pca)
df_train_pred = cv_model.transform(df_train_pca)
df_test_pred = cv_model.transform(df_test_pca)
df_rmse = evaluator.evaluate(df_train_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_train_pred, {evaluator.metricName: "r2"})
print(f"Train Root Mean Squared Error: {df_rmse}")
print(f"Train R Squared: {df_r2}")

df_rmse = evaluator.evaluate(df_test_pred, {evaluator.metricName: "rmse"})
df_r2 = evaluator.evaluate(df_test_pred, {evaluator.metricName: "r2"})
print(f"Test Root Mean Squared Error: {df_rmse}")
print(f"Test R Squared: {df_r2}")

# Get the best model from the cross-validator
best_dtr = cv_model.bestModel
print("Best Max Depth:", best_dtr.stages[-1].getMaxDepth())
print("Best Max Bins:", best_dtr.stages[-1].getMaxBins())

Train Root Mean Squared Error: 270733.64511588425
Train R Squared: 0.9040860203139902
Test Root Mean Squared Error: 1538618.0088971492
Test R Squared: -5.26442781169355
Best Max Depth: 20
Best Max Bins: 64


In [0]:
# using decision tree to find out the feature importances
dtr = DecisionTreeRegressor(featuresCol="normalized_features", labelCol="price",maxDepth=20)
dtr = dtr.fit(df_train_pca)
dtr.featureImportances

SparseVector(26, {0: 0.2415, 1: 0.0877, 2: 0.1147, 3: 0.0084, 4: 0.0016, 5: 0.0056, 6: 0.0109, 7: 0.0007, 8: 0.0108, 9: 0.0034, 10: 0.1, 11: 0.0319, 12: 0.0067, 13: 0.0599, 14: 0.0397, 15: 0.031, 16: 0.0145, 17: 0.021, 18: 0.0051, 19: 0.0415, 20: 0.0514, 21: 0.0188, 22: 0.0119, 23: 0.0327, 24: 0.0029, 25: 0.0458})