In [170]:
#####IMPORT HOUSING DATA FROM S3#####

import os

import dask as dd
import numpy as np
import pandas as pd
import s3fs
from dask.dataframe import DataFrame

#declare the states being used for the analysis
#States with Data = ['AZ', 'CA', 'CT', 'FL', 'GA', 'IL', 'IA', 'MD', 'MA', 'MI', 'MN', 'MO', 'NE', 'NV', 'NJ', 'NC', 'OH', 'OK', 'OR', 'PA', 'SC', 'TN', 'TX', 'VA']
#States = ['AZ', 'CA', 'CT', 'FL', 'GA', 'IL', 'IA', 'MD', 'MA', 'MI', 'MN', 'MO', 'NE', 'NV', 'NJ', 'NC', 'OH', 'OK', 'OR', 'PA', 'SC', 'TN', 'TX', 'VA']
States = ['DC']

#S3 bucket holding one CSV extract per state
bucket = 'housingdata123'

#AWS credentials come from the environment instead of being stored in the notebook.
#When they are not set, no explicit credentials are passed and the S3 client is left
#to resolve them on its own (shared credentials file, instance role, ...).
#NOTE(review): the access key that used to be embedded in this cell was exposed in the
#notebook source and should be rotated.
aws_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret = os.environ.get('AWS_SECRET_ACCESS_KEY')
storage_options = {'key': aws_key, 'secret': aws_secret} if aws_key and aws_secret else None

#columns forced to 'object' so that dask does not mis-infer their type from the sample
housing_dtypes = {'zip5': 'object', 'physical_address': 'object', 'property_id': 'object',
                  'sale_date': 'object', 'year_built': 'object', 'book': 'object',
                  'buyer_name': 'object', 'page': 'object', 'property_type': 'object',
                  'sale_type': 'object', 'seller_name': 'object', 'city': 'object',
                  'num_units': 'object', 'county': 'object'}

#pull chosen state's data from S3; one lazy dask dataframe per state
state_frames = []
for State in States:
    data_key = 'HousingData_Dolt_{}.csv'.format(State)
    data_location = 's3://{}/{}'.format(bucket, data_key)

    try:
        df = dd.dataframe.read_csv(data_location, storage_options=storage_options,
                                   dtype=housing_dtypes)
    except Exception as err:
        #best effort: keep going with the remaining states, but say why this one failed
        print("Import failed for the following state: {}".format(State))
        print("  reason: {}: {}".format(type(err).__name__, err))
        continue

    state_frames.append(df)

#nothing downstream can work without data, so fail here with a clear message
if not state_frames:
    raise RuntimeError("No housing data could be loaded for states: {}".format(States))

#concatenate once instead of growing the frame with the deprecated DataFrame.append
df_all = dd.dataframe.concat(state_frames)




In [171]:
#####CLEANUP TIME#####

#choose years for analysis; if you want all years, type 'all'
Years = ['2016','2017','2018','2019']
#Years= ['all']

#drop extraneous columns
#drop() returns a new dataframe, so the result has to be assigned back. 'sale_price' is
#deliberately NOT dropped: the zero-price filter and the aggregation below need it.
#errors='ignore' keeps the cell re-runnable after the columns are already gone.
df_all = df_all.drop(columns=['property_type',
                              'seller_name','buyer_name',
                              'num_units','year_built',
                              'source_url','book','page',
                              'sale_type'],
                     errors='ignore')

#drop rows with 0 for their sale price
df_all = df_all[df_all.sale_price != 0]

#reformat the sales dates to only include the year (first four characters)
df_all['sale_date'] = df_all['sale_date'].str[0:4].astype(str)

if 'all' not in Years:

    #drop rows that aren't in the desired year range
    df_all = df_all[df_all.sale_date.isin(Years)]

#years actually present in the data, in a deterministic (sorted) order.
#Non-numeric leftovers such as 'nan' (missing sale dates) are left out because this list
#is later used to build the census API request URLs.
Years = sorted(year for year in df_all['sale_date'].unique().compute() if year.isdigit())

