In [1]:
# Core libraries
import pandas as pd
import numpy as np
import json
from urllib.request import urlopen
import warnings

# Visualization
import matplotlib.pyplot as plt
import plotly.express as px
import geopandas as gpd

# Scikit-learn preprocessing and imputation
from sklearn.preprocessing import StandardScaler
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import IterativeImputer

# Scikit-learn models
from sklearn.linear_model import Ridge, Lasso, BayesianRidge, LinearRegression
from xgboost import XGBRegressor

# Scikit-learn model selection and evaluation
from sklearn.model_selection import (
    train_test_split,
    KFold,
    RandomizedSearchCV,
    cross_val_score
)
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    mean_absolute_percentage_error,
    r2_score,
    make_scorer
)

# Scikit-learn pipeline
from sklearn.pipeline import Pipeline

# Statsmodels
import statsmodels.api as sm
from statsmodels.stats.diagnostic import (
    het_breuschpagan,
    acorr_breusch_godfrey
)
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.outliers_influence import variance_inflation_factor

# Scipy
from scipy.stats import shapiro, anderson

# SHAP for model interpretation
import shap


# Read from local shapefile
gdf = gpd.read_file("cb_2020_us_county_20m/cb_2020_us_county_20m.shp")
# Create FIPS code and convert to GeoJSON format
gdf['id'] = gdf['STATEFP'] + gdf['COUNTYFP']
counties = json.loads(gdf[['id', 'geometry']].to_json())
# Convert to GeoJSON dict for Plotly
dcounties = json.loads(gdf.to_json())

# Functions
---------------------------

