Name: Holly Kular\
Date: 03-19-2024\
Email: hkular@ucsd.edu\
decode_L1.m\
Description: Script for decoding analysis on layer 1 of probabilistic RNN

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import sys

In [4]:
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.svm import SVC  
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import make_classification
from scipy.optimize import curve_fit

In [None]:
def update_progress(progress, total):
    bar_length = 20  # Adjust this for desired bar length
    filled_length = int(round(progress / total * bar_length))
    bar = '=' * filled_length + '-' * (bar_length - filled_length)
    print(f'Progress: [{bar}] {progress}/{total}', end='\r')

In [69]:
# MODIFY HERE
# what conditions were the RNNs trained on?
prob_split = '70_30' # the probability of stimulus 1 vs all
afc = '2' # number of alternatives
coh = 'lo' # coherence
feedback = False # interlayer feedback (true or false)
thresh = [ 0.3,0.7 ] # threshold for whether stim 1 or other

# decode opts
time_avg = False # do we want to look at average over time window?
if time_avg:
    t_win = [ 200,-1 ]
n_cvs = 5
# store the accuracy
acc = np.full( ( n_cvs ), np.nan )

# penalties to eval
num_cgs = 30
Cs = np.logspace( -5,1,num_cgs )

# set up the grid
param_grid = { 'C': Cs, 'kernel': ['linear'] }

# define object - use a SVC that balances class weights (because they are biased, e.g. 70/30)
# note that can also specify cv folds here, but I'm doing it by hand below in a loop
grid = GridSearchCV( SVC(class_weight = 'balanced'),param_grid,refit=True,verbose=0 )


In [70]:
# Data Directory
if sys.platform.startswith('linux'):
    data_dir = f"/mnt/neurocube/local/serenceslab/holly/RNN_Geo/data/rdk_{prob_split}_{afc}afc/feedforward_only/{coh}_coh"
else:
    data_dir = f"/Volumes/serenceslab/holly/RNN_Geo/data/rdk_{prob_split}_{afc}afc/feedforward_only/{coh}_coh"

# Load data
data = np.load(f"{data_dir}/Trials.npz")
data_expected = np.load(f"{data_dir}/Trials200_0expected.npz")
data_unexpected = np.load(f"{data_dir}/Trials200_1expected.npz")
task_info = {}
task_info['trials'] = 200
task_info['trial_dur'] = 250  # trial duration (timesteps)
task_info['stim_on'] = 80
task_info['stim_dur'] = 50

______________________________________________________________________

## Next: Compare decode expected vs. unexpected
Hypothesis: We can decode expected stimulus better than unexpected because the RNN has acquired the expectation. 

### Decode expected stim input

In [43]:
# Decode trials: RNN stim presented

# get the data from layer 1 decode stim
# this is a [trial x time step x unit] matrix
data_d = data_expected['fr1']
labs = data_expected['labs'].squeeze()

# get some info about structure of the data
tris = data_d.shape[0]             # number of trials
tri_ind = np.arange(0,tris)      # list from 0...tris
hold_out = int( tris / n_cvs )   # how many trials to hold out

In [44]:
# Do decoding
if time_avg:
       
        # Within each cross-validation fold
    for i in range(n_cvs):

        # trials to hold out as test set on this cv fold
        tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

        # index into the training data on this cv fold
        trn_ind = np.setdiff1d( tri_ind, tst_ind )

        # get the training data (X) and the training labels (y)
        # note that y is unbalanced unless prob is 50/50
        X = data_d[ trn_ind,: ]
        y = labs[trn_ind]

        # Fit the model on the binary labels
        grid.fit( X, y )

        # get the test data (X) and the test labels (y)
        X_test = data_d[tst_ind, :]
        y_test = labs[tst_ind]

        # predict!
        acc[ i ] = grid.score( X_test,y_test )

        # Evaluate accuracy
        accuracy = np.mean( acc )
        # Print overall results
        print(f'CV: {i}, {grid.best_estimator_}')  
        
