## Install Bokeh visualization package
We pip install this first because it restarts the Python interpreter. If this is done later in the notebook, all the variables prior to running the pip install will be lost.

In [0]:
%pip install bokeh

# Begin Spark Session

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkContext


spark = SparkSession.builder.appName('connecticut').getOrCreate()

from pyspark.sql import functions as F

### Read Data into Dataframe

In [0]:
rdd = sc.textFile("dbfs:/FileStore/shared_uploads/kariboalap@uni.coventry.ac.uk/Real_Estate_Sales_2001_2018__1_.csv")
rdd.map(lambda row:row.split(',')).take(2)

In [0]:
# Read the data using Dataframes method
conn = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/kariboalap@uni.coventry.ac.uk/Real_Estate_Sales_2001_2018__1_.csv", inferSchema=True, header=True)

## 1. Exploration
#### 1.1. Display top 5 rows

In [0]:
# conn.show(n)     <------- General syntax to show first n number of rows
conn.display()

SerialNumber,ListYear,DateRecorded,Town,Address,AssessedValue,SaleAmount,SalesRatio,PropertyType,ResidentialType,NonUseCode,Remarks,NumberOfBuildings
17001,2017,10/02/2017,Andover,27 HENDEE RD,58400,82000.0,0.712,,Single Family,,,
40043,2004,05/19/2005,Andover,BURNAP BRK RD,850,50000.0,1.7,Vacant Land,,28,,
30070,2003,08/02/2004,Andover,ROUTE 6,157300,175000.0,89.88,Vacant Land,,0,,
40036,2004,04/29/2005,Andover,ROUTE 6,22400,20000.0,112.0,Vacant Land,,25,,
170003,2017,10/02/2017,Branford,22-24 SILVER ST,205500,357500.0,0.575,,Two Family,,,
50249,2005,07/05/2006,Ansonia,13 MCINTOSH LN,0,197400.0,0.0,Condo,,7,,
50125,2005,02/16/2006,Ansonia,22 WESTFIELD AVE 5 &,331100,375000.0,88.3,Condo,,0,,
170007,2017,10/02/2017,Groton,174 ANN AVE,158410,192000.0,0.825,,Single Family,,,
50059,2005,05/17/2006,Ashford,52 PERRY HL RD,1196070,3700000.0,32.3,Apartments,,0,,
140042,2014,07/22/2015,Ashford,ASHFORD LAKE DR,3900,2500.0,1.56,Vacant Land,,,,


#### 1.2.  View datatypes of all columns

In [0]:
conn.printSchema()

#### 1.3.   Statistical Summary of the dataset
We will only be doing this for numerical columns as there is no need for statistical description of string columns

In [0]:
conn.describe().display()

summary,SerialNumber,ListYear,DateRecorded,Town,Address,AssessedValue,SaleAmount,SalesRatio,PropertyType,ResidentialType,NonUseCode,Remarks,NumberOfBuildings
count,912302.0,912302.0,912294,912302,912299,911997.0,881509.0,912301.0,815904,912301,847221.0,837966,50713
mean,159097.28515009285,2008.4892349244,,,67823.8431372549,268451.45824821794,362152.8324535428,713.9486970041747,,,4.778324718317427,2969.040750635723,0.77301775147929
stddev,2578058.8060479234,5.56863296484261,,,105734.86703455442,1644922.29602784,2006128.6417481836,133131.20656566767,,,7.886109373939544,30423.43072081334,0.7279760144235242
min,0.0,2001.0,01/01/2002,Andover,"""RT 197 PARCEL """"B"""" NORTH""",0.0,0.0,0.0,10 Mill Forest,Apartments,0.0,"""160-6-11 SELLER IS """"YMCA""""""","ESTATE SALE"""
max,1710011174.0,2018.0,12/31/2018,Woodstock,parking space only,881510000.0,940940000.0,,Vacant Land,Vacant Land,,zoning changed from residential to commercial after Oct 1.,NO BSMNTM NO BACK YARD


In [0]:
conn.describe(['ListYear','AssessedValue','SaleAmount',]).show()

#### 1.4  Show Unique Values

* ##### Towns

In [0]:
# Unique towns
conn.select('Town').distinct().show(5)

#Number of unique towns
conn.select(F.countDistinct("Town")).show()

In [0]:
# Which towns are the most frequently occuring (Top 10)
conn.groupBy("Town").count().orderBy('count', ascending=False).show(10)

