# Neuroblastoma

## Imports Python

In [1]:
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.optim as optim
import torchmetrics.classification as classification
import copy
import matplotlib.pyplot as plt
import numpy as np
import json
import os

from torch import nn
from torchvision import models

  from .autonotebook import tqdm as notebook_tqdm



## Vérification GPU

In [2]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

Using cpu device


## Chargement des datasets et preprocessing

In [3]:
# Location of the pre-split image folders (one sub-folder per split).
root_dir  = "../database/250/split_BA/"
train_dir = "train/"
valid_dir = "valid/"
test_dir  = "test/"

# Per-channel mean / standard deviation used to standardise the images.
# NOTE(review): presumably measured on this dataset - confirm their origin.
channel_mean = (0.66747984797634830, 0.5799524696639141, 0.78054363559995920)
channel_std  = (0.23162625605944703, 0.2340601507820534, 0.14160506754101998)
crop_size = (224, 224)

# Training: random 224x224 crop (light augmentation).
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
    transforms.RandomCrop(crop_size),
])
# Validation / test: deterministic centre crop of the same size.
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
    transforms.CenterCrop(crop_size),
])

# Load the data: ImageFolder derives the labels from the class sub-folders.
train_dataset = datasets.ImageFolder(root=f"{root_dir}/{train_dir}", transform=train_transform)
valid_dataset = datasets.ImageFolder(root=f"{root_dir}/{valid_dir}", transform=eval_transform)
test_dataset  = datasets.ImageFolder(root=f"{root_dir}/{test_dir}",  transform=eval_transform)

## Modèle

