In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, isnan, when, count, upper, countDistinct, sqrt
import matplotlib.pyplot as plt
from pyspark.sql import Column
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
import os

import pandas as pd
import psycopg2

In [3]:
# Session handle used by every Spark call below; getOrCreate() hands back an
# already-running session when one exists instead of starting another.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Load the whole resale table from the Azure PostgreSQL server into a pandas
# DataFrame named `df`.
#
# The password is read from the PGPASSWORD environment variable so that no
# secret is stored in the notebook. A missing variable raises KeyError here,
# before any connection attempt.
# NOTE(review): the password that used to be committed in this cell should be
# rotated.
host = os.environ.get("PGHOST", "np-ado-assg2-group2-server.postgres.database.azure.com")
port = 5432
dbname = "postgres"
username = os.environ.get("PGUSER", "ashycf@np-ado-assg2-group2-server")
pwd = os.environ["PGPASSWORD"]
table = "jan2017_onwards_dataset"

# Keyword arguments avoid hand-building a DSN string, which breaks as soon as
# the user name or password contains a space or a quote.
conn = psycopg2.connect(host=host, port=port, dbname=dbname, user=username, password=pwd)
try:
    # `table` is a constant defined just above, not user input, so plain
    # string formatting is acceptable for the identifier.
    sql = "select * from {};".format(table)
    df = pd.read_sql_query(sql, conn)
finally:
    # psycopg2's `with connection:` only scopes a transaction; it does not
    # close the connection, so close it explicitly.
    conn.close()

In [5]:
# Convert the pandas frame loaded from PostgreSQL into a Spark DataFrame,
# reusing the name `df` that the following cells expect.
# The isinstance guard keeps this cell re-runnable: once `df` is a Spark
# DataFrame, passing it to createDataFrame again raises a TypeError.
if isinstance(df, pd.DataFrame):
    df = spark.createDataFrame(df)

# Show the inferred schema and a first page of rows for a sanity check.
df.printSchema()
df.show()

root
 |-- town: string (nullable = true)
 |-- block: string (nullable = true)
 |-- month: string (nullable = true)
 |-- flat_type: string (nullable = true)
 |-- flat_model: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- resale_price: string (nullable = true)
 |-- storey_range: string (nullable = true)
 |-- floor_area_sqm: string (nullable = true)
 |-- remaining_lease: string (nullable = true)
 |-- lease_commence_date: string (nullable = true)
 |-- _airbyte_ab_id: string (nullable = true)
 |-- _airbyte_emitted_at: timestamp (nullable = true)
 |-- _airbyte_normalized_at: timestamp (nullable = true)
 |-- _airbyte_jan2017_onwards_dataset_hashid: string (nullable = true)

+----------+-----+-------+---------+--------------+-----------------+------------+------------+--------------+------------------+-------------------+--------------------+-------------------+----------------------+---------------------------------------+
|      town|block|  month|flat_type|    flat

In [6]:
# Preview the first 10 rows.
df.show(10)

+----------+-----+-------+---------+--------------+-----------------+------------+------------+--------------+------------------+-------------------+--------------------+-------------------+----------------------+---------------------------------------+
|      town|block|  month|flat_type|    flat_model|      street_name|resale_price|storey_range|floor_area_sqm|   remaining_lease|lease_commence_date|      _airbyte_ab_id|_airbyte_emitted_at|_airbyte_normalized_at|_airbyte_jan2017_onwards_dataset_hashid|
+----------+-----+-------+---------+--------------+-----------------+------------+------------+--------------+------------------+-------------------+--------------------+-------------------+----------------------+---------------------------------------+
|ANG MO KIO|  406|2017-01|   2 ROOM|      Improved|ANG MO KIO AVE 10|    232000.0|    10 TO 12|          44.0|61 years 04 months|               1979|233cb434-cc39-4b8...|2023-01-25 23:49:29|  2023-01-25 23:50:...|                   165edd

