## Extract FRED indicators

This notebook runs in Databricks, where it is scheduled to execute every Tuesday.

In [0]:
import pandas as pd
from fredapi import Fred
import pandas_datareader.data as web
import datetime
import time
import os
from os.path import dirname, join

# Prefer a key supplied through the FRED_API_KEY environment variable so the
# credential does not have to live in the notebook source; fall back to the
# original literal so behaviour is unchanged when the variable is not set.
fred = Fred(api_key=os.environ.get('FRED_API_KEY', 'KEY'))
api_url = 'https://fred.stlouisfed.org/'

In [None]:
# Read the endpoint definition workbooks stored in the Databricks FileStore.
df_Unemployment_Counties = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/county_unemployment_endpoints-1.xlsx')
df_unemployment_MSA_endpoints = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/unemployment_msa_endpoints-1.xlsx')
df_Economic_Indicators = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/fred_national_indicators_endpoints-1.xlsx')

# The house price index workbook carries two sheets. Open it once and let the
# context manager close the file handle as soon as both sheets are parsed
# (the original left the ExcelFile open for the rest of the session).
with pd.ExcelFile(r'/dbfs/FileStore/tables/Endpoints/fred_house_price_index_endpoints-1.xlsx') as xls:
    df_House_Price_Index_Non_MSA = pd.read_excel(xls, sheet_name='HPI')
    df_House_Price_Index_MSA = pd.read_excel(xls, sheet_name='MSA_HPI')

df_Market_Hotness = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/Market_Hotness_API-1.xlsx')
df_Market_Hotness_MSA = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/Market_Hotness_Endpoints_MSA-2.xlsx')
df_Producer_Price_Index = pd.read_excel(r'/dbfs/FileStore/tables/Endpoints/producer_price_index_endpoints-1.xlsx')


In [0]:
# County unemployment series are labelled by a State_County_Index column
# (built here as "<County>_<State>") rather than by the
# 'Dimension/Measure Name' column that the other endpoint sheets provide.
df_Unemployment_Counties['State_County_Index'] = (
    df_Unemployment_Counties['County'] + '_' + df_Unemployment_Counties['State']
)

# Turn every cell into text, drop apostrophes and trim surrounding whitespace.
df_Unemployment_Counties = df_Unemployment_Counties.applymap(
    lambda cell: str(cell).replace("'", '').strip()
)

def fetch_API_data_uc(df: pd.DataFrame,
                      observation_start: str = '01/01/2010',
                      pause_seconds: float = 0.5) -> pd.DataFrame:
    """Download one FRED series per row of ``df`` and combine them column-wise.

    Parameters
    ----------
    df : pd.DataFrame
        Endpoint table. ``State_County_Index`` supplies the output column label
        and ``API`` supplies the series id passed to ``fred.get_series``.
    observation_start : str, optional
        Value forwarded to ``fred.get_series`` as ``observation_start``.
        Defaults to the previously hard-coded '01/01/2010'.
    pause_seconds : float, optional
        Seconds to sleep after each request. Defaults to the previously
        hard-coded 0.5.

    Returns
    -------
    pd.DataFrame
        One column per distinct ``State_County_Index`` value, built from the
        series returned for that row. When a label repeats, the later row's
        series replaces the earlier one.
    """
    series_by_label = {}
    for label, series_id in zip(df['State_County_Index'], df['API']):
        series_by_label[label] = fred.get_series(series_id, observation_start=observation_start)
        # Sleep timer to avoid exceeding api response limits.
        time.sleep(pause_seconds)
    return pd.DataFrame(series_by_label)

# One-element list wrapping the county endpoints frame; it is not referenced
# again in the cells visible in this notebook.
df_list_uc = [df_Unemployment_Counties]

# Extract the county unemployment data with fetch_API_data_uc (defined above).
# The result holds one column per State_County_Index label.
print("---- Extracting County Unemployment Data ----")
Unemployment_Counties = fetch_API_data_uc(df_Unemployment_Counties)
print("County Unemployment Data Extraction Complete")

In [0]:
# Save the extracted county unemployment data as a CSV in the DBFS FileStore.
# The frame's index is written as an unnamed first column; a later cell
# re-reads this file and renames that column to 'Date'.
Unemployment_Counties.to_csv('/dbfs/FileStore/tables/Unemployment_Counties.csv')
print("Data saved to Databricks filepath")

