It has been observed in prior work that tasks sharing some underlying structure will exhibit overlapping neural activity. For instance, Yang et al. (2019) trained RNNs on several cognitive tasks and observed clustering in the neural activity space: some clusters specialized to particular tasks, while others were shared between tasks. In theory, it is also possible to observe a completely distributed representation (i.e. no modular clusters). While Yang et al. focused on sensory tasks, we aim to study tasks involving abstract relations: transitive inference and divisibility. These tasks are likely to have some common underlying structure, as both represent transitive relations. We will compare the neural geometry of an RNN trained on one of these tasks at a time to that of the same RNN trained on both tasks (using interleaving).

Questions we hope to answer: How will the neural representation of a given task change when more than one task is learned simultaneously? When both tasks are learned together, will the activations shared between the two tasks also be present in some form in networks that learn only one task at a time? That is—does a neural network organize its activity differently when related tasks must be learned together? We will use RDM analysis and dimensionality reduction techniques to look for specialized clusters in neural activity space. We will then compare our networks using RSA/RDA, as well as dynamics-based methods such as DSA and fixed/slow point analysis.


### Imports

In [1]:
# General
import numpy as np
import pandas as pd
from scipy.stats import zscore
import random
#from statistics import mean

# Deep learning
import torch
from torch import nn, optim

# # Response visualizations
# !pip install umap-learn
# import umap
import matplotlib as mpl
from matplotlib import pyplot as plt
import seaborn as sns

# Set random seeds for reproducibility.
# gen_batch samples with Python's built-in `random` module in addition to
# NumPy, so all three generators must be seeded for runs to be repeatable.
random.seed(12)
np.random.seed(12)
torch.manual_seed(12)

<torch._C.Generator at 0x123876c70>

### Tasks Example

In [None]:
# Transitive Inference: (i.e. index 0 is greater than index 1 = "A > B")
# Example stimulus: [[1, 0, 0],[0, 1, 0]]
# Example output: 0

# Subset Inclusion: (i.e. index 0 is a superset of index 1 = "A \contains B")
# Example stimulus: [[1, 1, 1],[0, 1, 0]]
# Example output: 1 = "A \contains B"

# Divisibility: (i.e. index 0 is divisible by index 1)
# Example stimulus: [6,3]
# Example output: 1 = "A is divisible by B"

In [None]:
# Python compares lists lexicographically; two equal lists are not strictly
# ordered, so this expression evaluates to False.
[1,0,0] < [1,0,0]

False

----
### Task Design and Data Generation

In [30]:
# @title Task design
def int2bits(n, width):
    '''
    Return the binary digits of the integer n as a list of ints,
    left-padded with zeros up to `width` digits.
    e.g. n = 3, width = 4 -> [0, 0, 1, 1]

    If n needs more than `width` digits, nothing is truncated and the
    returned list is longer than `width`.
    '''
    digits = bin(n)[2:]            # drop the '0b' prefix that bin() adds
    padded = digits.zfill(width)   # most-significant bit first
    return list(map(int, padded))
    


def gen_batch(batch_size, n_elements = 3,
              task = 'ti'):
    '''
    Generate a Batch of Stimuli (list) for the Desired Task

    Inputs:
    - batch_size   : (int) number of stimuli to generate
    - n_elements   : (int) size of the base set
                     'ti'  - number of one-hot items
                     'si'  - length of the bit vector describing each subset
                     'div' - integers are drawn from 1..n_elements
    - task         : (str, case-insensitive)
                     'ti' - Transitive Inference; without Reflexivity (relation with itself)
                     'si' - Subset Inclusion;
                     'div'- Divisibility; without 0

    Returns: list of [[A, B], label] with batch_size entries
    - 'ti' / 'si' : A and B are lists of n_elements ints (0/1)
    - 'div'       : A and B are plain ints
    Labels:
    - 'ti'  : 0 if A's row index in the shuffled item table is greater than B's, else 1
    - 'si'  : 1 if every element of B is also in A, else 0
    - 'div' : 1 if A is divisible by B, else 0
    Note that 'ti' uses 0 where 'si' / 'div' use 1 for "the relation holds".

    Raises: ValueError if `task` is not one of the supported tasks.

    NOTE: for 'ti' the item table is reshuffled on every call, so which one-hot
    vector outranks which is only fixed within a single returned batch.
    '''
    # Normalise once so validation and dispatch agree on case.
    task_key = task.lower() if isinstance(task, str) else task
    if task_key not in ('ti', 'si', 'div'):
        raise ValueError(f'Requested Task [{task}] Not Supported!')
    n_elements = int(n_elements)

    stimuli = []
    # Transitive Inference
    if 'ti' == task_key:
        # All possible instances; the row order defines the ranking
        item_table = np.eye(n_elements, dtype = int)
        np.random.shuffle(item_table)
        for _ in range(batch_size):
            # Two distinct rows (sampling without replacement)
            idx_A, idx_B = random.sample(range(n_elements), k = 2)
            # target = which instance is greater
            target = 0 if idx_A > idx_B else 1
            stimuli.append([item_table[[idx_A, idx_B]].tolist(), target])
    # Divisibility
    elif 'div' == task_key:
        for _ in range(batch_size):
            # Two distinct integers, excluding 0
            A, B = random.sample(range(1, n_elements + 1), k = 2)
            # target = if the previous is divisible by the latter
            stimuli.append([[A, B], int(0 == A % B)])
    # Subset Inclusion
    else:
        n_subsets = 2 ** n_elements
        for _ in range(batch_size):
            # Two distinct subsets, each encoded as an integer bit mask
            idx_A, idx_B = random.sample(range(n_subsets), k = 2)
            # target = if the previous is a superset of the latter,
            # i.e. every bit set in B is also set in A
            target = int((idx_A & idx_B) == idx_B)
            stimuli.append([[int2bits(idx_A, n_elements),
                             int2bits(idx_B, n_elements)], target])
    return stimuli


