In this notebook, two models are trained and then used to classify pictures.
The following sections present the two models, their loss and accuracy curves, a confusion matrix, and model interpretation using Captum.

In [None]:
%matplotlib inline

In [None]:
# import necessary packages
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch
import os
from torch.utils.data import random_split
import torchvision
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from PIL import Image

Preparation work before training the models

1. Show how many samples each class has in the training dataset. The function is shown below. With this plot, users can see clearly what the dataset consists of and can use this information in further analysis.

In [None]:
def show_number(csv_file='/kaggle/input/deep-learning-for-msc-2022-23/train.csv'):
    """Plot a bar chart of the number of training samples per class.

    Args:
        csv_file (string): Path to the annotation csv file. The class label
            is read from its second column.
    """
    picture_frames = pd.read_csv(csv_file)
    # value_counts tallies all classes in one pass (instead of one
    # list.count scan per class); sort_index gives a stable bar order.
    class_counts = picture_frames.iloc[:, 1].value_counts().sort_index()

    plt.figure(figsize=(5, 4))
    plt.bar(class_counts.index.to_list(), class_counts.to_list(),
            facecolor='#1f77b4', edgecolor='k')
    plt.tick_params(labelsize=15)
    plt.xlabel('class', fontsize=10)
    plt.ylabel('number of data', fontsize=10)

2. Create the dataset class that loads the pictures and converts each picture file into a tensor. A `test` parameter is added so that the same class can create both the training
dataset and the test dataset.

In [None]:
class CellPictureDataset(Dataset):
    """Image dataset read from a directory, optionally labelled by a csv file."""

    def __init__(self, root_dir, csv_file=None, transform=None, test=False):
        """
        Args:
            root_dir (string): Directory with all the images.
            csv_file (string, optional): Path to the csv file with annotations
                (first column: image file name, second column: label).
            transform (callable, optional): Optional transform to be applied
                on a sample.
            test (bool): When False, samples are (image, label) pairs taken
                from the csv file; otherwise samples are the images found in
                ``root_dir`` and no label is returned.
        """
        self.data_frame = pd.read_csv(csv_file) if csv_file is not None else None
        self.root_dir = os.path.abspath(root_dir)
        self.transform = transform
        self.test = test
        # Directory listing, read once on first use instead of on every item.
        self._file_names = None

    def _listing(self):
        """Return the (cached) list of entries in ``root_dir``."""
        if self._file_names is None:
            self._file_names = os.listdir(self.root_dir)
        return self._file_names

    def __len__(self):
        # Labelled samples are indexed through the csv file, so the csv row
        # count (not the number of files in the directory) is the length.
        if self.test is False and self.data_frame is not None:
            return len(self.data_frame)
        return len(self._listing())

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        labelled = self.test is False
        if labelled:
            if self.data_frame is None:
                raise ValueError("csv_file is required when test is False")
            img_name = os.path.join(self.root_dir,
                                    self.data_frame.iloc[idx, 0])
        else:
            img_name = os.path.join(self.root_dir, self._listing()[idx])

        # Keep the sample in a local variable: storing it on ``self`` returned
        # a stale (or missing) image whenever no transform was configured.
        image = Image.open(img_name)
        if self.transform:
            image = self.transform(image)

        if labelled:
            label = self.data_frame.iloc[idx, 1]
            return image, label
        return image

3. Design my own convnet. It is a 4-layer convnet (two convolutional layers followed by two fully connected layers), and the width of the hidden fully connected layer is exposed as a hyperparameter for later tuning.

In [None]:
import collections
class MyConvNet(nn.Module):
    """Small CNN: two strided convolutions followed by a two-layer classifier.

    Args:
        l1 (int): Number of units in the hidden fully connected layer.
    """

    def __init__(self, l1=120):
        super().__init__()
        # Layer names are kept stable because they become state_dict keys.
        layers = collections.OrderedDict()
        layers['conv1'] = nn.Conv2d(3, 32, 5, stride=5)
        layers['relu1'] = nn.ReLU()
        layers['conv2'] = nn.Conv2d(32, 16, 5, stride=2)
        layers['relu2'] = nn.ReLU()
        layers['flatten'] = nn.Flatten()
        # 80*80 = 6400 = 16 channels * 20 * 20, the flattened conv2 output
        # for a 224x224 input.
        layers['fc1'] = nn.Linear(80 * 80, l1)
        layers['relu3'] = nn.ReLU()
        layers['fc3'] = nn.Linear(l1, 4)
        self.model = nn.Sequential(layers)

    def forward(self, x):
        """Return the 4-way class logits for the image batch ``x``."""
        return self.model(x)


