In [None]:
%pylab inline
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data.dataloader as dataloader
import torch.optim as optim

from torch.utils.data import TensorDataset
from torch.autograd import Variable
from torchvision import transforms
from torchvision.datasets import MNIST
from tqdm import tqdm
from time import sleep
import pickle
import matplotlib.pyplot as plt
import math
import sys


# True when a CUDA device is usable; later cells branch on this flag.
cuda = torch.cuda.is_available()

# One fixed seed so weight initialisation and shuffling are repeatable.
SEED = 1
torch.manual_seed(SEED)

# Seed the CUDA generator as well, but only when a GPU is actually present.
if cuda:
    torch.cuda.manual_seed(SEED)

Populating the interactive namespace from numpy and matplotlib


In [None]:

# MNIST train and test splits (downloaded to ./data on first use).
# ToTensor turns each image into a float tensor with values scaled to [0, 1].
train, test = (
    MNIST('./data', train=is_train, download=True,
          transform=transforms.Compose([transforms.ToTensor()]))
    for is_train in (True, False)
)

# Create DataLoader: larger batches, worker processes and pinned memory on GPU.
if cuda:
    dataloader_args = {'shuffle': True, 'batch_size': 256,
                       'num_workers': 4, 'pin_memory': True}
else:
    dataloader_args = {'shuffle': True, 'batch_size': 64}

train_loader = dataloader.DataLoader(train, **dataloader_args)
test_loader = dataloader.DataLoader(test, **dataloader_args)

In [None]:
class Model(nn.Module):
    """Small CNN: two 5x5 conv + 2x2 max-pool stages, then two linear layers.

    ``forward`` returns the 10 class scores together with the ReLU outputs of
    both convolutions so callers can inspect intermediate activations.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=20, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(in_features=4*4*50, out_features=500)
        self.fc2 = nn.Linear(in_features=500, out_features=10)

    def forward(self, x):
        """Return ``(scores, conv2_activation, conv1_activation)`` for batch ``x``."""
        conv1_act = F.relu(self.conv1(x))
        conv2_act = F.relu(self.conv2(F.max_pool2d(conv1_act, 2, 2)))
        # Flatten the pooled conv2 maps into rows of 4*4*50 features for fc1.
        flat = F.max_pool2d(conv2_act, 2, 2).view(-1, 4*4*50)
        scores = self.fc2(F.relu(self.fc1(flat)))
        return scores, conv2_act, conv1_act
    
# Build the network; .cuda() moves the parameters and returns the same module.
model = Model()
model = model.cuda() if cuda else model

# Adam over all network parameters with a learning rate of 1e-3.
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Train for EPOCHS passes over the training set; after each pass evaluate on the
# full test set and checkpoint the weights whenever test accuracy improves.
EPOCHS = 15
losses = []  # one 0-dim CPU tensor per optimisation step

best_acc = 0
for epoch in range(EPOCHS):
    # The evaluation step below switches to eval mode, so re-enter train mode
    # at the start of every epoch instead of only once before the loop.
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if cuda:
            data, target = data.cuda(), target.cuda()

        # Init
        optimizer.zero_grad()

        # Predict (forward returns a tuple; element 0 holds the class scores)
        y_pred = model(data)[0]

        # Calculate loss
        loss = F.cross_entropy(y_pred, target)
        losses.append(loss.detach().cpu())

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Display
        if batch_idx % 100 == 1:
            print('\r Train Epoch: {}/{} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch+1,
                EPOCHS,
                batch_idx * len(data),
                len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                loss.item()),
                end='')

    # Eval
    # dataset.data holds the raw pixel values and bypasses the ToTensor
    # transform, so rescale by 255 to match the [0, 1] inputs used in training.
    evaluate_x = test_loader.dataset.data.type(torch.FloatTensor) / 255.
    evaluate_y = test_loader.dataset.targets
    if cuda:
        evaluate_x, evaluate_y = evaluate_x.cuda(), evaluate_y.cuda()

    model.eval()
    with torch.no_grad():  # no autograd graph needed for the test-set pass
        output = model(evaluate_x[:, None, ...])[0]
    pred = output.max(1)[1]
    d = pred.eq(evaluate_y).cpu()
    accuracy = d.sum().type(dtype=torch.float64)/d.size()[0]

    # save best
    if accuracy > best_acc:
        best_acc = accuracy
        torch.save({
                  'model': model.state_dict(),
                 }, "./bestcnn.weights")
        print('\r Best model saved.\r')

    # Epoch summary, printed on its own line.
    print('\r Train Epoch: {}/{} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\t Test Accuracy: {:.4f}%'.format(
        epoch+1,
        EPOCHS,
        len(train_loader.dataset),
        len(train_loader.dataset),
        100. * batch_idx / len(train_loader),
        loss.item(),
        accuracy*100))

In [None]:
# Host-side copy of the first convolution's kernels.
weights = model.conv1.weight.data.cpu()

# plot the first layer features: one panel per output filter (input channel 0),
# each rescaled to [0, 1] by its own min and max.
for i, kernel in enumerate(weights):
    to_show = kernel[0].numpy()
    lo, hi = to_show.min(), to_show.max()
    plt.subplot(4, 5, i+1)
    plt.axis('off')
    plt.imshow((to_show - lo)/(hi - lo))
plt.show()



In [None]:
# Host-side copy of the second convolution's kernels.
weights = model.conv2.weight.data.cpu()
print(weights.shape)

# One panel per index i: input-channel-0 slice of filter i, min-max rescaled.
# NOTE(review): the panel count comes from weights.shape[1] while i indexes
# dimension 0 - confirm this is the intended subset of filters.
n_panels = weights.shape[1]
for i in range(n_panels):
    to_show = weights[i][0].numpy()
    lo, hi = to_show.min(), to_show.max()
    plt.subplot(5, 4, i+1)
    plt.axis('off')
    plt.imshow((to_show - lo)/(hi - lo))
plt.show()

In [None]:
# Side length of the (square) input image in pixels.
imsize = 28

# Layer order used for the receptive-field summary; convnet[i] describes
# layer_names[i] as [kernel size, stride, padding].
layer_names = ['conv1', 'pool1', 'conv2', 'pool2']
convnet = [
    [5, 1, 0],
    [2, 2, 0],
    [5, 1, 0],
    [2, 2, 0],
]

def outFromIn(conv, layerIn):
    """Propagate receptive-field bookkeeping through one conv/pool layer.

    conv:    [kernel size, stride, padding] of the layer.
    layerIn: (n features, jump, receptive size, start) of the layer's input.
    Returns the same four quantities for the layer's output as a tuple.
    """
    n_in, j_in, r_in, start_in = layerIn[0], layerIn[1], layerIn[2], layerIn[3]
    k, s, p = conv[0], conv[1], conv[2]

    n_out = math.floor((n_in - k + 2*p)/s) + 1
    # Total padding actually consumed by n_out windows; its floored half is
    # the left-hand padding that shifts the first output's centre.
    actualP = (n_out-1)*s - n_in + k
    pad_left = math.floor(actualP/2)

    return (
        n_out,
        j_in * s,
        r_in + (k - 1)*j_in,
        start_in + ((k-1)/2 - pad_left)*j_in,
    )
  
def printLayer(layer, layer_name):
    """Print the layer's name, then its first four entries on labelled lines."""
    header = layer_name + ":"
    print(header)
    fields = (layer[0], layer[1], layer[2], layer[3])
    print("\t n features: %s \n \t jump: %s \n \t receptive size: %s \t start: %s " % fields)
    
