In [1]:
import torch
import torch.utils.data as data

from torch import nn
from torch.utils.data import DataLoader
from torchvision import transforms
import copy

import ML_Models.ANN.model as model_ann
import ML_Models.data_loader as loader

import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler

from torch.utils.data import DataLoader
from torchvision import transforms

from Data_Sets.Gaussian import dgp_gaussian

from explainers.grad import Gradient
from explainers.ebp import EBP
from explainers.integrated_gradients import IntegratedGradients
from explainers.guided_backprop import GuidedBackprop
from explainers.smoothgrad import SmoothGrad
from explainers.gradcam import GradCAM
from explainers.guided_gradcam import GuidedGradCAM
from explainers.lrp import LRP
from explainers.input_x_gradient import InputTimesGradient

import torch
import numpy as np
from typing import Union

from torch import nn
import torch.nn.functional as F

In [2]:
class DataLoader_Tabular(data.Dataset):
    def __init__(self, path, filename, label, scale='minmax', gauss_params=None):
        
        """
        Load training dataset
        :param path: string with path to training set
        :param label: string, column name for label
        :param scale: string; either 'minmax' or 'standard'
        :param dict: standard params of gaussian dgp
        :return: tensor with training data
        """
        
        self.path = path
        # Load Gaussian data
        if self.path == 'gaussian':
            if gauss_params is None:
                gauss_params = {
                    'n_samples': 2500,
                    'dim': 25,
                    'n_clusters': 10,
                    'distance_to_center': 5,
                    'test_size': 0.25,
                    'upper_weight': 1,
                    'lower_weight': -1,
                    'seed': 564,
                    'sigma': None,
                    'sparsity': 0.25
                }
            
            data_dict, data_dict_train, data_dict_test = dgp_gaussian.generate_gaussians(gauss_params['n_samples'],
                                                        gauss_params['dim'],
                                                        gauss_params['n_clusters'],
                                                        gauss_params['distance_to_center'],
                                                        gauss_params['test_size'],
                                                        gauss_params['upper_weight'],
                                                        gauss_params['lower_weight'],
                                                        gauss_params['seed'],
                                                        gauss_params['sigma'],
                                                        gauss_params['sparsity']).dgp_vars()
            
            self.ground_truth_dict = data_dict
            self.target = label
            
            if filename == 'train':
                data_dict = data_dict_train
            else:
                data_dict = data_dict_test
                
            self.dataset = pd.DataFrame(data_dict['data'])
            data_y = pd.DataFrame(data_dict['target'])
            
            names = []
            for i in range(gauss_params['dim']):
                name = 'x' + str(i)
                names.append(name)
                
            self.dataset.columns = names
            self.dataset['y'] = data_y
            
            # add additional Gaussian related aspects
            self.probs = data_dict['probs']
            self.masks = data_dict['masks']
            self.weights = data_dict['weights']
            self.masked_weights = data_dict['masked_weights']
            self.cluster_idx = data_dict['cluster_idx']
            
        else:
            self.dataset = pd.read_csv(path + filename)
            self.target = label
        
        # Cleaning Routine

        # Save target and predictors
        self.X = self.dataset.drop(self.target, axis=1)
        
        # Save feature names
        self.feature_names = self.X.columns.to_list()
        self.target_name = label

        # Transform data
        if scale == 'minmax':
            self.scaler = MinMaxScaler()
        elif scale == 'standard':
            self.scaler = StandardScaler()
            
        self.scaler.fit_transform(self.X)
        
        self.data = self.scaler.transform(self.X)
        self.targets = self.dataset[self.target]
        
    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # select correct row with idx
        if isinstance(idx, torch.Tensor):
            idx = idx.tolist()
        
        if self.path == 'gaussian':
            return (self.data[idx], self.targets.values[idx], self.weights[idx], self.masks[idx],
                    self.masked_weights[idx], self.probs[idx], self.cluster_idx[idx])
        else:
            return (self.data[idx], self.targets.values[idx])

    def get_number_of_features(self):
        return self.data.shape[1]
    
    def get_number_of_instances(self):
        return self.data.shape[0]
    