In [7]:
# Remove Airbyte sync bookkeeping columns that carry no information about the
# flats themselves. `_airbyte_normalized_at` is dropped by a later cell.
# The trailing empty-string column name from the earlier version is gone: it
# named no column and only obscured the intent of the call.
df = df.drop(
    "_airbyte_ab_id",
    "_airbyte_emitted_at",
    "_airbyte_jan2017_onwards_dataset_hashid",
)

In [8]:
# Display the frame after the drop to see which columns remain.
df.show()

+----------+-----+-------+---------+--------------+-----------------+------------+------------+--------------+------------------+-------------------+----------------------+
|      town|block|  month|flat_type|    flat_model|      street_name|resale_price|storey_range|floor_area_sqm|   remaining_lease|lease_commence_date|_airbyte_normalized_at|
+----------+-----+-------+---------+--------------+-----------------+------------+------------+--------------+------------------+-------------------+----------------------+
|ANG MO KIO|  406|2017-01|   2 ROOM|      Improved|ANG MO KIO AVE 10|    232000.0|    10 TO 12|          44.0|61 years 04 months|               1979|  2023-01-25 23:50:...|
|ANG MO KIO|  108|2017-01|   3 ROOM|New Generation| ANG MO KIO AVE 4|    250000.0|    01 TO 03|          67.0|60 years 07 months|               1978|  2023-01-25 23:50:...|
|ANG MO KIO|  602|2017-01|   3 ROOM|New Generation| ANG MO KIO AVE 5|    262000.0|    01 TO 03|          67.0|62 years 05 months|      

In [9]:
# Drop the last remaining Airbyte bookkeeping column.
df = df.drop("_airbyte_normalized_at")

In [10]:
# Data-quality check: for every column, count the rows whose value is NaN or
# NULL. when() yields a non-null value only for such rows, and count() ignores
# the NULLs produced for all other rows.
missing_value_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(missing_value_counts).show()

+----+-----+-----+---------+----------+-----------+------------+------------+--------------+---------------+-------------------+
|town|block|month|flat_type|flat_model|street_name|resale_price|storey_range|floor_area_sqm|remaining_lease|lease_commence_date|
+----+-----+-----+---------+----------+-----------+------------+------------+--------------+---------------+-------------------+
|   0|    0|    0|        0|         0|          0|           0|           0|             0|              0|                  0|
+----+-----+-----+---------+----------+-----------+------------+------------+--------------+---------------+-------------------+



In [11]:
# Square-root transform every int/double column except the target.
# NOTE(review): the schema printed right after loading lists only string
# columns; if that still holds here, `numlist` is empty and this loop changes
# nothing. Confirm whether numeric casts were meant to happen first.
numeric_prefixes = ('int', 'double')
numlist = [name for name, dtype in df.dtypes if dtype.startswith(numeric_prefixes)]
for i in numlist:
    if i == 'resale_price':
        # the regression target is left on its original scale
        continue
    df = df.withColumn(i, sqrt(col(i)))

In [12]:
# Columns that are label-indexed and then one-hot encoded.
strings_used = [
    "town",
    "block",
    'street_name',
    'flat_model',
    'remaining_lease',
    'floor_area_sqm',
    'lease_commence_date',
    'month',
]

# Stage 1: map each distinct string to a numeric index (<col>_string_encoded).
stage_string = [
    StringIndexer(inputCol=name, outputCol=f"{name}_string_encoded")
    for name in strings_used
]
# Stage 2: expand each index column into a one-hot vector (<col>_one_hot).
stage_one_hot = [
    OneHotEncoder(inputCol=f"{name}_string_encoded", outputCol=f"{name}_one_hot")
    for name in strings_used
]

# All indexers run before all encoders, so every encoder finds its input column.
ppl = Pipeline(stages=[*stage_string, *stage_one_hot])
fitted_encoders = ppl.fit(df)
df_ohe = fitted_encoders.transform(df)

