In [1]:
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler,StringIndexer, OneHotEncoder, Imputer
from pyspark.sql.functions import col,count,isnan,expr,when,format_number, initcap


from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


spark = SparkSession.builder.getOrCreate()

#seperate columns of data with ";" signs as delimiter
houseDF = spark.read.option("header","true").option("delimiter", ";")\
            .option("inferSchema","true").csv("downloads/houses.csv")


houseDF.show(300)

24/01/05 22:09:02 WARN Utils: Your hostname, Cerens-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.51.62.64 instead (on interface en0)
24/01/05 22:09:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/05 22:09:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----+----------+----+---------+---------+----------+------------+----+---------+-------+
|Room|LivingRoom|Area|FlatCount|HouseType|GardenArea|BalkonyCount|Pool|     City|  Price|
+----+----------+----+---------+---------+----------+------------+----+---------+-------+
|   1|         1|  73|        1|    Daire|         0|           0|   0|    İzmir| 507000|
|   2|         1|   2|        1|    Daire|         0|           1|   0|    Bursa| 723600|
|   4|         2| 180|        1|    Daire|         0|           2|   0|    İzmir|1280000|
|   3|         1| 102|        1|    Daire|         0|           0|   0|    Bursa| 989000|
|   4|         1|  66|        1|    Daire|         0|           2|   0|   Ankara| 679000|
|   2|         1|  83|        1|    Daire|         0|           2|   0|  Kocaeli| 543000|
|   2|         1|  98|        1|    Daire|         0|           2|   0|   Ankara| 800000|
|   4|         1| 160|        2| Müstakil|       133|           2|   0|   Ankara|2558000|
|   5|    

In [2]:
#DATA CLEANING

#Assigning each column with its corresponding data type. So we can make imputation and adjustments
columns = ["Room" ,"LivingRoom" ,"Area" ,"FlatCount", "HouseType", "GardenArea" , "BalkonyCount", "Pool", "City" ,"Price"]
columnTypes = ["int", "int", "int", "int", "string" , "int", "int", "int", "string", "int"] 


for col_name, col_type in zip(columns, columnTypes):
    houseDF = houseDF.withColumn(col_name, col(col_name).cast(col_type))

'''
meanImputer = Imputer(inputCol="Room",outputCol="imputedRoom")
houseDF = meanImputer.fit(houseDF).transform(houseDF) 
'''

#checking how many NULL values we have in each column
houseDF.select([count(when( col(c).isNull(), c)).alias(c) for c in houseDF.columns]).show()

houseDF.show()

+----+----------+----+---------+---------+----------+------------+----+----+-----+
|Room|LivingRoom|Area|FlatCount|HouseType|GardenArea|BalkonyCount|Pool|City|Price|
+----+----------+----+---------+---------+----------+------------+----+----+-----+
|   8|         1|   1|        0|        2|         0|           0|   0|   0|    0|
+----+----------+----+---------+---------+----------+------------+----+----+-----+

+----+----------+----+---------+---------+----------+------------+----+--------+-------+
|Room|LivingRoom|Area|FlatCount|HouseType|GardenArea|BalkonyCount|Pool|    City|  Price|
+----+----------+----+---------+---------+----------+------------+----+--------+-------+
|   1|         1|  73|        1|    Daire|         0|           0|   0|   İzmir| 507000|
|   2|         1|   2|        1|    Daire|         0|           1|   0|   Bursa| 723600|
|   4|         2| 180|        1|    Daire|         0|           2|   0|   İzmir|1280000|
|   3|         1| 102|        1|    Daire|        

In [3]:
#there are some empty HouseType values where the FlatCount = 1.
#Handle it so that when HouseType is NULL and flatCount = 1, then HouseType will be "Daire"

houseDF = houseDF.withColumn("HouseType", when((col("FlatCount") == "1") & (col("HouseType").isNull()), "Daire").otherwise(col("HouseType")))


#making sure each city is in the same format and capitalized
houseDF = houseDF.withColumn("City", initcap(col("City")))

#since cities are strings, changing them into integer index values for machine learning models
indexer = StringIndexer(inputCol="City",outputCol="indexedCity")
houseDF = indexer.fit(houseDF).transform(houseDF)

#also indexing house types for the same purpose
indexer = StringIndexer(inputCol="HouseType",outputCol="indexedHouseType")
houseDF = indexer.fit(houseDF).transform(houseDF)

encoder = OneHotEncoder(inputCol="indexedCity",outputCol="encodedCity")
houseDF = encoder.fit(houseDF).transform(houseDF)

encoder = OneHotEncoder(inputCol="indexedHouseType",outputCol="encodedHouseType")
houseDF = encoder.fit(houseDF).transform(houseDF)

houseDF.show(5)

