# Dataset preparation

## Preprocessing

``` python dataset_preprocessing.py ```

1. Preprocess the dataset (squaring, resizing, and converting to monochrome)

2. Split the dataset of 375 samples into 90% and 10%
    * The 10% split will be used as **testing** data

## Augmentation

``` python dataset_augmentation.py ```

1. Augment the 90% dataset (5000 data)

2. Split the augmentation result into 80% (**training**) and 20% (**validation**)

# Enabling GPU

In [None]:
import torch

# Fail fast when no CUDA device is visible, then report which GPU will be used.
if not torch.cuda.is_available():
  raise Exception("GPU not available. CPU training will be too slow.")

print("device name", torch.cuda.get_device_name(0))

# Variable Setup

In [None]:
# Number of samples per batch; passed as `batch_size` to every DataLoader below.
BATCH_SIZE = 1
# Number of training epochs; used as the default `num_epochs` of train_model.
EPOCH_NUM = 50

# DataLoader


In [None]:
import glob
import numpy as np
import torchvision.transforms as T

from torchvision.transforms.functional import adjust_gamma
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from preprocessing import *
from helper import *
from PIL import Image


class SimDataset(Dataset):
    """Paired image/mask dataset read from two directories.

    Every file found directly inside ``image_path`` is treated as an input
    image and every file inside ``mask_path`` as a target mask. Both file
    lists are sorted so that index ``i`` refers to the same position in each
    directory; ``glob.glob`` alone returns files in arbitrary order, which
    could pair an image with the wrong mask.

    NOTE(review): pairing relies on images and masks sorting into the same
    order (e.g. identical file names in both folders) -- confirm against the
    dataset layout.
    """

    def __init__(self, image_path, mask_path):
        """Collect and sort the image and mask file paths.

        Args:
            image_path: Directory containing the input images.
            mask_path: Directory containing the target masks.
        """
        self.input_images = sorted(glob.glob(str(image_path) + str('/*')))
        self.target_masks = sorted(glob.glob(str(mask_path) + str('/*')))

    def __len__(self):
        """Return the number of mask files found."""
        return len(self.target_masks)

    def __getitem__(self, idx):
        """Load, preprocess and return the ``idx``-th (image, mask) pair.

        Returns:
            A tuple ``(image_as_tensor, mask_as_tensor)`` of float tensors,
            each with a leading axis of size 1 added in front of the
            resized array.
        """
        # Image: the context manager closes the file handle once the pixel
        # data has been copied into the array.
        with Image.open(self.input_images[idx]) as image_as_image:
            image_as_np = np.asarray(image_as_image)

        # resize_image / clahe_equalized / normalization are not defined in
        # this block (presumably they come from the `preprocessing` /
        # `helper` star-imports).
        # NOTE(review): `adjust_gamma` is imported from
        # torchvision.transforms.functional but is called on a NumPy array
        # here; presumably a later star-import overrides it -- confirm.
        image_as_np = resize_image(image_as_np, 224, 224)
        image_as_np = clahe_equalized(image_as_np)
        image_as_np = adjust_gamma(image_as_np, 1.2)
        image_as_np = normalization(image_as_np, max=1, min=0)
        image_as_np = np.expand_dims(image_as_np, axis=0)  # add leading axis
        image_as_tensor = torch.from_numpy(image_as_np).float()  # numpy -> tensor

        # Mask
        with Image.open(self.target_masks[idx]) as mask_as_mask:
            mask_as_np = np.asarray(mask_as_mask)

        mask_as_np = resize_image(mask_as_np, 224, 224)
        # Scale to [0, 1]; assumes 8-bit masks with values in 0..255 -- TODO confirm.
        mask_as_np = mask_as_np / 255
        mask_as_np = np.expand_dims(mask_as_np, axis=0)  # add leading axis
        mask_as_tensor = torch.from_numpy(mask_as_np).float()  # numpy -> tensor

        return (image_as_tensor, mask_as_tensor)

# Build one dataset per split from the prepared directory layout.
train_set = SimDataset('final-dataset/train/images', 'final-dataset/train/masking')
val_set = SimDataset('final-dataset/val/images', 'final-dataset/val/masking')
test_set = SimDataset('final-dataset/test/images', 'final-dataset/test/masking')

image_datasets = {
  'train': train_set, 'val': val_set, 'test': test_set
}

# Train and validation batches are shuffled; the test loader keeps file order.
dataloaders = {
  'train': DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0),
  'val': DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0),
  'test': DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
}

n_train = len(image_datasets['train'])
n_val = len(image_datasets['val'])
n_test = len(image_datasets['test'])

print("n_train=",n_train)
print("n_val=",n_val)
# The test count was previously printed under the "n_val=" label.
print("n_test=",n_test)

