# Introduction

This script performs RSA on FashionMNIST models. The loop in which the models are trained is commented out as the trained models are provided on github. They need to be saved in the directory specified here. Ensure that the notebook is able to save files in the working directory.

In [0]:
from google.colab import drive 
drive.mount('/content/gdrive', force_remount=True)
working_directory       = 'path here/' 
fashionmnist_saves_conv = 'directory name here'  
fashionmnist_saves_mlp  = 'directory name here'

Mounted at /content/gdrive


# Model training and computing Performances

Data is split into 1) train 2) test and 3) valid. Models are trained on train, accuracies estimated on test. Valid is used to perform RSA on.

In [0]:
import torchvision
import torch
import numpy as np
import scipy.stats
import sklearn.manifold
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import seaborn
import pandas as pd
import sklearn
import os
import pickle
import time
from scipy.spatial import distance
from sklearn.metrics.pairwise import manhattan_distances
import torch.nn as nn
from copy import deepcopy


transforms = torchvision.transforms.Compose([
                           torchvision.transforms.ToTensor(),
                       ])

FashionMNIST_train = torchvision.datasets.FashionMNIST(root='./data',train=True, transform=transforms, download=True)
FashionMNIST_train, FashionMNIST_valid = sklearn.model_selection.train_test_split(FashionMNIST_train, test_size=0.1, random_state=1)
FashionMNIST_test = torchvision.datasets.FashionMNIST(root='./data',train=False, transform= transforms, download=True)

dataloader_train = torch.utils.data.DataLoader(FashionMNIST_train, batch_size = 200, shuffle = True)
dataloader_test = torch.utils.data.DataLoader(FashionMNIST_test, batch_size = 10000, shuffle = False)



## Model class definitions
Model classes are defined with hidden layer sizes as variables. 

In [0]:
class feedforward(torch.nn.Module):
    
    def __init__(self, D_in, Hidden, Hidden2, Hidden3, D_out):
  
        super(feedforward, self).__init__()
        self.linear1 = torch.nn.Linear(D_in, Hidden)
        self.linear2 = torch.nn.Linear(Hidden, Hidden2)
        self.linear3 = torch.nn.Linear(Hidden2, Hidden3)
        self.linear4 = torch.nn.Linear(Hidden3, D_out)
        self.sigmoid = torch.nn.Sigmoid()


    def forward(self, x):
      
        l1_activ = self.sigmoid(self.linear1(x))
        l2_activ = self.sigmoid(self.linear2(l1_activ))
        l3_activ = self.sigmoid(self.linear3(l2_activ))
        y_pred =   self.linear4(l3_activ)
        
        return y_pred, l3_activ, l2_activ, l1_activ




class ConvNet(torch.nn.Module):
    
    def __init__(self, h, j):
        
        super(ConvNet, self).__init__()
        
        self.layer1 = nn.Sequential(nn.Conv2d(1, h * 2, kernel_size=5, stride=1, padding=2),
                                    nn.MaxPool2d(kernel_size=2, stride=2),
                                    nn.ReLU(),
                                    nn.BatchNorm2d(h * 2))
     
        self.fc1 = nn.Sequential(nn.Linear(14 * 14 * h * 2, j * 16),
                                 nn.ReLU(),
                                 nn.BatchNorm1d(j * 16),
                                 nn.Dropout(p = 0.5))
       
        self.fc2 = nn.Sequential(nn.Linear(j * 16, j * 8),
                                 nn.Sigmoid(),
                                 nn.BatchNorm1d(j * 8),
                                 nn.Dropout(p = 0.5))                           
      
        self.fc3 = nn.Linear(j*8, 10)

        
    def forward(self, x):
        
        conv1 = self.layer1(x)
        reshaped = conv1.reshape(conv1.size(0), -1)
        fc1 = self.fc1(reshaped)
        fc2 = self.fc2(fc1)                             
        out = self.fc3(fc2)
        return [out, fc2, fc1, reshaped, conv1]
            
criterion = torch.nn.CrossEntropyLoss()


## Training CNNs in a loop
This loop is used to generate CNNs. Every model has a different number of filters/units in each hidden layer. Every model is saved in a seperate file in the same folder. Accuracy is reported but not saved. It is computed again when loading the models, to ensure there is no bug.