else:
    
    total_iterations = task_info['trial_dur'] * n_cvs
    decoding_acc = np.zeros((task_info['trial_dur'],))
    for t_step in range(task_info['trial_dur']):

        data_slice = data_d[:, t_step, :]

        # loop over cvs and do classification
        for i in range(n_cvs):

            # trials to hold out as test set on this cv fold
            tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

            # index into the training data on this cv fold
            trn_ind = np.setdiff1d( tri_ind, tst_ind )

            # get the training data (X) and the training labels (y)
            X = data_slice[trn_ind,:]
            y = labs[trn_ind]

            # fit the model
            grid.fit( X,y )

            # progress report
            #print(f'CV: {i}, {grid.best_estimator_}')

            # get the test data (X) and the test labels (y)
            X_test = data_slice[tst_ind, :]
            y_test = labs[tst_ind]

            # predict!
            acc[ i ] = grid.score( X_test,y_test )
            update_progress((t_step * n_cvs) + i + 1, total_iterations)
        decoding_acc[t_step] = np.mean(acc)        
    plots = True # show decoding acc over time
    print('')  # Print a newline after the progress bar    
print(f'done decoding')

if plots:
    # Plot decoding accuracy over time
    plt.figure()
    plt.plot(range(task_info['trial_dur']), decoding_acc)
    plt.xlabel('Time Step')
    plt.ylabel('Decoding Accuracy')
    plt.title('Decoding Accuracy Over Time')
    plt.axvspan(task_info['stim_on'], task_info['stim_on']+task_info['stim_dur'], color = 'gray', alpha = 0.3)
    #plt.savefig(f"{data_dir}/decode_stim_exp.png")
    plt.show()  

CV: 0, SVC(C=2.592943797404667e-05, class_weight='balanced', kernel='linear')
CV: 1, SVC(C=0.020433597178569417, class_weight='balanced', kernel='linear')
CV: 2, SVC(C=0.020433597178569417, class_weight='balanced', kernel='linear')
CV: 3, SVC(C=0.020433597178569417, class_weight='balanced', kernel='linear')
CV: 4, SVC(C=0.004893900918477494, class_weight='balanced', kernel='linear')
0.385


### Decode unexpected stim input

In [45]:
# Decode trials: RNN stim presented

# get the data from layer 1 decode stim
# this is a [trial x time step x unit] matrix
data_d = data_unexpected['fr1']
labs = data_unexpected['labs'].squeeze()


In [46]:
# Do decoding
if time_avg:
       
        # Within each cross-validation fold
    for i in range(n_cvs):

        # trials to hold out as test set on this cv fold
        tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

        # index into the training data on this cv fold
        trn_ind = np.setdiff1d( tri_ind, tst_ind )

        # get the training data (X) and the training labels (y)
        # note that y is unbalanced unless prob is 50/50
        X = data_d[ trn_ind,: ]
        y = labs[trn_ind]

        # Fit the model on the binary labels
        grid.fit( X, y )

        # get the test data (X) and the test labels (y)
        X_test = data_d[tst_ind, :]
        y_test = labs[tst_ind]

        # predict!
        acc[ i ] = grid.score( X_test,y_test )

        # Evaluate accuracy
        accuracy = np.mean( acc )
        # Print overall results
        print(f'CV: {i}, {grid.best_estimator_}')  
        
else:
    
    total_iterations = task_info['trial_dur'] * n_cvs
    decoding_acc = np.zeros((task_info['trial_dur'],))
    for t_step in range(task_info['trial_dur']):

        data_slice = data_d[:, t_step, :]

        # loop over cvs and do classification
        for i in range(n_cvs):

            # trials to hold out as test set on this cv fold
            tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

            # index into the training data on this cv fold
            trn_ind = np.setdiff1d( tri_ind, tst_ind )

            # get the training data (X) and the training labels (y)
            X = data_slice[trn_ind,:]
            y = labs[trn_ind]

            # fit the model
            grid.fit( X,y )

            # progress report
            #print(f'CV: {i}, {grid.best_estimator_}')

            # get the test data (X) and the test labels (y)
            X_test = data_slice[tst_ind, :]
            y_test = labs[tst_ind]

            # predict!
            acc[ i ] = grid.score( X_test,y_test )
            update_progress((t_step * n_cvs) + i + 1, total_iterations)
        decoding_acc[t_step] = np.mean(acc)        
    plots = True # show decoding acc over time
    print('')  # Print a newline after the progress bar    
