In [None]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import re
import os
from sklearn.inspection import PartialDependenceDisplay
import shap

In [None]:
# Locations read from / written to by the rest of the notebook.
results_dir = 'results/'
fig_dir = 'figures/'
pca_dir = os.path.join(fig_dir, 'overall_pca_and_classifier/')

# Default figure dimensions; not referenced by the cells visible in this
# notebook (presumably pixels -- TODO confirm where they are consumed).
default_width = 1000
default_height = 750

# exist_ok=True makes creation idempotent and avoids the check-then-create
# race of testing os.path.exists() first.
os.makedirs(fig_dir, exist_ok=True)
os.makedirs(pca_dir, exist_ok=True)

In [None]:
# Load the query-results CSV that lives under results_dir.
results_csv_path = os.path.join(
    results_dir,
    'transformed_distance_query_results_root_Los_Angeles-_CA.csv',
)
data = pd.read_csv(results_csv_path)

In [None]:
def add_confident_column(df):
    """Label each row 'guess' or 'confident' based on its 'extracted_digits' value.

    A value is a 'guess' when its text form is a round hundred: the integer
    part ends in two zeros and any fractional part consists only of zeros
    (e.g. 1200, 1200.0, '2500'). Everything else -- including text that does
    not match at all, such as 'nan' -- is 'confident'.

    Args:
        df: DataFrame with an 'extracted_digits' column (numbers or strings).

    Returns:
        The same DataFrame object, with a 'confidence' column added or
        overwritten in place.
    """
    # The digit run holding the two zeros must not directly follow a '.',
    # otherwise trailing fractional zeros ('1.00', '0.100') would be mistaken
    # for round hundreds.
    round_hundreds = re.compile(r'(?:^|[^.\d])\d*00(?:\.0*)?$')

    def classify_distance(distance):
        # str() gives ints, floats and strings one common textual form.
        distance_str = str(distance)
        if round_hundreds.search(distance_str):
            return 'guess'
        return 'confident'

    df['confidence'] = df['extracted_digits'].apply(classify_distance)
    return df
    
# Label every row 'guess' or 'confident'; the 'confidence' column is written
# onto `data` itself and the same frame is returned.
data = add_confident_column(data)

In [None]:
# Display the frame to inspect it, including the 'confidence' column.
data

In [None]:
# Reshape to wide form: one row per (city2, sae_layer, feature_type, confidence)
# and one column per distinct `index` value holding its `activation`.
# Missing combinations become 0.0; repeated ones are combined by
# pivot_table's default aggregation (mean).
data_pivot = (
    data.pivot_table(
        values='activation',
        index=['city2', 'sae_layer', 'feature_type', 'confidence'],
        columns='index',
        fill_value=0.0,
    )
    .rename_axis(None, axis=1)  # drop the leftover 'index' label on the column axis
    .reset_index()              # turn the four key levels back into ordinary columns
)

In [None]:
# Remove identifier columns from the pivoted frame, skipping any name that the
# pivot did not produce so that drop() never raises on a missing column.
columns_to_drop = ['city1', 'city2', 'extracted_digits']
available_columns = set(data_pivot.columns)
existing_columns_to_drop = [name for name in columns_to_drop if name in available_columns]

pca_data = data_pivot.drop(existing_columns_to_drop, axis='columns')
pca_data

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Function to perform PCA and save plot
# def perform_pca(X, confidence, sae_layer, feature_type, pca_dir):
#     scaler = StandardScaler()
#     X_scaled = scaler.fit_transform(X)
    
#     pca = PCA()
#     X_pca = pca.fit_transform(X_scaled)
    
#     loadings = pd.DataFrame(
#         pca.components_.T,
#         columns=[f'PC{i+1}' for i in range(pca.n_components_)],
#         index=X.columns
#     )
    
#     plt.figure(figsize=(10, 8))
#     scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=confidence.map({'guess': 0, 'confident': 1}), 
#                           cmap='coolwarm', alpha=0.7)
#     plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
#     plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
#     plt.title(f'PCA of Data Colored by Confidence\nLayer: {sae_layer}, Type: {feature_type}')
#     plt.colorbar(scatter, label='Confidence (0: Guess, 1: Confident)')
    
#     for i, (x, y) in enumerate(zip(loadings['PC1'], loadings['PC2'])):
#         plt.arrow(0, 0, x, y, color='k', alpha=0.5, head_width=0.05, head_length=0.05)
#         plt.text(x*1.2, y*1.2, loadings.index[i], color='g', ha='center', va='center')
    