* ##### Property Type

In [0]:
# Unique property types
conn.select('PropertyType').distinct().show()

#Number of unique property types
conn.select(F.countDistinct("PropertyType")).show()

In [0]:
# Which Property types are the most frequently occuring (Top 10)

conn.groupBy("PropertyType").count().orderBy('count', ascending=False).show(10)

* ##### Residential Type

In [0]:
# Unique residential types
conn.select('ResidentialType').distinct().show()

#Number of unique residential types
conn.select(F.countDistinct("ResidentialType")).show()

In [0]:
# Which Residential types are the most frequently occuring (Top 10)
conn.groupBy("ResidentialType").count().orderBy('count', ascending=False).show(10)

#### 1.5  Total listings for each year

Here we can quickly see what year had the most listings. Gives us an idea of the size of the real estate market by year

In [0]:
# Total listings for each year
conn.groupBy("ListYear").count().orderBy('ListYear', ascending=True).show()

While these tables do a good job of quickly describing the dataset, the information is better communicated using visualizations. <br>
Before the dataset can be visualized, it will need to undergo wrangling to ensure proper formatting and also enrichment.

## 2. Wrangling

Let us look at the dataframe again for reference

In [0]:
conn.display()

SerialNumber,ListYear,DateRecorded,Town,Address,AssessedValue,SaleAmount,SalesRatio,PropertyType,ResidentialType,NonUseCode,Remarks,NumberOfBuildings
17001,2017,10/02/2017,Andover,27 HENDEE RD,58400,82000.0,0.712,,Single Family,,,
40043,2004,05/19/2005,Andover,BURNAP BRK RD,850,50000.0,1.7,Vacant Land,,28,,
30070,2003,08/02/2004,Andover,ROUTE 6,157300,175000.0,89.88,Vacant Land,,0,,
40036,2004,04/29/2005,Andover,ROUTE 6,22400,20000.0,112.0,Vacant Land,,25,,
170003,2017,10/02/2017,Branford,22-24 SILVER ST,205500,357500.0,0.575,,Two Family,,,
50249,2005,07/05/2006,Ansonia,13 MCINTOSH LN,0,197400.0,0.0,Condo,,7,,
50125,2005,02/16/2006,Ansonia,22 WESTFIELD AVE 5 &,331100,375000.0,88.3,Condo,,0,,
170007,2017,10/02/2017,Groton,174 ANN AVE,158410,192000.0,0.825,,Single Family,,,
50059,2005,05/17/2006,Ashford,52 PERRY HL RD,1196070,3700000.0,32.3,Apartments,,0,,
140042,2014,07/22/2015,Ashford,ASHFORD LAKE DR,3900,2500.0,1.56,Vacant Land,,,,


Each column is examined with the purpose of converting them to useful formats and datatypes.

#### 2.1. DateRecorded

This column shows when the property sale was recorded. **I will be assuming** this to be the **sale date** for each entry.

It is given in the format: **MM/DD/YYYY**, but it will be more useful to split this into **sale month** and **sale year**.

In [0]:
from pyspark.sql.functions import col, unix_timestamp, to_date

# First convert the DateRecorded column datatype from string to date 
conn1 = conn.withColumn('SaleDate', to_date(unix_timestamp(col('DateRecorded'), 'MM/dd/yyyy').cast("timestamp")))
conn1.select('DateRecorded','SaleDate').show(5)
conn1.select('DateRecorded','SaleDate').printSchema()                  

In [0]:
# Now extract Year from SaleDate

from pyspark.sql.functions import year
from pyspark.sql.functions import to_date

conn1 = conn1.withColumn('SaleYear',year(conn1.SaleDate))
conn1.select('SaleDate','SaleYear').show(5)


In [0]:
# Extract Sale Month
from pyspark.sql.functions import month
conn1 = conn1.withColumn('SaleMonth',month(conn1.SaleDate))
conn1.select('SaleDate','SaleMonth').show(5)

In [0]:
conn1.printSchema()

#### 2.2. AssessedValue and SaleAmount

There two columns represent the **amount the property was valued at**, perhaps at the time of listing, and the **amount it was eventually sold for**.

The difference between these values can be taken as an additional variable, as it contains information on the accuracy of property valuations. 

A new column `PriceDifference` is hence created.

