In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
import os

from PIL import Image

import torch.nn.functional as F
from tqdm import tqdm

In [2]:
class EncoderVGG16(nn.Module):
    def __init__(self,
                 n_layers = 5,
                 h_dims = [512,256,128,10]):

        super(EncoderVGG16,self).__init__()
        self.n_layers = n_layers
        self.h_dims = h_dims
        self.layers = [5,10,17,24,31]
        self.full_vgg16 = torchvision.models.vgg16()
        self.vgg = self.full_vgg16.features[:self.layers[self.n_layers-1]]

        self.classifier = self.full_vgg16.classifier


        self.classifier[0] = nn.Linear(self.h_dims[0], self.h_dims[1])
        self.classifier[3] = nn.Linear(self.h_dims[1], self.h_dims[2])
        self.classifier[6] = nn.Linear(self.h_dims[2], self.h_dims[3])


    def encode(self,x, layer = 5):
        return self.vgg[:self.layers[layer-1]](x)

    def forward(self,x):
        latent = self.encode(x)
        latent = torch.flatten(latent,start_dim = 1)
        output = self.classifier(latent)

        return output

In [4]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [10]:
model = EncoderVGG16()
# Aqui se modifica cuales parametros utiliza
model.load_state_dict(torch.load("eachVGGParams_Clean_10.pt"))

<All keys matched successfully>

In [4]:
if not os.path.exists("data"):
    os.makedirs("data")
    print("Data directory created :D")

if not os.path.exists("data/train"):
    os.makedirs("data/train")
    print("Data train directory created :D")

if not os.path.exists("data/val"):
    os.makedirs("data/val")
    print("Data val directory created :D")

Data directory created :D
Data train directory created :D
Data val directory created :D


In [5]:
transform = transforms.Compose([
     transforms.ToTensor(),
    #  transforms.Lambda(gaussian_pixels),
     transforms.Normalize(mean=[0.49139968, 0.48215827 ,0.44653124],std=[0.24703233, 0.24348505, 0.26158768]),
     ])

In [6]:
train_dataset = torchvision.datasets.CIFAR10(root = "data/train/",train=True, transform= transform,download=True)
val_dataset = torchvision.datasets.CIFAR10(root = "data/val/",train = False, transform= transform, download=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/train/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:03<00:00, 49840064.19it/s]


Extracting data/train/cifar-10-python.tar.gz to data/train/
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/val/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:02<00:00, 84677644.15it/s]


Extracting data/val/cifar-10-python.tar.gz to data/val/


In [7]:
trainloader = DataLoader(train_dataset, batch_size=1000,shuffle=True,)
valloader = DataLoader(val_dataset,batch_size= 1000,shuffle= False)

In [8]:
class EMI_Net(nn.Module):
    def __init__(self, x_shape=1, T_shape=1, H=20):
        super(EMI_Net, self).__init__()
        self.fc1 = nn.Linear(x_shape, H)
        self.fc2 = nn.Linear(T_shape, H)
        self.fc3 = nn.Linear(H, 1)

    def forward(self, x, T):
        h1 = F.relu(self.fc1(x)+self.fc2(T))
        h2 = self.fc3(h1)
        return h2


def estimateMI(model, train_loader, H=10, l=1, n_epoch=15):

  latent_shape = [16384,8192,4096,2048,512]

  model_MINE_X = EMI_Net(x_shape=3072,T_shape=latent_shape[l-1], H=H) # cambiar tamaños
  model_MINE_Y = EMI_Net(x_shape=10,T_shape=latent_shape[l-1], H=H) # cambiar tamaños

  optimizer_X = torch.optim.Adam(model_MINE_X.parameters(), lr=0.01)
  optimizer_Y = torch.optim.Adam(model_MINE_Y.parameters(), lr=0.01)

  X_emi = []
  Y_emi = []

  for epoch in tqdm(range(n_epoch)):
      for i, data in enumerate(train_loader):

          optimizer_X.zero_grad()
          optimizer_Y.zero_grad()

          # Data
          inputs, labels = data
          x_sample = torch.flatten(inputs, start_dim=1)
          T_sample = model.encode(inputs, layer = l)
          T_sample = torch.flatten(T_sample, start_dim=1)
          T_shuffle = T_sample[torch.randperm(T_sample.size()[0])]

          Y_sample = np.zeros((len(labels),10), dtype='float32')
          for k, idx in enumerate(labels):
            Y_sample[k,idx] = 1
          Y_sample = torch.from_numpy(Y_sample)

          # I(X,T)
          pred_xy = model_MINE_X(x_sample, T_sample)
          pred_x_y = model_MINE_X(x_sample, T_shuffle)
          ret = torch.mean(pred_xy) - torch.log(torch.mean(torch.exp(pred_x_y)))
          loss_X = - ret  # maximize
          model_MINE_X.zero_grad()
          loss_X.backward()
          optimizer_X.step()

          # Data
          T_sample = model.encode(inputs, layer = l)
          T_sample = torch.flatten(T_sample, start_dim=1)
          T_shuffle = T_sample[torch.randperm(T_sample.size()[0])]

          # I(Y,T)
          pred_xy = model_MINE_Y(Y_sample, T_sample)
          pred_x_y = model_MINE_Y(Y_sample, T_shuffle)
          ret = torch.mean(pred_xy) - torch.log(torch.mean(torch.exp(pred_x_y)))
          loss_Y = - ret  # maximize
          model_MINE_Y.zero_grad()
          loss_Y.backward()
          optimizer_Y.step()

      # save on each epoch
      X_emi.append(-loss_X.data.item())
      Y_emi.append(-loss_Y.data.item())

      print(f'Epoch {epoch}: I(X,T) = {-loss_X.data.item()}, I(Y,T) = {-loss_Y.data.item()}')
  return np.array(X_emi), np.array(Y_emi)

In [None]:
from google.colab import files

for l in [1,3,5]: # capas analizadas
  print(f'Analizando capa {l}')
  I_XT, I_YT = estimateMI(model, trainloader, H=10, l=l, n_epoch=10)
  filename = 'clean_epoch_10_layer_'+str(l)+'.npy'
  np.save('I_XT_'+filename, I_XT)
  np.save('I_YT_'+filename, I_YT)
  files.download('I_XT_'+filename)
  files.download('I_YT_'+filename)