#                        - Computational Neuroscience 2021-2022 Final Project -        

##   Project Name: Combinatorial Codes in Ventral Temporal Lobe for Visual Object Recognition



In [1]:
!pip install umap
!pip install pipreqs
!pip install lazypredict
!pip install nibabel
!pip install nilearn
!pip install -U kaleido


try:
    import sklearn
    print('Scikit-learn is available, version', sklearn.__version__)
    
except:
    !pip install scikit-learn
    
 
try:
    import cv2
    print('Open-CV is available, version', cv2.__version__)
    
except:
     !pip install opencv-python
    
   
try:
    import seaborn
    print('Seaborn is available, version', seaborn.__version__)
    
except:
     !pip install seaborn


Requirement already up-to-date: kaleido in d:\python\lib\site-packages (0.2.1)
Scikit-learn is available, version 0.23.1
Open-CV is available, version 4.5.1
Seaborn is available, version 0.11.0


In [2]:
from __future__ import print_function, division

# Basics:
import numpy as np,pandas as pd, matplotlib.pyplot as plt, seaborn as sns
import os, random, time, sys, copy, math, pickle

# interactive mode
plt.ion()

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

# For plotting
import plotly.io as plt_io
import plotly.graph_objects as go
%matplotlib inline

# Dimension Reduction Algorithms:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.decomposition import FastICA
from sklearn.decomposition import NMF
import umap

# Transformations
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

# Metrics:
from sklearn.metrics import classification_report

# Train-Test Splitter:
from sklearn.model_selection import train_test_split

# For Classical ML algorithms:
from lazypredict.Supervised import LazyClassifier

# Utilies:
from tqdm import tqdm

# For distance measurements:
from scipy.spatial.distance import cdist

# Extras:
from abc import abstractmethod
from typing import Callable, Iterable, List, Tuple

# Set true for Google Colab:
COLAB = False

if COLAB:
    # To access Google Drive:
    from google.colab import drive
    drive.mount("/content/gdrive")

    
# For neuroimaging:
from nibabel.testing import data_path
from nilearn import plotting as nplt
from nilearn.input_data import NiftiMasker
from nilearn import datasets
from nilearn import plotting
from nilearn.image import mean_img
from nilearn.image import index_img
import nibabel as nib
from nilearn import image



print("NumPy Version: ", np.__version__)


root_dir = r'C:\Users\Administrator\Desktop\VOR'
os.chdir(root_dir)
image_results_dir = os.path.join(root_dir, 'images')
results_dir = os.path.join(root_dir, 'results')

print('Working Directory: \n ', root_dir)


# Creating requirements.txt file
!pip3 freeze > requirements.txt  

NumPy Version:  1.19.1
Working Directory: 
  C:\Users\Administrator\Desktop\VOR\sub notebooks


# Deep Learning Algorithms

In [None]:
!pip install vit-pytorch

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dataset
import torchvision.transforms as transforms
from PIL import Image


# PyTorch's versions:
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

# We will be working with GPU:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device : ' , device)

# Number of GPUs available. 
num_GPU = torch.cuda.device_count()
print('Number of GPU : ', num_GPU)


# Creating stimuli to category and category to stimuli:
stimuli2category = {
                        'scissors'     : 0,
                        'face'         : 1, 
                        'cat'          : 2,
                        'scrambledpix' : 3,
                        'bottle'       : 4,
                        'chair'        : 5,
                        'shoe'         : 6,
                        'house'        : 7
}

category2stimuli = {category:stimuli for stimuli, category in stimuli2category.items()}

In [None]:
class fMRIDataset(torch.utils.data.Dataset):
    scaler = MinMaxScaler()
    def __init__(self, 
                 mode:str = 'fMRI',
                 transforms = None,
                 fetch_from_path:bool = True,
                 prepare_for_transformer:bool = False):
        
        assert mode in ['fMRI','mask'], 'Please provide fMRI or Mask type of mode!'
        
        self.transforms = transforms
        self.num_class = len(stimuli2category) or len(category2stimuli)

        self.batch_data_path = 'batch_fMRI'
        self.batch_label_path = 'batch_label'
        self.batch_mask_path = 'batch_masks'
        
        if prepare_for_transformer:
            self.batch_data_path = 'batch_fMRI_transformer'
            self.batch_data_path = 'batch_label_transformer'

        
        batched_data_path = os.path.join(root_dir, self.batch_data_path)
        bacthed_label_path = os.path.join(root_dir, self.batch_label_path)  
        bacthed_mask_path = os.path.join(root_dir, self.batch_mask_path)  
        
        
        if mode == 'fMRI':            
            if fetch_from_path: 
                if os.path.exists(batched_data_path + '.npy') and os.path.exists(bacthed_label_path  + '.npy'):   
                    
                    print(f'Data is fetching from {root_dir}')
                    self.data = load(batched_data_path)
                    self.labels = load(bacthed_label_path)   
                    
                else:
                    raise NoneError("Object not constructed. Cannot access a 'None' object.")
            else:
                              
                self.data = np.concatenate(load('fMRI_data'), axis = 0)
                self.labels = np.concatenate(load('labels'), axis = 0)
                
                if prepare_for_transformer:
                    self.prepare_transformer()
                    
                save(self.data, batched_data_path)
                save(self.labels, bacthed_label_path)                                    
            
        else:
            pass
                   

        
        assert self.labels.shape[0] == self.data.shape[0], ' # of Targets and Data samples does not match!'
            
    
    def prepare_transformer(self):
        self.data = self.data[:, 1:, :, :, ].reshape(-1, 64, 64, 3)                         
        self.labels = np.repeat(self.labels, repeats = 13, axis = 0)