# Display image and label.
# Take one batch of data from each DataLoader and visualise its first sample.
# NOTE(review): `plt` is not imported in this cell; presumably it is provided
# by one of the star-imports above -- confirm.
for data_type in ['train', 'val', 'test']:
    # Take one batch from the DataLoader of the current split
    train_features, train_labels = next(iter(dataloaders[data_type]))
    print(train_features.shape, train_labels.shape)
    
    # Take the first image and label from that batch
    img = train_features[0].squeeze()
    label = train_labels[0].squeeze()

    # Show the image and the label side by side
    f, axarr = plt.subplots(1, 2)
    axarr[0].imshow(img, cmap="gray")
    axarr[0].set_title("Image")
    axarr[1].imshow(label, cmap="gray")
    axarr[1].set_title("Label")
    plt.show()

    np.set_printoptions(threshold=np.inf)  # Set the option so that whole arrays are printed
    # Print every element of the label, one per line.
    for element in np.nditer(label.numpy()):
      # if element > 0:
        print(f"Nilai {element}")

# Create U-NET Model Function

In [None]:
import torch.nn as nn


def double_conv(in_channels, out_channels):
    """Build a Conv-ReLU-Conv-ReLU stack of two 3x3 convolutions.

    Both convolutions use padding 1, so the spatial size is unchanged; the
    first maps ``in_channels`` to ``out_channels`` and the second keeps
    ``out_channels``. The ReLUs operate in place.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by both convolutions.

    Returns:
        An ``nn.Sequential`` holding the four layers in order.
    """
    layers = []
    channels_in = in_channels
    for _ in range(2):
        layers.append(nn.Conv2d(channels_in, out_channels, kernel_size=3, padding=1))
        layers.append(nn.ReLU(inplace=True))
        # The second convolution consumes the output of the first.
        channels_in = out_channels
    return nn.Sequential(*layers)

class UNet(nn.Module):
    """U-Net style encoder/decoder built from ``double_conv`` stages.

    The encoder takes a 1-channel input through four ``double_conv`` stages
    (64, 128, 256, 512 channels) separated by 2x2 max-pooling. The decoder
    upsamples bilinearly by a factor of 2 three times, concatenating the
    matching encoder output along the channel axis before each
    ``double_conv``. A final 1x1 convolution maps 64 channels to ``n_class``
    output channels; no activation is applied to the result.
    """

    def __init__(self, n_class):
        """Create the layers.

        Args:
            n_class: Number of output channels of the final 1x1 convolution.
        """
        super().__init__()

        # Encoder stages.
        self.dconv_down1 = double_conv(1, 64)
        self.dconv_down2 = double_conv(64, 128)
        self.dconv_down3 = double_conv(128, 256)
        self.dconv_down4 = double_conv(256, 512)

        # Parameter-free resampling layers shared by all stages.
        self.maxpool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(
            scale_factor=2, mode='bilinear', align_corners=True
        )

        # Decoder stages: input channels = skip channels + upsampled channels.
        self.dconv_up3 = double_conv(256 + 512, 256)
        self.dconv_up2 = double_conv(128 + 256, 128)
        self.dconv_up1 = double_conv(128 + 64, 64)

        self.conv_last = nn.Conv2d(64, n_class, 1)

    def forward(self, x):
        """Run the network on ``x`` and return the final convolution output."""
        # Contracting path: keep each stage's output for the skip connections.
        skip1 = self.dconv_down1(x)
        skip2 = self.dconv_down2(self.maxpool(skip1))
        skip3 = self.dconv_down3(self.maxpool(skip2))
        features = self.dconv_down4(self.maxpool(skip3))

        # Expanding path: upsample, concatenate the skip on dim 1, convolve.
        decoder_stages = (
            (self.dconv_up3, skip3),
            (self.dconv_up2, skip2),
            (self.dconv_up1, skip1),
        )
        for stage, skip in decoder_stages:
            features = self.upsample(features)
            features = torch.cat([features, skip], dim=1)
            features = stage(features)

        return self.conv_last(features)

# Train U-NET

In [None]:
import torch.nn.functional as F
import torch.optim as optim
import time
import copy
from collections import defaultdict
from torch.optim import lr_scheduler
from tqdm.notebook import trange, tqdm
# from sklearn.metrics import accuracy_score, precision_score, f1_score
import gc


# def dice_loss(pred, target, smooth = 1.):
#     pred = pred.contiguous()
#     target = target.contiguous()    

#     intersection = (pred * target).sum(dim=2).sum(dim=2)
    
#     loss = (1 - ((2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)))
    
#     return loss.mean()

# def calc_loss(pred, target, metrics, bce_weight=0.5):
#     bce = F.binary_cross_entropy_with_logits(pred, target)

#     pred = torch.sigmoid(pred)
#     dice = dice_loss(pred, target)

#     loss = bce * bce_weight + dice * (1 - bce_weight)

