<table class="table table-bordered">
    <tr>
        <th style="width:250px"><img src='https://www.np.edu.sg/images/default-source/default-album/img-logo.png?sfvrsn=764583a6_0' style="width: 100%; height: 125px; "></th>
        <th style="text-align:center;"><h1>Distributed Data Pipelines</h1><h2>Assignment 1 </h2><h3>Diploma in Data Science</h3></th>
    </tr>
</table>

Learning Objectives:
- Design PySpark Based Machine Learning
- Execute PySpark Syntax Correctly
- Evaluate and Select Final Model based on Metrics

You will be **graded on the use of PySpark**, so usage of **Pandas itself should be avoided as much as possible**, especially if a particular native method or function is already available in PySpark. **Penalties will be imposed in such cases.**

# Table of Contents <a id = "top"></a>

### 0. [Import Libraries](#0)

### 1. [Problem Statement Formulation](#1)

### 2. [Exploratory Data Analysis and Data Cleansing](#2)

### 3. [Data Wrangling and Transformation](#3)
- [3.1 Remove Redundant Columns](#3.1)
- [3.2 Outlier Removal](#3.2)
- [3.3 Check for Rare Values](#3.3)
- [3.4 StringIndex](#3.4)
- [3.5 OHE](#3.5)
- [3.6 Consolidating X columns](#3.6)
- [3.7 Standard Scaling](#3.7)
- [3.8 Train-test Split](#3.8)

### 4. [Machine Learning Modelling](#4)

### 5. [Model Evaluation and Selection](#5)

### 6. [Report](#6)
- [6.1 Problem Statement Formulation](#6.1)
- [6.2 Exploratory Data Analysis and Data Cleansing](#6.2)
- [6.3 Data Wrangling and Transformation](#6.3)
- [6.4 Machine Learning Modelling](#6.4)
- [6.5 Model Evaluation and Selection](#6.5)
- [6.6 Summary and Further Improvements](#6.6)

### 7. ["Unlisted" Youtube Link to Video Presentation](#7)

# 0. Import Libraries <a id = "0"></a>

[Back to top](#top)

In [1]:
# import the packages
import pyspark.sql
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression

In [2]:
# Start session
spark = SparkSession.builder.appName('DDP_ASG1').getOrCreate()

# 1. Problem Statement Formulation <a id = "1"></a>

[Back to top](#top)

In [3]:
# What columns can be used to predict resale price?

In [4]:
# load and explore data
df = spark.read.csv('./data/sg_flat_prices_mod.csv', header = True, inferSchema = True)

In [5]:
df.show(10)

+----+-----+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+---------------+------------+
|year|month|      town|flat_type|block|      street_name|storey_range|floor_area_sqm|    flat_model|lease_commence_date|remaining_lease|resale_price|
+----+-----+----------+---------+-----+-----------------+------------+--------------+--------------+-------------------+---------------+------------+
|2017|    1|ANG MO KIO|   2 ROOM|  406|ANG MO KIO AVE 10|    10 TO 12|          44.0|      Improved|               1979|            736|    232000.0|
|2017|    1|ANG MO KIO|   3 ROOM|  108| ANG MO KIO AVE 4|    01 TO 03|          67.0|New Generation|               1978|            727|    250000.0|
|2017|    1|ANG MO KIO|   3 ROOM|  602| ANG MO KIO AVE 5|    01 TO 03|          67.0|New Generation|               1980|            749|    262000.0|
|2017|    1|ANG MO KIO|   3 ROOM|  465|ANG MO KIO AVE 10|    04 TO 06|          68.0|New Generation|

In [6]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- town: string (nullable = true)
 |-- flat_type: string (nullable = true)
 |-- block: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- storey_range: string (nullable = true)
 |-- floor_area_sqm: double (nullable = true)
 |-- flat_model: string (nullable = true)
 |-- lease_commence_date: integer (nullable = true)
 |-- remaining_lease: integer (nullable = true)
 |-- resale_price: double (nullable = true)



# 2. Exploratory Data Analysis and Data Cleansing  <a id = "2"></a>

[Back to top](#top)

In [7]:
df.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+
|year|month|town|flat_type|block|street_name|storey_range|floor_area_sqm|flat_model|lease_commence_date|remaining_lease|resale_price|
+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+
|   0|    0|   0|        0|    0|          0|           0|            50|         0|                  0|              0|           0|
+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+



In [8]:
# Replace nulls in floor_area_sqm with median
imputer = Imputer(inputCols = ['floor_area_sqm'], 
                  outputCols = ["floor_area_sqm_imputed"]
                 ).setStrategy("median")

# Add imputation cols to df
df_imputed = imputer.fit(df).transform(df)

In [9]:
# Check for nulls again
df_imputed.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df_imputed.columns]).show()

+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+----------------------+
|year|month|town|flat_type|block|street_name|storey_range|floor_area_sqm|flat_model|lease_commence_date|remaining_lease|resale_price|floor_area_sqm_imputed|
+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+----------------------+
|   0|    0|   0|        0|    0|          0|           0|            50|         0|                  0|              0|           0|                     0|
+----+-----+----+---------+-----+-----------+------------+--------------+----------+-------------------+---------------+------------+----------------------+



# 3. Data Wrangling and Transformation  <a id = "3"></a>

[Back to top](#top)

## 3.1 Remove Redundant Columns  <a id = "3.1"></a>

[Back to top](#top)

In [10]:
df_imputed = df_imputed.select('town', 'flat_type', 'flat_model', 'remaining_lease', 'floor_area_sqm_imputed', 
                               'resale_price')
df_imputed.show(10)

+----------+---------+--------------+---------------+----------------------+------------+
|      town|flat_type|    flat_model|remaining_lease|floor_area_sqm_imputed|resale_price|
+----------+---------+--------------+---------------+----------------------+------------+
|ANG MO KIO|   2 ROOM|      Improved|            736|                  44.0|    232000.0|
|ANG MO KIO|   3 ROOM|New Generation|            727|                  67.0|    250000.0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    262000.0|
|ANG MO KIO|   3 ROOM|New Generation|            745|                  68.0|    265000.0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    265000.0|
|ANG MO KIO|   3 ROOM|New Generation|            756|                  68.0|    275000.0|
|ANG MO KIO|   3 ROOM|New Generation|            738|                  68.0|    280000.0|
|ANG MO KIO|   3 ROOM|New Generation|            700|                  67.0|    285000.0|
|ANG MO KI

## 3.2 Outlier Removal  <a id = "3.2"></a>

[Back to top](#top)

reference: https://github.com/Rajshekar-2021/Outlier-Detection-PYSPARK/blob/main/Customer_Data_Outliers_pyspark.ipynb

In [11]:
def find_outliers(df):

    # Identify the numeric columns in the Spark dataframe
    numeric_columns = [column[0] for column in df.dtypes if column[1] in ['int', 'double']]

    # For each numeric column, flag the rows that fall outside the IQR fences
    for column in numeric_columns:

        # Q1: first quartile, Q3: third quartile
        # approxQuantile returns a list; relativeError=0 requests exact quantiles
        Q1 = df.approxQuantile(column, [0.25], relativeError=0)
        Q3 = df.approxQuantile(column, [0.75], relativeError=0)

        # IQR: interquartile range (index [0] extracts the value from each list)
        IQR = Q3[0] - Q1[0]

        # Lower and upper fences, 1.5 * IQR beyond the quartiles
        less_Q1 = Q1[0] - 1.5*IQR
        more_Q3 = Q3[0] + 1.5*IQR
        
        isOutlierCol = 'is_outlier_{}'.format(column)
        
        df = df.withColumn(isOutlierCol, f.when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
    
    # Collect the per-column outlier flags created above
    selected_columns = [column for column in df.columns if column.startswith("is_outlier")]

    # Sum the flags into "total_outliers": the number of columns in which the row is an outlier
    df = df.withColumn('total_outliers', sum(df[column] for column in selected_columns))

    # Drop the per-column flags so that only the total remains
    df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

    return df

In [12]:
# Create a new dataframe that shows the number of outliers
new_df = find_outliers(df_imputed)
new_df.show(10)

+----------+---------+--------------+---------------+----------------------+------------+--------------+
|      town|flat_type|    flat_model|remaining_lease|floor_area_sqm_imputed|resale_price|total_outliers|
+----------+---------+--------------+---------------+----------------------+------------+--------------+
|ANG MO KIO|   2 ROOM|      Improved|            736|                  44.0|    232000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            727|                  67.0|    250000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    262000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            745|                  68.0|    265000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    265000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            756|                  68.0|    275000.0|             0|
|ANG MO KIO|   3 ROOM|New Generation|            738|  

In [13]:
# Create a new dataframe that keeps only the rows with no outliers
df2 = new_df.filter(new_df['total_outliers'] == 0)

# Drop the total_outliers column
df2 = df2.drop('total_outliers')

# Show dataframe
df2.show(10)

+----------+---------+--------------+---------------+----------------------+------------+
|      town|flat_type|    flat_model|remaining_lease|floor_area_sqm_imputed|resale_price|
+----------+---------+--------------+---------------+----------------------+------------+
|ANG MO KIO|   2 ROOM|      Improved|            736|                  44.0|    232000.0|
|ANG MO KIO|   3 ROOM|New Generation|            727|                  67.0|    250000.0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    262000.0|
|ANG MO KIO|   3 ROOM|New Generation|            745|                  68.0|    265000.0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    265000.0|
|ANG MO KIO|   3 ROOM|New Generation|            756|                  68.0|    275000.0|
|ANG MO KIO|   3 ROOM|New Generation|            738|                  68.0|    280000.0|
|ANG MO KIO|   3 ROOM|New Generation|            700|                  67.0|    285000.0|
|ANG MO KI

In [14]:
# Show the original row count
df.count()

64247

In [15]:
# Show row count after outlier removal
# Original row count was 64247
df2.count()

61430

In [16]:
# Show how many rows were removed
print(f'Number of rows removed: {df.count() - df2.count()}')

Number of rows removed: 2817


## 3.3 Check for Rare Values  <a id = "3.3"></a>

[Back to top](#top)

In [17]:
# Count the number of unique values in flat_type
df2.groupby('flat_type').count().alias('Count').sort(f.desc('Count')).show()

+----------------+-----+
|       flat_type|count|
+----------------+-----+
|          4 ROOM|26102|
|          3 ROOM|15571|
|          5 ROOM|14456|
|       EXECUTIVE| 4386|
|          2 ROOM|  911|
|MULTI-GENERATION|    4|
+----------------+-----+



In [18]:
# Replace values with <1000 count
# 2 ROOM, MULTI-GENERATION
temp:list = ["2 ROOM", "MULTI-GENERATION"]

# Replace rare values
df2 = df2.replace(temp, "RARE", 'flat_type')

df2.groupby('flat_type').count().alias('Count').sort(f.desc('Count')).show(23)

+---------+-----+
|flat_type|count|
+---------+-----+
|   4 ROOM|26102|
|   3 ROOM|15571|
|   5 ROOM|14456|
|EXECUTIVE| 4386|
|     RARE|  915|
+---------+-----+



In [19]:
df2.groupby('town').count().alias('Count').sort(f.desc('Count')).show(26)

+---------------+-----+
|           town|count|
+---------------+-----+
|       SENGKANG| 4970|
|    JURONG WEST| 4944|
|      WOODLANDS| 4798|
|         YISHUN| 4287|
|       TAMPINES| 4007|
|        PUNGGOL| 4006|
|          BEDOK| 3369|
|        HOUGANG| 2927|
|     ANG MO KIO| 2763|
|  CHOA CHU KANG| 2657|
|    BUKIT BATOK| 2443|
|  BUKIT PANJANG| 2382|
|    BUKIT MERAH| 2107|
|      PASIR RIS| 1826|
|      TOA PAYOH| 1824|
|      SEMBAWANG| 1754|
|KALLANG/WHAMPOA| 1726|
|        GEYLANG| 1470|
|    JURONG EAST| 1416|
|     QUEENSTOWN| 1415|
|       CLEMENTI| 1300|
|      SERANGOON| 1263|
|         BISHAN|  976|
|   CENTRAL AREA|  364|
|  MARINE PARADE|  325|
|    BUKIT TIMAH|  111|
+---------------+-----+



In [20]:
# Replace values with <1000 count
# BISHAN, CENTRAL AREA, MARINE PARADE, BUKIT TIMAH
temp:list = ["BISHAN", "CENTRAL AREA", "MARINE PARADE", "BUKIT TIMAH"]

# Replace rare values
df2 = df2.replace(temp, "RARE", 'town')

df2.groupby('town').count().alias('Count').sort(f.desc('Count')).show(23)

+---------------+-----+
|           town|count|
+---------------+-----+
|       SENGKANG| 4970|
|    JURONG WEST| 4944|
|      WOODLANDS| 4798|
|         YISHUN| 4287|
|       TAMPINES| 4007|
|        PUNGGOL| 4006|
|          BEDOK| 3369|
|        HOUGANG| 2927|
|     ANG MO KIO| 2763|
|  CHOA CHU KANG| 2657|
|    BUKIT BATOK| 2443|
|  BUKIT PANJANG| 2382|
|    BUKIT MERAH| 2107|
|      PASIR RIS| 1826|
|      TOA PAYOH| 1824|
|           RARE| 1776|
|      SEMBAWANG| 1754|
|KALLANG/WHAMPOA| 1726|
|        GEYLANG| 1470|
|    JURONG EAST| 1416|
|     QUEENSTOWN| 1415|
|       CLEMENTI| 1300|
|      SERANGOON| 1263|
+---------------+-----+



In [21]:
df2.groupby('flat_model').count().alias('Count').sort(f.desc('Count')).show()

+-------------------+-----+
|         flat_model|count|
+-------------------+-----+
|            Model A|20434|
|           Improved|15279|
|     New Generation| 9068|
|  Premium Apartment| 6896|
|         Simplified| 2754|
|          Apartment| 2224|
|           Standard| 1692|
|         Maisonette| 1565|
|           Model A2|  885|
|               DBSS|  441|
| Model A-Maisonette|   76|
|      Adjoined flat|   76|
|            Terrace|   19|
|Improved-Maisonette|   13|
|   Multi Generation|    4|
| Premium Maisonette|    4|
+-------------------+-----+



In [22]:
# Replace values with <500 count
# DBSS, Adjoined flat, Model A-Maisonette, Terrace, Improved-Maisonette, Premium Maisonette, Multi Generation
temp:list = ["DBSS", "Adjoined flat", "Model A-Maisonette", "Terrace", "Improved-Maisonette", "Premium Maisonette", 
             "Multi Generation"]

# Replace rare values
df2 = df2.replace(temp, "RARE", 'flat_model')

df2.groupby('flat_model').count().alias('Count').sort(f.desc('Count')).show()

+-----------------+-----+
|       flat_model|count|
+-----------------+-----+
|          Model A|20434|
|         Improved|15279|
|   New Generation| 9068|
|Premium Apartment| 6896|
|       Simplified| 2754|
|        Apartment| 2224|
|         Standard| 1692|
|       Maisonette| 1565|
|         Model A2|  885|
|             RARE|  633|
+-----------------+-----+



## 3.4 StringIndex <a id = "3.4"></a>

[Back to top](#top)

In [23]:
# create indexer
strings_used:list = ["town", "flat_type", "flat_model"]
stage_string:list = [StringIndexer(inputCol = c, outputCol = c + "_string_encoded") for c in strings_used]

In [24]:
# create pipeline
pipeline = Pipeline(stages = stage_string)

# index string columns
df_string_indexed = pipeline.fit(df2).transform(df2)

# show new dataframe
df_string_indexed.show(10)

+----------+---------+--------------+---------------+----------------------+------------+-------------------+------------------------+-------------------------+
|      town|flat_type|    flat_model|remaining_lease|floor_area_sqm_imputed|resale_price|town_string_encoded|flat_type_string_encoded|flat_model_string_encoded|
+----------+---------+--------------+---------------+----------------------+------------+-------------------+------------------------+-------------------------+
|ANG MO KIO|     RARE|      Improved|            736|                  44.0|    232000.0|                8.0|                     4.0|                      1.0|
|ANG MO KIO|   3 ROOM|New Generation|            727|                  67.0|    250000.0|                8.0|                     1.0|                      2.0|
|ANG MO KIO|   3 ROOM|New Generation|            749|                  67.0|    262000.0|                8.0|                     1.0|                      2.0|
|ANG MO KIO|   3 ROOM|New Generati

## 3.5 OHE <a id = "3.5"></a>

[Back to top](#top)

In [25]:
# create encoder
stage_one_hot:list = [OneHotEncoder(inputCol = c + "_string_encoded", outputCol = c + "_one_hot") for c in strings_used]

# create pipeline
ppl = Pipeline(stages = stage_one_hot)

# encode string columns
df_ohe = ppl.fit(df_string_indexed).transform(df_string_indexed)

# show new dataframe
df_ohe.show(10)

+----------+---------+--------------+---------------+----------------------+------------+-------------------+------------------------+-------------------------+--------------+-----------------+------------------+
|      town|flat_type|    flat_model|remaining_lease|floor_area_sqm_imputed|resale_price|town_string_encoded|flat_type_string_encoded|flat_model_string_encoded|  town_one_hot|flat_type_one_hot|flat_model_one_hot|
+----------+---------+--------------+---------------+----------------------+------------+-------------------+------------------------+-------------------------+--------------+-----------------+------------------+
|ANG MO KIO|     RARE|      Improved|            736|                  44.0|    232000.0|                8.0|                     4.0|                      1.0|(22,[8],[1.0])|        (4,[],[])|     (9,[1],[1.0])|
|ANG MO KIO|   3 ROOM|New Generation|            727|                  67.0|    250000.0|                8.0|                     1.0|              

## 3.6 Consolidating X columns <a id = "3.6"></a>

[Back to top](#top)

In [26]:
# Filter dataset to remove original string columns and _string_encoded columns
df_vector = df_ohe.select("floor_area_sqm_imputed", "remaining_lease", "town_one_hot", 
                          "flat_type_one_hot", "flat_model_one_hot", "resale_price")

# Combine x_values into a single column
featureassembler = VectorAssembler(inputCols = df_vector.columns[:-1], outputCol = "Xcols")
df_vector = featureassembler.transform(df_vector)
df_vector.show(10)

+----------------------+---------------+--------------+-----------------+------------------+------------+--------------------+
|floor_area_sqm_imputed|remaining_lease|  town_one_hot|flat_type_one_hot|flat_model_one_hot|resale_price|               Xcols|
+----------------------+---------------+--------------+-----------------+------------------+------------+--------------------+
|                  44.0|            736|(22,[8],[1.0])|        (4,[],[])|     (9,[1],[1.0])|    232000.0|(37,[0,1,10,29],[...|
|                  67.0|            727|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    250000.0|(37,[0,1,10,25,30...|
|                  67.0|            749|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    262000.0|(37,[0,1,10,25,30...|
|                  68.0|            745|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    265000.0|(37,[0,1,10,25,30...|
|                  67.0|            749|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    265000.0|(37,[0

## 3.7 Standard Scaling <a id = "3.7"></a>

[Back to top](#top)

In [27]:
# Scale data
sScaler = StandardScaler(withMean = True, withStd = True, inputCol = "Xcols", outputCol = "Xcols_sscaled")
df_scaled = sScaler.fit(df_vector).transform(df_vector)
df_scaled.show(10)

+----------------------+---------------+--------------+-----------------+------------------+------------+--------------------+--------------------+
|floor_area_sqm_imputed|remaining_lease|  town_one_hot|flat_type_one_hot|flat_model_one_hot|resale_price|               Xcols|       Xcols_sscaled|
+----------------------+---------------+--------------+-----------------+------------------+------------+--------------------+--------------------+
|                  44.0|            736|(22,[8],[1.0])|        (4,[],[])|     (9,[1],[1.0])|    232000.0|(37,[0,1,10,29],[...|[-2.2420629029525...|
|                  67.0|            727|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    250000.0|(37,[0,1,10,25,30...|[-1.2611984221375...|
|                  67.0|            749|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    262000.0|(37,[0,1,10,25,30...|[-1.2611984221375...|
|                  68.0|            745|(22,[8],[1.0])|    (4,[1],[1.0])|     (9,[2],[1.0])|    265000.0|(37,[0,

## 3.8 Train-test Split <a id = "3.8"></a>

[Back to top](#top)

In [28]:
# Create final dataframe that only has Xcols_sscaled and resale_price
df_final = df_scaled.select("Xcols_sscaled", "resale_price")
df_final.show(10)

+--------------------+------------+
|       Xcols_sscaled|resale_price|
+--------------------+------------+
|[-2.2420629029525...|    232000.0|
|[-1.2611984221375...|    250000.0|
|[-1.2611984221375...|    262000.0|
|[-1.2185521403630...|    265000.0|
|[-1.2611984221375...|    265000.0|
|[-1.2185521403630...|    275000.0|
|[-1.2185521403630...|    280000.0|
|[-1.2611984221375...|    285000.0|
|[-1.2185521403630...|    285000.0|
|[-1.2611984221375...|    285000.0|
+--------------------+------------+
only showing top 10 rows



In [29]:
# generate the train/test split
(train, test) = df_final.randomSplit([0.7, 0.3], seed = 0)

# 4. Machine Learning Modelling  <a id = "4"></a>

[Back to top](#top)

In [30]:
train.show(10)

+--------------------+------------+
|       Xcols_sscaled|resale_price|
+--------------------+------------+
|[-2.4979405935998...|    191000.0|
|[-2.4126480300507...|    200000.0|
|[-2.4126480300507...|    200000.0|
|[-2.4126480300507...|    230000.0|
|[-2.4126480300507...|    195000.0|
|[-2.4126480300507...|    215000.0|
|[-2.4126480300507...|    215000.0|
|[-2.4126480300507...|    210000.0|
|[-2.4126480300507...|    205000.0|
|[-2.4126480300507...|    205000.0|
+--------------------+------------+
only showing top 10 rows



In [31]:
print(f'Number of train rows: {train.count()}')
print(f'Number of train columns: {len(train.columns)}')

Number of train rows: 43013
Number of train columns: 2


In [32]:
test.show(10)

+--------------------+------------+
|       Xcols_sscaled|resale_price|
+--------------------+------------+
|[-2.5405868753744...|    210000.0|
|[-2.4126480300507...|    182000.0|
|[-2.4126480300507...|    215000.0|
|[-2.4126480300507...|    208000.0|
|[-2.4126480300507...|    218000.0|
|[-2.3700017482761...|    207000.0|
|[-2.3700017482761...|    225000.0|
|[-2.3700017482761...|    186000.0|
|[-2.3700017482761...|    268000.0|
|[-2.3273554665016...|    180000.0|
+--------------------+------------+
only showing top 10 rows



In [33]:
print(f'Number of test rows: {test.count()}')
print(f'Number of test columns: {len(test.columns)}')

Number of test rows: 18417
Number of test columns: 2


In [34]:
regressor = LinearRegression(featuresCol = "Xcols_sscaled", labelCol = 'resale_price')
regressor = regressor.fit(train)

# 5. Model Evaluation and Selection  <a id = "5"></a>

[Back to top](#top)

In [35]:
# get y_train
train_pred_results = regressor.evaluate(train)

# show y_train
train_pred_results.predictions.show(10)

+--------------------+------------+------------------+
|       Xcols_sscaled|resale_price|        prediction|
+--------------------+------------+------------------+
|[-2.4979405935998...|    191000.0|173136.84017379128|
|[-2.4126480300507...|    200000.0|175002.35294644226|
|[-2.4126480300507...|    200000.0| 175754.9252636602|
|[-2.4126480300507...|    230000.0|178765.21453253203|
|[-2.4126480300507...|    195000.0|181022.93148418586|
|[-2.4126480300507...|    215000.0|181775.50380140386|
|[-2.4126480300507...|    215000.0|180208.76546584966|
|[-2.4126480300507...|    210000.0|188119.15430662787|
|[-2.4126480300507...|    205000.0|186981.91632081135|
|[-2.4126480300507...|    205000.0| 248754.4400338022|
+--------------------+------------+------------------+
only showing top 10 rows



In [36]:
# get y_test
test_pred_results = regressor.evaluate(test)

# show y_test
test_pred_results.predictions.show(10)

+--------------------+------------+------------------+
|       Xcols_sscaled|resale_price|        prediction|
+--------------------+------------+------------------+
|[-2.5405868753744...|    210000.0| 48706.06897278002|
|[-2.4126480300507...|    182000.0|178765.21453253203|
|[-2.4126480300507...|    215000.0|182904.36227723077|
|[-2.4126480300507...|    208000.0|  169718.157503489|
|[-2.4126480300507...|    218000.0|170847.01597931597|
|[-2.3700017482761...|    207000.0|183775.52496243804|
|[-2.3700017482761...|    225000.0| 186409.5280727009|
|[-2.3700017482761...|    186000.0|188290.95886574584|
|[-2.3700017482761...|    268000.0|279131.85112719826|
|[-2.3273554665016...|    180000.0|176438.36536935563|
+--------------------+------------+------------------+
only showing top 10 rows



In [37]:
# Train MAE, MSE, R2
train_mae = train_pred_results.meanAbsoluteError
train_mse = train_pred_results.meanSquaredError
train_r2 = train_pred_results.r2

print(f'Train MAE: {train_mae}')
print(f'Train MSE: {train_mse}')
print(f'Train R2: {train_r2}')

Train MAE: 42821.10377449093
Train MSE: 3071397777.332468
Train R2: 0.8085364266769526


In [38]:
# Test MAE, MSE, R2
test_mae = test_pred_results.meanAbsoluteError
test_mse = test_pred_results.meanSquaredError
test_r2 = test_pred_results.r2

print(f'Test MAE: {test_mae}')
print(f'Test MSE: {test_mse}')
print(f'Test R2: {test_r2}')

Test MAE: 42108.76809618526
Test MSE: 2989888821.967632
Test R2: 0.8128824075980264


# 6. Report  <a id = "6"></a>

[Back to top](#top)

## 6.1 Problem Statement Formulation <a id = "6.1"></a>

[Back to top](#top)

This report contains the documentation of the analysis and findings from the `sg_flat_prices_mod` dataset. The objectives of this report are:

- To formulate a problem statement, conduct data preparation, exploration, and analysis through visualisation and statistical methods
- To prepare the data for machine learning

This report also explains the problems found in the dataset, the steps taken, and the solutions used to address them.


After loading the data, `.printSchema()` shows the data type of each column: six are strings, four are integers, and two are doubles.

The problem is to predict `resale_price`, so this notebook explores which of the other columns are useful predictors of it.

## 6.2 Exploratory Data Analysis and Data Cleansing <a id = "6.2"></a>

[Back to top](#top)

Upon loading the data, `df.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()` is used to check for null values. From here, it is observed that `floor_area_sqm` contains 50 nulls.
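
The check above counts, for each column, the rows whose value is either NaN or null. As a minimal plain-Python sketch of the same idea (the rows below are hypothetical, and `None` stands in for a Spark null; the notebook itself does this in PySpark):

```python
import math

# Hypothetical rows standing in for the dataframe; None plays the role of a Spark null.
rows = [
    {"town": "ANG MO KIO", "floor_area_sqm": 44.0},
    {"town": "BEDOK", "floor_area_sqm": None},
    {"town": "BISHAN", "floor_area_sqm": float("nan")},
]


def is_missing(value):
    """Return True for None (null) or a float NaN, mirroring isNull() | isnan()."""
    return value is None or (isinstance(value, float) and math.isnan(value))


# One count per column: how many rows hold a missing value in that column.
missing_counts = {
    column: sum(1 for row in rows if is_missing(row[column]))
    for column in rows[0]
}
print(missing_counts)
```

A column with a count of 0 needs no imputation, which is why only `floor_area_sqm` is handled in the next step.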

An Imputer replaces the null values with the column median, which is chosen because it is robust to extreme values. The original column is left unchanged and the imputed values are written to a new column, `floor_area_sqm_imputed`. Running the same null check again shows that this new column contains 0 null values.
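
To illustrate why the median is preferred here, the following plain-Python sketch imputes a small list of hypothetical floor areas (not taken from the dataset) and compares the median with the mean. It only mirrors the idea behind the Imputer and is not how Spark implements it.

```python
from statistics import mean, median

# Hypothetical floor areas; None marks a missing value and 1000.0 is an extreme value.
floor_area = [44.0, 67.0, None, 68.0, 1000.0, None, 67.0]

observed = [v for v in floor_area if v is not None]
fill_value = median(observed)   # sorted: 44, 67, 67, 68, 1000 -> middle value
mean_value = mean(observed)     # pulled upwards by the extreme value

# Write the result to a new list so the original stays untouched,
# in the same spirit as using a separate output column.
floor_area_imputed = [fill_value if v is None else v for v in floor_area]

print(fill_value, mean_value)
print(floor_area_imputed)
```

The single extreme value shifts the mean far away from the typical flat, while the median stays at a representative value.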

## 6.3 Data Wrangling and Transformation <a id = "6.3"></a>

[Back to top](#top)

### Removing Redundant Columns

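After imputing the null values, the columns `year`, `month`, `block`, `street_name`, `storey_range`, and `lease_commence_date` are treated as redundant and removed. The original `floor_area_sqm` column is also dropped because `floor_area_sqm_imputed` replaces it.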

### Outlier Removal

reference: https://github.com/Rajshekar-2021/Outlier-Detection-PYSPARK/blob/main/Customer_Data_Outliers_pyspark.ipynb

A function called `find_outliers()` is created. For each numeric column, it flags values that lie more than 1.5 × IQR below the first quartile or above the third quartile, then adds a `total_outliers` column holding the number of flagged columns in each row.

Rows with a `total_outliers` count above 0 are then filtered out. The original row count was 64247 and the final row count is 61430, so this process removed 2817 rows.
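
The IQR rule used for outlier removal can be sketched in plain Python as follows. The `exact_quantile` helper and its rank rule (take the element at rank ceil(p × n), with no interpolation) are assumptions made for illustration, and the values are hypothetical.

```python
import math


def exact_quantile(values, p):
    """Pick the element at rank ceil(p * n) of the sorted data (no interpolation).

    This rank rule is an assumption for illustration only.
    """
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]


def iqr_fences(values, k=1.5):
    """Return the (lower, upper) fences placed k * IQR beyond the quartiles."""
    q1 = exact_quantile(values, 0.25)
    q3 = exact_quantile(values, 0.75)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


values = [10, 12, 13, 14, 15, 16, 18, 100]   # hypothetical column values
lower, upper = iqr_fences(values)            # Q1 = 12, Q3 = 16, IQR = 4
outliers = [v for v in values if v < lower or v > upper]
kept = [v for v in values if lower <= v <= upper]

print(lower, upper, outliers)
```

Only the value far above the upper fence is flagged; everything between the fences is kept.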

### Check for Rare Values

Next, the dataframe is checked for rare values. The number of unique values in `flat_type` is shown and the values `2 ROOM` and `MULTI-GENERATION` can be replaced with `RARE` since their counts are less than 1000. 

The next column to be checked is `town`. `BISHAN`, `CENTRAL AREA`, `MARINE PARADE`, and `BUKIT TIMAH` can be replaced with `RARE` since their counts are also less than 1000. 

In `flat_model`, `DBSS`, `Adjoined flat`, `Model A-Maisonette`, `Terrace`, `Multi Generation`, `Improved-Maisonette`, and `Premium Maisonette` can be replaced with `RARE` as their counts are less than 500.
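
The rare-value rule can be expressed as a threshold on the category counts. The sketch below applies it in plain Python to the `flat_type` counts shown in the notebook output; the helper names `rare_labels` and `merge_rare` are hypothetical.

```python
from collections import Counter

# flat_type counts as shown in the notebook output after outlier removal.
flat_type_counts = Counter({
    "4 ROOM": 26102,
    "3 ROOM": 15571,
    "5 ROOM": 14456,
    "EXECUTIVE": 4386,
    "2 ROOM": 911,
    "MULTI-GENERATION": 4,
})


def rare_labels(counts, threshold):
    """Labels whose count falls below the threshold."""
    return sorted(label for label, n in counts.items() if n < threshold)


def merge_rare(counts, threshold, rare_name="RARE"):
    """Fold every label below the threshold into a single rare_name bucket."""
    merged = Counter()
    for label, n in counts.items():
        merged[rare_name if n < threshold else label] += n
    return merged


rare = rare_labels(flat_type_counts, threshold=1000)
merged_counts = merge_rare(flat_type_counts, threshold=1000)

print(rare)
print(merged_counts)
```

Deriving the list from a threshold in this way avoids typing the rare labels by hand, and the merged `RARE` count matches the 915 rows shown in the notebook.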

### StringIndex

After checking for rare values, the string columns are mapped to integers using a StringIndexer.

### OHE

After mapping the strings to integers, a One Hot Encoder is applied.
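
The two encoding steps can be sketched in plain Python using the `flat_type` counts from the notebook. The sketch assumes the behaviour visible in the notebook output: indices are assigned in descending order of frequency, and the last category is encoded as an all-zero vector. The alphabetical tie-break is an assumption, and no ties occur in this data.

```python
# flat_type counts after rare values were merged, as shown in the notebook output.
counts = {"4 ROOM": 26102, "3 ROOM": 15571, "5 ROOM": 14456, "EXECUTIVE": 4386, "RARE": 915}

# String indexing: the most frequent label gets index 0 (ties broken alphabetically).
ordered = sorted(counts, key=lambda label: (-counts[label], label))
index_of = {label: i for i, label in enumerate(ordered)}


def one_hot_drop_last(label):
    """One-hot vector with the last category dropped, so it becomes all zeros."""
    size = len(index_of) - 1
    vector = [0.0] * size
    position = index_of[label]
    if position < size:
        vector[position] = 1.0
    return vector


print(index_of)
print(one_hot_drop_last("3 ROOM"))   # index 1 is set
print(one_hot_drop_last("RARE"))     # last category: all zeros
```

This is consistent with the notebook output, where `3 ROOM` is indexed as 1.0 and `RARE` (index 4.0) appears as the empty sparse vector `(4,[],[])`.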

### Consolidating X Columns

After OHE, a VectorAssembler combines the feature columns (`floor_area_sqm_imputed`, `remaining_lease`, and the one-hot encoded `town`, `flat_type`, and `flat_model`) into a single vector column, `Xcols`, with 37 elements.
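
The positions inside the assembled vector follow from the width of each input column. The plain-Python sketch below reproduces the active indices shown in the notebook output; the helper `active_indices` is hypothetical and assumes the two numeric values are non-zero.

```python
# Width of each input, in the order the columns were assembled in the notebook.
widths = [
    ("floor_area_sqm_imputed", 1),
    ("remaining_lease", 1),
    ("town_one_hot", 22),
    ("flat_type_one_hot", 4),
    ("flat_model_one_hot", 9),
]

# Starting offset of each input inside the assembled vector.
offsets = {}
position = 0
for name, width in widths:
    offsets[name] = position
    position += width
total_width = position


def active_indices(town_idx, flat_type_idx, flat_model_idx):
    """Non-zero positions for one row; pass None for an all-zero one-hot vector."""
    active = [offsets["floor_area_sqm_imputed"], offsets["remaining_lease"]]
    encoded = [
        ("town_one_hot", town_idx),
        ("flat_type_one_hot", flat_type_idx),
        ("flat_model_one_hot", flat_model_idx),
    ]
    for name, idx in encoded:
        if idx is not None:
            active.append(offsets[name] + idx)
    return active


print(total_width)
print(active_indices(8, None, 1))   # first row: ANG MO KIO, RARE, Improved
print(active_indices(8, 1, 2))      # second row: ANG MO KIO, 3 ROOM, New Generation
```

The results agree with the sparse vectors in the notebook output, `(37,[0,1,10,29],[...` and `(37,[0,1,10,25,30...`.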

### Standard Scaling

Next, a StandardScaler applies a z-transformation (subtract the mean, then divide by the standard deviation) to `Xcols`, producing a new column, `Xcols_sscaled`.
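
As a rough illustration of the z-transformation on a single feature, the plain-Python sketch below standardises the first few floor areas shown in the notebook. It uses the sample standard deviation (dividing by n - 1), which is an assumption about the scaler's convention.

```python
from statistics import mean, stdev


def standardise(values):
    """Subtract the mean and divide by the sample standard deviation."""
    mu = mean(values)
    sigma = stdev(values)   # sample standard deviation (n - 1 in the denominator)
    return [(v - mu) / sigma for v in values]


# First few floor_area_sqm_imputed values from the notebook output.
areas = [44.0, 67.0, 67.0, 68.0, 67.0, 68.0]
z_scores = standardise(areas)

print(z_scores)
print(mean(z_scores), stdev(z_scores))
```

After scaling, the feature has a mean of about 0 and a standard deviation of about 1, so features measured in different units contribute on a comparable scale.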

### Train-test Split

After scaling the values, the dataframe is filtered to only have `Xcols_sscaled` and `resale_price`. It is then split into train (70%) and test (30%) sets.
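
A random split assigns each row to a set independently, so the resulting proportions are only approximately 70/30. In the notebook, 43013 of 61430 rows (about 70.02%) ended up in the train set. The plain-Python sketch below shows the idea with a hypothetical `random_split` helper; it is not Spark's implementation.

```python
import random


def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(10_000))            # hypothetical row identifiers
train, test = random_split(rows, 0.7, seed=0)

print(len(train), len(test), len(train) / len(rows))
```

Fixing the seed makes the split reproducible, which matters when comparing models across runs.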

## 6.4 Machine Learning Modelling <a id = "6.4"></a>

[Back to top](#top)

The number of rows and columns of the train and test sets are shown.

- Number of train rows: 43013
- Number of train columns: 2


- Number of test rows: 18417
- Number of test columns: 2

Since the target column `resale_price` is continuous, a linear regression model is created. Its features column is `Xcols_sscaled` and its label column is `resale_price`.

Finally, the regressor is fitted to the train set.

## 6.5 Model Evaluation and Selection <a id = "6.5"></a>

[Back to top](#top)

Using the fitted regressor, predictions are generated for both the train set and the test set.

Using these results, the MAE, MSE, and R2 values can be found.

- Train MAE: 42821.10377449093
- Train MSE: 3071397777.332468
- Train R2: 0.8085364266769526


- Test MAE: 42108.76809618526
- Test MSE: 2989888821.967632
- Test R2: 0.8128824075980264

The test set scored slightly better than the train set, with lower MAE and MSE and a higher R2. Because the two sets of scores are so close, there is no sign that the model is overfitting the train set.
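
For reference, the three metrics can be computed by hand as in the plain-Python sketch below. The small `y_true` and `y_pred` lists are hypothetical; the final line takes the square root of the test MSE reported above to express the error in the same units as `resale_price`.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (MAE, MSE, R2) for paired actual and predicted values."""
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(r) for r in residuals) / n
    ss_res = sum(r * r for r in residuals)
    mse = ss_res / n
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1 - ss_res / ss_tot
    return mae, mse, r2


# Hypothetical values chosen so the arithmetic is easy to follow.
y_true = [3.0, 5.0, 7.0]
y_pred = [2.0, 5.0, 9.0]
mae, mse, r2 = regression_metrics(y_true, y_pred)
print(mae, mse, r2)

# Root mean squared error for the test set, from the MSE reported in the notebook.
test_rmse = math.sqrt(2989888821.967632)
print(round(test_rmse))
```

The test RMSE comes to roughly 54,680, which is larger than the MAE of about 42,109 because squaring gives more weight to large errors.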

## 6.6 Summary and Further Improvements <a id = "6.6"></a>

[Back to top](#top)

Because Pandas and its related libraries were avoided, the notebook did not use matplotlib or seaborn visualisations to aid data exploration and wrangling. This limited how the data could be represented and made it harder to discover insights and correlations.

For example, a heatmap could have shown the correlation between columns, and a KDE plot or histogram could have shown the distribution of each column.

Additionally, the `statsmodels.api` library has an Ordinary Least Squares (OLS) function that builds a model and reports which columns are significant in predicting `resale_price`. This would help identify columns that could be removed to reduce the risk of overfitting the regression model.

# 7. "Unlisted" Youtube Link to Video Presentation  <a id = "7"></a>

[Back to top](#top)

In [39]:
# insert your link in this cell, you are allowed to comment it out
# youtube link: https://youtu.be/aChC1QdI-5A 