4. Visualize a batch. Before using the dataset to train the model, the pictures need to be preprocessed. For example, it is good practice to 
reshape all pictures to the same shape, which could improve the robustness of the model.

In [None]:
from mpl_toolkits.axes_grid1 import ImageGrid

def show_a_batch():
    """Display one augmented training batch in an image grid."""
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.ToTensor(),
                                          transforms.Normalize(mean, std)
                                         ])
    train_dataset = CellPictureDataset('/kaggle/input/deep-learning-for-msc-2022-23/train', csv_file='/kaggle/input/deep-learning-for-msc-2022-23/train.csv', transform=train_transform)
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)

    images, _ = next(iter(train_loader))

    # Channels-last layout for imshow, then undo Normalize so the pixel
    # values are back in [0, 1]; otherwise imshow clips them and the colours
    # are wrong.
    images = images.numpy().transpose((0, 2, 3, 1))
    images = np.clip(images * np.array(std) + np.array(mean), 0.0, 1.0)

    n = 6
    fig = plt.figure(figsize=(10, 10))
    grid = ImageGrid(fig, 111,  # same placement as subplot(111)
                    nrows_ncols=(n, n),  # n rows by n columns of axes
                    axes_pad=0.02,  # spacing between grid cells
                    share_all=True
                    )

    # The grid has n*n cells but the batch fills only the first few, so
    # hide the frame/ticks of every cell before drawing.
    for ax in grid:
        ax.axis('off')

    # Draw each image of the batch into its own cell.
    for ax, im in zip(grid, images):
        ax.imshow(im)

    plt.tight_layout()

Using Ray Tune to tune hyperparameters

In this part, the functions are used to tune the hyperparameters of the models. The benefits of tuning these hyperparameters are, for example, 
preventing overfitting during training and improving the accuracy of the model.

1. training function for model_1

