## Pretrained VGG

https://pytorch.org/vision/stable/models.html

In [None]:
import torch
from PIL import Image
from torchvision import transforms
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import collections
from typing import DefaultDict, Tuple, List
from functools import partial
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from torch.nn.modules.loss import L1Loss
import time

model = torch.hub.load('pytorch/vision:v0.10.0', 'vgg16', pretrained=True)

use_cuda = True
device = torch.device("cuda" if use_cuda else "cpu")
model.to(device)
model.eval()

##Creating labeled feature patch from test image manually

In [None]:
## Download images
!gdown --id '161Fw55U_Ng_o0uirdwBle5g2Fpl1qRqR'
!unzip schoolbus.zip

In [None]:
## preprocess image
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


def black_image_outside_patch(img, top_left_x, top_left_y, bot_right_x, bot_right_y):
    """Blacks image outside provided bounding box
    ----------
    img:
        image tensor

    top_left_x, top_left_y, bot_right_x, bot_right_y:
        coordinates of bounding box for patch to keep
    Returns
    -------
    img_copy:
        copy of the image with everything blacked out besides 
    """
    img_copy = img.detach().clone()
    img_copy[:,:,:top_left_x] = 0
    img_copy[:,:,bot_right_x:] = 0
    img_copy[:,:top_left_y,:] = 0
    img_copy[:,bot_right_y:,:] = 0
    return img_copy

def random_noise_outside_patch(img, top_left_x, top_left_y, bot_right_x, bot_right_y):
    img_copy = img.detach().clone()
    img_copy[:,:,:top_left_x] = torch.randn(img[:,:,:top_left_x].size())
    img_copy[:,:,bot_right_x:] = torch.randn(img[:,:,bot_right_x:].size())
    img_copy[:,:top_left_y,:] = torch.randn(img[:,:top_left_y,:].size())
    img_copy[:,bot_right_y:,:] = torch.randn(img[:,bot_right_y:,:].size())
    return img_copy

def show(img):
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1,2,0)), interpolation='nearest')



In [None]:
wheels = []
labels = []  ## "School Bus" text
mirrors = []
windshields = []
sides = []  ## Side of school bus with many windows


def append_features(wheel, label, mirror, windshield, side):
    if wheel is not None:
        wheels.append(wheel.to(device))
    if label is not None:
        labels.append(label.to(device))
    if mirror is not None:
        mirrors.append(mirror.to(device))
    if windshield is not None:
        windshields.append(windshield.to(device))
    if side is not None:
        sides.append(side.to(device))

In [None]:
filename = str(1) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 70, 125, 125, 215).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 100, 25, 160, 50).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 125, 55, 160, 130).unsqueeze(0)
windshield = random_noise_outside_patch(input_tensor, 80, 40, 200, 100).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 0, 40, 100, 170).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

In [None]:
filename = str(2) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 75, 150, 120, 210).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 40, 50, 90, 90).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 50, 100, 75, 150).unsqueeze(0)
windshiled = random_noise_outside_patch(input_tensor, 20, 80, 120, 120).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 110, 75, 210, 175).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

In [None]:
filename = str(3) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 75, 120, 140, 210).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 120, 0, 190, 30).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 140, 50, 180, 130).unsqueeze(0)
windshield = random_noise_outside_patch(input_tensor, 100, 20, 220, 100).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 0, 0, 100, 180).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

In [None]:
filename = str(4) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 30, 135, 90, 220).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 0, 25, 50, 55).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 0, 60, 35, 130).unsqueeze(0)
windshield = random_noise_outside_patch(input_tensor, 0, 45, 70, 110).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 80, 25, 220, 180).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

In [None]:
filename = str(5) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 120, 160, 150, 220).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 70, 60, 120, 90).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 110, 115, 140, 160).unsqueeze(0)
windshield = random_noise_outside_patch(input_tensor, 40, 85, 150, 125).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 145, 75, 220, 190).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

In [None]:
filename = str(6) + '.jpg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image)
show(input_tensor)
wheel = random_noise_outside_patch(input_tensor, 90, 140, 160, 230).unsqueeze(0)
label = random_noise_outside_patch(input_tensor, 130, 10, 180, 45).unsqueeze(0)
mirror = random_noise_outside_patch(input_tensor, 160, 60, 220, 130).unsqueeze(0)
windshield = random_noise_outside_patch(input_tensor, 90, 40, 200, 90).unsqueeze(0)
side = random_noise_outside_patch(input_tensor, 0, 0, 90, 220).unsqueeze(0)
append_features(wheel, label, mirror, windshield, side)

