In [None]:
# imports
import os
import warnings

import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score, f1_score ,make_scorer, mean_squared_error
from sklearn.model_selection import  GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC

In [4]:
def read_multi_index_data(path):
    """
    Load a CSV file and index it by its first two columns as a MultiIndex.

    Parameters
    ----------
    path : str or path-like
        Location of a CSV file whose first 2 columns hold the paired
        identifiers and whose remaining columns hold the data values.

    Returns
    -------
    data : pd.DataFrame
        The data columns (everything after the first 2 columns), indexed by a
        MultiIndex whose levels are named 'subj_i' and 'subj_j'.
    """
    table = pd.read_csv(path)

    # Split the identifier pair from the measurements
    id_columns = table.iloc[:, :2]
    values = table.iloc[:, 2:]

    # One array per identifier column -> two-level index
    pair_index = pd.MultiIndex.from_arrays(
        id_columns.to_numpy().T,
        names=['subj_i', 'subj_j'],
    )
    values.index = pair_index
    return values

In [5]:
def resampling(feat, label, sampling_method='over', random_state=0, sampling_strategy='auto', **kargs):
    """
    Balance the classes of a dataset by random over- or undersampling.

    Parameters
    ----------
    feat : np.array
        Feature matrix to resample.
    label : np.array
        Class label of every row in ``feat``.
    sampling_method : str, default='over'
        Which sampler to use:
        - 'under': RandomUnderSampler
        - 'over': RandomOverSampler
    random_state : int, default=0
        Seed handed to the sampler; NumPy's global RNG is seeded with the
        same value.
    sampling_strategy : str or dict, default='auto'
        Forwarded unchanged to the sampler.
    **kargs : dict
        Extra keyword arguments forwarded to the sampler constructor.

    Returns
    -------
    feat_resampled : np.array
        Resampled feature data.
    label_resampled : np.array
        Resampled target labels.

    Raises
    ------
    ValueError
        If ``sampling_method`` is not 'under' or 'over'.
    """
    # Supported method name -> sampler class
    samplers = {
        'under': RandomUnderSampler,
        'over': RandomOverSampler,
    }

    # Reject unknown method names before doing any work
    valid_methods = list(samplers)
    if sampling_method not in valid_methods:
        raise ValueError(f"Invalid value for 'sampling_method'. Use one of {valid_methods}")

    # Note: this reseeds NumPy's *global* RNG, on top of the sampler's own random_state
    np.random.seed(random_state)

    sampler_cls = samplers[sampling_method]
    sampler = sampler_cls(
        sampling_strategy=sampling_strategy,
        random_state=random_state,
        **kargs,
    )
    resampled_feat, resampled_label = sampler.fit_resample(feat, label)

    return resampled_feat, resampled_label

In [6]:
def get_mean_and_se(metric_list):
    """
    Calculate the mean and the standard error of the mean of metric values.

    The standard error is the *sample* standard deviation (``ddof=1``) divided
    by ``sqrt(n)``. Using the population standard deviation (``ddof=0``)
    systematically underestimates the standard error, noticeably so for the
    small number of values produced by k-fold cross-validation.

    Parameters
    ----------
    metric_list : list of float
        List of metric values (e.g. one value per cross-validation fold).

    Returns
    -------
    mean : float
        The mean of the metric values (NaN if the list is empty).
    se : float
        The standard error of the mean. NaN when fewer than two values are
        given, because the sample standard deviation is undefined there.
    """
    values = np.asarray(metric_list, dtype=float)
    n_values = values.size

    # Nothing to summarize: return NaNs explicitly instead of triggering
    # NumPy "mean of empty slice" / division-by-zero warnings.
    if n_values == 0:
        return np.nan, np.nan

    mean = values.mean()

    # A single observation carries no information about spread.
    if n_values < 2:
        return mean, np.nan

    se = values.std(ddof=1) / np.sqrt(n_values)

    return mean, se