In [None]:
# the function for training model_1; the number of neurons in the hidden fully connected layer, the learning rate, the momentum and the batch size are the hyperparameters
def train_cifar(config, checkpoint_dir=None, data_dir=None):
    """Ray Tune trainable for model_1 (MyConvNet).

    Trains on 80% of the training set and, after every epoch, reports the
    validation loss and accuracy on the remaining 20% through ``tune.report``.

    Args:
        config (dict): Hyperparameters; the keys "l1", "lr", "momentum" and
            "batch_size" are read.
        checkpoint_dir: Unused; kept for the Ray Tune trainable signature.
        data_dir: Unused.
    """
    net = MyConvNet(config["l1"])
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if torch.cuda.device_count() > 1:
            net = nn.DataParallel(net)
    net.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=config["momentum"])

    csv_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train.csv"
    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                     ])
    train_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train"
    trainset = CellPictureDataset(train_dir, csv_file=csv_dir, transform=train_transform)

    # 80/20 train/validation split.
    test_abs = int(len(trainset) * 0.8)
    train_subset, val_subset = random_split(
        trainset, [test_abs, len(trainset) - test_abs])

    batch_size = int(config["batch_size"])
    trainloader = DataLoader(
        train_subset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True)
    # Validation metrics are sums over all samples, so no shuffling is needed.
    valloader = DataLoader(
        val_subset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2,
        pin_memory=True)

    for epoch in range(35):  # loop over the dataset multiple times
        net.train()
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Validation pass: evaluation mode, no gradient tracking.
        net.eval()
        val_loss = 0.0
        val_steps = 0
        total = 0
        correct = 0
        with torch.no_grad():
            for inputs, labels in valloader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = net(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # .item() keeps the reported metric a plain Python float.
                val_loss += criterion(outputs, labels).item()
                val_steps += 1

        tune.report(loss=(val_loss / val_steps), accuracy=correct / total)
#     print("Finished Training")

2. Training function for model_2. The model used is ResNet-18, which was the best choice among ResNet-50, VGG-16, etc. Testing showed that a more complex model may not be suitable for this dataset: with VGG-16, the loss was even higher than for model_1.

In [None]:
# the function for training model_2; the learning rate and the momentum are the hyperparameters
def train_cifar_2(config, checkpoint_dir=None, data_dir=None):
    """Ray Tune trainable for model_2 (ResNet-18 with a 4-class head).

    Trains on 80% of the training set and, after every epoch, reports the
    validation loss and accuracy on the remaining 20% through ``tune.report``.

    Args:
        config (dict): Hyperparameters; the keys "lr", "momentum" and
            "batch_size" are read.
        checkpoint_dir (string, optional): Directory holding a "checkpoint"
            file with a (model_state, optimizer_state) tuple to resume from.
        data_dir: Unused.
    """
    net = torchvision.models.resnet18()
    net.fc = nn.Linear(net.fc.in_features, 4)
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if torch.cuda.device_count() > 1:
            net = nn.DataParallel(net)
    net.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=config["momentum"])

    if checkpoint_dir:
        model_state, optimizer_state = torch.load(
            os.path.join(checkpoint_dir, "checkpoint"))
        net.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    csv_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train.csv"
    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                     ])
    train_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train"
    trainset = CellPictureDataset(train_dir, csv_file=csv_dir, transform=train_transform)

    # 80/20 train/validation split.
    test_abs = int(len(trainset) * 0.8)
    train_subset, val_subset = random_split(
        trainset, [test_abs, len(trainset) - test_abs])

    batch_size = int(config["batch_size"])
    trainloader = DataLoader(
        train_subset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True)
    # Validation metrics are sums over all samples, so no shuffling is needed.
    valloader = DataLoader(
        val_subset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2,
        pin_memory=True)

    for epoch in range(35):  # loop over the dataset multiple times
        net.train()
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # ResNet-18 contains BatchNorm layers: without eval() the validation
        # pass would normalise with batch statistics and also update the
        # running statistics from validation data.
        net.eval()
        val_loss = 0.0
        val_steps = 0
        total = 0
        correct = 0
        with torch.no_grad():
            for inputs, labels in valloader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = net(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # .item() keeps the reported metric a plain Python float.
                val_loss += criterion(outputs, labels).item()
                val_steps += 1

        tune.report(loss=(val_loss / val_steps), accuracy=correct / total)

3. Function for testing accuracy. It is necessary to split the training dataset into a training part and a validation part, which makes it possible to test whether the model is overfitting. When plotting the loss and accuracy curves, a healthy model shows the loss gradually decreasing and the accuracy increasing as the epochs go on. If, for example, the training accuracy keeps increasing and even reaches nearly 100%, while the validation accuracy first increases and then decreases, the model is overfitting.

In [None]:
def test_accuracy(net, device="cpu",
                  root_dir="D://python_code//deep learning coursework//train",
                  csv_dir="D://python_code//deep learning coursework//train.csv"):
    """Return the classification accuracy of ``net`` on a labelled image set.

    Args:
        net (nn.Module): Model to evaluate; expected to already be on ``device``.
        device: Device the input batches are moved to.
        root_dir (string): Directory with the images.
        csv_dir (string): Path to the csv file with the annotations.

    Returns:
        float: Fraction of correctly classified samples (0.0 if the dataset
        is empty).
    """
    # Deterministic counterpart of the training preprocessing: same 224x224
    # input size and the same normalisation statistics, without augmentation.
    data_transform = transforms.Compose([transforms.Resize(256),
                                         transforms.CenterCrop(224),
                                         transforms.ToTensor(),
                                         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                        ])
    dataset = CellPictureDataset(root_dir=root_dir, csv_file=csv_dir, transform=data_transform)
    testloader = DataLoader(dataset, batch_size=4, shuffle=False, num_workers=2)

    correct = 0
    total = 0
    # Evaluate in eval mode, then restore whatever mode the caller had set.
    was_training = net.training
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    net.train(was_training)

    return correct / total if total else 0.0

4. Using Tune from Ray to select the best model. Tune is a package for tuning hyperparameters: it randomly samples the hyperparameters from `config` and then runs the training functions above. In this part, the full search process is omitted, as it is hard to show all of it in a notebook. The search intervals around the best model I have found are shown as an example.

In [None]:
# the hyperparameter search intervals for model_1 are defined in this function
def main_1(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
    """Run the Ray Tune hyperparameter search for model_1 (MyConvNet).

    Args:
        num_samples (int): Number of hyperparameter samples (trials) to run.
        max_num_epochs (int): Maximum number of reported epochs per trial
            (``max_t`` of the ASHA scheduler).
        gpus_per_trial (int): GPUs requested for each trial.
    """
    configs = {
        "l1": 256,
        "lr": tune.loguniform(0.00883, 0.00885),
        "batch_size": 16,
        "momentum": tune.loguniform(0.65, 0.68)
    }
    scheduler = ASHAScheduler(
        metric="loss",
        mode="min",
        max_t=max_num_epochs,
        grace_period=1,
        reduction_factor=2)
    result = tune.run(
        train_cifar,
        resources_per_trial={"cpu": 2, "gpu": gpus_per_trial},
        config=configs,
        num_samples=num_samples,
        scheduler=scheduler)

    best_trial = result.get_best_trial("loss", "min", "last")
    print(f"Best trial config: {best_trial.config}")
    print(f"Best trial final validation loss: {best_trial.last_result['loss']}")
    print(f"Best trial final validation accuracy: {best_trial.last_result['accuracy']}")

    # NOTE(review): no checkpoint is restored here, so the model evaluated
    # below has freshly initialised weights rather than the best trial's
    # trained weights. Restoring them requires the trainable to save
    # checkpoints first.
    best_trained_model = MyConvNet(best_trial.config["l1"])
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if gpus_per_trial > 1:
            best_trained_model = nn.DataParallel(best_trained_model)
    best_trained_model.to(device)

    test_acc = test_accuracy(best_trained_model, device)
    print(f"Best trial test set accuracy: {test_acc}")

In [None]:
# the hyperparameter search intervals for model_2 are defined in this function
def main_2(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
    """Run the Ray Tune hyperparameter search for model_2 (ResNet-18).

    Args:
        num_samples (int): Number of hyperparameter samples (trials) to run.
        max_num_epochs (int): Maximum number of reported epochs per trial
            (``max_t`` of the ASHA scheduler).
        gpus_per_trial (int): GPUs requested for each trial.
    """
    # No "l1" entry: the ResNet-18 trainable has no tunable layer width.
    configs = {
        "lr": tune.loguniform(0.00883, 0.00885),
        "batch_size": 16,
        "momentum": tune.loguniform(0.65, 0.68)
    }
    scheduler = ASHAScheduler(
        metric="loss",
        mode="min",
        max_t=max_num_epochs,
        grace_period=1,
        reduction_factor=2)
    # Tune the model_2 trainable (the model_1 trainable was used here before).
    result = tune.run(
        train_cifar_2,
        resources_per_trial={"cpu": 2, "gpu": gpus_per_trial},
        config=configs,
        num_samples=num_samples,
        scheduler=scheduler)

    best_trial = result.get_best_trial("loss", "min", "last")
    print(f"Best trial config: {best_trial.config}")
    print(f"Best trial final validation loss: {best_trial.last_result['loss']}")
    print(f"Best trial final validation accuracy: {best_trial.last_result['accuracy']}")

    # Rebuild the same architecture the trainable uses: ResNet-18 with a
    # 4-class output layer.
    # NOTE(review): no checkpoint is restored here, so the model evaluated
    # below has freshly initialised weights rather than the best trial's
    # trained weights.
    best_trained_model = torchvision.models.resnet18()
    best_trained_model.fc = nn.Linear(best_trained_model.fc.in_features, 4)
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if gpus_per_trial > 1:
            best_trained_model = nn.DataParallel(best_trained_model)
    best_trained_model.to(device)

    test_acc = test_accuracy(best_trained_model, device)
    print(f"Best trial test set accuracy: {test_acc}")

Training the model

After selecting suitable hyperparameters, the next step is training the models. When training the models, functions such as plotting the loss and accuracy curves can be added to check whether the model is overfitting, as mentioned above.

1. Code for plotting the loss and accuracy curves. It plots the training and validation loss and accuracy for each epoch of training.

In [None]:
def training_and_visualize(model, learning_rate, monmentum):
    """Train ``model`` for 10 epochs and plot loss/accuracy curves.

    The training set is split 80/20 into training and validation parts; the
    per-epoch training and validation loss and accuracy are plotted side by
    side.

    Args:
        model (nn.Module): Model to train (trained in place).
        learning_rate (float): SGD learning rate.
        monmentum (float): SGD momentum.
    """
    num_epochs = 10

    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"
        if torch.cuda.device_count() > 1:
            model = nn.DataParallel(model)
    model.to(device)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=monmentum)

    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                         ])

    csv_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train.csv"
    train_dir = "/kaggle/input/deep-learning-for-msc-2022-23/train"
    trainset = CellPictureDataset(train_dir, csv_file=csv_dir, transform=train_transform)

    test_abs = int(len(trainset) * 0.8)
    train_subset, val_subset = random_split(
        trainset, [test_abs, len(trainset) - test_abs])

    trainloader = DataLoader(
        train_subset,
        batch_size=16,
        shuffle=True,
        num_workers=0)
    valloader = DataLoader(
        val_subset,
        batch_size=16,
        shuffle=False,
        num_workers=0)
    train_losses = []
    train_accs = []
    val_losses = []
    val_accs = []

    for _ in range(num_epochs):
        # --- training pass ---
        # Explicitly switch to train mode: the model may arrive in eval mode,
        # and the validation pass below leaves it there after every epoch.
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_seen = 0
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_correct += (predicted == labels).sum().item()
            train_seen += labels.size(0)

        train_losses.append(train_loss / len(trainloader))
        # Accuracy is counted per sample so a smaller final batch is not
        # over-weighted.
        train_accs.append(train_correct / train_seen)

        # --- validation pass ---
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_seen = 0
        with torch.no_grad():
            for inputs, labels in valloader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)

                val_loss += loss_fn(outputs, labels).item()
                val_correct += (predicted == labels).sum().item()
                val_seen += labels.size(0)

        val_losses.append(val_loss / len(valloader))
        val_accs.append(val_correct / val_seen)

    # create a figure with two subplots
    fig, axs = plt.subplots(1, 2, figsize=(12, 4))

    # plot the training and validation losses
    axs[0].plot(train_losses, label="Training Loss")
    axs[0].plot(val_losses, label="Validation Loss")
    axs[0].legend()
    axs[0].set_xlabel("Epoch")
    axs[0].set_ylabel("Loss")

    # plot the training and validation accuracies
    axs[1].plot(train_accs, label="Training Accuracy")
    axs[1].plot(val_accs, label="Validation Accuracy")
    axs[1].legend()
    axs[1].set_xlabel("Epoch")
    axs[1].set_ylabel("Accuracy")

