## Prepare datasets

In [None]:
# from google.colab import drive

# drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
DATA_PATH = "../dataset/bootcamp"

## Import some Library

In [18]:
import torch
import torch.nn as nn
from PIL import Image
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
import os
import matplotlib.pyplot as plt

## Datasets Loader

In [21]:
files = os.listdir(DATA_PATH)
train_path = os.path.join(DATA_PATH, 'train')
img_train_path = os.path.join(train_path, 'images')
print(os.listdir(img_train_path))

['0001TP_006690.png', '0001TP_006720.png', '0001TP_006750.png', '0001TP_006780.png', '0001TP_006810.png', '0001TP_006840.png', '0001TP_006870.png', '0001TP_006900.png', '0001TP_006930.png', '0001TP_006960.png', '0001TP_006990.png', '0001TP_007020.png', '0001TP_007050.png', '0001TP_007080.png', '0001TP_007110.png', '0001TP_007140.png', '0001TP_007170.png', '0001TP_007200.png', '0001TP_007230.png', '0001TP_007260.png', '0001TP_007290.png', '0001TP_007320.png', '0001TP_007350.png', '0001TP_007380.png', '0001TP_007410.png', '0001TP_007440.png', '0001TP_007470.png', '0001TP_007500.png', '0001TP_007530.png', '0001TP_007560.png', '0001TP_007590.png', '0001TP_007620.png', '0001TP_007650.png', '0001TP_007680.png', '0001TP_007710.png', '0001TP_007740.png', '0001TP_007770.png', '0001TP_007800.png', '0001TP_007830.png', '0001TP_007860.png', '0001TP_007890.png', '0001TP_007920.png', '0001TP_007950.png', '0001TP_007980.png', '0001TP_008010.png', '0001TP_008040.png', '0001TP_008070.png', '0001TP_0081

In [3]:
class CityScapes(torch.utils.data.DataLoader):
  def __init__(self, files_list):
    super().__init__()
    self.files_list = files_list
  #magic method
  def __len__(self): #ada berapa banyak sih datapoint kita yang akan kita train
    return len(self.files_list)

  def __getitem__(self, idx):
    file_name = os.path.join(PATH, self.files_list[idx])
    file_img = os.path.join(file_name, f'images/{self.files_list[idx]}.png')

    folder_masks = os.path.join(file_name, 'annotations')
    file_masks = [os.path.join(folder_masks, mask) for mask in os.listdir(folder_masks)]

    image = Image.open(file_img).convert('RGB')
    image = np.array(image)

    masks = np.zeros(image.shape[:2], dtype=np.uint8)
    for m in file_masks:
      mask = Image.open(m)
      mask = np.asarray(mask)
      masks += mask

    output = data_transforms(image=image, mask=masks)
    image = output['image']
    masks = output['mask']

    return image, masks.unsqueeze(0)/255

## Model Architecture

- DoubleConv

In [4]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.conv(x)

- Unet

In [5]:
class UNET(nn.Module):
    def __init__(
            self, in_channels=3, out_channels=1, features=[64, 128, 256, 512],
    ):
        super(UNET, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # doblemconv, dobleconv..
        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        #upsam, doubleconv, up, ..
        # Up part of UNET
        for feature in reversed(features):
            self.ups.append(#0
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))#1

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1] # reversed(skip_connections)

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        return self.final_conv(x)

# FCN8s

In [1]:
class FCN8s(nn.Module):
  def __init__(self, in_channels, out_channels, features= [64, 128, 256, 512, 1024]):
    super().__init__()
    self.layers = nn.ModuleList()
    self.pool = nn.MaxPool2d(2, 2)

    for feature in features:
      self.layers.append(DoubleConv(in_channels, feature))
      in_channels = feature

    self.ups1 = nn.ConvTranspose2d(features[-1], features[-2], kernel_size=2, stride=2)
    self.ups2 = nn.ConvTranspose2d(features[-1], features[-3], kernel_size=2, stride=2)

    self.predictions = nn.ConvTranspose2d(features[-2], out_channels, kernel_size=8, stride=8)

  def forward(self, x):
    skip_connections=[]

    for idx,layer in enumerate(self.layers):
      x = layer(x)
      x = self.pool(x)
      if idx in [2,3]:
        skip_connections.append(x)


    ups1 = self.ups1(x)
    concat1 = torch.concat([ups1, skip_connections[-1]], dim=1)

    ups2 = self.ups2(concat1)
    concat2 = torch.concat([ups2, skip_connections[-2]], dim=1)

    return self.predictions(concat2)

