In [110]:
import pyspark
from pyspark.sql import SparkSession

In [111]:
spark = SparkSession.builder.appName("real_estate_data").getOrCreate()

In [112]:
df =  spark.read.csv("Real_Estate_Sales_2001-2020_GL.csv", header=True)

In [113]:
df.head(5)

[Row(Serial Number='2020177', List Year='2020', Date Recorded='04/14/2021', Town='Ansonia', Address='323 BEAVER ST', Assessed Value='133000.00', Sale Amount='248400.00', Sales Ratio='0.5354', Property Type='Residential', Residential Type='Single Family', Non Use Code=None, Assessor Remarks=None, OPM remarks=None, Location='POINT (-73.06822 41.35014)'),
 Row(Serial Number='2020225', List Year='2020', Date Recorded='05/26/2021', Town='Ansonia', Address='152 JACKSON ST', Assessed Value='110500.00', Sale Amount='239900.00', Sales Ratio='0.4606', Property Type='Residential', Residential Type='Three Family', Non Use Code=None, Assessor Remarks=None, OPM remarks=None, Location=None),
 Row(Serial Number='2020348', List Year='2020', Date Recorded='09/13/2021', Town='Ansonia', Address='230 WAKELEE AVE', Assessed Value='150500.00', Sale Amount='325000.00', Sales Ratio='0.463', Property Type='Commercial', Residential Type=None, Non Use Code=None, Assessor Remarks=None, OPM remarks=None, Location=N

In [114]:
df.describe()

DataFrame[summary: string, Serial Number: string, List Year: string, Date Recorded: string, Town: string, Address: string, Assessed Value: string, Sale Amount: string, Sales Ratio: string, Property Type: string, Residential Type: string, Non Use Code: string, Assessor Remarks: string, OPM remarks: string, Location: string]

# Questions for analysis

* Median and Average Sale Amount and Assessed Value in each town over the year
* Overall Sale Amount and Assessed Value trend by years
* The trend of Sale Amount median/average value for different property type 
* The trend of Sale Amount median/average value for different residental type <br>
Note: we use recorded date for analysis 

# Data cleaning and preprocessing

* Count the missing values in each column

In [115]:
# count the missing value in each column
from pyspark.sql.functions import col,when,count
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+------------+----------------+-----------+--------+
|Serial Number|List Year|Date Recorded|Town|Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Non Use Code|Assessor Remarks|OPM remarks|Location|
+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+------------+----------------+-----------+--------+
|            0|        0|            2|   0|     51|             1|          1|          1|       382447|          388310|      707532|          847343|     987240|  799521|
+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+------------+----------------+-----------+--------+



* Drop the last four columns because of the large proportion of information loss

In [116]:
# drop Property Type, Residental Type, Non Use Code, Assessor Remarks, OPM remarks, Location for large amount of loss
df = df.drop("Non Use Code", "Assessor Remarks","OPM remarks","Location")


In [117]:
df.show()

+-------------+---------+-------------+------------+--------------------+--------------+-----------+-----------+-------------+----------------+
|Serial Number|List Year|Date Recorded|        Town|             Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|
+-------------+---------+-------------+------------+--------------------+--------------+-----------+-----------+-------------+----------------+
|      2020177|     2020|   04/14/2021|     Ansonia|       323 BEAVER ST|     133000.00|  248400.00|     0.5354|  Residential|   Single Family|
|      2020225|     2020|   05/26/2021|     Ansonia|      152 JACKSON ST|     110500.00|  239900.00|     0.4606|  Residential|    Three Family|
|      2020348|     2020|   09/13/2021|     Ansonia|     230 WAKELEE AVE|     150500.00|  325000.00|      0.463|   Commercial|            NULL|
|      2020090|     2020|   12/14/2020|     Ansonia|         57 PLATT ST|     127400.00|  202500.00|     0.6291|  Residential|      Two 

* Drop the row where both 'Assessed Value' and 'Sale Amount' are missing and verify the outcome with count/when method

In [118]:
df = df.filter(col('Assessed Value').isNotNull() & col('Sale Amount').isNotNull())
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+
|Serial Number|List Year|Date Recorded|Town|Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|
+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+
|            0|        0|            2|   0|     51|             0|          0|          0|       382446|          388309|
+-------------+---------+-------------+----+-------+--------------+-----------+-----------+-------------+----------------+



* Look into the rows where 'Date Recorded' is Null. And we found the some fo the 'Sale Amount'  Values are (or say smaller than the value 2000 mentioned in data description (https://catalog.data.gov/dataset/real-estate-sales-2001-2018)).

In [119]:
df.filter(col('Date Recorded').isNull()).show()
df.filter(col('Sale Amount')==0).show()

+-------------+---------+-------------+------+-------+--------------+-----------+-----------+-------------+----------------+
|Serial Number|List Year|Date Recorded|  Town|Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|
+-------------+---------+-------------+------+-------+--------------+-----------+-----------+-------------+----------------+
|        20280|     2002|         NULL|Orange|   NULL|          0.00|       0.00|          0|         NULL|            NULL|
|            0|     2002|         NULL|Orange|   NULL|          0.00|       0.00|          0|         NULL|            NULL|
+-------------+---------+-------------+------+-------+--------------+-----------+-----------+-------------+----------------+

+-------------+---------+-------------+---------+-------------------+--------------+-----------+-----------+-------------+----------------+
|Serial Number|List Year|Date Recorded|     Town|            Address|Assessed Value|Sale Amount|Sales Ratio|P

* Check and drop rows where 'Sale Amount' is smaller than 2000

In [120]:
df.select(count(when(col('Sale Amount')<2000,'Sale Amount')).alias("Sale Amount")).show()
df = df.filter(col('Sale Amount')>=2000)

+-----------+
|Sale Amount|
+-----------+
|       2139|
+-----------+



* Verify the outcome

In [121]:
# verify
df.select(count(when(col('Sale Amount')<2000,'Sale Amount')).alias("Sale Amount")).show()

+-----------+
|Sale Amount|
+-----------+
|          0|
+-----------+



* Check if there are replicated rows 

In [122]:
# check replicated value (No if True)
df.distinct().count()==df.count()

True

* To use the recorded year for analysis, we need to extract year out from the recorded date

In [123]:
from pyspark.sql.functions import year 
from pyspark.sql.functions import to_date

In [124]:
df = df.withColumn('Recorded Year',year(to_date(col('Date Recorded'),'MM/dd/yyyy')))

In [125]:
df.show()

+-------------+---------+-------------+------------+--------------------+--------------+-----------+-----------+-------------+----------------+-------------+
|Serial Number|List Year|Date Recorded|        Town|             Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Recorded Year|
+-------------+---------+-------------+------------+--------------------+--------------+-----------+-----------+-------------+----------------+-------------+
|      2020177|     2020|   04/14/2021|     Ansonia|       323 BEAVER ST|     133000.00|  248400.00|     0.5354|  Residential|   Single Family|         2021|
|      2020225|     2020|   05/26/2021|     Ansonia|      152 JACKSON ST|     110500.00|  239900.00|     0.4606|  Residential|    Three Family|         2021|
|      2020348|     2020|   09/13/2021|     Ansonia|     230 WAKELEE AVE|     150500.00|  325000.00|      0.463|   Commercial|            NULL|         2021|
|      2020090|     2020|   12/14/2020|     Ansonia|

* Transform the datatype of Assessed Value, Sale Amount and Sales Ratio

In [126]:
from pyspark.sql.types import FloatType
df = df.withColumn('Assessed Value', df['Assessed Value'].cast(FloatType()))\
.withColumn('Sale Amount',df['Sale Amount'].cast(FloatType()))\
.withColumn('Sales Ratio',df['Sales Ratio'].cast(FloatType()))

* We cloud find some values of the property type are wrongly replaced by the residental type and vice versa. For single/two/three/four family under property type, we should replace them with 'residential'. And for 'condo' under residental type should be filled with NULL. Another point that needs to be aware of is the null values in 'Residential Type' are caused by their corresponding 'Property Type'. 

In [180]:
# replace '-family' by 'residential'
df=df.withColumn('Property Type', when((df['Property Type']=='Single Family')|(df['Property Type']=='Two Family')|(df['Property Type']=='Three Family')|(df['Property Type']=='Four Family'),\
                                     'Residential').otherwise(df['Property Type']))

# Answer the questions

### Q1

In [None]:
from pyspark.sql.function import mean,median

In [139]:
town_mean = df.groupBy('Town','Recorded Year').mean('Sale Amount').orderBy('Town','Recorded Year')

In [140]:
town_mean.show()

+-------------+-------------+------------------+
|         Town|Recorded Year|  avg(Sale Amount)|
+-------------+-------------+------------------+
|***Unknown***|         2007|          282450.0|
|      Andover|         2002|284210.92307692306|
|      Andover|         2003|154602.83333333334|
|      Andover|         2004|201707.20547945207|
|      Andover|         2005|229664.86075949366|
|      Andover|         2006|252078.26086956522|
|      Andover|         2007|          271452.5|
|      Andover|         2008|252309.69696969696|
|      Andover|         2009|229246.66666666666|
|      Andover|         2010|204887.46511627908|
|      Andover|         2011|194305.70967741936|
|      Andover|         2012|206381.53846153847|
|      Andover|         2013|203245.11111111112|
|      Andover|         2014|          198186.0|
|      Andover|         2015|  179958.568627451|
|      Andover|         2016|222121.19512195123|
|      Andover|         2017|211357.14285714287|
|      Andover|     

In [145]:
from pyspark.sql.functions import percentile_approx
town_median = df.groupBy('Town','Recorded Year').agg(median("Sale Amount")).orderBy('Town','Recorded Year')
town_median.show()

+-------------+-------------+-------------------+
|         Town|Recorded Year|median(Sale Amount)|
+-------------+-------------+-------------------+
|***Unknown***|         2007|           282450.0|
|      Andover|         2002|           235000.0|
|      Andover|         2003|           150000.0|
|      Andover|         2004|           175000.0|
|      Andover|         2005|           220000.0|
|      Andover|         2006|           240000.0|
|      Andover|         2007|           274500.0|
|      Andover|         2008|           239000.0|
|      Andover|         2009|           228390.0|
|      Andover|         2010|           211250.0|
|      Andover|         2011|           178000.0|
|      Andover|         2012|           209000.0|
|      Andover|         2013|           206250.0|
|      Andover|         2014|           198000.0|
|      Andover|         2015|           170000.0|
|      Andover|         2016|           222000.0|
|      Andover|         2017|           193500.0|


In [149]:
from pyspark.sql.functions import stddev
town_std = df.groupBy('Town','Recorded Year').agg(stddev('Sale Amount')).orderBy('Town', 'Recorded Year')
town_std.show()

+-------------+-------------+-------------------+
|         Town|Recorded Year|stddev(Sale Amount)|
+-------------+-------------+-------------------+
|***Unknown***|         2007|               NULL|
|      Andover|         2002| 199270.09482377663|
|      Andover|         2003| 105343.62275195896|
|      Andover|         2004|  119560.4046566652|
|      Andover|         2005| 143547.02159024088|
|      Andover|         2006| 127904.98541763722|
|      Andover|         2007| 105956.51673510396|
|      Andover|         2008| 111145.04604020956|
|      Andover|         2009| 125761.11402178335|
|      Andover|         2010| 102981.45777837613|
|      Andover|         2011| 135185.65483146836|
|      Andover|         2012| 114011.94716974135|
|      Andover|         2013| 106682.94442821221|
|      Andover|         2014| 117965.02610025284|
|      Andover|         2015| 126652.08301015105|
|      Andover|         2016| 132963.55830099076|
|      Andover|         2017| 151861.45363093656|


In [150]:
town_mean2 = df.groupBy('Town','Recorded Year').mean('Assessed Value').orderBy('Town','Recorded Year')
town_mean2.show()

+-------------+-------------+-------------------+
|         Town|Recorded Year|avg(Assessed Value)|
+-------------+-------------+-------------------+
|***Unknown***|         2007|            66540.0|
|      Andover|         2002| 161669.23076923078|
|      Andover|         2003|  79024.58333333333|
|      Andover|         2004|  88524.65753424658|
|      Andover|         2005|  91106.70886075949|
|      Andover|         2006|  96660.21739130435|
|      Andover|         2007| 178008.33333333334|
|      Andover|         2008| 171654.84848484848|
|      Andover|         2009| 163170.27777777778|
|      Andover|         2010|  165237.2093023256|
|      Andover|         2011| 150816.12903225806|
|      Andover|         2012| 169721.53846153847|
|      Andover|         2013|           150735.0|
|      Andover|         2014| 144927.56756756757|
|      Andover|         2015| 143228.03921568627|
|      Andover|         2016| 162535.60975609755|
|      Andover|         2017|  158125.7142857143|


In [151]:
town_median2 = df.groupBy('Town','Recorded Year').agg(median("Assessed Value")).orderBy('Town','Recorded Year')
town_median2.show()

+-------------+-------------+----------------------+
|         Town|Recorded Year|median(Assessed Value)|
+-------------+-------------+----------------------+
|***Unknown***|         2007|               66540.0|
|      Andover|         2002|              125400.0|
|      Andover|         2003|               79950.0|
|      Andover|         2004|               76700.0|
|      Andover|         2005|               84300.0|
|      Andover|         2006|               87960.0|
|      Andover|         2007|              190300.0|
|      Andover|         2008|              158600.0|
|      Andover|         2009|              163900.0|
|      Andover|         2010|              160000.0|
|      Andover|         2011|              139100.0|
|      Andover|         2012|              175200.0|
|      Andover|         2013|              155900.0|
|      Andover|         2014|              139400.0|
|      Andover|         2015|              142200.0|
|      Andover|         2016|              141

In [152]:
town_std2 = df.groupBy('Town','Recorded Year').agg(stddev('Assessed Value')).orderBy('Town', 'Recorded Year')
town_std2.show()

+-------------+-------------+----------------------+
|         Town|Recorded Year|stddev(Assessed Value)|
+-------------+-------------+----------------------+
|***Unknown***|         2007|                  NULL|
|      Andover|         2002|    135084.32171928382|
|      Andover|         2003|    49172.108833645594|
|      Andover|         2004|     50734.42229673918|
|      Andover|         2005|     56624.53211945759|
|      Andover|         2006|    45160.999422516994|
|      Andover|         2007|     72526.36458975496|
|      Andover|         2008|      72568.4243542436|
|      Andover|         2009|     83020.51366073366|
|      Andover|         2010|     68614.82216400481|
|      Andover|         2011|     95810.63561969236|
|      Andover|         2012|      80866.1591608993|
|      Andover|         2013|     64702.30455158234|
|      Andover|         2014|     77566.80566544647|
|      Andover|         2015|     72367.35236333046|
|      Andover|         2016|     79225.124109

### Q2

In [154]:
overall_mean = df.groupBy('Recorded Year').mean("Sale Amount").orderBy("Recorded Year")
overall_mean.show()

+-------------+------------------+
|Recorded Year|  avg(Sale Amount)|
+-------------+------------------+
|         1999|           95000.0|
|         2001|226185.16325078282|
|         2002|260854.30521423675|
|         2003| 306830.5820477609|
|         2004|342060.22166938556|
|         2005|382262.65993572713|
|         2006| 378258.5531620519|
|         2007| 482261.8273617049|
|         2008|413001.73154539074|
|         2009| 324366.5452291758|
|         2010|353688.21416888165|
|         2011| 344237.6026551982|
|         2012|399355.64255664364|
|         2013| 410089.9408392876|
|         2014|395638.27309792326|
|         2015| 391299.6584585404|
|         2016|477925.68572388735|
|         2017| 390925.8406875954|
|         2018| 391932.8820067275|
|         2019| 396059.7142524483|
+-------------+------------------+
only showing top 20 rows



In [155]:
overall_median = df.groupBy('Recorded Year').agg(median("Sale Amount")).orderBy("Recorded Year")
overall_median.show()

+-------------+-------------------+
|Recorded Year|median(Sale Amount)|
+-------------+-------------------+
|         1999|            95000.0|
|         2001|           155000.0|
|         2002|           170000.0|
|         2003|           190000.0|
|         2004|           215000.0|
|         2005|           242000.0|
|         2006|           250000.0|
|         2007|           265000.0|
|         2008|           238000.0|
|         2009|           212000.0|
|         2010|           219000.0|
|         2011|           206041.0|
|         2012|           215000.0|
|         2013|           218000.0|
|         2014|           207500.0|
|         2015|           211000.0|
|         2016|           214500.0|
|         2017|           225000.0|
|         2018|           225000.0|
|         2019|           229000.0|
+-------------+-------------------+
only showing top 20 rows



In [156]:
overall_std = df.groupBy('Recorded Year').agg(stddev("Sale Amount")).orderBy("Recorded Year")
overall_std.show()

+-------------+-------------------+
|Recorded Year|stddev(Sale Amount)|
+-------------+-------------------+
|         1999|               NULL|
|         2001| 418202.49171399575|
|         2002|  626407.8689015368|
|         2003|  833582.2807630944|
|         2004|  919108.1974434166|
|         2005| 1015938.5524322496|
|         2006| 1014153.2545975755|
|         2007| 1861559.6734854968|
|         2008|  1388032.524565023|
|         2009|  903942.9310424727|
|         2010|  929676.9326805144|
|         2011|  954261.4768950193|
|         2012| 1020104.6043999388|
|         2013| 2094858.4983431017|
|         2014| 1654366.6915424864|
|         2015| 1309204.8279953485|
|         2016|  5682554.390183069|
|         2017| 1337240.3364326854|
|         2018| 1493382.4209400553|
|         2019| 2114988.2635969813|
+-------------+-------------------+
only showing top 20 rows



In [157]:
overall_mean2 = df.groupBy('Recorded Year').mean("Assessed Value").orderBy("Recorded Year")
overall_mean2.show()

+-------------+-------------------+
|Recorded Year|avg(Assessed Value)|
+-------------+-------------------+
|         1999|            46690.0|
|         2001|  134720.4057073726|
|         2002| 149173.23510038952|
|         2003| 179624.00463697812|
|         2004| 195381.13817292007|
|         2005|  209805.0703585617|
|         2006|  220475.7717998564|
|         2007|  347193.7450682524|
|         2008|  325793.0107627515|
|         2009| 294442.42692265316|
|         2010|  342126.0990850542|
|         2011| 330744.58772749937|
|         2012| 402571.37259600614|
|         2013| 354097.83864775515|
|         2014| 322815.66665077134|
|         2015|  306143.9597087577|
|         2016|  307743.6125685241|
|         2017|  319764.2016461754|
|         2018| 323368.53689421463|
|         2019| 306572.08927824866|
+-------------+-------------------+
only showing top 20 rows



In [158]:
overall_median2 = df.groupBy('Recorded Year').agg(median("Assessed Value")).orderBy("Recorded Year")
overall_median2.show()

+-------------+----------------------+
|Recorded Year|median(Assessed Value)|
+-------------+----------------------+
|         1999|               46690.0|
|         2001|               88340.0|
|         2002|               89060.0|
|         2003|               94200.0|
|         2004|               99820.0|
|         2005|              105070.0|
|         2006|              111000.0|
|         2007|              147350.0|
|         2008|              159390.0|
|         2009|              164630.0|
|         2010|              174825.0|
|         2011|              175195.0|
|         2012|              180975.0|
|         2013|              167185.0|
|         2014|              156590.0|
|         2015|              156000.0|
|         2016|              151550.0|
|         2017|              154630.0|
|         2018|              151050.0|
|         2019|              149525.0|
+-------------+----------------------+
only showing top 20 rows



In [159]:
overall_stddev2 = df.groupBy('Recorded Year').agg(stddev("Assessed Value")).orderBy("Recorded Year")
overall_stddev2.show()

+-------------+----------------------+
|Recorded Year|stddev(Assessed Value)|
+-------------+----------------------+
|         1999|                  NULL|
|         2001|     356032.9475844128|
|         2002|     559916.3273440142|
|         2003|     836184.8050523478|
|         2004|     903839.9807607078|
|         2005|     958885.7397193818|
|         2006|     989478.5830095297|
|         2007|    1875132.0147201566|
|         2008|     1528656.899132062|
|         2009|    1337422.4562338851|
|         2010|    1608151.1108565175|
|         2011|    1391879.1996660284|
|         2012|     1947199.472071419|
|         2013|    1828785.5199577147|
|         2014|    1634386.0328331415|
|         2015|     1664666.796759817|
|         2016|    1458037.3091694487|
|         2017|    1604641.0599327646|
|         2018|     4335742.734808294|
|         2019|    1774574.6747978753|
+-------------+----------------------+
only showing top 20 rows



### Q3

In [218]:
pt_mean = df.groupBy('Property type','Recorded Year').mean("Sale Amount").orderBy('Property type','Recorded Year')
pt_mean.show()

+-------------+-------------+------------------+
|Property type|Recorded Year|  avg(Sale Amount)|
+-------------+-------------+------------------+
|         NULL|         2001| 226104.4569984337|
|         NULL|         2002|260854.30521423675|
|         NULL|         2003| 306833.2203673979|
|         NULL|         2004|342053.01011501753|
|         NULL|         2005| 382270.3247422028|
|         NULL|         2006|380201.39923682326|
|         NULL|         2007| 864805.8138225587|
|         NULL|         2008| 815274.2875399361|
|         NULL|         2009| 551686.1516853933|
|         NULL|         2010| 532340.3656607091|
|         NULL|         2011| 684217.3513767209|
|         NULL|         2012| 694679.6584899649|
|         NULL|         2013| 995697.3873517787|
|         NULL|         2014| 981529.6213110472|
|         NULL|         2015| 1067359.834344967|
|         NULL|         2016| 1993195.544529262|
|         NULL|         2017| 870635.3879443586|
|         NULL|     

In [219]:
pt_median = df.groupBy('Property type','Recorded Year').agg(median("Sale Amount")).orderBy('Property type','Recorded Year')
pt_median.show()

+-------------+-------------+-------------------+
|Property type|Recorded Year|median(Sale Amount)|
+-------------+-------------+-------------------+
|         NULL|         2001|           155000.0|
|         NULL|         2002|           170000.0|
|         NULL|         2003|           190000.0|
|         NULL|         2004|           215000.0|
|         NULL|         2005|           242000.0|
|         NULL|         2006|           249500.0|
|         NULL|         2007|           190000.0|
|         NULL|         2008|           170000.0|
|         NULL|         2009|           150000.0|
|         NULL|         2010|           150000.0|
|         NULL|         2011|           165000.0|
|         NULL|         2012|           155000.0|
|         NULL|         2013|           152900.0|
|         NULL|         2014|           165000.0|
|         NULL|         2015|           182873.5|
|         NULL|         2016|           160000.0|
|         NULL|         2017|           150000.0|


In [220]:
pt_mean2 = df.groupBy('Property type','Recorded Year').mean("Assessed Value").orderBy('Property type','Recorded Year')
pt_mean2.show()

+-------------+-------------+-------------------+
|Property type|Recorded Year|avg(Assessed Value)|
+-------------+-------------+-------------------+
|         NULL|         2001| 134649.27958137548|
|         NULL|         2002| 149173.23510038952|
|         NULL|         2003| 179625.70650971998|
|         NULL|         2004| 195375.41988199146|
|         NULL|         2005| 209809.00134117797|
|         NULL|         2006|  223843.9949699716|
|         NULL|         2007|  1082298.562220928|
|         NULL|         2008| 1059974.8408364798|
|         NULL|         2009|  883719.4922752809|
|         NULL|         2010| 1062359.6481101671|
|         NULL|         2011|  965773.1498748435|
|         NULL|         2012| 1395873.1704364447|
|         NULL|         2013| 1264727.4095469748|
|         NULL|         2014| 1193620.8404283102|
|         NULL|         2015|  1148941.043500253|
|         NULL|         2016| 1392330.5450381679|
|         NULL|         2017| 1065898.2869654817|


In [221]:
pt_median2 = df.groupBy('Property type','Recorded Year').agg(median("Assessed Value")).orderBy('Property type','Recorded Year')
pt_median2.show()

+-------------+-------------+----------------------+
|Property type|Recorded Year|median(Assessed Value)|
+-------------+-------------+----------------------+
|         NULL|         2001|               88340.0|
|         NULL|         2002|               89060.0|
|         NULL|         2003|               94205.0|
|         NULL|         2004|               99820.0|
|         NULL|         2005|              105070.0|
|         NULL|         2006|              105300.0|
|         NULL|         2007|              107580.0|
|         NULL|         2008|              126620.0|
|         NULL|         2009|              133380.0|
|         NULL|         2010|              150080.0|
|         NULL|         2011|              174680.0|
|         NULL|         2012|              174100.0|
|         NULL|         2013|              145950.0|
|         NULL|         2014|              163030.0|
|         NULL|         2015|              163595.0|
|         NULL|         2016|              157

### Q4

In [223]:
rt_mean = df.groupBy('Residential type','Recorded Year').mean("Sale Amount").orderBy('Residential type','Recorded Year')
rt_mean.show()
rt_median = df.groupBy('Residential type','Recorded Year').agg(median("Sale Amount")).orderBy('Residential type','Recorded Year')
rt_median.show()
rt_mean2 = df.groupBy('Residential type','Recorded Year').mean("Assessed Value").orderBy('Residential type','Recorded Year')
rt_mean2.show()
rt_median2 = df.groupBy('Residential type','Recorded Year').agg(median("Assessed Value")).orderBy('Residential type','Recorded Year')
rt_median2.show()

+----------------+-------------+------------------+
|Residential type|Recorded Year|  avg(Sale Amount)|
+----------------+-------------+------------------+
|            NULL|         2001| 226104.4569984337|
|            NULL|         2002|260854.30521423675|
|            NULL|         2003| 306833.2203673979|
|            NULL|         2004|342053.01011501753|
|            NULL|         2005| 382270.3247422028|
|            NULL|         2006|380201.39923682326|
|            NULL|         2007| 864805.8138225587|
|            NULL|         2008| 815274.2875399361|
|            NULL|         2009| 551686.1516853933|
|            NULL|         2010| 532340.3656607091|
|            NULL|         2011| 684217.3513767209|
|            NULL|         2012| 694679.6584899649|
|            NULL|         2013| 995697.3873517787|
|            NULL|         2014| 981529.6213110472|
|            NULL|         2015| 1067359.834344967|
|            NULL|         2016| 1993195.544529262|
|           

In [210]:
# save results to file 
town_mean.toPandas().to_csv('townmean.csv')
town_median.toPandas().to_csv('townmedian.csv')
town_std.toPandas().to_csv('townstd.csv')

town_mean2.toPandas().to_csv('townmean2.csv')
town_median2.toPandas().to_csv('townmedian2.csv')
town_std2.toPandas().to_csv('townstd2.csv')

In [212]:
overall_mean.toPandas().to_csv('overallmean.csv')
overall_median.toPandas().to_csv('overallmedian.csv')
overall_std.toPandas().to_csv('overallstd.csv')

overall_mean2.toPandas().to_csv('overallmean2.csv')
overall_median2.toPandas().to_csv('overallmedian2.csv')
overall_stddev2.toPandas().to_csv('overallstd2.csv')

In [224]:
pt_mean.toPandas().to_csv('ptmean.csv')
pt_median.toPandas().to_csv('ptmedian.csv')

pt_mean2.toPandas().to_csv('ptmean2.csv')
pt_median2.toPandas().to_csv('ptmedian2.csv')


In [225]:
rt_mean.toPandas().to_csv('rtmean.csv')
rt_median.toPandas().to_csv('rtmedian.csv')
rt_mean2.toPandas().to_csv('rtmean2.csv')
rt_median2.toPandas().to_csv('rtmedian2.csv')