In [1]:
from models.resnet import *
from Stack_LIME import *
from coordconv import *

import tqdm

from IPython.display import clear_output

from pytorch_lightning import LightningModule, Trainer

import torch
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
from torch.nn import functional as F

import torchvision
from torchvision import transforms
from torchvision.datasets import CIFAR10

import torchmetrics

# Fix the global torch RNG seed so random operations that follow are repeatable.
torch.manual_seed(17)
# Converter from a tensor to a PIL image; applied to un-normalized images in later cells.
tf = transforms.ToPILImage()

In [2]:
def create_folder(name):
    """Create directory ``name`` (and any missing parents) if it does not exist.

    Best-effort helper: an ``OSError`` (permission denied, the path is an
    existing regular file, ...) is reported on stdout instead of being raised.

    Args:
        name: Path of the directory to create.
    """
    # Local import: the notebook never imports `os` explicitly, so the helper
    # should not depend on it leaking in through a star-import.
    import os

    try:
        # exist_ok=True replaces the exists()-then-makedirs() pair, which could
        # fail if the directory appeared between the check and the creation.
        os.makedirs(name, exist_ok=True)
    except OSError:
        print('Error: Creating directory. ' + str(name))

In [3]:
class UnNormalize(object):
    """Invert a per-channel normalization ``(x - mean) / std``.

    Args:
        mean: Per-channel means that were subtracted during normalization.
        std: Per-channel standard deviations that were divided out.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Normalized image; iterated along its first
                dimension, one slice per (mean, std) pair.
        Returns:
            Tensor: A new tensor holding ``tensor * std + mean`` per channel.
                The input is left untouched.
        """
        # Work on a copy: the in-place ops below would otherwise silently
        # modify the caller's tensor, so calling this twice on the same object
        # would un-normalize it twice.
        restored = tensor.clone()
        for channel, m, s in zip(restored, self.mean, self.std):
            # Inverse of the normalize step `t.sub_(m).div_(s)`.
            channel.mul_(s).add_(m)
        return restored

In [None]:
# Scratch experiment (this cell has no execution count): build the project's
# resnet50 with pretrained weights and replace its first conv layer with a
# CoordConv2d followed by a plain Conv2d. A later cell rebinds `model` to a CNN
# instance, so nothing below depends on this object.
# NOTE(review): CoordConv2d's arguments are presumably (in_channels,
# out_channels, kernel_size) — confirm against the coordconv module.
model = resnet50(pretrained=True, progress=True, device="cuda")
#model
model.conv1 = nn.Sequential(CoordConv2d(3, 32, 1, with_r=True),nn.Conv2d(32,64,3))
# Display the module structure as the cell output.
model