In [0]:
conn2 = conn1.withColumn('PriceDifference', conn1.AssessedValue - conn1.SaleAmount)
conn2.select('AssessedValue','SaleAmount','PriceDifference').show(5)


#### 2.3. Town and Address

In [0]:
conn2.select('Town', 'Address').show(5)

We will attempt to geocode some of the addresses to coordinates in order to view price trends on a map.  This will come much later during visualizations.

## 3  Cleaning
#### 3.1. Drop Columns

Now, columns not holding any useful information will be dropped. In the same command, the columns will also be reordered

In [0]:
# Drop unneccesary columns. Here we simply select the columns, but leaving out the 'Remarks','NoneUseCode' and 'No of Buildings' columns 
conn3 = conn2.select('SerialNumber','ListYear','SaleYear','SaleMonth',
                     'AssessedValue','SaleAmount','PriceDifference',
                     'PropertyType','ResidentialType','Address','Town')
conn3.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
17001,2017,2017,10,58400,82000.0,-23600.0,,Single Family,27 HENDEE RD,Andover
40043,2004,2005,5,850,50000.0,-49150.0,Vacant Land,,BURNAP BRK RD,Andover
30070,2003,2004,8,157300,175000.0,-17700.0,Vacant Land,,ROUTE 6,Andover
40036,2004,2005,4,22400,20000.0,2400.0,Vacant Land,,ROUTE 6,Andover
170003,2017,2017,10,205500,357500.0,-152000.0,,Two Family,22-24 SILVER ST,Branford
50249,2005,2006,7,0,197400.0,-197400.0,Condo,,13 MCINTOSH LN,Ansonia
50125,2005,2006,2,331100,375000.0,-43900.0,Condo,,22 WESTFIELD AVE 5 &,Ansonia
170007,2017,2017,10,158410,192000.0,-33590.0,,Single Family,174 ANN AVE,Groton
50059,2005,2006,5,1196070,3700000.0,-2503930.0,Apartments,,52 PERRY HL RD,Ashford
140042,2014,2015,7,3900,2500.0,1400.0,Vacant Land,,ASHFORD LAKE DR,Ashford


#### 3.2. Missing and Null values

In [0]:
from pyspark.sql.functions import col, when, count, isnan

conn3.select(*(count(when(col(x).isNull(), x)).alias(x) 
               for x in conn3.columns)).show()

Considering we have a dataset of over 912,000 rows, we can safely drop every row containing null values. infact The column with the most missing values, `PropertyType` only has about 10% of it's data missing.

In [0]:
print(conn3.count())

conn32 = conn3.dropna()

# Count after dropping null value rows
print(conn32.count())

In [0]:
conn32.select(*(count(when(col(x).isNull(), x)).alias(x) 
               for x in conn32.columns)).show()

In [0]:
conn32.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
40043,2004,2005,5,850,50000.0,-49150.0,Vacant Land,,BURNAP BRK RD,Andover
30070,2003,2004,8,157300,175000.0,-17700.0,Vacant Land,,ROUTE 6,Andover
40036,2004,2005,4,22400,20000.0,2400.0,Vacant Land,,ROUTE 6,Andover
50249,2005,2006,7,0,197400.0,-197400.0,Condo,,13 MCINTOSH LN,Ansonia
50125,2005,2006,2,331100,375000.0,-43900.0,Condo,,22 WESTFIELD AVE 5 &,Ansonia
50059,2005,2006,5,1196070,3700000.0,-2503930.0,Apartments,,52 PERRY HL RD,Ashford
140042,2014,2015,7,3900,2500.0,1400.0,Vacant Land,,ASHFORD LAKE DR,Ashford
900009,2009,2009,12,64000,160000.0,-96000.0,Vacant Land,,HILLSIDE RD,Ashford
20384,2002,2003,6,334750,577500.0,-242750.0,Condo,,1 WILLS WALK,Avon
120148,2012,2013,3,125870,125000.0,870.0,Commercial,,11 CHESTNUT DRIVE,Avon


In [0]:
conn32.describe().display()