2. Function for drawing the confusion matrix. The confusion matrix is useful in machine learning and deep learning. It provides evaluation criteria beyond loss and accuracy, which makes it a good tool for analysing the model. In this part, one batch of the dataset is selected as an example to show the confusion matrix.

In [None]:
from sklearn.metrics import confusion_matrix
import itertools

def cnf_matrix_plotter(model, cmap=plt.cm.Blues):
    """Plot the confusion matrix of ``model`` on one training batch.

    Args:
        model (nn.Module): Trained classifier returning one logit per class.
        cmap: Matplotlib colormap used to colour the matrix cells.
    """
    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                          transforms.RandomHorizontalFlip(),
                                          transforms.ToTensor(),
                                          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                         ])
    train_dataset = CellPictureDataset('/kaggle/input/deep-learning-for-msc-2022-23/train', csv_file='/kaggle/input/deep-learning-for-msc-2022-23/train.csv', transform=train_transform)
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)

    images, labels = next(iter(train_loader))

    # Run the batch on whatever device the model lives on, in eval mode and
    # without gradient tracking; the previous train/eval mode is restored.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        images = images.to(first_param.device)
    was_training = model.training
    model.eval()
    with torch.no_grad():
        outputs = model(images)
    model.train(was_training)
    _, predicted = torch.max(outputs, 1)

    # Fix the label set to all classes so the matrix is always square with
    # one row/column per class, even if the batch lacks some classes.
    num_classes = outputs.shape[1]
    cm = confusion_matrix(labels.cpu().numpy(), predicted.cpu().numpy(),
                          labels=list(range(num_classes)))

    plt.figure(figsize=(4, 4))

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    tick_marks = np.arange(num_classes)
    plt.xticks(tick_marks)
    plt.yticks(tick_marks)

    plt.title('confusion matrix', fontsize=30)
    plt.xlabel('predicted', fontsize=25, c='r')
    plt.ylabel('real class', fontsize=25, c='r')
    plt.tick_params(labelsize=12) # set font size

    # write the count into every cell, white on dark cells for readability
    threshold = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > threshold else "black",
                 fontsize=12)

    plt.tight_layout()

    # plt.savefig('混淆矩阵.pdf', dpi=300) # save figures