-      
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self,idx):
        image = self.data[idx]
               
        
        if image.shape == torch.Size([64, 3, 64]):
            image = image.permute(1,0,2)
                 
        #assert image.shape == torch.Size([3, 64, 64]), 'Mismatch Image Dimension!'
            
        label = self.labels[idx].reshape(1,)
        label = torch.as_tensor(label, dtype=torch.int, device=device)
  
        if self.transforms is not None:
            image = self.transforms(image)
          
        return image, label     

In [None]:
class Normalize():
    def __call__(self, image):
        max_val = image.max()
        return image / max_val
    
class TorchTensor():
    def __call__(self, image):
        return torch.as_tensor(image, dtype=torch.float, device=device)
        
class MeanNormalize():
    def __call__(self, image):
        return F.normalize(image)
    
# [batch * channel(# of channels of each image) * depth(# of frames) * height * width]
class Make3D():
    def __call__(self, image):        
        return image.unsqueeze(0)

In [None]:
transform = transforms.Compose([
                                #transforms.ColorJitter([0.9,0.9]),
                                #transforms.RandomGrayscale(p = 0.3),
                                #transforms.RandomAffine((-30,30)),
                                #transforms.RandomPerspective(),
                                #transforms.GaussianBlur(3),
                                #transforms.RandomHorizontalFlip(p = 0.2),
                                #transforms.RandomVerticalFlip(p = 0.2),

                                #Important parts, above can be ignored
                                #transforms.Resize((224,224)),
                                #transforms.CenterCrop(224),
                                Normalize(),
                                TorchTensor()    
                                #transforms.ToTensor(),
                                                                
])


fMRI_dataset = fMRIDataset(transforms = transform, fetch_from_path = True)


break_point = len(fMRI_dataset) - 100 
train_dataset = torch.utils.data.Subset(fMRI_dataset, indices = range(break_point))
val_dataset = torch.utils.data.Subset(fMRI_dataset, indices = range(break_point, len(fMRI_dataset)))

In [None]:
batch_size = 16
train_loader = torch.utils.data.DataLoader(dataset = train_dataset,
                                          shuffle = False,
                                          batch_size = batch_size,
                                          drop_last = True,
                                          )

val_loader = torch.utils.data.DataLoader(dataset = val_dataset,
                                          shuffle = False,
                                          batch_size = batch_size,
                                          drop_last = True,
                                          )
x, y = next(iter(train_loader))

print(x.shape, x.dtype)
print(y.shape, y.dtype)

In [None]:
from torch_utils import utils_torch
def train_one_epoch(model, criterion, optimizer, data_loader, device, epoch, print_freq, apex=False):
    model.train()
    metric_logger = utils_torch.MetricLogger(delimiter="  ")
    metric_logger.add_meter('lr', utils_torch.SmoothedValue(window_size=1, fmt='{value}'))
    metric_logger.add_meter('img/s', utils_torch.SmoothedValue(window_size=10, fmt='{value}'))

    header = 'Epoch: [{}]'.format(epoch)
    for image, target in metric_logger.log_every(data_loader, print_freq, header):
        start_time = time.time()
        image, target = image.to(device), target.to(device).squeeze(-1).long()
        output = model(image)
        loss = criterion(output, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        acc1, acc5 = utils_torch.accuracy(output, target, topk=(1, 5))
        batch_size = image.shape[0]
        metric_logger.update(loss=loss.item(), lr=optimizer.param_groups[0]["lr"])
        metric_logger.meters['acc1'].update(acc1.item(), n=batch_size)
        metric_logger.meters['acc5'].update(acc5.item(), n=batch_size)
        metric_logger.meters['img/s'].update(batch_size / (time.time() - start_time))


def evaluate(model, criterion, data_loader, device, print_freq=100):
    model.eval()
    metric_logger = utils_torch.MetricLogger(delimiter="  ")
    header = 'Test:'
    with torch.no_grad():
        for image, target in metric_logger.log_every(data_loader, print_freq, header):
            image = image.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True).squeeze(-1).long()
            output = model(image)
            loss = criterion(output, target)

            acc1, acc5 = utils_torch.accuracy(output, target, topk=(1,5))
            # FIXME need to take into account that the datasets
            # could have been padded in distributed setup
            batch_size = image.shape[0]
            metric_logger.update(loss=loss.item())
            metric_logger.meters['acc1'].update(acc1.item(), n=batch_size)
            metric_logger.meters['acc5'].update(acc5.item(), n=batch_size)
    # gather the stats from all processes
    metric_logger.synchronize_between_processes()

    print(' * Acc@1 {top1.global_avg:.3f} Acc@5 {top5.global_avg:.3f}'
          .format(top1=metric_logger.acc1, top5=metric_logger.acc5))
    return metric_logger.acc1.global_avg