In [0]:
'''
model_index = 0

for h in range(2,15):
    for j in range(2,16):
          
        model = ConvNet(h, j).cuda()
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr= 0.01)


        for epoch in range(15):
            for (inputs, labels) in (dataloader_train):
    
                model.train()
                inputs, labels = inputs.cuda(), labels.cuda()
                outputs = model(inputs)[0]
                loss = criterion(outputs, labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
        with torch.no_grad():
            for (inputs, labels) in (dataloader_test):
            
                model.eval()
                inputs, labels = inputs.cuda(), labels.cuda()
                out = model(inputs)[0]
                total = labels.size(0)
                correct = ((torch.max(out,1)[1]) == labels).sum()
                acc =   correct.float() / total
                print(acc)
        
            state = {'layers': [h,j],
                     'epoch': epoch, 'state_dict': model.state_dict()}
            try:
              
                 os.makedirs(f'{working_directory}/{fashionmnist_saves_conv/')
                
            except FileExistsError:
              
                pass
              
            torch.save(state, f'{working_directory}/{fashionmnist_saves_conv/{model_index}.tar')
            model_index += 1
            
'''


"\nmodel_index = 0\n\nfor h in range(2,15):\n    for j in range(2,16):\n          \n        model = ConvNet(h, j).cuda()\n        criterion = torch.nn.CrossEntropyLoss()\n        optimizer = torch.optim.Adam(model.parameters(), lr= 0.01)\n\n\n        for epoch in range(15):\n            for (inputs, labels) in (dataloader_train):\n    \n                model.train()\n                inputs, labels = inputs.cuda(), labels.cuda()\n                outputs = model(inputs)[0]\n                loss = criterion(outputs, labels)\n                optimizer.zero_grad()\n                loss.backward()\n                optimizer.step()\n                \n        with torch.no_grad():\n            for (inputs, labels) in (dataloader_test):\n            \n                model.eval()\n                inputs, labels = inputs.cuda(), labels.cuda()\n                out = model(inputs)[0]\n                total = labels.size(0)\n                correct = ((torch.max(out,1)[1]) == labels).sum()\n       

## Compute CNN accuracies
Loads the CNNs and computes accuracies, saves accuracies as a list.

In [0]:
acc_list = []
conv_saves = f'{working_directory}/{fashionmnist_saves_conv}/'
for save in os.listdir(conv_saves):
    
    model_dict = torch.load(f'{working_directory}/{fashionmnist_saves_conv}/{save}')
    layers = model_dict['layers']
    model = ConvNet(*layers).cuda()
    model.load_state_dict(model_dict['state_dict'])

  
    with torch.no_grad():
         for i, (inputs, labels) in enumerate(dataloader_test):
        
            inputs, labels = (inputs, labels)
            inputs, labels = inputs.cuda(), labels.long().cuda()
            out = model(inputs)[0]
            total = labels.size(0)
            correct = ((torch.max(out,1)[1]) == labels).sum()
            acc =   correct.float() / total
            
    print(f'Test accuracy of the network: {acc}')
    acc_list.append([acc.cpu().numpy(),save])
    
with open(f'{working_directory}/accuracy_list_fashionmnist_cnn.pkl', 'wb') as fp:
   
    pickle.dump(acc_list, fp)

Test accuracy of the network: 0.8107999563217163
Test accuracy of the network: 0.8517000079154968
Test accuracy of the network: 0.8671000003814697
Test accuracy of the network: 0.8725000023841858
Test accuracy of the network: 0.8823999762535095
Test accuracy of the network: 0.8789999485015869
Test accuracy of the network: 0.883899986743927
Test accuracy of the network: 0.8834999799728394
Test accuracy of the network: 0.8795999884605408
Test accuracy of the network: 0.8817999958992004
Test accuracy of the network: 0.8860999941825867
Test accuracy of the network: 0.887999951839447
Test accuracy of the network: 0.8880999684333801
Test accuracy of the network: 0.8818999528884888
Test accuracy of the network: 0.8305000066757202
Test accuracy of the network: 0.8637999892234802
Test accuracy of the network: 0.8736000061035156
Test accuracy of the network: 0.8860999941825867
Test accuracy of the network: 0.8831999897956848
Test accuracy of the network: 0.8849999904632568
Test accuracy of the n

