In [1]:
import os
import sys
sys.path.append("/home/jakob/Documents/PhD/Projects/ModalityRelevanceScores/code")
sys.path.append("/home/jakob/Documents/PhD/Projects/ModalityRelevanceScores/")
import time 
import torch
from captum.attr import LRP

from models.NetworkMapper import to_base_propagation, to_contribution_propagation
from datasets.MNIST import MNIST, MNIST_SUM, MNIST_DataNoise, MNIST_LabelNoise, MNIST_ImgShuffle;
from ContributionPropagation.Contribution import PerceptualScore;

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class Net(torch.nn.Module):
    """Two-branch convolutional classifier for a pair of single-channel images.

    Each input image is processed by its own convolutional branch
    (Conv 3x3 -> ReLU -> Conv 3x3 -> ReLU -> MaxPool 2x2). The flattened
    branch outputs are concatenated and passed through two linear layers
    that produce ``nclasses`` unnormalised scores.

    Args:
        input_shape: ``[height, width]`` of one input image; used to size
            the first linear layer.
        nclasses: number of output scores.
    """

    def __init__(self, input_shape=[28,28], nclasses=10):
        super().__init__()

        def make_branch():
            # Two unpadded 3x3 convolutions shrink each spatial side by 4,
            # the 2x2 max-pool then halves it (floor division).
            return torch.nn.Sequential(
                torch.nn.Conv2d(1, 32, 3, 1),
                torch.nn.ReLU(),
                torch.nn.Conv2d(32, 64, 3, 1),
                torch.nn.ReLU(),
                torch.nn.MaxPool2d(2),
            )

        pooled_height = (input_shape[0] - 4) // 2
        pooled_width = (input_shape[1] - 4) // 2
        # Both branches end with 64 channels, hence the leading factor 2.
        fused_features = 2 * pooled_height * pooled_width * 64

        self.relu = torch.nn.ReLU()
        self.nclasses = nclasses
        self.branch1 = make_branch()
        self.branch2 = make_branch()

        # Registered but not applied in forward().
        self.dropout = torch.nn.Dropout(0.5)
        self.fc1 = torch.nn.Linear(fused_features, 128)
        self.fc2 = torch.nn.Linear(128, nclasses)

    def forward(self, input):
        """Compute class scores for a pair of inputs.

        Args:
            input: either a length-2 sequence/tensor ``(x1, x2)`` indexed by
                modality first, or a 5-D tensor whose second dimension has
                length 2 (modality second), which is permuted to
                modality-first before use.

        Returns:
            The output of the final linear layer.
        """
        # NOTE(review): an input whose leading dimension is 2 is always read
        # as modality-first, so a modality-second tensor with a leading
        # dimension of exactly 2 is not permuted — confirm callers avoid it.
        needs_swap = len(input) != 2 and len(input[0]) == 2
        if needs_swap:
            input = input.permute([1,0,2,3,4])
        assert len(input) == 2

        first, second = input[0], input[1]

        feat1 = torch.flatten(self.branch1(first), start_dim=1)
        feat2 = torch.flatten(self.branch2(second), start_dim=1)
        fused = torch.cat([feat1, feat2], -1)

        hidden = self.relu(self.fc1(fused))
        return self.fc2(hidden)


In [3]:
# Use the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print("Device:", device)

Device: cuda


# Basic Models

In [4]:
def _sync_device(device):
  """Wait for queued CUDA work to finish; does nothing for non-CUDA devices."""
  if str(device).startswith("cuda"):
    torch.cuda.synchronize()


def _time_call(fn, device):
  """Return the wall-clock seconds taken by ``fn()`` on ``device``.

  CUDA kernels are launched asynchronously, so the device is synchronised
  before starting and before stopping the clock; otherwise only the launch
  overhead would be measured. The result of ``fn`` is discarded.
  """
  _sync_device(device)
  start = time.perf_counter()
  result = fn()
  _sync_device(device)
  elapsed = time.perf_counter() - start
  del result
  return elapsed