# Walk the layer stack, printing and collecting the receptive-field info of
# every layer. The input image is the starting point, with
# n_0 = image size, j_0 = 1, r_0 = 1 and start_0 = 0.5.
layerInfos = []
print ("-------Net summary------")
currentLayer = [imsize, 1, 1, 0.5]
printLayer(currentLayer, "input image")
for i, layer_spec in enumerate(convnet):
    currentLayer = outFromIn(layer_spec, currentLayer)
    layerInfos.append(currentLayer)
    printLayer(currentLayer, layer_names[i])
print ("------------------------")

In [None]:
# Probe the network with uniform-noise images and record, for every image, the
# conv1 and conv2 responses of all channels at one fixed spatial position.
iter_ = 100        # number of noise batches
all_size = 10000   # images per batch
p = 28             # image side length

# Spatial position (same indices in both feature maps) that is recorded.
pos_x = 5
pos_y = 5

model.eval()
act_conv1, act_conv2, noise = [], [], []
with torch.no_grad(), tqdm(total=iter_, file=sys.stdout) as pbar:
    for i in range(iter_):
        z = torch.rand(all_size, p, p)
        z = z.cuda() if cuda else z

        # unsqueeze(1) inserts the single input channel the model expects.
        _, x2, x1 = model(z.unsqueeze(1))

        act_conv1.append(x1[:, :, pos_x, pos_y].cpu())
        act_conv2.append(x2[:, :, pos_x, pos_y].cpu())
        noise.append(z.cpu())

        pbar.update(1)

# Merge the per-batch chunks along the image dimension.
act_conv1, act_conv2, noise = (
    torch.cat(chunks) for chunks in (act_conv1, act_conv2, noise)
)

print(act_conv1.shape)
print(act_conv2.shape)
print(noise.shape)
print(act_conv1)
print(act_conv2)

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.utils.multiclass import unique_labels
import copy
import matplotlib.cm as cm

# Bias-modulation experiment
# - measure a layer's mean response to training images of one digit
# - subtract a multiple of that response from the layer's biases in a copy of
#   the trained network
# - for blends ranging from pure noise (weight 0) to the clean test digit
#   (weight 1), plot the fraction of stimuli the modified copy still assigns
#   to that digit, one curve per modulation strength