## Train MLPs in a loop
Train the MLPs in a loop (similar to CNNs). Save a a dictionary including all necessary information to reload them.

In [0]:
'''
model_index = 0
for i in range(8):
    for j in range(8):
        for h in range(8):
            
            layers = [28*28, i*2 +j*2 + h*2 +3, j*2 + h*2 +3, h*2 +3 ,10]
            model   = feedforward(*layers).cuda()
            optimizer = torch.optim.Adam(model.parameters(), lr= 0.001)
            for epoch in range(15):
                for h, (inputs, labels) in enumerate(dataloader_train):
        
                    inputs, labels = (inputs, labels)
                    inputs, labels = torch.autograd.Variable(inputs).view((-1, 28*28)).float().cuda(), torch.autograd.Variable(labels).long().cuda()
                    out = model(inputs)[0]
                    loss = criterion(out, labels)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
               
            state = {'layers': layers,
                     'epoch': epoch,'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict()}
            try:
              
                os.makedirs(f'{working_directory}/{fashionmnist_saves_mlp}')
            except FileExistsError:
              
                pass
            
            torch.save(state, f'{working_directory}/{fashionmnist_saves_mlp}/{model_index}.tar')
            print(model_index)
            model_index += 1
            
'''


"\nmodel_index = 0\nfor i in range(8):\n    for j in range(8):\n        for h in range(8):\n            \n            layers = [28*28, i*2 +j*2 + h*2 +3, j*2 + h*2 +3, h*2 +3 ,10]\n            model   = feedforward(*layers).cuda()\n            optimizer = torch.optim.Adam(model.parameters(), lr= 0.001)\n            for epoch in range(15):\n                for h, (inputs, labels) in enumerate(dataloader_train):\n        \n                    inputs, labels = (inputs, labels)\n                    inputs, labels = torch.autograd.Variable(inputs).view((-1, 28*28)).float().cuda(), torch.autograd.Variable(labels).long().cuda()\n                    out = model(inputs)[0]\n                    loss = criterion(out, labels)\n                    optimizer.zero_grad()\n                    loss.backward()\n                    optimizer.step()\n               \n            state = {'layers': layers,\n                     'epoch': epoch,'state_dict': model.state_dict(), 'optimizer': optimizer.state_d

## Compute MLP accuracies 
Loads the saved MLP models and computes accuracies. Saves accuracies in a list.

In [0]:
mlp_saves = f'{working_directory}/{fashionmnist_saves_mlp}'

acc_list = []
for save in os.listdir(mlp_saves):
    
    model_dict = torch.load(f'{working_directory}/{fashionmnist_saves_mlp}/{save}')
    layers = model_dict['layers']
    model = feedforward(*layers).cuda()
    model.eval()
    model.load_state_dict(model_dict['state_dict'])

  
    with torch.no_grad():
         for i, (inputs, labels) in enumerate(dataloader_test):    
        
            inputs, labels = (inputs, labels)
            inputs, labels = torch.autograd.Variable(inputs).view((-1, 28*28)).float().cuda(), torch.autograd.Variable(labels).long().cuda()
            out = model(inputs)[0]
            total = labels.size(0)
            correct = ((torch.max(out,1)[1]) == labels).sum()
            acc =   correct.float() / total
            
    print(f'Test accuracy of the network: {acc}')
    acc_list.append([acc.cpu().numpy(),save])
    
with open(f'{working_directory}/accuracy_list_fashionmnist_mlp.pkl', 'wb') as fp:
    
    pickle.dump(acc_list, fp)
  

Test accuracy of the network: 0.5507000088691711
Test accuracy of the network: 0.675599992275238
Test accuracy of the network: 0.8018999695777893
Test accuracy of the network: 0.7699999809265137
Test accuracy of the network: 0.6991999745368958
Test accuracy of the network: 0.5741999745368958
Test accuracy of the network: 0.6505999565124512
Test accuracy of the network: 0.7443000078201294
Test accuracy of the network: 0.7166999578475952
Test accuracy of the network: 0.4892999827861786
Test accuracy of the network: 0.7339000105857849
Test accuracy of the network: 0.5722999572753906
Test accuracy of the network: 0.4869999885559082
Test accuracy of the network: 0.8082000017166138
Test accuracy of the network: 0.7788999676704407
Test accuracy of the network: 0.8014000058174133
Test accuracy of the network: 0.6636999845504761
Test accuracy of the network: 0.7128999829292297
Test accuracy of the network: 0.6534000039100647
Test accuracy of the network: 0.8265999555587769
Test accuracy of the 

