# Filter Maximization in Pytorch

Code Reference:

* https://github.com/anaramirli/visualizing-cnn-features

* https://github.com/fg91/visualizing-cnn-feature-maps/blob/master/filter_visualizer.ipynb

In [1]:
import numpy as np
import zipfile
import gc
import cv2
import math
import warnings
import random
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torchvision
import torchvision.transforms.functional as TF
import torchvision.transforms as transforms
import torch.utils.data as data_utils
import torch
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import PIL
import imutils
from sklearn.metrics import confusion_matrix, roc_curve

from torch.utils.data import Dataset
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.python.client import device_lib
from zipfile import ZipFile
from IPython import display
from google.colab.patches import cv2_imshow
from imutils.contours import sort_contours

print("Device Specifications:")
print(device_lib.list_local_devices())

Device Specifications:
[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 7872263534344546596
xla_global_id: -1
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 13825277952
locality {
  bus_id: 1
  links {
  }
}
incarnation: 11743418767575673948
physical_device_desc: "device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5"
xla_global_id: 416903419
]


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In this notebook, we use a project from a long time ago, which is Pytorch's OCR Notebook.

In [6]:
class MNISTModel(nn.Module):
  def __init__(self):
    super().__init__()

    # We calculate this formula for padding.
    # NOTES: Filter_size = kernel_size
    # In this case, we use same padding, the formula is: [(filter_size  - 1) / 2] ( Same Padding--> input size = output size).

    # Formula of feature map size: [(input_size - filter_size + 2(padding) / stride) + 1]
    # Output after conv1: [(28 - 3 + 2 (1) / 1) + 1] = 28
    # Output after MaxPool1 = 28 / 2 = 14
    self.conv1 = nn.Sequential(
        nn.Conv2d(in_channels = 1, out_channels = 32, kernel_size = 3, padding = 1, bias = True), 
        nn.ReLU(), 
        nn.MaxPool2d(2,2))

    # To attain same padding: we use features of 
    # Padding: [(3 - 1) / 2] = 1
    # Output after conv2: [(14 - 3 + 2 (1) / 1) + 1] = 14
    # Output after MaxPool2: 14 / 2 = 7
    self.conv2 = nn.Sequential(
      nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 3, padding = 1, bias = True), 
      nn.ReLU(), 
      nn.MaxPool2d(2,2)
    )

    self.conv3 = nn.Sequential(
      nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size = 3, padding = 1, bias = True), 
      nn.ReLU(), 
      nn.MaxPool2d(2,2)
    )

    # Flatten the layers.
    # 32 = number of filters
    # 7 = size of max pool 2 feature map output.
    self.fc1 = nn.Sequential(
      nn.Flatten(), 
      nn.Linear(128*3*3,64), 
      nn.ReLU(), 
      nn.Linear(64,32),
      nn.ReLU()
    )
    self.fc2 = nn.Sequential(
      nn.Linear(32,16), 
      nn.ReLU(), 
      nn.Linear(16,8), 
      nn.ReLU()
    )
    self.fc3 = nn.Sequential(nn.Linear(8,10))

  def forward(self, x):
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)
    x = self.fc1(x)
    x = self.fc2(x)
    x = self.fc3(x)
    return x

mnist_model = MNISTModel().to(device)
mnist_model

