# CNN Visualization

In this notebook we cover:
1. Implementation of Deconvnets 
2. Application of Deconvnets to a pre trained network
3. Implementation and application of occlusion to gather heatmap data. 

The work in this notebook is based on the one provided for the [lab](https://github.com/aeau/MAU-AML-labs/blob/develop/1-computer-vision-lab/3-using-deconvnet.ipynb)

## Imports & Setup

In [370]:
import torch

import numpy as np
import torch.nn as nn
import plotly.express as px
import torch.optim as optim
import torch.nn.functional as F

from torch.optim import lr_scheduler
from torchvision import datasets, transforms, models

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

import warnings
warnings.filterwarnings("ignore", category=UserWarning) 

## Defining a CNN with Deconvnet 

In [371]:
class CNN(nn.Module):
    def __init__(self, input_size, n_feature, output_size):
        super(CNN, self).__init__()
        
        self.n_feature = n_feature
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_feature, kernel_size=5)
        self.pool1 = nn.MaxPool2d(2, stride=2, return_indices=True)
            
        self.conv2 = nn.Conv2d(n_feature, n_feature, kernel_size=5)
        self.pool2 = nn.MaxPool2d(2, stride=2, return_indices=True)
        
        # Necessary information for the deconvnet
        self.maxpool_indices = [] # 2, [0: [0,1,2, ... ,n], 1]
        self.feature_maps = [] # size = 4
        self.deconvs = {}
        
        self.fc1 = nn.Linear(n_feature*4*4, 50)
        self.fc2 = nn.Linear(50, 10)
        
    def initialize_deconv(self):
        
        self.deconvs = nn.Sequential(
            nn.MaxUnpool2d(2, stride=2),
            nn.ConvTranspose2d(in_channels=self.n_feature, out_channels=self.n_feature, kernel_size=5),
            nn.MaxUnpool2d(2, stride=2),
            nn.ConvTranspose2d(in_channels=self.n_feature, out_channels=1, kernel_size=5)
        )
        
        self.deconvs[1].weight.data = self.conv2.weight.data 
        self.deconvs[3].weight.data = self.conv1.weight.data
       
    def forward(self, x, verbose=False):
        self.feature_maps = []
        self.maxpool_indices = []
        
        x = self.conv1(x)
        self.feature_maps.append(x)
        
        x = F.relu(x)
        
        x, ind = self.pool1(x)
        self.feature_maps.append(x)
        self.maxpool_indices.append(ind)
        
        x = self.conv2(x)
        self.feature_maps.append(x)
        
        x = F.relu(x)
        
        x, ind = self.pool2(x)
        self.feature_maps.append(x)
        self.maxpool_indices.append(ind)
        
        self.prefc = x
        
        x = x.view(-1, self.n_feature*4*4) #flatten is the same
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.log_softmax(x, dim=1)
        return x
    
    def deconv(self, x, layer):     
        start_pos = abs(((layer * 2) - len(self.deconvs)) + 2)
        next_layer = layer
        
        for idx in range(start_pos, len(self.deconvs), 1):
            if isinstance(self.deconvs[idx], nn.MaxUnpool2d):
                x = self.deconvs[idx](x, self.maxpool_indices[next_layer])
                x = F.relu(x)
                next_layer = next_layer - 1
            else:
                x = self.deconvs[idx](x)
                
        return x

In [372]:
model = CNN(28, 9, 10)
print(model)

CNN(
  (conv1): Conv2d(1, 9, kernel_size=(5, 5), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(9, 9, kernel_size=(5, 5), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=144, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=10, bias=True)
)


## Loading Data 

In [373]:
input_size  = 28*28   # images are 28x28 pixels
output_size = 10      # there are 10 classes

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=64, shuffle=True)


test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, download=True, transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=1000, shuffle=True)

## Defining train and test functions

In [374]:
def train(model, optimizer, verbose=True, print_freq=450):

    epoch_loss = 0
    model.train()

    for batch_idx, (data, target) in enumerate(train_loader):
        # send to device
        data, target = data.to(device), target.to(device) # [64, 1, 28, 28]

        optimizer.zero_grad()
        output = model(data)
        
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()*data.size(0)
        
        if batch_idx % print_freq == 0 & verbose:
            processed = (batch_idx) * len(data)
            percent = 100. * (batch_idx) / len(train_loader)
            print(f'Train step: {batch_idx+1} [{processed}/{len(train_loader.dataset)} ({percent:.0f}%)]\tLoss: {loss.item():.6f}')

    return epoch_loss/len(train_loader.dataset)