In [3]:
warnings.filterwarnings("ignore", category=RuntimeWarning)
def MC(dataset):
    # === Load and clean data ===
    df = pd.read_excel(dataset)
    df = df[df['County'].notna()]
    target_col = '% Vaccinated'
    df = df.replace([np.inf, -np.inf], np.nan)
    df = df.dropna(subset=[target_col])
    temp = df['FIPS'].astype(str).str.pad(5, 'left', '0')
    df = df.drop(columns=['FIPS'], errors='ignore')
    
    X = df.drop(columns=[target_col]).select_dtypes(include=[np.number])
    y = df[target_col]
    X, y = X.align(y, join='inner', axis=0)

    X = X.loc[:, X.isnull().mean() <= 0.20]
    X = X.clip(-1e3, 1e3)
    low_var_cols = X.var()[X.var() < 1e-8].index.tolist()
    X = X.drop(columns=low_var_cols)

    # === 90/10 holdout split ===
    X_train_full, X_test, y_train_full, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

    # === Hyperparameter space ===
    param_dist = {
        'n_estimators': [100, 200, 300],             # more trees, learn slowly
        'max_depth': [2, 3, 4],                      # shallower trees
        'learning_rate': [0.01, 0.03, 0.05],         # smaller steps
        'subsample': [0.6, 0.8],                     # partial row sampling
        'colsample_bytree': [0.6, 0.8],              # partial feature sampling
        'gamma': [0.1, 0.3, 0.5],                    # avoid small splits
        'reg_alpha': [0, 0.1, 0.5],                  # L1 penalty
        'reg_lambda': [1, 2, 5],                     # L2 penalty
    }


    # === Hyperparameter search: outer loop ===
    n_trials = 50
    n_iterations = 5  # Monte Carlo splits per hyperparam set

    results = []

    for t in range(n_trials):
        params = {k: np.random.choice(v) for k, v in param_dist.items()}
        print(f"\n🔍 Trial {t + 1}/{n_trials} with params: {params}")

        rmses = []
        r2s = []
        mapes = []

        # Monte Carlo iterations: inner loop
        for i in range(n_iterations):
            X_train, X_val, y_train, y_val = train_test_split(
                X_train_full, y_train_full, test_size=0.2, random_state=np.random.randint(10000)
            )
            model = XGBRegressor(random_state=42, **params)
            pipeline = Pipeline([
                ('scaler', StandardScaler()),
                ('model', model)
            ])

            try:
                pipeline.fit(X_train, y_train)
                y_pred = pipeline.predict(X_val)

                rmse = np.sqrt(mean_squared_error(y_val, y_pred))
                r2 = r2_score(y_val, y_pred)
                nonzero_mask = y_val != 0
                mape = np.mean(
                    np.abs((y_val[nonzero_mask] - y_pred[nonzero_mask]) / y_val[nonzero_mask])
                ) * 100

                rmses.append(rmse)
                r2s.append(r2)
                mapes.append(mape)

            except Exception as e:
                print(f"⚠️ Iteration {i + 1} failed: {e}")

        # Average performance over Monte Carlo splits
        avg_rmse = np.mean(rmses)
        avg_r2 = np.mean(r2s)
        avg_mape = np.mean(mapes)

        print(f"✅ Avg RMSE: {avg_rmse:.2f}, Avg R²: {avg_r2:.2f}, Avg MAPE: {avg_mape:.2f}%")

        results.append({
            'rmse': avg_rmse,
            'r2': avg_r2,
            'mape': avg_mape,
            'params': params
        })

    # === Select best hyperparameters ===
    best_overall = min(results, key=lambda x: x['rmse'])
    print("\n🏆 Best Hyperparameters Across All Trials:")
    print(f"RMSE: {best_overall['rmse']:.2f}, R²: {best_overall['r2']:.2f}, MAPE: {best_overall['mape']:.2f}%")
    print(f"Params: {best_overall['params']}")

    # === Train final model on full 90% training set ===
    final_model = Pipeline([
        ('scaler', StandardScaler()),
        ('model', XGBRegressor(random_state=42, **best_overall['params']))
    ])
    final_model.fit(X_train_full, y_train_full)

    # === Evaluate on untouched test set ===
    y_pred_test = final_model.predict(X_test)
    y_pred_train = final_model.predict(X_train_full)
    rmse_test = np.sqrt(mean_squared_error(y_test, y_pred_test))
    r2_test = r2_score(y_test, y_pred_test) # training set
    r2_train = r2_score(y_train_full, y_pred_train)
    nonzero_mask_test = y_test != 0
    mape_test = np.mean(
        np.abs((y_test[nonzero_mask_test] - y_pred_test[nonzero_mask_test]) / y_test[nonzero_mask_test])
    ) * 100

    print("\n🧪 Final Test Set Performance:")
    print(f"RMSE: {rmse_test:.2f}")
    print(f"Train R²: {r2_train:.2f}")
    print(f"Test R²: {r2_test:.2f}")
    print(f"MAPE: {mape_test:.2f}%")

    # === SHAP Analysis ===
    X_scaled = final_model.named_steps['scaler'].transform(X)
    explainer = shap.TreeExplainer(final_model.named_steps['model'])
    shap_values = explainer.shap_values(X_scaled)

    shap.summary_plot(
        shap_values,
        features=pd.DataFrame(X_scaled, columns=X.columns),
        feature_names=X.columns,
        max_display=10
    )



    #--------NEW CODE---------
    # Compute mean absolute SHAP values
    mean_abs_shap_values = np.mean(np.abs(shap_values), axis=0)
    
    # Create DataFrame
    shap_df = pd.DataFrame({
        'feature': X.columns,
        'mean_abs_shap': mean_abs_shap_values
    })
    
    # Sort ascending or descending
    shap_df = shap_df.sort_values(by='mean_abs_shap', ascending=True)
    
    # Take bottom 10 or top 10
    top_features_df = shap_df.tail(10)
    
    # Assign a single color (since there is no sign)
    bar_color = 'cornflowerblue'
    
    # === Plot ===
    plt.figure(figsize=(8,6))
    bars = plt.barh(
        y=top_features_df['feature'],
        width=top_features_df['mean_abs_shap'],
        color=bar_color
    )
    
    plt.axvline(0, color='gray', linewidth=1)
    plt.xlim(0, 3)  # no negatives since values are absolute
    
    # Labels
    plt.xlabel("Mean Absolute SHAP Value")
    plt.ylabel("Feature")
    plt.title("Top 10 Features by Mean Absolute SHAP")
    
    # Add value labels
    for bar, val in zip(bars, top_features_df['mean_abs_shap']):
        plt.text(
            val + 0.02,
            bar.get_y() + bar.get_height()/2,
            f"{val:.3f}",
            va='center',
            ha='left'
        )
    
    plt.tight_layout()
    plt.show()





    # Append predictions to dataframe
    df['Predicted'] = final_model.predict(X)
    df['Differences'] = df['Predicted'] - y
    df['FIPS'] = temp

    return df