In [36]:
class CNN(LightningModule):
    """LightningModule wrapping the project's ``resnet50`` classifier.

    The module also owns its data: ``setup`` splits ``train_dataset`` 90/10
    into train/validation subsets and the ``*_dataloader`` hooks serve them.

    Args:
        batch_size: Batch size used by all three dataloaders.
        num_workers: Worker processes used by each dataloader.
        train_dataset: Dataset split into train/validation subsets in ``setup``.
        test_dataset: Dataset served by ``test_dataloader``.
        lr: Learning rate passed to Adam. The default keeps the previously
            hard-coded value.
            NOTE(review): 0.1 is far above Adam's default of 1e-3 — confirm
            this is intended.
    """

    def __init__(self, batch_size = 200, num_workers = 0, train_dataset = None,
                 test_dataset = None, lr = 0.1):
        super().__init__()
        self.model = resnet50(pretrained=False, progress=True, device="cuda")
        #self.model.conv1 = nn.Sequential(CoordConv2d(3, 32, 1, with_r=True),nn.Conv2d(32,64,3))
        self.num_workers = num_workers
        self.batch_size = batch_size
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
        self.lr = lr
        # One metric object per stage so validation and test state never mix.
        # `metrics` keeps its original name (now validation only) for any
        # external code that reads it.
        self.metrics = torchmetrics.Accuracy()
        self.test_metrics = torchmetrics.Accuracy()

    def forward(self, x):
        """Return the wrapped network's output for input batch ``x``."""
        return self.model(x)

    def training_step(self, batch, batch_nb):
        """Compute and log the cross-entropy loss for one training batch."""
        x, y = batch
        loss = F.cross_entropy(self(x), y)
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def _shared_eval_step(self, batch, metric, prefix):
        """Log ``<prefix>_acc`` and ``<prefix>_loss`` for one batch; return the loss."""
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        acc = metric(logits, y)
        self.log_dict({prefix + "_acc": acc, prefix + "_loss": loss})
        return loss

    def validation_step(self, batch, batch_idx):
        """Evaluate one validation batch (logs ``val_acc`` / ``val_loss``)."""
        return self._shared_eval_step(batch, self.metrics, "val")

    def test_step(self, batch, batch_idx):
        """Evaluate one test batch (logs ``test_acc`` / ``test_loss``)."""
        return self._shared_eval_step(batch, self.test_metrics, "test")

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters of this module."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def setup(self, stage = None):
        """Prepare the dataset splits for the given stage.

        ``'fit'`` (or ``None``) creates ``train_ds`` / ``val_ds``;
        ``'test'`` (or ``None``) sets ``test_ds``.
        """
        if stage == 'fit' or stage is None:
            dataset = self.train_dataset
            dataset_size = len(dataset)
            train_size = int(dataset_size * 0.9)
            # Give validation the remainder so the two lengths always sum to
            # the dataset size; int(n * 0.9) + int(n * 0.1) falls short for
            # many n and random_split then raises.
            val_size = dataset_size - train_size
            self.train_ds, self.val_ds = random_split(dataset, [train_size, val_size])

        if stage == 'test' or stage is None:
            self.test_ds = self.test_dataset

    def train_dataloader(self):
        """Dataloader over the training subset, reshuffled every epoch."""
        return DataLoader(self.train_ds, batch_size=self.batch_size,
                          num_workers = self.num_workers, shuffle=True)

    def val_dataloader(self):
        """Dataloader over the validation subset (fixed order)."""
        return DataLoader(self.val_ds, batch_size=self.batch_size, num_workers = self.num_workers)

    def test_dataloader(self):
        """Dataloader over the test dataset (fixed order)."""
        return DataLoader(self.test_ds, batch_size=self.batch_size, num_workers = self.num_workers)

In [37]:
# Preprocessing applied to every image: convert to a tensor, then normalize
# each of the three channels with mean 0.5 and std 0.5, i.e. (x - 0.5) / 0.5.
transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
    ])
# Inverse of the Normalize step above (same mean/std), used before converting
# tensors back to PIL images.
unorm = UnNormalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))

In [38]:
# CIFAR-10 train and test splits, preprocessed with `transform`.
# download=False: the dataset must already be present under ./data/cifar10.
train_data = CIFAR10('./data/cifar10', train = True, download = False, transform = transform)
test_data = CIFAR10('./data/cifar10', train = False, download = False, transform = transform)
# Class names, indexed by the integer target returned by the dataset (used as labels[y]).
labels = ['airplane', 'automobile', 'bird','cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


In [39]:
# Instantiate the Lightning module with the CIFAR-10 datasets and a batch size of 512.
# This rebinds `model`, replacing whatever an earlier cell assigned to that name.
model = CNN(batch_size = 512, num_workers = 0 ,
            train_dataset = train_data, test_dataset = test_data)

In [40]:
# Lightning trainer on a single GPU, capped at 200 epochs.
# NOTE(review): no trainer.fit(...) call is visible in this part of the notebook — confirm where training happens.
trainer = Trainer(accelerator = 'gpu', devices = 1 , max_epochs = 200)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [41]:
# stage=None makes CNN.setup build both the train/val split and the test set.
model.setup(None)
# Run the test loop; the logged test_acc / test_loss are shown as the cell output.
trainer.test(model)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.18000000715255737, 'test_loss': 3.6212196350097656}
--------------------------------------------------------------------------------


[{'test_acc': 0.18000000715255737, 'test_loss': 3.6212196350097656}]