Interpretation of the models

After obtaining the models, we need to know how they work and analyse why they produce such results. A necessary part of this is model interpretation. In PyTorch, Captum can be used to analyse how a model works. As examples, occlusion and integrated gradients are shown below.

In [None]:
from captum.attr import IntegratedGradients
from captum.attr import GradientShap
from captum.attr import Occlusion
from captum.attr import NoiseTunnel
from captum.attr import visualization as viz

import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap

1. Occlusion. Occlusion covers a part of the picture with a block to test whether that part plays an important role in the predicted result. The size of the block can be changed: when a small block does not give a good result, a larger block can be used instead.

In [None]:
def doing_occulsion(model, img_pil):
    """Visualise Captum occlusion attributions for the model's top prediction.

    Args:
        model (nn.Module): Classifier; it is switched to eval mode and moved
            to the GPU when one is available.
        img_pil: PIL image to explain.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda:0' if use_cuda else 'cpu')

    # Geometric preprocessing and normalisation are kept separate so the
    # un-normalised image can be shown next to the heat map.
    resize_crop = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor()
    ])
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
    model.eval().to(device)

    rc_img = resize_crop(img_pil)
    # channels-last copy of the un-normalised image for display
    display_img = rc_img.squeeze().cpu().detach().numpy().transpose((1, 2, 0))
    input_tensor = normalize(rc_img).unsqueeze(0).to(device)

    # Top-1 class (and its confidence) from the softmax output.
    probabilities = F.softmax(model(input_tensor), dim=1)
    top_conf, top_id = torch.topk(probabilities, 1)
    pred_conf = top_conf.detach().cpu().numpy().squeeze().item()
    pred_id = top_id.detach().cpu().numpy().squeeze().item()

    attributions_occ = Occlusion(model).attribute(
        input_tensor,
        strides=(3, 8, 8),                   # step of the sliding occlusion window
        target=pred_id,                      # class being explained
        sliding_window_shapes=(3, 15, 15),   # size of the occlusion window
        baselines=0)                         # pixel value used for occluded pixels
    occ_hwc = attributions_occ.detach().cpu().squeeze().numpy().transpose((1, 2, 0))

    viz.visualize_image_attr_multiple(occ_hwc,       # 224 224 3
                                      display_img,   # 224 224 3
                                      ["original_image", "heat_map"],
                                      ["all", "positive"],
                                      show_colorbar=True,
                                      outlier_perc=2)

2. Integrated gradients
Integrated gradients accumulates the gradients of the prediction while each pixel is moved from 0 to its original value. From this, the importance of each pixel for the prediction can be concluded.

In [None]:
def doing_ig(model, img_pil):
    """Visualise integrated-gradients attributions for the top prediction.

    Shows the plain integrated-gradients heat map first, then a version
    smoothed with a noise tunnel.

    Args:
        model (nn.Module): Classifier; it is switched to eval mode and moved
            to the GPU when one is available.
        img_pil: PIL image to explain.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda:0' if use_cuda else 'cpu')

    resize_crop = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor()
    ])
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
    model.eval().to(device)

    rc_img = resize_crop(img_pil)
    # channels-last copy of the un-normalised image for display
    rc_img_norm = rc_img.squeeze().cpu().detach().numpy().transpose((1, 2, 0))
    input_tensor = normalize(rc_img).unsqueeze(0).to(device)

    # Top-1 class (and its confidence) from the softmax output.
    probabilities = F.softmax(model(input_tensor), dim=1)
    top_conf, top_id = torch.topk(probabilities, 1)
    pred_conf = top_conf.detach().cpu().numpy().squeeze().item()
    pred_id = top_id.detach().cpu().numpy().squeeze().item()

    # Colour scheme shared by both heat maps below.
    default_cmap = LinearSegmentedColormap.from_list('custom blue',
                                                     [(0, '#ffffff'),
                                                      (0.25, '#000000'),
                                                      (1, '#000000')], N=256)

    integrated_gradients = IntegratedGradients(model)
    attributions_ig = integrated_gradients.attribute(input_tensor, target=pred_id, n_steps=200)
    attributions_ig_norm = attributions_ig.detach().cpu().squeeze().numpy().transpose((1, 2, 0))
    plt.imshow(attributions_ig_norm[:, :, 0] * 100)

    # visualise the IG values
    viz.visualize_image_attr(attributions_ig_norm,  # 224,224,3
                             rc_img_norm,           # 224,224,3
                             method='heat_map',
                             cmap=default_cmap,
                             show_colorbar=True,
                             sign='positive',
                             outlier_perc=1)

    # add noise to make the figure smooth
    noise_tunnel = NoiseTunnel(integrated_gradients)

    # IG value of every input pixel, averaged over the noisy samples
    attributions_ig_nt = noise_tunnel.attribute(input_tensor, nt_samples=2, nt_type='smoothgrad_sq', target=pred_id)

    # convert to a 224 x 224 x 3 array
    attributions_ig_nt_norm = attributions_ig_nt.squeeze().cpu().detach().numpy().transpose((1, 2, 0))

    viz.visualize_image_attr_multiple(attributions_ig_nt_norm,  # 224 224 3
                                      rc_img_norm,              # 224 224 3
                                      ["original_image", "heat_map"],
                                      ["all", "positive"],
                                      cmap=default_cmap,
                                      show_colorbar=True)