def test(model):
    
    model.eval()

    test_loss = 0
    correct = 0

    for data, target in test_loader:
        # send to device
        data, target = data.to(device), target.to(device)

        output = model(data)

        test_loss += F.nll_loss(output, target).item() * len(data) 
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability                                                                 
        correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)

    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.0f}%)')

    return (test_loss, accuracy)

def fit(model, device, optimizer, epochs):
    model.to(device)
    train_losses = []
    test_losses = []
    test_accuracies = []
    for epoch in range(0, epochs):
        print(f'Epoch: {epoch +1}/{epochs}')
        train_losses.append(train(model, optimizer))
        test_loss, test_accuracy = test(model)
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)
    return {'train_losses': train_losses, 'test_losses': test_losses, 'test_accuracies': test_accuracies}


## Training Model

In [375]:
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
fit(model, device, optimizer, 1)

Epoch: 1/1
Test set: Average loss: 0.1290, Accuracy: 9626/10000 (96%)


{'train_losses': [0.45943947306871413],
 'test_losses': [0.12902900204062462],
 'test_accuracies': [96.26]}

## Visualizing a prediction 

In [376]:
model.to("cpu")
model.eval()

image, _ = train_loader.dataset[7]
pred = model(image.unsqueeze(dim=0))
px.imshow(image.squeeze().numpy()).update_layout(title_text=f'Predicted: {pred.argmax(1).item()}', title_x=0.5)

## Initializing DeconvNet

In [377]:
model.initialize_deconv()
model

