In [None]:
# importing libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import copy

from skimage.util import montage
# import medmnist
# from medmnist import INFO, Evaluator


In [None]:
# loading data
data_file = 'breastmnist.npz'
npz_data = np.load(data_file)
print(npz_data.files)

In [None]:
train_images = npz_data['train_images']
train_labels = npz_data['train_labels']
print(train_images.shape)
print(train_labels.shape)
print(np.unique(train_labels))

In [None]:
ind = np.random.randint(len(train_images))
plt.figure(figsize=(2,2))
plt.imshow(train_images[ind], cmap='gray')
plt.title('label:'+str(train_labels[ind]))
plt.show()

In [None]:
plt.figure(figsize=(7,7))
plt.imshow(montage(train_images[:49]), cmap='gray')
plt.show()

In [None]:
class BreastDataset(data.Dataset):

    def __init__(self, npz_data, split, transform = None):
        self.npz_data = npz_data
        self.split = split
        self.transform = transform
        
        if self.split == 'train':
            self.imgs = npz_data['train_images']
            self.labels = npz_data['train_labels']
        elif self.split == 'val':
            self.imgs = npz_data['val_images']
            self.labels = npz_data['val_labels']
        elif self.split == 'test':
            self.imgs = npz_data['test_images']
            self.labels = npz_data['test_labels']
        else:
            raise ValueError

    def __len__(self):
        return self.imgs.shape[0]
    
    def __getitem__(self, index):
        
        img, target = self.imgs[index], self.labels[index].astype(int)

        if self.transform is not None:
            img = self.transform(img)
            
        return img, target


In [None]:
# preprocessing
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [None]:
# dataloader
NUM_EPOCHS = 10
BATCH_SIZE = 16
lr = 0.001
n_channels = 1
n_classes = 2

seed = 20
np.random.seed(seed)
torch.manual_seed(seed)

train_dataset = BreastDataset(npz_data, 'train', data_transform)
val_dataset = BreastDataset(npz_data, 'val', data_transform)
test_dataset = BreastDataset(npz_data, 'test', data_transform)