def _slice_batch(tensor, k, factor):
  """Return an independent copy of the k-th chunk of ``factor`` rows."""
  return tensor[k*factor:(k+1)*factor].clone()


def _shape_inputs(img1, img2):
  """Build the SHAPE-score batch: original pair, modality 2 zeroed, modality 1 zeroed."""
  x1 = torch.cat([img1, img1, torch.zeros_like(img1)], 0)
  x2 = torch.cat([img2, torch.zeros_like(img2), img2], 0)
  return x1, x2


def _rfp_inputs(img1, img2):
  """Build the Relevance-Forward-Propagation inputs, stacked along a new leading axis.

  Slot 0 is all zeros for both modalities, slot 1 carries only ``img1`` and
  slot 2 carries only ``img2``.
  """
  x1 = torch.stack([torch.zeros_like(img1), img1, torch.zeros_like(img1)], 0)
  x2 = torch.stack([torch.zeros_like(img2), torch.zeros_like(img2), img2], 0)
  return x1, x2


def _perceptual_inputs(img1_all, img2_all, start, stop, num_samples):
  """Build the Perceptual-score batch for samples ``start:stop``.

  For every sample, ``num_samples`` comparison images are drawn at random
  from the whole set (one ``torch.randperm`` call per sample).
  """
  img1 = img1_all[start:stop].clone()
  img2 = img2_all[start:stop].clone()

  comp_samples1 = []
  comp_samples2 = []
  for _ in range(start, stop):
    shuffle_ids = torch.randperm(len(img1_all))[:num_samples]
    comp_samples1.append(img1_all[shuffle_ids].clone())
    comp_samples2.append(img2_all[shuffle_ids].clone())
  comp_samples1 = torch.cat(comp_samples1, 0)
  comp_samples2 = torch.cat(comp_samples2, 0)

  # NOTE(review): `repeat` tiles the whole batch, whereas the comparison
  # samples are grouped per original sample. Only the tensor sizes matter
  # for timing, but confirm the pairing before reusing this for real scores.
  x1 = torch.cat([img1, img1.repeat(num_samples,1,1,1), comp_samples1], 0)
  x2 = torch.cat([img2, comp_samples2, img2.repeat(num_samples,1,1,1)], 0)
  return x1, x2