In [4]:
def map(df, field, customcolor, min_value, max_value):
    df['State, County'] = df['State']+','+df['County']
    dfx = df[['FIPS', field, 'State, County']]

    if min_value == max_value:
        custom_range = (min(dfx[field]), max(dfx[field]))
    else:
        custom_range = (min_value, max_value)
    
    fig = px.choropleth(
        dfx,
        geojson=counties,
        locations='FIPS',
        featureidkey="properties.id",
        color=field,
        color_continuous_scale=customcolor,
        range_color = custom_range,
        scope='usa',
        hover_data={'State, County': True, 'FIPS': True}
    )
    fig.update_layout(margin = {"r":0, "t":0, "l":0, "b":0}) #add 0 margins to output
    print(field, "COVID19 Model")
    fig.show()
    
    # summary_df = df.drop(["State", "County", "State, County", "FIPS"], axis=1)
    # print(list(summary_df.columns))

In [5]:
def ridgeRegression(dataset):
    # === Load and clean data ===
    df = pd.read_excel(dataset)
    
    # Drop aggregate state rows
    df = df[df['County'].notna()]
    
    # Drop identifier columns
    df = df.drop(columns=["FIPS_x", "County", "State"], errors="ignore")
    
    # Keep only numeric data
    df_numeric = df.select_dtypes(include=[np.number])
    
    # Define X and y
    X = df_numeric.drop(columns=["% Vaccinated"], errors="ignore")
    y = df_numeric["% Vaccinated"]
    
    # === Impute missing values in X ===
    imputer = IterativeImputer(random_state=42)
    X_imputed = imputer.fit_transform(X)
    X_df = pd.DataFrame(X_imputed, columns=X.columns)
    
    # === Drop rows with missing y values (aligned correctly) ===
    y = y.reset_index(drop=True)
    valid_mask = y.notna()
    X_df = X_df[valid_mask].reset_index(drop=True)
    y = y[valid_mask].reset_index(drop=True)
    
    # === Scale features ===
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_df)
    X_scaled_df = pd.DataFrame(X_scaled, columns=X_df.columns)
    
    # === Train-test split ===
    X_train, X_test, y_train, y_test = train_test_split(X_scaled_df, y, test_size=0.3, random_state=3141)
    
    # === Fit Ridge regression ===
    ridge = Ridge(alpha=1.0)
    ridge.fit(X_train, y_train)
    y_pred = ridge.predict(X_test)
    
    # === Evaluation metrics ===
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mape = mean_absolute_percentage_error(y_test, y_pred) * 100
    r2 = r2_score(y_test, y_pred)
    
    print("=== Ridge Regression Results ===")
    print(f"R²: {r2:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.2f} %")
    
    # Convert coefficients into a DataFrame
    coef_df = pd.DataFrame({
        'Feature': X_scaled_df.columns,
        'Coefficient': ridge.coef_
    })
    
    # Sort by absolute magnitude of the coefficient
    top_5 = coef_df.reindex(coef_df.Coefficient.abs().sort_values(ascending=False).index).head(5)
    
    print("Top 5 most influential predictors in Ridge Regression:")
    print(top_5)
    
    # === Plot predictions (all test samples) ===
    plt.figure(figsize=(12, 4))
    plt.plot(y_test.values, label="Actual", color="cornflowerblue")
    plt.plot(y_pred, label="Predicted", color="red", linestyle=":")
    plt.title("Ridge Regression: Actual vs Predicted % Vaccinated")
    plt.xlabel("Test Sample Index")
    plt.ylabel("% Vaccinated")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [6]:
def lassoRegression(dataset):
    # Load data
    df = pd.read_excel(dataset)
    
    # Drop rows without county names (to exclude state aggregates)
    df = df[df['County'].notna()].copy()
    df = df.drop(columns=["FIPS", "County", "State"], errors="ignore")
    
    # Drop any non-numeric columns except target
    target_col = "% Vaccinated"
    df_numeric = df.select_dtypes(include=[np.number])
    if target_col not in df_numeric.columns:
        df_numeric[target_col] = df[target_col]
    
    # Drop rows where target is missing
    df_numeric = df_numeric[df_numeric[target_col].notna()].reset_index(drop=True)
    
    # Split features and target
    X_df = df_numeric.drop(columns=[target_col])
    y = df_numeric[target_col]
    
    # Impute missing values
    imputer = IterativeImputer(random_state=42)
    X_imputed = imputer.fit_transform(X_df)
    X_imputed_df = pd.DataFrame(X_imputed, columns=X_df.columns)
    
    # Calculate VIF and drop high-VIF columns iteratively
    def drop_high_vif(X, threshold=10.0):
        while True:
            X_const = sm.add_constant(X)
            vif = pd.Series(
                [variance_inflation_factor(X_const.values, i) for i in range(1, X_const.shape[1])],
                index=X.columns
            )
            max_vif = vif.max()
            if max_vif > threshold:
                max_vif_feature = vif.idxmax()
                print(f"Dropping '{max_vif_feature}' due to high VIF ({max_vif:.2f})")
                X = X.drop(columns=[max_vif_feature])
            else:
                return X, vif
    
    X_filtered, vif_series = drop_high_vif(pd.DataFrame(X_imputed_df, columns=X_df.columns))
    
    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_filtered)
    
    # Train-test split
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)
    
    # Fit Lasso regression
    lasso = Lasso(alpha=.05, max_iter=50000, random_state=42)
    lasso.fit(X_train, y_train)
    y_pred = lasso.predict(X_test)
    
    # Evaluation metrics
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    epsilon = 1e-10
    mape = np.mean(np.abs((y_test - y_pred) / (y_test + epsilon))) * 100
    
    print("\n--- Model Performance ---")
    print(f"RMSE: {rmse:.2f}")
    print(f"MAE: {mae:.2f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"R²: {r2:.3f}")
    
    # Top 5 predictors (by absolute coefficient value)
    coef_series = pd.Series(lasso.coef_, index=X_filtered.columns)
    top5 = coef_series.reindex(coef_series.abs().sort_values(ascending=False).index)[:5]
    print("\nTop 5 Predictors (Lasso):")
    print(top5.to_frame(name="Coefficient"))
    
    # Plot actual vs predicted
    plt.figure(figsize=(10, 4))
    plt.plot(y_test.values, label='Actual % Vaccinated', color='cornflowerblue')
    plt.plot(y_pred, label='Predicted % Vaccinated', color='red', linestyle='--')
    plt.title('Lasso Regression: Actual vs Predicted')
    plt.xlabel('Sample Index (Test Set)')
    plt.ylabel('% Vaccinated')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
    '''
    # Print VIF table
    print("\nFinal VIF Table:")
    vif_table = pd.DataFrame({'Feature': X_filtered.columns, 'VIF': vif_series.values})
    print(vif_table)
    '''