In [13]:
# Ordinal encoding of flat_type: labels are listed from smallest to largest and
# mapped to the strings '1'..'7', which are then cast to integers.
flat_type_labels = [
    '1 ROOM', '2 ROOM', '3 ROOM', '4 ROOM', '5 ROOM',
    'EXECUTIVE', 'MULTI-GENERATION',
]
mapping = {label: str(rank) for rank, label in enumerate(flat_type_labels, start=1)}

df_ohe = df_ohe.replace(to_replace=mapping, subset=['flat_type'])
# The replacement values are digit strings, so the cast gives them as integers.
flat_type_as_int = df_ohe["flat_type"].cast(IntegerType())
df_ohe = df_ohe.withColumn("flat_type", flat_type_as_int)

In [14]:
# Ordinal encoding of storey_range. The bands are three floors wide and start at
# floors 1, 4, 7, ..., 49, i.e. '01 TO 03' -> '1' up to '49 TO 51' -> '17'.
band_starts = range(1, 50, 3)
mapping = {
    f"{low:02d} TO {low + 2:02d}": str(rank)
    for rank, low in enumerate(band_starts, start=1)
}

df_ohe = df_ohe.replace(to_replace=mapping, subset=['storey_range'])
# The replacement values are digit strings, so the cast gives them as integers.
storey_range_as_int = df_ohe["storey_range"].cast(IntegerType())
df_ohe = df_ohe.withColumn("storey_range", storey_range_as_int)

In [15]:
# The raw columns below have been replaced by their *_string_encoded and
# *_one_hot counterparts, so the originals are removed.
encoded_source_columns = [
    "town",
    "block",
    'street_name',
    'flat_model',
    'remaining_lease',
    'floor_area_sqm',
    'lease_commence_date',
    'month',
]
df_ohe = df_ohe.drop(*encoded_source_columns)

In [16]:
# Summary statistics for the columns of the encoded frame.
df_ohe.describe().show()

+-------+------------------+------------------+------------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+
|summary|         flat_type|      resale_price|      storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|
+-------+------------------+------------------+------------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+
|  count|            114221|            114221|            114221|             114221|              114221|                    114221|                   114221|      

In [17]:
# Rename resale_price to resale_price_target and move it to the LAST position.
# The position matters: the feature assembler further down takes
# `train.columns[:-1]` as its inputs, i.e. everything except the final column.
feature_names = [name for name in df_ohe.columns if name != 'resale_price']
target_column = df_ohe['resale_price'].alias('resale_price_target')
df_final = df_ohe.select(*feature_names, target_column)
df_final.show()

+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+--------------+------------------+-------------------+------------------+-----------------------+----------------------+---------------------------+---------------+-------------------+
|flat_type|storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|  town_one_hot|     block_one_hot|street_name_one_hot|flat_model_one_hot|remaining_lease_one_hot|floor_area_sqm_one_hot|lease_commence_date_one_hot|  month_one_hot|resale_price_target|
+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------

In [36]:
# Inspect the column types before modelling; the schema printed below shows
# resale_price_target is still a string at this point.
df_final.printSchema()