# Measures the runtime of LRP, SHAPE, plain prediction, Relevance Forward
# Propagation and the Perceptual score, either per sample or in batches of
# `factor` samples, and stores the raw timings next to the model checkpoint.
for no_gpu in [True]:
  # Per-run device: the notebook-wide `device` is left untouched so that a
  # later `no_gpu=False` run is not silently forced onto the CPU.
  run_device = "cpu" if no_gpu else device

  with torch.no_grad():
    for all_at_once in [True]:
      for inv1 in [False]:
        for inv2 in [False]:
          for split in [True]:
            input_shape = [28,14] if split else [28,28]

            # The test data depends only on split / inv1 / inv2, so it is
            # built once here and read in a single pass over the dataset.
            data = MNIST(train=False, split_image=split, invert1=inv1, invert2=inv2)
            samples = [data[i] for i in range(len(data))]
            img1_tensor = torch.stack([sample["img1"] for sample in samples], 0)
            img2_tensor = torch.stack([sample["img2"] for sample in samples], 0)
            labels = torch.as_tensor([sample["label"] for sample in samples]).unsqueeze(1)
            del samples

            for seed in [3]:
              f_list = [150, 250, 500] #[1,2,5,10,15,25,50,100,150,250,500]
              for f_id, factor in enumerate(f_list):

                progress_msg = f"Seed: {seed} facor {f_id+1} of {len(f_list)}"
                print(progress_msg)

                model = Net(input_shape=input_shape)

                exp_folder = "/home/jakob/Documents/PhD/Projects/ModalityRelevanceScores/0001_MNIST_Train_and_Evaluate/Experiments/Basic"
                exp_path = os.path.join(exp_folder, f"basic_img_experiments_seed{seed}_inv1{inv1}_inv2{inv2}_split{split}.pth")

                res_file = torch.load(exp_path, map_location="cpu")
                model_state_dict = res_file["final_model_state_dict"]
                model.load_state_dict(model_state_dict)

                model = to_contribution_propagation(model).to(run_device)

                num_samples_list = [1, 5, 10, 25, 50, 100, 250] #, 500]

                times = {"prediction_single": [],
                         "RFP": [],
                         "shape": [],
                         "perc": dict([(i, []) for i in num_samples_list]),
                         "lrp": [],
                         "lrp_single": []
                         }

                # LRP is timed on the base-propagation variant of the model.
                model = to_base_propagation(model)

                if not all_at_once:
                  # ---- one sample at a time --------------------------------
                  for s in range(len(data)):
                    print(f"Seed {seed}: {s+1} of {len(data)}                 ", end="\r")

                    lrp = LRP(model)
                    img1 = img1_tensor[s].unsqueeze(0)
                    img2 = img2_tensor[s].unsqueeze(0)

                    # LRP for one fixed target class.
                    times["lrp_single"].append(_time_call(
                      lambda: lrp.attribute(torch.stack([img1, img2],0).to(run_device), target=5),
                      run_device))

                    # LRP for all ten classes: the sample is repeated 10 times.
                    times["lrp"].append(_time_call(
                      lambda: lrp.attribute(
                        torch.repeat_interleave(torch.stack([img1, img2],0), 10, dim=1).permute([1,0,2,3,4]).to(run_device),
                        target=[0,1,2,3,4,5,6,7,8,9]),
                      run_device))

                  model = to_contribution_propagation(model).to(run_device)

                  for s in range(len(data)):
                    print(f"Seed {seed}: {s+1} of {len(data)}                 ", end="\r")

                    img1 = img1_tensor[s].unsqueeze(0)
                    img2 = img2_tensor[s].unsqueeze(0)

                    # SHAPE Score
                    x1, x2 = _shape_inputs(img1, img2)
                    times["shape"].append(_time_call(
                      lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))

                    # Single Prediction
                    times["prediction_single"].append(_time_call(
                      lambda: model([img1.to(run_device), img2.to(run_device)]), run_device))

                    # Relevance Forward Propagation
                    x1, x2 = _rfp_inputs(img1, img2)
                    times["RFP"].append(_time_call(
                      lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))

                    # Perceptual Score
                    for num_samples in num_samples_list:
                      x1, x2 = _perceptual_inputs(img1_tensor, img2_tensor, s, s+1, num_samples)
                      times["perc"][num_samples].append(_time_call(
                        lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))

                    del x1
                    del x2
                    del img1
                    del img2

                  print("Save perceptual score")

                  if no_gpu:
                    save_path_times = exp_path.replace("basic_img", f"basic_img_TimeMeasure_single_cpu")
                  else:
                    save_path_times = exp_path.replace("basic_img", f"basic_img_TimeMeasure_single")

                  torch.save(times, save_path_times)

                else:
                  # ---- batches of `factor` samples -------------------------
                  # Only full batches are timed; a trailing partial batch is skipped.
                  n_batches = len(data) // factor
                  print(progress_msg)

                  # LRP with the true label of every sample as target.
                  for k in range(n_batches):
                    img1 = _slice_batch(img1_tensor, k, factor)
                    img2 = _slice_batch(img2_tensor, k, factor)
                    img_labels = _slice_batch(labels, k, factor)
                    lrp = LRP(model)

                    # view(-1) keeps the target 1-D even when factor == 1.
                    times["lrp_single"].append(_time_call(
                      lambda: lrp.attribute(
                        torch.stack([img1, img2],0).permute([1,0,2,3,4]).to(run_device),
                        target=img_labels.view(-1).to(run_device)),
                      run_device))
                    del img1
                    del img2
                    del lrp

                  # LRP on a ten-fold repeated batch.
                  for k in range(n_batches):
                    img1 = _slice_batch(img1_tensor, k, factor)
                    img2 = _slice_batch(img2_tensor, k, factor)
                    img_labels = _slice_batch(labels, k, factor)
                    lrp = LRP(model)

                    times["lrp"].append(_time_call(
                      lambda: lrp.attribute(
                        torch.repeat_interleave(torch.stack([img1, img2],0), 10, dim=1).permute([1,0,2,3,4]).to(run_device),
                        target=torch.repeat_interleave(img_labels.view(-1),10, dim=0).to(run_device)),
                      run_device))
                    del img1
                    del img2
                    del lrp

                  model = to_contribution_propagation(model).to(run_device)
                  print(progress_msg)

                  # SHAPE Score
                  for k in range(n_batches):
                    img1 = _slice_batch(img1_tensor, k, factor)
                    img2 = _slice_batch(img2_tensor, k, factor)
                    x1, x2 = _shape_inputs(img1, img2)

                    times["shape"].append(_time_call(
                      lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))
                    del x1
                    del x2
                    del img1
                    del img2

                  # Single Prediction
                  for k in range(n_batches):
                    img1 = _slice_batch(img1_tensor, k, factor)
                    img2 = _slice_batch(img2_tensor, k, factor)

                    times["prediction_single"].append(_time_call(
                      lambda: model([img1.to(run_device), img2.to(run_device)]), run_device))
                    del img1
                    del img2

                  # Relevance Forward Propagation
                  for k in range(n_batches):
                    img1 = _slice_batch(img1_tensor, k, factor)
                    img2 = _slice_batch(img2_tensor, k, factor)
                    x1, x2 = _rfp_inputs(img1, img2)

                    times["RFP"].append(_time_call(
                      lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))
                    del x1
                    del x2
                    del img1
                    del img2

                  # Perceptual Score
                  for num_samples in num_samples_list:
                    for k in range(n_batches):
                      x1, x2 = _perceptual_inputs(img1_tensor, img2_tensor, k*factor, (k+1)*factor, num_samples)

                      times["perc"][num_samples].append(_time_call(
                        lambda: model([x1.to(run_device), x2.to(run_device)]), run_device))
                      del x1
                      del x2

                  print("Save perceptual score")

                  if no_gpu:
                    save_path_times = exp_path.replace("basic_img", f"basic_img_TimeMeasure_batch{factor}_cpu")
                  else:
                    save_path_times = exp_path.replace("basic_img", f"basic_img_TimeMeasure_batch{factor}")

                  torch.save(times, save_path_times)

                del times

