In [None]:
from PIL import Image
import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from torchvision import transforms as T
import matplotlib.pyplot as plt
from collections import OrderedDict
import pandas as pd
import json
import os

In [None]:
# Truncated ResNet-18 used only for a quick size/shape experiment:
# the two deepest residual stages become pass-throughs and the head is
# replaced by a 2-way linear layer fed with 128 features.
model1 = torchvision.models.resnet18(pretrained=True)
model1.layer3 = torch.nn.Identity()
model1.layer4 = torch.nn.Identity()
model1.fc = torch.nn.Linear(in_features=128, out_features=2)
model1

In [None]:
# Total number of scalar parameters held by the truncated network.
print(f"Model parameters: {sum(map(torch.numel, model1.parameters()))}")

In [None]:
# Shape smoke test: push one random 3x540x960 input through model1 and
# display the shape of what comes out.
x = torch.randn(1, 3, 540, 960)
# Disabled experiment: center-crop the input to 960 before the forward pass.
#z = torchvision.transforms.CenterCrop(960)(x)
z = model1(x)
z.shape


In [None]:
class CamExtractor():
    """
        Runs a model exposing ``features`` and ``classifier`` sub-modules and
        captures the activation produced by one entry of ``features``.

        ``target_layer`` is compared against ``int(name)`` of every entry in
        ``model.features``, so those entries must have integer-like names.
    """
    def __init__(self, model, target_layer):
        self.target_layer = target_layer
        self.model = model

    def forward_pass_on_convolutions(self, x):
        """
            Feeds ``x`` through every entry of ``model.features`` in order.

            Returns ``(captured, last)``: ``captured`` is the output of the
            entry whose name equals ``target_layer`` (``None`` if no entry
            matches) and ``last`` is the output of the final entry.
        """
        captured = None
        activation = x
        for name, layer in self.model.features._modules.items():
            activation = layer(activation)
            if int(name) == self.target_layer:
                # Remember what the requested layer produced
                captured = activation
        return captured, activation

    def forward_pass(self, x):
        """
            Runs ``features``, flattens the result per sample and applies
            ``model.classifier``.

            Returns ``(captured, logits)`` where ``captured`` is the activation
            saved by ``forward_pass_on_convolutions``.
        """
        captured, feature_out = self.forward_pass_on_convolutions(x)
        # Collapse everything but the batch dimension before the classifier
        flattened = feature_out.view(feature_out.size(0), -1)
        logits = self.model.classifier(flattened)
        return captured, logits


class ScoreCam():
    """
        Produces a Score-CAM style class activation map.

        The model must expose ``features`` / ``classifier`` sub-modules as
        required by ``CamExtractor``; it is switched to eval mode on
        construction.
    """
    def __init__(self, model, target_layer):
        self.model = model
        self.model.eval()
        # Define extractor
        self.extractor = CamExtractor(self.model, target_layer)

    def generate_cam(self, input_image, target_class=None):
        """
            Builds the activation map for ``input_image``.

            Args:
                input_image: tensor indexed as (1, C, H, W); only the first
                    sample of the batch is used.
                target_class: class index to explain. When ``None`` the
                    arg-max of the model output is used.

            Returns:
                numpy array of shape (H, W) with values in [0, 1]
                (quantised to 1/255 steps). All zeros when the map is
                constant.
        """
        height, width = input_image.shape[2], input_image.shape[3]
        # Nothing here needs gradients; skipping autograd keeps the
        # per-channel forward passes cheap.
        with torch.no_grad():
            # conv_output is the activation at the target layer,
            # model_output the final output of the model
            conv_output, model_output = self.extractor.forward_pass(input_image)
            if target_class is None:
                # .cpu() so this also works when the model runs on a GPU
                target_class = int(np.argmax(model_output.detach().cpu().numpy()))
            target = conv_output[0]
            # Start from zero so the weighted sum is not offset before the
            # ReLU below, and small contributions are not absorbed by a
            # float32 constant.
            cam = np.zeros(target.shape[1:], dtype=np.float32)
            for i in range(len(target)):
                # Channel i as a 4D (1, 1, h, w) tensor
                saliency_map = target[i, :, :].unsqueeze(0).unsqueeze(0)
                # Upsample to the actual input size so the mask can be
                # multiplied with the input whatever its resolution
                saliency_map = F.interpolate(saliency_map, size=(height, width),
                                             mode='bilinear', align_corners=False)
                low, high = saliency_map.min(), saliency_map.max()
                if high == low:
                    # Constant channel carries no spatial information
                    continue
                # Scale between 0-1
                norm_saliency_map = (saliency_map - low) / (high - low)
                # Weight of the channel = target-class probability of the
                # masked input
                logits = self.extractor.forward_pass(input_image * norm_saliency_map)[1]
                w = F.softmax(logits, dim=1)[0][target_class]
                cam += w.cpu().numpy() * target[i, :, :].cpu().numpy()
        cam = np.maximum(cam, 0)
        cam_range = np.max(cam) - np.min(cam)
        if cam_range > 0:
            cam = (cam - np.min(cam)) / cam_range  # Normalize between 0-1
        else:
            # Constant map: avoid 0/0 and return an empty heat map
            cam = np.zeros_like(cam)
        cam = np.uint8(cam * 255)  # Scale between 0-255 to visualize
        # PIL expects (width, height); the resulting array is (height, width)
        cam = np.uint8(Image.fromarray(cam).resize((width, height))) / 255
        return cam

