In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.optim import Adam, SGD, AdamW
import torch.nn as nn
from torch.nn import Linear
from torch.utils.data import TensorDataset, DataLoader

import numpy as np
import matplotlib.pyplot as plt 
import seaborn as sns
import os
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

In [None]:
#@title downloading the data
import os, requests, tarfile

fnames = ["kay_labels.npy", "kay_labels_val.npy", "kay_images.npz"]
urls = ["https://osf.io/r638s/download",
        "https://osf.io/yqb3e/download",
        "https://osf.io/ymnjv/download"]

# Fetch each file only when it is not already on disk, so re-running the cell is cheap.
for fname, url in zip(fnames, urls):
  if os.path.isfile(fname):
    continue
  # Announce the download before the blocking request so the message is accurate.
  print(f"Downloading {fname}...")
  try:
    # A timeout keeps the cell from hanging forever on a stalled connection.
    r = requests.get(url, timeout=60)
  except requests.RequestException:
    # Covers connection errors as well as the timeouts introduced above.
    print("!!! Failed to download data !!!")
    continue
  if r.status_code != requests.codes.ok:
    print("!!! Failed to download data !!!")
    continue
  with open(fname, "wb") as fid:
    fid.write(r.content)
  print(f"Download {fname} completed!")

Downloading kay_labels.npy...
Download kay_labels.npy completed!
Downloading kay_labels_val.npy...
Download kay_labels_val.npy completed!
Downloading kay_images.npz...
Download kay_images.npz completed!


In [None]:
# Open the image/response archive by its explicit file name rather than through the
# `fname` loop variable left behind by the download cell.
with np.load("kay_images.npz") as dobj:
  dat = dict(**dobj)
labels = np.load('kay_labels.npy')
val_labels = np.load('kay_labels_val.npy')

In [None]:
print(dat.keys())

dict_keys(['stimuli', 'stimuli_test', 'responses', 'responses_test', 'roi', 'roi_names'])


In [None]:
labels

array([['artifact', 'entity', 'animal', ..., 'artifact', 'artifact',
        'artifact'],
       ['instrumentality', 'round shape', 'vertebrate', ..., 'structure',
        'structure', 'instrumentality'],
       ['equipment', 'sphere', 'mammal', ..., 'building', 'landing',
        'conveyance'],
       ['croquet ball', 'bubble', 'komondor', ..., 'monastery', 'dock',
        'warplane']], dtype='<U71')

In [None]:
# The label arrays are loaded with the few label levels on the first axis and the many
# samples on the second. Put samples first. The shape guard makes the cell idempotent:
# running it a second time no longer flips the arrays back.
if labels.shape[0] < labels.shape[1]:
  labels = np.transpose(labels)
if val_labels.shape[0] < val_labels.shape[1]:
  val_labels = np.transpose(val_labels)

In [None]:
print(labels.shape)

(1750, 4)


In [None]:
# Prefer the GPU but fall back to the CPU when CUDA is not available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
import cv2

def channel_amplify(data, out_size=(224, 224)):
    """Turn grayscale images into upscaled 3-channel, channels-first arrays.

    Each 2-D image is copied into three identical channels and resized with
    bicubic interpolation.

    Args:
        data: sequence or array of 2-D images, iterated along its first axis.
            Images of any height/width are accepted (the whole image is used).
        out_size: (width, height) handed to cv2.resize as ``dsize``.
            Defaults to (224, 224).

    Returns:
        float64 NumPy array of shape (n_images, 3, out_size[1], out_size[0]).
        An empty input yields an array with zero images.
    """
    nchannels = 3
    my_new_image_set = []

    for img in data:
        # Replicate the single channel with one vectorised call instead of a
        # per-pixel Python loop; float64 matches the np.zeros buffer used before.
        stacked = np.repeat(
            np.asarray(img, dtype=np.float64)[:, :, np.newaxis], nchannels, axis=2
        )
        img_upscaled = cv2.resize(stacked, dsize=tuple(out_size), interpolation=cv2.INTER_CUBIC)
        my_new_image_set.append(img_upscaled)

    print(len(my_new_image_set))

    if not my_new_image_set:
        # Nothing to stack; keep the channels-first layout for downstream code.
        return np.empty((0, nchannels, out_size[1], out_size[0]))

    my_new_image_set = np.array(my_new_image_set)
    # (n, height, width, channels) -> (n, channels, height, width)
    my_new_image_set = my_new_image_set.transpose(0, 3, 1, 2)

    return my_new_image_set