def return_loaders(data_name, is_tabular, batch_size=32, transform=None, scaler='minmax', gauss_params=None):
    
    if is_tabular:
        transform = None
    else:
        if transform is not None:
            transform = transform
        else:
            # Standard Transforms
            if data_name == 'mnist':
                transform = transforms.Compose([transforms.ToTensor(),
                                                transforms.Normalize((0.1307,), (0.3081,))])
            elif data_name == 'cifar10':
                transform = transforms.Compose([transforms.ToTensor(),
                                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
            # Not supported data sets
            else:
                raise ValueError
            
    # Dictionary
    dict = {'mnist': ('MNIST', transform, is_tabular, None),
            'cifar10': ('CIFAR10', transform, is_tabular, None),
            'adult': ('Adult', transform, is_tabular, 'income'),
            'compas': ('COMPAS', transform, is_tabular, 'risk'),
            'german': ('German_Credit_Data', transform, is_tabular, 'credit-risk'),
            'gaussian': ('Gaussian', transform, is_tabular, 'y')
            }
    
    if dict[data_name][2]:
        
        if dict[data_name][0] == 'Gaussian':
            prefix = 'gaussian'
            file_train = 'train'
            file_test = 'test'
        else:
            prefix = './Data_Sets/' + dict[data_name][0] + '/'
            file_train = data_name + '-train.csv'
            file_test = data_name + '-test.csv'

        dataset_train = DataLoader_Tabular(path=prefix, filename=file_train, label=dict[data_name][3],
                                           scale=scaler, gauss_params=gauss_params)
    
        dataset_test = DataLoader_Tabular(path=prefix, filename=file_test, label=dict[data_name][3],
                                          scale=scaler, gauss_params=gauss_params)
    else:
        if data_name == 'mnist':
            dataset_train = MNIST(root='./Data_Sets/' + dict[data_name][0] + '/', train=True, download=True,
                                  transform=dict[data_name][1])
            dataset_test = MNIST(root='./Data_Sets/' + dict[data_name][0] + '/', train=False, download=True,
                                 transform=dict[data_name][1])
    
        elif data_name == 'cifar10':
            dataset_train = CIFAR10(root='./Data_Sets/' + dict[data_name][0] + '/', train=True, download=True,
                                    transform=dict[data_name][1])
            dataset_test = CIFAR10(root='./Data_Sets/' + dict[data_name][0] + '/', train=False, download=True,
                                   transform=dict[data_name][1])

    trainloader = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    testloader = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    
    return trainloader, testloader

In [3]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.GAP = nn.AvgPool2d(kernel_size=5)
        
        self.fc1 = nn.Linear(32, 84)
        self.fc2 = nn.Linear(84, 2)
        self.relu1 = nn.ReLU()
        self.relu2 = nn.ReLU()
        self.relu3 = nn.ReLU()
        
        #self.softmax = nn.Softmax()

    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.GAP(x)
        
        print(x.shape)
        #x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = x.view(-1, 32)
        print(x.shape)
        x = self.relu3(self.fc1(x))
        x = self.fc2(x)
        #x = F.softmax(x)
        return x
    
    def L_relu3(self, x):
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = x.view(-1, 32)
        #x = self.relu3(self.fc1(x))
        return x

In [4]:
def training(model, train_loader, test_loader, learning_rate, epochs, dataset,
             adv_train_params=None):
    
    loaders = {'train': train_loader,
               'test': test_loader}
    
    # model collector
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0
    
    # Use GPU if available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    
    # declaring optimizer and loss
    if dataset == ('mnist' or 'cifar10' or 'gaussian'):
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    else:
        optimizer = torch.optim.RMSprop(model.parameters(), lr=learning_rate)
        criterion = nn.BCELoss()

    # training
    for e in range(epochs):
        print('Epoch {}/{}'.format(e, epochs - 1))
        print('-' * 10)
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluation mode
    
            running_loss = 0.0
            running_corrects = 0.0

            target_true = 0.0
            predicted_true = 0.0
            correct_true = 0.0

            if dataset == 'gaussian':
                for i, (inputs, labels, weights, masks, masked_weights, probs, cluster_idx) in enumerate(loaders[phase]):
    
                    inputs = inputs.to(device).view(-1, 1, 5, 5)
                    labels = labels.to(device).type(torch.long)
                    #labels_one_hot = torch.nn.functional.one_hot(labels)
                    labels_one_hot = torch.stack([1-labels, labels], dim=1)

                    optimizer.zero_grad()
        
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs.float()).float()
            
                        loss = criterion(outputs, labels_one_hot.float())
            
                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(torch.argmax(outputs, dim=1) == labels.float())

                    # compute f1 score
                    predicted_classes = torch.argmax(outputs, dim=1) == 0
                    target_classes = labels.float()
                    target_true += torch.sum(target_classes == 0).float()
                    predicted_true += torch.sum(predicted_classes).float()
                    correct_true += torch.sum((predicted_classes*1 == target_classes) == (predicted_classes*1 == 0)).float()
                    
            else:
                
                for i, (inputs, labels) in enumerate(loaders[phase]):
        
                    inputs = inputs.to(device)
                    labels = labels.to(device).type(torch.long)
    
                    optimizer.zero_grad()
        
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs.float())
                        
                        loss = criterion(outputs, labels)
            
                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()
            
                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(torch.round(outputs) == labels.float())

            recall = correct_true / target_true
            precision = correct_true / predicted_true
            f1_score = 2 * precision * recall / (precision + recall)
            
            epoch_loss = running_loss / len(loaders[phase].dataset)
            epoch_acc = running_corrects / len(loaders[phase].dataset)
    
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            print('{} F1: {:.4f} Prec: {:.4f} Recal: {:.4f}'.format(phase, f1_score, precision, recall))

            # deep copy the model
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

            print()

    if adv_train_params['type'] == 'none':
        # save vanilla model
        torch.save(model.state_dict(best_model_wts),
                   'ML_Models/Saved_Models/CNN/{}_lr_{}_acc_{:.2f}.pt'.format(dataset, learning_rate, epoch_acc))