#group and average housing prices by zip code and year; the column level holding the
#name 'sale_price' is dropped so the result has plain 'mean' / 'count' columns
house_price_avg = (
    df_all.groupby(['zip5','sale_date'])
    .agg({'sale_price': ['mean','count']})
    .compute()
    .droplevel(level=0, axis=1)
    .reset_index()
)

In [172]:
#running a time series analysis on the housing data
#parallelizing model prediction for several parameters: feature number, categorical variable buckets
#comparing the time series forecasting to the census bureau modelling

In [173]:
#####IMPORT CHOSEN CENSUS BUREAU DATA#####

#####melt the dataframes, add a year column, append to consolidated dataframe, edit, and then unmelt

import requests # request http, api
import pandas as pd # tabluar data
import dask as dd
from dask.dataframe import DataFrame

#lookup table: state abbreviation -> numeric state code used by the census bureau API
api_states= ['AL','AK','AZ','AR','CA','CO','CT','DE','DC','FL','GA','HI','ID','IL','IN','IA','KS','KY','LA','ME','MD','MA','MI','MN','MS','MO','MT','NE','NV','NH','NJ','NM','NY','NC','ND','OH','OK','OR','PA','RI','SC','SD','TN','TX','UT','VT','VA','WA','WV','WI','WY']
api_codes= [1,2,4,5,6,8,9,10,11,12,13,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,44,45,46,47,48,49,50,51,53,54,55,56]
API_State_Codes = dict(zip(api_states, api_codes))

#comma separated codes for the states chosen in the original state query
API_List = ','.join(str(API_State_Codes[abbreviation]) for abbreviation in States)

#declare census api key
#The key is read from the CENSUS_API_KEY environment variable so that it is never stored
#in the notebook or its outputs.
#NOTE(review): the key that used to be hard-coded here was exposed in the notebook source
#and should be revoked and replaced.
census_api_key = os.environ.get("CENSUS_API_KEY", "")
if not census_api_key:
    raise RuntimeError("Set the CENSUS_API_KEY environment variable before running this cell")

#declare tables and years
Tables = ['DP05']

#ACS 5-year endpoint per table family, keyed by the first letter of the table ID.
#Each entry is (label used in the progress messages, endpoint template).
Census_Endpoints = {
    'S': ('Subject Table', "https://api.census.gov/data/{0}/acs/acs5/subject"),
    'D': ('Data Profile Table', "https://api.census.gov/data/{0}/acs/acs5/profile"),
    'B': ('Detailed Table', "https://api.census.gov/data/{0}/acs/acs5"),
}

#query string shared by every table family: {1} = table ID, {2} = state codes, {3} = api key
Census_Query = "?get=NAME,group({1})&for=zip%20code%20tabulation%20area:*&in=state:{2}&key={3}"

#identifier columns that are not census variables and must not be melted
Remove_Col = ['NAME','GEO_ID','state','zip code tabulation area']

#long-form pandas frames, one per (year, table); they are combined once after the loop
#instead of growing a dataframe with the deprecated DataFrame.append
Melted_Frames = []