# Convert both stimulus sets with channel_amplify: three identical channels,
# upscaled to 224x224, channels-first layout.
train_stimuli = channel_amplify(dat['stimuli'])
test_stimuli = channel_amplify(dat['stimuli_test'])

In [None]:
# Split the training label table into one 1-D array per column.
label_row1, label_row2, label_row3, label_row4 = (
    labels[:, column] for column in range(4)
)

label_row2.shape

In [None]:
# Split the validation label table into one 1-D array per column.
test_labels_row1, test_labels_row2, test_labels_row3, test_labels_row4 = (
    val_labels[:, column] for column in range(4)
)

test_labels_row2.shape

In [None]:
labels.shape

In [None]:
# creating training and testing features
train_x = torch.from_numpy(train_stimuli)
test_x = torch.from_numpy(test_stimuli)   

In [None]:
train_x[0].shape

In [None]:
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

# Build the training targets.
# NOTE: these two assignments rebind the imported class names to instances;
# the following cell calls methods on the `LabelEncoder` object created here.
LabelEncoder = LabelEncoder()
OneHotEncoder = OneHotEncoder()

# Integer-encode every label column in order (the shared encoder is refit for
# each column), then wrap the resulting codes as tensors.
label_encoded_1, label_encoded_2, label_encoded_3, label_encoded_4 = (
    LabelEncoder.fit_transform(column)
    for column in (label_row1, label_row2, label_row3, label_row4)
)
train_y_1, train_y_2, train_y_3, train_y_4 = map(
    torch.tensor,
    (label_encoded_1, label_encoded_2, label_encoded_3, label_encoded_4),
)

#print(train_y_1.shape)
#print(train_y_1)
#print(np.unique(label_encoded))

#print(row1_labels.shape)
#print(row1_labels)



In [None]:
# creating testing labels
#
# Test targets must use the SAME integer codes as the training targets. Re-fitting an
# encoder on a test column numbers the classes by the test set's own sorted vocabulary,
# which disagrees with the training codes whenever the two label sets differ.

def _encode_with_train_classes(train_column, test_column):
    """Encode test_column using the codes derived from train_column.

    A training class receives its index in the sorted unique training labels (the
    numbering a label encoder fitted on train_column produces). Labels that occur
    only in test_column receive codes >= the number of training classes, so the
    cell never fails on unseen labels.
    """
    train_classes = np.unique(train_column)
    unseen = np.setdiff1d(test_column, train_classes)
    vocabulary = np.concatenate([train_classes, unseen])
    code_of = {name: code for code, name in enumerate(vocabulary)}
    return np.array([code_of[name] for name in test_column], dtype=np.int64)

test_1 = _encode_with_train_classes(label_row1, test_labels_row1)
test_y_1 = torch.tensor(test_1)

test_2 = _encode_with_train_classes(label_row2, test_labels_row2)
test_y_2 = torch.tensor(test_2)

test_3 = _encode_with_train_classes(label_row3, test_labels_row3)
test_y_3 = torch.tensor(test_3)

test_4 = _encode_with_train_classes(label_row4, test_labels_row4)
test_y_4 = torch.tensor(test_4)

# print(test_y_1.shape)
# print(test_y_2.shape)
# print(test_y_3.shape)
# print(test_y_4.shape)
# print(test_y_1[0].shape)

In [None]:
from torch.utils.data import TensorDataset, DataLoader

batch_size = 128

# One dataset per label column; all of them share the same training images.
train_data_1, train_data_2, train_data_3, train_data_4 = (
    TensorDataset(train_x, targets)
    for targets in (train_y_1, train_y_2, train_y_3, train_y_4)
)

# Shuffled mini-batch loaders, one per dataset.
train_dataloader_1, train_dataloader_2, train_dataloader_3, train_dataloader_4 = (
    DataLoader(dataset, batch_size=batch_size, shuffle=True)
    for dataset in (train_data_1, train_data_2, train_data_3, train_data_4)
)

In [None]:
batch_size = 128

# One dataset per label column; all of them share the same test images.
test_data_1, test_data_2, test_data_3, test_data_4 = (
    TensorDataset(test_x, targets)
    for targets in (test_y_1, test_y_2, test_y_3, test_y_4)
)

# Mini-batch loaders, one per dataset (shuffling enabled, as for the training loaders).
test_dataloader_1, test_dataloader_2, test_dataloader_3, test_dataloader_4 = (
    DataLoader(dataset, batch_size=batch_size, shuffle=True)
    for dataset in (test_data_1, test_data_2, test_data_3, test_data_4)
)

