In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import sqlite3
import requests
from datetime import datetime
import logging

In [4]:
#for now need to manually download

PATH = 'personalized /path to/ csv'

df = pd.read_csv(PATH).iloc[:-1]

df = df.drop(df.filter(regex='_yy$|_mm$', axis=1).columns, axis=1)

logging.basicConfig(filename='logs.log', level=logging.DEBUG, 
                    format='%(asctime)s:%(levelname)s:%(message)s')


  df = pd.read_csv(PATH).iloc[:-1]


In [5]:
data_types = df.dtypes

empty_values_count = df.isnull().sum()

empty_values_percent = (empty_values_count / len(df)) * 100

result = pd.DataFrame({
    'Column Name': df.columns,
    'Data Type': data_types.values,
    'Empty Values Count': empty_values_count.values,
    'Empty Values Percent': empty_values_percent.values
})

logging.info("Creating missing info table")

result

Unnamed: 0,Column Name,Data Type,Empty Values Count,Empty Values Percent
0,month_date_yyyymm,object,0,0.0
1,postal_code,object,0,0.0
2,zip_name,object,71051,2.817731
3,median_listing_price,float64,7072,0.28046
4,active_listing_count,float64,4730,0.187582
5,median_days_on_market,float64,25933,1.028447
6,new_listing_count,float64,3279,0.130038
7,price_increased_count,float64,3279,0.130038
8,price_reduced_count,float64,3279,0.130038
9,pending_listing_count,float64,563362,22.341733


The cell below will replace the empty values with yearly avgs, we can change the logic later if we need to. 

For the remaining columns I removed the rows with empty values, again we can chg this logic later

In [6]:
df['month_date_yyyymm'] = df['month_date_yyyymm'].astype(int)
df['year'] = df['month_date_yyyymm'] // 100
missing_data_total_counts = df.groupby(['year', 'postal_code']).apply(lambda group: group.isna().sum().sum())
missing_data_total_counts = missing_data_total_counts.reset_index(name='total_missing_count')

data_types = df.dtypes
empty_values_count = df.isnull().sum()
empty_values_percent = (empty_values_count / len(df)) * 100

missing_data_info = pd.DataFrame({
    'Column Name': df.columns,
    'Data Type': data_types.values,
    'Empty Values Count': empty_values_count.values,
    'Empty Values Percent': empty_values_percent.values
})

columns_to_replace = missing_data_info[missing_data_info['Empty Values Percent'] > 20]['Column Name']

for column in columns_to_replace:
    year_avg = df.groupby('year')[column].transform('mean')
    df[column] = df[column].fillna(year_avg)


df_cleaned = df.copy(deep=True)

df_cleaned.dropna(subset=df.columns.difference(columns_to_replace), inplace=True)


logging.info(f"DF is partially cleaned now; missing data rate above 20% used mean. New shape: {df_cleaned.shape}")

df_cleaned.head()

Unnamed: 0,month_date_yyyymm,postal_code,zip_name,median_listing_price,active_listing_count,median_days_on_market,new_listing_count,price_increased_count,price_reduced_count,pending_listing_count,median_listing_price_per_square_foot,median_square_feet,average_listing_price,total_listing_count,pending_ratio,quality_flag,year
0,202311,11949,"manorville, ny",644500.0,68.0,56.0,18.0,0.0,8.0,10.0,293.0,2376.0,672295.0,78.0,0.1471,1.0,2023
1,202311,35980,"horton, al",394950.0,6.0,72.0,2.0,0.0,4.0,1.0,157.0,2074.0,387017.0,7.0,0.1667,0.0,2023
2,202311,22643,"markham, va",735000.0,3.0,30.0,0.0,0.0,0.0,0.0,328.0,2531.0,761667.0,3.0,0.0,0.0,2023
3,202311,26587,"rachel, wv",195000.0,0.0,51.0,0.0,0.0,0.0,17.079545,111.0,1750.0,195000.0,0.0,0.844677,1.0,2023
4,202311,61360,"seneca, il",269800.0,7.0,64.0,0.0,0.0,2.0,1.0,147.0,2045.0,495900.0,8.0,0.1429,0.0,2023


In [7]:

missing_data_total_counts = df_cleaned.groupby(['year', 'postal_code']).apply(lambda group: group.isna().sum().sum())
missing_data_total_counts = missing_data_total_counts.reset_index(name='total_missing_count')

data_types = df_cleaned.dtypes
empty_values_count = df_cleaned.isnull().sum()
empty_values_percent = (empty_values_count / len(df_cleaned)) * 100

missing_data_info_post_clean = pd.DataFrame({
    'Column Name': df_cleaned.columns,
    'Data Type': data_types.values,
    'Empty Values Count': empty_values_count.values,
    'Empty Values Percent': empty_values_percent.values
})

missing_data_info_post_clean
logging.info(f"DF is fully cleaned now. Final shape: {df_cleaned.shape}")

In [8]:
conn = sqlite3.connect('HOUSING.db')
cursor = conn.cursor()

def dtype_to_sql(dtype):
    if pd.api.types.is_integer_dtype(dtype):
        return 'INTEGER'
    elif pd.api.types.is_float_dtype(dtype):
        return 'REAL'
    elif pd.api.types.is_object_dtype(dtype):
        return 'TEXT'
    elif pd.api.types.is_datetime64_any_dtype(dtype):
        return 'TIMESTAMP'
    else:
        return 'TEXT' 

create_table_query = f"""
CREATE TABLE IF NOT EXISTS Realtor (
    month_date_yyyymm INTEGER NOT NULL,
    postal_code TEXT NOT NULL,
    zip_name TEXT,
    median_listing_price REAL,
    active_listing_count REAL,
    median_days_on_market REAL,
    new_listing_count REAL,
    price_increased_count REAL,
    price_reduced_count REAL,
    pending_listing_count REAL,
    median_listing_price_per_square_foot REAL,
    median_square_feet REAL,
    average_listing_price REAL,
    total_listing_count REAL,
    pending_ratio REAL,
    quality_flag REAL,
    year INTEGER NOT NULL,
    PRIMARY KEY (month_date_yyyymm, postal_code)
);

"""

cursor.execute(create_table_query)
conn.commit()
conn.close()

logging.info(f"Created main table in Housing database")

In [9]:
conn = sqlite3.connect('HOUSING.db')
cursor = conn.cursor()

def dtype_to_sql(dtype):
    if pd.api.types.is_integer_dtype(dtype):
        return 'INTEGER'
    elif pd.api.types.is_float_dtype(dtype):
        return 'REAL'
    elif pd.api.types.is_object_dtype(dtype):
        return 'TEXT'
    elif pd.api.types.is_datetime64_any_dtype(dtype):
        return 'TIMESTAMP'
    else:
        return 'TEXT'  
    
df_cleaned.to_sql(
    'Realtor',
    conn,
    if_exists='replace',
    index=False,
    dtype={col: dtype_to_sql(dtype) for col, dtype in zip(df_cleaned.columns, df_cleaned.dtypes)}
)

conn.commit()
conn.close()

logging.info(f"updated main table (Realtor) to the correct data types")

In [10]:
conn = sqlite3.connect('HOUSING.db')
cursor = conn.cursor()

API_KEY = '6bf8895c846d89aeaec6a40b739746ce' 

endpoint = 'https://api.stlouisfed.org/fred/series/observations' 

series_ids = ['CPIAUCSL', 'FEDFUNDS', 'UMCSENT', 'RSXFS', 'BOPGSTB','HOUST','PI']

conn = sqlite3.connect('HOUSING.db')
cursor = conn.cursor()


def create_table_for_series(series_id, cursor):
    cursor.execute(f'''
    CREATE TABLE IF NOT EXISTS {series_id} (
        date INTEGER PRIMARY KEY,
        value REAL
    )
    ''')
    conn.commit()