## Pytorch Hooks for saving activations


In [None]:
################################################################################
#                    CITATIONS                    
# https://www.lyndonduong.com/saving-activations/
# https://web.stanford.edu/~nanbhas/blog/forward-hooks-pytorch/#extracting-activations-from-a-layer
#
###############################################################################

def save_adaptive_activations(
        activations: DefaultDict,
        name: str,
        module: nn.Module,
        inp: Tuple,
        out: torch.Tensor
) -> None:
    """PyTorch Forward hook to save outputs and inhibitory hidden state at each forward
    pass. Mutates specified dict objects with each fwd pass.
    """
    activations[name].append(out.detach().cpu())


def register_activation_hooks(
        model: nn.Module,
) -> DefaultDict[List, torch.Tensor]:
    """Registers forward hooks in specified layers.
    Parameters
    ----------
    model:
        PyTorch model

    Returns
    -------
    activations_dict:
        dict of lists containing activations of specified layers in the
        form (k,v) where k is the name of the layer and v is a list of the 
        activation tensors in the order that they were run through the nerual 
        network
    """
    activations_dict = collections.defaultdict(list)
    hooks = []
    for name, module in model.named_modules():
        hooks.append(module.register_forward_hook(
            partial(save_adaptive_activations, activations_dict, name)
        ))

    return activations_dict, hooks

def remove_hooks(
    hooks: List,
) -> None:
    """Registers forward hooks in specified layers.
    Parameters
    ----------
    hooks:
        list of hooks attached to the model

    Returns
    -------
    None
    """
    for hook in hooks:
        hook.remove()
        

# # example use 
# activations_dict, hooks = register_activation_hooks(net)
# net(input)
# print(activations_dict)
# remove_activations(hooks)

##Possible Error Functions

In [None]:
# for L2 
# nn.MSELoss()

# for L1
# nn.L1Loss()

##Calculate the error between saved activation data structures

In [None]:
def matched_activations(activations, train_indices, test_ind, error_func):
    """ Returns what the patch indices that have similar activations to patch
    represented by test_ind
    ----------
    activations:
        dict: {'layer name': List of tensors of activations}
    
    train_indices:
        list of indices that represent what indices within the values of 
        activations dict lists are train indices

    test_index:
        list of indices that represent what index within the values of 
        activations dict lists are the test example

    error_func:
        func (tensor, tensor) -> int that gives error metric between tensors

    Returns
    -------
    matched_indices: (index, error)
        sorted list of what train indices match up well with the test index, 
        sorted from lowest error to highest error
    """
    matched_indices = []

    for train_ind in train_indices:
        # calculate error between train_ind and test_ind
        train_err = 0
        for i, name in enumerate(activations.keys()):
            train_err += np.exp(2 * i / len(activations)) * error_func(activations[name][train_ind], activations[name][test_ind])
        # create mew match tuple
        new_matched_el = (train_ind, train_err)

        # find index to insert new_matched_el
        ind = len(matched_indices)
        for i in range(len(matched_indices)):
            if new_matched_el[1] < matched_indices[i][1]:
              ind = i
              break;

        # insert new_matched_el into correct place in list
        if ind == len(matched_indices):
            matched_indices.append(new_matched_el)
        else:
            matched_indices = matched_indices[:i] + [new_matched_el] + matched_indices[i:]

    return matched_indices


##Test activation similarities between labeled features

In [None]:

activations_dict, hooks = register_activation_hooks(model)


num_images = 6
num_features = 5
num_feature_images = num_images * num_features
for wheel in wheels:
    model(wheel)
for label in labels:
    model(label)
for mirror in mirrors:
    model(mirror)
for windshield in windshields:
    model(windshield)
for side in sides:
    model(side)