In [None]:
# Preprocessing applied to every frame: resize to 224x224 (no cropping),
# then convert the PIL image to a tensor.
# NOTE(review): no normalisation step is applied here - confirm this matches
# how the checkpoints were trained.
transform = T.Compose([T.Resize((224, 224)), T.ToTensor()])

In [None]:
target_example = 1

# Two ResNet-50 classifiers with a 2-way head, restored from local checkpoints.
# - pretrained=False: load_state_dict (strict by default) overwrites every
#   parameter and buffer, so downloading ImageNet weights first is wasted work.
# - map_location='cpu': lets checkpoints saved on a GPU load on CPU-only hosts.
# - eval(): inference only; keeps batch-norm on its stored statistics.
# NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
model1 = torchvision.models.resnet50(pretrained=False)
model1.fc = torch.nn.Linear(2048, 2)
model1.eval()
model1.load_state_dict(torch.load('NEW_test_resnet_final.pth', map_location='cpu'))

model2 = torchvision.models.resnet50(pretrained=False)
model2.fc = torch.nn.Linear(2048, 2)
model2.eval()
model2.load_state_dict(torch.load('New_resnet_test_surrogate_epoch0.pth', map_location='cpu'))



In [None]:
# ResNet-50 with a 2-way head restored from the GRAAL checkpoint.
# pretrained=False because the strict load below replaces every weight;
# map_location='cpu' so a GPU-saved checkpoint also loads on CPU-only hosts.
model = torchvision.models.resnet50(pretrained=False)
model.fc = torch.nn.Linear(2048, 2)
# This model is called directly for inference in later cells, so it must not
# stay in train mode (batch-norm would use and update batch statistics).
model.eval()
model.load_state_dict(torch.load('../../models/08-13/GRAAL.pth', map_location='cpu'))

In [None]:
class SurrogateLoss(torch.nn.Module):
    """
    Class-weighted surrogate loss on the class-1 softmax probability ``f``.

    Per sample:
        label 1:  -log(f)                           weighted by 1 / p
        label 0:   log(beta**2 * p / (1 - p) + f)   weighted by 1 / (1 - p)
    and the weighted values are averaged over the batch.

    Args:
        beta: scalar entering the label-0 offset as ``beta**2``.
        p: scalar in (0, 1) used for the offset and for both class weights.
    """
    def __init__(self, beta, p):
        super(SurrogateLoss, self).__init__()
        self.beta = beta
        self.p = p

    def forward(self, outputs, labels):
        """
        Args:
            outputs: logits whose last dimension indexes the classes; column 1
                is the class used for ``f``.
            labels: tensor of 0/1 labels, one per row of ``outputs``.

        Returns:
            Scalar tensor holding the mean weighted loss.
        """
        # log_softmax stays finite when the probability underflows to 0.
        # log(softmax(.)) would give -inf there, and 0 * -inf = NaN would
        # poison the loss even for label-0 samples.
        log_f = torch.nn.functional.log_softmax(outputs, dim=-1)[:, 1]
        f = log_f.exp()
        offset = self.beta**2 * self.p/(1-self.p)
        loss = -(labels*log_f) + (1-labels)*torch.log(offset + f)
        # Explicit tensors: torch.where with two Python scalars is not
        # accepted by every torch version.
        weights = torch.where(labels == 1,
                              torch.full_like(loss, 1/self.p),
                              torch.full_like(loss, 1/(1-self.p)))
        return (loss*weights).mean()
        

In [None]:
# Evaluate the surrogate loss on one frame, treated as a label-0 sample.
# NOTE(review): absolute, machine-specific path.
frame_path = '/home/msouda/Datasets/true_anonymized/fr2_20130815T211708/fr2_20130815T211708_s0969_f0.jpg'
# Force 3 channels so the tensor always matches the model input
img = Image.open(frame_path).convert('RGB')
plt.imshow(img)
plt.show()
img = transform(img)
# Inference on a single image: eval mode keeps batch-norm on its stored
# statistics (and stops it from updating them); no_grad skips autograd.
model.eval()
with torch.no_grad():
    output = model(img.unsqueeze(0))