MNISTModel(
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=1152, out_features=64, bias=True)
    (2): ReLU()
    (3): Linear(in_features=64, out_features=32, bias=True)
    (4): ReLU()
  )
  (fc2): Sequential(
    (0): Linear(in_features=32, out_features=16, bias=True)
    (1): ReLU()
    (2): Linear(in_features=16, out_features=8, bias=True)
    (

In [7]:
import torch.optim as optim

cross_entropy_loss_function = nn.CrossEntropyLoss()

# For our gradient descent algorthim or Optimizer
# We use Stochastic Gradient Descent (SGD) with a learning rate of 0.001
# We set the momentum to be 0.9
optimizer = optim.SGD(mnist_model.parameters(), lr=0.001, momentum=0.9)

In [75]:
def load_model(model, optimizer, filename = "model.pth.tar"):
  print("Loading model...")
  checkpoint = torch.load(filename)
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  print("Finished loading model!")

  return model, optimizer

directory_path = "./drive/MyDrive/Models"
mnist_model, optimizer = load_model(mnist_model, optimizer, f"{directory_path}/ocr_mnist_model.pth.tar")

Loading model...
Finished loading model!


In [70]:
mnist_train_transform = transforms.Compose([
    transforms.RandomAffine(degrees = 10, translate = (0.1, 0.1), shear = 2),
    transforms.RandomRotation(50),
    transforms.ToTensor(),
    transforms.Normalize((0.5, ), (0.5, )) # Scale to -1 to 1
])

mnist_validation_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, ), (0.5, )),
])

mnist_trainset = torchvision.datasets.MNIST('emnist', 
                                      train = True, 
                                      download = True,
                                      transform = mnist_train_transform)

mnist_valset = torchvision.datasets.MNIST('emnist', 
                                      train = False, 
                                      download = True,
                                      transform = mnist_validation_transform)

print(mnist_trainset.data.shape)
print(mnist_valset.data.shape)

print(mnist_trainset.targets.shape)
print(mnist_valset.targets.shape)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to emnist/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 77652100.72it/s]


Extracting emnist/MNIST/raw/train-images-idx3-ubyte.gz to emnist/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to emnist/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 28105729.43it/s]


Extracting emnist/MNIST/raw/train-labels-idx1-ubyte.gz to emnist/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to emnist/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 27828757.09it/s]


Extracting emnist/MNIST/raw/t10k-images-idx3-ubyte.gz to emnist/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to emnist/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 16623498.05it/s]


Extracting emnist/MNIST/raw/t10k-labels-idx1-ubyte.gz to emnist/MNIST/raw

torch.Size([60000, 28, 28])
torch.Size([10000, 28, 28])
torch.Size([60000])
torch.Size([10000])


In [71]:
batch_size = 128

mnist_train_loader = torch.utils.data.DataLoader(
    mnist_trainset, 
    batch_size = batch_size, 
    shuffle = True,
)

mnist_validation_loader = torch.utils.data.DataLoader(
    mnist_valset, 
    batch_size = batch_size, 
    shuffle = False
)

In [11]:
while True:
  pass

KeyboardInterrupt: ignored

## Filter Maximization.

Let's suppose we'd like to visualize the architecture from conv3 layer. We can replace the last layer of .

In [76]:
def flatten_layers_from_model(model):
  # Get all the model layers, except Sequential Layer, 
  # since we'd like to iterate the layers one by one.
  model_layers = list(model.modules())[1:]
  flattened_model_layers = []

  # Filter layers, as we don't want to process sequential layers.
  for index, module in enumerate(model_layers):
    if type(module) != torch.nn.modules.container.Sequential:
      flattened_model_layers.append(module)

  return flattened_model_layers

MAX_CONV_LAYERS = 7
flattened_model = flatten_layers_from_model(mnist_model)[:MAX_CONV_LAYERS]
flattened_model

[Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
 ReLU(),
 MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),
 Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
 ReLU(),
 MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),
 Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))]

In [89]:
class VisualizeConvFilterModel(nn.Module):
  def __init__(self, model, last_layer_num_features, num_classes):
    super(VisualizeConvFilterModel, self).__init__()
    self.model = nn.Sequential(*model)
    self.last_layer = nn.Sequential(
      nn.Flatten(),
      nn.Linear(last_layer_num_features, num_classes),
    )

  def forward(self, x):
    x = self.model(x)
    x = self.last_layer(x)
    return x