NameError: name 'nn' is not defined

## Engine function

- Dice coefficient metrics

In [7]:
def calculate_dice_coefficient(ground_truth, predicted):
    intersection = np.logical_and(ground_truth, predicted)
    dice_coefficient = (2 * np.sum(intersection)) / (np.sum(ground_truth) + np.sum(predicted))
    return dice_coefficient

def calculate_dice_coefficients(ground_truths, predictions):
    num_samples = len(ground_truths)
    dice_coefficients = np.zeros(num_samples)
    for i in range(num_samples):
        dice_coefficients[i] = calculate_dice_coefficient(ground_truths[i], predictions[i])
    return dice_coefficients

- training phase

In [8]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    #please do training step in this function
    model.train()
    loss_one_step = 0
    loop = tqdm(dataloader)
    for data, targets in loop:
        data = data.to('cuda')
        targets = targets.float().to(device="cuda")
        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)
    
        optim.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optim)
        scaler.update()
        loss_one_step += loss.item()
    
        # update tqdm loop
        loop.set_postfix(loss=loss.item())
    
    return loss_one_step / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    #please do evaluation step that calculate evaluation loss and evaluation metrics dice_score_coefficient
    model.eval()
    loss_one_step = 0
    loop = tqdm(dataloader)

    for data, targets in loop:
        data = data.to('cuda')
        targets = targets.float().to(device="cuda")
        with torch.no_grad():
            with torch.cuda.amp.autocast():
                predictions = model(data)
                loss = loss_fn(predictions, targets)
        loss_one_step += loss.item()

        loop.set_postfix(loss=loss.item())

    return loss_one_step / len(dataloader)

In [None]:
def train():
  #please init everthing in here and do the training process
  pass

In [9]:
save_dir = os.path.join("../models", "UNet")
os.makedirs(save_dir, exist_ok=True)  # Ensure directory exist
def train(train_dataloaders, val_dataloaders, model, loss_fn, optim, num_epochs, log_freq=10, save_best_model=False, best_model_name='best_model.pth', last_model_name='last_model.pth'):
    """
    Train the model for a given number of epochs.
    :param train_dataloaders: A dictionary of dataloaders for training and validation.
    :param val_dataloaders: A dictionary of dataloaders for validation.
    :param model: The model to train.
    :param loss_fn: The loss function to use.
    :param optim: The optimizer to use.
    :param num_epochs: The number of epochs to train for.
    :param log_freq: The frequency with which to log training metrics.
    :return: The trained model.
    """
    best_model = None
    best_val_loss = float('inf')

    # best_model_name = os.path.join('drive/MyDrive/UNet/ckpt_save', best_model_name)
    # last_model_name = os.path.join('drive/MyDrive/UNet/ckpt_save', last_model_name)
    
    best_model_name = os.path.join('../models/UNet', best_model_name)
    last_model_name = os.path.join('../models/UNet', last_model_name)

    for epoch in range(num_epochs):
        train_loss = train_engine(train_dataloaders, model, loss_fn, optim)
        val_loss = val_engine(val_dataloaders, model, loss_fn)

        is_best = False
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            if save_best_model:
                best_model = model
                torch.save(best_model.state_dict(), best_model_name)
                torch.save(model.state_dict(), last_model_name)
                is_best = True

        if epoch % log_freq == 0:
            print('Epoch {}/{}'.format(epoch, num_epochs - 1))
            print('-' * 10)
            print('Train Loss: {:.4f}'.format(train_loss))
            print('Val Loss: {:.4f}'.format(val_loss))
            if is_best:
                print(f'✅ Best model saved! (Val Loss: {format(val_loss)})')
            print()

    return model

In [10]:
model = train(train_dataloader, val_dataloader, model, loss_fn, optim, 100, log_freq=1, save_best_model=True)

NameError: name 'train_dataloader' is not defined