for Year in Years:
    for ID in Tables:

        #pick the endpoint from the table prefix; an unknown prefix is skipped explicitly
        #so that a response from a previous iteration can never be reused by accident
        table_family = Census_Endpoints.get(str(ID)[0])
        if table_family is None:
            print('Table: ' + str(ID) + ', ' + str(Year) + ', Skipped (unknown table type)')
            continue
        label, endpoint = table_family

        try:
            #all four placeholders are filled for every table family (the state list was
            #previously missing for the subject and detailed tables)
            url = (endpoint + Census_Query).format(Year, ID, API_List, census_api_key)
            response = requests.get(url, timeout=60)
            response.raise_for_status()
            payload = response.json()
        except Exception as err:
            #only the exception type is printed: the full message of a requests error
            #contains the request URL, which includes the api key
            print(label + ': ' + str(ID) + ', ' + str(Year) + ', Failed')
            print('  reason: ' + type(err).__name__)
            continue

        #create pandas dataframe (first row of the payload is the header), remove duplicate columns
        df = pd.DataFrame(payload[1:], columns=payload[0])
        df = df.loc[:,~df.columns.duplicated()]

        #add year column, melt into a long form
        Col = [ele for ele in df.columns if ele not in Remove_Col]

        df_melt = df.melt(id_vars=['zip code tabulation area'], value_vars= Col, ignore_index=False)
        df_melt['YEAR'] = Year

        #remove margin of error variables (any variable name containing "M")
        df_melt = df_melt[~df_melt['variable'].str.contains("M", regex=False)]

        #collect for the consolidated dataframe
        Melted_Frames.append(df_melt)
        print('Table: ' + str(ID) + ', ' + str(Year) + ', Completed')

#build the consolidated table once; it stays a dask dataframe because the following
#cells call dask-specific methods on it
if Melted_Frames:
    Consol_Zip_Table = pd.concat(Melted_Frames, ignore_index=True)
else:
    Consol_Zip_Table = pd.DataFrame(columns=['zip code tabulation area','variable','value','YEAR'])
Consol_Zip_Table = dd.dataframe.from_pandas(Consol_Zip_Table, npartitions=10)



Table: DP05, 2017, Completed




Table: DP05, 2019, Completed




Table: DP05, 2018, Completed
Table: DP05, 2016, Completed




In [174]:
#coerce the census values to numbers, one whole partition at a time.
#Anything non numeric becomes NaN and then 0, so it is removed by the range filter below.
Consol_Zip_Table['value'] = (
    Consol_Zip_Table['value']
    .map_partitions(pd.to_numeric, errors='coerce', meta=('value', 'float64'))
    .fillna(0)
    .astype(float)
)

#keep only rows whose value lies strictly between 0 and 100
#NOTE(review): this is meant to keep percentage variables, but it also keeps any other
#variable whose value happens to fall in that range (e.g. small counts) and drops
#percentages of exactly 0 or 100 - confirm whether filtering on the variable name would
#be more appropriate.
Consol_Zip_Table = Consol_Zip_Table[(Consol_Zip_Table['value'] > 0) & (Consol_Zip_Table['value'] < 100)]

In [175]:
#correct the datatypes in preparation for the join
#materialise the dask dataframe into pandas; the guard keeps the cell re-runnable once
#Consol_Zip_Table has already been converted
if hasattr(Consol_Zip_Table, 'compute'):
    Consol_Zip_Table = Consol_Zip_Table.compute()
Consol_Zip_Table = Consol_Zip_Table.astype({"YEAR": str,
                                            "zip code tabulation area": str,
                                            "variable": str,
                                            "value": float})

#pivot back to wide form: one row per (year, zip code area), one column per census variable
#(the source is Consol_Zip_Table; the previously referenced name 'pandas_df' is not
#defined anywhere in the notebook)
Final_Table = pd.pivot_table(Consol_Zip_Table, index=['YEAR','zip code tabulation area'], columns=['variable'], values=['value'], aggfunc='sum', fill_value=0)
Final_Table = Final_Table.droplevel(level = 0,axis=1)
Final_Table = Final_Table.reset_index()

#join the housing data and census data
house_price_avg["sale_date"] = house_price_avg["sale_date"].astype(str)
house_price_avg["zip5"] = house_price_avg["zip5"].astype(str)

#merge on the zip code / year key columns. DataFrame.join aligns on the index and its
#lsuffix/rsuffix arguments only rename overlapping columns, so it paired rows by position.
#how='left' keeps every census row, matching the default of the join call it replaces.
Merged_Matrix = Final_Table.merge(house_price_avg,
                                  how='left',
                                  left_on=['zip code tabulation area','YEAR'],
                                  right_on=['zip5','sale_date'])