In [1]:
import matplotlib
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from itertools import permutations 
import sys
base = '/home3/ebrahim/isr/'
sys.path.append(base)
from model import RNN_feedback
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import r2_score
matplotlib.use('Agg')
from matplotlib import pyplot as plt
from skimage.measure import block_reduce
from scipy.stats import pearsonr
import pandas as pd
import seaborn as sns
import os

In [3]:
class Regression_Trials(Dataset):

    def __init__(self, num_letters, letters_subset, list_length):

        '''
        Builds one trial for every ordered arrangement (permutation) of
        list_length distinct letters taken from letters_subset.

        Example: with letters a, b, c and list_length 2 the trials are
        {ab, ba, ac, ca, bc, cb}; in general s! / (s-l)! trials for
        s letters and lists of length l.

        Attributes created:
            X     : one-hot inputs, num_trials x (2*list_length + 1) x (num_letters + 1).
                    Each trial is the list followed by list_length + 1 cue tokens.
            y     : one-hot targets of the same shape. Each trial is the list,
                    the list again, then a single cue token.
            X_reg : integer letter ids, (2*list_length) x num_trials
                    (the list concatenated with itself, then transposed).

        @param num_letters: number of total letters (classes - 1) RNN was trained on;
                            the same integer is used as the id of the cue token
        @param letters_subset: subset of letters used for regression analyses
        @param list_length: length of list for regression analyses
        '''

        n_classes = num_letters + 1

        # every ordered list of letters, shape: num_permutations x list_length
        lists = np.stack(list(permutations(letters_subset, list_length)))
        n_trials = lists.shape[0]

        def _cue_columns(width):
            # block of cue tokens (id == num_letters), n_trials x width
            return np.ones((n_trials, width)) * num_letters

        def _to_one_hot(ids):
            # integer ids -> one-hot over the letters plus the cue class
            as_long = torch.from_numpy(ids).to(torch.long)
            return torch.nn.functional.one_hot(as_long, num_classes=n_classes)

        # input: the list, then list_length + 1 cue tokens
        self.X = _to_one_hot(np.hstack((lists, _cue_columns(list_length + 1))))

        # target: the list twice, then one cue token
        self.y = _to_one_hot(np.hstack((lists, lists, _cue_columns(1))))

        # raw integer regressors, one column per trial
        self.X_reg = np.hstack((lists, lists)).T

    def __len__(self):
        '''Number of trials (permutations) in the dataset.'''

        return self.X.shape[0]

    def __getitem__(self, idx):
        '''Returns the (input, target) pair for trial idx as float32 tensors.'''

        trial_input = self.X[idx].float()
        trial_target = self.y[idx].float()
        return trial_input, trial_target

In [4]:
def compute_element_vectors(X, y, seed):

    '''
    Fits a ridge regression (alpha = .01) from X to y on a random 80% of the
    trials and scores it on the remaining, held-out 20%.

    @param X (numpy array): regressors, one row per trial
                            (num_permutations x (num_features x num_timesteps)
                            per the original notes)
    @param y (numpy array): regression targets, one row / entry per trial
    @param seed (int): seed used to initialize the random number generator
                       that draws the train/test split

    @return (r2, coef): r2 is the R^2 score on the held-out trials rounded to
                        5 decimals; coef is the fitted Ridge coef_ array, whose
                        columns (one per column of X) are used as the element
                        vectors.
    '''

    num_trials = X.shape[0]
    rng = np.random.default_rng(seed)

    # replace=False is essential: Generator.choice samples WITH replacement by
    # default, which would put duplicate trials in the training set and leave
    # far more than 20% of the trials in the "held-out" complement below.
    train_ind = rng.choice(num_trials, int(num_trials*.8), replace=False)
    test_ind = np.setdiff1d(np.arange(num_trials), train_ind)

    reg = Ridge(alpha=.01).fit(X[train_ind], y[train_ind])
    y_hat = reg.predict(X[test_ind])

    r2_score_value = r2_score(y[test_ind], y_hat)

    return round(r2_score_value,5), reg.coef_

In [152]:
def create_synthetic_y(X_all, timesteps, num_features):

    '''
    X_all is a matrix of shape num_trials x (num_timesteps x num_features).
    Only the columns of the first `timesteps` timesteps are used.

    Each timestep's feature vector is extended with a positional marker
    (a vector of length timesteps that is 1 at position t and 0 elsewhere),
    the extended vectors of all timesteps are laid side by side, and each
    trial's row is summed.

    Returns a 1-D array with one value per trial.

    NOTE(review): earlier notes for this function describe the target as
    num_trials x timesteps x hidden_size, with y[t] holding information about
    the features from 0:t. The code below collapses everything to a single
    number per trial instead -- confirm which one is intended.
    '''

    num_trials = X_all.shape[0]

    # keep the first `timesteps` timesteps and view them as
    # num_trials x num_timesteps x num_features
    per_step = X_all[:, :timesteps*num_features].reshape(num_trials, -1, num_features)

    # row t of the identity matrix is the positional marker for timestep t
    markers = np.eye(timesteps)

    # for every timestep: [features at t | marker for t], repeated for each trial
    tagged_steps = [
        np.hstack((per_step[:, t], np.tile(markers[t], (num_trials, 1))))
        for t in range(timesteps)
    ]

    flat = np.stack(tagged_steps, axis=1).reshape(num_trials, timesteps*(timesteps+num_features))
    return np.sum(flat, axis=1)

In [154]:
# create permutations of specified list length with a given subset of letters
letters_subset = [0,1,4,7,10,13,16,19,22,25]
list_length = 4
rt = Regression_Trials(26, letters_subset, list_length)
# a single batch holding every trial, in a fixed order
rt_dataloader = DataLoader(rt, batch_size=len(rt), shuffle=False)

# convert to one hot encoding, X_transform has shape num_permutations X num_features
# (every column of X_reg.T is expanded into one indicator column per distinct letter)
X_reg = rt.X_reg
try:
    # scikit-learn >= 1.2 spells the dense-output switch `sparse_output`
    enc = OneHotEncoder(sparse_output=False)
except TypeError:
    # older scikit-learn only knows the original `sparse` keyword
    enc = OneHotEncoder(sparse=False)
X_transform = enc.fit_transform(X_reg.T)

# timesteps / features are derived from the settings above instead of being
# repeated as literals, so changing list_length or letters_subset stays consistent
y_pos = create_synthetic_y(X_transform, list_length, len(letters_subset))
print(y_pos.shape)

(5040,)


In [157]:
# Spot-check the synthetic target of a single trial.
# NOTE(review): with one-hot inputs every timestep contributes exactly 1 from
# its letter and 1 from its positional marker, so with 4 timesteps each entry
# of y_pos appears to be 8.0 regardless of the trial -- confirm this is intended.
y_pos[2]

8.0

In [80]:
# Scratch check of the tiling pattern: repeat one row vector num_repeats
# times to obtain a num_repeats x len(a) matrix.
num_repeats = 5040
a = np.array([0, 1, 2, 3])
at = np.tile(a, (num_repeats, 1))
print(at)

[[0 1 2 3]
 [0 1 2 3]
 [0 1 2 3]
 ...
 [0 1 2 3]
 [0 1 2 3]
 [0 1 2 3]]
