In [0]:
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
import numpy as np

from sklearn.utils.validation import check_is_fitted, check_array

def svd_flip(u, v, u_based_decision=True):
    u = u.view(u.size()[0],1)
    v = v.view(v.size()[0],1)
    if u_based_decision:
        # columns of u, rows of v
        max_abs_cols = torch.argmax(torch.abs(u), axis=0)
        signs = torch.sign(u[list(max_abs_cols), range(u[0].size()[0])])
        u *= signs
        v *= signs.unsqueeze(1)
    else:
        # rows of v, columns of u
        max_abs_rows = torch.argmax(torch.abs(v), axis=1)
        signs = torch.sign(v[range(list(v.size())[0]), max_abs_rows])
        u *= signs
        v *= signs.unsqueeze(1)
    u = u.flatten()
    v = v.flatten()
    return u, v

def _nipals_twoblocks_inner_loop(X, Y, mode="A", max_iter=500, tol=1e-06,
                                 norm_y_weights=False):
    """Inner loop of the iterative NIPALS algorithm.
    Provides an alternative to the svd(X'Y); returns the first left and right
    singular vectors of X'Y.  See PLS for the meaning of the parameters.  It is
    similar to the Power method for determining the eigenvectors and
    eigenvalues of a X'Y.
    """

    for col in Y.T:
        if torch.any(torch.abs(col) > torch.finfo(torch.double).eps):

            y_score = col.detach().view(col.size())

            break

    # for col in Y.T:
    #     if np.any(np.abs(col) > np.finfo(np.double).eps):
    #         y_score = col.reshape(len(col), 1)
    #         break

    x_weights_old = 0
    ite = 1
    X_pinv = Y_pinv = None
    eps = torch.finfo(X.dtype).eps

    if mode == "B":
        # Uses condition from scipy<1.3 in pinv2 which was changed in
        # https://github.com/scipy/scipy/pull/10067. In scipy 1.3, the
        # condition was changed to depend on the largest singular value
        X_t = X.dtype.char.lower()
        Y_t = Y.dtype.char.lower()
        factor = {'f': 1E3, 'd': 1E6}

        cond_X = factor[X_t] * eps
        cond_Y = factor[Y_t] * eps

    # Inner loop of the Wold algo.
    while True:
        # 1.1 Update u: the X weights
        if mode == "B":
            if X_pinv is None:
                # We use slower pinv2 (same as np.linalg.pinv) for stability
                # reasons
                X_pinv = torch.pinverse(X, check_finite=False, cond=cond_X)
            x_weights = torch.mm(X_pinv, y_score)
        else:  # mode
            # Mode A regress each X column on y_score

            x_weights = torch.mv(X.T, y_score) / torch.dot(y_score.T, y_score)
        # If y_score only has zeros x_weights will only have zeros. In
        # this case add an epsilon to converge to a more acceptable
        # solution
        if torch.dot(x_weights.T, x_weights) < eps:
            x_weights += eps
        # 1.2 Normalize u
        x_weights /= torch.sqrt(torch.dot(x_weights.T, x_weights)) + eps
        # 1.3 Update x_score: the X latent scores
        x_score = torch.mv(X, x_weights)
        # 2.1 Update y_weights
        if mode == "B":
            if Y_pinv is None:
                # compute once pinv(Y)
                Y_pinv = torch.pinverse(Y, check_finite=False, cond=cond_Y)
            y_weights = torch.mm(Y_pinv, x_score)
        else:
            # Mode A regress each Y column on x_score
            y_weights = torch.mv(Y.T, x_score) / torch.dot(x_score.T, x_score)
        # 2.2 Normalize y_weights
        if norm_y_weights:
            y_weights /= torch.sqrt(torch.mm(y_weights.T, y_weights)) + eps
        # 2.3 Update y_score: the Y latent scores
        y_score = torch.mv(Y, y_weights) / (torch.dot(y_weights.T, y_weights) + eps)
        # y_score = np.dot(Y, y_weights) / np.dot(y_score.T, y_score) ## BUG
        x_weights_diff = x_weights - x_weights_old

        if torch.dot(x_weights_diff.T, x_weights_diff) < tol or Y.size()[1] == 1:
            break
        if ite == max_iter:
            warnings.warn('Maximum number of iterations reached',
                          ConvergenceWarning)
            break
        x_weights_old = x_weights
        ite += 1
    return x_weights, y_weights, ite

