In [1]:
import csv
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder
import pickle as pkl
import matplotlib.pyplot as plt
import random
import pyfixest as pf
import statsmodels.api as sm
import itertools as it
import warnings

# Silence pandas' PerformanceWarning specifically.
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
# NOTE(review): this filter ignores *every* warning category, which makes the
# targeted filter above redundant and also hides deprecation and numerical
# warnings raised by the libraries used below. Consider removing it or
# narrowing it to specific categories.
warnings.filterwarnings('ignore')

In [2]:
class RegressionResult:
    """Container for the cross-validated metrics of one regression specification.

    A freshly constructed instance is "empty": it has no model variables and
    all metrics are NaN. ``run_fe_regression_with_cv`` fills in the metrics
    and the descriptive attributes (``target_name``, ``weights``,
    ``fixed_effects``, ``incremental_effects``).
    """

    def __init__(self):
        self.model_vars = []
        # Use np.nan: the np.NaN alias was removed in NumPy 2.0 and raises
        # AttributeError there.
        self.out_sample_mse = np.nan
        self.in_sample_mse = np.nan
        self.out_sample_pred_int_acc = np.nan
        self.in_sample_pred_int_acc = np.nan
        # Descriptive attributes that are assigned from outside the class;
        # initialised here so they exist on every instance, including empty ones.
        self.target_name = None
        self.weights = None
        self.fixed_effects = None
        self.incremental_effects = None

    def print_result(self):
        """Print the model variables (comma-separated) followed by the four metrics."""
        print(",".join(self.model_vars))
        print("out_sample_mse:", self.out_sample_mse)
        print("in_sample_mse:", self.in_sample_mse)
        print("out_sample_pred_int_acc:", self.out_sample_pred_int_acc)
        print("in_sample_pred_int_acc:", self.in_sample_pred_int_acc)

    def is_empty(self):
        """Return True when no model variables have been recorded yet."""
        return len(self.model_vars) == 0

In [3]:
def calculate_prediction_interval_accuracy(y, predictions, critical_value=1.9603795):
    """Return the share of observations that fall inside their prediction interval.

    Parameters
    ----------
    y : array-like
        Observed target values, matched to the predictions by position.
    predictions : object
        Prediction results exposing ``predicted_mean``, ``var_pred_mean`` and
        ``var_resid`` (as returned by a statsmodels ``get_prediction`` call).
    critical_value : float, optional
        Multiplier applied to the standard error to get the interval
        half-width. The default is close to the two-sided 95% normal
        quantile (1.96); presumably a t quantile for the sample in use --
        TODO confirm.

    Returns
    -------
    float
        Fraction of observations strictly inside
        ``predicted_mean +/- critical_value * sqrt(var_pred_mean + var_resid)``.

    Raises
    ------
    ValueError
        If ``y`` and the predicted means do not have the same shape.
    """
    observed = np.asarray(y, dtype=float)
    center = np.asarray(predictions.predicted_mean, dtype=float)
    if observed.shape != center.shape:
        raise ValueError(
            f"y has shape {observed.shape} but predictions have shape {center.shape}"
        )

    # A prediction interval for a new observation needs the residual variance
    # on top of the variance of the fitted mean; using var_pred_mean alone
    # yields the (much narrower) confidence interval of the mean.
    obs_variance = (
        np.asarray(predictions.var_pred_mean, dtype=float)
        + np.asarray(predictions.var_resid, dtype=float)
    )
    half_width = critical_value * np.sqrt(obs_variance)

    # Strict inequalities; NaN values compare False and count as misses.
    inside = (center - half_width < observed) & (observed < center + half_width)
    return np.mean(inside)