Seed: 3 facor 1 of 3
Applied monkey patches for contribution propagation.
 Call Recursive:  ('branch1', Sequential(
  (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (1): ReLU()
  (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (3): ReLU()
  (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
))
 Call Recursive:  ('branch2', Sequential(
  (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (1): ReLU()
  (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (3): ReLU()
  (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
))


  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


Removed all monkey patches for contribution propagation.
 Call Recursive:  ('branch1', Sequential(
  (0): Conv2d(
    (layer): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  )
  (1): ReLU()
  (2): Conv2d(
    (layer): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  )
  (3): ReLU()
  (4): MaxPool2d(
    (unfold): Unfold(kernel_size=(2, 2), dilation=1, padding=0, stride=[2, 2])
    (layer): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
))
 Call Recursive:  ('branch2', Sequential(
  (0): Conv2d(
    (layer): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  )
  (1): ReLU()
  (2): Conv2d(
    (layer): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  )
  (3): ReLU()
  (4): MaxPool2d(
    (unfold): Unfold(kernel_size=(2, 2), dilation=1, padding=0, stride=[2, 2])
    (layer): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
))
Seed: 3 facor 1 of 3


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


Applied monkey patches for contribution propagation.
 Call Recursive:  ('branch1', Sequential(
  (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (1): ReLU()
  (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (3): ReLU()
  (4): MaxPool2d(kernel_size=(2, 2), stride=[2, 2], padding=0, dilation=1, ceil_mode=False)
))
 Call Recursive:  ('branch2', Sequential(
  (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (1): ReLU()
  (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (3): ReLU()
  (4): MaxPool2d(kernel_size=(2, 2), stride=[2, 2], padding=0, dilation=1, ceil_mode=False)
))
Seed: 3 facor 1 of 3


: 