In [None]:
# Load necessary libraries and data files.
import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.stats.outliers_influence import variance_inflation_factor

# Define the output directory and file paths
out_dir = Path('../../outputs/uds_extraction')
file_path = out_dir / 'moca_only.parquet'

# Check if the file exists
if not file_path.exists():
    raise FileNotFoundError(f"The file {file_path} does not exist. Please ensure it is generated first.")

# Load cleaned DataFrame from existing work or relevant source path.
cleaned_df = pd.read_parquet(file_path)

# Select the specified columns for the new dataset.
"""# Set 1
moca_columns = [
    'DIGFORCT', 'DIGFORSL', 'DIGBACCT', 'DIGBACLS', 'MINTTOTS', 'MINTTOTW', 'MINTSCNG', 'MINTSCNC',
    'MINTPCNG', 'MINTPCNC'
]"""

"""# Set 2
moca_columns = [
    'CRAFTVRS', 'CRAFTURS', 'CRAFTDVR', 'CRAFTDRE', 'CRAFTDTI', 'CRAFTCUE',
    'MOCATRAI', 'MOCACUBE', 'MOCACLOC', 'MOCACLON', 'MOCACLOH'
]"""

# Set 3
moca_columns = [
    'MINTTOTS', 'ANIMALS', 'VEG',
    'TRAILA', 'MOCACUBE', 'UDSBENTC'
]

# Create a modified MOCA-only DataFrame with selected columns.
modified_moca_only_df = cleaned_df[moca_columns].copy()
# Save the raw selection before any filtering
raw_selected_parquet = out_dir / 'moca_only_selected.parquet'
modified_moca_only_df.to_parquet(raw_selected_parquet)
print(f"Saved initial selected MOCA variables to: {raw_selected_parquet}")

In [None]:
from factor_analyzer import FactorAnalyzer
import numpy as np

# Note: Rotation helpers removed; using FactorAnalyzer with built-in 'promax' rotation