In [4]:
def choose_best_model(model1, model2, stat="pred_int", target_coverage=.95):
    """Pick the better of two fitted models according to an out-of-sample statistic.

    Parameters
    ----------
    model1, model2 : objects with ``out_sample_mse`` and
        ``out_sample_pred_int_acc`` attributes (only the attributes needed for
        the chosen ``stat`` are read).
    stat : {"pred_int", "mse", "pred_int+mse"}
        * ``"mse"``: lower out-of-sample MSE wins.
        * ``"pred_int"``: interval accuracy closer to ``target_coverage`` wins.
        * ``"pred_int+mse"``: ``model2`` wins only if it is better on both.
    target_coverage : float, optional
        Nominal coverage the interval accuracy is compared against.

    Returns
    -------
    ``model2`` if it is strictly better than ``model1``, otherwise ``model1``
    (ties keep ``model1``).

    A NaN score is treated as worse than any real score, so a model whose
    metrics were never filled in cannot beat a fitted one regardless of the
    argument order.
    """
    assert stat in ["pred_int", "mse", "pred_int+mse"], f"unknown stat: {stat!r}"

    def first_is_worse(score1, score2):
        # Plain `score1 > score2` is False whenever score1 is NaN, which would
        # let an unscored model1 win every comparison.
        if np.isnan(score1):
            return not np.isnan(score2)
        return score1 > score2

    def mse_worse():
        return first_is_worse(model1.out_sample_mse, model2.out_sample_mse)

    def coverage_worse():
        return first_is_worse(
            abs(target_coverage - model1.out_sample_pred_int_acc),
            abs(target_coverage - model2.out_sample_pred_int_acc),
        )

    if stat == "pred_int+mse":
        model2_wins = mse_worse() and coverage_worse()
    elif stat == "pred_int":
        model2_wins = coverage_worse()
    else:  # stat == "mse"
        model2_wins = mse_worse()

    return model2 if model2_wins else model1

In [5]:
def run_fe_regression_with_cv(num_folds, target_name, target_var, weights, model_vars, fixed_effects, incremental_effects, train_files=None, test_files=None):
    """Fit an OLS model on each cross-validation fold and average the metrics.

    Parameters
    ----------
    num_folds : int
        Number of folds; folds are looked up under keys ``0 .. num_folds - 1``.
    target_name : str
        Label stored on the result (not used in the fit).
    target_var : str
        Name of the dependent-variable column.
    weights : str
        Text substituted for the ``"[weight]"`` placeholder in ``model_vars``.
    model_vars : list of str
        Covariate column names, possibly containing ``"[weight]"``.
    fixed_effects : list of str or None
        For each entry ``fe``, every column ending in ``f"{fe}_fixed_effect"``
        is added as a covariate. When None or empty, an intercept is added
        instead.
    incremental_effects : int or None
        For ``i`` in ``1 .. incremental_effects``, every column ending in
        ``f"incremental_effect_{i}"`` is added as a covariate.
    train_files, test_files : mapping of fold index -> DataFrame, optional
        Fold data. When omitted, the notebook-level ``train_data_files`` /
        ``test_data_files`` are used, as before.

    Returns
    -------
    RegressionResult
        Fold-averaged in-sample and out-of-sample MSE and prediction-interval
        accuracy, plus the arguments describing the specification.
    """
    # Prefer explicitly passed folds; fall back to the notebook globals so
    # existing calls keep working.
    if train_files is None:
        train_files = train_data_files
    if test_files is None:
        test_files = test_data_files

    covariate_names = [var.replace("[weight]", weights) for var in model_vars]
    print(covariate_names, flush=True)

    # Column discovery uses the first training fold; all folds are assumed to
    # share the same columns -- TODO confirm.
    data_columns = train_files[0].columns

    for i in range(incremental_effects or 0):
        suffix = f"incremental_effect_{i + 1}"
        covariate_names.extend(col for col in data_columns if col.endswith(suffix))

    for fe in fixed_effects or []:
        suffix = f"{fe}_fixed_effect"
        covariate_names.extend(col for col in data_columns if col.endswith(suffix))

    # No fixed effects requested (None or empty) -> the model needs an intercept.
    use_intercept = not fixed_effects

    in_sample_mse_list, out_sample_mse_list = [], []
    in_sample_pred_int_acc, out_sample_pred_int_acc = [], []

    for fold in range(num_folds):
        train_data = train_files[fold]
        test_data = test_files[fold]

        train_covariates = train_data[covariate_names]
        test_covariates = test_data[covariate_names]

        if use_intercept:
            # has_constant="add" forces the intercept column in both frames.
            # The default ("skip") drops it whenever some column happens to be
            # a non-zero constant in that fold, which can make the train and
            # test design matrices disagree.
            train_covariates = sm.add_constant(train_covariates, has_constant="add")
            test_covariates = sm.add_constant(test_covariates, has_constant="add")

        regression = sm.OLS(train_data[target_var], train_covariates).fit()

        in_sample_predictions = regression.get_prediction(train_covariates)
        out_sample_predictions = regression.get_prediction(test_covariates)

        train_target = train_data[target_var].to_numpy()
        test_target = test_data[target_var].to_numpy()

        in_sample_mse_list.append(
            np.mean(np.square(in_sample_predictions.predicted_mean - train_target))
        )
        out_sample_mse_list.append(
            np.mean(np.square(out_sample_predictions.predicted_mean - test_target))
        )

        in_sample_pred_int_acc.append(
            calculate_prediction_interval_accuracy(train_data[target_var], in_sample_predictions)
        )
        out_sample_pred_int_acc.append(
            calculate_prediction_interval_accuracy(test_data[target_var], out_sample_predictions)
        )

    reg_result = RegressionResult()
    reg_result.target_name = target_name
    reg_result.weights = weights
    # Stored with the "[weight]" placeholder intact; the substituted value is
    # kept separately in `weights`.
    reg_result.model_vars = sorted(model_vars)
    reg_result.in_sample_mse = np.mean(in_sample_mse_list)
    reg_result.out_sample_mse = np.mean(out_sample_mse_list)
    reg_result.in_sample_pred_int_acc = np.mean(in_sample_pred_int_acc)
    reg_result.out_sample_pred_int_acc = np.mean(out_sample_pred_int_acc)
    reg_result.fixed_effects = fixed_effects
    reg_result.incremental_effects = incremental_effects
    return reg_result