def _center_scale_xy(X, Y, scale=True):
    """ Center X, Y and scale if the scale parameter==True
    Returns
    -------
        X, Y, x_mean, y_mean, x_std, y_std
    """
    # center
    x_mean = torch.mean(X, axis=0)
    X -= x_mean
    y_mean = torch.mean(Y, axis=0)
    Y -= y_mean
    # scale
    if scale:
        x_std = torch.std(X, dim = 0)
        x_std[x_std == 0.0] = 1.0
        X = X/x_std
        y_std = torch.std(Y, dim = 0)
        y_std[y_std == 0.0] = 1.0
        Y = Y/y_std
    else:
        x_std = torch.ones(X.size()[1])
        y_std = torch.ones(Y.size()[1])
    return X, Y, x_mean, y_mean, x_std, y_std

class PLS:
    def __init__(self, n_components=2, *, scale=True,
                 deflation_mode="regression",
                 mode="A", algorithm="nipals", norm_y_weights=False,
                 max_iter=500, tol=1e-06, copy=True):
        self.n_components = n_components
        self.deflation_mode = deflation_mode
        self.mode = mode
        self.norm_y_weights = norm_y_weights
        self.scale = scale
        self.algorithm = algorithm
        self.max_iter = max_iter
        self.tol = tol
        self.copy = copy
        self.scale = scale

    def fit(self, X, Y):
        """Fit model to data.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vectors, where n_samples is the number of samples and
            n_features is the number of predictors.
        Y : array-like of shape (n_samples, n_targets)
            Target vectors, where n_samples is the number of samples and
            n_targets is the number of response variables.
        """
        X = X.clone().detach()
        Y = Y.clone().detach()
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        n = X.size()[0]
        p = X.size()[1]
        q = Y.size()[1]

        # if self.n_components < 1 or self.n_components > p:
        #     raise ValueError('Invalid number of components: %d' %
        #                      self.n_components)
        # if self.algorithm not in ("svd", "nipals"):
        #     raise ValueError("Got algorithm %s when only 'svd' "
        #                      "and 'nipals' are known" % self.algorithm)
        # if self.algorithm == "svd" and self.mode == "B":
        #     raise ValueError('Incompatible configuration: mode B is not '
        #                      'implemented with svd algorithm')
        # if self.deflation_mode not in ["canonical", "regression"]:
        #     raise ValueError('The deflation mode is unknown')
        # Scale (in place)
        X, Y, self.x_mean_, self.y_mean_, self.x_std_, self.y_std_ = (
            _center_scale_xy(X, Y, self.scale))
        # Residuals (deflated) matrices
        Xk = X
        Yk = Y

        # Results matrices
        self.x_scores_ = torch.zeros((n, self.n_components))
        self.y_scores_ = torch.zeros((n, self.n_components))
        self.x_weights_ = torch.zeros((p, self.n_components))
        self.y_weights_ = torch.zeros((q, self.n_components))
        self.x_loadings_ = torch.zeros((p, self.n_components))
        self.y_loadings_ = torch.zeros((q, self.n_components))
        self.n_iter_ = []

        # NIPALS algo: outer loop, over components
        Y_eps = torch.finfo(Yk.dtype).eps

        for k in range(self.n_components):
            if torch.all(torch.mm(Yk.T, Yk) < torch.finfo(torch.double).eps):
                # Yk constant
                warnings.warn('Y residual constant at iteration %s' % k)
                break
            # 1) weights estimation (inner loop)
            # -----------------------------------
            if self.algorithm == "nipals":
                # Replace columns that are all close to zero with zeros
                Yk_mask = torch.all(torch.abs(Yk) < 10 * Y_eps, axis=0)
                Yk[:, Yk_mask] = 0.0

                x_weights, y_weights, n_iter_ = \
                    _nipals_twoblocks_inner_loop(
                        X=Xk, Y=Yk, mode=self.mode, max_iter=self.max_iter,
                        tol=self.tol, norm_y_weights=self.norm_y_weights)
                self.n_iter_.append(n_iter_)

            elif self.algorithm == "svd":
                x_weights, y_weights = _svd_cross_product(X=Xk, Y=Yk)
            # Forces sign stability of x_weights and y_weights
            # Sign undeterminacy issue from svd if algorithm == "svd"
            # and from platform dependent computation if algorithm == 'nipals'

            # POTENTIAL TO DO

            x_weights, y_weights = svd_flip(x_weights, y_weights.T)
            y_weights = y_weights.T
            # columns of u, rows of v

            # compute scores
            x_scores = torch.mv(Xk, x_weights)

            if self.norm_y_weights:
                y_ss = 1
            else:
                y_ss = torch.dot(y_weights.T, y_weights)

            y_scores = torch.mv(Yk, y_weights) / y_ss

            # test for null variance
            if torch.dot(x_scores.T, x_scores) < torch.finfo(torch.double).eps:
                warnings.warn('X scores are null at iteration %s' % k)
                break
            # 2) Deflation (in place)
            # ----------------------
            # Possible memory footprint reduction may done here: in order to
            # avoid the allocation of a data chunk for the rank-one
            # approximations matrix which is then subtracted to Xk, we suggest
            # to perform a column-wise deflation.
            #
            # - regress Xk's on x_score

            x_loadings = torch.mv(Xk.T, x_scores) / torch.dot(x_scores.T, x_scores)
            # - subtract rank-one approximations to obtain remainder matrix
            Xk -= x_scores[:, None] * x_loadings.T
            if self.deflation_mode == "canonical":
                # - regress Yk's on y_score, then subtract rank-one approx.
                y_loadings = (torch.mv(Yk.T, y_scores)
                              / torch.dot(y_scores.T, y_scores))
                Yk -= y_scores[:, None] * y_loadings.T
            if self.deflation_mode == "regression":
                # - regress Yk's on x_score, then subtract rank-one approx.
                y_loadings = (torch.mv(Yk.T, x_scores)
                              / torch.dot(x_scores.T, x_scores))
                Yk -= x_scores[:, None] * y_loadings.T
            # 3) Store weights, scores and loadings # Notation:
            self.x_scores_[:, k] = x_scores.view(-1)  # T
            self.y_scores_[:, k] = y_scores.view(-1)  # U
            self.x_weights_[:, k] = x_weights.view(-1)  # W
            self.y_weights_[:, k] = y_weights.view(-1)  # C
            self.x_loadings_[:, k] = x_loadings.view(-1)  # P
            self.y_loadings_[:, k] = y_loadings.view(-1)  # Q

        # Such that: X = TP' + Err and Y = UQ' + Err

        # 4) rotations from input space to transformed space (scores)
        # T = X W(P'W)^-1 = XW* (W* : p x k matrix)
        # U = Y C(Q'C)^-1 = YC* (W* : q x k matrix)
        self.x_rotations_ = torch.mm(
            self.x_weights_,
            torch.pinverse(torch.mm(self.x_loadings_.T, self.x_weights_)))
        if Y.size()[1] > 1:
            self.y_rotations_ = torch.mm(
                self.y_weights_,
                torch.pinverse(torch.mm(self.y_loadings_.T, self.y_weights_)))
        else:
            self.y_rotations_ = torch.ones(1)

        if True or self.deflation_mode == "regression":
            # FIXME what's with the if?
            # Estimate regression coefficient
            # Regress Y on T
            # Y = TQ' + Err,
            # Then express in function of X
            # Y = X W(P'W)^-1Q' + Err = XB + Err
            # => B = W*Q' (p x q)

            self.coef_ = torch.mm(self.x_rotations_, self.y_loadings_.T)
            self.coef_ = self.coef_.to("cuda")
            self.y_std_ = self.y_std_.to("cuda")
            # self.coef_ = torch.mv(self.coef_, self.y_std_)
            self.coef_ = self.coef_[:, None] * self.y_std_
            self.coef_ = self.coef_[:,0,:]

        return self

    def transform(self, X, Y=None, copy=True):
        """Apply the dimension reduction learned on the train data.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vectors, where n_samples is the number of samples and
            n_features is the number of predictors.
        Y : array-like of shape (n_samples, n_targets)
            Target vectors, where n_samples is the number of samples and
            n_targets is the number of response variables.
        copy : boolean, default True
            Whether to copy X and Y, or perform in-place normalization.
        Returns
        -------
        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        check_is_fitted(self)
        X = check_array(X, copy=copy, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        # Apply rotation
        x_scores = torch.mm(X, self.x_rotations_)
        if Y is not None:
            Y = check_array(Y, ensure_2d=False, copy=copy, dtype=FLOAT_DTYPES)
            if Y.ndim == 1:
                Y = Y.reshape(-1, 1)
            Y -= self.y_mean_
            Y /= self.y_std_
            y_scores = torch.mm(Y, self.y_rotations_)
            return x_scores, y_scores

        return x_scores

    def inverse_transform(self, X):
        """Transform data back to its original space.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_components)
            New data, where n_samples is the number of samples
            and n_components is the number of pls components.
        Returns
        -------
        x_reconstructed : array-like of shape (n_samples, n_features)
        Notes
        -----
        This transformation will only be exact if n_components=n_features
        """
        check_is_fitted(self)
        X = check_array(X, dtype=FLOAT_DTYPES)
        # From pls space to original space
        X_reconstructed = torch.matmul(X, self.x_loadings_.T)

        # Denormalize
        X_reconstructed *= self.x_std_
        X_reconstructed += self.x_mean_
        return X_reconstructed

    def predict(self, X, copy=True):
        """Apply the dimension reduction learned on the train data.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vectors, where n_samples is the number of samples and
            n_features is the number of predictors.
        copy : boolean, default True
            Whether to copy X and Y, or perform in-place normalization.
        Notes
        -----
        This call requires the estimation of a p x q matrix, which may
        be an issue in high dimensional space.
        """
        # TODO: check fitted and check array
        # check_is_fitted(self)
        # X = check_array(X, copy=copy, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        # print(X[:, None] * self.coef_ + self.y_mean_)
        # print(torch.mv(X, self.coef_) + self.y_mean_)
        Ypred = torch.mm(X, self.coef_)
        return Ypred + self.y_mean_

    def fit_transform(self, X, y=None):
        """Learn and apply the dimension reduction on the train data.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training vectors, where n_samples is the number of samples and
            n_features is the number of predictors.
        y : array-like of shape (n_samples, n_targets)
            Target vectors, where n_samples is the number of samples and
            n_targets is the number of response variables.
        Returns
        -------
        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        return self.fit(X, y).transform(X, y)

    def _more_tags(self):
        return {'poor_score': True}