SurrogateLoss(1,0.11)(output, torch.tensor([0]))

In [None]:
# Class probabilities for the frame evaluated in the previous cell.
print(F.softmax(output, dim=-1))

In [None]:
# Hand check of the label-0 branch of SurrogateLoss with beta=1, p=0.11:
#   log(beta**2 * p / (1 - p) + f) * 1 / (1 - p)
# 0.1235 is approximately 0.11 / 0.89; 1.0373e-08 is presumably the class-1
# probability printed above - confirm against that output.
np.log(0.1235 +1.0373e-08)*(1/0.89)

In [None]:
# Backbones later re-wrapped into the features/classifier layout.
# pretrained=False: the strict load_state_dict replaces every weight, so the
# ImageNet download is unnecessary. map_location='cpu': GPU-saved checkpoints
# also load on CPU-only hosts. eval(): these networks are used for inference.
features1 = torchvision.models.resnet50(pretrained=False)
features1.fc = torch.nn.Linear(2048, 2)
features1.eval()
features1.load_state_dict(torch.load('NEW_test_resnet_final.pth', map_location='cpu'))

features2 = torchvision.models.resnet50(pretrained=False)
features2.fc = torch.nn.Linear(2048, 2)
features2.eval()
features2.load_state_dict(torch.load('New_resnet_test_surrogate_epoch0.pth', map_location='cpu'))


In [None]:
def _as_features_classifier(resnet):
    """Re-wrap a ResNet as ``Sequential(features=..., classifier=...)``.

    ``features`` holds six entries named '0'..'5' (stem, layer1..layer4,
    avgpool), which is the layout ``CamExtractor`` iterates over. The wrapper
    shares its sub-modules with ``resnet``; no weights are copied.
    """
    stem = torch.nn.Sequential(resnet.conv1, resnet.bn1, resnet.relu, resnet.maxpool)
    features = torch.nn.Sequential(OrderedDict([
        ('0', stem),
        ('1', resnet.layer1),
        ('2', resnet.layer2),
        ('3', resnet.layer3),
        ('4', resnet.layer4),
        ('5', resnet.avgpool),
    ]))
    return torch.nn.Sequential(OrderedDict([
        ('features', features),
        ('classifier', resnet.fc),
    ]))


model1 = _as_features_classifier(features1)
model2 = _as_features_classifier(features2)

In [None]:
# Context manager so the file handle is closed once the JSON is parsed.
with open('/home/msouda/Workspace/random_list.json') as f:
    random_list = json.load(f)

In [None]:
# Video whose frames are inspected below; fixed by hand.
# Disabled alternative: draw a random video from the 'test' entry of random_list.
#vid = np.random.choice(random_list['test'])
vid = 'true_anonymized/fr2_20140621T202024'

In [None]:
# Frame file name pattern: <video name>_s<frame id>_f0.jpg, stored inside the
# video's own directory. The video name is the second component of `vid`.
frame_id = '235'
frame_path = os.path.join(vid, '{}_s{}_f0.jpg'.format(vid.split('/')[1], frame_id))
frame_path

In [None]:
# Replaces the frame_path built in the previous cell with a fixed frame.
# NOTE(review): this is a relative path, unlike the absolute path used for the
# same frame earlier in the notebook - confirm which one is intended.
frame_path = 'true_anonymized/fr2_20130815T211708/fr2_20130815T211708_s0969_f0.jpg'

In [None]:
# Re-wrap `model` in the features/classifier layout consumed by CamExtractor.
# The entries of `features` are named '0'..'5' in the order listed below; the
# wrapper shares its sub-modules with `model`.
model1 = torch.nn.Sequential(OrderedDict(
    features=torch.nn.Sequential(OrderedDict(
        (str(position), stage)
        for position, stage in enumerate([
            torch.nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
            model.layer1,
            model.layer2,
            model.layer3,
            model.layer4,
            model.avgpool,
        ])
    )),
    classifier=model.fc,
))

In [None]:
# Annotation table: one row per frame with its path (relative in the CSV) and
# label. `video` is derived from the first path component *before* img_path is
# rewritten to an absolute path, so the order of the two assignments matters.
annotations = (
    pd.read_csv(
        '/home/msouda/Datasets/true_anonymized/annotations.csv',
        header=None,
        names=['img_path', 'label'],
    )
    .assign(
        video=lambda df: df.img_path.map(
            lambda rel_path: 'true_anonymized/' + rel_path.split('/')[0]),
        img_path=lambda df: df.img_path.map(
            lambda rel_path: '/home/msouda/Datasets/true_anonymized/' + rel_path),
    )
)

# Training metadata; only the list of held-out test videos is used here.
with open('training_GRAAL.json', 'r') as f:
    train_metadata = json.load(f)