In [31]:
# Sample 10 divisibility stimuli; with n_elements = 5 the integers are drawn from 1..5.
# NOTE(review): the recorded output below contains 6, which n_elements = 5
# cannot produce -- it is probably left over from an earlier run; re-run to refresh.
gen_batch(10, 5, task = 'div')

[[[1, 2], 0],
 [[1, 3], 0],
 [[1, 6], 0],
 [[2, 1], 1],
 [[4, 2], 1],
 [[6, 3], 1],
 [[1, 5], 0],
 [[6, 4], 0],
 [[2, 5], 0],
 [[6, 4], 0]]

### Data Preparation (PyTorch)

In [37]:
def prepare_data(data, 
                 dtype = torch.float, device = 'cpu'):
    '''
    Split a batch of stimuli into three tensors.

    data : iterable of [[A, B], label] entries (e.g. the output of gen_batch)

    returns (x1, x2, y_t), all created with the given dtype and device:
    - x1  : every A stacked along the first axis
    - x2  : every B stacked along the first axis
    - y_t : the labels, one per entry
    '''
    def to_tensor(values):
        # One place to apply the shared dtype / device settings
        return torch.tensor(values, dtype = dtype, device = device)

    firsts, seconds, labels = [], [], []
    for (first, second), label in data:
        firsts.append(first)
        seconds.append(second)
        labels.append(label)
    return to_tensor(firsts), to_tensor(seconds), to_tensor(labels)


### Data Loader, Train_Test split and Shuffle Helpers

In [43]:
# Example: draw 10 subset-inclusion stimuli over a 5-element base set and
# convert them to tensors (x1 = first sets, x2 = second sets, y_t = labels).
x1, x2, y_t = prepare_data(gen_batch(batch_size = 10, n_elements = 5, task = 'si'))

----
## Model Architecture

### Base Backbones

In [None]:
# 1. MLP
class MLP(nn.Module):
    '''
    Plain feed-forward network.

    Stacks (num_layers - 1) blocks of Linear -> LeakyReLU -> Dropout, each with
    hidden_dim output units, followed by a final Linear layer that maps to out_dim.

    Inputs:
    - input_dim  : (int) size of the last dimension of the input
    - hidden_dim : (int) width of every hidden layer
    - out_dim    : (int) size of the last dimension of the output
    - num_layers : (int) total number of Linear layers (including the output layer)
    - dp_rate    : (float) dropout probability applied after every hidden activation
    '''
    def __init__(self, input_dim, hidden_dim, out_dim, 
                 num_layers = 2, dp_rate = 0.1):
        super().__init__()
        # Hyperparameters
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.out_dim = out_dim
        self.num_layers = num_layers

        layers = []
        c_in = input_dim
        for _ in range(num_layers - 1):
            layers += [nn.Linear(c_in, hidden_dim),
                       nn.LeakyReLU(inplace = True),
                       nn.Dropout(dp_rate)]
            c_in = hidden_dim
        # The output layer must project to out_dim (not hidden_dim), otherwise
        # the out_dim argument has no effect on the network.
        layers.append(nn.Linear(c_in, out_dim))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        '''Apply the layer stack to x and return the result.'''
        return self.layers(x)
        

----
### Model (Classifier)

In [None]:
class MyModel(nn.Module):
    '''
    A Higher Level Model that takes input
    - x1 
    - x2 
    and outputs, per sample, out_dim values in (0, 1); with the default
    out_dim = 1 this is the probability for the binary label being 1.

    x1 and x2 are each encoded by their own MLP (separate weights), the two
    encodings are summed, and a small head maps the sum through a Sigmoid.

    Inputs:
    - input_dim  : (int) size of the last dimension of x1 and x2
    - hidden_dim : (int) width of both encoders and of the head's input
    - out_dim    : (int) number of outputs per sample
    - num_layers : (int) number of Linear layers in each encoder
    '''
    def __init__(self, input_dim, hidden_dim, out_dim = 1,
                num_layers = 3):
        super().__init__()
        self.feedforward1 = MLP(input_dim, hidden_dim, hidden_dim, num_layers)
        self.feedforward2 = MLP(input_dim, hidden_dim, hidden_dim, num_layers)
        # Every module passed to nn.Sequential is a separate argument, so each
        # one needs a trailing comma.
        self.head = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.LeakyReLU(inplace = True),
            nn.Linear(hidden_dim // 2, out_dim),
            nn.Sigmoid(),
        )

    def forward(self, x1, x2):
        '''Encode both inputs, sum the encodings and apply the head.'''
        x1 = self.feedforward1(x1)
        x2 = self.feedforward2(x2)
        return self.head(x1 + x2)