Other transition functions

In [None]:
# get and save model_1 to a local .pth file
def get_best_model1(lr, monmentum, l1,
                    train_dir="D://python_code//deep learning coursework//train",
                    csv_dir="D://python_code//deep learning coursework//train.csv"):
    """Train model_1 (MyConvNet) on the full training set and save it.

    Args:
        lr (float): SGD learning rate.
        monmentum (float): SGD momentum.
        l1 (int): Width of the hidden fully connected layer.
        train_dir (string): Directory with the training images.
        csv_dir (string): Path to the csv file with the annotations.

    Returns:
        MyConvNet: The trained model, also saved to 'myconvnet.pth'.
    """
    model = MyConvNet(l1=l1)
    # Default to the CPU so ``device`` is defined when CUDA is unavailable.
    device = "cpu"
    # ``train_model`` is what the loop drives; with several GPUs it is a
    # DataParallel wrapper sharing its parameters with ``model``.
    train_model = model
    if torch.cuda.is_available():
        device = "cuda"
        if torch.cuda.device_count() > 1:
            train_model = nn.DataParallel(model)
    train_model.to(device)

    train_transform = transforms.Compose([transforms.RandomResizedCrop(224),
                                      transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
                                     ])
    trainset = CellPictureDataset(train_dir, csv_file=csv_dir, transform=train_transform)
    train_loader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=0)

    epoch = 35
    loss_fn = nn.CrossEntropyLoss()
    # The optimizer must be given the parameters it is supposed to update.
    optimizer = optim.SGD(train_model.parameters(), lr=lr, momentum=monmentum)
    train_model.train()
    for _ in range(epoch):
        for image, label in train_loader:
            image, label = image.to(device), label.to(device)
            outputs = train_model(image)
            loss = loss_fn(outputs, label)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Save and return the plain model, not the DataParallel wrapper.
    torch.save(model, 'myconvnet.pth')
    return model