train_loader = data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = data.DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = data.DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [None]:
# define model
class Net(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(Net, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels, 16, kernel_size=3),
            nn.ReLU())

        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 16, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3),
            nn.ReLU())

        self.layer4 = nn.Sequential(
            nn.Conv2d(32, 32, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        self.fc = nn.Sequential(
            nn.Linear(32 * 4 * 4, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes))

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    
model = Net(in_channels=n_channels, num_classes=n_classes)

# define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)


In [None]:
# metric (accuracy)
def accuracy(scores, targets):
    scores = scores.detach().argmax(dim=1)
    acc = (scores==targets).float().sum().item()
    acc = acc / len(targets) # i'm not sure if needed or not
    return acc

# epoch train function
def train_epoch(model, optimizer, data_loader, criterion, num_classes):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch_ind, (batch_inputs, batch_labels) in enumerate(tqdm(data_loader)):
        
        # zero the parameter gradients
        optimizer.zero_grad()
        
        # forward
        batch_scores = model.forward(batch_inputs)
        batch_labels = batch_labels.squeeze().long()
        loss = criterion(batch_scores, batch_labels)   
        
        # backward + optimize
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.detach().item()
        epoch_acc += accuracy(batch_scores, batch_labels)
        
        prob = batch_scores.softmax(dim=-1)
        
    epoch_loss /= (batch_ind + 1)
    epoch_acc /= (batch_ind + 1)
    
    return epoch_loss, epoch_acc, optimizer


# epoch evaluate function
def evaluate_epoch(model, data_loader, criterion, num_classes):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    # since not training, no need to calculate the gradient
    with torch.no_grad():
    
        for batch_ind, (batch_inputs, batch_labels) in enumerate(data_loader):
            
            batch_scores = model.forward(batch_inputs)
            batch_labels = batch_labels.squeeze().long()
            loss = criterion(batch_scores, batch_labels)   

            epoch_loss += loss.detach().item()
            epoch_acc += accuracy(batch_scores, batch_labels)
            
            prob = batch_scores.softmax(dim=-1)

        epoch_loss /= (batch_ind + 1)
        epoch_acc /= (batch_ind + 1)
    
    return epoch_loss, epoch_acc

In [None]:
# training loop
train_loss, train_acc = [], []
val_loss, val_acc = [], []

for epoch in range(NUM_EPOCHS):
    print('epoch', epoch)

    train_epoch_loss, train_epoch_acc, optimizer = train_epoch(model, optimizer, train_loader, criterion, n_classes)
    print('train loss/acc: {:.2f}, {:.2f}'.format(train_epoch_loss, train_epoch_acc))
    train_loss.append(train_epoch_loss)
    train_acc.append(train_epoch_acc)
    
    val_epoch_loss, val_epoch_acc = evaluate_epoch(model, val_loader, criterion, n_classes)
    print('val loss/acc: {:.2f}, {:.2f}'.format(val_epoch_loss, val_epoch_acc))
    val_loss.append(val_epoch_loss)
    val_acc.append(val_epoch_acc)

In [None]:
# visulization of learning curves
plt.plot(train_loss)
plt.plot(val_loss)
plt.legend(['train','val'])
plt.ylabel('loss')
plt.xlabel('epoch')
plt.show()

plt.plot(train_acc)
plt.plot(val_acc)
plt.legend(['train','val'])
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.show()

# evaluation on test set
epoch_loss, epoch_acc = evaluate_epoch(model, test_loader, criterion, n_classes)
print('test loss/acc: {:.2f}, {:.2f}'.format(epoch_loss, epoch_acc))


## Early stopping

In [None]:
np.random.seed(seed)
torch.manual_seed(seed)

model = Net(in_channels=n_channels, num_classes=n_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)


In [None]:
class EarlyStop:
    def __init__(self, patience=1):
        self.patience = patience
        self.counter = 0
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.best_model = None

    def __call__(self, val_loss, model):
        if val_loss >= self.val_loss_min:
            self.counter += 1
            print(f'ES counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.val_loss_min = val_loss
            self.best_model = copy.deepcopy(model)
            self.counter = 0


In [None]:
train_loss, train_acc = [], []
val_loss, val_acc = [], []

NUM_EPOCHS = 100
ES_PATIENCE = 5
early_stopping = EarlyStop(patience=ES_PATIENCE)

for epoch in range(NUM_EPOCHS):
    print('epoch', epoch)

    train_epoch_loss, train_epoch_acc, optimizer = train_epoch(model, optimizer, train_loader, criterion, n_classes)
    print('train loss/acc: {:.2f}, {:.2f}'.format(train_epoch_loss, train_epoch_acc))
    train_loss.append(train_epoch_loss)
    train_acc.append(train_epoch_acc)
    
    val_epoch_loss, val_epoch_acc = evaluate_epoch(model, val_loader, criterion, n_classes)
    print('val loss/acc: {:.2f}, {:.2f}'.format(val_epoch_loss, val_epoch_acc))
    val_loss.append(val_epoch_loss)
    val_acc.append(val_epoch_acc)
    
    early_stopping(val_epoch_loss, model)
    if early_stopping.early_stop:
        print("early stopping at epoch {}, model saved at epoch {}".format(epoch, epoch-early_stopping.patience))
        break
    
    if epoch==0:
        init_model = copy.deepcopy(model)

best_model = early_stopping.best_model

In [None]:
plt.plot(train_loss)
plt.plot(val_loss)
plt.legend(['train','val'])
plt.ylabel('loss')
plt.xlabel('epoch')
plt.show()

plt.plot(train_acc)
plt.plot(val_acc)
plt.legend(['train','val'])
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.show()


In [None]:
epoch_loss, epoch_acc = evaluate_epoch(init_model, test_loader, criterion, n_classes)
print('1st epoch test loss/acc: {:.2f}, {:.2f}'.format(epoch_loss, epoch_acc))

epoch_loss, epoch_acc = evaluate_epoch(model, test_loader, criterion, n_classes)
print('last epoch test loss/acc: {:.2f}, {:.2f}'.format(epoch_loss, epoch_acc))

epoch_loss, epoch_acc = evaluate_epoch(best_model, test_loader, criterion, n_classes)
print('best model test loss/acc: {:.2f}, {:.2f}'.format(epoch_loss, epoch_acc))


## Output probabilities

In [None]:
test_single = data.DataLoader(dataset=test_dataset, batch_size=1,shuffle=False)

probs = []
labels = []
model.eval()
with torch.no_grad():
    for image, label in test_single:
        score= model.forward(image)
        prob= score.softmax(dim=-1)
        
        labels.append(label.detach().item())
        probs.append(prob.detach().numpy())

probs = np.concatenate(probs,axis=0)
labels = np.array(labels)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
cm = confusion_matrix(labels, np.argmax(probs, axis=1), normalize='true')
ConfusionMatrixDisplay(cm).plot()