In [19]:
def my_classifier(feat, label, model, grid, scoring, to_resample=None, cv=10, inner_cv=10):
    """
    Train and evaluate a classifier with nested, stratified k-fold cross-validation.

    For every outer fold the training split is optionally resampled, then
    standardized, then used to fit a GridSearchCV; the refit best estimator
    predicts the held-out split, on which the fold metrics are computed.

    Parameters
    ----------
    feat : pd.DataFrame
        Feature matrix, one row per sample (accessed through ``.iloc``).
    label : array-like
        Class label of every row in ``feat``. Labels are encoded internally
        with LabelEncoder (integers 0..n_classes-1).
    model : estimator object
        Scikit-learn compatible classifier to tune (e.g. ``SVC()``). There is
        no default.
    grid : dict or list of dict
        Parameter grid forwarded to GridSearchCV.
    scoring : str or callable
        Scoring forwarded to GridSearchCV (a scorer name or a scorer object
        such as the result of ``make_scorer``).
    to_resample : str or None, default=None
        Resampling method applied to each training split: 'under', 'over',
        or None / empty for no resampling.
    cv : int, default=10
        Number of outer cross-validation folds.
    inner_cv : int, default=10
        Number of folds used by the inner GridSearchCV.

    Returns
    -------
    results_summary : dict
        - 'predictions': out-of-fold predictions for every sample, expressed
          as *encoded* class indices (positions in 'classes').
        - 'accuracy', 'balanced_accuracy', 'f1_weighted', 'f1_macro':
          ``{'mean': ..., 'se': ...}`` across the outer folds.
        - 'confusion_matrix': row-normalized confusion matrix averaged over folds.
        - 'best_params': list with the best grid parameters of every fold.
        - 'classes': original labels, ordered by their encoded index.
    folds_results : dict
        Per-fold lists for 'accuracy', 'balanced_accuracy', 'f1_weighted'
        and 'f1_macro'.
    """
    # Encode class labels into numeric values 0..n_classes-1
    le = LabelEncoder()
    label = le.fit_transform(label)
    n_classes = len(le.classes_)
    class_ids = np.arange(n_classes)

    splitter = StratifiedKFold(n_splits=cv, shuffle=True, random_state=0)
    prediction = np.zeros((feat.shape[0]))
    folds_results = {
        'accuracy': [],
        'balanced_accuracy': [],
        'f1_weighted': [],
        'f1_macro': [],
    }
    confusion_sum = np.zeros((n_classes, n_classes))
    folds_with_class = np.zeros(n_classes)  # per class: number of test folds containing it
    best_params_list = []

    for train_idx, test_idx in splitter.split(feat, label):
        # Get relevant data slices for the train-test split
        feat_train = feat.iloc[train_idx, :].to_numpy()
        feat_test = feat.iloc[test_idx, :].to_numpy()
        label_train = label[train_idx]
        label_test = label[test_idx]

        # Resample the training set to address class imbalance if requested.
        # NOTE(review): resampling (and the scaling below) happens *before* the
        # inner grid search, so with 'over' duplicated rows can end up on both
        # sides of an inner split and inflate the inner CV scores. The outer
        # test split is untouched. Consider an imblearn Pipeline inside
        # GridSearchCV if unbiased hyper-parameter selection matters.
        if to_resample:
            feat_train, label_train = resampling(feat_train, label_train, to_resample)

        # Standardize using statistics learned from the training split only
        scaler = StandardScaler()
        feat_train = scaler.fit_transform(feat_train)
        feat_test = scaler.transform(feat_test)

        # Inner grid search with cross-validation; refits the best model on the training split
        clf = GridSearchCV(model, grid, cv=inner_cv, scoring=scoring, n_jobs=-1)
        clf.fit(feat_train, label_train)
        best_estimator = clf.best_estimator_
        best_params_list.append(clf.best_params_)

        print(f'Best parameters: {clf.best_params_}')

        # Make prediction on test set
        label_predicted_test = best_estimator.predict(feat_test)
        prediction[test_idx] = label_predicted_test

        # Test set performance metrics of this fold
        folds_results['accuracy'].append(accuracy_score(label_test, label_predicted_test))
        folds_results['balanced_accuracy'].append(balanced_accuracy_score(label_test, label_predicted_test))
        folds_results['f1_weighted'].append(f1_score(label_test, label_predicted_test, average='weighted'))
        folds_results['f1_macro'].append(f1_score(label_test, label_predicted_test, average='macro'))

        # Fix the label set so every fold yields an (n_classes, n_classes) matrix,
        # even when a class is missing from this fold's test split or predictions.
        fold_confusion_mat = confusion_matrix(
            label_test, label_predicted_test, labels=class_ids, normalize='true'
        )
        confusion_sum += fold_confusion_mat
        folds_with_class += np.bincount(label_test, minlength=n_classes) > 0

    # Average every row over the folds in which that true class actually occurred
    # (equals a plain division by `cv` when all classes are present in all folds).
    mean_confusion_matrix = confusion_sum / np.maximum(folds_with_class, 1)[:, np.newaxis]

    # Calculate mean and standard error for metrics, print summary, build result dict
    display_names = {
        'accuracy': 'Accuracy',
        'balanced_accuracy': 'Balanced Accuracy',
        'f1_weighted': 'F1 Weighted',
        'f1_macro': 'F1 Macro',
    }
    results_summary = {'predictions': prediction}

    print('\n=== SUMMARY ===')
    for metric_key, metric_name in display_names.items():
        mean_value, se_value = get_mean_and_se(folds_results[metric_key])
        print(f'Mean {metric_name}: {mean_value:.2f} ± {se_value:.2f}')
        results_summary[metric_key] = {'mean': mean_value, 'se': se_value}

    results_summary['confusion_matrix'] = mean_confusion_matrix
    results_summary['best_params'] = best_params_list
    # Lets callers map encoded predictions back to labels: classes[int(pred)]
    results_summary['classes'] = le.classes_

    return results_summary, folds_results