In [0]:
# --- Preprocessing Unemployment MSA endpoints data.
# Normalise the MSA label so it is easier to join with building permits:
# every comma becomes a space, the literal "MSA" is removed, and surrounding
# whitespace is trimmed.
df_unemployment_MSA_endpoints['MSA'] = df_unemployment_MSA_endpoints['MSA'].map(
    lambda label: str(label).replace(",", ' ').replace("MSA", '').strip()
)

# Column label for the extracted series: the cleaned MSA name with each space
# replaced by an underscore.
df_unemployment_MSA_endpoints['State_MSA_Index'] = df_unemployment_MSA_endpoints['MSA'].map(
    lambda label: str(label).replace(" ", '_')
)

def fetch_API_data_uc_msa(df: pd.DataFrame,
                          observation_start: str = '01/01/2010',
                          pause_seconds: float = 0.5) -> pd.DataFrame:
    """Download one FRED series per row of ``df`` and combine them column-wise.

    Parameters
    ----------
    df : pd.DataFrame
        Endpoint table. ``State_MSA_Index`` supplies the output column label
        and ``API`` supplies the series id passed to ``fred.get_series``.
    observation_start : str, optional
        Value forwarded to ``fred.get_series`` as ``observation_start``.
        Defaults to the previously hard-coded '01/01/2010'.
    pause_seconds : float, optional
        Seconds to sleep after each request. Defaults to the previously
        hard-coded 0.5.

    Returns
    -------
    pd.DataFrame
        One column per distinct ``State_MSA_Index`` value, built from the
        series returned for that row. When a label repeats, the later row's
        series replaces the earlier one.
    """
    series_by_label = {}
    for label, series_id in zip(df['State_MSA_Index'], df['API']):
        series_by_label[label] = fred.get_series(series_id, observation_start=observation_start)
        # Sleep timer to avoid exceeding api response limits.
        time.sleep(pause_seconds)
    return pd.DataFrame(series_by_label)
# Extract the MSA unemployment data with fetch_API_data_uc_msa (defined above).
# The result holds one column per State_MSA_Index label.
print("---- Extracting MSA Unemployment Data -----")
Unemployment_MSA = fetch_API_data_uc_msa(df_unemployment_MSA_endpoints)
print("MSA Unemployment Data Extraction Complete")

In [0]:
# Save the extracted MSA unemployment data as a CSV in the DBFS FileStore.
# The frame's index is written as an unnamed first column; a later cell
# re-reads this file and renames that column to 'Date'.
Unemployment_MSA.to_csv('/dbfs/FileStore/tables/Unemployment_MSA.csv')

In [0]:
# Output names for the remaining extracts. The order matters: this list is
# zipped position-by-position with df_list when the data is fetched.
output_file_names = [
    'Economic_Indicators',
    'Market_Hotness_Indicators',
    'Market_Hotness_Indicators_MSA',
    'House_Price_Index_Non_MSA',
    'House_Price_Index_MSA',
    'Producer_Price_Index',
]

In [0]:
# Endpoint frames to extract, in the same order as output_file_names
# (the two lists are zipped together when the data is fetched).
df_list = [
    df_Economic_Indicators,
    df_Market_Hotness,
    df_Market_Hotness_MSA,
    df_House_Price_Index_Non_MSA,
    df_House_Price_Index_MSA,
    df_Producer_Price_Index,
]

In [0]:
def fetch_API_data(df: pd.DataFrame,
                   observation_start: str = '01/01/2010',
                   pause_seconds: float = 0.5) -> pd.DataFrame:
    """Download one FRED series per row of ``df`` and combine them column-wise.

    Parameters
    ----------
    df : pd.DataFrame
        Endpoint table. ``Dimension/Measure Name`` supplies the output column
        label and ``API`` supplies the series id passed to ``fred.get_series``.
    observation_start : str, optional
        Value forwarded to ``fred.get_series`` as ``observation_start``.
        Defaults to the previously hard-coded '01/01/2010'.
    pause_seconds : float, optional
        Seconds to sleep after each request. Defaults to the previously
        hard-coded 0.5.

    Returns
    -------
    pd.DataFrame
        One column per distinct ``Dimension/Measure Name`` value, built from
        the series returned for that row. When a name repeats, the later row's
        series replaces the earlier one.
    """
    series_by_label = {}
    for label, series_id in zip(df['Dimension/Measure Name'], df['API']):
        series_by_label[label] = fred.get_series(series_id, observation_start=observation_start)
        # Sleep timer to avoid exceeding api response limits.
        time.sleep(pause_seconds)
    return pd.DataFrame(series_by_label)