summary,SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
count,784823.0,784823.0,784823.0,784823.0,784823.0,784823.0,784823.0,784823,784823,784823,784823
mean,107026.62150828911,2007.4165206677171,2008.183835081286,6.769053659232719,257491.64149750964,358862.7474453348,-101371.10594782516,,,59443.13333333333,
stddev,525324.1529975114,4.889514060784347,13.668514196929086,3.260254869183219,1301222.6252629762,2038914.3943246172,2115778.8057146627,,,93004.56597432584,
min,81.0,2001.0,2001.0,1.0,0.0,0.0,-940940000.0,10 Mill Forest,Four Family,#12 #112 #193 #205 #238 #248 #,Andover
max,140002850.0,2016.0,9999.0,12.0,138958820.0,940940000.0,138918464.0,Vacant Land,Two Family,parking space only,Woodstock


I observed that the `SaleYear` column has a maximum value of **9999**, which is not practical. Let's look at the top values for this column.

In [0]:
conn32.orderBy('SaleYear', ascending=False).display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
60237,2006,9999,1,38500,79900.0,-41400.0,Vacant Land,,20 MELISSA WAY,Stafford
60684,2006,9999,1,0,83400.0,-83400.0,Residential,Single Family,70 N TURNPIKE RD,Wallingford
110276,2011,2102,8,208800,265000.0,-56200.0,Residential,Single Family,5 ROSEMARY LN,Guilford
160412,2016,2017,5,64740,38410.0,26330.0,Residential,Single Family,476 HIGH ST,Torrington
160499,2016,2017,6,124300,168000.0,-43700.0,Residential,Single Family,71 CANTERBURY TPKE,Norwich
161477,2016,2017,7,998180,1350000.0,-351820.0,Residential,Single Family,2 WALLENBERG DRIVE,Stamford
161780,2016,2017,8,171830,244000.0,-72170.0,Residential,Single Family,127 GREYROCK PLACE # 1005,Stamford
161471,2016,2017,7,218810,370000.0,-151190.0,Residential,Single Family,850 EAST MAIN STREET # 418,Stamford
160718,2016,2017,5,117670,217000.0,-99330.0,Residential,Single Family,255 HEPBURN RD,Hamden
160249,2016,2017,9,205050,290000.0,-84950.0,Residential,Single Family,41 BENNETT CIRCLE,Voluntown


We can thus drop the first three rows since their Sale Years are wrong. The dataset contains records for only houses sold between 2001 and 2018

In [0]:
conn32 =conn32.filter(conn32.SaleYear<2018)
conn32.select('SaleYear').describe().display()

summary,SaleYear
count,784820.0
mean,2008.1633521062156
stddev,4.908836362606605
min,2001.0
max,2017.0


## 4  Visualization and Analysis

#### 4.1. Grouping by listing and sale years

In [0]:
# Rename dataframe to be more generic
df = conn32
# How did total sale volume fare over the years
conn411 = df.groupBy('SaleYear').agg({'SaleAmount':'sum','SerialNumber':'count'}).display()

SaleYear,count(SerialNumber),sum(SaleAmount)
2003,84802,25603846106.0
2007,44058,20832718563.4
2015,47153,17913335674.67
2006,54818,20293195147.0
2013,36029,14117630457.0
2014,40314,15401226661.0
2004,71042,23526528692.0
2012,31239,12157705140.22
2009,34833,11068288456.0
2016,45919,21438690332.0


In [0]:
# How did the assessed values compare with actual values over the years

conn412 = df.groupBy('SaleYear').agg({'SaleAmount':'avg','AssessedValue':'avg'}).display()

SaleYear,avg(SaleAmount),avg(AssessedValue)
2003,301925.02660314617,178091.03214546826
2007,472847.57736165967,343262.9041944709
2015,379898.1119901173,333951.69363561174
2006,370192.18408187095,215733.88270276185
2013,391840.7520885953,341099.1027505621
2014,382031.717542293,314406.3624051198
2004,331163.6594127418,191733.59559134036
2012,389183.5570991389,398016.7310093153
2009,317752.9485258232,287815.62633135245
2016,466880.6013197152,264717.04242252663