print(f'done decoding')

if plots:
    # Plot decoding accuracy over time
    plt.figure()
    plt.plot(range(task_info['trial_dur']), decoding_acc)
    plt.xlabel('Time Step')
    plt.ylabel('Decoding Accuracy')
    plt.title('Decoding Accuracy Over Time')
    plt.axvspan(task_info['stim_on'], task_info['stim_on']+task_info['stim_dur'], color = 'gray', alpha = 0.3)
    #plt.savefig(f"{data_dir}/decode_stim_unx.png")
    plt.show()  

CV: 0, SVC(C=6.723357536499335e-05, class_weight='balanced', kernel='linear')
CV: 1, SVC(C=1.610262027560939e-05, class_weight='balanced', kernel='linear')
CV: 2, SVC(C=6.723357536499335e-05, class_weight='balanced', kernel='linear')
CV: 3, SVC(C=0.0018873918221350976, class_weight='balanced', kernel='linear')
CV: 4, SVC(C=0.00017433288221999874, class_weight='balanced', kernel='linear')
0.595


______________________________________________________________________

______________________________________________________________________

### Decode expected stim choice

In [71]:
# Decode trials: RNN stim choice

# get the data from layer 1 decode choice
# this is a [trial x time step x unit] matrix
data_d = data_expected['fr1']
labs = data_expected['outs'][:,-1]