In [None]:
!git clone https://github.com/dicarlolab/CORnet.git

In [None]:
!pip install git+https://github.com/dicarlolab/CORnet

In [None]:
import torch.nn as nn
from torch.nn import Linear
from collections import OrderedDict

import cornet
from CORnet.cornet import cornet_z
from cornet.cornet_z import HASH as HASH_Z

class Flatten(nn.Module):
    """Reshape a batch so each sample becomes a single flat feature vector.

    The first dimension is kept; all remaining dimensions are merged into one,
    which is the layout Linear layers consume.
    """

    def forward(self, x):
        n_samples = x.size(0)
        return x.view(n_samples, -1)

class Identity(nn.Module):
    """Pass-through module: hands back exactly the tensor it receives.

    Placing it in a network gives that point a name that can be looked up later.
    """

    def forward(self, x):
        passthrough = x
        return passthrough

def get_model(x):
    """Build a pretrained CORnet-Z whose decoder is replaced by a fresh head.

    Args:
        x: number of output units of the new final Linear layer.

    Returns:
        The model returned by ``cornet_z(pretrained=True, map_location=None)``
        with ``model.module.decoder`` swapped for
        avgpool -> flatten -> Linear(512, x) -> identity.
    """
    model = cornet_z(pretrained=True, map_location=None)

    model.module.decoder = nn.Sequential(OrderedDict([
        ('avgpool', nn.AdaptiveAvgPool2d(1)),
        ('flatten', Flatten()),
        ('linear', nn.Linear(512, x)),
        ('output', Identity()),
    ]))

    # Initialise the layers INSIDE the new head. Iterating over the Sequential
    # container itself matches none of the isinstance checks, which would leave
    # the Linear layer with its default initialisation.
    for m in model.module.decoder.modules():
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.BatchNorm2d):
            m.weight.data.fill_(1)
            m.bias.data.zero_()

    return model

# FINE-TUNE: 
#   1) train only the decoder layer
#   2) train only the last two layers
#   3) use different learning rates (lower for V1-1... higher for IT, decoder) (see 1 cycle policy + freezing in fastai)

In [None]:
model = get_model(8)

Downloading: "https://s3.amazonaws.com/cornet-models/cornet_z-5c427c9c.pth" to /root/.cache/torch/hub/checkpoints/cornet_z-5c427c9c.pth


  0%|          | 0.00/15.8M [00:00<?, ?B/s]

In [None]:
# List every parameter tensor by name and shape instead of dumping its values.
for name, param in model.module.named_parameters():
  print(name, tuple(param.shape))

In [None]:
model.module.decoder

Sequential(
  (avgpool): AdaptiveAvgPool2d(output_size=1)
  (flatten): Flatten()
  (linear): Linear(in_features=512, out_features=8, bias=True)
  (output): Identity()
)

In [None]:
# Move the model to the same `device` the training/evaluation loops send batches to.
model.to(device)