In [4]:
class ImprovedResNet(nn.Module):
    """ImageNet-pretrained ResNet backbone followed by a small binary head.

    The backbone's own classification layer is replaced by an identity and a
    two-layer head ending in a sigmoid is appended, so ``forward`` returns one
    value in (0, 1) per sample with shape ``(batch, 1)``.

    Args:
        resnet_version: Depth of the torchvision ResNet to build; one of
            18, 34, 50, 101 or 152.

    Raises:
        ValueError: If ``resnet_version`` is not a supported depth.
    """

    def __init__(self, resnet_version=18):
        super(ImprovedResNet, self).__init__()

        # Built lazily inside __init__ so that merely defining the class does
        # not touch torchvision.
        backbones = {
            18:  (models.resnet18,  models.ResNet18_Weights.IMAGENET1K_V1),
            34:  (models.resnet34,  models.ResNet34_Weights.IMAGENET1K_V1),
            50:  (models.resnet50,  models.ResNet50_Weights.IMAGENET1K_V2),
            101: (models.resnet101, models.ResNet101_Weights.IMAGENET1K_V2),
            152: (models.resnet152, models.ResNet152_Weights.IMAGENET1K_V2),
        }
        if resnet_version not in backbones:
            raise ValueError(f"ResNet version {resnet_version} does not exist.")
        build, weights = backbones[resnet_version]
        self.resnet = build(weights=weights)

        # The backbone output width depends on the architecture (512 for
        # ResNet-18/34, 2048 for ResNet-50/101/152): read it from the original
        # classifier before discarding it instead of hard-coding 512.
        n_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Identity()
        self.fc = nn.Sequential(
            nn.Linear(in_features=n_features, out_features=256, bias=True),
            nn.LeakyReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=256, out_features=1, bias=True),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return the sigmoid output of the head applied to the backbone features."""
        x = self.resnet(x)
        x = self.fc(x)
        return x

    def fine_tune(self):
        """Freeze every backbone parameter and leave only the head trainable.

        NOTE(review): this only disables gradients. Layers such as batch norm
        still update their running statistics while the module is in training
        mode - confirm that this is intended.
        """
        for param in self.resnet.parameters():
            param.requires_grad = False

        for param in self.fc.parameters():
            param.requires_grad = True

    def get_trainable_parameters(self):
        """Return an iterator over the parameters that currently require gradients."""
        return filter(lambda p: p.requires_grad, self.parameters())

## Entraînement

In [5]:
# Hyper-parameters.
batch_size = 32
lr = 1e-6    # 4e-6 for the leak experiments
epochs = 100

# Alternative model used for the leak experiments:
# model = LeakTestNet().to(device)
# parameters = model.parameters()

# Pretrained ResNet-18; fine_tune() freezes the backbone so that only the
# classification head receives gradient updates.
model = ImprovedResNet(resnet_version=18).to(device)
model.fine_tune()

# Alternative optimizer:
# optimizer = optim.Adam(model.get_trainable_parameters(), lr, betas=(0.9, 0.999))
optimizer = optim.SGD(params=model.get_trainable_parameters(), lr=lr, momentum=0.9)
criterion = nn.BCELoss()
accuracy  = classification.BinaryAccuracy().to(device)

# Only the training split is shuffled; evaluation order stays fixed.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
test_loader  = torch.utils.data.DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)

In [None]:
def train_model(model, loader, criterion, accuracy, epoch:int=0, optimizer=None):
    """Train ``model`` for one pass over ``loader`` and collect per-batch metrics.

    Args:
        model: Network to optimise; switched to training mode for the pass.
        loader: Sized iterable of ``(inputs, labels)`` batches. Labels get a
            trailing dimension added and are cast to float32 before use.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor. Assumed to be a per-batch mean (the default
            reduction of ``nn.BCELoss``).
        accuracy: Metric object with a ``reset()`` method that returns the
            batch accuracy when called as ``accuracy(outputs, labels)``.
        epoch: Index of the current epoch. Currently unused.
        optimizer: Optimizer stepped after every batch. When omitted, the
            notebook-level ``optimizer`` variable is used.

    Returns:
        Tuple ``(mean_loss, loss, mean_accu, accu)`` where ``loss`` and
        ``accu`` are arrays with one entry per batch, and the means are
        weighted by batch size so that a smaller final batch does not bias
        them.

    Raises:
        NameError: If no optimizer is passed and none exists at notebook level.
    """
    if optimizer is None:
        # Backward compatibility: fall back to the notebook-level optimizer.
        optimizer = globals().get("optimizer")
        if optimizer is None:
            raise NameError("train_model needs an optimizer: pass `optimizer=` "
                            "or define a notebook-level `optimizer`")

    # Move each batch to wherever the model lives instead of relying on a
    # notebook-level `device`; a parameter-free model leaves batches in place.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else None

    n_batches = len(loader)
    loss, accu, sizes = np.zeros(n_batches), np.zeros(n_batches), np.zeros(n_batches)
    accuracy.reset()
    model.train()
    for b, (inputs, labels) in enumerate(loader):
        if target is not None:
            inputs, labels = inputs.to(target), labels.to(target)
        labels  = labels.unsqueeze(1).to(torch.float32)
        outputs = model(inputs)
        cr_loss = criterion(outputs, labels)

        optimizer.zero_grad()
        cr_loss.backward()
        optimizer.step()

        loss[b]  = cr_loss.item()
        accu[b]  = accuracy(outputs, labels).item()
        sizes[b] = labels.shape[0]

    if sizes.sum() > 0:
        mean_loss = np.average(loss, weights=sizes)
        mean_accu = np.average(accu, weights=sizes)
    else:
        # Empty loader: nothing to average.
        mean_loss = mean_accu = np.nan
    return mean_loss, loss, mean_accu, accu


def eval_model(model, loader, criterion, accuracy, epoch:int=0):
    """Evaluate ``model`` on ``loader`` without computing gradients.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        loader: Sized iterable of ``(inputs, labels)`` batches. Labels get a
            trailing dimension added and are cast to float32 before use.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor. Assumed to be a per-batch mean (the default
            reduction of ``nn.BCELoss``).
        accuracy: Metric object with a ``reset()`` method that returns the
            batch accuracy when called as ``accuracy(outputs, labels)``.
        epoch: Index of the current epoch. Currently unused.

    Returns:
        Tuple ``(mean_loss, loss, mean_accu, accu)`` where ``loss`` and
        ``accu`` are arrays with one entry per batch, and the means are
        weighted by batch size so that a smaller final batch does not bias
        them.
    """
    # Move each batch to wherever the model lives instead of relying on a
    # notebook-level `device`: a model restored on another device would
    # otherwise fail with a device mismatch.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else None

    n_batches = len(loader)
    loss, accu, sizes = np.zeros(n_batches), np.zeros(n_batches), np.zeros(n_batches)
    accuracy.reset()
    model.eval()
    with torch.no_grad():
        for b, (inputs, labels) in enumerate(loader):
            if target is not None:
                inputs, labels = inputs.to(target), labels.to(target)
            labels  = labels.unsqueeze(1).to(torch.float32)
            outputs = model(inputs)

            cr_loss  = criterion(outputs, labels)
            loss[b]  = cr_loss.item()
            accu[b]  = accuracy(outputs, labels).item()
            sizes[b] = labels.shape[0]

    if sizes.sum() > 0:
        mean_loss = np.average(loss, weights=sizes)
        mean_accu = np.average(accu, weights=sizes)
    else:
        # Empty loader: nothing to average.
        mean_loss = mean_accu = np.nan
    return mean_loss, loss, mean_accu, accu

def print_H_at_epoch(H, e:int=0):
    """Print the mean loss and accuracy of both splits for epoch ``e`` on one line."""
    summaries = []
    for tag, split in (("Train", "train"), ("Valid", "valid")):
        stats = H[split]
        summaries.append(f"{tag}=({stats['m_loss'][e]:.3f}, {stats['m_accu'][e]:.2%})")
    print(f"Epoch {e:3d}: " + " | ".join(summaries))
    
# History of the run. For each split: per-batch values ("loss", "accu") and
# per-epoch means ("m_loss", "m_accu"). Row 0 holds the metrics measured
# before any optimisation step.
def _blank_history(n_batches):
    """Return zero-filled metric arrays for one split, rows 0..epochs."""
    return {"loss":   np.zeros((epochs+1, n_batches)),
            "accu":   np.zeros((epochs+1, n_batches)),
            "m_loss": np.zeros((epochs+1)),
            "m_accu": np.zeros((epochs+1))}


def _store(split, epoch, results):
    """Write one (mean_loss, loss, mean_accu, accu) tuple into row ``epoch`` of H[split]."""
    m_loss, batch_loss, m_accu, batch_accu = results
    H[split]["m_loss"][epoch] = m_loss
    H[split]["loss"][epoch, :] = batch_loss
    H[split]["m_accu"][epoch] = m_accu
    H[split]["accu"][epoch, :] = batch_accu


H = {
    "train": _blank_history(len(train_loader)),
    "valid": _blank_history(len(valid_loader)),
}

# Baseline (epoch 0): evaluate both splits before training starts.
_store("train", 0, eval_model(model, train_loader, criterion, accuracy, epoch=0))
_store("valid", 0, eval_model(model, valid_loader, criterion, accuracy, epoch=0))
print_H_at_epoch(H, 0)

best_valid_loss = float("inf")
best_model = None
best_epoch = None

for e in range(1, epochs+1):
    _store("train", e, train_model(model, train_loader, criterion, accuracy, e))
    _store("valid", e, eval_model(model, valid_loader, criterion, accuracy, e))
    print_H_at_epoch(H, e)

    # Keep a snapshot of the model with the lowest validation loss so far.
    current_valid_loss = H["valid"]["m_loss"][e]
    if current_valid_loss < best_valid_loss:
        best_valid_loss = current_valid_loss
        best_model = copy.deepcopy(model)
        best_epoch = e

# Final score of the best snapshot on the held-out test split.
(mean_loss, loss, mean_accu, accu) = eval_model(best_model, test_loader, criterion, accuracy)
print(f"Best model [{best_epoch}]: ({mean_loss:.3f}, {mean_accu:.2%})")

Epoch   0: Train=(0.696, 49.89%) | Valid=(0.705, 45.13%)
Epoch   1: Train=(0.715, 46.75%) | Valid=(0.703, 46.85%)
Epoch   2: Train=(0.710, 48.89%) | Valid=(0.703, 47.86%)
Epoch   3: Train=(0.714, 47.86%) | Valid=(0.701, 48.87%)
Epoch   4: Train=(0.715, 47.70%) | Valid=(0.699, 49.80%)


## Visualisation des courbes d'apprentissage

In [None]:
def plot_metric(H, max_epoch:int, metric:str="accu"):
    """Plot the per-epoch mean of ``metric`` for both splits with a +/- 1 std band.

    A new 6x3 inch figure is created and left as the current pyplot figure,
    so the caller can follow up with ``plt.savefig``.

    Args:
        H: History dict with ``H[split][metric]`` (epochs x batches array) and
            ``H[split]["m_" + metric]`` (per-epoch means) for the splits
            ``"train"`` and ``"valid"``.
        max_epoch: Last epoch to draw; epochs ``0..max_epoch`` are plotted.
        metric: ``"accu"`` or ``"loss"``. Any other value is plotted without
            custom y-limits.
    """
    plt.figure(figsize=(6, 3))

    cmap = plt.get_cmap("tab10")
    colors = [cmap(1), cmap(0)]
    n_points = max_epoch + 1
    xline = np.arange(n_points)
    handles = []
    plotted_means = []
    # Validation is drawn first so that the training curve ends up on top.
    labels = ["valid", "train"]

    for c, dataset in zip(colors, labels):
        std  = np.std(H[dataset][metric], axis=1)[:n_points]
        mean = H[dataset][f"m_{metric}"][:n_points]
        plt.fill_between(xline, mean-std, mean+std, linewidth=0, color=c, alpha=0.4)
        line, = plt.plot(xline, mean, color=c)
        handles += [line]
        plotted_means.append(mean)

    plt.xlabel("epoch")
    # Derive the y-limits from the plotted epochs only: rows beyond
    # `max_epoch` (e.g. the zero-filled rows of an interrupted run) must not
    # stretch the axis.
    if metric == "accu":
        plt.ylim([min(np.min(m) for m in plotted_means), 1])
    elif metric == "loss":
        plt.ylim([0, max(np.max(m) for m in plotted_means)])
    # Reverse so that the legend lists "train" before "valid".
    plt.legend(handles[::-1], labels[::-1])
    plt.grid()

In [None]:
# Output folder for the figures and checkpoints of this run.
backup_folder = "backup/new"
os.makedirs("backup", exist_ok=True)
os.makedirs(backup_folder, exist_ok=True)

# One PDF per learning curve; savefig writes the figure plot_metric just created.
for metric_name in ("accu", "loss"):
    plot_metric(H, max_epoch=epochs, metric=metric_name)
    plt.savefig(f"{backup_folder}/{metric_name}.pdf", bbox_inches="tight")

## Sauvegarde

In [None]:
# Save the weights of the best snapshot (lowest validation loss), since the
# checkpoint is later reloaded and reported as the "Best model". Fall back to
# the final model when no snapshot was taken.
checkpoint_model = best_model if best_model is not None else model
torch.save(checkpoint_model.state_dict(), f"{backup_folder}/model.pth")

# Training history and settings, stored as plain lists so they are JSON-serialisable.
backup = {}
backup["loader"] = {"train": len(train_loader), "valid": len(valid_loader), "test": len(test_loader)}
backup["opti"]   = {"optimizer": "SGD", "lr": lr, "batch_size": batch_size, "epochs": epochs}
backup["best_epoch"] = best_epoch    # epoch of the saved snapshot (None if none was taken)
backup["train_loss"] = H["train"]["m_loss"].tolist()
backup["train_accu"] = H["train"]["m_accu"].tolist()
backup["valid_loss"] = H["valid"]["m_loss"].tolist()
backup["valid_accu"] = H["valid"]["m_accu"].tolist()
backup["train_loss_batch"] = H["train"]["loss"].tolist()
backup["train_accu_batch"] = H["train"]["accu"].tolist()
backup["valid_loss_batch"] = H["valid"]["loss"].tolist()
backup["valid_accu_batch"] = H["valid"]["accu"].tolist()

with open(f"{backup_folder}/backup.json", "w") as fd:
    json.dump(backup, fd)

## Chargement d'une sauvegarde

In [None]:
backup_folder = "backup/new"

with open(f"{backup_folder}/backup.json") as fd:
    backup = json.load(fd)

# Rebuild the history dict in the layout expected by plot_metric.
H = {"train": {}, "valid": {}}
H["train"]["m_loss"] = np.array(backup["train_loss"])
H["train"]["m_accu"] = np.array(backup["train_accu"])
H["valid"]["m_loss"] = np.array(backup["valid_loss"])
H["valid"]["m_accu"] = np.array(backup["valid_accu"])
H["train"]["loss"] = np.array(backup["train_loss_batch"])
H["train"]["accu"] = np.array(backup["train_accu_batch"])
H["valid"]["loss"] = np.array(backup["valid_loss_batch"])
H["valid"]["accu"] = np.array(backup["valid_accu_batch"])

epochs = backup["opti"]["epochs"]

# Alternative model used for the leak experiments:
# model = LeakTestNet().to(device)

# Rebuild the architecture the checkpoint was saved from (ImprovedResNet with
# a ResNet-18 backbone); the checkpoint then overwrites the pretrained weights.
model = ImprovedResNet(resnet_version=18).to(device)
# map_location lets a checkpoint written on a GPU machine load on a CPU-only
# one. torch.load unpickles the file: only load checkpoints you trust.
model_weights = torch.load(f"{backup_folder}/model.pth", map_location=device)
model.load_state_dict(model_weights)

In [None]:
# Redraw both learning curves from the restored history.
for metric_name in ("accu", "loss"):
    plot_metric(H, max_epoch=epochs, metric=metric_name)

In [None]:
# Score the current `model` on the held-out test split.
test_metrics = eval_model(model, test_loader, criterion, accuracy)
mean_loss, loss, mean_accu, accu = test_metrics
print(f"Best model: ({mean_loss:.3f}, {mean_accu:.2%})")