In [0]:
# Let's see how the top three have differed over the years
conn422 = df.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
40043,2004,2005,5,850,50000.0,-49150.0,Vacant Land,,BURNAP BRK RD,Andover
30070,2003,2004,8,157300,175000.0,-17700.0,Vacant Land,,ROUTE 6,Andover
40036,2004,2005,4,22400,20000.0,2400.0,Vacant Land,,ROUTE 6,Andover
50249,2005,2006,7,0,197400.0,-197400.0,Condo,,13 MCINTOSH LN,Ansonia
50125,2005,2006,2,331100,375000.0,-43900.0,Condo,,22 WESTFIELD AVE 5 &,Ansonia
50059,2005,2006,5,1196070,3700000.0,-2503930.0,Apartments,,52 PERRY HL RD,Ashford
140042,2014,2015,7,3900,2500.0,1400.0,Vacant Land,,ASHFORD LAKE DR,Ashford
900009,2009,2009,12,64000,160000.0,-96000.0,Vacant Land,,HILLSIDE RD,Ashford
20384,2002,2003,6,334750,577500.0,-242750.0,Condo,,1 WILLS WALK,Avon
120148,2012,2013,3,125870,125000.0,870.0,Commercial,,11 CHESTNUT DRIVE,Avon


#### 4.2. Group by property type

In [0]:
# What are the average sale prices for the different property types
conn421 = df.groupBy('PropertyType').agg({'SaleAmount':'avg'}).display()

PropertyType,avg(SaleAmount)
Apartments,1198831.0944005742
Vacant Land,198013.20119754985
,680015.4588020674
Residential,351985.14680104627
Industrial,997839.3087719298
Condo,214239.4411592828
Public Utility,222799.6015037594
10 Mill Forest,183016.0
Commercial,1040685.0529760688


In [0]:
# On average, how accurate were the valuations compared to the eventual sale prices for different property types.
conn423 = df.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
40043,2004,2005,5,850,50000.0,-49150.0,Vacant Land,,BURNAP BRK RD,Andover
30070,2003,2004,8,157300,175000.0,-17700.0,Vacant Land,,ROUTE 6,Andover
40036,2004,2005,4,22400,20000.0,2400.0,Vacant Land,,ROUTE 6,Andover
50249,2005,2006,7,0,197400.0,-197400.0,Condo,,13 MCINTOSH LN,Ansonia
50125,2005,2006,2,331100,375000.0,-43900.0,Condo,,22 WESTFIELD AVE 5 &,Ansonia
50059,2005,2006,5,1196070,3700000.0,-2503930.0,Apartments,,52 PERRY HL RD,Ashford
140042,2014,2015,7,3900,2500.0,1400.0,Vacant Land,,ASHFORD LAKE DR,Ashford
900009,2009,2009,12,64000,160000.0,-96000.0,Vacant Land,,HILLSIDE RD,Ashford
20384,2002,2003,6,334750,577500.0,-242750.0,Condo,,1 WILLS WALK,Avon
120148,2012,2013,3,125870,125000.0,870.0,Commercial,,11 CHESTNUT DRIVE,Avon


#### 4.3. Group by Town

In [0]:
conn431 = df.groupby('Town').agg({'SaleAmount':'sum'}).orderBy('sum(SaleAmount)', ascending=False)
conn431.display()
# We can see the top 10 towns by total sales over the years

Town,sum(SaleAmount)
Greenwich,25321389878.0
Stamford,20187328310.0
Westport,10012903568.0
Norwalk,9735927797.0
Fairfield,8659282672.0
Darien,7720055425.0
New Canaan,7130430456.0
Danbury,5625856634.0
Bridgeport,5455730714.0
West Hartford,5241808613.0


In [0]:
conn432 = df.groupby('Town').agg({'SaleAmount':'avg'}).orderBy('avg(SaleAmount)', ascending=False)
conn432.display()

Town,avg(SaleAmount)
Greenwich,1977615.579350203
New Canaan,1480571.107973422
Darien,1478937.820881226
Westport,1337729.2676018705
Weston,942484.3584158416
Wilton,897279.0299277606
Washington,835049.3728813559
Stamford,771893.4084043895
Rocky Hill,744568.6447298675
Ridgefield,740730.4704407023


#### 4.4.  Analysis: The 2008 Real Estate Crash

We can inspect what property type dominated sales by quantity.