DataParallel(
  (module): Sequential(
    (V1): CORblock_Z(
      (conv): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
      (nonlin): ReLU(inplace=True)
      (pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (output): Identity()
    )
    (V2): CORblock_Z(
      (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (nonlin): ReLU(inplace=True)
      (pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (output): Identity()
    )
    (V4): CORblock_Z(
      (conv): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (nonlin): ReLU(inplace=True)
      (pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (output): Identity()
    )
    (IT): CORblock_Z(
      (conv): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (nonlin): ReLU(inplace=True)
      (pool): MaxPool2d(kernel_size=3, strid

In [None]:
from torch.optim import SGD, Adam, AdamW

optimizer = Adam(model.module.parameters(), lr=0.0005)

In [None]:
from torch.nn import CrossEntropyLoss

loss_fn = CrossEntropyLoss()

In [None]:
def flat_accuracy(preds, labels):
    """Return the fraction of samples whose highest-scoring class equals the label.

    Args:
        preds: array-like of per-class scores with classes along axis 1.
        labels: NumPy array of class indices; it is flattened before comparison.
    """
    predicted_classes = np.argmax(preds, axis=1).reshape(-1)
    true_classes = labels.reshape(-1)
    n_correct = np.sum(predicted_classes == true_classes)
    return n_correct / len(true_classes)

In [None]:
from torchsummary import summary

summary(model, (3,224,224))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,472
              ReLU-2         [-1, 64, 112, 112]               0
         MaxPool2d-3           [-1, 64, 56, 56]               0
          Identity-4           [-1, 64, 56, 56]               0
        CORblock_Z-5           [-1, 64, 56, 56]               0
            Conv2d-6          [-1, 128, 56, 56]          73,856
              ReLU-7          [-1, 128, 56, 56]               0
         MaxPool2d-8          [-1, 128, 28, 28]               0
          Identity-9          [-1, 128, 28, 28]               0
       CORblock_Z-10          [-1, 128, 28, 28]               0
           Conv2d-11          [-1, 256, 28, 28]         295,168
             ReLU-12          [-1, 256, 28, 28]               0
        MaxPool2d-13          [-1, 256, 14, 14]               0
         Identity-14          [-1, 256,

In [None]:
def evaluation(dataloader, net=None, run_device=None):
    """Compute classification accuracy over a dataloader and print it.

    Accuracy is counted per sample (correct / total), so a smaller final batch
    does not get the same weight as a full one.

    Args:
        dataloader: iterable yielding (inputs, labels) batches.
        net: model to evaluate. Defaults to the module-level ``model``.
        run_device: device the batches are moved to. Defaults to the
            module-level ``device``.

    Returns:
        Fraction of correctly classified samples as a float; 0.0 when the
        dataloader yields no samples.
    """
    net = model if net is None else net
    run_device = device if run_device is None else run_device

    net.eval()

    n_correct, n_seen = 0, 0

    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for b_inputs, b_labels in dataloader:
            b_inputs = b_inputs.to(run_device, dtype=torch.float)
            b_labels = b_labels.to(run_device).reshape(-1)

            # Highest logit along the class axis is the predicted class.
            predicted = net(b_inputs).argmax(dim=1).reshape(-1)

            n_correct += (predicted == b_labels).sum().item()
            n_seen += b_labels.numel()

    accuracy = n_correct / n_seen if n_seen else 0.0

    print("Validation Accuracy: {}".format(accuracy))
    return accuracy

In [None]:
def training(model, epochs, train_dataloader, test_dataloader):
    """Train ``model`` for a number of epochs, evaluating after each one.

    Relies on the module-level ``optimizer``, ``loss_fn`` and ``device``.

    Args:
        model: network to train; called on each input batch.
        epochs: number of passes over ``train_dataloader``.
        train_dataloader: iterable yielding (inputs, labels) training batches.
        test_dataloader: loader handed to ``evaluation`` after every epoch.

    Returns:
        List with the loss value of every training step, in order.
    """
    train_loss_set = []

    for epoch in range(epochs):

        model.train()

        tr_loss = 0.0
        nb_tr_steps = 0

        for b_inputs, b_labels in train_dataloader:
            b_inputs = b_inputs.to(device, dtype=torch.float)
            # The loss needs integer class indices: cast straight to int64
            # instead of going through float first.
            b_labels = b_labels.to(device, dtype=torch.int64)

            # Clear out the gradients (by default they accumulate)
            optimizer.zero_grad()
            # Forward pass
            predictions = model(b_inputs)
            loss = loss_fn(predictions, b_labels)
            # Backward pass
            loss.backward()
            # Update parameters and take a step using the computed gradient
            optimizer.step()

            # Read the scalar once and reuse it for both trackers.
            step_loss = loss.item()
            train_loss_set.append(step_loss)
            tr_loss += step_loss
            nb_tr_steps += 1

        # An empty dataloader gives no steps; report NaN rather than dividing by zero.
        mean_loss = tr_loss / nb_tr_steps if nb_tr_steps else float("nan")
        print("Train loss for epoch {}: {}".format(epoch, mean_loss))

        # NOTE: called this way, evaluation() scores the module-level `model`,
        # and the module-level `optimizer` updates whatever parameters it was
        # built with, so pass that same model object to this function.
        evaluation(test_dataloader)
    return train_loss_set
            

In [None]:
loss_set = training(model, epochs=15, train_dataloader=train_dataloader_1, test_dataloader=test_dataloader_1)

In [None]:
len(loss_set)

In [None]:
# Average the per-step losses within each epoch (one epoch = one pass over the
# training loader), giving one value per epoch.
steps_per_epoch = max(1, len(train_dataloader_1))
c = [
    np.mean(loss_set[start:start + steps_per_epoch])
    for start in range(0, len(loss_set), steps_per_epoch)
]

In [None]:
plt.plot(range(len(loss_set)), loss_set)
plt.show()

In [None]:
# Sanity-check forward pass on the first 32 training images. This is inspection
# only, so skip gradient tracking.
with torch.no_grad():
  outputs = model(train_x[:32].to(device=device, dtype=torch.float))
print(outputs)

In [None]:
print(train_y_1[0])