In [2]:
pip install pyspark

Collecting pysparkNote: you may need to restart the kernel to use updated packages.
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)

Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425383 sha256=eedade531891f1489719d680d53dc74635cd965fa59ad914aef75fc33d106e4e
  Stored in directory: c:\users\ihamz\appdata\local\pip\cache\wheels\57\bd\14\ce9e21f2649298678d011fb8f71ed38ee70b42b94fef0be142
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.0


In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
import pyspark.pandas as ps



In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ML Pyspark').getOrCreate()
spark

In [5]:
df = spark.read.csv('real_estate_data.csv',header=True,inferSchema=True)

In [6]:
df.printSchema()

root
 |-- zipcode: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- price: double (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- livingArea: double (nullable = true)
 |-- homeType: string (nullable = true)
 |-- homeStatus: string (nullable = true)
 |-- is_bankOwned: boolean (nullable = true)
 |-- isUnmappable: boolean (nullable = true)
 |-- isPreforeclosureAuction: boolean (nullable = true)
 |-- isNonOwnerOccupied: boolean (nullable = true)
 |-- isPremierBuilder: boolean (nullable = true)
 |-- isZillowOwned: boolean (nullable = true)
 |-- currency: string (nullable = true)
 |-- country: string (nullable = true)
 |-- taxAssessedvalue: double (nullable = true)
 |-- lotAreaValue: double (nullable = true)
 |-- lotAreaUnit: string (nullable = true)



In [7]:
df.show()

+-------+----------------+-----+---------+---------+--------+---------+--------+----------+-------------+----------+------------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|zipcode|            city|state| latitude|longitude|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|is_bankOwned|isUnmappable|isPreforeclosureAuction|isNonOwnerOccupied|isPremierBuilder|isZillowOwned|currency|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|
+-------+----------------+-----+---------+---------+--------+---------+--------+----------+-------------+----------+------------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|  20743| Capitol Heights|   MD| 38.86519|-76.92221|268850.0|        3|       4|    2352.0|SINGLE_FAMILY|  FOR_SALE|        true|       false|         

In [10]:
from pyspark.sql.functions import *


In [11]:
missing_values_count = df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])

missing_values_count.show()

+-------+----+-----+--------+---------+-----+---------+--------+----------+--------+----------+------------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+------------+-----------+
|zipcode|city|state|latitude|longitude|price|bathrooms|bedrooms|livingArea|homeType|homeStatus|is_bankOwned|isUnmappable|isPreforeclosureAuction|isNonOwnerOccupied|isPremierBuilder|isZillowOwned|currency|country|taxAssessedvalue|lotAreaValue|lotAreaUnit|
+-------+----+-----+--------+---------+-----+---------+--------+----------+--------+----------+------------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+------------+-----------+
|      0|   0|    0|       0|        0|    0|        0|       0|         0|       0|         0|         326|           0|                      0|                 0|               0|            0|       0|      0|               6|      

In [13]:
df = df.drop('is_bankOwned')


In [14]:
df_clean=df.dropna()

In [15]:
df_clean=df_clean.dropDuplicates()

In [16]:
df_clean.show()

+-------+---------------+-----+--------+---------+--------+---------+--------+----------+-------------+----------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|zipcode|           city|state|latitude|longitude|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|isUnmappable|isPreforeclosureAuction|isNonOwnerOccupied|isPremierBuilder|isZillowOwned|currency|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|
+-------+---------------+-----+--------+---------+--------+---------+--------+----------+-------------+----------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|  20706|         Lanham|   MD|38.97061|-76.83151|435000.0|        2|       4|    2500.0|SINGLE_FAMILY|  FOR_SALE|       false|                  false|              true|           false|        f

In [17]:
print((df_clean.count(), len(df_clean.columns)))

(335, 21)


In [18]:
df_clean.select(
    percentile_approx("price", [0.25, 0.5, 0.75], 1000000).alias("quantiles")).collect()

[Row(quantiles=[379000.0, 435000.0, 475000.0])]

