In [1]:
from utils import load_it_data, visualize_img
import matplotlib.pyplot as plt
import numpy as np
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.metrics import explained_variance_score, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score
import torch
from torch import Tensor
from torchvision.models import ResNet, resnet50, ResNet50_Weights
import pickle
import gc
from tqdm import tqdm
import csv

In [2]:
# Folder that holds the dataset; keep the empty string when the data was
# downloaded into the same folder as this notebook.
path_to_data = ''

# Names of the ResNet stages whose activations are recorded, in forward order.
layers = [
    "conv1",
    "layer1", "layer2", "layer3", "layer4",
    "avgpool",
]

# Unpack the eight values returned by load_it_data (by their names: stimuli and
# object labels for train/val/test, spikes for train/val only — see utils).
(
    stimulus_train, stimulus_val, stimulus_test,
    objects_train, objects_val, objects_test,
    spikes_train, spikes_val,
) = load_it_data(path_to_data)

In [3]:
def apply_PCA(layer, n_components = 1000) :
    """Reduce the stored activations of one layer with PCA and save the result.

    Reads ``<layer>.csv`` (comma-separated), fits a PCA on its rows and writes
    the projected data to ``<layer>_pca.csv``.

    Args:
        layer (string): name of the layer; used as the stem of the input and output files
        n_components (int): number of components we want to keep. An integer value is
            capped at min(n_rows, n_columns) of the loaded data, since PCA cannot
            return more components than that. Non-integer values are passed to PCA as is

    Returns:
        ndarray: the data projected on the principal components, one row per input row
    """
    file_name = layer +'.csv'
    activations = np.loadtxt(file_name, delimiter=",")

    # PCA raises when asked for more components than min(n_samples, n_features);
    # cap the request so small inputs still work.
    if isinstance(n_components, (int, np.integer)):
        n_components = min(n_components, *activations.shape)

    pca = PCA(n_components=n_components)
    reduced_activations = pca.fit_transform(activations)

    pca_file = layer +'_pca.csv'
    np.savetxt(pca_file, reduced_activations, delimiter=",")
    return reduced_activations

def extract_activation(self, stimuli: Tensor) :
    """Run every stimulus through the network and dump per-layer activations to CSV.

    For each recorded layer (conv1, layer1, layer2, layer3, layer4, avgpool) a
    file ``<layer>.csv`` is written in the current working directory, with one
    row per stimulus holding that layer's flattened output. Files left over
    from a previous run are overwritten.

    Args:
        self (ResNet): model we want to extract the activations from
        stimuli (Tensor): preprocessed images; iterated over their first dimension

    Returns:
        None: the activations are only written to disk
    """
    from contextlib import ExitStack

    # Forward stages in execution order, paired with the CSV stem their output
    # is recorded under (None = intermediate stage, not recorded).
    stages = [
        (self.conv1, 'conv1'),
        (self.bn1, None),
        (self.relu, None),
        (self.maxpool, None),
        (self.layer1, 'layer1'),
        (self.layer2, 'layer2'),
        (self.layer3, 'layer3'),
        (self.layer4, 'layer4'),
        (self.avgpool, 'avgpool'),
    ]

    # Open each output once for the whole run (mode 'w' also discards stale
    # data) and disable autograd: only forward values are needed, so building
    # a graph for every image would just cost time and memory.
    with ExitStack() as stack, torch.no_grad():
        writers = {
            name: csv.writer(stack.enter_context(
                open(name + '.csv', 'w', encoding='UTF8', newline='')))
            for _, name in stages if name is not None
        }

        for x in tqdm(stimuli) :
            x = x.unsqueeze(0)  # add a leading batch dimension of size 1
            for module, name in stages :
                x = module(x)
                if name is not None :
                    # one CSV row per stimulus: the layer output flattened to 1-D
                    writers[name].writerow(torch.flatten(x.squeeze(0)).detach().numpy())