In [0]:
conn441 = df.filter('SaleYear>2007 and SaleYear<2009')
conn441.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
70072,2007,2008,5,0,508955.0,-508955.0,Condo,,6 FIELDSTONE LN,Beacon Falls
70019,2007,2008,4,198100,350000.0,-151900.0,Residential,Single Family,111 LAKESIDE DR,Andover
70022,2007,2008,6,102300,172500.0,-70200.0,Residential,Single Family,12 ROSE LN,Andover
70037,2007,2008,7,178400,239000.0,-60600.0,Residential,Single Family,146 LAKESIDE DR,Andover
70027,2007,2008,8,260200,250000.0,10200.0,Residential,Single Family,15 DOGWOOD AVENUE,Andover
70023,2007,2008,7,215800,325000.0,-109200.0,Residential,Single Family,177 SHODDY MILL RD,Andover
70036,2007,2008,8,154200,224900.0,-70700.0,Residential,Single Family,18 LAKESIDE DR,Andover
80007,2008,2008,12,155600,210000.0,-54400.0,Residential,Single Family,19 PARKER BRIDGE ROAD,Andover
70026,2007,2008,8,131500,221000.0,-89500.0,Residential,Single Family,2 MERRITT VALLEY ROAD,Andover
80001,2008,2008,10,91400,90000.0,1400.0,Residential,Single Family,233 LAKE ROAD,Andover


With residential properties accounting for the highest propertion of sale volume in 2008, we take a closer look at what types of residential homes were most prominent.

In [0]:
conn442 = df.filter('SaleYear>2007 and SaleYear<2009')
conn442.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
70072,2007,2008,5,0,508955.0,-508955.0,Condo,,6 FIELDSTONE LN,Beacon Falls
70019,2007,2008,4,198100,350000.0,-151900.0,Residential,Single Family,111 LAKESIDE DR,Andover
70022,2007,2008,6,102300,172500.0,-70200.0,Residential,Single Family,12 ROSE LN,Andover
70037,2007,2008,7,178400,239000.0,-60600.0,Residential,Single Family,146 LAKESIDE DR,Andover
70027,2007,2008,8,260200,250000.0,10200.0,Residential,Single Family,15 DOGWOOD AVENUE,Andover
70023,2007,2008,7,215800,325000.0,-109200.0,Residential,Single Family,177 SHODDY MILL RD,Andover
70036,2007,2008,8,154200,224900.0,-70700.0,Residential,Single Family,18 LAKESIDE DR,Andover
80007,2008,2008,12,155600,210000.0,-54400.0,Residential,Single Family,19 PARKER BRIDGE ROAD,Andover
70026,2007,2008,8,131500,221000.0,-89500.0,Residential,Single Family,2 MERRITT VALLEY ROAD,Andover
80001,2008,2008,10,91400,90000.0,1400.0,Residential,Single Family,233 LAKE ROAD,Andover


#### 4.5.  Analysis: Coordinate Plot in the year 2008

In [0]:
# Here we filter the dataset to only 2018, and select just the three columns showing address and sale amount
df2008 = df.filter(df.SaleYear  == 2008).select(['Address','Town','SaleAmount'])

# Join both the address and Town columns
df2008 = df2008.withColumn('FullAddress', F.concat(F.col('Address'), F.lit(' '),F.col('Town'))).drop('Address','Town')  \
                                                                           .select('FullAddress','SaleAmount')

print(df2008.count())
df2008.show(5)


In [0]:
# Use databrick's display() command to download the first 1000 rows as a csv
df2008.display()

FullAddress,SaleAmount
6 FIELDSTONE LN Beacon Falls,508955.0
111 LAKESIDE DR Andover,350000.0
12 ROSE LN Andover,172500.0
146 LAKESIDE DR Andover,239000.0
15 DOGWOOD AVENUE Andover,250000.0
177 SHODDY MILL RD Andover,325000.0
18 LAKESIDE DR Andover,224900.0
19 PARKER BRIDGE ROAD Andover,210000.0
2 MERRITT VALLEY ROAD Andover,221000.0
233 LAKE ROAD Andover,90000.0


#### Import Geocoded address CSV into the notebook

In [0]:
path = "dbfs:/FileStore/shared_uploads/kariboalap@uni.coventry.ac.uk/Connecticut_Sample_2008___Sheet1.csv"

geodf =spark.read.format("csv").load(path, inferSchema=True, header=True)

In [0]:
geodf.display()

GeocodedAddress,Latitude,Longitude,SaleAmount,Status
"Fieldstone Lane, Beacon Falls, CT, USA",41.425483,-73.040236,508955,doubt
"111 Lakeside Dr, Andover, CT, USA",41.720633,-72.361915,350000,success
"12 Rose Ln, Andover, CT, USA",41.71064,-72.357205,172500,success
"146 Lakeside Dr, Andover, CT, USA",41.72311,-72.362294,239000,success
"Andover, England, United Kingdom",51.214307,-1.478605,250000,doubt
"177 Shoddy Mill Rd, Andover, CT, USA",41.739405,-72.403346,325000,success
"18 Lakeside Dr, Andover, CT, USA",41.713959,-72.360867,224900,success
"19 Parker Bridge Rd, Andover, CT, USA",41.732249,-72.336445,210000,success
"2 Merritt Valley Rd, Andover, CT, USA",41.73513,-72.36166,221000,success
"233 Lake Rd, Andover, CT, USA",41.710886,-72.358037,90000,success


