In [1]:
import pandas as pd
import numpy as np
# Route every pandas `.plot()` call through plotly rather than the default backend.
pd.set_option("plotting.backend", "plotly")

# TODO LIST

* use mean price to set baseline


* features:
    * nr of years since construction
    * nr of years since renovation

In [2]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise start one with default settings.
spark = SparkSession.builder.getOrCreate()


2022-12-22 12:37:44,482 WARN util.Utils: Your hostname, laptop-2212 resolves to a loopback address: 127.0.1.1; using 192.168.0.145 instead (on interface wlp3s0)
2022-12-22 12:37:44,483 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2022-12-22 12:37:45,200 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Defining a schema
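I prefer to use floats instead of integers for the continuous numeric columns (price, room counts, square footage) because it is more flexible and avoids the pipeline erroring out in case of noise. Flags, ratings, years and the zipcode are kept as integers.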

In [130]:
from pyspark.sql.types import *

# (column name, Spark type) pairs, listed in the same order as the CSV columns.
# Every field is declared nullable.
_COLUMN_TYPES = [
    ("id", LongType),
    ("date", StringType),
    ("price", FloatType),
    ("bedrooms", FloatType),
    ("bathrooms", FloatType),
    ("sqft_living", FloatType),
    ("sqft_lot", FloatType),
    ("floors", FloatType),
    ("waterfront", IntegerType),
    ("view", IntegerType),
    ("condition", IntegerType),
    ("grade", IntegerType),
    ("sqft_above", FloatType),
    ("sqft_basement", FloatType),
    ("yr_built", IntegerType),
    ("yr_renovated", IntegerType),
    ("zipcode", IntegerType),
    ("lat", DoubleType),
    ("long", DoubleType),
    ("sqft_living15", FloatType),
    ("sqft_lot15", FloatType),
]
schema = StructType(
    [StructField(field_name, spark_type(), True) for field_name, spark_type in _COLUMN_TYPES]
)

# Read the CSV with the explicit schema; the first line of the file is the header.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("mode", "PERMISSIVE")
    .csv("data/kc_house_data.csv")
)
df.show()

+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|        id|           date|    price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|    lat|    long|sqft_living15|sqft_lot15|
+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|7129300520|20141013T000000| 221900.0|     3.0|      1.0|     1180.0|  5650.0|   1.0|         0|   0|        3|    7|    1180.0|          0.0|    1955|           0|  98178|47.5112|-122.257|       1340.0|    5650.0|
|6414100192|20141209T000000| 538000.0|     3.0|     2.25|     2570.0|  7242.0|   2.0|         0|   0|        3|    7|    2170.0|        400.

# Noisy data and missing values
One of the criteria of the `Production Model` part of the assignment is that the service should be able to handle noisy data and missing values. However, the provided dataset is clean, so I will be creating a version with nulls so that we can work on it from the start.

Regarding noise, I will configure the system to turn any parse errors into nulls, so that they then go through the same preprocessing as missing values. This is done with the `PERMISSIVE` mode when parsing the CSV.

In [151]:
# Imported here so the cell runs on a fresh kernel (Restart & Run All)
# without relying on an import made by a later cell.
from pyspark.sql.functions import col, rand, when

# Fraction of values to blank out in every column.
null_frac = 0.05

# `when` without `otherwise` yields null whenever the condition is false, so each
# value is kept only when its random draw exceeds `null_frac`. The column position
# is used as the seed, giving each column its own reproducible null pattern.
# A single `select` replaces one `withColumn` call per column, which keeps the
# query plan flat.
df_with_nulls = df.select(
    [
        when(rand(seed) > null_frac, col(col_name)).alias(col_name)
        for seed, col_name in enumerate(df.columns)
    ]
)
df_with_nulls.show()

+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|        id|           date|    price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|    lat|    long|sqft_living15|sqft_lot15|
+----------+---------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+
|7129300520|20141013T000000| 221900.0|     3.0|      1.0|       null|  5650.0|   1.0|         0|   0|        3| null|    1180.0|          0.0|    1955|           0|  98178|47.5112|-122.257|       1340.0|    5650.0|
|6414100192|20141209T000000| 538000.0|     3.0|     2.25|     2570.0|  7242.0|   2.0|         0|   0|        3|    7|    2170.0|        400.

* Just counting the nulls

In [152]:
# Explicit namespace import: the cell no longer depends on a wildcard import from
# another cell, and Spark's aggregate cannot be confused with Python's builtin `sum`.
from pyspark.sql import functions as F

# One output column per input column holding its number of nulls.
# `count` only counts non-null values, so a column without any null reports 0
# (summing the `when` expression would report null in that case).
df_with_nulls.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df_with_nulls.columns]
).show()

+----+----+-----+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+----+----+-------------+----------+
|  id|date|price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode| lat|long|sqft_living15|sqft_lot15|
+----+----+-----+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+----+----+-------------+----------+
|1042|1067| 1109|    1074|     1105|       1074|    1076|  1118|      1070|1130|     1095| 1067|      1068|         1091|    1089|        1082|    998|1081|1058|         1057|      1005|
+----+----+-----+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+----+----+-------------+----------+



* Parquet is preferable to CSV here because the schema is embedded in the file

In [154]:
# Persist the null-injected dataset, replacing any previous export at this path.
df_with_nulls.write.mode("overwrite").parquet("data/house_data_with_nulls.parquet")

# Convert date

In [72]:
# NOTE(review): this wildcard import also brings in Spark functions such as `sum`,
# `min` and `max`, which shadow the Python builtins of the same name.
from pyspark.sql.functions import *

# Pattern of the raw date strings, e.g. "20141013T000000".
DATE_FORMAT = "yyyyMMdd'T'HHmmss"

# Replace the string column with a parsed timestamp column of the same name.
df = df.withColumn("date", to_timestamp("date", DATE_FORMAT))
df.show()
df.count()

+----------+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+----------------+--------------+
|        id|               date|    price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|condition|grade|sqft_above|sqft_basement|yr_built|yr_renovated|zipcode|    lat|    long|sqft_living15|sqft_lot15|construction_age|renovation_age|
+----------+-------------------+---------+--------+---------+-----------+--------+------+----------+----+---------+-----+----------+-------------+--------+------------+-------+-------+--------+-------------+----------+----------------+--------------+
|7129300520|2014-10-13 00:00:00| 221900.0|     3.0|      1.0|     1180.0|  5650.0|   1.0|         0|   0|        3|    7|    1180.0|          0.0|    1955|           0|  98178|47.5112|-122.257|       1340.0|    5650.0|              59|            

21613

# Age of construction feature

In [65]:
# Age of the building at the time of the sale: sale year minus construction year.
sale_year = year("date")
df = df.withColumn("construction_age", sale_year - col("yr_built"))
df.select(["date", "yr_built", "construction_age"]).show()

+-------------------+--------+----------------+
|               date|yr_built|construction_age|
+-------------------+--------+----------------+
|2014-10-13 00:00:00|    1955|              59|
|2014-12-09 00:00:00|    1951|              63|
|2015-02-25 00:00:00|    1933|              82|
|2014-12-09 00:00:00|    1965|              49|
|2015-02-18 00:00:00|    1987|              28|
|2014-05-12 00:00:00|    2001|              13|
|2014-06-27 00:00:00|    1995|              19|
|2015-01-15 00:00:00|    1963|              52|
|2015-04-15 00:00:00|    1960|              55|
|2015-03-12 00:00:00|    2003|              12|
|2015-04-03 00:00:00|    1965|              50|
|2014-05-27 00:00:00|    1942|              72|
|2014-05-28 00:00:00|    1927|              87|
|2014-10-07 00:00:00|    1977|              37|
|2015-03-12 00:00:00|    1900|             115|
|2015-01-24 00:00:00|    1979|              36|
|2014-07-31 00:00:00|    1994|              20|
|2014-05-29 00:00:00|    1916|          

# Age of renovation
* When creating a renovation age feature, it makes sense to cap it at the construction age: a house that was never renovated gets a renovation age equal to its construction age.

In [70]:
# `yr_renovated` holds 0 for houses that have not been renovated. Turn that
# placeholder into null first: `greatest` skips nulls, so a missing `yr_built`
# combined with a 0 would otherwise yield an age of about 2014 years
# (sale year minus 0) instead of null.
renovation_year = when(col("yr_renovated") > 0, col("yr_renovated"))

# Most recent year in which the house was built or renovated. For a house that was
# never renovated this is the construction year, so renovation_age == construction_age.
last_work_year = greatest(col("yr_built"), renovation_year)
df = df.withColumn("renovation_age", year(col("date")) - last_work_year)

# Preview the feature on the first rows, then on renovated houses only.
age_columns = df.select("date", "yr_built", "yr_renovated", "construction_age", "renovation_age")
age_columns.show(10)
age_columns.where(col("yr_renovated") != 0).show(10)

+-------------------+--------+------------+----------------+--------------+
|               date|yr_built|yr_renovated|construction_age|renovation_age|
+-------------------+--------+------------+----------------+--------------+
|2014-10-13 00:00:00|    1955|           0|              59|            59|
|2014-12-09 00:00:00|    1951|        1991|              63|            23|
|2015-02-25 00:00:00|    1933|           0|              82|            82|
|2014-12-09 00:00:00|    1965|           0|              49|            49|
|2015-02-18 00:00:00|    1987|           0|              28|            28|
|2014-05-12 00:00:00|    2001|           0|              13|            13|
|2014-06-27 00:00:00|    1995|           0|              19|            19|
|2015-01-15 00:00:00|    1963|           0|              52|            52|
|2015-04-15 00:00:00|    1960|           0|              55|            55|
|2015-03-12 00:00:00|    2003|           0|              12|            12|
+-----------