N_STRENGTHS = 11                       # modulation strengths k = 0 .. 10
MIX_WEIGHTS = np.linspace(0, 1, 21)    # digit fraction of each noise/digit blend
colors = cm.rainbow(np.linspace(0, 1, N_STRENGTHS))

# Follow the notebook-wide `cuda` flag instead of calling .cuda() unconditionally.
device = torch.device('cuda' if cuda else 'cpu')

# Raw pixel values as float; divided by 255 where the images are used.
test_x = test.data.type(torch.FloatTensor).clone()
test_y = test.targets.type(torch.ByteTensor).clone()
train_x = train.data.type(torch.FloatTensor).clone()
train_y = train.targets.type(torch.ByteTensor).clone()

# Results grid: one row per layer, one column per digit 5..9.
f, axarr = plt.subplots(3, 5)
f.set_figheight(20)
f.set_figwidth(30)

# Preview strip: one example stimulus per blend weight.
f2, axarr2 = plt.subplots(1, len(MIX_WEIGHTS))
f2.set_figheight(5)
f2.set_figwidth(30)

all_size = test_x.size(0)

layers = ['fc1', 'conv2', 'conv1']
dig_names = [str(i) for i in range(5, 10)]

# Per layer: (factor applied to k, format used for the legend label).
BIAS_STEP = {'conv1': (0.1, '{:.1f}'), 'conv2': (1, '{}'), 'fc1': (0.01, '{:.2f}')}


def _layer_activations(net, images):
    """Return (scores, fc1, conv2, conv1) activations of `net` for `images`."""
    scores, conv2_act, conv1_act = net(images)
    # forward() does not return the fc1 activations, so rebuild them from the
    # conv2 output with the same pool / flatten / ReLU steps it applies.
    pooled = F.max_pool2d(conv2_act, 2, 2).view(-1, 4*4*50)
    fc1_act = F.relu(net.fc1(pooled))
    return scores, fc1_act, conv2_act, conv1_act


def _fraction_predicted(net, stimuli, digit):
    """Fraction of `stimuli` (N x 28 x 28, CPU) that `net` classifies as `digit`."""
    pred = net(stimuli[:, None, ...].to(device))[0].max(1)[1]
    return (pred == digit).sum().item() / pred.size(0)


model.eval()
with torch.no_grad():
    for l_idx, layer in enumerate(layers):
        step, label_fmt = BIAS_STEP[layer]

        for which in range(5, 10):
            ax = axarr[l_idx, which-5]
            which_data = train_x[train_y == which]/255.   # training images of the digit
            uu = test_x[test_y == which]/255.             # test images of the digit

            # Reference response of the unmodified network; it does not depend
            # on k, so compute it once per (layer, digit).
            _, fc1, conv2, conv1 = _layer_activations(
                model, which_data[:, None, ...].to(device))
            if layer == 'conv1':
                avg_activation = torch.mean(conv1, dim=0).mean(-1).mean(-1)
                direction = avg_activation/avg_activation.max()
            elif layer == 'conv2':
                avg_activation = torch.mean(conv2, dim=0).mean(-1).mean(-1)
                direction = avg_activation/avg_activation.max()
            else:
                # NOTE(review): fc1 uses the un-normalised mean activation,
                # unlike the conv layers - kept as in the original; confirm.
                direction = torch.mean(fc1, dim=0)
            del fc1, conv2, conv1

            for k_idx, k in enumerate(range(N_STRENGTHS)):
                # Fresh copy of the trained network with lowered biases.
                model2 = copy.deepcopy(model).to(device)
                model2.eval()
                getattr(model2, layer).bias.sub_(step*k*direction)

                fraction_pred_favor = []
                for idx, weight in enumerate(MIX_WEIGHTS):
                    weight = float(weight)
                    z = torch.rand(uu.size(0), 28, 28)
                    stimuli = (1-weight)*z + weight*uu

                    if k == 0:
                        axarr2[idx].imshow(stimuli[0].numpy())
                        axarr2[idx].axis('off')
                        axarr2[idx].set_title('{:.2f}'.format(weight))

                    pp = _fraction_predicted(model, stimuli, which)    # unmodified net
                    qq = _fraction_predicted(model2, stimuli, which)   # modified net
                    fraction_pred_favor.append([pp, qq])

                mat = np.array(fraction_pred_favor)

                # Label each curve with the bias factor that produced it.
                ax.plot(mat[:, 1], color=colors[k_idx, :3],
                        label='- ' + label_fmt.format(step*k))

            ax.set_yticks([0.5, 1])
            ax.set_ylabel('acc')
            ax.set_title(dig_names[which-5] + '-' + layer)
            ax.set_xticks(list(range(0, 21, 4)))
            ax.set_xticklabels(['{:.1f}'.format(i/20) for i in range(0, 21, 4)])
            ax.legend()

f.subplots_adjust(hspace=0.8)
plt.show()