In [0]:
# fred_extract_dict["Market_Hotness_Indicators_MSA"]=Market_Hotness_Indicators_MSA


In [0]:

# Fetch each endpoint sheet in turn and keep the result under its output name.
fred_extract_dict = {}
for endpoints_df, output_name in zip(df_list, output_file_names):
    print(output_name)
    print('Extracting ---- {}'.format(output_name))
    fred_extract_dict[output_name] = fetch_API_data(endpoints_df)
    print('Extract complete ---- {}'.format(output_name))
print('---- Data read complete ----')

In [0]:
#fred_extract_dict['Unemployment_Counties'].to_csv('/dbfs/FileStore/tables/Unemployment_Counties.csv')
# Persist each extracted frame to its CSV in the DBFS FileStore.
for extract_key, csv_path in (
    ('Economic_Indicators', '/dbfs/FileStore/tables/Economic_Indicators.csv'),
    ('Market_Hotness_Indicators', '/dbfs/FileStore/tables/Market_Hotness_Indicators.csv'),
    ('Market_Hotness_Indicators_MSA', '/dbfs/FileStore/tables/Market_Hotness_Indicators_MSA.csv'),
    ('House_Price_Index_Non_MSA', '/dbfs/FileStore/tables/House_Price_Index_Non_MSA.csv'),
    ('House_Price_Index_MSA', '/dbfs/FileStore/tables/House_Price_Index_MSA.csv'),
    ('Producer_Price_Index', '/dbfs/FileStore/tables/Producer_Price_Index.csv'),
):
    fred_extract_dict[extract_key].to_csv(csv_path)

print('Files converted to CSV and loaded to Databricks Filepath')

In [0]:

# Re-read each stored CSV into pandas so the unnamed date column can be renamed.

def _reload_with_date_column(csv_path):
    """Read ``csv_path``, rename 'Unnamed: 0' to 'Date', and save it back in place.

    Parameters
    ----------
    csv_path : str
        Location of a CSV previously written with its index as an unnamed
        first column (which pandas reads back as 'Unnamed: 0').

    Returns
    -------
    pd.DataFrame
        The re-read frame with the renamed column. The same frame is also
        written back to ``csv_path`` without an index column.
    """
    frame = pd.read_csv(csv_path).rename(columns={'Unnamed: 0': 'Date'})
    frame.to_csv(csv_path, index=False)
    return frame


# NOTE: the Counties and MSA frames used to be written to a relative path in
# the working directory, leaving the DBFS copies with the 'Unnamed: 0' header.
# All eight files are now rewritten at the DBFS path they were read from.
pd_df_Unemployment_Counties = _reload_with_date_column('/dbfs/FileStore/tables/Unemployment_Counties.csv')
pd_df_Unemployment_MSA = _reload_with_date_column('/dbfs/FileStore/tables/Unemployment_MSA.csv')
pd_df_Economic_Indicators = _reload_with_date_column('/dbfs/FileStore/tables/Economic_Indicators.csv')
pd_df_Market_Hotness_Indicators = _reload_with_date_column('/dbfs/FileStore/tables/Market_Hotness_Indicators.csv')
pd_df_Market_Hotness_Indicators_MSA = _reload_with_date_column('/dbfs/FileStore/tables/Market_Hotness_Indicators_MSA.csv')
pd_df_House_Price_Index_Non_MSA = _reload_with_date_column('/dbfs/FileStore/tables/House_Price_Index_Non_MSA.csv')
pd_df_House_Price_Index_MSA = _reload_with_date_column('/dbfs/FileStore/tables/House_Price_Index_MSA.csv')
pd_df_Producer_Price_Index = _reload_with_date_column('/dbfs/FileStore/tables/Producer_Price_Index.csv')

In [0]:
# Build a Spark DataFrame from each pandas frame so the data can be written to
# Azure blob storage. Targets and sources are paired by position.
(
    spark_df_unmep_counties,
    spark_df_unmep_msa,
    spark_df_economic_indicators,
    spark_df_market_hotness,
    spark_df_market_hotness_msa,
    spark_df_hpi_non_msa,
    spark_df_hpi_msa,
    spark_df_ppi,
) = [
    spark.createDataFrame(pandas_frame)
    for pandas_frame in (
        pd_df_Unemployment_Counties,
        pd_df_Unemployment_MSA,
        pd_df_Economic_Indicators,
        pd_df_Market_Hotness_Indicators,
        pd_df_Market_Hotness_Indicators_MSA,
        pd_df_House_Price_Index_Non_MSA,
        pd_df_House_Price_Index_MSA,
        pd_df_Producer_Price_Index,
    )
]
print('Conversion to Spark Dataframe complete')