In [None]:
# get and save model_2 (ResNet-18) to a local .pth file
def get_best_model2(lr, monmentum):
    """Train a ResNet-18 with a 4-output head and save it to 'resnet18.pth'.

    Args:
        lr: Learning rate passed to the SGD optimizer.
        monmentum: Momentum passed to the SGD optimizer. The misspelled name
            is kept so that existing keyword callers keep working.

    Returns:
        The trained model. It is wrapped in ``nn.DataParallel`` when more
        than one CUDA device is available.
    """
    model = torchvision.models.resnet18()
    # Replace the final fully connected layer with one that has 4 outputs.
    model.fc = nn.Linear(model.fc.in_features, 4)

    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"
        if torch.cuda.device_count() > 1:
            # Wrap the model itself; the name ``net`` does not exist here.
            model = nn.DataParallel(model)
    model.to(device)

    # NOTE(review): the paths below are hard-coded to a local Windows folder.
    csv_dir = "D://python_code//deep learning coursework//train.csv"
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    train_dir = "D://python_code//deep learning coursework//train"
    trainset = CellPictureDataset(train_dir, csv_file=csv_dir, transform=train_transform)
    train_loader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=0)

    epoch = 35
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=monmentum)
    for i in range(epoch):
        print("-------no.{} train begin".format(i+1))
        for image, label in train_loader:
            image, label = image.to(device), label.to(device)
            outputs = model(image)
            loss = loss_fn(outputs, label)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Saves the whole module object, not only its state_dict.
    torch.save(model, 'resnet18.pth')
    return model

