In [2]:
import pandas as pd
import numpy as np
import folium
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('My First Spark application') \
    .getOrCreate() 

sc = spark.sparkContext

In [4]:
df1 = spark.read.csv('ZILLOW-DATA.csv', header = True)
df2 = spark.read.csv('ZILLOW-REGIONS.csv', header = True)
df3 = spark.read.csv('ZILLOW-INDICATORS.csv', header = True)

In [5]:
df1.createOrReplaceTempView("data")
df2.createOrReplaceTempView("regions")
df3.createOrReplaceTempView("indicators")

In [6]:
query = """
SELECT data.indicator_id, data.region_id, regions.region_type, regions.region, indicators.indicator, indicators.category, data.date, data.value
    FROM data, regions, indicators
    where data.indicator_id = indicators.indicator_id and data.region_id = regions.region_id
"""
result = spark.sql(query)
result.show()

+------------+---------+-----------+--------------------+--------------------+-----------+----------+--------+
|indicator_id|region_id|region_type|              region|           indicator|   category|      date|   value|
+------------+---------+-----------+--------------------+--------------------+-----------+----------+--------+
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-01-31|164988.0|
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-02-28|164998.0|
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-03-31|165721.0|
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-04-30|167105.0|
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-05-31|169868.0|
|        ZSFH|    99999|        zip|98847; WA; Wenatc...|ZHVI Single-Famil...|Home values|2005-06-30|172571.0|
|

In [7]:
df = result.toPandas()
df.head()

Unnamed: 0,indicator_id,region_id,region_type,region,indicator,category,date,value
0,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-01-31,164988.0
1,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-02-28,164998.0
2,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-03-31,165721.0
3,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-04-30,167105.0
4,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-05-31,169868.0


In [8]:
def splitaddress(x,n):
    try:
        return x.split('; ')[n]
    except:
        return np.NaN
    
df['zip'] = df.region.apply(lambda x: splitaddress(x,0))
df['state'] = df.region.apply(lambda x: splitaddress(x,1))
df['city'] = df.region.apply(lambda x: splitaddress(x,2))
df['county'] = df.region.apply(lambda x: splitaddress(x,3))
df['community'] = df.region.apply(lambda x: splitaddress(x,4))
df.head()

Unnamed: 0,indicator_id,region_id,region_type,region,indicator,category,date,value,zip,state,city,county,community
0,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-01-31,164988.0,98847,WA,Wenatchee,Chelan County,Peshastin
1,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-02-28,164998.0,98847,WA,Wenatchee,Chelan County,Peshastin
2,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-03-31,165721.0,98847,WA,Wenatchee,Chelan County,Peshastin
3,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-04-30,167105.0,98847,WA,Wenatchee,Chelan County,Peshastin
4,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-05-31,169868.0,98847,WA,Wenatchee,Chelan County,Peshastin


In [9]:
# https://public.opendatasoft.com/explore/dataset/us-zip-code-latitude-and-longitude/table/
zipcode = pd.read_json('us-zip-code-latitude-and-longitude.json')['fields']
zipcode = pd.DataFrame(zipcode.values.tolist())
zipcode.head()

Unnamed: 0,city,zip,dst,geopoint,longitude,state,latitude,timezone
0,Eudora,66025,1,"[38.917032, -95.06455]",-95.06455,KS,38.917032,-6
1,Savanna,74565,1,"[34.831398, -95.83967]",-95.83967,OK,34.831398,-6
2,Beckville,75631,1,"[32.237924, -94.46427]",-94.46427,TX,32.237924,-6
3,Rancho Santa Fe,92067,1,"[33.016492, -117.20264]",-117.20264,CA,33.016492,-8
4,San Diego,92119,1,"[32.80225, -117.02431]",-117.02431,CA,32.80225,-8


In [10]:
df2 = pd.merge(df, zipcode, how = 'left', on = 'zip')
df2.head()

Unnamed: 0,indicator_id,region_id,region_type,region,indicator,category,date,value,zip,state_x,city_x,county,community,city_y,dst,geopoint,longitude,state_y,latitude,timezone
0,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-01-31,164988.0,98847,WA,Wenatchee,Chelan County,Peshastin,Peshastin,1.0,"[47.552462, -120.60457]",-120.60457,WA,47.552462,-8.0
1,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-02-28,164998.0,98847,WA,Wenatchee,Chelan County,Peshastin,Peshastin,1.0,"[47.552462, -120.60457]",-120.60457,WA,47.552462,-8.0
2,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-03-31,165721.0,98847,WA,Wenatchee,Chelan County,Peshastin,Peshastin,1.0,"[47.552462, -120.60457]",-120.60457,WA,47.552462,-8.0
3,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-04-30,167105.0,98847,WA,Wenatchee,Chelan County,Peshastin,Peshastin,1.0,"[47.552462, -120.60457]",-120.60457,WA,47.552462,-8.0
4,ZSFH,99999,zip,98847; WA; Wenatchee; Chelan County; Peshastin,ZHVI Single-Family Homes Time Series ($),Home values,2005-05-31,169868.0,98847,WA,Wenatchee,Chelan County,Peshastin,Peshastin,1.0,"[47.552462, -120.60457]",-120.60457,WA,47.552462,-8.0


In [93]:
k = []
df3 = df2.copy()
df3.sort_values('date', inplace = True)
df3.date = pd.to_datetime(df3.date)
df3[df3.date >= pd.datetime(2010,1,1)]
df3 = df3[df3['latitude'].notnull()]
df3.value = df3.value.astype('float')
df3['value'] = df3['value'] / df3['value'].max()
days = df3.date.unique()
for i in days:
    k.append(df3[df3.date == i][['latitude','longitude','value']].values.tolist())
k[0]

  """


[[45.712623, -122.63419, 0.2975656899546193],
 [45.604075, -122.51035, 0.2764388060955796],
 [45.627459, -122.52023, 0.23853402151381028],
 [45.709555, -122.68473, 0.2972925654098269],
 [45.67309, -122.51319, 0.22549407529833604],
 [45.594465, -122.28112, 0.2800016807664295],
 [45.681271, -122.66712, 0.24238577791472912],
 [45.928662, -122.70284, 0.22227260630847667],
 [45.843674, -122.39221, 0.27264832763740265]]

In [98]:
m=folium.Map(location=[df2.latitude[0],df2.longitude[0]], tiles='stamentoner', zoom_start=6)
folium.plugins.HeatMapWithTime(k).add_to(m)
display(m)