In [None]:
def plot_confusion_matrix(confusion_mat, categories, vmin=0, vmax=1, ax=None,
                          font_path='/Volumes/homes/Maya/Code/python/Calibri/Calibri.ttf'):
    """
    Plot a confusion matrix as an annotated heatmap.

    Parameters
    ----------
    confusion_mat : array-like
        Matrix to plot.
    categories : list
        Category labels used as tick labels on both the x and y axes.
    vmin : float, default=0
        Minimum value for colormap normalization.
    vmax : float, default=1
        Maximum value for colormap normalization.
    ax : matplotlib.axes.Axes, optional
        Axis to draw on. If not provided, a new figure and axis are created.
    font_path : str or None
        Path to the Calibri ``.ttf`` file. If the file exists it is registered
        and set as the global matplotlib font family (this changes
        ``plt.rcParams`` for all later plots). If it does not exist, or is
        None, the current matplotlib font is kept.

    Returns
    -------
    ax : matplotlib.axes.Axes
        The axis with the heatmap plotted.
    """
    # Set font to Calibri only when the font file is reachable; registering a
    # missing file would otherwise abort the plot on machines without this path.
    if font_path and os.path.isfile(font_path):
        fm.fontManager.addfont(font_path)
        plt.rcParams['font.family'] = 'Calibri'
    elif font_path:
        warnings.warn(
            f'Font file not found: {font_path}. Using the current matplotlib font instead.'
        )

    # Create figure and axes if not provided
    if ax is None:
        _, ax = plt.subplots(figsize=(5, 5))  # Adjust size as needed

    # Define and create plot heatmap
    sns.heatmap(confusion_mat,
                ax=ax,
                xticklabels=categories,
                yticklabels=categories,
                annot=True,
                fmt='.2f',
                cmap='coolwarm',
                cbar=True,
                square=True,
                vmin=vmin, vmax=vmax,
                annot_kws={"size": 16})

    ax.set_xlabel('Predicted', fontsize=16)
    ax.set_ylabel('Actual', fontsize=16)
    ax.set_title('Confusion Matrix', fontsize=18)
    ax.tick_params(axis='both', labelsize=14)

    return ax