In [5]:
adv_training_params = {'type': 'none',      
                       'parameter': None}  

dataset_train = DataLoader_Tabular(path='gaussian',
                                          filename='train', label='y')

dataset_test = DataLoader_Tabular(path='gaussian',
                                         filename='test', label='y')
input_size = dataset_train.get_number_of_features()

In [6]:
torch.tensor(dataset_test.data).shape

torch.Size([6250, 25])

In [7]:
torch.tensor(dataset_test.data).view(-1, 5, 5).shape

torch.Size([6250, 5, 5])

In [8]:
torch.tensor(dataset_test.targets)

tensor([1, 1, 1,  ..., 0, 1, 0], dtype=torch.int32)

In [9]:
# Define the model
model = CNN()

trainloader = DataLoader(dataset_train, batch_size=64, shuffle=True)
testloader = DataLoader(dataset_test, batch_size=64, shuffle=True)

# Train the model
#training(model, trainloader, testloader, 0.002, epochs=85, dataset='gaussian', adv_train_params=adv_training_params)

In [10]:
'''
Load CNN Model
'''
model_path = 'ML_Models/Saved_Models/CNN/gaussian_lr_0.002_acc_0.89.pt'
ann = CNN()
ann.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

<All keys matched successfully>

In [11]:
ann.eval()

CNN(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (GAP): AvgPool2d(kernel_size=5, stride=5, padding=0)
  (fc1): Linear(in_features=32, out_features=84, bias=True)
  (fc2): Linear(in_features=84, out_features=2, bias=True)
  (relu1): ReLU()
  (relu2): ReLU()
  (relu3): ReLU()
)

In [12]:
'''
Gaussian Data DGP
'''
gauss_train_input = testloader.dataset.ground_truth_dict
data_iter = iter(testloader)
inputs, labels, weights, masks, masked_weights, probs, cluster_idx = data_iter.next()
gaussian_all = torch.FloatTensor(testloader.dataset.data)

In [13]:
labels = labels.type(torch.int64)
inputs = inputs.view(-1,1,5,5).float()

In [14]:
ann(inputs)

torch.Size([64, 32, 1, 1])
torch.Size([64, 32])