In [5]:
import torchvision.transforms as transforms
import torch.nn as nn
import torch

# from fastai.torch_core import flatten_model

import numpy as np
import pandas as pd
import seaborn as sns
from glob import glob
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook as tqdm

from fastai.vision import *
from fastai.metrics import error_rate
from PIL import Image
from google_drive_downloader import GoogleDriveDownloader as gdd

import pickle

from sklearn.cross_decomposition import PLSRegression

from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score

# prepare data
PATH = "drive/My Drive/neuro140/"
infile = open(PATH+'/mouse_brain_data_sample.pkl','rb')
mouse_pickle = pickle.load(infile)
VISam = mouse_pickle['VISam']
print(VISam[118])
VIspm = mouse_pickle['VISpm']

img_array = np.load(PATH+'/stimulus_set.npy')

create_images = False
dictlist = []
img_names = []
for i in range(119):
  pngfilename = str(i)+str('.jpg')
  if (create_images):
    im = Image.fromarray(img_array[i])
    im.save(PATH + 'images/' + pngfilename)
  row = {'ImageName': pngfilename, 'ImageIndex': i}
  img_names.append(pngfilename)
  dictlist.append(row)

[0.120102 0.020811 0.       0.018055 ... 0.008298 0.004659 0.       0.      ]