In [None]:
def run_SVM(grid, movie_types=('social',), data_root='/Volumes/homes/Maya/students_study/friendships'):
    """
    Run the nested-CV SVM analysis and plot one confusion matrix per time point.

    Labels are read from the second-time-point social distance file. For every
    movie type and for each of the two time points, the ISC features are
    loaded, ``my_classifier`` is run with random oversampling and a negated
    mean-squared-error scorer, and the averaged confusion matrix is drawn in a
    side-by-side figure.

    Parameters
    ----------
    grid : dict or list of dict
        Parameter grid for the ``SVC`` model, forwarded to ``my_classifier``.
    movie_types : iterable of str, default=('social',)
        Movie types to analyse; each one produces its own figure. The feature
        files are named ``{movie_type}_preproc_isc_data_baseline_subj.csv``
        (e.g. 'average_across_movies', 'social', 'academic', 'neutral').
    data_root : str
        Directory containing the ``shortest_path_length`` and ``ISC`` folders.

    Returns
    -------
    None
        Results are printed and plotted.
    """
    time_points = ['first_time_point', 'second_time_point']

    # Load data
    behav_path = f'{data_root}/shortest_path_length/my_sample/second_time_point_social_distance_baseline_subj.csv'
    behav_data = read_multi_index_data(behav_path)
    label = behav_data.astype(int).values.ravel()

    # Tick labels must match the classes actually present (sorted), which is
    # the row/column order of the confusion matrix; this also covers label
    # sets that do not start at 1 or that have gaps.
    categories = np.unique(label).tolist()

    # Define model
    to_resample = 'over'
    model = SVC()
    # Negated MSE: treats the integer class labels as ordinal values
    scorer = make_scorer(mean_squared_error, greater_is_better=False)

    # Matrix collecting the out-of-fold predictions (one column per time point).
    # Sized from the data instead of a fixed sample count. Currently only
    # collected, not returned.
    predictions_per_time_point = np.zeros((len(label), len(time_points)))

    for movie_type in movie_types:
        fig, axes = plt.subplots(1, len(time_points), figsize=(10, 5))

        for i, time_point in enumerate(time_points):
            print(f'\n{movie_type}')
            print(time_point)

            # Load features
            # assumes rows are in the same (subj_i, subj_j) order as the labels — TODO confirm
            feat_path = f'{data_root}/ISC/Schaefer100/{time_point}/{movie_type}_preproc_isc_data_baseline_subj.csv'
            feat = read_multi_index_data(feat_path)

            # Run model
            results_summary, folds_results = my_classifier(feat, label, model, grid, scorer, to_resample, 10)
            predictions_per_time_point[:, i] = results_summary['predictions']  # fill result matrix

            plot_confusion_matrix(results_summary['confusion_matrix'], categories, 0.2, 0.5, axes[i])

        fig.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1, wspace=0.3)
        plt.show()

In [None]:
# One parameter grid per SVM kernel: 20 log-spaced C values between 1e-3 and 1e3
svm_grids = {
    kernel: {'C': np.logspace(-3, 3, 20), 'kernel': [kernel]}
    for kernel in ('linear', 'rbf', 'poly')
}
# Keep the individual grid names available for later cells
grid_linear, grid_rbf, grid_poly = svm_grids.values()

# Run the full analysis once per kernel, in the order linear -> rbf -> poly
for kernel_name, kernel_grid in svm_grids.items():
    print(f'SVM {kernel_name}')
    run_SVM(kernel_grid)