start_time = time.time()
classes = ['wheel', 'label', 'mirror', 'windshield', 'side']
def index_to_class():
    index_to_class_dict = {}
    for i in range(num_feature_images):
        index_to_class_dict[i] = classes[i//num_images]

    return index_to_class_dict

## TODO: DIVIDE LOSS BY NUMBER OF TRAIN IMAGES
def class_to_score(scores, index_to_class_dict):
    class_to_scores = {}
    class_counters = {}

    for class_name in classes:
        class_to_scores[class_name] = 0
        class_counters[class_name] = 0

    for ind, loss in scores:
        class_name = index_to_class_dict[ind]
        class_to_scores[class_name] += loss
        class_counters[class_name] += 1

    for class_name, counter in class_counters.items():
        class_to_scores[class_name] /= counter

    return class_to_scores

index_to_class_dict = index_to_class()

incorrect_preds = 0
for i in range(num_feature_images):
    test_indices = list(np.arange(num_feature_images))
    test_indices = test_indices[:i] + test_indices[i+1:]
    matched_activation_results = matched_activations(activations_dict, test_indices, i, nn.L1Loss())
    class_to_score_dict = class_to_score(matched_activation_results, index_to_class_dict)
    print(f"{index_to_class_dict[i]}, {i}: {class_to_score_dict}")
    predicted_class = (None, 99999999)
    for class1, score in class_to_score_dict.items():
        if score < predicted_class[1]:
            predicted_class = (class1, score)
    if predicted_class[0] != index_to_class_dict[i]:
        incorrect_preds += 1
    print(f"prediction for {index_to_class_dict[i]}, {i}: {predicted_class[0]}, loss: {predicted_class[1]}")

print(f"correct predictions for {1 - round(incorrect_preds / num_feature_images,2)}%")

##Extract Potential Features from Test Images

Now that we have saved the activations of many features within the training set, we have to attempt to extract features patches from new test images that the CNN recognizes. 

###Naivly testing all possible patches of image

In [None]:
filename = 'test1.jpeg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image).to(device)

def garbage_collect(activations):
    for k, v in activations.items():
        activations[k] = v[:-1]

def extract_all_patches(img):
    train_indices = list(np.arange(num_feature_images))  
    width = img.size()[2]
    height = img.size()[1]
    # dict that maps feature to patch with minimum loss in the form "class":(dim, x, y, loss)
    feature_mins = {}
    for i in range(num_features):
        feature_mins[index_to_class_dict[i*num_images]] = (0,0,0,9999999999)
    # iterate through all the possible dimensions, and coordinates
    for dim in [20, 40, 80]:
        for x in range(0, width-dim, 10): ## 5
            for y in range(0, height-dim, 10): ## 5
                print(dim, x, y)
                # create test patch
                test_patch = random_noise_outside_patch(img, x, y, x+dim, y+dim)
                # run test patch through nn, so hooks save the activations
                model(test_patch.unsqueeze(0).to(device))
                #indices for 
                matched_activation_results = matched_activations(activations_dict, train_indices, -1, nn.L1Loss())
                class_to_score_dict = class_to_score(matched_activation_results, index_to_class_dict)
                for feat, loss in class_to_score_dict.items():
                    if loss < feature_mins[feat][3]:
                        feature_mins[feat] = (dim, x, y, loss)
                garbage_collect(activations_dict)

    return feature_mins

start_time = time.time()
extracted_patches = extract_all_patches(input_tensor)
print(time.time() - start_time)

In [None]:
def show_images(img) -> None:
    """
    View multiple images stored in files, stacking vertically

    Arguments:
        filename: str - path to filename containing image
    """
    # <something gets done here>
    plt.figure()
    npimg = img.cpu().numpy()
    plt.imshow(np.transpose(npimg, (1,2,0)), interpolation='nearest')

show_images(input_tensor)

def print_extracted_patches(extracted_patches, img):
    for feature_name, (dim, x, y, loss) in extracted_patches.items():
        patch = random_noise_outside_patch(img, x, y, x+dim, y+dim)
        print(f"the extracted patch for {feature_name} is at dim:{dim}, x:{x}, y:{y}, with loss: {loss}")
        show_images(patch)
print_extracted_patches(extracted_patches, input_tensor)

In [None]:
filename = 'test2.jpeg'
input_image = Image.open(filename)
input_tensor = preprocess(input_image).to(device)
start_time = time.time()
extracted_patches = extract_all_patches(input_tensor)
print(time.time() - start_time)

In [None]:
show_images(input_tensor)
print_extracted_patches(extracted_patches, input_tensor)