#     plt.tight_layout()
#     os.makedirs(os.path.join(pca_dir, f'pca_plots/{feature_type}'), exist_ok=True)
#     plt.savefig(os.path.join(pca_dir, f'pca_plots/{feature_type}/pca_plot_layer{sae_layer}.png'), dpi = 300)
#     plt.close()
    
#     return loadings

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import pandas as pd
import os

def perform_pca(X, confidence, sae_layer, feature_type, pca_dir,
                n_top_loadings=5, scale_factor=5):
    """Run PCA on standardized features, save a PC1/PC2 scatter plot, return the loadings.

    X is standardized with StandardScaler and decomposed with a default
    PCA() (all components kept). Samples are plotted on the first two
    components, coloured by `confidence`, and arrows are drawn for the
    features with the largest absolute loadings on PC1 and on PC2. The figure
    is saved to ``<pca_dir>/pca_plots/<feature_type>/pca_plot_layer<sae_layer>.png``.

    Args:
        X: DataFrame of features; its columns label the loadings.
        confidence: Series aligned with X holding 'guess' / 'confident';
            mapped to 0 / 1 for colouring.
        sae_layer: Layer identifier, used in the title and file name.
        feature_type: Feature-type identifier, used in the title and as the
            output subdirectory.
        pca_dir: Root directory for the output plots.
        n_top_loadings: How many features to take per component (by absolute
            loading) when drawing arrows. Defaults to 5.
        scale_factor: Multiplier applied to the loadings before drawing so
            the arrows are visible at the scale of the scores. Defaults to 5.

    Returns:
        DataFrame indexed by X's columns with one column per principal
        component, named 'PC1', 'PC2', ...

    Raises:
        ValueError: If PCA yields fewer than two components, since the plot
            needs both PC1 and PC2.
    """
    X_scaled = StandardScaler().fit_transform(X)

    pca = PCA()
    X_pca = pca.fit_transform(X_scaled)

    # Fail with an explicit message instead of an opaque KeyError on 'PC2'.
    if pca.n_components_ < 2:
        raise ValueError(
            f"PCA produced {pca.n_components_} component(s) for layer {sae_layer}, "
            f"type {feature_type}; at least 2 are required for the PC1/PC2 plot."
        )

    loadings = pd.DataFrame(
        pca.components_.T,
        columns=[f'PC{i + 1}' for i in range(pca.n_components_)],
        index=X.columns,
    )

    # Union of the strongest features on each axis; unique() removes features
    # that rank highly on both PC1 and PC2.
    top_loadings = pd.concat([
        loadings['PC1'].abs().nlargest(n_top_loadings),
        loadings['PC2'].abs().nlargest(n_top_loadings),
    ]).index.unique()

    fig, ax = plt.subplots(figsize=(10, 8))
    try:
        scatter = ax.scatter(
            X_pca[:, 0], X_pca[:, 1],
            c=confidence.map({'guess': 0, 'confident': 1}),
            cmap='coolwarm', alpha=0.7,
        )
        ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
        ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
        ax.set_title(f'PCA of Data Colored by Confidence\nLayer: {sae_layer}, Type: {feature_type}')
        fig.colorbar(scatter, ax=ax, label='Confidence (0: Guess, 1: Confident)')

        for feature in top_loadings:
            x, y = loadings.loc[feature, 'PC1'], loadings.loc[feature, 'PC2']
            ax.arrow(0, 0, x * scale_factor, y * scale_factor,
                     color='k', alpha=0.5, head_width=0.05, head_length=0.05)
            # Place the label 20% beyond the arrow tip so it does not overlap it.
            ax.text(x * 1.2 * scale_factor, y * 1.2 * scale_factor, feature,
                    color='g', ha='center', va='center')

        fig.tight_layout()
        plot_dir = os.path.join(pca_dir, f'pca_plots/{feature_type}')
        os.makedirs(plot_dir, exist_ok=True)
        fig.savefig(os.path.join(plot_dir, f'pca_plot_layer{sae_layer}.png'))
    finally:
        # Always release the figure, even if plotting or saving fails; this
        # function is called once per group in a loop.
        plt.close(fig)

    return loadings