def clean_and_run_efa(df, label, out_dir):
    """Clean data and run EFA with multiple rotation methods"""
    print(f"\n=== Running EFA for {label} ===")
    print(f"Original shape: {df.shape}")

    # Remove rows with any missing values
    df_clean = df.dropna()
    print(f"After removing missing values: {df_clean.shape}")

    # Check for and remove constant columns (zero variance)
    constant_cols = []
    for col in df_clean.columns:
        if df_clean[col].std() == 0:
            constant_cols.append(col)

    if constant_cols:
        print(f"Removing {len(constant_cols)} constant columns: {constant_cols}")
        df_clean = df_clean.drop(columns=constant_cols)

    if df_clean.shape[1] < 2:
        print(f"Error: Not enough variables ({df_clean.shape[1]}) for EFA after removing constant columns")
        return

    print(f"Final shape for analysis: {df_clean.shape}")

    # Standardize the data
    Z = (df_clean - df_clean.mean()) / df_clean.std(ddof=0)

    # Verify standardization worked - convert to numeric and check
    try:
        Z_numeric = Z.apply(pd.to_numeric, errors='coerce')
        if Z.isnull().any().any() or np.isinf(Z_numeric.values).any():
            print("Error: Standardization produced NaN or infinite values")
            return
    except Exception:
        # If conversion fails, just check for NaN
        if Z.isnull().any().any():
            print("Error: Standardization produced NaN values")
            return

    # Check for zero variance columns before correlation calculation
    variance_check = Z.var()
    zero_var_cols = variance_check[variance_check == 0].index.tolist()

    if zero_var_cols:
        print(f"Warning: Removing {len(zero_var_cols)} zero-variance columns: {zero_var_cols}")
        Z = Z.drop(columns=zero_var_cols)
    
    # Also check for near-zero variance (numerical stability)
    near_zero_var_cols = variance_check[variance_check < 1e-10].index.tolist()
    if near_zero_var_cols:
        print(f"Warning: Removing {len(near_zero_var_cols)} near-zero variance columns: {near_zero_var_cols}")
        Z = Z.drop(columns=near_zero_var_cols)

    # Check if we have enough columns left for EFA
    if Z.shape[1] < 3:
        print(f"Error: Only {Z.shape[1]} variables remaining after variance check. EFA requires at least 3 variables.")
        return

    # Now safely calculate correlation matrix
    try:
        corr = np.corrcoef(Z.values, rowvar=False)
    except Exception as e:
        print(f"Error calculating correlation matrix: {e}")
        # Alternative: use pandas correlation which is more robust
        corr = Z.corr().values

    # Determine number of factors via Kaiser criterion (eigenvalues > 1)
    try:
        eigvals = np.linalg.eigvalsh(corr)
    except np.linalg.LinAlgError:
        eigvals = np.linalg.eigvals(corr).real
    n_factors = int((eigvals > 1.0).sum())
    n_factors = max(1, min(n_factors, Z.shape[1]))

    # Fit FactorAnalyzer with promax rotation
    fa = FactorAnalyzer(n_factors=n_factors, rotation='promax')
    fa.fit(Z.values)

    loadings = fa.loadings_  # variables x factors
    features = list(df.columns)
    loadings_df = pd.DataFrame(loadings, index=features, columns=[f"F{i+1}" for i in range(loadings.shape[1])])
    try:
        communalities = pd.Series(fa.get_communalities(), index=features)
    except Exception:
        communalities = pd.Series((loadings ** 2).sum(axis=1), index=features)

    rot_name = "promax"

    # Save artifacts
    loadings_csv = out_dir / f"efa_{label}_{rot_name}_loadings.csv"
    loadings_df.to_csv(loadings_csv)
    print(f"[{label} | {rot_name}] Saved loadings to {loadings_csv}")

    comm_csv = out_dir / f"efa_{label}_{rot_name}_communalities.csv"
    pd.DataFrame({"communality": communalities}, index=features).to_csv(comm_csv)
    print(f"[{label} | {rot_name}] Saved communalities to {comm_csv}")

    # Plot and save heatmap of factors vs. variable loadings
    try:
        # Dynamic figure size based on matrix shape
        n_vars, n_factors = loadings_df.shape
        width = max(6, min(18, 1.2 * n_factors + 3))
        height = max(6, min(24, 0.35 * n_vars + 3))

        plt.figure(figsize=(width, height))
        sns.heatmap(
            loadings_df,
            annot=True,
            fmt=".2f",
            cmap="coolwarm",
            center=0,
            linewidths=0.5,
            linecolor="#f0f0f0",
            cbar_kws={"label": "Loading"}
        )
        plt.title(f"EFA Loadings Heatmap — {label} ({rot_name})")
        plt.xlabel("Factors")
        plt.ylabel("Variables")
        plt.tight_layout()

        heatmap_png = out_dir / f"efa_{label}_{rot_name}_loadings_heatmap.png"
        plt.savefig(heatmap_png, dpi=300)
        plt.close()
        print(f"[{label} | {rot_name}] Saved loadings heatmap to {heatmap_png}")
    except Exception as e:
        print(f"[{label} | {rot_name}] Failed to create heatmap: {e}")

# Add this code before line 70 to create the reduced_moca_df
# Remove highly correlated variables to create a de-collinearized dataset
correlation_matrix = modified_moca_only_df.corr().abs()

# Find pairs of variables with high correlation (threshold = 0.9)
high_corr_pairs = []
for i in range(len(correlation_matrix.columns)):
    for j in range(i+1, len(correlation_matrix.columns)):
        if correlation_matrix.iloc[i, j] > 0.9:
            high_corr_pairs.append((correlation_matrix.columns[i], correlation_matrix.columns[j]))

# Remove one variable from each highly correlated pair
variables_to_remove = set()
for var1, var2 in high_corr_pairs:
    # Keep the variable with higher variance (or you could use other criteria)
    if modified_moca_only_df[var1].var() < modified_moca_only_df[var2].var():
        variables_to_remove.add(var1)
    else:
        variables_to_remove.add(var2)