visualize_conv_filter_model = VisualizeConvFilterModel(
    flattened_model, 6272, 10
).to(device)
visualize_conv_filter_model

VisualizeConvFilterModel(
  (model): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
  (last_layer): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=6272, out_features=10, bias=True)
  )
)

In [90]:
layer_name_grad_exception = "weight"

for layer_name, param in visualize_conv_filter_model.named_parameters():
  param.requires_grad = False

print(visualize_conv_filter_model.model[-1])
visualize_conv_filter_model.model[-1].requires_grad = True

Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))


In [91]:
cross_entropy_loss_function = nn.CrossEntropyLoss()
activation_maximization_optimizer = optim.Adam(
  visualize_conv_filter_model.parameters(), lr = 0.001
)
activation_maximization_optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)

In [92]:
class BasicHook():
  """
  Get All of Layers Outputs, mainly for Visualization.
  """
  def __init__(self, layer_name, layer, filter_numbers_to_visualize):
    self.layer_name = layer_name
    self.layer = layer
    self.filter_numbers_to_visualize = filter_numbers_to_visualize
    self.latest_output = None

    layer.register_forward_hook(self.forward_hook_fn)
    layer.register_backward_hook(self.backward_hook_fn)

  def forward_hook_fn(self, layer, input, output):
    self.latest_output = output
    self.latest_output.requires_grad = True

  def backward_hook_fn(self, layer, input, output):
    self.latest_output = output
    self.latest_output.requires_grad = True

  # def visualize_output(self):
  #   hstacked_output = torch.hstack(list(self.latest_output))
  #   hstacked_output = hstacked_output.unsqueeze(1)

  #   plt.title(self.layer_name)
  #   plt.tight_layout()
  #   plt.show()

  # def convert_output_to_gif(self):
  #   gif_filename = f"{self.layer_name}.gif"
  #   output_frames = []
  #   for frame in self.outputs:
  #     hstacked_output = torch.hstack(list(frame))
  #     hstacked_output = hstacked_output.unsqueeze(1)
  #     grid_of_outputs_per_frame = torchvision.utils.make_grid(hstacked_output)
  #     grid_of_outputs_per_frame = grid_of_outputs_per_frame.detach().cpu().numpy()
  #     grid_of_outputs_per_frame = np.transpose(grid_of_outputs_per_frame, (1, 2, 0))
  #     output_frames.append(grid_of_outputs_per_frame)

  #   convert_frames_to_gif(output_frames, gif_name = gif_filename)

  def close(self):
    self.hook.remove()

In [93]:
selected_layer_name_for_filter_maximization, selected_layer_for_filter_maximization = list(
    visualize_conv_filter_model.named_modules()
)[8]

hook = BasicHook(
    selected_layer_name_for_filter_maximization,
    selected_layer_for_filter_maximization,
    [10, 20, 30]
)

print(selected_layer_for_filter_maximization)

Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))


In [94]:
n_epochs = 10

for epoch in range(0, n_epochs):
  total_loss = 0
  for train_iteration, batch in enumerate(mnist_train_loader):
    images_in_batch, labels_in_batch = batch
    images_in_batch = images_in_batch.to(device)
    labels_in_batch = labels_in_batch.to(device)

    optimizer.zero_grad()

    outputs_in_batch = visualize_conv_filter_model(
        images_in_batch,
    )

    loss = cross_entropy_loss_function(outputs_in_batch, labels_in_batch)
    total_loss += loss.item()
    # outputs_in_batch.backward(labels_in_batch, retain_graph = True)

    loss.backward()
    optimizer.step()

    if train_iteration % 10 == 0:
      print(f"Current Epoch: {epoch}, iteration = {train_iteration}, with current loss: {loss.item()}")
      loss_log.append(loss.item())
      iterations.append(train_iteration)
  
  save_model(mnist_model, optimizer, filename = f"{directory_path}/ocr_mnist_model.pth.tar")

NameError: ignored

## Grad_CAM