# Attach the function to ResNet so it can be called as model.extract_activation(...)
ResNet.extract_activation = extract_activation

In [4]:
# Load the pre-trained ResNet50 model and switch it to evaluation mode
stimuli = torch.tensor(stimulus_train)
neural_activity = spikes_train
weights = ResNet50_Weights.DEFAULT
# torchvision has no Keras-style `include_top` switch: the classification head
# (`fc`) stays on the model, but extract_activation stops at avgpool and never calls it.
model = resnet50(weights=weights)
# The trailing semicolon stops the notebook from echoing the whole module repr
model.eval();




In [5]:
# Preprocess the stimuli with the transform pipeline bundled with the chosen weights.
# NOTE(review): this assumes `stimuli` holds raw, not-yet-normalised images —
# confirm against load_it_data, otherwise the normalisation is applied twice.
preprocess = weights.transforms()
img_transformed = preprocess(stimuli)

In [6]:
# Extract the activations of the layers; they are written to <layer>.csv files.
# extract_activation returns nothing, so there is no result to bind to a name here.
model.extract_activation(img_transformed)

no file
no file
no file
no file
no file
no file


100%|██████████| 2592/2592 [43:54<00:00,  1.02s/it]


In [None]:
# Compute the first 1000 PCs of every layer (1000 is apply_PCA's default
# n_components). Each call reads <layer>.csv and writes <layer>_pca.csv.
for layer in tqdm(layers) : 
    # printed before the PCA call so the layer name shows while it is running
    print(layer)
    print(apply_PCA(layer).shape)

  0%|          | 0/6 [00:00<?, ?it/s]

conv1


In [None]:
def best_alpha_Ridge(X, y, alphas):
    """Select the Ridge regularisation strength by 5-fold cross-validation.

    Args:
        X (ndarray): input data
        y (ndarray): output data, neuronal activity
        alphas (list of double): candidate values of alpha

    Returns:
        tuple (double, list of double): the alpha with the highest mean
        cross-validation score, and the mean score of every alpha in input order
    """
    # One mean score per candidate, using the estimator's default scorer
    mean_scores = [
        np.mean(cross_val_score(Ridge(alpha=candidate), X, y, cv=5))
        for candidate in alphas
    ]
    winner = np.argmax(mean_scores)  # first index of the highest mean score
    return alphas[winner], mean_scores

def plot_RidgeCV(alphas, scores):
    """Draw the cross-validation score as a function of alpha and show the figure.

    Args:
        alphas (list of double): alpha values that were tested (x axis)
        scores (list of double): score obtained for each alpha (y axis)
    """
    # Work on the current axes, which is what the pyplot shortcuts draw on
    axes = plt.gca()
    axes.plot(alphas, scores)
    axes.set_xlabel('alpha')
    axes.set_ylabel('score')
    plt.show()
    
def RidgeCV(X, y, alphas):
    """Cross-validate alpha, plot the score curve, then refit Ridge on all the data.

    Args:
        X (ndarray): input data
        y (ndarray): output data, neuronal activity
        alphas (list of double): candidate values of alpha

    Returns:
        tuple (model, double): the Ridge model fitted with the selected alpha,
        and that alpha
    """
    chosen_alpha, cv_scores = best_alpha_Ridge(X, y, alphas)
    plot_RidgeCV(alphas, cv_scores)
    # fit() returns the estimator itself, so the fitted model can be bound directly
    fitted = Ridge(alpha=chosen_alpha).fit(X, y)
    return fitted, chosen_alpha

In [None]:
# Candidate regularisation strengths handed to the cross-validation
alphas = [100_000_000, 500_000_000, 1_000_000_000]

# Activations of conv1 as stored in conv1.csv (comma-separated)
activations = np.loadtxt("conv1.csv", delimiter=",")
print(activations.shape)

# Pick the best alpha by cross-validation and fit the final Ridge model on the training spikes
ridge_model, best_alpha = RidgeCV(activations, spikes_train, alphas)