In [None]:
# write the predictions to the target .csv result file
def get_result(model, test_dir="test", output_csv="example.csv"):
    """Predict a label for every test picture and write the results to a CSV.

    Args:
        model: Trained network; it is switched to eval mode and moved to the
            GPU when one is available, otherwise to the CPU.
        test_dir: Directory holding the test pictures. Defaults to "test".
        output_csv: Path of the CSV file to write. Defaults to "example.csv".

    The CSV has the columns 'Filename' and 'Label' and no index column.
    """
    test_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])
    testset = CellPictureDataset(test_dir, transform=test_transform, test=True)
    testloader = DataLoader(testset, batch_size=32, shuffle=False, num_workers=0)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.eval().to(device)

    labels = []
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        for data in testloader:
            image = data.to(device)
            outputs = model(image)
            _, predicted = torch.max(outputs, 1)
            labels.extend(predicted.cpu().tolist())

    # NOTE(review): assumes the dataset yields pictures in the same order as
    # os.listdir(test_dir) -- confirm against CellPictureDataset.
    data_frame = {
        'Filename': os.listdir(test_dir),
        'Label': labels,
    }
    df = pd.DataFrame(data_frame)
    df.to_csv(output_csv, index=False)

In [None]:
if __name__ == "__main__":
    # Hyper-parameter search for both models.
    main_1(num_samples=10, max_num_epochs=35, gpus_per_trial=2)
    main_2(num_samples=10, max_num_epochs=35, gpus_per_trial=2)

    # Train and save the final models with the chosen hyper-parameters.
    # get_best_model1 needs the layer size l1; 256 matches the MyConvNet
    # instance trained and visualized below with the same lr and momentum.
    net_1 = get_best_model1(lr=0.008830556690188944,
                            monmentum=0.6994160916279658,
                            l1=256)
    net_2 = get_best_model2(lr=0.0015, monmentum=0.9784381166958563)

    # Dataset overview, training curves and confusion matrices.
    show_number()
    show_a_batch()
    training_and_visualize(MyConvNet(l1=256), lr=0.008830556690188944, momentum=0.6994160916279658)
    cnf_matrix_plotter(net_1)
    cnf_matrix_plotter(net_2)

    # Interpret both models on one fixed picture from the training set.
    img_path = "/kaggle/input/deep-learning-for-msc-2022-23/train/7.png"
    img_pil = Image.open(img_path)
    doing_occulsion(net_1, img_pil=img_pil)
    doing_occulsion(net_2, img_pil=img_pil)
    doing_ig(net_1, img_pil=img_pil)
    doing_ig(net_2, img_pil=img_pil)