test_list = train_metadata['test_video_list']

In [None]:
# Frames belonging to the held-out test videos.
test_frames = annotations.query('video in @test_list')
# Fixed seed so the figures below are reproducible (change it to browse other
# frames); capped at the number of available rows so a small test set does not
# make sample() raise.
select_list = test_frames.sample(n=min(10, len(test_frames)), random_state=0)
select_list

In [None]:
# Score-CAM for class 1 at each of the feature entries 0-4 of model1, one
# figure per sampled frame, with the untouched frame in the last panel.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model1.to(device)
for img_path in select_list['img_path'].to_list():
    # Load and preprocess once per image rather than once per layer
    img = Image.open(img_path).convert('RGB')
    prep_img = transform(img).unsqueeze(0).to(device)
    rgb = prep_img.squeeze(0).permute(1, 2, 0).cpu().numpy()

    fig, ax = plt.subplots(1,6, figsize=(30,8))
    for i in range(5):
        cam = ScoreCam(model1, i)
        # Visualisation only: no gradients are needed
        with torch.no_grad():
            cam_img = cam.generate_cam(prep_img, 1)
        ax[i].imshow(rgb)
        ax[i].imshow(cam_img, cmap='jet', alpha=0.5)
        ax[i].axis('off')
        ax[i].set_title(f'Layer {i}')
    ax[5].imshow(rgb)
    ax[5].axis('off')
    ax[5].set_title('Original')
    plt.show()
    # Release the figure; otherwise every iteration keeps one alive
    plt.close(fig)


In [None]:
# Raw model outputs for each sampled frame.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the network that is actually called below; previously model1 was moved
# while `model` was evaluated, which only works when the two share modules.
model.to(device)
model.eval()
with torch.no_grad():
    for img_path in select_list['img_path'].to_list():
        img = Image.open(img_path).convert('RGB')
        prep_img = transform(img).unsqueeze(0).to(device)
        print(model(prep_img))
        

In [None]:
# Score-CAM for class 1 on five label-0 frames.
# select_list becomes a plain list of paths here; a later cell indexes it.
# Fixed seed so the selected frames (and the figures) are reproducible.
select_list = annotations.query('label == 0').sample(5, random_state=0).img_path.tolist()
for img_path in select_list:
    # Load and preprocess once per image rather than once per layer
    img = Image.open(img_path).convert('RGB')
    prep_img = transform(img).unsqueeze(0).to(device)
    rgb = prep_img.squeeze(0).permute(1, 2, 0).cpu().numpy()

    fig, ax = plt.subplots(1,6, figsize=(30,8))
    for i in range(5):
        # ScoreCam needs the features/classifier wrapper (model1); the raw
        # ResNet `model` has no `.features` attribute and would raise.
        cam = ScoreCam(model1, i)
        with torch.no_grad():
            cam_img = cam.generate_cam(prep_img, 1)
        ax[i].imshow(rgb)
        ax[i].imshow(cam_img, cmap='jet', alpha=0.5)
        ax[i].axis('off')
        ax[i].set_title(f'Layer {i}')
    ax[5].imshow(rgb)
    ax[5].axis('off')
    ax[5].set_title('Original')
    plt.show()
    # Release the figure; otherwise every iteration keeps one alive
    plt.close(fig)

In [None]:
# Open the last selected frame for the model comparison below. This indexes
# select_list positionally, so it relies on select_list being the list of
# paths built in the label == 0 cell, not the DataFrame from the earlier cell.
img = Image.open(select_list[-1])

In [None]:
# Score cam: compare model1 and model2 side by side at feature entries 0-4
with torch.no_grad():
    # Loop-invariant work done once: device, preprocessed input, RGB view
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    prep_img = transform(img).unsqueeze(0).to(device)
    rgb = prep_img.squeeze(0).permute(1, 2, 0).cpu().numpy()

    for i in range(5):
        cams = []
        # Loop variable deliberately not called `model`: that name is the
        # global ResNet used by other cells and must not be overwritten.
        for cam_model in [model1, model2]:
            cam_model.to(device)
            score_cam = ScoreCam(cam_model, target_layer=i)
            # Generate cam mask
            cams.append(score_cam.generate_cam(prep_img, 1))

        fig, ax = plt.subplots(1, 3, figsize=(15, 5))
        ax[0].imshow(rgb)
        ax[0].set_title('Original')
        ax[0].axis('off')

        for axis, cam, title in zip(ax[1:], cams, ('Model 1', 'Model 2')):
            axis.imshow(rgb)
            axis.imshow(cam, cmap='jet', alpha=0.5)
            axis.set_title(title)
            axis.axis('off')
        plt.show()
        # Release the figure; otherwise every iteration keeps one alive
        plt.close(fig)