# Create the reduced dataset
reduced_moca_df = modified_moca_only_df.drop(columns=list(variables_to_remove))

print(f"Removed {len(variables_to_remove)} highly correlated variables: {list(variables_to_remove)}")
print(f"Reduced dataset shape: {reduced_moca_df.shape}")

clean_and_run_efa(modified_moca_only_df, "moca_only", out_dir)
clean_and_run_efa(reduced_moca_df, "moca_only_reduced", out_dir)

In [None]:
import numpy as np

def calculate_vif(df):
    # Select only numeric columns and drop any non-numeric data
    numeric_df = df.select_dtypes(include=[np.number])
    
    # Remove columns with all NaN or constant values
    numeric_df = numeric_df.dropna(axis=1, how='all')
    numeric_df = numeric_df.loc[:, numeric_df.nunique() > 1]
    
    # Remove rows with any NaN values (listwise deletion)
    X = numeric_df.dropna(axis=0, how='any')
    
    if X.shape[1] == 0:
        return pd.DataFrame(columns=["Feature", "VIF"])
    
    if X.shape[0] < X.shape[1]:
        print(f"Warning: More variables ({X.shape[1]}) than observations ({X.shape[0]}). VIF may be unreliable.")
    
    # Ensure all data is float type to avoid type issues
    X = X.astype(float)
    
    try:
        vif_vals = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
        return pd.DataFrame({"Feature": X.columns, "VIF": vif_vals}).sort_values("VIF", ascending=False).reset_index(drop=True)
    except Exception as e:
        print(f"Error calculating VIF: {e}")
        return pd.DataFrame(columns=["Feature", "VIF"])


def reduce_multicollinearity(df, threshold=10.0):
    """Iteratively drop variables with highest VIF above threshold.
    Returns (reduced_df, dropped_features, final_vif_df)
    dropped_features is a list of tuples (feature, vif_at_drop)
    """
    X = df.select_dtypes(include=[np.number]).copy()
    X = X.dropna(axis=1, how='all')
    X = X.loc[:, X.nunique() > 1]
    dropped = []
    while True:
        vif_df = calculate_vif(X)
        if vif_df.empty:
            break
        max_row = vif_df.iloc[0]
        if float(max_row["VIF"]) <= threshold:
            break
        feat_to_drop = str(max_row["Feature"])
        dropped.append((feat_to_drop, float(max_row["VIF"])) )
        X = X.drop(columns=[feat_to_drop])
        if X.shape[1] < 2:
            break
    final_vif = calculate_vif(X)
    return X, dropped, final_vif

# Run VIF analysis on the selected MOCA features
print("[multicollinearity] Calculating VIF on selected MOCA variables…")
vif_initial = calculate_vif(modified_moca_only_df)
vif_csv = out_dir / 'efa_moca_only_vif.csv'
vif_initial.to_csv(vif_csv, index=False)

# Reduce multicollinearity by removing highly collinear tests
reduced_moca_df, dropped_features, vif_final = reduce_multicollinearity(modified_moca_only_df, threshold=10.0)
vif_final_csv = out_dir / 'efa_moca_only_vif_reduced.csv'
vif_final.to_csv(vif_final_csv, index=False)
print(f"Saved reduced VIF table to: {vif_final_csv}")

if dropped_features:
    print("[multicollinearity] Dropped the following redundant tests (feature, VIF when dropped):")
    for feat, val in dropped_features:
        print(f"  - {feat}: {val:.2f}")
else:
    print("[multicollinearity] No features exceeded the VIF threshold; none dropped.")

# Save reduced dataset
reduced_parquet = out_dir / 'moca_only_selected_reduced.parquet'
reduced_moca_df.to_parquet(reduced_parquet)
print(f"Saved reduced MOCA variables to: {reduced_parquet}")