# RSA

## A function to compute Input RDMs & confusion matrices
This function calculates 

1.   Activations for a number num_samples of sample images from FashionMNIST for every model in the save files and for the desired layer.
2.   The Input RDM (=correlation Matrix) for every model
3.   The confusion matrices of every model (but does not as a default).



In [0]:
mlp_saves = f'{working_directory}/{fashionmnist_saves_mlp}/'
conv_saves = f'{working_directory}/{fashionmnist_saves_conv}/'


def calculate_activations_correlations(files, path, layer, num_samples=1000, show_progress=True, activations_dict = {}, corr_distances_dict = {}, 
                                       confusion_dict = {}, calc_confusion = False):
  
  dataloader_valid = torch.utils.data.DataLoader(FashionMNIST_valid, batch_size = num_samples, shuffle = False)
  inputs, labels  = next(iter(dataloader_valid))
  index_sorted =  np.argsort(labels.numpy())
  inputs, labels = inputs[index_sorted, :], labels[index_sorted]
  inputs_copy, labels_copy = deepcopy(inputs), deepcopy(labels)
  n = len(labels)

  def calculate_confusion(model, inputs, labels):
      
      confusion_matrix = np.zeros((10,10))
      with torch.no_grad():
          
          model.eval()
          inputs, labels = inputs.cuda(), labels.cuda()
          out = model(inputs)[0]
        
          for true_label in range(10):
              
              true_index = (labels==true_label) # in case not all labels are in the sample
                
              if(true_index.sum() != 0) :
                
                   (torch.max(out,1)[1])
                  
                   for output in torch.max(out[true_index],1)[1]:
                       for estimate in range(10):
                        
                           confusion_matrix[true_label, estimate] += (output == estimate).sum()
              else:
                
                  pass
    
      return(confusion_matrix)

  


  def activations_list(model):
    
      activations = []
      with torch.no_grad():
          model.eval()
          for i in range(n):
            
              if model_type == ConvNet:
                
                  single_input = inputs[i].cuda().view(1,1,28,28)
              else:
                
                  single_input = inputs[i].cuda()
                  
              activations.append(model(single_input)[layer].cpu().numpy().squeeze())
      return(activations)

  def input_rdm(list_of_activations):
      
      correlation = np.zeros((n,n))
      for i in range(n):
          for j in range(i, n):
            
              correlation[i,j] = correlation[j, i] = 1 - scipy.stats.pearsonr(list_of_activations[i], list_of_activations[j])[0]
      return(correlation)

  for i in range(len(files,)):
      if show_progress:
          
          print(i)
          
      inputs, labels = inputs_copy, labels_copy
      model_name = files[i]
      if path[i]==0:
        
          path_to_saves = mlp_saves
          model_type = feedforward
          inputs, labels = torch.autograd.Variable(inputs).view((-1, 28*28)).float(), torch.autograd.Variable(labels).long()
      
      else: 
        
          path_to_saves = conv_saves
          model_type = ConvNet
         
     
      model_dict = torch.load(f'{path_to_saves}{model_name}')
      layers = model_dict['layers']
      model = model_type(*layers).cuda()
      model.load_state_dict(model_dict['state_dict'])
      activations_dict[i] = activations_list(model)
      corr_distances_dict[i] = input_rdm(activations_dict[i])
      if calc_confusion:
        
          confusion_dict[i]= calculate_confusion(model, inputs, labels)
          
      
  return corr_distances_dict,  confusion_dict


## Get dataframe of model accuracies & locations

Here, the accuracy list from MLPs and CNNs are arranged into one dataframe, which contains there accuracies, location in their model type's folder and their model type. Then, the dataframe is ordered by accuracies. This allows obtaining a Model RDM with models sorted by accuracy.