In [0]:

#### Disabled: converting the extracted pandas frames directly with spark.createDataFrame rejects the date field
# #Creating spark dataframe from Pandas Dataframe to write into a csv file later
# spark_df_unmep_counties=spark.createDataFrame(fred_extract_dict['Unemployment_Counties']) 
# spark_df_economic_indicators=spark.createDataFrame(fred_extract_dict['Economic_Indicators']) 
# spark_df_market_hotness=spark.createDataFrame(fred_extract_dict['Market_Hotness_Indicators']) 
# spark_df_hpi_non_msa=spark.createDataFrame(fred_extract_dict['House_Price_Index_Non_MSA']) 
# spark_df_hpi_msa=spark.createDataFrame(fred_extract_dict['House_Price_Index_MSA']) 
# spark_df_ppi=spark.createDataFrame(fred_extract_dict['Producer_Price_Index']) 



In [0]:
# Write every Spark frame to its own folder under the mounted storage path as
# a single-partition CSV with a header row, overwriting any previous output.
for spark_frame, blob_folder in (
    (spark_df_unmep_counties, "/mnt/storage_account_name/container_name/Unemployment_Counties"),
    (spark_df_unmep_msa, "/mnt/storage_account_name/container_name/Unemployment_MSA"),
    (spark_df_economic_indicators, "/mnt/storage_account_name/container_name/Economic_Indicators"),
    (spark_df_market_hotness, "/mnt/storage_account_name/container_name/Market_Hotness_Indicators"),
    (spark_df_market_hotness_msa, "/mnt/storage_account_name/container_name/Market_Hotness_Indicators_MSA"),
    (spark_df_hpi_non_msa, "/mnt/storage_account_name/container_name/House_Price_Index_Non_MSA"),
    (spark_df_hpi_msa, "/mnt/storage_account_name/container_name/House_Price_Index_MSA"),
    (spark_df_ppi, "/mnt/storage_account_name/container_name/Producer_Price_Index"),
):
    (
        spark_frame.coalesce(1)
        .write.mode("overwrite")
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .csv(blob_folder)
    )

print('------Files added to storage_account_name/container_name path on Azure---------')

##### Note: These files are saved with default spark naming format therefore we now need to rename them.

In [0]:
def _rename_part_file(folder, final_name):
    """Move the first ``part-*`` file found in ``folder`` to ``folder/final_name``.

    Parameters
    ----------
    folder : str
        Output folder of a Spark CSV write; a trailing slash is tolerated.
    final_name : str
        File name to give the part file inside that same folder.

    Raises
    ------
    FileNotFoundError
        If ``folder`` contains no entry whose name starts with "part-".
    """
    part_files = [entry for entry in dbutils.fs.ls(folder) if entry.name.startswith("part-")]
    if not part_files:
        # Fail with an explicit message instead of an opaque IndexError.
        raise FileNotFoundError("No part- file found in %s" % folder)
    # Strip the trailing slash so the destination has no doubled separator.
    destination = "%s/%s" % (folder.rstrip("/"), final_name)
    dbutils.fs.mv(part_files[0].path, destination)


# Give each Spark output a stable "<name>.csv" file name inside its own folder.
for dataset_name in (
    "Unemployment_Counties",
    "Unemployment_MSA",
    "Economic_Indicators",
    "Market_Hotness_Indicators",
    "Market_Hotness_Indicators_MSA",
    "House_Price_Index_Non_MSA",
    "House_Price_Index_MSA",
    "Producer_Price_Index",
):
    _rename_part_file(
        "/mnt/storage_account_name/container_name/%s/" % dataset_name,
        "%s.csv" % dataset_name,
    )

print('File rename complete')

In [0]:
# pd_df_= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )

# pd_dataframe= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )

# pd_dataframe= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )


# pd_dataframe= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )

# pd_dataframe= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )

# pd_dataframe= pd.read_csv('Unemployment_Counties.csv')
# pd_dataframe.rename( columns={'Unnamed: 0':'Date'}, inplace=True )