<a href="https://colab.research.google.com/github/emyesme/CalcificationDetection/blob/Zarin/CD_CNN_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import the required packages
Insert here all the packages you require, so in case they are not found an error will be shown before any other operation is performed.

In [None]:
# import the required packages
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models

## Set hyperparameters and options
Set here your hyperparameters (to be used later in the code), so that you can run and compare different experiments operating on these values. 
<br>_Note: a better alternative would be to use command-line arguments to set hyperparameters and other options (see argparse Python package)_

In [None]:
# hyperparameters
batch_size = 32               # training mini-batch size
learning_rate = 0.02          # initial learning rate passed to SGD
epochs = 30                   # number of training epochs
momentum = 0.1                # SGD momentum
lr_step_size = 1000           # StepLR period (in epochs); if < epochs, we are using decaying learning rate
lr_gamma = 0.1                # StepLR multiplicative decay factor
data_augmentation = True      # whether the training set uses the augmented transform
dropout = 0.1                 # NOTE(review): in the visible cells this is only recorded in experiment_ID
activation = nn.LeakyReLU()   # NOTE(review): in the visible cells only its type name is recorded in experiment_ID

# make visible only one GPU at the time
# NOTE: this must be set before the first CUDA query below to take effect
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # <-- should be the ID of the GPU you want to use

# options
# 'device' is read by train(), test() and the training/testing cell, so it must be defined;
# fall back to the CPU when no GPU is available instead of failing at the first .to(device)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
# 'run_from_notebook' is read by the performance-plot code; this file is a notebook, hence True
run_from_notebook = True    # set to False when running the code as a console script
monitor_display = True      # whether to display monitored performance plots
display_first_n = 0         # how many samples/batches are displayed
num_workers = 2             # how many DataLoader worker subprocesses are used for fetching data
pretrained = False          # whether to test a pretrained model (to be loaded) or train a new one
display_errors = True       # whether to display errors (only in pretrained mode)

## Define the model architecture
Define here your network.
<br>_Note: a better alternative would be to have a pool of network architectures defined in a python file (module) that one could import_

In [None]:
# define CNN

###
# Haq, I.U., Ali, H., Yu, W.H., Lei, C., Ali, H., Feature fusion and ensemble learning-based CNN model for mammographic image classification,
# Journal of King Saud University - Computer and Information Sciences (2022), doi: https://doi.org/10.1016/j.jksuci.2022.03.023
###

class CD_CNN(nn.Module):
    """Convolutional classifier with three conv stages and a 3-layer dense head.

    Each stage is a sequence of convolution + batch-norm layers followed by
    2x2 max pooling, dropout (p=0.2) and a further batch-norm. The head maps
    256 features to 2 output scores.

    A 1x28x28 input is reduced to a 256x1x1 feature map by the three stages,
    which is what the 256-input first dense layer expects.

    Args:
        activation: module applied after every hidden conv/dense layer.
            Defaults to ``nn.LeakyReLU()`` when omitted.
    """

    def __init__(self, activation=None):
        super().__init__()

        # non-linearity used between hidden layers
        self.hidden_act = activation if activation is not None else nn.LeakyReLU()

        # stage 1: two 7x7 convolutions, 1 -> 64 channels
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=7, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.dropout1 = nn.Dropout(p=0.2)
        self.bn3 = nn.BatchNorm2d(64)

        # stage 2: depthwise 5x5 convolution (64 -> 128) + regular 5x5 convolution
        # torch.nn has no DepthwiseConv2D layer: a depthwise convolution is a
        # Conv2d whose 'groups' equals its number of input channels
        self.dwconv1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=1, groups=64)
        self.bn4 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=5, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        self.dropout2 = nn.Dropout(p=0.2)
        self.bn6 = nn.BatchNorm2d(128)

        # stage 3: depthwise 3x3 convolution (128 -> 256) + two regular 3x3 convolutions
        self.dwconv2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1, groups=128)
        self.bn7 = nn.BatchNorm2d(256)
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.bn8 = nn.BatchNorm2d(256)
        self.conv5 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.bn9 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(kernel_size=2)
        self.dropout3 = nn.Dropout(p=0.2)
        self.bn10 = nn.BatchNorm2d(256)

        # dense head: 256 -> 512 -> 512 -> 2
        self.fc1 = nn.Linear(256, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, 2)
        # kept so existing references to 'act' keep working; it is NOT applied in
        # forward(), which returns raw scores (nn.CrossEntropyLoss expects logits)
        self.act = nn.Sigmoid()

        # Xavier (Glorot) normal initialization of all convolution kernels
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5,
                     self.dwconv1, self.dwconv2):
            nn.init.xavier_normal_(conv.weight)

    def forward(self, x):
        """Compute class scores for a batch of single-channel images.

        Args:
            x: float tensor of shape (batch, 1, H, W); the spatial size must
               reduce to 1x1 after the three stages (true for 28x28 inputs).

        Returns:
            Tensor of shape (batch, 2) holding unnormalized class scores.
        """
        act = self.hidden_act

        # stage 1
        x = act(self.bn1(self.conv1(x)))
        x = act(self.bn2(self.conv2(x)))
        x = self.bn3(self.dropout1(self.pool1(x)))

        # stage 2
        x = act(self.bn4(self.dwconv1(x)))
        x = act(self.bn5(self.conv3(x)))
        x = self.bn6(self.dropout2(self.pool2(x)))

        # stage 3
        x = act(self.bn7(self.dwconv2(x)))
        x = act(self.bn8(self.conv4(x)))
        x = act(self.bn9(self.conv5(x)))
        x = self.bn10(self.dropout3(self.pool3(x)))

        # dense head on the flattened feature map
        x = torch.flatten(x, 1)
        x = act(self.fc1(x))
        x = act(self.fc2(x))
        return self.fc3(x)

## Create the building blocks for training
Create an instance of the network, the loss function, the optimizer, and learning rate scheduler.

In [None]:
# create the network
# NOTE: it must exist before the optimizer (built from net.parameters()) and
#       before experiment_ID (which records the network class name)
net = CD_CNN()

# create loss function
criterion = nn.CrossEntropyLoss()

# create SGD optimizer
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=momentum)

# create learning rate scheduler: multiplies the learning rate by lr_gamma every lr_step_size epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)

# experiment ID: encodes network/loss/optimizer names and hyperparameters,
# it is used as the base name of the saved checkpoint file
experiment_ID = "%s_%s_%s_bs(%d)lr(%.4f_%d_%.1f)m(%.1f)e(%d)act(%s)xavier(yes)da(%s)do(%.1f)BN" % (type(net).__name__, type(criterion).__name__, type(optimizer).__name__,
                batch_size, learning_rate, lr_step_size, lr_gamma, momentum, epochs, type(activation).__name__, data_augmentation, dropout)

## Create datasets
This includes training/validation split, where possible. In our example, MNIST does not have a validation set, so we use the test set as validation set (warning: see comments in the code).
<br>_Note: in general, you might need to implement your own Dataset in a separate Python file, and then import it in this file in order to create the dataset. The training/validation/test data split is also on your own, you may consider to embed it in your Dataset class_

In [None]:
# create datasets, transforms will be set after (or can be set here, if we had all we need)
# NOTE: torchvision MNIST has no validation set, we will use the test set as validation set
#       but in a real case scenario we MUST use a train / validation / test split to avoid
#       introducing biases in our results (final model performance SHOULD NOT be evaluated
#       on the validation set)
# NOTE: download=True fetches the files into ./mnist only when they are missing, so the
#       cell also works on a fresh runtime; already-downloaded data are reused as they are
# NOTE(review): MNIST labels span ten digit classes while CD_CNN ends in a 2-unit layer;
#       confirm the intended dataset before training
dataset_train = torchvision.datasets.MNIST("./mnist", train=True, download=True)
dataset_valid = torchvision.datasets.MNIST("./mnist", train=False, download=True)

## Check your data _before_ transforms are applied
This may sound naive, but the most recurring problem in Machine Learning and Deep Learning is that the model is fed with _wrong data_. This can be caused by incorrect data loading, processing, etc. Even if you are 100% sure your data are correct, you should _always_ check your data _before_ and _after_ transforms are applied. A good practice is to calculate and print statistics, or even to display the data where feasible.

In [None]:
# inspect the raw (untransformed) datasets: size, shape, pixel statistics, first labels
# NOTE: 'mu' and 'std' of the training pixels are reused later to standardize the data
def _first_three_labels(dataset):
    # the first three targets, in order, ready for '%' string formatting
    return tuple(dataset.targets[k] for k in range(3))


# --- training set ---
print("\nTrain data are %d, with shape %s" % (len(dataset_train), dataset_train.data.shape))
_train_pixels = dataset_train.data.float()
mu = _train_pixels.mean()
std = _train_pixels.std()
print("...with mean %.1f and standard deviation %.1f" % (mu, std))
print("...with labels %s, %s, %s, ..." % _first_three_labels(dataset_train))

# --- validation set ---
_valid_pixels = dataset_valid.data.float()
mu_valid = _valid_pixels.mean()
std_valid = _valid_pixels.std()
print("\nValidation data are %d, with shape %s" % (len(dataset_valid), dataset_valid.data.shape))
print("...with mean %.1f and standard deviation %.1f" % (mu_valid, std_valid))
print("...with labels %s, %s, %s, ..." % _first_three_labels(dataset_valid))

# show the first 'display_first_n' raw training images
for sample_idx in range(display_first_n):
    plt.imshow(dataset_train.data[sample_idx], cmap='gray')
    plt.title('Training Sample %d' % sample_idx)
    plt.show()

## Define data transforms
Data transforms are applied sample-wise at _batch generation_ time: they are _not_ applied until you use a Dataloader and fetch data from it. In general, they serve to transform your data into what the neural network expects. Data should be _at least_ converted to tensors whose shape corresponds to network input, and possibly normalized so as to be 0-centered roughly in the [-1,1] range. 

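In this example, the targets (labels) are left as integer class indices, which is the format `nn.CrossEntropyLoss` expects. A target transform producing one-hot tensors would only be needed with a loss function that compares one-hot targets with the network outputs.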

Optionally, we may also apply data augmentation (on the training set, only).

In [None]:
class Convert:
    """Transform turning an image into a float tensor with a leading channel axis.

    Steps, in order:
    - 'np.array' copies the image into a numpy array
    - 'torch.from_numpy' wraps the numpy array in a torch tensor
    - '.unsqueeze(0)' prepends a singleton channel dimension (e.g. 28x28 -> 1x28x28),
      since torchvision.transforms work on CxHxW tensors
    - '.float()' converts to a float tensor, as expected by the network
    """

    def __call__(self, img):
        pixels = np.array(img)
        as_tensor = torch.from_numpy(pixels)
        return as_tensor.unsqueeze(0).float()

# build the data transforms:
# - training: optional Data Augmentation, then Convert, then Normalize
# - validation/test: Convert, then Normalize
# Normalize implements standardization using the previously computed mu and std
def _standardization_steps():
    # fresh Convert + Normalize steps, identical for every pipeline
    return [Convert(), transforms.Normalize(mean=[mu], std=[std])]


# random rotation (up to 20 degrees) applied to half of the samples
# fill=(0,) is a workaround for the torchvision bug tracked at https://github.com/pytorch/vision/issues/1759#issuecomment-575307516
_random_rotation = transforms.RandomRotation(20, fill=(0,))
DataAugmentation = transforms.RandomApply([_random_rotation], p=0.5)

transform_train = transforms.Compose([DataAugmentation] + _standardization_steps())
transform_test = transforms.Compose(_standardization_steps())

# attach the data transforms to both datasets
# NOTE: always use the SAME transforms (except for data augmentation) on both training and validation/test sets
#       to avoid introducing biases
dataset_train.transform = transform_train if data_augmentation else transform_test
dataset_valid.transform = transform_test

## Create data loaders
Dataloaders are in-built PyTorch objects that serve to sample batches from datasets. 

In [None]:
# build the batch samplers for training and validation
# NOTE 1: the training loader shuffles samples, which helps training
# NOTE 2: the validation loader uses a larger fixed batch size (512): in test mode the batch
#         can be as large as the GPU can handle (faster, but requires more GPU RAM)
_loader_options = dict(num_workers=num_workers, pin_memory=True)  # settings common to both loaders
dataloader_train = torch.utils.data.DataLoader(
    dataset_train, batch_size=batch_size, shuffle=True, **_loader_options)
dataloader_valid = torch.utils.data.DataLoader(
    dataset_valid, batch_size=512, **_loader_options)

## Check your data _after_ transforms are applied
To check what the network will see at train/test time, you have to use dataloaders which will apply the data transforms previously defined.

In [None]:
def batch_show(img, batch_i):
    """Display a standardized image tensor with pyplot.

    'img' is assumed to be a standardized tensor stored in channel-height-width
    (CHW) order; 'batch_i' is only used in the figure title.
    """
    # undo the standardization, then rescale to [0,1] as pyplot expects for float images
    pixels = (img * std + mu) / 255
    # pyplot wants numpy images in height-width-channel order
    hwc_image = np.transpose(pixels.numpy(), (1, 2, 0))
    plt.imshow(hwc_image)
    plt.title('Training Batch %d' % batch_i)
    plt.show()

# visual check of the first 'display_first_n' training batches, as produced by the dataloader
for batch_idx, (images, _labels) in enumerate(dataloader_train):
    if batch_idx >= display_first_n:
        break
    # arrange the batch (viewed as N x 1 x 28 x 28) into a single image grid and show it
    grid = torchvision.utils.make_grid(images.view(-1, 1, 28, 28))
    batch_show(grid, batch_idx)

## Define train function
It is preferable (but not mandatory) to embed training (1 epoch) code into a function, and call that function later during the training phase, at each epoch.

In [None]:
def train(dataset, dataloader):
    """Train the global 'net' for one epoch (one full pass over 'dataloader').

    Uses the global 'device', 'criterion', 'optimizer' and 'scheduler'.
    Returns a pair: the loss averaged over batches, and the accuracy (in %)
    computed over len(dataset) samples.
    """
    net.train()  # enable training behaviour of the network

    running_loss = 0.0
    n_correct = 0

    for inputs, targets in dataloader:
        # transfer the batch to the computation device
        inputs = inputs.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)

        # one optimization step: clear gradients, forward, loss, backward, update
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # bookkeeping: batch loss and number of correctly classified samples
        # (targets are class indices, so they are compared directly with the argmax)
        running_loss += loss.item()
        predicted = torch.argmax(outputs, dim=1)
        n_correct += (predicted == targets).sum().float()

    # the learning rate schedule advances once per epoch
    scheduler.step()

    mean_loss = running_loss / len(dataloader)
    accuracy = 100. * n_correct / len(dataset)
    return mean_loss, accuracy

## Define test function
It is preferable (but not mandatory) to embed the test code into a function, and call that function whenever needed. For instance, during training for validation at each epoch, or after training for testing, or for deploying the model.

In [None]:
def test(dataset, dataloader):
    """Run the global 'net' over 'dataloader' and return the predicted classes.

    Uses the global 'device'. Labels in the batches are ignored.

    Returns:
        int64 CPU tensor of length len(dataset); entry i is the argmax class of
        the i-th sample yielded by the dataloader. Entries not reached by the
        dataloader (if any) stay 0.
    """
    # switch to test mode
    net.eval()

    # initialize predictions
    predictions = torch.zeros(len(dataset), dtype=torch.int64)
    offset = 0

    # do not accumulate gradients (faster)
    with torch.no_grad():

        for batch in dataloader:

            # only the inputs are needed in test mode
            inputs = batch[0].to(device, non_blocking=True)

            # forward pass
            outputs = net(inputs)

            # store the whole batch of predictions with a single copy,
            # instead of one element-wise write per sample
            batch_predictions = torch.argmax(outputs, dim=1).cpu()
            end = offset + batch_predictions.numel()
            predictions[offset:end] = batch_predictions
            offset = end

    return predictions

## Train a new model or test a pretrained one
The code below also includes visual loss/accuracy monitoring during training, both on training and validation sets. 

In [None]:
# Entry point of the experiment: either train a new model (saving the best checkpoint)
# or load a previously saved checkpoint and evaluate it.
# NOTE(review): this cell reads 'device' and 'run_from_notebook', which must have been
#               defined by an earlier cell before it runs
# pretrained model not available --> TRAIN a new one and save it
if not pretrained:
    
    # reset performance monitors (one entry per completed epoch)
    losses = []
    train_accuracies = []
    valid_accuracies = []
    ticks = []
    
    # move net to device
    net.to(device)
    
    # start training
    for epoch in range(1, epochs+1):

        # measure time elapsed
        t0 = time.time()
        
        # train (one epoch); returns average loss and training accuracy (%)
        avg_loss, accuracy_train = train(dataset_train, dataloader_train)

        # test on validation: accuracy (%) is the share of predictions equal to the targets
        predictions = test(dataset_valid, dataloader_valid)
        accuracy_valid = 100. * predictions.eq(dataset_valid.targets).sum().float() / len(dataset_valid)
                    
        # update performance history (accuracies are moved to CPU so they can be plotted)
        losses.append(avg_loss)
        train_accuracies.append(accuracy_train.cpu())
        valid_accuracies.append(accuracy_valid.cpu())
        ticks.append(epoch)

        # print or display performance
        if not monitor_display:
            print ("\nEpoch %d\n"
                "...TIME: %.1f seconds\n"
                "...loss: %g (best %g at epoch %d)\n"
                "...training accuracy: %.2f%% (best %.2f%% at epoch %d)\n"
                "...validation accuracy: %.2f%% (best %.2f%% at epoch %d)" % (
                epoch,
                time.time()-t0,
                avg_loss, min(losses), ticks[np.argmin(losses)],
                accuracy_train, max(train_accuracies), ticks[np.argmax(train_accuracies)],
                accuracy_valid, max(valid_accuracies), ticks[np.argmax(valid_accuracies)]))
        else:
            # left axis (blue, log scale): training loss; right axis (red): accuracies
            # num=1 makes every epoch reuse the same figure
            fig, ax1 = plt.subplots(figsize=(12, 8), num=1)
            ax1.set_xticks(np.arange(0, epochs+1, step=epochs/10.0))
            ax1.set_xlabel('Epochs')
            ax1.set_ylabel(type(criterion).__name__, color='blue')
            ax1.set_ylim(0.0001, 1)
            ax1.tick_params(axis='y', labelcolor='blue')
            ax1.set_yscale('log')
            ax1.plot(ticks, losses, 'b-', linewidth=1.0, aa=True, 
                label='Training (best at ep. %d)' % ticks[np.argmin(losses)])
            ax1.legend(loc="lower left")
            ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis
            ax2.set_ylabel('Accuracy %', color='red')
            # NOTE: the accuracy axis is fixed to [90, 100], lower accuracies fall outside the plot
            ax2.set_ylim(90, 100)
            ax2.set_yticks(np.arange(90, 100, step=1))
            ax2.tick_params(axis='y', labelcolor='red')
            ax2.plot(ticks, train_accuracies, 'r-', linewidth=1.0, aa=True, 
                label='Training (%.2f%%, best %.2f%% at ep. %d)' % (accuracy_train, max(train_accuracies), ticks[np.argmax(train_accuracies)]))
            ax2.plot(ticks, valid_accuracies, 'r--', linewidth=1.0, aa=True, 
                label='Validation (%.2f%%, best %.2f%% at ep. %d)' % (accuracy_valid, max(valid_accuracies), ticks[np.argmax(valid_accuracies)]))
            ax2.legend(loc="lower right")
            plt.xlim(0, epochs+1)
            # this works if running from notebooks
            if run_from_notebook:
                fig.show()
                fig.canvas.draw()
            # this works if running from console
            else:
                plt.draw()
                #plt.pause(0.001)
                plt.show()
            # plt.savefig(experiment_ID + ".png", dpi=300)
            # clear the figure so that the next epoch redraws it from scratch
            fig.clear()

        # save model if validation performance has improved
        # np.argmax returns the first maximum, so the current epoch is saved only when
        # its validation accuracy is strictly higher than that of all previous epochs
        if (epoch-1) == np.argmax(valid_accuracies):
            # the whole network object is stored (not just its state_dict),
            # together with the best accuracy and the epoch at which it was reached
            torch.save({
                'net': net,
                'accuracy': max(valid_accuracies),
                'epoch': epoch
            }, experiment_ID + ".tar")

# pretrained model available -> load it and test
else:

    # load pretrained model; the map_location function returns the storage unchanged,
    # so tensors are kept on the CPU until the net is explicitly moved
    # NOTE(review): recent PyTorch releases default torch.load to weights_only=True, which
    #               may reject a checkpoint holding a pickled module; confirm with the
    #               installed version
    checkpoint = torch.load(experiment_ID + ".tar", map_location=lambda storage, loc: storage)
    net = checkpoint['net']
    print ("Loaded pretrained model\n...trained for %d epochs\n...reached accuracy %.2f" % (checkpoint['epoch'], checkpoint['accuracy']))

    # move net to device
    net.to(device)

    # test
    # NOTE: 'dataset_valid' is used here, i.e. the same data used for validation during training
    predictions = test(dataset_valid, dataloader_valid)
    accuracy = 100. * predictions.eq(dataset_valid.targets).sum().float() / len(dataset_valid)
    print ("Accuracy on test set is %.2f" % accuracy)

    # display errors
    if display_errors:

        # predictions / target comparison = True for match, False for mismatch
        # the comparison is negated, so nonzero elements are all the mismatches
        # 'errors' holds the indices of the misclassified samples
        errors = torch.nonzero(~predictions.eq(dataset_valid.targets))

        # get errors samples and convert them to torch tensors (raw pixel values, not standardized)
        error_samples = torch.zeros(len(errors), 1, 28, 28)
        conversion = Convert()
        for i, e in enumerate(errors):
            error_samples[i] = conversion(dataset_valid.data[e.item()])

        # make a grid of images (20 per row) and show
        img = torchvision.utils.make_grid(error_samples, nrow=20)
        img = img/255       # move data to [0,1] since pyplot expects float images to be in [0,1]
        npimg = img.numpy() # convert to numpy, since pyplot expects numpy images
        plt.imshow(np.transpose(npimg, (1, 2, 0)))  # channel-first (CHW) to channel-last (HWC) reorder
        plt.title('Errors')
        plt.show()