CNN(
  (conv1): Conv2d(1, 9, kernel_size=(5, 5), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(9, 9, kernel_size=(5, 5), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=144, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=10, bias=True)
  (deconvs): Sequential(
    (0): MaxUnpool2d(kernel_size=(2, 2), stride=(2, 2), padding=(0, 0))
    (1): ConvTranspose2d(9, 9, kernel_size=(5, 5), stride=(1, 1))
    (2): MaxUnpool2d(kernel_size=(2, 2), stride=(2, 2), padding=(0, 0))
    (3): ConvTranspose2d(9, 1, kernel_size=(5, 5), stride=(1, 1))
  )
)

In [378]:
len(model.feature_maps)

4

## Defining function to visualize layer

In [379]:
def visualize_layer(model, layer, max_act):
    pos = layer * 2 + 1 # this is due to how I created the arrays in the network.

    num_feat = model.feature_maps[pos].shape[1]

    #Get all the specific feature maps!
    new_feat_map = model.feature_maps[pos].clone()

    # Choose max activation
    act_lst = []
    for i in range(0, num_feat):
        choose_map = new_feat_map[0, i, :, :]
        activation = torch.max(choose_map)
        act_lst.append(activation.item())
    
    act_lst = np.array(act_lst)
    mark = np.argmax(act_lst) #Get index of max activation
    top_k = (-act_lst).argsort()[:3]
    mark = (-act_lst).argsort()[:max_act][max_act - 1]

    choose_map = new_feat_map[0, mark, :, :]
    max_activation = torch.max(choose_map)

    # make zeros for other feature maps - we are only interested in the top! (or the max_act we choose)
    if mark == 0:
        new_feat_map[:, 1:, :, :] = 0
    else:
        new_feat_map[:, :mark, :, :] = 0
        if mark != num_feat - 1:
            new_feat_map[:, mark + 1:, :, :] = 0

    choose_map = torch.where(choose_map==max_activation,
            choose_map,
            torch.zeros(choose_map.shape)
            )

    # make zeros for their activations
    new_feat_map[0, mark, :, :] = choose_map

    deconv_output = model.deconv(new_feat_map, layer)
    return deconv_output

## Feeding an image forward through the model and back through the DeconvNet

In [380]:
# Forward pass

model.to("cpu")
model.eval()

image, _ = train_loader.dataset[7]
pred = model(image.unsqueeze(dim=0))
px.imshow(image.squeeze().numpy()).update_layout(title_text=f'Predicted: {pred.argmax(1).item()}', title_x=0.5)

In [381]:
# Back through Deconv & plot results 
import matplotlib.pyplot as plt
layer_activations = dict()

for layer in range(0, 2):
    imgs = []
    for top_k in range(1, 10):
        deconv_output = visualize_layer(model, layer, top_k)
        imgs.append(deconv_output.squeeze().detach().numpy())
    layer_activations[layer] = imgs


px.imshow(sum(layer_activations[1]))

In [382]:
px.imshow(layer_activations[1][2])

In [383]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

fig = make_subplots(1, 3, subplot_titles=['Original', 'Layer 1', 'Layer 2'])
fig.add_trace(go.Heatmap(z=image.squeeze().numpy()[::-1]), row=1, col=1)
fig.add_trace(go.Heatmap(z=sum(layer_activations[0])[::-1]), row=1, col=2)
fig.add_trace(go.Heatmap(z=sum(layer_activations[1])[::-1]), row=1, col=3)
fig.update_layout(title_text=f'title', title_x=0.5)
fig.update_traces(showscale=False)
fig.show()

## Defining a function to display activations of an image

In [384]:
def deconv_layers(model, layers, num_filters):
    layer_activations = dict()

    for layer in layers:
        imgs = []
        for top_k in range(1, num_filters):
            deconv_output = visualize_layer(model, layer, top_k)
            imgs.append(deconv_output.squeeze().detach().numpy())
        layer_activations[layer] = imgs
    return layer_activations

def plot_layer_activations(model, layers, num_filers, images):
    model.to("cpu")
    model.eval()

    titles = ['Original'] + [f'Layer:{l}' for l in layers] + [""]*(len(images)-1)
    fig = make_subplots(rows=len(images), cols=3, subplot_titles=titles, vertical_spacing = 0.02,horizontal_spacing = 0.05)
    
    for i, image in enumerate(images):
        
        # forward pass through cnn model
        pred = model(image.unsqueeze(dim=0))

        # 'backward' pass through deconv 
        image_activations = deconv_layers(model, layers, num_filers)

        # plotting
        fig.add_trace(go.Heatmap(z=image.squeeze().numpy()[::-1]), row=i+1, col=1)
        for l, filter_activations in image_activations.items():
            fig.add_trace(go.Heatmap(z=sum(filter_activations)[::-1]), row=i+1, col=l+2)

    fig.update_traces(showscale=False)
    fig.update_layout(autosize=False, height=250*len(images))
    fig.show()


In [385]:
plot_layer_activations(model, range(0,2), 9, [train_loader.dataset[i][0] for i in range(0,7)])

## Applying Deconv to AlexNet

We will apply the DeconvNet to a pretrained AlexNet which was retrained on the hymenoptera dataset

### Loading in data

In [386]:
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = os.path.join(os.pardir, 'data', 'hymenoptera_data')
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4, shuffle=True, num_workers=4) for x in ['train', 'val']}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

### Training & Testing functions