In [None]:
class Model(nn.Module):
    def __init__(self,model = None):
        super(Model,self).__init__()
        if model is not None:
            self.model = model    
        else:
            self.model = nn.Sequential(
            self.conv_block(40,  60,  0.1),
            self.conv_block(60,  80,  0.15),
            self.conv_block(80,  128, 0.25),
            self.conv_block(128, 256, 0.3),
            nn.Flatten(),
            self.linear_block(1024, 256, 0.4),
            self.linear_block(256, 128, 0.4),
            nn.Linear(128, 8)
)     

    def forward(self,img):    
        return self.model(img)  

    @staticmethod
    def conv_block(in_channel, out_channel, p):
        return nn.Sequential(
            nn.Conv2d(in_channel, out_channel, 3),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
            nn.Dropout2d(p)
            )

    @staticmethod
    def linear_block(in_ftrs,out_ftrs,p):
        return nn.Sequential(
            nn.Linear(in_ftrs,out_ftrs),
            nn.BatchNorm1d(num_features=out_ftrs),
            nn.ReLU(),
            nn.Dropout(p)
            )

net = Model().to(device)
print('Traniable parameter of the model: ' , sum(param.numel() for param in net.parameters() if param.requires_grad == True))
print(net)

In [None]:
# loss function
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = optim.Adam(model.parameters())
# scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma = 0.7)

In [None]:
# let's train it for 10 epochs
num_epochs = 10
print_freq = 10

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(net, criterion, optimizer, train_loader, device, epoch, print_freq, apex=False)
    
    # update the learning rate
    scheduler.step()
    # evaluate on the test dataset
    evaluate(net, criterion, val_loader, device, print_freq)

print("That's it!")

## 3-D Convolutional Neural Network

In [None]:
class Model3D(nn.Module):
    def __init__(self, model = None):
        super(Model,self).__init__()
        if model is not None:
            self.model = model    
        else:
            self.conv3d = nn.Sequential(               
                            self.conv_block(1, 32, (3,3,3)),
                            self.conv_block(32, 64, (3,3,3)),
                            self.conv_block(64, 128, (3,3,3)),
                            self.conv_block(128, 512, (3,3,3)),
                            nn.Flatten()
            )           
            
            self.linear = nn.Sequential(
                            self.linear_block(1024,512,0.4),
                            self.linear_block(512,256,0.4),
                            nn.Linear(256, 8)
            ) 
            
            
            self.model = nn.Sequential(                                    
                           self.conv3d,
                           self.linear            
            )

    def forward(self,img):    
        return self.model(img)  

    @staticmethod
    def conv_block(in_channel, out_channel, p):
        return nn.Sequential(
            nn.Conv3d(in_channel,  out_channel,  (3,3,3) , (3,3,3)),
            nn.BatchNorm3d(out_channel),
            nn.ReLU(),
            nn.MaxPool3d(2,2),
            nn.Dropout3d(p)
            )

    @staticmethod
    def linear_block(in_ftrs,out_ftrs,p):
        return nn.Sequential(
            nn.Linear(in_ftrs,out_ftrs),
            nn.BatchNorm1d(num_features=out_ftrs),
            nn.ReLU(),
            nn.Dropout(p)
            )

net = Model().to(device)



# loss function
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = optim.Adam(model.parameters())
# scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma = 0.7)


# let's train it for 10 epochs
num_epochs = 10
print_freq = 10

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(net, criterion, optimizer, train_loader, device, epoch, print_freq, apex=False)
    
    # update the learning rate
    scheduler.step()
    # evaluate on the test dataset
    evaluate(net, criterion, val_loader, device, print_freq)

print("That's it!")