#     metrics['bce'] += bce.data.cpu().numpy() * target.size(0)
#     metrics['dice'] += dice.data.cpu().numpy() * target.size(0)
#     metrics['loss'] += loss.data.cpu().numpy() * target.size(0)

#     return loss

# def print_metrics(metrics, epoch_samples, phase):
#     outputs = []
#     for k, v in metrics.items():
#         if k != 'y_true' and k != 'y_pred':
#             outputs.append("{}: {:.4f}".format(k, v / epoch_samples))
    
#     # Cetak semua metrik kecuali 'y_true' dan 'y_pred'
#     print("{}: {}".format(phase, ", ".join(outputs)))

def train_model(model, optimizer, scheduler, loss_fn, num_epochs=EPOCH_NUM):
    """Train ``model`` and keep the weights with the lowest validation loss.

    Each epoch runs a ``'train'`` pass followed by a ``'val'`` pass over the
    module-level ``dataloaders`` dict, moving every batch to the module-level
    ``device``. Whenever the validation loss improves, the weights are
    snapshotted in memory and also written to ``Unet_Model.pth``.

    The learning-rate scheduler is stepped once per epoch, after the training
    pass. Stepping it after every batch (as was done previously) makes an
    epoch-oriented schedule decay once per batch instead of once per epoch.

    Args:
        model: Network to train; must already be on ``device``.
        optimizer: Optimizer updating the model's parameters.
        scheduler: Learning-rate scheduler wrapping ``optimizer``.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        num_epochs: Number of epochs to run.

    Returns:
        A tuple ``(model, train_loss, val_loss)`` where ``model`` carries the
        best weights found and the two lists hold the mean per-batch loss of
        every epoch for the respective phase.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = 1e10
    train_loss = []
    val_loss = []

    for epoch in trange(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        since = time.time()

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            weighted_loss = 0.0  # sum of batch loss * batch size
            epoch_samples = 0
            iter_loss = 0.0      # sum of batch losses
            iterations = 0

            print ('{} Process'.format(phase.capitalize()))

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients.
                optimizer.zero_grad()

                # Forward pass; gradients are tracked only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = loss_fn(outputs, labels)

                    # Backward pass and weight update only while training.
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Statistics.
                batch_loss = loss.item()
                iter_loss += batch_loss
                weighted_loss += batch_loss * labels.size(0)
                epoch_samples += inputs.size(0)
                iterations += 1

            if is_train:
                # Advance the LR schedule once per epoch, after all of this
                # epoch's optimizer steps.
                scheduler.step()

            # Sample-weighted mean, used to select the best model.
            epoch_loss = weighted_loss / epoch_samples
            # Mean of the batch losses, used for reporting and the history.
            mean_batch_loss = iter_loss / iterations
            if is_train:
                train_loss.append(mean_batch_loss)
            else:
                val_loss.append(mean_batch_loss)

            # Print loss for each phase.
            print('{} Loss: {:.10f}\n'
               .format(phase.capitalize(), mean_batch_loss))

            # Keep a deep copy of the best model seen so far.
            if phase == 'val' and epoch_loss < best_loss:
                best_loss = epoch_loss
                print('saving best model. val loss: {:10f}'.format(best_loss))
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save(model.state_dict(),'Unet_Model.pth')

        stop = time.time()
        print('Time: {}s\n'.format(stop-since))

    print('Best val loss: {:10f}'.format(best_loss))

    # Load the best model weights before returning.
    model.load_state_dict(best_model_wts)
    return model, train_loss, val_loss

# Optimize Cuda Core
# Release cached GPU memory and run the garbage collector before training.
torch.cuda.empty_cache()
gc.collect()
# NOTE(review): the value returned by memory_summary is neither printed nor
# stored here, so this call produces no visible output.
torch.cuda.memory_summary(device=None, abbreviated=False)

# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# Single output channel, trained with a loss that takes raw logits.
num_class = 1
model = UNet(num_class).to(device)
loss_fn = nn.BCEWithLogitsLoss()

# if os.path.exists('Unet_Model.pth'):
#     model.load_state_dict(torch.load('Unet_Model.pth'))

# Adam over every parameter that requires gradients, learning rate 1e-4.
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4)

# Multiply the learning rate by 0.1 every 30 calls to scheduler.step().
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=30, gamma=0.1)

# Run training; returns the model plus the per-epoch train/val loss histories.
model, train_loss, val_loss = train_model(model, optimizer_ft, exp_lr_scheduler, loss_fn, num_epochs=EPOCH_NUM)

# Loss
# Plot the per-epoch loss histories returned by train_model. `val_loss` is
# measured on the validation split, so it is labelled as such (it was
# previously labelled 'Testing Loss').
f = plt.figure(figsize=(10, 10))
plt.plot(train_loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend()
plt.show()

# Predict U-NET

In [None]:
import math
import torch
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import trange, tqdm
from helper import *


# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# Rebuild the network with the same single output channel used for training.
num_class = 1
model = UNet(num_class).to(device)

loss_fn = nn.BCEWithLogitsLoss()

# Accumulators filled by the evaluation loop.
all_preds = []
all_labels = []
total_loss = 0.0

# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# from a GPU run can also be loaded when only the CPU is available.
model.load_state_dict(torch.load('Unet_Model.pth', map_location=device))

model.eval()   # Set model to the evaluation mode

def reverse_transform(inp):
    """Turn a single-channel array into a black/white RGB image.

    The input is thresholded at zero: every position whose value is greater
    than 0 becomes white (255, 255, 255) and every other position becomes
    black (0, 0, 0).

    Args:
        inp: NumPy array of shape ``(1, H, W)``.

    Returns:
        ``np.uint8`` array of shape ``(H, W, 3)`` containing only 0 and 255.

    Raises:
        ValueError: If ``inp`` does not have exactly one channel.
    """
    inp = inp.transpose((1, 2, 0))  # (C, H, W) -> (H, W, C)
    if inp.shape[2] != 1:
        raise ValueError(
            "reverse_transform expects a single-channel (1, H, W) array"
        )

    new_inp = np.zeros((inp.shape[0], inp.shape[1], 3))
    # One boolean-mask assignment replaces the per-pixel Python loops and
    # fills all three colour channels at once.
    new_inp[inp[:, :, 0] > 0] = 1.0

    new_inp = (new_inp * 255).astype(np.uint8)

    return new_inp

def reverse_transform_input(inp):
    """Turn a single-channel array into a grey-scale RGB image.

    The single channel is copied into all three colour channels, multiplied
    by 255 and cast to ``uint8``.

    Args:
        inp: NumPy array of shape ``(1, H, W)``; presumably scaled to
            [0, 1] so that the result fits the 0..255 range -- confirm
            against the caller.

    Returns:
        ``np.uint8`` array of shape ``(H, W, 3)``.

    Raises:
        ValueError: If ``inp`` does not have exactly one channel.
    """
    inp = inp.transpose((1, 2, 0))  # (C, H, W) -> (H, W, C)
    if inp.shape[2] != 1:
        raise ValueError(
            "reverse_transform_input expects a single-channel (1, H, W) array"
        )

    new_inp = np.zeros((inp.shape[0], inp.shape[1], 3))
    # Broadcasting copies the (H, W, 1) channel into all three colour
    # channels; this avoids assigning a 1-element array to a scalar slot
    # pixel by pixel, which NumPy has deprecated.
    new_inp[:] = inp
    new_inp = (new_inp * 255).astype(np.uint8)

    return new_inp

# Evaluate the loaded model on the test split, one batch at a time.

# Number of side-by-side panels per sample: input, prediction, histogram.
num_columns = 3

for index, (inputs, labels) in enumerate(tqdm(dataloaders['test'])):
    inputs = inputs.to(device)
    labels = labels.to(device)

    # Inference only: disabling autograd avoids building a graph (and holding
    # its memory) for every test batch.
    with torch.no_grad():
        # Predict
        pred = model(inputs)

        # Compute the loss
        loss = loss_fn(pred, labels)

    # Accumulate the total loss
    total_loss += loss.item()

    # Move prediction, input and label to the CPU as NumPy arrays
    pred = pred.cpu().numpy()
    inputs_cpu = inputs.cpu().numpy()
    labels_cpu = labels.cpu().numpy()

    num_samples = pred.shape[0]

    # Collect predictions and labels
    all_preds.append(pred)
    all_labels.append(labels_cpu)

    # Use subplots to show the images
    plt.figure(figsize=(15, 5 * num_samples))

    for i in range(num_samples):
        # Show the input image
        plt.subplot(num_samples, num_columns, i * num_columns + 1)
        plt.imshow(reverse_transform_input(inputs_cpu[i]))
        plt.title('Input')

        # Show the predicted image
        plt.subplot(num_samples, num_columns, i * num_columns + 2)
        plt.imshow(reverse_transform(pred[i]))
        plt.title('Prediksi')

        # Show the histogram of the prediction values
        plt.subplot(num_samples, num_columns, i * num_columns + 3)
        plt.hist(pred[i].flatten(), bins=20)
        plt.title('Histogram Prediksi')

    plt.show()

    print("Shape: ", pred.shape)
    print("Nilai Minimal Prediksi:", pred.min())
    print("Nilai Maximal Prediksi:", pred.max())

    # Show the loss of this batch
    print(f"Loss: {loss.item()}\n")

# Compute the average loss over all test batches
average_loss = total_loss / len(dataloaders['test'])

# Show the average loss
print(f"Average Loss: {average_loss}")