# Function to train classifier and get feature importances
def train_classifier(X, y, sae_layer, feature_type, pca_dir):
    """Train a random forest on an 80/20 split and save its top feature importances.

    The scaler is fitted on the training portion only and then applied to the
    test portion. A bar chart of the ten most important features is saved to
    ``<pca_dir>/classifier_plots/<feature_type>/feature_importance_layer<sae_layer>.png``.

    Args:
        X: DataFrame of features; its columns label the importances.
        y: Binary target aligned with X. The report names assume the encoding
            0 = 'guess', 1 = 'confident', which is what the caller in this
            notebook passes.
        sae_layer: Layer identifier, used in the title and file name.
        feature_type: Feature-type identifier, used in the title and as the
            output subdirectory.
        pca_dir: Root directory for the output plots.

    Returns:
        Tuple ``(report, conf_matrix, feature_importances)``: the
        classification report as a dict, the 2x2 confusion matrix (rows and
        columns ordered guess, confident), and a Series of importances sorted
        in descending order.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_classifier.fit(X_train_scaled, y_train)

    y_pred = rf_classifier.predict(X_test_scaled)

    # Pin the label set: if the test split and predictions contain a single
    # class, classification_report would otherwise reject the two
    # target_names and the confusion matrix would shrink to 1x1.
    class_labels = [0, 1]
    report = classification_report(
        y_test, y_pred,
        labels=class_labels,
        target_names=['guess', 'confident'],
        output_dict=True,
    )
    conf_matrix = confusion_matrix(y_test, y_pred, labels=class_labels)

    importances = rf_classifier.feature_importances_
    feature_importances = pd.Series(importances, index=X.columns).sort_values(ascending=False)

    # Plot only top 10 feature importances
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        feature_importances.head(10).plot(kind='bar', ax=ax)
        ax.set_title(f'Top 10 Feature Importances\nLayer: {sae_layer}, Type: {feature_type}')
        ax.set_xlabel('Features')
        ax.set_ylabel('Importance')
        fig.tight_layout()
        plot_dir = os.path.join(pca_dir, f'classifier_plots/{feature_type}')
        os.makedirs(plot_dir, exist_ok=True)
        fig.savefig(os.path.join(plot_dir, f'feature_importance_layer{sae_layer}.png'), dpi=300)
    finally:
        # Release the figure even when saving fails; this runs once per group.
        plt.close(fig)

    return report, conf_matrix, feature_importances

def create_activation_heatmap(X, confidence, sae_layer, feature_type, pca_dir, feature_importances):
    """Save a heatmap of mean activation per confidence class for the 10 most important features.

    Args:
        X: DataFrame of feature activations.
        confidence: Series aligned with X, used as the grouping key.
        sae_layer: Layer identifier, used in the title and file name.
        feature_type: Feature-type identifier, used in the title and as the
            output subdirectory.
        pca_dir: Root directory for the output plots.
        feature_importances: Series of importances indexed by feature name;
            its ten largest entries select the columns of X to show.

    Returns:
        DataFrame of mean activations with the selected features as rows and
        the confidence classes as columns.
    """
    # Keep only the ten highest-ranked features.
    leading_features = feature_importances.nlargest(10).index

    # Mean per confidence class, transposed so that features run down the rows.
    class_means = X[leading_features].groupby(confidence).mean()
    avg_activation = class_means.T

    fig, ax = plt.subplots(figsize=(12, 8))
    sns.heatmap(avg_activation, cmap='Blues', annot=True, fmt='.2f', ax=ax)
    ax.set_title(f'Average Activation of Top 10 Features\nLayer: {sae_layer}, Type: {feature_type}')
    ax.set_xlabel('Confidence')
    ax.set_ylabel('Top 10 Features')

    # Write the figure under a per-feature-type subdirectory.
    os.makedirs(os.path.join(pca_dir, f'activation_heatmaps/{feature_type}'), exist_ok=True)
    fig.savefig(
        os.path.join(pca_dir, f'activation_heatmaps/{feature_type}/activation_heatmap_layer{sae_layer}.png'),
        dpi=300,
    )
    plt.close(fig)

    return avg_activation

# Create the top-level output directories up front; the per-feature-type
# subdirectories are created on demand by the functions and the loop below.
for subdir in ('pca_plots', 'classifier_plots', 'text_results', 'activation_heatmaps'):
    os.makedirs(os.path.join(pca_dir, subdir), exist_ok=True)

# Group by sae_layer and feature_type
grouped = pca_data.groupby(['sae_layer', 'feature_type'])

# Run the PCA, classifier and heatmap for each (layer, feature type) group
# and write a plain-text summary next to the plots.
for (sae_layer, feature_type), group in grouped:
    print(f"Processing Layer: {sae_layer}, Type: {feature_type}")

    # Split the group into the feature matrix and the label; the grouping
    # keys are constant within a group and are not features.
    X = group.drop(['confidence', 'sae_layer', 'feature_type'], axis=1)
    confidence = group['confidence']
    y = confidence.map({'guess': 0, 'confident': 1})

    # Perform PCA
    loadings = perform_pca(X, confidence, sae_layer, feature_type, pca_dir)

    # Train classifier
    report, conf_matrix, feature_importances = train_classifier(X, y, sae_layer, feature_type, pca_dir)

    # Create activation heatmap
    avg_activation = create_activation_heatmap(X, confidence, sae_layer, feature_type, pca_dir, feature_importances)

    # Save results
    text_dir = os.path.join(pca_dir, f'text_results/{feature_type}')
    os.makedirs(text_dir, exist_ok=True)
    with open(os.path.join(text_dir, f'results_layer{sae_layer}.txt'), 'w') as f:
        f.write(f"Results for Layer: {sae_layer}, Type: {feature_type}\n\n")
        # NOTE: this section lists the ten largest absolute loadings on PC1 only.
        f.write("PCA Loadings (top 10):\n")
        f.write(loadings['PC1'].abs().sort_values(ascending=False).head(10).to_string())
        f.write("\n\nClassification Report:\n")
        f.write(pd.DataFrame(report).transpose().to_string())
        f.write("\n\nConfusion Matrix:\n")
        f.write(str(conf_matrix))
        f.write("\n\nTop 10 Feature Importances:\n")
        f.write(feature_importances.head(10).to_string())
        f.write("\n\nAverage Feature Activation:\n")
        f.write(avg_activation.to_string())

# Report the real output location: everything above is written under pca_dir,
# not an 'output' directory.
print(f"Analysis complete. Results saved in '{pca_dir}'.")

In [None]:
# import pandas as pd
# import numpy as np
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA
# from sklearn.model_selection import train_test_split
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.metrics import classification_report, confusion_matrix
# import matplotlib.pyplot as plt
# import seaborn as sns
# import os

# # Function to perform PCA and save plot
# # def perform_pca(X, confidence, sae_layer, feature_type, pca_dir):
# #     scaler = StandardScaler()
# #     X_scaled = scaler.fit_transform(X)
    
# #     pca = PCA()
# #     X_pca = pca.fit_transform(X_scaled)
    
# #     loadings = pd.DataFrame(
# #         pca.components_.T,
# #         columns=[f'PC{i+1}' for i in range(pca.n_components_)],
# #         index=X.columns
# #     )
    
# #     plt.figure(figsize=(10, 8))
# #     scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=confidence.map({'guess': 0, 'confident': 1}), 
# #                           cmap='coolwarm', alpha=0.7)
# #     plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
# #     plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
# #     plt.title(f'PCA of Data Colored by Confidence\nLayer: {sae_layer}, Type: {feature_type}')
# #     plt.colorbar(scatter, label='Confidence (0: Guess, 1: Confident)')
    
# #     for i, (x, y) in enumerate(zip(loadings['PC1'], loadings['PC2'])):
# #         plt.arrow(0, 0, x, y, color='k', alpha=0.5, head_width=0.05, head_length=0.05)
# #         plt.text(x*1.2, y*1.2, loadings.index[i], color='g', ha='center', va='center')
    
# #     plt.tight_layout()
# #     os.makedirs(os.path.join(pca_dir, f'pca_plots/{feature_type}'), exist_ok=True)
# #     plt.savefig(os.path.join(pca_dir, f'pca_plots/{feature_type}/pca_plot_layer{sae_layer}.png'), dpi = 300)
# #     plt.close()
    
# #     return loadings

# import matplotlib.pyplot as plt
# import seaborn as sns
# import numpy as np
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA
# import pandas as pd
# import os

# def perform_pca(X, confidence, sae_layer, feature_type, pca_dir):
#     scaler = StandardScaler()
#     X_scaled = scaler.fit_transform(X)
    
#     pca = PCA()
#     X_pca = pca.fit_transform(X_scaled)
    
#     loadings = pd.DataFrame(
#         pca.components_.T,
#         columns=[f'PC{i+1}' for i in range(pca.n_components_)],
#         index=X.columns
#     )
    
#     # Get the top 5 loadings for PC1 and PC2
#     top_loadings = pd.concat([
#         loadings['PC1'].abs().nlargest(5),
#         loadings['PC2'].abs().nlargest(5)
#     ]).index.unique()
    
#     plt.figure(figsize=(10, 8))
#     scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=confidence.map({'guess': 0, 'confident': 1}), 
#                           cmap='coolwarm', alpha=0.7)
#     plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
#     plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
#     plt.title(f'PCA of Data Colored by Confidence\nLayer: {sae_layer}, Type: {feature_type}')
#     plt.colorbar(scatter, label='Confidence (0: Guess, 1: Confident)')
    
#     scale_factor = 5

#     # Plot only the top 5 loadings
#     for feature in top_loadings:
#         x, y = loadings.loc[feature, 'PC1'], loadings.loc[feature, 'PC2']
#         plt.arrow(0, 0, x*scale_factor, y*scale_factor, color='k', alpha=0.5, head_width=0.05, head_length=0.05)
#         plt.text(x*1.2*scale_factor, y*1.2*scale_factor, feature, color='g', ha='center', va='center')
    
#     plt.tight_layout()
#     os.makedirs(os.path.join(pca_dir, f'pca_plots/{feature_type}'), exist_ok=True)
#     plt.savefig(os.path.join(pca_dir, f'pca_plots/{feature_type}/pca_plot_layer{sae_layer}.png'))
#     plt.close()
    
#     return loadings

# def create_feature_effect_plot(model, X, feature_names, sae_layer, feature_type, pca_dir):
#     try:
#         # Get feature importances
#         importances = model.feature_importances_
#         indices = np.argsort(importances)[::-1]

#         # Ensure feature_names are strings and select top 5 features
#         feature_names = [str(name) for name in feature_names]
#         top_features = [feature_names[i] for i in indices[:5]]

#         # Create partial dependence plot
#         fig, ax = plt.subplots(figsize=(15, 10))
#         display = PartialDependenceDisplay.from_estimator(
#             model, X, features=top_features,  # Use feature names instead of indices
#             kind="average", centered=True, 
#             subsample=1000, n_jobs=3, 
#             grid_resolution=20, random_state=42,
#             ice_lines_kw={"color": "tab:blue", "alpha": 0.2, "linewidth": 0.5},
#             pd_line_kw={"color": "tab:orange", "linewidth": 2},
#         )
#         display.plot(ax=ax)
        
#         fig.suptitle(f'Partial Dependence of Top 5 Features\nLayer: {sae_layer}, Type: {feature_type}')
#         plt.tight_layout()

#     except Exception as e:
#         print(f"Partial dependence plot creation failed: {str(e)}")
#         print("Falling back to feature importance plot.")
        
#         # Ensure we have valid feature names for the top 5 features
#         valid_indices = [i for i in indices if i < len(feature_names)][:5]
#         top_features = [feature_names[i] for i in valid_indices]
#         top_importances = importances[valid_indices]

#         # Plot feature importances
#         plt.figure(figsize=(12, 8))
#         plt.title(f'Feature Importances\nLayer: {sae_layer}, Type: {feature_type}')
#         plt.bar(range(len(top_features)), top_importances)
#         plt.xticks(range(len(top_features)), top_features, rotation=45, ha='right')
#         plt.xlabel('Features')
#         plt.ylabel('Importance')
#         plt.tight_layout()

#     os.makedirs(os.path.join(pca_dir, f'feature_effect_plots/{feature_type}'), exist_ok=True)
#     plt.savefig(os.path.join(pca_dir, f'feature_effect_plots/{feature_type}/effect_plot_layer{sae_layer}.png'), dpi=300)
#     plt.close()

#     return top_features

# # Modify the train_classifier function to return the trained model and feature names
# def train_classifier(X, y, sae_layer, feature_type, pca_dir):
#     X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#     scaler = StandardScaler()
#     X_train_scaled = scaler.fit_transform(X_train)
#     X_test_scaled = scaler.transform(X_test)

#     rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
#     rf_classifier.fit(X_train_scaled, y_train)

#     y_pred = rf_classifier.predict(X_test_scaled)

#     report = classification_report(y_test, y_pred, target_names=['guess', 'confident'], output_dict=True)
#     conf_matrix = confusion_matrix(y_test, y_pred)

#     importances = rf_classifier.feature_importances_
#     feature_importances = pd.Series(importances, index=X.columns).sort_values(ascending=False)

#     # Plot only top 10 feature importances
#     plt.figure(figsize=(10, 6))
#     feature_importances.head(10).plot(kind='bar')
#     plt.title(f'Top 10 Feature Importances\nLayer: {sae_layer}, Type: {feature_type}')
#     plt.xlabel('Features')
#     plt.ylabel('Importance')
#     plt.tight_layout()
#     os.makedirs(os.path.join(pca_dir, f'classifier_plots/{feature_type}'), exist_ok=True)
#     plt.savefig(os.path.join(pca_dir, f'classifier_plots/{feature_type}/feature_importance_layer{sae_layer}.png'), dpi=300)
#     plt.close()

#     return report, conf_matrix, feature_importances, rf_classifier, X_test_scaled, X.columns
# def create_activation_heatmap(X, confidence, sae_layer, feature_type, pca_dir):
#     # Calculate average activation for guess and confident
#     avg_activation = X.groupby(confidence).mean().T
    
#     # Create heatmap
#     plt.figure(figsize=(12, 8))
#     sns.heatmap(avg_activation, cmap='coolwarm', center=0)
#     plt.title(f'Average Feature Activation\nLayer: {sae_layer}, Type: {feature_type}')
#     plt.xlabel('Confidence')
#     plt.ylabel('Features')
    
#     # Save heatmap
#     os.makedirs(os.path.join(f'activation_heatmaps/{feature_type}'), exist_ok=True)
#     plt.savefig(os.path.join(f'activation_heatmaps/{feature_type}/activation_heatmap_layer{sae_layer}.png'), dpi = 300)
#     plt.close()
    
#     return avg_activation

# # Create output directories
# os.makedirs(os.path.join(pca_dir, 'pca_plots'), exist_ok=True)
# os.makedirs(os.path.join(pca_dir, 'classifier_plots'), exist_ok=True)
# os.makedirs(os.path.join(pca_dir, 'text_results'), exist_ok=True)
# os.makedirs(os.path.join(pca_dir, 'activation_heatmaps'), exist_ok=True)
# os.makedirs(os.path.join(pca_dir, 'feature_effect_plots'), exist_ok=True)

# pca_data = pca_data[pca_data['sae_layer'] == 21]
# # Group by sae_layer and feature_type
# grouped = pca_data.groupby(['sae_layer', 'feature_type'])

# # Perform analysis for each group
# for (sae_layer, feature_type), group in grouped:
#     print(f"Processing Layer: {sae_layer}, Type: {feature_type}")

#     # Prepare data
#     X = group.drop(['confidence', 'sae_layer', 'feature_type'], axis=1)
#     confidence = group['confidence']
#     y = confidence.map({'guess': 0, 'confident': 1})

#     # Perform PCA
#     loadings = perform_pca(X, confidence, sae_layer, feature_type, pca_dir)

#     # Train classifier
#     report, conf_matrix, feature_importances, model, X_test, feature_names = train_classifier(X, y, sae_layer, feature_type, pca_dir)

#     # Create activation heatmap
#     avg_activation = create_activation_heatmap(X, confidence, sae_layer, feature_type, pca_dir)

#     # Create feature effect plot
#     top_features = create_feature_effect_plot(model, X_test, feature_names, sae_layer, feature_type, pca_dir)

#     # Save results
#     os.makedirs(os.path.join(pca_dir, f'text_results/{feature_type}'), exist_ok=True)
#     with open(os.path.join(pca_dir, f'text_results/{feature_type}/results_layer{sae_layer}.txt'), 'w') as f:
#         f.write(f"Results for Layer: {sae_layer}, Type: {feature_type}\n\n")
#         f.write("PCA Loadings (top 10):\n")
#         f.write(loadings['PC1'].abs().sort_values(ascending=False).head(10).to_string())
#         f.write("\n\nClassification Report:\n")
#         f.write(pd.DataFrame(report).transpose().to_string())
#         f.write("\n\nConfusion Matrix:\n")
#         f.write(str(conf_matrix))
#         f.write("\n\nTop 10 Feature Importances:\n")
#         f.write(feature_importances.head(10).to_string())
#         f.write("\n\nAverage Feature Activation:\n")
#         f.write(avg_activation.to_string())
#         f.write("\n\nTop 5 Features for Partial Dependence Plot:\n")
#         f.write(", ".join(map(str, top_features)))  # Ensure all elements are strings

# print("Analysis complete. Results saved in the 'output' directory.")