In [6]:
import pandas as pd
import os
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from get_bq_data import get_bq_data
from preprocessing_3 import preprocess_data_3
from upload_bq_data import upload_bq_data
from preprocessing_4 import preprocess_data_4
from model_trainer_2 import get_predictions
from dotenv import load_dotenv

# Load variables from a local .env file (if present) into os.environ.
load_dotenv()

# Path to the service-account key file; this assignment always wins over any
# value that load_dotenv() may have put in the environment.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'service_keys.json'

# Single BigQuery client shared by every helper call in this notebook.
client = bigquery.Client()

# CSV scratch paths (not referenced by the cells shown in this notebook section).
DATA_PATH = "/tmp/data3.csv"
DATA_PATH2 = "/tmp/data4.csv"

In [7]:
RDC_COUNTY_METRICS_URL = (
    "https://econdata.s3-us-west-2.amazonaws.com/Reports/Core/"
    "RDC_Inventory_Core_Metrics_County.csv"
)


def download_dataset(url=RDC_COUNTY_METRICS_URL):
    """Read the county inventory metrics CSV into a DataFrame.

    Args:
        url: Location of the CSV. Anything ``pandas.read_csv`` accepts works
            (HTTP(S) URL, local path, file-like object). Defaults to the
            county core-metrics report that this notebook was written for.

    Returns:
        pandas.DataFrame with the parsed CSV contents, unmodified.
    """
    return pd.read_csv(url)

def aggregate_data(df):
    """Preprocess the downloaded data and append unseen months to BigQuery.

    Only rows whose (year, month) pair is not yet present in the
    ``county_market`` table are uploaded, so already-stored months are never
    appended a second time.

    Args:
        df: Raw DataFrame as returned by ``download_dataset``. After
            ``preprocess_data_3`` it must expose ``year`` and ``month`` columns.

    Returns:
        pandas.DataFrame containing the stored ``county_market`` rows plus any
        newly uploaded rows. When nothing is new, a copy of the stored table
        is returned unchanged.
    """
    county_lookup = get_bq_data(client, "county_lookup")
    state_lookup = get_bq_data(client, "state_lookup")

    # Preprocess data
    df = preprocess_data_3(df, county_lookup, state_lookup)

    existing_data = get_bq_data(client, "county_market")

    # Composite "year_month" keys are built as standalone Series so that no
    # temporary column has to be added to (and later dropped from) the frames.
    existing_year_months = set(
        existing_data['year'].astype(str) + '_' + existing_data['month'].astype(str)
    )
    incoming_year_months = df['year'].astype(str) + '_' + df['month'].astype(str)

    # Filter out records that already exist in the database
    new_rows = df[~incoming_year_months.isin(existing_year_months)]

    if new_rows.empty:
        print("All records already exist in the database. No new data to insert.")
        # raise AirflowSkipException("Skipping remaining tasks as no new data was found")
        return existing_data.copy()

    # WRITE_APPEND adds to the table, so send only the rows that are new.
    upload_bq_data(client, "county_market", new_rows, "WRITE_APPEND")

    return pd.concat([existing_data, new_rows]).drop_duplicates().reset_index(drop=True)

def train_model(new_data):
    """Predict each target feature, label county trends, and upload to BigQuery.

    For every feature, ``get_predictions`` is called on ``new_data`` and its
    result is stored as a column of the frame produced by
    ``preprocess_data_4``. A ``market_trend`` column is then derived per
    county, the feature columns are cast to ``int``, and the frame replaces
    the ``county_predictions`` table (``WRITE_TRUNCATE``).

    Args:
        new_data: DataFrame of market data, e.g. the return value of
            ``aggregate_data``. It is not modified here (a copy is passed to
            ``preprocess_data_4``).

    Returns:
        None. The result is only written to BigQuery.
    """
    # List of features to predict
    features = [
        "median_listing_price",
        "average_listing_price",
        "median_listing_price_per_square_foot",
        "total_listing_count",
        "median_days_on_market"
    ]

    # NOTE(review): preprocess_data_4 presumably builds the future rows to be
    # predicted (one per county and month) - confirm against its source.
    target_df = preprocess_data_4(new_data.copy())
    prediction_df = target_df.copy()

    for feature in features:
        prediction_df[feature] = get_predictions(new_data, feature)

    # Trend per county: compare the third predicted average_listing_price with
    # the first one, in frame order. Counties with fewer than three rows, or
    # with no strict increase/decrease, stay 'stable'.
    trend_by_county = {}
    prices_by_county = prediction_df.groupby('county_num', sort=False)['average_listing_price']
    for county_num, prices in prices_by_county:
        if len(prices) < 3:
            continue
        first_price, third_price = prices.iloc[0], prices.iloc[2]
        if third_price > first_price:
            trend_by_county[county_num] = 'rising'
        elif third_price < first_price:
            trend_by_county[county_num] = 'declining'

    prediction_df['market_trend'] = prediction_df['county_num'].map(
        lambda county_num: trend_by_county.get(county_num, 'stable')
    )

    # Cast after the trend comparison so the trend uses the unrounded values.
    # astype(int) truncates toward zero and raises on NaN predictions.
    for col in features:
        prediction_df[col] = prediction_df[col].astype(int)

    upload_bq_data(client, "county_predictions", prediction_df, "WRITE_TRUNCATE")

In [8]:
df = download_dataset()

In [9]:
df = aggregate_data(df)

All records already exist in the database. No new data to insert.


In [10]:
# Interactive run on a single feature only; train_model() above carries the
# full five-feature list used for the real upload.
features = ["median_listing_price"]

# Frame that receives one prediction column per feature in the next cell.
target_df = preprocess_data_4(df.copy())
prediction_df = target_df.copy()

In [11]:
# Run get_predictions once per entry in `features` and store the result as a
# column of prediction_df. Judging by the output logged below, each call
# trains and validates models, so this cell is slow to re-run.
for feature in features:
    predictions = get_predictions(df, feature)
    prediction_df[feature] = predictions

Initial memory: 108.08 MB
Final memory: 73.09 MB
Reduced by 32.4%
Validation RMSE: 25356.7090, RMSLE: 0.0858, MAE: 14061.1813, MAPE: 4.80%, R²: 0.9874
Validation RMSE: 32044.1651, RMSLE: 0.1080, MAE: 18982.1832, MAPE: 6.53%, R²: 0.9795
Validation RMSE: 38246.0142, RMSLE: 0.1233, MAE: 21436.9404, MAPE: 7.58%, R²: 0.9717
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 1.122634 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 254001
[LightGBM] [Info] Number of data points in the train set: 209230, number of used features: 998
[LightGBM] [Info] Start training from score 0.573377
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 1.192056 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 254001
[LightGBM] [Info] Number of data points in the train set: 209230, number of used features: 998
[LightGBM] [Info] Start training from sc