tensor([[-6.8965,  6.8511],
        [ 2.4575, -2.5295],
        [-6.3613,  6.3105],
        [ 4.6080, -4.6685],
        [-4.9750,  4.9100],
        [ 4.7030, -4.7091],
        [-6.6664,  6.6219],
        [-0.9446,  0.8390],
        [-4.8809,  4.8108],
        [-2.3244,  2.2378],
        [ 2.9469, -3.0168],
        [-5.0472,  4.9859],
        [ 3.4925, -3.5435],
        [ 2.5230, -2.6092],
        [ 3.2277, -3.2917],
        [ 6.5061, -6.4718],
        [ 1.4567, -1.5317],
        [ 0.5141, -0.6176],
        [ 0.6486, -0.7642],
        [-0.2123,  0.1325],
        [-0.1646,  0.0587],
        [-3.7220,  3.6326],
        [-4.5136,  4.4452],
        [-3.4121,  3.3181],
        [ 3.2772, -3.3788],
        [ 0.4101, -0.4867],
        [ 2.2603, -2.3605],
        [-0.7450,  0.6561],
        [-3.1577,  3.0524],
        [ 1.2560, -1.3403],
        [-4.4683,  4.3957],
        [-4.4753,  4.4075],
        [-4.7074,  4.6326],
        [-0.0290, -0.0594],
        [ 5.3316, -5.4115],
        [ 5.7982, -5

In [15]:
grad = Gradient(ann)
grad_exp = grad.get_explanation(inputs, labels).numpy()
grad_exp.shape

torch.Size([64, 32, 1, 1])
torch.Size([64, 32])


  "required_grads has been set automatically." % index


(64, 1, 5, 5)

In [16]:
'''
Testing IG on Gaussian Data
'''
ig = IntegratedGradients(ann)
exp_ig = ig.get_explanation(inputs, labels).detach().numpy()
exp_ig.shape

torch.Size([3200, 32, 1, 1])
torch.Size([3200, 32])


(64, 1, 5, 5)

In [17]:
'''
Testing EBP on Gaussian Data
'''
ebp = EBP(ann)
exp_ebp = ebp.get_explanation(inputs, labels).numpy()
exp_ebp.shape

torch.Size([64, 32, 1, 1])
torch.Size([64, 32])
tensor([[[[0.3777, 0.0086, 0.0095, 0.0292, 0.0061],
          [0.0562, 0.0182, 0.0260, 0.0294, 0.0125],
          [0.0363, 0.0459, 0.0242, 0.1213, 0.0041],
          [0.0135, 0.0177, 0.0068, 0.0417, 0.0064],
          [0.0345, 0.0158, 0.0241, 0.0058, 0.0282]]],


        [[[0.0189, 0.0171, 0.0055, 0.0045, 0.0072],
          [0.0402, 0.0361, 0.2884, 0.0048, 0.0095],
          [0.0311, 0.0890, 0.0514, 0.0187, 0.1203],
          [0.0184, 0.0290, 0.0531, 0.0537, 0.0100],
          [0.0442, 0.0050, 0.0121, 0.0281, 0.0038]]],


        [[[0.2701, 0.0149, 0.0106, 0.0386, 0.0081],
          [0.0213, 0.0343, 0.0105, 0.0320, 0.0107],
          [0.1231, 0.1186, 0.0251, 0.0820, 0.0177],
          [0.0143, 0.0180, 0.0093, 0.0555, 0.0153],
          [0.0013, 0.0131, 0.0182, 0.0085, 0.0288]]],


        ...,


        [[[0.0039, 0.0098, 0.0273, 0.2761, 0.0021],
          [0.0262, 0.0139, 0.0182, 0.0228, 0.0262],
          [0.0061, 0.1300, 0.0778, 0.0516

(64, 1, 5, 5)

In [18]:
'''
Testing LRP on Gaussian Data
'''
lrp = LRP(ann)
exp_lrp = lrp.get_explanation(inputs, labels)
exp_lrp.shape

torch.Size([64, 32, 1, 1])
torch.Size([64, 32])
torch.Size([64, 32, 1, 1])
torch.Size([64, 32])


torch.Size([64, 1, 5, 5])

In [24]:
inputs.requires_grad=True

In [25]:
'''
Testing GuidedGradCAM on Gaussian Data
'''
ggc = GuidedGradCAM(ann, ann.L_relu3)
ggc_exp = ggc.get_explanation(inputs, labels)
ggc_exp.shape

TypeError: 'method' object is not iterable

In [None]:
'''
Testing GradCAM on Gaussian Data
'''
gradcam = GradCAM(ann, ann.conv)
exp_gradcam = gradcam.get_explanation(inputs, labels)
exp_gradcam.shape