In [6]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.autograd import Variable
from PIL import Image

from tqdm import tqdm_notebook as tqdm

from fastai.torch_core import flatten_model

preprocessing = transforms.Compose([
    transforms.Resize((224,224)), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

class SaveFeatures():
    def __init__(self, module):
        self.hook = module.register_forward_hook(self.hook_fn)
    def hook_fn(self, module, input, output):
        self.features = output.clone().detach().requires_grad_(True).cuda()
    def close(self):
        self.hook.remove()
        
def get_layer_names(layers):
    layer_names = []
    for layer in layers:
        layer_name = str(layer).split('(')[0]
        layer_names.append(layer_name + '-' + str(sum(layer_name in string for string in layer_names) + 1))
    return layer_names

def get_activations(model, img, layers, target_layer):
    img_tensor = preprocessing(img).unsqueeze(0)
    activations = SaveFeatures(layers[target_layer])
    model(img_tensor)
    activations.close()
    return activations.features.detach().cpu().numpy().squeeze()

# def flatten_batch(features):
#     return features.view(features.size()[0], torch.prod(torch.tensor(features.size()[1:])).item())

model = models.resnet18(pretrained=True)
# model.eval()

layers = flatten_model(model)
layer_names = get_layer_names(layers)
print(layer_names)

path = Path(PATH + 'images/')

layer_wise_activation = False
if(layer_wise_activation):
  classifier_rdms = {}
  for layer_index, layer in enumerate(tqdm(layers[:-2])):
      image_activations = []
      for image in img_names:
          image_array = Image.open(path/image)
          print(get_activations(model, image_array, layers, layer_index).flatten())
          image_activations.append(get_activations(model, image_array, layers, layer_index).flatten())
      layer_features = np.stack(image_activations)
  #save file
  with open(PATH + 'classifier_dict.pickle', 'wb') as f:
    pickle.dump(classifier_rdms, f)

image_wise_activations = False
if(image_wise_activations):
  image_activations = []
  for image in img_names:
    image_array = Image.open(path/image)
    layer_activations = []
    for layer_index, layer in enumerate(tqdm(layers[:-2])):
      layer_activations.append(get_activations(model, image_array, layers, layer_index))
    image_activations.append(layer_activations)
  #save file
  x = np.array(image_activations)
  np.save(str(PATH + 'image_activations.npy'), x, allow_pickle=True)

# scaler = transforms.Scale((224, 224))df
# normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                                  std=[0.229, 0.224, 0.225])
# to_tensor = transforms.ToTensor()

# print(model.named_modules())

# layer1 = model._modules.get('Conv2d-1')
    
# print(layer1)
# img_vectors = []
# for i in range(len(img_names)):
#   img_vectors.append([])
#   img = str(PATH + 'images/' + img_names[i])
#   for j in layer_names:
#     print(layer1)
#     layer1 = model._modules.get(layer1)
#     img_vectors[i].append(get_vector(img, layer1))

# print(img_vectors['0.jpg'].size())

['Conv2d-1', 'BatchNorm2d-1', 'ReLU-1', 'MaxPool2d-1', 'Conv2d-2', 'BatchNorm2d-2', 'ReLU-2', 'Conv2d-3', 'BatchNorm2d-3', 'Conv2d-4', 'BatchNorm2d-4', 'ReLU-3', 'Conv2d-5', 'BatchNorm2d-5', 'Conv2d-6', 'BatchNorm2d-6', 'ReLU-4', 'Conv2d-7', 'BatchNorm2d-7', 'Conv2d-8', 'BatchNorm2d-8', 'Conv2d-9', 'BatchNorm2d-9', 'ReLU-5', 'Conv2d-10', 'BatchNorm2d-10', 'Conv2d-11', 'BatchNorm2d-11', 'ReLU-6', 'Conv2d-12', 'BatchNorm2d-12', 'Conv2d-13', 'BatchNorm2d-13', 'Conv2d-14', 'BatchNorm2d-14', 'ReLU-7', 'Conv2d-15', 'BatchNorm2d-15', 'Conv2d-16', 'BatchNorm2d-16', 'ReLU-8', 'Conv2d-17', 'BatchNorm2d-17', 'Conv2d-18', 'BatchNorm2d-18', 'Conv2d-19', 'BatchNorm2d-19', 'ReLU-9', 'Conv2d-20', 'BatchNorm2d-20', 'AdaptiveAvgPool2d-1', 'Linear-1']


In [15]:
# image_activations = np.load(str(PATH+'image_activations.npy'), allow_pickle=True)

# for row in img_activations:
#   print(column.shape)

# print(VISam[0])
# X = np.transpose(VISam)[0].shape

layer_3 = np.zeros((119, 200704))
for i in range(len(image_activations)):
  layer_3[i] = image_activations[i][3].flatten()

print(layer_3.shape)
print(VISam.shape)
# neuron wise activations
VISam_neuron_wise_pred = []
for i in range(VISam.shape[0]):
  x = layer_3
  y = np.transpose(VISam)[i]
  y = y.reshape(-1, 1)
  # check_array(y, dtype=np.float64, ensure_2d=False)
  print(x.shape)
  print(y.shape)
  X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.2, random_state=42)
  print(X_test)
  modl = PLSRegression(n_components=25)
  modl = modl.fit(x, y)
  Y_pred = modl.predict(X_test)
  VISam_neuron_wise_pred.append(Y_pred)
  print(Y_pred)

# # R_X = torch.tensor(X_train).type(torch.cuda.FloatTensor)
# # R_Y = torch.tensor(Y_train).type(torch.cuda.FloatTensor)
# # R_Xtest = torch.tensor(X_test).type(torch.cuda.FloatTensor)ççç
# # modl = PLS(n_components=2).fit(R_X, R_Y)
# # Y_pred = modl.predict(R_Xtest)
# # Y_pred = Y_pred.cpu().numpy()

# # print(np.average(np.corrcoef(Y_pred, Y_test)))


(119, 200704)
(119, 221)
(119, 200704)
(119, 1)
[[0.144833 0.167497 0.186572 0.186572 ... 0.92448  0.886856 0.802933 0.593327]
 [0.422123 0.422123 0.864352 1.13177  ... 0.327513 0.327513 0.616996 0.616996]
 [1.497047 1.649969 1.968521 1.975232 ... 1.073336 1.230291 1.031057 1.675296]
 [2.28661  2.28661  2.359669 2.380687 ... 0.405165 0.212393 0.399554 0.399554]
 ...
 [1.234699 1.234699 1.448261 1.448261 ... 0.676144 0.686929 0.636533 0.846747]
 [0.196176 0.196176 0.158061 0.120332 ... 0.357042 0.248835 0.248835 0.23788 ]
 [0.175792 0.079074 0.187394 0.187394 ... 0.238883 0.238883 0.235768 0.244489]
 [1.372641 1.503724 1.534388 1.534388 ... 1.005076 1.246447 1.259979 1.614688]]
[[ 8.877196e-03]
 [ 2.486467e-03]
 [ 9.368593e-02]
 [ 7.341749e-10]
 [ 2.133432e-02]
 [ 8.296191e-02]
 [ 1.401422e-09]
 [ 6.056975e-03]
 [ 8.654715e-03]
 [ 2.715355e-02]
 [ 1.796429e-02]
 [ 2.202499e-03]
 [ 1.319784e-02]
 [ 3.728627e-03]
 [ 2.055124e-02]
 [ 6.709184e-03]
 [ 3.975109e-03]
 [ 8.755714e-03]
 [ 7.766

KeyboardInterrupt: ignored

In [0]:
def cov(x, rowvar = False):
  if not rowvar:
    x = x.t()
  f = 1.0 / (x.size(1) - 1)
  x -= torch.mean(x, dim=1, keepdim=True)
  xt = x.t()
  c = f * x.matmul(xt).squeeze()
  return c


def PCA(data, dims = 2):
  """
  based on numpy function:
  https://stackoverflow.com/questions/13224362/principal-component-analysis-pca-in-python
  """
  n = data.shape
  mx = torch.mean(data, dim=0)

  x = data - mx
  R = cov(x, False)

  evals, evecs = torch.symeig(R_t, eigenvectors = True, upper = True)
  
  idx = torch.argsort(evals, descending = True)
  evecs = evecs[:,idx]
  evals = evals[idx]

  evecs = evecs[:, :dims]

  return torch.mm(evecs.T, data.T).T, evals, evecs


from sklearn.decomposition import PCA as PCA_sk

pca = PCA_sk(n_components=5)
pca.fit(VISam)
Sam_c = pca.components_
print(Sam_c)

pca = PCA_sk(n_components=5)
pca.fit(VIspm)
Spm_c = pca.components_
print(Spm_c)

In [0]:
# model_ids = {'alexnet': models.alexnet, 'vgg16': models.vgg16_bn, 'vgg19': models.vgg19_bn, 'resnet18': models.resnet18, 'resnet34': models.resnet34, 'resnet50': models.resnet50, 'resnet152': models.resnet152}

# experiment = {'train_data': 'SimpleShapes',
#               'test_data': 'SimpleShapes',
#               'subset': 'Stack_3',
#               'model': 'alexnet', 
#               'finetuning': False, 
#               'trainCycles': 3, 
#               'trainSetSize': 90,
#               'valSetSize': 28}

# from random import shuffle
# def shuffling_split(train_size, test_size, n_splits=1):
#     total_size = train_size + test_size
#     indices = [i for i in range(1,total_size+1)]
#     train_indices = []
#     test_indices = []
#     for i in range(n_splits):
#         indices = [i for i in range(1,total_size+1)]
#         shuffle(indices)
#         train = indices[:train_size]
#         test = indices[train_size:]
#         train_indices.append(train)
#         test_indices.append(test)
#     if n_splits == 1:
#         return train_indices[0], test_indices[0]
#     if n_splits > 1:
#         return train_indices, test_indices

# '_'.join('{}_{}'.format(key, val) for key, val in experiment.items())

# labels = pd.DataFrame(dictlist)
# test_indices = shuffling_split(experiment['trainSetSize'], experiment['valSetSize'])[1]
# labels['TestSet'] = pd.to_numeric(labels['ImageIndex']).isin(test_indices)

# data = (ImageList.from_df(labels, PATH + 'images/', cols='ImageName').split_from_df(col='TestSet').label_from_df(cols='ImageIndex').databunch(bs=48)).normalize(imagenet_stats)
# ipath = Path(PATH + 'images/')

# learn = cnn_learner(data, model_ids['alexnet'],  metrics=accuracy)

# learn.summary()
# learn.lr_find()
# learn.recorder.plot()

# class SaveFeatures():
#     def __init__(self, module):
#         self.hook = module.register_forward_hook(self.hook_fn)
#     def hook_fn(self, module, input, output):
#         self.features = output.clone().detach().requires_grad_(True).cuda()
#     def close(self):
#         self.hook.remove()
        
# def get_layer_names(layers):
#     layer_names = []
#     for layer in layers:
#         layer_name = str(layer).split('(')[0]
#         layer_names.append(layer_name + '-' + str(sum(layer_name in string for string in layer_names) + 1))
#     return layer_names

# def get_activations(model, img, layers, target_layer):
#     img_tensor = preprocessing(img).unsqueeze(0).cuda()
#     activations = SaveFeatures(layers[target_layer])
#     model(img_tensor)
#     activations.close()
#     return activations.features.detach().cpu().numpy().squeeze()

# def flatten_batch(features):
#     return features.view(features.size()[0], torch.prod(torch.tensor(features.size()[1:])).item())

# model = learn.model
# layers = flatten_model(model)
# layer_names = get_layer_names(layers)
# model.eval()

# classifier_rdms = {}
# for layer_index, layer in enumerate(tqdm(layers[:-2])):
#     image_activations = []
#     for image in tqdm(labels[labels.TestSet == 1]['ImageName'][:100], leave=False):
#         image_array = Image.open(ipath/image)
#         image_activations.append(get_activations(model, image_array, layers, layer_index).flatten())
#     layer_features = np.stack(image_activations)
#     classifier_rdms[layer_names[layer_index]] = np.corrcoef(layer_features)

In [0]:
print(classifier_rdms)