def convert_date_format(date_str):
    return int(datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y%m'))

for series_id in series_ids:
    response = requests.get(endpoint, params={
        'series_id': series_id,
        'api_key': API_KEY,
        'file_type': 'json'
    })
    data = response.json()
    
    df = pd.DataFrame(data['observations'])
    
    df['date'] = df['date'].apply(convert_date_format)
    
    create_table_for_series(series_id, cursor)
    
    df.to_sql(series_id, conn, if_exists='replace', index=False, dtype={
        'date': 'INTEGER',
        'value': 'REAL'
    })


cursor.close()
conn.close()

logging.info(f"Uploaded ancillary tables from API")

In [11]:
df_cleaned_SHAPE = df_cleaned.shape

join_query = ''' 
SELECT 
    r.*,
    CPIAUCSL.value AS CPIAUCSL_value,
    FEDFUNDS.value AS FEDFUNDS_value,
    UMCSENT.value AS UMCSENT_value,
    RSXFS.value AS RSXFS_value,
    BOPGSTB.value AS BOPGSTB_value,
    HOUST.value AS HOUST_value,
    PI.value AS PI_value

FROM 
    Realtor r
    
inner JOIN CPIAUCSL ON r.month_date_yyyymm = CPIAUCSL.date
inner JOIN FEDFUNDS ON r.month_date_yyyymm = FEDFUNDS.date
inner JOIN UMCSENT ON r.month_date_yyyymm = UMCSENT.date
inner JOIN RSXFS ON r.month_date_yyyymm = RSXFS.date
inner JOIN BOPGSTB ON r.month_date_yyyymm = BOPGSTB.date
inner JOIN HOUST ON r.month_date_yyyymm = HOUST.date
inner JOIN PI ON r.month_date_yyyymm = PI.date

'''

conn = sqlite3.connect('HOUSING.db')
cursor = conn.cursor()
df_test = pd.read_sql(join_query, conn)


cursor.close()
conn.close()


if df_cleaned_SHAPE[0] == df_test.shape[0]:
    print("Joined successfully")
else:
    assert "Need to have unique dates in additional tables from API"



df_test.head(2)



Unnamed: 0,month_date_yyyymm,postal_code,zip_name,median_listing_price,active_listing_count,median_days_on_market,new_listing_count,price_increased_count,price_reduced_count,pending_listing_count,...,pending_ratio,quality_flag,year,CPIAUCSL_value,FEDFUNDS_value,UMCSENT_value,RSXFS_value,BOPGSTB_value,HOUST_value,PI_value
0,202309,87537,"hernandez, nm",235750.0,2.0,336.0,0.0,0.0,0.0,1.0,...,0.5,1.0,2023,307.481,5.33,67.9,613076.0,-61542.0,1346.0,23185.9
1,202309,3257,"new london, nh",712500.0,4.0,25.0,8.0,0.0,0.0,5.0,...,1.25,1.0,2023,307.481,5.33,67.9,613076.0,-61542.0,1346.0,23185.9


In [None]:
conn = sqlite3.connect('HOUSING.db')

cursor = conn.cursor()

join_query = ''' 
SELECT 
    r.*,
    CPIAUCSL.value AS CPIAUCSL_value,
    FEDFUNDS.value AS FEDFUNDS_value,
    UMCSENT.value AS UMCSENT_value,
    RSXFS.value AS RSXFS_value,
    BOPGSTB.value AS BOPGSTB_value,
    HOUST.value AS HOUST_value,
    PI.value AS PI_value

FROM 
    Realtor r
    
inner JOIN CPIAUCSL ON r.month_date_yyyymm = CPIAUCSL.date
inner JOIN FEDFUNDS ON r.month_date_yyyymm = FEDFUNDS.date
inner JOIN UMCSENT ON r.month_date_yyyymm = UMCSENT.date
inner JOIN RSXFS ON r.month_date_yyyymm = RSXFS.date
inner JOIN BOPGSTB ON r.month_date_yyyymm = BOPGSTB.date
inner JOIN HOUST ON r.month_date_yyyymm = HOUST.date
inner JOIN PI ON r.month_date_yyyymm = PI.date

'''

cursor.execute(f"""CREATE VIEW Realtor_Final_Merged AS{join_query}""")

conn.commit()

conn.close()

logging.info(f"Created view that will be used for models: Realtor_Final_Merged")

In [44]:
# DO NOT RUN


# conn = sqlite3.connect('HOUSING.db')
# cursor = conn.cursor()

# cursor.execute("SELECT name, type FROM sqlite_master WHERE type IN ('table','view')")
# tables_and_views = cursor.fetchall()

# for name, type in tables_and_views:
#     sql = f"DROP {type} IF EXISTS {name}"
#     cursor.execute(sql)

# conn.commit()
# cursor.close()
# conn.close()

# print("All tables and views have been dropped.")