In [387]:
def train_model(model, criterion, optimizer, device):
    model.train()   
    running_loss = 0.0
    running_corrects = 0
    
    for inputs, labels in dataloaders['train']:
        inputs = inputs.to(device)
        labels = labels.to(device)
    
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        
        # zero the parameter gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        
    epoch_loss = running_loss / dataset_sizes['train']
    epoch_acc = running_corrects.double() / dataset_sizes['train']

    print(f'train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

def test_model(model, criterion, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    
    for inputs, labels in dataloaders['val']:
        inputs = inputs.to(device)
        labels = labels.to(device)
    
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        
    epoch_loss = running_loss / dataset_sizes['val']
    epoch_acc = running_corrects.double() / dataset_sizes['val']

    print(f'test Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

def fit(model, device, n_epochs):

    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    for epoch in range(n_epochs):
        print(f'Epoch {epoch+1}/{n_epochs}\n')
        train_model(model, criterion, optimizer, device)
        test_model(model, criterion, device)
        scheduler.step()

### Model setup and retraining on fully connected layer 

In [390]:
anet =  models.alexnet(pretrained=True)

# freezing weights 
for param in anet.parameters():
    param.requires_grad = False

fc_input = anet.classifier[-1].in_features
anet.classifier[-1] = nn.Linear(fc_input, 10)

# retrain on mnist
fit(model=anet, device=device, n_epochs=3)

Epoch 1/3

train Loss: 1.0607 Acc: 0.7664
test Loss: 0.6843 Acc: 0.9150
Epoch 2/3



RuntimeError: DataLoader worker (pid(s) 27220, 29236, 26168) exited unexpectedly

### Defining function to update Alex with DeconvNet

In [None]:
def update_alexnet(model):
    # Setting params needed for deconv
    model.maxpool_indices = []
    model.feature_maps = []
    model.deconvs = {}

    # Initializing deconv 
    # For alex net we pass model.features this can be adapted for other 
    # models by using the appropriate sequential object
    model.deconvs = nn.Sequential(*get_deconv_layers(model.features))

def get_deconv_layers(conv_layers: nn.Sequential):
    """This method expects a Sequential layer object
    it will then generate a corrisponding list of deconvnet layers.
    This method coppies over appropriate weights"""
    deconv_layers = []
    for f in reversed([f for f in conv_layers]):
        if isinstance(f, nn.MaxPool2d):
            f.return_indices=True
            deconv_layers.append(
                nn.MaxUnpool2d(
                    kernel_size=f.kernel_size,
                    stride=f.stride
                )
            )
        if isinstance(f, nn.Conv2d):
            conv = nn.ConvTranspose2d(
                in_channels=f.out_channels,
                out_channels=f.in_channels,
                kernel_size=f.kernel_size,
                stride=f.stride,
                padding=f.padding
            )
            conv.weight.data = f.weight.data
            deconv_layers.append(conv)
        else:
            pass
    return deconv_layers


In [None]:
# initialize deconv net 
update_alexnet(anet)
anet

AlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=9216, out_features=4096, bias=True)
 

In [389]:
# Forward pass of image to update feature maps for a layer

anet.to("cpu")
anet.eval()

image, _ = dataloaders['val'].dataset[1]

x = image.unsqueeze(dim=0)

anet.feature_maps = []
anet.maxpool_indices = []
anet.feature_index = []

# Passing through features 
for i, f in enumerate(anet.features):
    if isinstance(f, nn.Conv2d):
        x = f(x)
        anet.feature_maps.append(x)
        anet.feature_index.append(i)
    if isinstance(f, nn.MaxPool2d):
        x, ind = f(x)
        anet.feature_maps.append(x)
        anet.maxpool_indices.append(ind)

# Adaptive pooling 
anet.pre_avg_pool = x
x = anet.avgpool(x)

x = torch.flatten(x, 1)
pred = anet.classifier(x)

inp = image.cpu().numpy().transpose((1, 2, 0))
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
inp = std * inp + mean

px.imshow(inp).update_layout(title_text=f'Predicted: {class_names[pred.argmax(1).item()]}', title_x=0.5)

        

ValueError: not enough values to unpack (expected 2, got 1)

In [None]:
# Walking through the deconv on the first layer 

# here we would loop through the layers but we will look at one for now
layer_index = 0
layer_images = []
nth_act = 1 # kth top activation

# here we wold loop through the top k activations but we will look at one for now

# visualizing the output 
pos = layer_index * 2 + 1
num_features = anet.feature_maps[pos].shape[1]
new_feat_maps = anet.feature_maps[pos].clone()

# find max activation for each filter in layer
activation_list = []
for f in range(0, num_features):
    _map = new_feat_maps[0, f, :, :]
    activation = torch.max(_map)
    activation_list.append(activation.item())

# get the kth most activated filter
activation_list= np.array(activation_list)
mark = (-activation_list).argsort()[nth_act-1]

_map = new_feat_maps[0, mark, :, :]
max_activation = torch.max(_map)

# Zeroing out other feature maps 
_tmp = torch.zeros_like(new_feat_maps)
_tmp[:, mark, :, :] = new_feat_maps[:, mark, :, :]
new_feat_maps = _tmp

choose_map = torch.where(_map==max_activation,
            _map,
            torch.zeros(_map.shape)
            )

# make zeros for their activations
new_feat_maps[0, mark, :, :] = choose_map

# Deconvolving the map 


In [None]:
len(anet.feature_maps)

8

In [None]:
layer = 1
abs(((layer * 2) - 4) + 2)

0