In [19]:
q1_beds = df_clean.select(percentile_approx("bedrooms", [0.25], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
q3_beds = df_clean.select(percentile_approx("bedrooms", [0.75], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
iqr_beds = q3_beds - q1_beds

#Check the Outlier Beds
top_outlier_beds = q3_beds + 1.5 * iqr_beds
bottom_outlier_beds= q1_beds - 1.5 * iqr_beds
print(top_outlier_beds, bottom_outlier_beds)

5.5 1.5


In [20]:
q1_baths = df_clean.select(percentile_approx("bathrooms", [0.25], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
q3_baths = df_clean.select(percentile_approx("bathrooms", [0.75], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
iqr_baths = q3_baths - q1_baths

#Check the Outlier Baths
top_outlier_baths = q3_baths + 1.5 * iqr_baths
bottom_outlier_baths = q1_baths - 1.5 * iqr_baths
print(top_outlier_baths, bottom_outlier_baths)

4.5 0.5


In [21]:
q1_area = df_clean.select(percentile_approx("livingArea", [0.25], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
q3_area = df_clean.select(percentile_approx("livingArea", [0.75], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
iqr_area = q3_area - q1_area

#Check the Outlier Area
top_outlier_area = q3_area + 1.5 * iqr_area
bottom_outlier_area = q1_area - 1.5 * iqr_area
print(top_outlier_area, bottom_outlier_area)

3122.0 34.0


In [22]:
q1_price = df_clean.select(percentile_approx("price", [0.25], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
q3_price = df_clean.select(percentile_approx("price", [0.75], 1000000).alias("quantiles")).collect()[0]['quantiles'][0]
iqr_price = q3_price - q1_price

#Check the Outlier Price
top_outlier_price = q3_price + 1.5 * iqr_price
bottom_outlier_price = q1_price - 1.5 * iqr_price
print(top_outlier_price, bottom_outlier_price)

619000.0 235000.0


In [23]:
df_clean = df_clean.filter(df_clean['bedrooms']<=top_outlier_beds)
df_clean = df_clean.filter(df_clean['bathrooms']<=top_outlier_baths)
df_clean = df_clean.filter(df_clean['livingArea']<=top_outlier_area)
df_clean = df_clean.filter(df_clean['price']<=top_outlier_price)

In [27]:
from pyspark.sql.functions import col

df_clean = df_clean.withColumn("zipcode", col("zipcode").cast("string"))


In [28]:
df_clean.show()

+-------+---------------+-----+--------+---------+--------+---------+--------+----------+-------------+----------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|zipcode|           city|state|latitude|longitude|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|isUnmappable|isPreforeclosureAuction|isNonOwnerOccupied|isPremierBuilder|isZillowOwned|currency|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|
+-------+---------------+-----+--------+---------+--------+---------+--------+----------+-------------+----------+------------+-----------------------+------------------+----------------+-------------+--------+-------+----------------+-------------------+-----------+
|  20706|         Lanham|   MD|38.97061|-76.83151|435000.0|        2|       4|    2500.0|SINGLE_FAMILY|  FOR_SALE|       false|                  false|              true|           false|        f

In [29]:
df_selected = df_clean.select('zipcode','city','state','price','bathrooms','bedrooms','livingArea','homeType','homeStatus','country','taxAssessedvalue','lotAreaValue','lotAreaUnit')
df_selected.printSchema()

root
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- livingArea: double (nullable = true)
 |-- homeType: string (nullable = true)
 |-- homeStatus: string (nullable = true)
 |-- country: string (nullable = true)
 |-- taxAssessedvalue: double (nullable = true)
 |-- lotAreaValue: double (nullable = true)
 |-- lotAreaUnit: string (nullable = true)



In [30]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=["zipcode","city","state","homeType","homeStatus","country","lotAreaUnit"], 
                        outputCols=["zipcodeIndex","cityIndex", "stateIndex", "hometypeIndex","homestatusIndex","countryIndex","lotareaunitIndex"], 
                        stringOrderType="alphabetAsc")
indexed = indexer.fit(df_selected).transform(df_selected)
indexed.show()

+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+
|zipcode|           city|state|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|zipcodeIndex|cityIndex|stateIndex|hometypeIndex|homestatusIndex|countryIndex|lotareaunitIndex|
+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+
|  20706|         Lanham|   MD|435000.0|        2|       4|    2500.0|SINGLE_FAMILY|  FOR_SALE|    USA|        332000.0|0.42316345270890726|      acres|         8.0|     15.0|       1.0|          0.0|            0.0|         0.0|             0.0|
|  20783|   

In [34]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["zipcodeIndex","cityIndex", "stateIndex","lotareaunitIndex"],
                        outputCols=["categoryZipcode", "categoryCity", "categoryState","categoryLotareaunit"])
model = encoder.fit(indexed)
encoded = model.transform(indexed)
encoded.show()

+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+---------------+---------------+-------------+-------------------+
|zipcode|           city|state|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|zipcodeIndex|cityIndex|stateIndex|hometypeIndex|homestatusIndex|countryIndex|lotareaunitIndex|categoryZipcode|   categoryCity|categoryState|categoryLotareaunit|
+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+---------------+---------------+-------------+-------------------+
|  20706|         Lanham|   MD|435000.0|        2|       4|  

In [35]:
encoded.columns

['zipcode',
 'city',
 'state',
 'price',
 'bathrooms',
 'bedrooms',
 'livingArea',
 'homeType',
 'homeStatus',
 'country',
 'taxAssessedvalue',
 'lotAreaValue',
 'lotAreaUnit',
 'zipcodeIndex',
 'cityIndex',
 'stateIndex',
 'hometypeIndex',
 'homestatusIndex',
 'countryIndex',
 'lotareaunitIndex',
 'categoryZipcode',
 'categoryCity',
 'categoryState',
 'categoryLotareaunit']

In [36]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['bathrooms','bedrooms','livingArea','categoryCity','categoryState','lotAreaValue','categoryLotareaunit','categoryZipcode','taxAssessedvalue'],
                                 outputCol="Independent Features")
output=featureassembler.transform(encoded)

In [37]:
output.show()

+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+---------------+---------------+-------------+-------------------+--------------------+
|zipcode|           city|state|   price|bathrooms|bedrooms|livingArea|     homeType|homeStatus|country|taxAssessedvalue|       lotAreaValue|lotAreaUnit|zipcodeIndex|cityIndex|stateIndex|hometypeIndex|homestatusIndex|countryIndex|lotareaunitIndex|categoryZipcode|   categoryCity|categoryState|categoryLotareaunit|Independent Features|
+-------+---------------+-----+--------+---------+--------+----------+-------------+----------+-------+----------------+-------------------+-----------+------------+---------+----------+-------------+---------------+------------+----------------+---------------+---------------+-------------+-------------------+--------------------

In [38]:
finalized_data=output.select("Independent Features","price")

In [39]:
finalized_data.show()

+--------------------+--------+
|Independent Features|   price|
+--------------------+--------+
|(72,[0,1,2,18,29,...|435000.0|
|(72,[0,1,2,16,29,...|525000.0|
|(72,[0,1,2,28,30,...|330000.0|
|(72,[0,1,2,27,29,...|445000.0|
|(72,[0,1,2,14,29,...|449900.0|
|(72,[0,1,2,28,30,...|435000.0|
|(72,[0,1,2,27,29,...|524900.0|
|(72,[0,1,2,11,29,...|465000.0|
|(72,[0,1,2,21,29,...|350000.0|
|(72,[0,1,2,10,29,...|424900.0|
|(72,[0,1,2,17,29,...|469900.0|
|(72,[0,1,2,27,29,...|440000.0|
|(72,[0,1,2,7,29,3...|340000.0|
|(72,[0,1,2,14,29,...|430000.0|
|(72,[0,1,2,11,29,...|469000.0|
|(72,[0,1,2,27,29,...|424900.0|
|(72,[0,1,2,28,30,...|550000.0|
|(72,[0,1,2,20,29,...|408000.0|
|(72,[0,1,2,22,29,...|549900.0|
|(72,[0,1,2,22,29,...|537000.0|
+--------------------+--------+
only showing top 20 rows



In [40]:
train_data,test_data=finalized_data.randomSplit([0.8,0.2])

In [42]:
from pyspark.ml.regression import LinearRegression

regressor=LinearRegression(featuresCol='Independent Features', labelCol='price')
regressor=regressor.fit(train_data)
print("Coefficients: " + str(regressor.coefficients))
print("Intercept: " + str(regressor.intercept))

Coefficients: [27846.935827799807,16498.177012309192,34.21708940444712,37690.79508142999,7633.147397303968,9948.40175014528,22180.04125501572,-31233.915301711033,2510.3809429787466,2981.3147253236098,-20612.763208908396,11870.786207749867,-12833.070429122874,65988.39888588719,-12018.45344014719,17217.761069400993,12012.75820763868,-8328.202246312732,-6422.803011708855,-14224.475480061776,-16752.292769112595,-2329.912761011438,49902.9648016209,70003.35241104715,-23063.158873343793,71413.16558050054,-15979.860320566497,-15164.117933676293,15501.789625954703,-23420.897129573692,6.390100510176352,69015.51521936271,91738.97346661457,87116.83609708711,85043.66796831932,7445.54140834223,7453.712215940309,-15974.664980275911,2981.3147253237075,7633.147397304038,-6422.803011708787,9948.401750145606,-14224.475480061266,-1733.210222887669,30040.549536788632,0.0,-20612.763208909637,-2329.912761011598,11870.786207750185,-29471.33045159074,-12018.453440147385,-16752.292769112864,-23063.158873343433,

In [43]:
trainingSummary = regressor.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 47547.504975
r2: 0.581031


In [44]:
pred_results=regressor.evaluate(test_data)

In [45]:
pred_results.meanAbsoluteError,pred_results.rootMeanSquaredError

(39591.03037125528, 50083.31765349226)

In [46]:
from pyspark.ml.regression import RandomForestRegressor

rf=RandomForestRegressor(featuresCol='Independent Features', labelCol='price')
rf_model=rf.fit(train_data)

In [47]:
### Prediction of random forest model
from pyspark.ml.evaluation import RegressionEvaluator
rf_predictions = rf_model.transform(test_data)
rf_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 51744.7


In [48]:
rf_predictions.show()

+--------------------+--------+------------------+
|Independent Features|   price|        prediction|
+--------------------+--------+------------------+
|(72,[0,1,2,5,29,3...|305000.0| 429418.0344682814|
|(72,[0,1,2,6,29,3...|439000.0|394596.52181506075|
|(72,[0,1,2,6,29,3...|515000.0|433724.42153036146|
|(72,[0,1,2,6,29,3...|489900.0|455117.91631026036|
|(72,[0,1,2,7,29,3...|285000.0| 318738.9903432676|
|(72,[0,1,2,7,29,3...|315000.0|319709.40284844354|
|(72,[0,1,2,7,29,3...|219900.0| 315623.4820099342|
|(72,[0,1,2,7,29,3...|320000.0| 346590.7676069028|
|(72,[0,1,2,7,29,3...|340000.0|351540.35519501107|
|(72,[0,1,2,7,29,3...|415000.0|378711.15338365996|
|(72,[0,1,2,10,29,...|380000.0| 426858.2377631599|
|(72,[0,1,2,10,29,...|475000.0|403208.63904443546|
|(72,[0,1,2,11,29,...|525000.0| 438658.3893446665|
|(72,[0,1,2,11,29,...|465000.0| 449954.9550030699|
|(72,[0,1,2,12,29,...|439995.0| 447938.3637999027|
|(72,[0,1,2,12,29,...|438800.0|391144.00878334197|
|(72,[0,1,2,12,29,...|470000.0|

In [49]:
rf_model.featureImportances

SparseVector(72, {0: 0.1051, 1: 0.0642, 2: 0.1927, 3: 0.0277, 7: 0.032, 8: 0.0002, 10: 0.0019, 13: 0.0091, 14: 0.0032, 16: 0.0063, 17: 0.0017, 18: 0.005, 20: 0.0025, 22: 0.0247, 23: 0.0046, 25: 0.0041, 26: 0.0043, 27: 0.0045, 28: 0.0139, 29: 0.0111, 30: 0.0386, 31: 0.0078, 35: 0.007, 37: 0.0061, 40: 0.0002, 44: 0.0018, 46: 0.0029, 48: 0.0011, 49: 0.036, 50: 0.0016, 51: 0.0007, 53: 0.0, 54: 0.0039, 56: 0.0003, 57: 0.0019, 59: 0.0017, 60: 0.0045, 61: 0.0003, 62: 0.0021, 63: 0.0073, 64: 0.0028, 66: 0.0088, 68: 0.0054, 69: 0.0132, 70: 0.0008, 71: 0.3244})

In [50]:
finalized_data.select('Independent Features').take(2)

[Row(Independent Features=SparseVector(72, {0: 2.0, 1: 4.0, 2: 2500.0, 18: 1.0, 29: 1.0, 30: 0.4232, 31: 1.0, 40: 1.0, 71: 332000.0})),
 Row(Independent Features=SparseVector(72, {0: 3.0, 1: 5.0, 2: 2376.0, 16: 1.0, 29: 1.0, 30: 8028.0, 60: 1.0, 71: 457000.0}))]

In [51]:
from pyspark.ml.regression import GBTRegressor
train_data,test_data=finalized_data.randomSplit([0.8,0.2])
gbt=GBTRegressor(featuresCol='Independent Features', labelCol='price')
gbt_model=gbt.fit(train_data)

In [52]:
### Prediction of GBT Model
gbt_predictions = gbt_model.transform(test_data)
gbt_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 56463.8


In [53]:
gbt_predictions.show()

+--------------------+--------+------------------+
|Independent Features|   price|        prediction|
+--------------------+--------+------------------+
|(72,[0,1,2,5,29,3...|305000.0| 410454.6036436463|
|(72,[0,1,2,6,29,3...|499500.0| 461763.1405921831|
|(72,[0,1,2,6,29,3...|475000.0|466581.22118833906|
|(72,[0,1,2,7,29,3...|310000.0| 349036.5980817347|
|(72,[0,1,2,7,29,3...|285000.0|265426.01857766643|
|(72,[0,1,2,7,29,3...|349000.0|347226.41822692583|
|(72,[0,1,2,7,29,3...|370000.0|258383.87148917865|
|(72,[0,1,2,10,29,...|380000.0|463770.45766412996|
|(72,[0,1,2,10,29,...|510000.0| 506599.6994629643|
|(72,[0,1,2,10,29,...|450000.0|  474874.179380933|
|(72,[0,1,2,10,29,...|475000.0|  456652.753188823|
|(72,[0,1,2,10,29,...|475000.0|478135.89812836197|
|(72,[0,1,2,11,29,...|399900.0| 483410.3200084229|
|(72,[0,1,2,12,29,...|389900.0|448008.03500822134|
|(72,[0,1,2,12,29,...|379900.0|403388.42347812135|
|(72,[0,1,2,14,29,...|415000.0| 350126.2071385327|
|(72,[0,1,2,14,29,...|439000.0|

In [65]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

# Assuming 'train_data' is your training dataset

# Define the GBTRegressor model
gbt = GBTRegressor(featuresCol='Independent Features', labelCol='price')

# Define the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 15]) \
    .addGrid(gbt.maxBins, [32, 64]) \
    .addGrid(gbt.maxIter, [10, 20]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(labelCol='price', metricName='rmse')

# Define the CrossValidator
cv = CrossValidator(estimator=gbt, 
                    estimatorParamMaps=param_grid, 
                    evaluator=evaluator, 
                    numFolds=3)  # You can adjust the number of folds

# Fit the CrossValidator to the training data
cv_model = cv.fit(train_data)

# Get the best model from cross-validation
best_model = cv_model.bestModel

# Evaluate the best model on the test set
predictions = best_model.transform(test_data)
rmse = evaluator.evaluate(predictions)

# Print the best hyperparameters and RMSE
print("Best hyperparameters:", best_model.extractParamMap())
print("Root Mean Squared Error (RMSE) on test data:", rmse)


Best hyperparameters: {Param(parent='GBTRegressor_7d414387fe6b', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='GBTRegressor_7d414387fe6b', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='GBTRegressor_7d414387fe6b', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to

In [69]:
import os

# Set HADOOP_HOME to avoid FileNotFoundException
os.environ['HADOOP_HOME'] = ''

# Now save your model
gbt_model.write().overwrite().save("C:/Users/ihamz/real-estate-project")


Py4JJavaError: An error occurred while calling o14499.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
	at org.apache.hadoop.mapred.OutputCommitter.setupJob(OutputCommitter.java:265)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:79)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1620)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1620)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1606)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.util.DefaultParamsWriter.saveImpl(ReadWrite.scala:384)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