In [0]:
from pyspark.sql.types import DoubleType

# Select only successful geocoding attempts
geodf_success = geodf.filter(geodf.Status  == "success").select(['Latitude','Longitude','SaleAmount'])

# Cast the coordinate columns to numerical datatypes
geodf_success = geodf_success.withColumn("Longitude", geodf_success["Longitude"].cast(DoubleType()))
geodf_success = geodf_success.withColumn("Latitude", geodf_success["Latitude"].cast(DoubleType()))

geodf_success.show(5)

In [0]:
import pandas as pd
import matplotlib.pyplot as plt 
from bokeh.resources import CDN
from bokeh.embed import components
from bokeh.embed import file_html
from bokeh.io import show
from bokeh.plotting import figure
from bokeh.models import WMTSTileSource
from bokeh.models import LinearColorMapper, ColorBar, ColumnDataSource
from bokeh.transform import linear_cmap
from bokeh.palettes import Cividis, Spectral6

In [0]:
import numpy as np

# Function to convert coordinate data to mercator format
def to_web_mercator(data, longitude="Longitude", latitude="Latitude"):
    
    k = 6378137
    data["new_long"] = data[longitude] * (k * np.pi/180.0)
    data["new_lat"] = np.log(np.tan((90 + data[latitude]) * np.pi/360.0)) * k
    return data


map_df = geodf_success.toPandas()
converted_df = to_web_mercator(map_df)

In [0]:
converted_df.head()

Unnamed: 0,Latitude,Longitude,SaleAmount,new_long,new_lat
0,41.720633,-72.361915,350000,-8055292.0,5119223.0
1,41.71064,-72.357205,172500,-8054767.0,5117733.0
2,41.72311,-72.362294,239000,-8055334.0,5119592.0
3,41.739405,-72.403346,325000,-8059904.0,5122023.0
4,41.713959,-72.360867,224900,-8055175.0,5118228.0


In [0]:
url = 'http://a.basemaps.cartocdn.com/rastertiles/voyager/{Z}/{X}/{Y}.png'
attribution = "Tiles by Carto, under CC BY 3.0. Data by OSM, under ODbL"

# web mercator coordinates
USA = x_r,y_r = ((-13884029,-7453304), (2698291,6455972))

final_map = figure(tools='pan, wheel_zoom', x_range=x_r, y_range=y_r, x_axis_type="mercator", y_axis_type="mercator", )
           
# Attribution
final_map.add_tile(WMTSTileSource(url=url, attribution=attribution))

# Colour mapper
new_mapper = linear_cmap(field_name='z', palette=Spectral6 ,low=min(converted_df.SaleAmount)*3, high=max(converted_df.SaleAmount)/2)

source = ColumnDataSource(dict(x=converted_df['new_long'],y=converted_df['new_lat'], z=converted_df['SaleAmount']))

# Bokeh plot
final_map.circle(x='x', y='y', line_color=new_mapper,color=new_mapper, fill_alpha=1, size=10, source=source)

# Colour Bar
c_bar = ColorBar(color_mapper=new_mapper['transform'], width=15,  location=(0,0))
final_map.add_layout(c_bar, 'right')

html = file_html(final_map, CDN, "Connecticut")

# display the figure
displayHTML(html)   

## 5  Machine Learning

#### 5.1. Dataframe preparation

In [0]:
mldf = df
mldf.display()