In [10]:
# Create the train/val split explicitly before validating.
# NOTE(review): presumably needed because trainer.validate does not call setup
# with stage 'fit' itself — confirm against the installed Lightning version.
model.setup('fit')
# Run the validation loop; logs val_acc / val_loss.
# NOTE(review): this cell's execution count (10) is lower than the test cell's (41),
# so the two recorded results may come from different model states — re-run in order to compare.
trainer.validate(model)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
  f"The dataloader, {name}, does not have many workers which may be a bottleneck."


Validating: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 VALIDATE RESULTS
{'val_acc': 0.9351999759674072, 'val_loss': 0.22464138269424438}
--------------------------------------------------------------------------------


[{'val_acc': 0.9351999759674072, 'val_loss': 0.22464138269424438}]

In [11]:
# Explanation helper built from the model and the preprocessing transform.
# Stack_XAI is not imported by name; presumably it comes from `from Stack_LIME import *` — verify.
stack_xai = Stack_XAI(model, transform)

Model Loaded . . . 
Device is a cuda:0. . . 
transform setting finish . . .
Clear!


In [12]:
# Produce a LIME-based masked version of every training image and save it as
# ./data/edit_img_b/<class name>/<running index>.jpg.
# NOTE(review): `Image` is not imported by name in this notebook; presumably it
# arrives through one of the star-imports — confirm.
out_root = './data/edit_img_b/'
# Class folders already created, so create_folder runs once per class rather
# than once per image.
made_dirs = set()
for n, (x, y) in enumerate(tqdm.tqdm(train_data)):
    # Undo the normalization and convert to a PIL image before explaining.
    x = tf(unorm(x))
    m = stack_xai.explain(x, XAI = 'LIME', n_seg = 100, n_samples = 100)
    xai_img = stack_xai.get_mask_image(x, m)
    xai_img = Image.fromarray(xai_img)
    class_dir = out_root + labels[y]
    if class_dir not in made_dirs:
        create_folder(class_dir)
        made_dirs.add(class_dir)
    xai_img.save(class_dir + '/' + str(n) + '.jpg')
    # Drop the output accumulated during this iteration.
    # NOTE(review): the recorded run hit the IOPub message rate limit; clearing
    # on every iteration may contribute — consider clearing less often.
    clear_output()

  return self.target_fn(args[0], **self.target_params)
IOPub message rate exceeded.##########################################        |
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_msg_rate_limit`.

Current values:
ServerApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [16]:
# Classify the first training image and list every class probability,
# highest first.
x, y = train_data[0]
model.eval()
with torch.no_grad():
    # Round-trip through PIL (un-normalize -> image -> transform) so the input
    # follows the same path as the images handed to the explainer.
    batch = transform(tf(unorm(x))).unsqueeze(0)
    # Follow the model's device instead of assuming CUDA.
    logits = model(batch.to(model.device))
    output = F.softmax(logits, dim=1)
    # No detach() needed: no graph is recorded inside no_grad().
    output = output.cpu().numpy()
# Rank all classes; k follows the label list instead of a hard-coded 10.
score, label = torch.topk(torch.Tensor(output), len(labels))
# Drop the batch dimension of the indices; `score` keeps its (1, k) shape.
label = label.squeeze(0)
dict_prob = {
    labels[class_idx]: prob
    for prob, class_idx in zip(score[0].tolist(), label.tolist())
}
dict_prob

{'frog': 0.9865164756774902,
 'cat': 0.003206209046766162,
 'bird': 0.001991337863728404,
 'dog': 0.0015933840768411756,
 'deer': 0.0014924925053492188,
 'airplane': 0.0010641058906912804,
 'ship': 0.001064022770151496,
 'truck': 0.0010457603493705392,
 'horse': 0.0010206550359725952,
 'automobile': 0.0010056134779006243}

In [93]:
# Class name of the current target `y`.
# NOTE(review): this cell's execution count (93) is far from the prediction cell's (16),
# so `y` may have been rebound in between — re-run both cells together before comparing.
labels[y]

'cat'