In [0]:
with open (f'{working_directory}/accuracy_list_fashionmnist_mlp.pkl', 'rb') as fp:
    
    acc_list_mlp = pickle.load(fp)
    
with open (f'{working_directory}/accuracy_list_fashionmnist_cnn.pkl', 'rb') as fp:
    
    acc_list_convnet = pickle.load(fp)
   
acc_df_mlp = pd.DataFrame(acc_list_mlp, columns = ['acc', 'loc'])
acc_df_mlp['model_type'] = np.zeros(len(acc_df_mlp))
acc_df_convnet = pd.DataFrame(acc_list_convnet, columns = ['acc', 'loc'])
acc_df_convnet['model_type'] = np.ones(len(acc_df_convnet))
acc_df =  pd.concat([acc_df_mlp,acc_df_convnet], axis=0, ignore_index = True) 
acc_df['index'] = acc_df.index
acc_df = acc_df.sort_values(by = 'acc', ascending = True)



## Calculate (or load) Input Rdms and confusion matrices
Run the function if_calculate_activations_correlations to compute Input RDMs and confusion matrices on 400 samples taken from the FashionMNIST validation data for the penultimate layer. Save every returned dictionary using pickle.

In [0]:
calculate_activations_correlations_should_be_run = True
files = acc_df['loc']
path  = acc_df['model_type']
if calculate_activations_correlations_should_be_run:
 
    input_rdm_dict_fashionmnist, confusion_dict_fashionmnist = calculate_activations_correlations(list(files), list(path), num_samples= 400, calc_confusion = True, layer = 2)

    
    with open(f'{working_directory}/input_rdm_dict_fashionmnist.pkl', 'wb') as fp:
     
        pickle.dump(input_rdm_dict_fashionmnist, fp)
  
    with open(f'{working_directory}/confusion_dict_fashionmnist.pkl', 'wb') as fp:
    
        pickle.dump(confusion_dict_fashionmnist, fp)    

else:
  
    with open (f'{working_directory}/input_rdm_dict_fashionmnist.pkl', 'rb') as fp:
   
        input_rdm_dict_fashionmnist = pickle.load(fp)


0
1


## Function for Model RDM
Functions do:

1.   define the rank-correlation distance between two upper triangular matrices of Input RDMs
2.   calculate the distance between two Input RDMs for a given distance function
3.   calculate the Model RDM (depends on 2 (necessery) and 1 (if rank correlation distance needed)
4.   visualize the Model RDM (shows every 15th axis tick)


In [0]:
def rank_corr_dist(triu_model_1, triu_model_2):
  return 1 - scipy.stats.spearmanr(triu_model_1.flatten(), triu_model_2.flatten())[0]

def dist_between_input_rdms(dist_fun, corr_matrix_1, corr_matrix_2):
  triu_model_1 = corr_matrix_1[np.triu_indices(corr_matrix_1.shape[0], k = 1)]
  triu_model_2 = corr_matrix_2[np.triu_indices(corr_matrix_2.shape[0], k = 1)]
  
  return dist_fun(triu_model_1, triu_model_2)

def calc_rdm(dist_fun, corr_distances_dict,  files=files):
  n = len(files)
  rdm = np.zeros((n,n))
  for i in range(n):
      for j in range(i, n):
          rdm[i,j] = rdm[j, i] = dist_between_input_rdms(dist_fun,
                                                            corr_distances_dict[i], 
                                                            corr_distances_dict[j])
  return rdm

def visualise_rdm(rdm, acc = acc_df['acc'] ):
  plt.figure(figsize=(14,14))
  ax = seaborn.heatmap(rdm, xticklabels = np.around(acc.astype('float') * 100), yticklabels = np.around(acc.astype('float') * 100), cmap='rainbow')
  n = 15  
  [l.set_visible(False) for (i,l) in enumerate(ax.xaxis.get_ticklabels()) if i % n != 0]
  [l.set_visible(False) for (i,l) in enumerate(ax.yaxis.get_ticklabels()) if i % n != 0]

## Calculate euclidean distance Model rdm
Calculates the Model RDM for the given dictionary (dictionary can contain Input RDMs for any layer, right now it is the penultimate layer)

In [0]:
rdm = calc_rdm(distance.euclidean, corr_distances_dict_fashionmnist)
visualise_rdm(rdm)