In [1]:
import os
import copy
import time
import math
import random
import itertools
import scipy.io
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.svm import SVC
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.model_selection import ParameterGrid
from IPython.display import clear_output

In [2]:
def df_to_arr(df):
    
    vals = []
    for _, row in df.iterrows():
        vals.extend(row.tolist())
    return np.array([x for x in vals if str(x) != 'nan'])

In [8]:
def get_subjects(path):
    
    '''
    Gets a list of subject IDs and the file suffix, given a path to the data files. 
    
    Note: subject ID must be only 2 characters for this to work, and all data files
    must have same suffix.
    
    Parameters
    ----------
    path: str
        directory to the data files
        
    Returns
    -------
    list
        a list of subject IDs
    str
        the suffix to the filenames
    '''
    
    files = os.listdir(path)
    subjects = [f[:2] for f in files]
    suffix = files[0][2:]
        
    subjects.sort()
    
    return subjects, suffix

In [9]:
def scramble_labels(y_data):
    
    '''
    Randomly selects half of the labels in the data to switch to the other class.
    
    Parameters
    ----------
    y_data: array-like
        label data to scramble
    '''
    
    classes = list(set(y_data))
    classes.sort()
    
    y_data_copy = y_data.copy()
    for index in np.nditer(np.random.choice(len(y_data), size=len(y_data)//2, replace=False)):
        
        if y_data[index] == classes[0]:
            y_data[index] = classes[1]
        else:
            y_data[index] = classes[0]
    
    # Makes sure labels are scrambled properly
    num_diff = sum(i != j for i, j in zip(y_data, y_data_copy))  
    if num_diff != len(y_data)//2:
        raise ValueError
    

In [11]:
def get_optimal_run(x_train, y_train, x_test, y_test, kernels, gamma_range, C_range):
    
    '''
    Gets best hyperparameters (kernel, C, and gamma values) that optimize SVM's predictions for given
    x and y test dataset.
    
    Parameters
    ----------
    x_train: array-like
        dataset of block data used to train classifier
    y_train: array-like
        dataset of label data used to train classifier
    x_test: array-like
        testing dataset of block data used to optimize hyperparameters on
    y_test: array-like
        testing dataset of label data used to optimize hyperparameters on
    kernels: list
        kernels to test (recommended options are 'linear', 'rbf', and 'sigmoid')
    gamma_range: dict
        dict that specifies the range of values of gamma to test; should include start, stop to range,
        num of values, and the exponential base
    C_range: dict
        dict that specifies the range of values of C to test; should include start, stop to range,
        num of values, and the exponential base
        
    Returns
    -------
    dict
        best combination of parameters found from grid search
    float
        best accuracy obtained from testing
    '''
    
    gamma_vals = np.logspace(gamma_range['start'], gamma_range['stop'], gamma_range['num'], base=gamma_range['base'])
    C_vals = np.logspace(C_range['start'], C_range['stop'], C_range['num'], base=C_range['base'])

    param_grid = ParameterGrid({'kernel': kernels, 'gamma': gamma_vals, 'C': C_vals})
    
    best_acc = 0
    best_params = None
    
    # Tests each parameter combination to find best one for given testing data
    for params in list(param_grid):
        
        svclassifier = SVC(kernel=params['kernel'], gamma=params['gamma'], C=params['C'], max_iter=-1)
        svclassifier.fit(x_train, y_train)
        
        curr_acc = svclassifier.score(x_test, y_test)
        
        if curr_acc > best_acc:
            best_acc = curr_acc
            best_params = params
            
    return best_params, best_acc

# Training Within Subjects

In [3]:
def extract_tr_subject_data(path, subject, suffix, roi, conds):
    
    '''
    Extracts individual subject data from the .mat files.
    
    Parameters
    ----------
    path: str
        directory to data files
    subject: str
        ID of subject to load data for
    suffix: str
        ending suffix of the data filename
    roi: int
        0 for V1 data, 1 for MT data
    conds: list
        list of integers specifying the conditional datasets to extract
        (0 for trained_cp, 1 for trained_ip, 2 for untrained_cp, 3 for untrained_ip)
        
    Returns
    -------
    Lists of voxel data (x_data) separated by individual TRs and the corresponding labels (y_data)
    '''
    
    x_data = []
    y_data = []
    
    path_to_file = path + subject + suffix
    mat = scipy.io.loadmat(path_to_file)['roi_scanData'][0][roi]
    
    test_TRs = []
    
    for scan in range(len(mat[0])):

        for cond in conds:
            
            for block in range(len(mat[0][scan][0][cond][0])):
                
                for tr in range(len(mat[0][scan][0][cond][0][block][0])):

                    tr_data = mat[0][scan][0][cond][0][block][0][tr][0][0][0].tolist()
                    
                    if tr == 0 or tr == len(mat[0][scan][0][cond][0][block][0]) - 1:
                        test_TRs.append(len(x_data))
                        
                    x_data.append(tr_data)
                    y_data.append(mat[0][scan][1][cond][0].replace('_post', ''))
                               
    scaler = MinMaxScaler(feature_range=(0, 1))
    x_standardized = scaler.fit_transform(x_data)        
    
    test_x_data = []
    test_y_data = []
    for i in test_TRs:
        test_x_data.append(x_standardized[i])
        test_y_data.append(y_data[i])
    
    train_x_data = []
    train_y_data = []
    for i in range(len(x_standardized)):
        if i not in test_TRs:
            train_x_data.append(x_standardized[i])
            train_y_data.append(y_data[i])
    
    data = {'train_x': train_x_data, 'train_y': train_y_data, 'test_x': test_x_data, 'test_y': test_y_data}
    return data

In [4]:
def split_tr_dataset(data, scramble):
    
    '''
    Splits the TR dataset into inner testing, outer testing, and training folds.
    
    Parameters
    ----------
    data: dict
        dictionary that contains data split into training and testing partition
    scramble: boolean
        whether or not to scramble the labels when training
        
    Returns
    -------
    list
        TR voxel data for training use
    list
        TR voxel data for inner fold testing use
    list
        TR voxel data for outer fold testing use
    list 
        labels for training use
    list
        labels for inner fold testing use
    list
        labels for outer fold testing use
    '''
    
    train_x, inner_test_x, outer_test_x, train_y, inner_test_y, outer_test_y = [], [], [], [], [], []
    
    test_indices = [i for i in range(len(data['test_x']))]
    test_indices, outer_test_indices, test_y, outer_test_y = train_test_split(test_indices, data['test_y'], test_size=2, stratify=data['test_y'])

    for i in outer_test_indices:
        if i%2 == 0:
            index_to_remove = test_indices.index(i+1)
            del test_y[index_to_remove]
            del test_indices[index_to_remove]
            train_x.append(data['test_x'][index_to_remove])
            train_y.append(data['test_y'][index_to_remove])
        else:
            index_to_remove = test_indices.index(i-1)
            del test_y[index_to_remove]
            del test_indices[index_to_remove]
            train_x.append(data['test_x'][index_to_remove])
            train_y.append(data['test_y'][index_to_remove])

    train_indices, inner_test_indices, _, inner_test_y = train_test_split(test_indices, test_y, test_size=6, stratify=test_y)
    
    outer_test_x = [data['test_x'][i] for i in outer_test_indices]
    inner_test_x = [data['test_x'][i] for i in inner_test_indices]
    
    train_x.extend([data['test_x'][i] for i in train_indices])
    train_x.extend(data['train_x'])
    train_y.extend([data['test_y'][i] for i in train_indices])
    train_y.extend(data['train_y'])

    if scramble:
        scramble_labels(train_y)
    
    return train_x, inner_test_x, outer_test_x, train_y, inner_test_y, outer_test_y
            

In [5]:
def train_within_subjects(data_params, grid_params, runs=50, scramble=False):
    
    '''
    Trains and tests the classifier for accuracy using SVMs.
    
    Parameters
    ----------
    data_params: dict
        path: str
            the path to the data files
        roi: int
            0 for V1 data, 1 for MT data
        conds: list
            list of integers specifying the conditional datasets to extract
            (0 for trained_cp, 1 for trained_ip, 2 for untrained_cp, 3 for untrained_ip)   
    grid_params: dict
        kernels: list
            kernels to test (recommended options are 'linear', 'rbf', and 'sigmoid')
        gamma: dict
            dict that specifies the range of values of gamma to test; should include start, stop to range,
            num of values, and the exponential base
        C: dict
            dict that specifies the range of values of C to test; should include start, stop to range,
            num of values, and the exponential base
    runs: int
        number of runs to test on for each subject
    scramble: boolean, optional
        whether or not to scramble the labels when training, 
        default is False
        
    Returns
    -------
    DataFrame
        data of inner subject combination testing accuracy
    DataFrame
        data of outer subject testing accuracy
    '''
    
    subjects, suffix = get_subjects(data_params['path'])
    
    # Sets up DataFrames used to track inner and outer subject test accuracies
    cols = [n for n in range(runs)]
    inner_acc_report = pd.DataFrame(index=subjects, columns=cols)
    outer_acc_report = pd.DataFrame(index=subjects, columns=cols)
    
    for subject in subjects:
        
        print(f"Currently on subject {subject}.")
        start_time = time.time()
        
        subject_data = extract_tr_subject_data(path, subject, suffix, roi, conds)
        x_train, x_test_inner, x_test_outer, y_train, y_test_inner, y_test_outer = split_tr_dataset(subject_data, scramble)
        print(f"Training data size: {len(x_train)}")
        print(f"Inner testing data size: {len(x_test_inner)}")
        print(f"Outer testing data size: {len(x_test_outer)}")
              
        for run in range(runs):
            
            if (run+1) % 10 == 0:
                print(f"On run #{run+1} of {runs}.")
            x_train, x_test_inner, x_test_outer, y_train, y_test_inner, y_test_outer = split_tr_dataset(subject_data, scramble)
            
            # Gets optimal params for training dataset from grid search
            opt_params, inner_acc = get_optimal_run(x_train, y_train, x_test_inner, y_test_inner, grid_params['kernels'], grid_params['gamma'], grid_params['C']) 

            # Trains model using optimal params for this set
            svclassifier = SVC(kernel=opt_params['kernel'], gamma=opt_params['gamma'], C=opt_params['C'], max_iter=-1)
            svclassifier.fit(x_train, y_train)
            
            outer_acc = svclassifier.score(x_test_outer, y_test_outer)
            
            # Logs inner and outer subject accuracy data in DataFrame
            inner_acc_report.at[subject, run] = inner_acc
            outer_acc_report.at[subject, run] = outer_acc
            
        clear_output()
        
        # Prints how long it took for last outer subject test
        end_time = time.time()
        exec_time = end_time - start_time
        minutes = exec_time // 60
        seconds = exec_time % 60
        print(f"Last turn took {minutes} minutes and {seconds} seconds.")
        
    return inner_acc_report, outer_acc_report
    

In [6]:
def train_within_subjects_combined(data_params, grid_params, runs=50, scramble=False):
    
    '''
    Trains and tests the classifier for accuracy using SVMs. Combines post-training and pre-training
    data for training and inner testing of SVM for comparison purposes.
    
    Parameters
    ----------
    data_params: dict
        path_pre: str
            the path to the pre-training data files
        path_post: str
            the path to the post-training data files
        roi: int
            0 for V1 data, 1 for MT data
        conds: list
            list of integers specifying the conditional datasets to extract
            (0 for trained_cp, 1 for trained_ip, 2 for untrained_cp, 3 for untrained_ip)   
    grid_params: dict
        kernels: list
            kernels to test (recommended options are 'linear', 'rbf', and 'sigmoid')
        gamma: dict
            dict that specifies the range of values of gamma to test; should include start, stop to range,
            num of values, and the exponential base
        C: dict
            dict that specifies the range of values of C to test; should include start, stop to range,
            num of values, and the exponential base
    runs: int
        number of runs to test on for each subject
    scramble: boolean, optional
        whether or not to scramble the labels when training, 
        default is False
        
    Returns
    -------
    DataFrame
        data of inner subject combination testing accuracy
    DataFrame
        data of outer pre-training subject testing accuracy
    DataFrame
        data of outer post-training subject testing accuracy
    '''
    
    subjects, suffix_post = get_subjects(data_params['path_post'])
    _, suffix_pre = get_subjects(data_params['path_pre'])
    
    # Sets up DataFrames used to track inner and outer subject test accuracies
    cols = [n for n in range(runs)]
    inner_acc_report = pd.DataFrame(index=subjects, columns=cols)
    outer_acc_report_post = pd.DataFrame(index=subjects, columns=cols)
    outer_acc_report_pre = pd.DataFrame(index=subjects, columns=cols)
    
    for subject in subjects:
        
        print(f"Currently on subject {subject}.")
        start_time = time.time()
        
        subject_data_pre = extract_tr_subject_data(data_params['path_pre'], subject, suffix_pre, roi, conds)
        subject_data_post = extract_tr_subject_data(data_params['path_post'], subject, suffix_post, roi, conds)
        
        x_train_pre, x_test_inner_pre, x_test_outer_pre, y_train_pre, y_test_inner_pre, y_test_outer_pre = split_tr_dataset(subject_data_pre, scramble)
        x_train_post, x_test_inner_post, x_test_outer_post, y_train_post, y_test_inner_post, y_test_outer_post = split_tr_dataset(subject_data_post, scramble)
        
        x_train = x_train_pre + x_train_post
        x_test_inner = x_test_inner_pre + x_test_inner_post
        x_test_outer = x_test_outer_pre + x_test_outer_post
        y_train = y_train_pre + y_train_post
        y_test_inner = y_test_inner_pre + y_test_inner_post
        y_test_outer = y_test_outer_pre + y_test_outer_post
        
        print(f"Training data size: {len(x_train)}")
        print(f"Inner testing data size: {len(x_test_inner)}")
        print(f"Outer testing data size: {len(x_test_outer)}")
              
        for run in range(runs):
            
            if (run+1) % 10 == 0:
                print(f"On run #{run+1} of {runs}.")
                
            x_train_pre, x_test_inner_pre, x_test_outer_pre, y_train_pre, y_test_inner_pre, y_test_outer_pre = split_tr_dataset(subject_data_pre, scramble)
            x_train_post, x_test_inner_post, x_test_outer_post, y_train_post, y_test_inner_post, y_test_outer_post = split_tr_dataset(subject_data_post, scramble)

            # Combine data for training and inner testing
            x_train = x_train_pre + x_train_post
            x_test_inner = x_test_inner_pre + x_test_inner_post
            y_train = y_train_pre + y_train_post
            y_test_inner = y_test_inner_pre + y_test_inner_post
            
            # Gets optimal params for training dataset from grid search
            opt_params, inner_acc = get_optimal_run(x_train, y_train, x_test_inner, y_test_inner, grid_params['kernels'], grid_params['gamma'], grid_params['C']) 

            # Trains model using optimal params for this set
            svclassifier = SVC(kernel=opt_params['kernel'], gamma=opt_params['gamma'], C=opt_params['C'], max_iter=-1)
            svclassifier.fit(x_train, y_train)
            
            outer_acc_pre = svclassifier.score(x_test_outer_pre, y_test_outer_pre)
            outer_acc_post = svclassifier.score(x_test_outer_post, y_test_outer_post)
            
            # Logs inner and outer subject accuracy data in DataFrame
            inner_acc_report.at[subject, run] = inner_acc
            outer_acc_report_pre.at[subject, run] = outer_acc_pre
            outer_acc_report_post.at[subject, run] = outer_acc_post
            
        clear_output()
        
        # Prints how long it took for last outer subject test
        end_time = time.time()
        exec_time = end_time - start_time
        minutes = exec_time // 60
        seconds = exec_time % 60
        print(f"Last turn took {minutes} minutes and {seconds} seconds.")
        
    return inner_acc_report, outer_acc_report_pre, outer_acc_report_post
    

In [13]:
path = r'scans/output/PRE/'
roi = 1                            # V1-roi: 0, MT-roi: 1
conds = [0, 2]                     # trained_cp: 0, trained_ip: 1, untrained_cp: 2, untrained_ip: 3

gamma_range = {'start': -15, 'stop': 3, 'num': 19, 'base': 2.0}
C_range = {'start': -3, 'stop': 15, 'num': 19, 'base': 2.0}
kernels = ['rbf', 'sigmoid']

data_params = {'path': path, 'roi': roi, 'conds': conds}
grid_params = {'gamma': gamma_range, 'C': C_range, 'kernels': kernels}

inner_acc_report, outer_acc_report = train_within_subjects(data_params, grid_params, runs=200)

#outer_acc_report['Average'] = outer_acc_report.mean(axis=1)
#outer_acc_report.to_csv('output/post_cp/outer_accs_within.csv')
#inner_acc_report['Average'] = inner_acc_report.mean(axis=1)
#inner_acc_report.to_csv('output/post_cp/inner_accs_within.csv')

In [120]:
path_pre = r'scans/output/PRE/'
path_post = r'scans/output/cp&ip/'
roi = 1                            # V1-roi: 0, MT-roi: 1
conds = [0, 2]                     # trained_cp: 0, trained_ip: 1, untrained_cp: 2, untrained_ip: 3

gamma_range = {'start': -15, 'stop': 3, 'num': 19, 'base': 2.0}
C_range = {'start': -3, 'stop': 15, 'num': 19, 'base': 2.0}
kernels = ['rbf', 'sigmoid']

data_params = {'path_pre': path_pre, 'path_post': path_post, 'roi': roi, 'conds': conds}
grid_params = {'gamma': gamma_range, 'C': C_range, 'kernels': kernels}

inner_acc_report, outer_acc_report_pre, outer_acc_report_post = train_within_subjects_combined(data_params, grid_params, runs=200)

outer_acc_report_pre['Average'] = outer_acc_report_pre.mean(axis=1)
outer_acc_report_pre.to_csv('output/post_cp/outer_accs_within_pre.csv')
outer_acc_report_post['Average'] = outer_acc_report_post.mean(axis=1)
outer_acc_report_post.to_csv('output/post_cp/outer_accs_within_post.csv')
inner_acc_report['Average'] = inner_acc_report.mean(axis=1)
inner_acc_report.to_csv('output/post_cp/inner_accs_within_combined.csv')


Last turn took 66.0 minutes and 55.70770287513733 seconds.


## Permutation Runs

In [None]:
def permutation_within_subjects(data_params, grid_params, inner_dist, outer_dist, runs=50, train_runs=100, history=True):
    
    '''
    Performs a specified number of runs where data labels are scrambled.
    
    Parameters
    ----------
    data_params: dict
        contains specifications for data processing (see train method for documentation)
    grid_params: dict
        contains values for grid search (see train method for documentation)
    inner_dist: list
        holds accuracy values for individual inner subject tests
    outer_dist: list
        holds accuracy values for individual outer subject tests
    runs: int
        number of runs to perform, default is 50
    train_runs: int
        number of runs to train on each subject, default is 100
    history: boolean
        whether to track accuracy over runs and output permutation accuracy plot, 
        default is True
    '''
    
    subjects, suffix = get_subjects(data_params['path'])
    if history:
        outer_sample_means = []
        for i in range(len(outer_dist)//(len(subjects)*train_runs)):
            outer_sample_means.append(np.mean(outer_dist[i*len(subjects)*train_runs:(i+1)*len(subjects)*train_runs]))
        
        x = [i for i in range(1, len(outer_sample_means)+1)]
        if len(outer_sample_means) > 0:
            y = [outer_sample_means[0]]
            for i in range(2, len(outer_sample_means)+1):
                y.append(np.mean(outer_sample_means[:i]))
        else:
            y = []
        
    for n in range(runs):
        print(f'On run #{n+1} of {runs}.')
        inner_accs, outer_accs = train_within_subjects(data_params, grid_params, runs=train_runs, scramble=True)
        
        inner_dist.extend(df_to_arr(inner_accs).tolist())
        outer_dist.extend(df_to_arr(outer_accs).tolist())
        
        outer_sample_means.append(np.mean(df_to_arr(outer_accs)))
        
        if history:
            y.append(np.mean(outer_sample_means))
            x.append(len(y))

            plt.plot(x, y)
            plt.xlabel('Run')
            plt.ylabel('Overall Mean Accuracy')
            plt.title('Overall Outer Subject Accuracy')
            plt.savefig(f"output/cp/perm_hist.png")
        

In [None]:
path = r'scans/output/PRE/'
roi = 1                            # V1-roi: 0, MT-roi: 1
conds = [1, 3]                     # trained_cp: 0, trained_ip: 1, untrained_cp: 2, untrained_ip: 3

gamma_range = {'start': -15, 'stop': 3, 'num': 19, 'base': 2.0}
C_range = {'start': -3, 'stop': 15, 'num': 19, 'base': 2.0}
kernels = ['rbf', 'sigmoid']

data_params = {'path': path, 'roi': roi, 'conds': conds}
grid_params = {'gamma': gamma_range, 'C': C_range, 'kernels': kernels}

inner_dist = []
outer_dist = []
permutation_within_subjects(data_params, grid_params, inner_dist, outer_dist, runs=10, train_runs=200)