root
 |-- flat_type: integer (nullable = true)
 |-- storey_range: integer (nullable = true)
 |-- town_string_encoded: double (nullable = false)
 |-- block_string_encoded: double (nullable = false)
 |-- street_name_string_encoded: double (nullable = false)
 |-- flat_model_string_encoded: double (nullable = false)
 |-- remaining_lease_string_encoded: double (nullable = false)
 |-- floor_area_sqm_string_encoded: double (nullable = false)
 |-- lease_commence_date_string_encoded: double (nullable = false)
 |-- month_string_encoded: double (nullable = false)
 |-- town_one_hot: vector (nullable = true)
 |-- block_one_hot: vector (nullable = true)
 |-- street_name_one_hot: vector (nullable = true)
 |-- flat_model_one_hot: vector (nullable = true)
 |-- remaining_lease_one_hot: vector (nullable = true)
 |-- floor_area_sqm_one_hot: vector (nullable = true)
 |-- lease_commence_date_one_hot: vector (nullable = true)
 |-- month_one_hot: vector (nullable = true)
 |-- resale_price_target: string (null

In [38]:
# Convert the target from string to integer so it can serve as a numeric label.
# NOTE(review): the source values look like '232000.0'; an int cast would drop
# any fractional part — confirm prices are always whole numbers.
df_final = df_final.withColumn("resale_price_target",df_final.resale_price_target.cast('int'))

In [39]:
# Confirm that resale_price_target is now integer-typed.
df_final.printSchema()

root
 |-- flat_type: integer (nullable = true)
 |-- storey_range: integer (nullable = true)
 |-- town_string_encoded: double (nullable = false)
 |-- block_string_encoded: double (nullable = false)
 |-- street_name_string_encoded: double (nullable = false)
 |-- flat_model_string_encoded: double (nullable = false)
 |-- remaining_lease_string_encoded: double (nullable = false)
 |-- floor_area_sqm_string_encoded: double (nullable = false)
 |-- lease_commence_date_string_encoded: double (nullable = false)
 |-- month_string_encoded: double (nullable = false)
 |-- town_one_hot: vector (nullable = true)
 |-- block_one_hot: vector (nullable = true)
 |-- street_name_one_hot: vector (nullable = true)
 |-- flat_model_one_hot: vector (nullable = true)
 |-- remaining_lease_one_hot: vector (nullable = true)
 |-- floor_area_sqm_one_hot: vector (nullable = true)
 |-- lease_commence_date_one_hot: vector (nullable = true)
 |-- month_one_hot: vector (nullable = true)
 |-- resale_price_target: integer (nul

In [40]:
# Random 80/20 train/test split; the fixed seed requests the same split on
# every run.
(train, test) = df_final.randomSplit([0.8, 0.2], seed=0)

In [41]:
# (row count, column count) of the training split.
train.count(), len(train.columns)

(91355, 19)

In [42]:
# (row count, column count) of the test split.
test.count(), len(test.columns)

(22866, 19)

In [43]:
# First 10 training rows with full cell contents (no truncation of wide values).
train.show(10, truncate = False)

+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+---------------+-------------------+-------------------+------------------+-----------------------+----------------------+---------------------------+---------------+-------------------+
|flat_type|storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|town_one_hot   |block_one_hot      |street_name_one_hot|flat_model_one_hot|remaining_lease_one_hot|floor_area_sqm_one_hot|lease_commence_date_one_hot|month_one_hot  |resale_price_target|
+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-------

In [44]:
# Every column except the last one (the target) becomes a model feature.
# NOTE(review): this keeps both the *_string_encoded index columns and their
# *_one_hot expansions, so each categorical is represented twice — confirm that
# is intended.
feature_columns = train.columns[:-1]
featureassembler = VectorAssembler(inputCols=feature_columns, outputCol="Xcols")

# VectorAssembler is a plain transformer (no fitting), so the same instance is
# applied to both splits.
train_vectored, test_vectored = (
    featureassembler.transform(split) for split in (train, test)
)
train_vectored.show(5)

+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+---------------+-------------------+-------------------+------------------+-----------------------+----------------------+---------------------------+---------------+-------------------+--------------------+
|flat_type|storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|   town_one_hot|      block_one_hot|street_name_one_hot|flat_model_one_hot|remaining_lease_one_hot|floor_area_sqm_one_hot|lease_commence_date_one_hot|  month_one_hot|resale_price_target|               Xcols|
+---------+------------+-------------------+--------------------+--------------------------+----------------------

In [45]:
# Standardise the assembled feature vector (centre on the mean, scale to unit
# standard deviation) using statistics computed from the training split.
sScaler = StandardScaler(
    inputCol="Xcols",
    outputCol="Xcols_sscaled",
    withMean=True,
    withStd=True,
)
train_scaler_model = sScaler.fit(train_vectored)
train_sscaled = train_scaler_model.transform(train_vectored)
train_sscaled.show()

+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+---------------+-------------------+-------------------+------------------+-----------------------+----------------------+---------------------------+---------------+-------------------+--------------------+--------------------+
|flat_type|storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|   town_one_hot|      block_one_hot|street_name_one_hot|flat_model_one_hot|remaining_lease_one_hot|floor_area_sqm_one_hot|lease_commence_date_one_hot|  month_one_hot|resale_price_target|               Xcols|       Xcols_sscaled|
+---------+------------+-------------------+--------------------+-------

In [46]:
# Scale the test features with statistics learned from the TRAINING split.
# Fitting the scaler on the test split would standardise test rows with their
# own mean/std, which leaks test information and puts the two splits on
# different scales.
test_sscaled = sScaler.fit(train_vectored).transform(test_vectored)
test_sscaled.show()

+---------+------------+-------------------+--------------------+--------------------------+-------------------------+------------------------------+-----------------------------+----------------------------------+--------------------+---------------+------------------+-------------------+------------------+-----------------------+----------------------+---------------------------+---------------+-------------------+--------------------+--------------------+
|flat_type|storey_range|town_string_encoded|block_string_encoded|street_name_string_encoded|flat_model_string_encoded|remaining_lease_string_encoded|floor_area_sqm_string_encoded|lease_commence_date_string_encoded|month_string_encoded|   town_one_hot|     block_one_hot|street_name_one_hot|flat_model_one_hot|remaining_lease_one_hot|floor_area_sqm_one_hot|lease_commence_date_one_hot|  month_one_hot|resale_price_target|               Xcols|       Xcols_sscaled|
+---------+------------+-------------------+--------------------+---------

In [47]:
# Training input for the model: the UNSCALED assembled features ("Xcols") plus
# the target column.
train_data = train_sscaled.select("Xcols","resale_price_target")
train_data.show(10)

+--------------------+-------------------+
|               Xcols|resale_price_target|
+--------------------+-------------------+
|(4177,[0,1,2,3,4,...|             200000|
|(4177,[0,1,2,3,4,...|             218000|
|(4177,[0,1,2,3,4,...|             217000|
|(4177,[0,1,3,4,6,...|             240000|
|(4177,[0,1,2,3,4,...|             240000|
|(4177,[0,1,2,3,4,...|             250000|
|(4177,[0,1,2,3,4,...|             240000|
|(4177,[0,1,2,3,4,...|             246000|
|(4177,[0,1,2,3,4,...|             205000|
|(4177,[0,1,2,4,5,...|             205000|
+--------------------+-------------------+
only showing top 10 rows



In [48]:
# Test input for the model: the UNSCALED assembled features ("Xcols") plus the
# target column.
test_data = test_sscaled.select("Xcols","resale_price_target")
test_data.show(10)

+--------------------+-------------------+
|               Xcols|resale_price_target|
+--------------------+-------------------+
|(4177,[0,1,2,3,4,...|             220000|
|(4177,[0,1,2,3,4,...|             230000|
|(4177,[0,1,2,3,4,...|             225000|
|(4177,[0,1,2,3,4,...|             250000|
|(4177,[0,1,2,3,4,...|             240000|
|(4177,[0,1,2,3,4,...|             238000|
|(4177,[0,1,2,3,4,...|             245000|
|(4177,[0,1,2,3,4,...|             255000|
|(4177,[0,1,2,3,4,...|             240000|
|(4177,[0,1,2,3,4,...|             205000|
+--------------------+-------------------+
only showing top 10 rows



In [49]:
# Standardised training features plus target.
# NOTE(review): not referenced by the model cells visible in this notebook.
train_data_scaled = train_sscaled.select("Xcols_sscaled","resale_price_target")

In [50]:
# Standardised test features plus target.
# NOTE(review): not referenced by the model cells visible in this notebook.
test_data_scaled = test_sscaled.select("Xcols_sscaled","resale_price_target")

In [51]:
# Fit a linear regression of resale_price_target on the unscaled feature vector
# "Xcols" (not "Xcols_sscaled"). `regressor` ends up bound to the fitted model,
# not to the estimator.
regressor = LinearRegression(
    featuresCol="Xcols",
    labelCol='resale_price_target',
).fit(train_data)

In [52]:
# Evaluate the fitted model on the training split and show its predictions
# next to the true target values.
train_pred_results=regressor.evaluate(train_data)
train_pred_results.predictions.show()

+--------------------+-------------------+------------------+
|               Xcols|resale_price_target|        prediction|
+--------------------+-------------------+------------------+
|(4177,[0,1,2,3,4,...|             200000|191966.62397878134|
|(4177,[0,1,2,3,4,...|             218000|191966.62397878134|
|(4177,[0,1,2,3,4,...|             217000|  215130.337164209|
|(4177,[0,1,3,4,6,...|             240000|186193.04571980634|
|(4177,[0,1,2,3,4,...|             240000|217428.21800692278|
|(4177,[0,1,2,3,4,...|             250000| 261858.7183465945|
|(4177,[0,1,2,3,4,...|             240000|191302.69635636252|
|(4177,[0,1,2,3,4,...|             246000| 192017.5278576208|
|(4177,[0,1,2,3,4,...|             205000|183793.28510047455|
|(4177,[0,1,2,4,5,...|             205000|226889.47511410966|
|(4177,[0,1,2,3,4,...|             260000|203146.16887530076|
|(4177,[0,1,2,3,4,...|             215000|226027.41481083012|
|(4177,[0,1,2,3,4,...|             256000| 267153.3494372277|
|(4177,[

In [53]:
# Evaluate the fitted model on the held-out test split and show its predictions
# next to the true target values.
test_pred_results=regressor.evaluate(test_data)
test_pred_results.predictions.show()

+--------------------+-------------------+------------------+
|               Xcols|resale_price_target|        prediction|
+--------------------+-------------------+------------------+
|(4177,[0,1,2,3,4,...|             220000| 193250.0832874582|
|(4177,[0,1,2,3,4,...|             230000| 220375.4535684948|
|(4177,[0,1,2,3,4,...|             225000| 275531.6623782346|
|(4177,[0,1,2,3,4,...|             250000|279076.77490534814|
|(4177,[0,1,2,3,4,...|             240000| 246680.1315682449|
|(4177,[0,1,2,3,4,...|             238000|221182.46494629182|
|(4177,[0,1,2,3,4,...|             245000|208663.61537480954|
|(4177,[0,1,2,3,4,...|             255000|247078.63815296703|
|(4177,[0,1,2,3,4,...|             240000|226875.77398549768|
|(4177,[0,1,2,3,4,...|             205000|246978.63471477557|
|(4177,[0,1,2,3,4,...|             250000|230008.71875772774|
|(4177,[0,1,2,3,4,...|             255000|  274185.081850845|
|(4177,[0,1,2,3,4,...|             229000| 238712.0696715629|
|(4177,[

In [54]:
# Training-split metrics as a tuple: (mean absolute error, mean squared error, R^2).
train_pred_results.meanAbsoluteError, train_pred_results.meanSquaredError, train_pred_results.r2

(26146.800546172282, 1259362819.1452284, 0.953176524904894)

In [55]:
# Test-split metrics as a tuple: (mean absolute error, mean squared error, R^2).
test_pred_results.meanAbsoluteError, test_pred_results.meanSquaredError, test_pred_results.r2

(27605.582737245302, 1394974634.71882, 0.9491501769823724)