SerialNumber,ListYear,SaleYear,SaleMonth,AssessedValue,SaleAmount,PriceDifference,PropertyType,ResidentialType,Address,Town
40043,2004,2005,5,850,50000.0,-49150.0,Vacant Land,,BURNAP BRK RD,Andover
30070,2003,2004,8,157300,175000.0,-17700.0,Vacant Land,,ROUTE 6,Andover
40036,2004,2005,4,22400,20000.0,2400.0,Vacant Land,,ROUTE 6,Andover
50249,2005,2006,7,0,197400.0,-197400.0,Condo,,13 MCINTOSH LN,Ansonia
50125,2005,2006,2,331100,375000.0,-43900.0,Condo,,22 WESTFIELD AVE 5 &,Ansonia
50059,2005,2006,5,1196070,3700000.0,-2503930.0,Apartments,,52 PERRY HL RD,Ashford
140042,2014,2015,7,3900,2500.0,1400.0,Vacant Land,,ASHFORD LAKE DR,Ashford
900009,2009,2009,12,64000,160000.0,-96000.0,Vacant Land,,HILLSIDE RD,Ashford
20384,2002,2003,6,334750,577500.0,-242750.0,Condo,,1 WILLS WALK,Avon
120148,2012,2013,3,125870,125000.0,870.0,Commercial,,11 CHESTNUT DRIVE,Avon


For the Machine Learning task, we can drop the `address` column since it does not information critical to our model, and would potentially create too many categories during one-hot encoding.

In [0]:
mldf = mldf.drop('Address')

##### Firstly, the categorical columns need to be converted to numerical types before being fed into any Machine Learning model

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer

categorical_columns = ["PropertyType", "ResidentialType", "Town"]
stage = [] # The steps of the pipeline 


for cat_col in categorical_columns:
    strIndexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "Index")
    encoder = OneHotEncoder(inputCols=[strIndexer.getOutputCol()], outputCols=[cat_col + "classVector"])
                                     
    stage += [strIndexer, encoder]

numericals = ["ListYear","SaleYear","SaleMonth","AssessedValue","SaleAmount"]    
assembler_in = [col + "classVector" for col in categorical_columns] + numericals

ass1 = VectorAssembler(inputCols=assembler_in, outputCol="features")
stage += [ass1]

##### Next we make the pipeline

In [0]:
# Build Pipeline.
pl = Pipeline(stages=stage)
pl_model = pl.fit(mldf)
transformed_df = pl_model.transform(mldf)

In [0]:
# Pick only the predictor and target columns
processed_df = transformed_df.select(['SaleAmount','features'])

In [0]:
processed_df.show(5)

#### 5.2. Model Construction

In [0]:
# Split data into training and testing portions

train, test = processed_df.randomSplit([0.7,0.3], seed=7)
train.describe().show()
test.describe().show()

In [0]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

#### Linear Regression

In [0]:
sale_lr = LinearRegression(featuresCol="features", 
                           labelCol="SaleAmount")

sale_trained_model_lr = sale_lr.fit(train)
predictions = sale_trained_model_lr.transform(test)

eval1 = RegressionEvaluator(labelCol="SaleAmount", 
                            predictionCol="prediction", 
                            metricName="rmse")
rmse = eval1.evaluate(predictions)

eval2 = RegressionEvaluator(labelCol="SaleAmount", 
                            predictionCol="prediction", 
                            metricName="mae")
mae = eval2.evaluate(predictions)

eval3 = RegressionEvaluator(labelCol="SaleAmount", 
                            predictionCol="prediction", 
                            metricName="r2")
r2 = eval3.evaluate(predictions)

print(f"Root Mean Squared Error = {rmse}")
print(f"Mean Absolute Error = {mae}")
print(f"R-Squared Value (R^2) = {r2}")

#### Reality Check on what is an absurdly good result.

In [0]:
from pyspark.sql.functions import corr

mldf.select(corr("AssessedValue","SaleAmount")).show()

#### Decision Tree Regression

In [0]:
sale_dtr = DecisionTreeRegressor(featuresCol="features", 
                                 labelCol="SaleAmount")
sale_trained_model_dtr = sale_dtr.fit(train)
predictions = sale_trained_model_dtr.transform(test)

eval1 = RegressionEvaluator(labelCol="SaleAmount", 
                            predictionCol="prediction", 
                            metricName="rmse")
rmse = eval1.evaluate(predictions)

eval2 = RegressionEvaluator(labelCol="SaleAmount", 
                            predictionCol="prediction", 
                            metricName="mae")
mae = eval2.evaluate(predictions)

evalr3 = RegressionEvaluator(labelCol="SaleAmount", 
                             predictionCol="prediction", 
                             metricName="r2")
r2 = eval3.evaluate(predictions)

print(f"Root Mean Squared Error = {rmse}")
print(f"Mean Absolute Error = {mae}")
print(f"R-Squared Value (R^2) = {r2}")