In [11]:
# Fold-indexed DataFrames. run_fe_regression_with_cv reads these two
# module-level dicts by name, so they must be populated before it is called.
train_data_files, test_data_files = {}, {}
target_name = "gdp"
num_folds = 1
# Read the in-sample and out-of-sample CSV of every fold; the paths are
# relative to the notebook's working directory.
for i in range(num_folds):
    train_data_files[i] = pd.read_csv(f"../data/regression/cross_validation/{target_name}_regression_data_insample_festratified_{str(i)}.csv")
    test_data_files[i] = pd.read_csv(f"../data/regression/cross_validation/{target_name}_regression_data_outsample_festratified_{str(i)}.csv")
# Regress "fd_ln_gdp" on the covariates listed below. "[weight]" in each name
# is replaced by the weights argument ("unweighted"). The call also pulls in
# the columns ending in "country_fixed_effect" / "year_fixed_effect" and in
# "incremental_effect_1" through "incremental_effect_3".
result = run_fe_regression_with_cv(num_folds, target_name, "fd_ln_gdp", "unweighted", [
        'fd_humidity_[weight]', 'fd_humidity_[weight]_2', 'fd_humidity_annual_std_[weight]', 
        'fd_humidity_annual_std_[weight]_2', 'fd_precip_[weight]', 'fd_precip_[weight]_2', 
        'fd_precip_[weight]_3', 'fd_precip_annual_std_[weight]', 'fd_precip_daily_std_[weight]', 
        'fd_precip_daily_std_[weight]_2', 'fd_precip_daily_std_[weight]_3', 'fd_temp_annual_std_[weight]', 
        'fd_temp_annual_std_[weight]_2', 'fd_temp_annual_std_[weight]_3', 'fd_temp_daily_std_[weight]', 
        'fd_temp_daily_std_[weight]_2', 'humidity_daily_std_[weight]', 'precip_[weight]', 
        'precip_daily_std_[weight]', 'temp_[weight]', 'temp_[weight]_2', 
        'temp_[weight]_3', 'wildfire', 'wildfire_heat_wave'
    ], ["country","year"], 3)
result.print_result()

['fd_humidity_unweighted', 'fd_humidity_unweighted_2', 'fd_humidity_annual_std_unweighted', 'fd_humidity_annual_std_unweighted_2', 'fd_precip_unweighted', 'fd_precip_unweighted_2', 'fd_precip_unweighted_3', 'fd_precip_annual_std_unweighted', 'fd_precip_daily_std_unweighted', 'fd_precip_daily_std_unweighted_2', 'fd_precip_daily_std_unweighted_3', 'fd_temp_annual_std_unweighted', 'fd_temp_annual_std_unweighted_2', 'fd_temp_annual_std_unweighted_3', 'fd_temp_daily_std_unweighted', 'fd_temp_daily_std_unweighted_2', 'humidity_daily_std_unweighted', 'precip_unweighted', 'precip_daily_std_unweighted', 'temp_unweighted', 'temp_unweighted_2', 'temp_unweighted_3', 'wildfire', 'wildfire_heat_wave']
fd_humidity_[weight],fd_humidity_[weight]_2,fd_humidity_annual_std_[weight],fd_humidity_annual_std_[weight]_2,fd_precip_[weight],fd_precip_[weight]_2,fd_precip_[weight]_3,fd_precip_annual_std_[weight],fd_precip_daily_std_[weight],fd_precip_daily_std_[weight]_2,fd_precip_daily_std_[weight]_3,fd_temp_ann