In [73]:
# Do decoding
if time_avg:
       
        # Within each cross-validation fold
    for i in range(n_cvs):

        # trials to hold out as test set on this cv fold
        tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

        # index into the training data on this cv fold
        trn_ind = np.setdiff1d( tri_ind, tst_ind )

        # get the training data (X) and the training labels (y)
        X = data_d[ trn_ind,: ]
        y = np.select([labs[trn_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)#np.select([labs[trn_ind] >= thresh], [1], default=0) 

        # Fit the model on the binary labels
        grid.fit( X, y )

        # get the test data (X) and the test labels (y)
        X_test = data_d[tst_ind, :]
        y_test = np.select([labs[tst_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)

        # predict!
        acc[ i ] = grid.score( X_test,y_test )

        # Evaluate accuracy
        accuracy = np.mean( acc )
        # Print overall results
        print(f'CV: {i}, {grid.best_estimator_}')  
        
else:
    
    total_iterations = task_info['trial_dur'] * n_cvs
    decoding_acc = np.zeros((task_info['trial_dur'],))
    for t_step in range(task_info['trial_dur']):

        data_slice = data_d[:, t_step, :]

        # loop over cvs and do classification
        for i in range(n_cvs):

            # trials to hold out as test set on this cv fold
            tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

            # index into the training data on this cv fold
            trn_ind = np.setdiff1d( tri_ind, tst_ind )

            # get the training data (X) and the training labels (y)
            X = data_slice[trn_ind,:]
            y = np.select([labs[trn_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)

            # fit the model
            grid.fit( X,y )

            # progress report
            #print(f'CV: {i}, {grid.best_estimator_}')

            # get the test data (X) and the test labels (y)
            X_test = data_slice[tst_ind, :]
            y_test = np.select([labs[tst_ind] >= thresh[1], labs[tst_ind] <= thresh[0]], [0,1], default=0)

            # predict!
            acc[ i ] = grid.score( X_test,y_test )
            update_progress((t_step * n_cvs) + i + 1, total_iterations)
        decoding_acc[t_step] = np.mean(acc)        
    plots = True # show decoding acc over time
    print('')  # Print a newline after the progress bar    
print(f'done decoding')

if plots:
    # Plot decoding accuracy over time
    plt.figure()
    plt.plot(range(task_info['trial_dur']), decoding_acc)
    plt.xlabel('Time Step')
    plt.ylabel('Decoding Accuracy')
    plt.title('Decoding Accuracy Over Time')
    plt.axvspan(task_info['stim_on'], task_info['stim_on']+task_info['stim_dur'], color = 'gray', alpha = 0.3)
    #plt.savefig(f"{data_dir}/decode_choice_exp.png")
    plt.show() 

Optimal threshold: 0.8000000000000002, Best Accuracy: 0.825


### Decode unexpected stim choice

In [74]:
# Decode trials: RNN stim choice

# get the data from layer 1 decode choice
# this is a [trial x time step x unit] matrix
data_d = data_unexpected['fr1']
labs = data_unexpected['outs'][:,-1]


In [75]:
# Do decoding
if time_avg:
       
        # Within each cross-validation fold
    for i in range(n_cvs):

        # trials to hold out as test set on this cv fold
        tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

        # index into the training data on this cv fold
        trn_ind = np.setdiff1d( tri_ind, tst_ind )

        # get the training data (X) and the training labels (y)
        X = data_d[ trn_ind,: ]
        y = np.select([labs[trn_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)#np.select([labs[trn_ind] >= thresh], [1], default=0) 

        # Fit the model on the binary labels
        grid.fit( X, y )

        # get the test data (X) and the test labels (y)
        X_test = data_d[tst_ind, :]
        y_test = np.select([labs[tst_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)

        # predict!
        acc[ i ] = grid.score( X_test,y_test )

        # Evaluate accuracy
        accuracy = np.mean( acc )
        # Print overall results
        print(f'CV: {i}, {grid.best_estimator_}')  
        
else:
    
    total_iterations = task_info['trial_dur'] * n_cvs
    decoding_acc = np.zeros((task_info['trial_dur'],))
    for t_step in range(task_info['trial_dur']):

        data_slice = data_d[:, t_step, :]

        # loop over cvs and do classification
        for i in range(n_cvs):

            # trials to hold out as test set on this cv fold
            tst_ind = tri_ind[ i*hold_out : (i+1)*hold_out ]

            # index into the training data on this cv fold
            trn_ind = np.setdiff1d( tri_ind, tst_ind )

            # get the training data (X) and the training labels (y)
            X = data_slice[trn_ind,:]
            y = np.select([labs[trn_ind] >= thresh[1], labs[trn_ind] <= thresh[0]], [0,1], default=0)

            # fit the model
            grid.fit( X,y )

            # progress report
            #print(f'CV: {i}, {grid.best_estimator_}')

            # get the test data (X) and the test labels (y)
            X_test = data_slice[tst_ind, :]
            y_test = np.select([labs[tst_ind] >= thresh[1], labs[tst_ind] <= thresh[0]], [0,1], default=0)

            # predict!
            acc[ i ] = grid.score( X_test,y_test )
            update_progress((t_step * n_cvs) + i + 1, total_iterations)
        decoding_acc[t_step] = np.mean(acc)        
    plots = True # show decoding acc over time
    print('')  # Print a newline after the progress bar    
print(f'done decoding')

if plots:
    # Plot decoding accuracy over time
    plt.figure()
    plt.plot(range(task_info['trial_dur']), decoding_acc)
    plt.xlabel('Time Step')
    plt.ylabel('Decoding Accuracy')
    plt.title('Decoding Accuracy Over Time')
    plt.axvspan(task_info['stim_on'], task_info['stim_on']+task_info['stim_dur'], color = 'gray', alpha = 0.3)
    #plt.savefig(f"{data_dir}/decode_choice_unx.png")
    plt.show() 



Optimal threshold: 0.8500000000000002, Best Accuracy: 0.9650000000000001