+----+----------+----+---------+---------+----------+------------+----+------+-------+-----------+----------------+-------------+----------------+
|Room|LivingRoom|Area|FlatCount|HouseType|GardenArea|BalkonyCount|Pool|  City|  Price|indexedCity|indexedHouseType|  encodedCity|encodedHouseType|
+----+----------+----+---------+---------+----------+------------+----+------+-------+-----------+----------------+-------------+----------------+
|   1|         1|  73|        1|    Daire|         0|           0|   0|İzmir| 507000|        3.0|             0.0|(6,[3],[1.0])|   (2,[0],[1.0])|
|   2|         1|   2|        1|    Daire|         0|           1|   0| Bursa| 723600|        4.0|             0.0|(6,[4],[1.0])|   (2,[0],[1.0])|
|   4|         2| 180|        1|    Daire|         0|           2|   0|İzmir|1280000|        3.0|             0.0|(6,[3],[1.0])|   (2,[0],[1.0])|
|   3|         1| 102|        1|    Daire|         0|           0|   0| Bursa| 989000|        4.0|             0.0|(6,[4

In [4]:
#replacing outliers and NULL values with mean 
mean_value = houseDF.agg({"Room": "mean"}).collect()[0][0]

houseDF = houseDF.withColumn("Room", when((col("Room") > 10) | (col("Room") < 1) | (col("Room").isNull()), round(mean_value)).otherwise(col("Room")))

In [5]:
#replacing outliers and NULL values with mean 
mean_value = houseDF.agg({"LivingRoom": "mean"}).collect()[0][0]

houseDF = houseDF.withColumn("LivingRoom", when((col("LivingRoom") > 2) | (col("LivingRoom").isNull()), round(mean_value)).otherwise(col("LivingRoom")))


In [6]:
#replacing outliers and NULL values with mean 
mean_value = houseDF.agg({"Area": "mean"}).collect()[0][0]


houseDF = houseDF.withColumn("Area", when((col("Area").isNull()), round(mean_value)).otherwise(col("Area")))

In [7]:
#leave only needed columns

newDF = houseDF.select("Room","LivingRoom","Area","FlatCount","GardenArea","BalkonyCount","Pool",\
                        "encodedCity","encodedHouseType", "Price")

newDF.show(5)

+----+----------+----+---------+----------+------------+----+-------------+----------------+-------+
|Room|LivingRoom|Area|FlatCount|GardenArea|BalkonyCount|Pool|  encodedCity|encodedHouseType|  Price|
+----+----------+----+---------+----------+------------+----+-------------+----------------+-------+
|   1|         1|  73|        1|         0|           0|   0|(6,[3],[1.0])|   (2,[0],[1.0])| 507000|
|   2|         1|   2|        1|         0|           1|   0|(6,[4],[1.0])|   (2,[0],[1.0])| 723600|
|   4|         2| 180|        1|         0|           2|   0|(6,[3],[1.0])|   (2,[0],[1.0])|1280000|
|   3|         1| 102|        1|         0|           0|   0|(6,[4],[1.0])|   (2,[0],[1.0])| 989000|
|   4|         1|  66|        1|         0|           2|   0|(6,[1],[1.0])|   (2,[0],[1.0])| 679000|
+----+----------+----+---------+----------+------------+----+-------------+----------------+-------+
only showing top 5 rows



In [8]:
#replacing some questionable prices (such as 5) with the mean value
mean_value = newDF.agg({"Price": "mean"}).collect()[0][0]

newDF = newDF.withColumn("Price", when((col("Price") < 100), round(mean_value)).otherwise(col("Price")))


newDF.show(300)

+----+----------+----+---------+----------+------------+----+-------------+----------------+-------+
|Room|LivingRoom|Area|FlatCount|GardenArea|BalkonyCount|Pool|  encodedCity|encodedHouseType|  Price|
+----+----------+----+---------+----------+------------+----+-------------+----------------+-------+
|   1|         1|  73|        1|         0|           0|   0|(6,[3],[1.0])|   (2,[0],[1.0])| 507000|
|   2|         1|   2|        1|         0|           1|   0|(6,[4],[1.0])|   (2,[0],[1.0])| 723600|
|   4|         2| 180|        1|         0|           2|   0|(6,[3],[1.0])|   (2,[0],[1.0])|1280000|
|   3|         1| 102|        1|         0|           0|   0|(6,[4],[1.0])|   (2,[0],[1.0])| 989000|
|   4|         1|  66|        1|         0|           2|   0|(6,[1],[1.0])|   (2,[0],[1.0])| 679000|
|   2|         1|  83|        1|         0|           2|   0|(6,[2],[1.0])|   (2,[0],[1.0])| 543000|
|   2|         1|  98|        1|         0|           2|   0|(6,[1],[1.0])|   (2,[0],[1.0])

In [9]:
#MACHINE LEARNING

#using vector assembler to put all features together in one column
vec = VectorAssembler (inputCols=newDF.columns[0:8],outputCol="features")
newDF = vec.transform(newDF)

#renaming price to label. Also, selecting only "features" and "label" as final DF
newDF = newDF.withColumnRenamed("Price", "label")

finalDF = newDF.select("features","label")

finalDF.show()


+--------------------+-------+
|            features|  label|
+--------------------+-------+
|(13,[0,1,2,3,10],...| 507000|
|(13,[0,1,2,3,5,11...| 723600|
|(13,[0,1,2,3,5,10...|1280000|
|(13,[0,1,2,3,11],...| 989000|
|(13,[0,1,2,3,5,8]...| 679000|
|(13,[0,1,2,3,5,9]...| 543000|
|(13,[0,1,2,3,5,8]...| 800000|
|(13,[0,1,2,3,4,5,...|2558000|
|[5.0,1.0,208.0,2....|4528000|
|(13,[0,1,2,3,4,5,...|5154000|
|(13,[0,1,2,3,4,5,...|3954000|
|(13,[0,1,2,3,5,7]...|1200000|
|(13,[0,1,2,3,5,9]...| 569000|
|(13,[0,1,2,3,5,10...|1200000|
|(13,[0,1,2,3,5,8]...| 800000|
|(13,[0,1,2,3,5,11...| 823000|
|(13,[0,1,2,3,5,7]...|1120000|
|(13,[0,1,2,3,5,8]...| 717000|
|(13,[0,1,2,3,5,8]...| 800000|
|(13,[0,1,2,3,5,10...|1080000|
+--------------------+-------+
only showing top 20 rows



In [10]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

#to evaluate performance, creating a train/test split
trainDF, testDF = finalDF.randomSplit([0.8, 0.2], seed=42)

#creating random forest regressor to predict values. It has 100 trees 
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=100)

rfModel = rf.fit(trainDF)

predictions = rfModel.transform(testDF)

#checking model performance with RMSE and R2 evaluators
eva = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = eva.evaluate(predictions)
print("Root Mean Squared Error of test data is", rmse)

eva = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")
r2 = eva.evaluate(predictions)
print("R2 Score of the model is", r2)



Root Mean Squared Error of test data is 301878.943809593
R2 Score of the model is 0.989189437954039


In [11]:
#HYPER PARAMETER TUNING

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rf = RandomForestRegressor()

#creating a grid of different hyperparameters to test which is better.
#Hyperparameters are Number of Trees and Maximum Depth
myParams = ParamGridBuilder()\
                            .addGrid(rf.numTrees,[5,10,50,100])\
                            .addGrid(rf.maxDepth,[5,6,7,8,9,10])\
                            .build()

#evaluate using R2 Score
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

validator = CrossValidator(estimator=rf,
                           estimatorParamMaps=myParams,
                           evaluator=evaluator,
                           numFolds=5,
                           parallelism=4,
                           collectSubModels=True)

tunedModel = validator.fit(trainDF)

resultDF = tunedModel.transform(testDF)

r2 = evaluator.evaluate(resultDF)

#finding parameters that gives best model:
print("Num Trees:", tunedModel.bestModel.getNumTrees)
print("Max Depth:", tunedModel.bestModel.getMaxDepth())
print("Best R2 Score:", r2)


24/01/05 22:09:35 WARN DAGScheduler: Broadcasting large task binary with size 1091.5 KiB
24/01/05 22:09:35 WARN DAGScheduler: Broadcasting large task binary with size 1091.5 KiB
24/01/05 22:09:36 WARN DAGScheduler: Broadcasting large task binary with size 1073.5 KiB
24/01/05 22:09:36 WARN DAGScheduler: Broadcasting large task binary with size 1091.5 KiB
24/01/05 22:09:37 WARN DAGScheduler: Broadcasting large task binary with size 1073.5 KiB
24/01/05 22:09:39 WARN BlockManager: Block rdd_1458_0 already exists on this machine; not re-adding it
24/01/05 22:09:46 WARN DAGScheduler: Broadcasting large task binary with size 1064.6 KiB
24/01/05 22:09:47 WARN DAGScheduler: Broadcasting large task binary with size 1064.6 KiB
24/01/05 22:09:47 WARN DAGScheduler: Broadcasting large task binary with size 1094.0 KiB
24/01/05 22:09:47 WARN DAGScheduler: Broadcasting large task binary with size 1064.6 KiB
24/01/05 22:09:47 WARN DAGScheduler: Broadcasting large task binary with size 1094.0 KiB
24/01/0

Num Trees: 100
Max Depth: 10
Best R2 Score: 0.9951930795545937
