In [None]:
!pip install torchmetrics
!pip install timm

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
from torchvision.io import read_image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, random_split, DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from torchvision.transforms import ToTensor
from PIL import Image
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision 
from torchvision import transforms
from torchinfo import summary
import timm
import torchmetrics
from tqdm import tqdm
import wandb

In [None]:
# Hyperparameters
batch_size = 8          # samples per mini-batch handed to the DataLoaders
epochs = 70             # number of training epochs
train_size = 0.9        # fraction of the samples used for training; the rest is validation
learning_rate = 1e-4    # learning rate passed to the optimizer


#1 Prepare data \
This part reuses a lot of source code from the "Code" section of the competition website.


In [None]:
### This part I reference source code from the "code" part in the competition website 
class CustomImageDataset(Dataset):
    """Dataset of images paired with colour-coded ground-truth masks read from disk.

    Each item is ``(image, label)`` where ``image`` is an RGB array and ``label``
    is an ``(H, W, 1)`` uint8 class map: 0 = background, 1 = red, 2 = green.
    """

    def __init__(self, images_path, masks_path, resize=None):
        """
        Args:
            images_path: directory containing the input images.
            masks_path: directory containing the ground-truth masks.
            resize: optional ``(width, height)`` handed to ``cv2.resize``;
                ``None`` keeps every sample at its original resolution.

        Raises:
            ValueError: if the two directories contain a different number of files.
        """
        self.img_dir = images_path
        self.label_dir = masks_path
        self.resize = resize

        # os.listdir returns entries in arbitrary order. Sorting both listings
        # keeps index i pointing at the same sample in both directories
        # (assumes an image and its mask share a file name -- TODO confirm).
        images_list = sorted(os.listdir(images_path))
        masks_list = sorted(os.listdir(masks_path))
        if len(images_list) != len(masks_list):
            raise ValueError(
                "Found {} images but {} masks".format(len(images_list), len(masks_list))
            )

        self.images_list = [os.path.join(images_path, image_name) for image_name in images_list]
        self.masks_list = [os.path.join(masks_path, mask_name) for mask_name in masks_list]

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.images_list)

    def read_mask(self, mask_path):
        """Load a colour mask and convert it to an ``(H, W, 1)`` uint8 class map.

        Raises:
            FileNotFoundError: if OpenCV cannot read ``mask_path``.
        """
        image = cv2.imread(mask_path)
        if image is None:
            raise FileNotFoundError("Could not read mask: {}".format(mask_path))
        if self.resize is not None:
            image = cv2.resize(image, self.resize)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Red sits at both ends of the hue axis, so it needs two ranges.
        lower1 = np.array([0, 100, 20])
        upper1 = np.array([10, 255, 255])
        lower2 = np.array([160, 100, 20])
        upper2 = np.array([179, 255, 255])
        lower_mask = cv2.inRange(image, lower1, upper1)
        upper_mask = cv2.inRange(image, lower2, upper2)

        # Red pixels become class 1.
        red_mask = lower_mask + upper_mask
        red_mask[red_mask != 0] = 1

        # Green pixels become class 2.
        green_mask = cv2.inRange(image, (36, 25, 25), (70, 255, 255))
        green_mask[green_mask != 0] = 2

        # The red and green hue ranges do not overlap, so OR-ing keeps values in {0, 1, 2}.
        full_mask = cv2.bitwise_or(red_mask, green_mask)
        full_mask = np.expand_dims(full_mask, axis=-1)
        full_mask = full_mask.astype(np.uint8)

        return full_mask

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at position ``idx``.

        Raises:
            IndexError: if ``idx`` is out of range.
            FileNotFoundError: if the image or mask cannot be read.
        """
        img_path = self.images_list[idx]
        label_path = self.masks_list[idx]
        image = cv2.imread(img_path)  # OpenCV loads the image as BGR
        if image is None:
            raise FileNotFoundError("Could not read image: {}".format(img_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # convert to RGB
        label = self.read_mask(label_path)
        if self.resize is not None:
            image = cv2.resize(image, self.resize)

        return image, label

    def show_image(self, idx):
        """Plot the raw image and the raw mask file at ``idx`` side by side."""
        img_path = self.images_list[idx]
        label_path = self.masks_list[idx]
        image = plt.imread(img_path)
        label = plt.imread(label_path)
        fig, axs = plt.subplots(1, 2, figsize=(10, 5))
        axs[0].imshow(image)
        axs[0].set_title('Image')
        axs[1].imshow(label)
        axs[1].set_title('Label')
        plt.show()

In [None]:
# Kaggle input directories: training images and their ground-truth masks.
TRAIN_DIR = "/kaggle/input/bkai-igh-neopolyp/train/train"
TRAIN_MASK_DIR = "/kaggle/input/bkai-igh-neopolyp/train_gt/train_gt"


In [None]:
# Disk-backed dataset over the training images and masks; every sample is resized to 256x256.
dataset = CustomImageDataset(TRAIN_DIR, TRAIN_MASK_DIR, resize=(256, 256))

In [None]:
# Read every sample once and keep images and labels in two parallel lists,
# which makes it convenient to slice them into training and validation data later.
images_data, labels_data = [], []
for image, mask in dataset:
    images_data.append(image)
    labels_data.append(mask)

In [None]:
class CustomDataset(CustomImageDataset):
    """In-memory dataset over pre-loaded images and labels with an optional transform.

    The parent initialiser is not invoked: samples come from the given
    sequences instead of being read from disk.
    """

    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = targets
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(image, label)``; both go through ``transform`` when one is set."""
        image, label = self.data[index], self.targets[index]
        if not self.transform:
            return image, label

        augmented = self.transform(image=image, mask=label)
        image = augmented['image'].float()
        # Move the last mask axis to the front (presumably HWC -> CHW -- confirm against the transform).
        label = augmented['mask'].float().permute(2, 0, 1)
        return image, label


In [None]:
### This part I also reference source code from the "code" part in the competition website 
# Per-channel normalisation statistics shared by the training and validation pipelines.
_NORM_MEAN = (0.485, 0.456, 0.406)
_NORM_STD = (0.229, 0.224, 0.225)


def _normalize_to_tensor():
    """Return fresh normalisation + tensor-conversion steps that end both pipelines."""
    return [A.Normalize(mean=_NORM_MEAN, std=_NORM_STD), ToTensorV2()]


# Training: random flips and mild photometric jitter, then normalise and convert.
train_transform = A.Compose(
    [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomGamma(gamma_limit=(70, 130), eps=None, always_apply=False, p=0.2),
        A.RGBShift(p=0.3, r_shift_limit=10, g_shift_limit=10, b_shift_limit=10),
    ]
    + _normalize_to_tensor()
)

# Validation: no augmentation, only normalise and convert.
val_transform = A.Compose(_normalize_to_tensor())


# The number of training samples gets its own name so that `train_size` keeps
# holding the fraction set in the hyperparameter cell. Overwriting it with the
# count would make a second run of this cell compute int(count * len(...)),
# which puts every sample in the training slice and leaves validation empty.
num_train = int(train_size * len(images_data))
val_size = len(images_data) - num_train

# Contiguous split: the first `num_train` samples train, the remainder validates.
train_dataset = CustomDataset(images_data[:num_train], labels_data[:num_train], transform=train_transform)
val_dataset = CustomDataset(images_data[num_train:], labels_data[num_train:], transform=val_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

#2 Build the model: \
I build a UNet model whose encoder backbone is **"resnet101"**.

I use the **timm** module to import the **"resnet101"** backbone and plug it into the UNet model.

In [None]:
class decoder_block(nn.Module):
    """Decoder stage: concatenate the input with a skip feature, then apply two
    3x3 conv -> batch-norm -> ReLU steps with dropout in between."""

    def __init__(self, in_channels, out_channels):
        """
        Args:
            in_channels: channel count of the concatenated (input + skip) tensor.
            out_channels: channel count produced by both convolutions.
        """
        super().__init__()

        conv_kwargs = dict(kernel_size=3, stride=1, padding='same')
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x, skip_layer):
        """Concatenate ``x`` and ``skip_layer`` along the channel axis and refine the result."""
        merged = torch.cat((x, skip_layer), dim=1)
        first = self.dropout(self.relu(self.bn1(self.conv1(merged))))
        return self.relu(self.bn2(self.conv2(first)))

In [None]:
class bottleneck_block(nn.Module):
    """Two 3x3 conv -> batch-norm -> ReLU steps with dropout in between."""

    def __init__(self, in_channels, out_channels):
        """
        Args:
            in_channels: channel count of the incoming feature map.
            out_channels: channel count produced by both convolutions.
        """
        super().__init__()

        conv_kwargs = dict(kernel_size=3, stride=1, padding='same')
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        """Apply both convolution stages; the spatial size is kept by 'same' padding."""
        first = self.dropout(self.relu(self.bn1(self.conv1(x))))
        return self.relu(self.bn2(self.conv2(first)))

In [None]:
# UNet model
class UNet(nn.Module):
    """UNet-style segmentation network on top of a pretrained timm ``resnet101`` encoder."""

    def __init__(self, n_class=3):
        """
        Args:
            n_class: number of output channels (one per class).
        """
        super().__init__()

        # Encoder: timm backbone in feature-extraction mode; forward() unpacks five feature maps.
        self.backbone = timm.create_model("resnet101", pretrained=True, features_only=True)

        # Bottleneck applied to the last backbone feature map.
        self.bottleneck = bottleneck_block(2048, 1024)

        # Decoder stages. in_channels = upsampled channels + channels of the skip
        # feature concatenated inside decoder_block.
        self.dec1 = decoder_block(1024 + 1024, 512)
        self.dec2 = decoder_block(512 + 512, 256)
        self.dec3 = decoder_block(256 + 256, 128)
        self.dec4 = decoder_block(128 + 64, 64)

        # Upsampling layers (kernel 2, stride 2); the channel count is left unchanged.
        up_kwargs = dict(kernel_size=2, stride=2)
        self.transpose_conv1 = nn.ConvTranspose2d(1024, 1024, **up_kwargs)
        self.transpose_conv2 = nn.ConvTranspose2d(512, 512, **up_kwargs)
        self.transpose_conv3 = nn.ConvTranspose2d(256, 256, **up_kwargs)
        self.transpose_conv4 = nn.ConvTranspose2d(128, 128, **up_kwargs)
        self.transpose_conv5 = nn.ConvTranspose2d(n_class, n_class, **up_kwargs)

        # 1x1 convolution mapping the last decoder output to class scores.
        self.out = nn.Conv2d(64, n_class, kernel_size=1, padding='same')

    def forward(self, image):
        """Return per-pixel class scores with ``n_class`` channels for ``image``."""
        n1, n2, n3, n4, n5 = self.backbone(image)

        x = self.bottleneck(n5)

        # Walk back up the encoder: upsample, then fuse with the matching skip feature.
        stages = (
            (self.transpose_conv1, self.dec1, n4),
            (self.transpose_conv2, self.dec2, n3),
            (self.transpose_conv3, self.dec3, n2),
            (self.transpose_conv4, self.dec4, n1),
        )
        for upsample, decode, skip in stages:
            x = decode(upsample(x), skip)

        # Project to class scores, then apply one more 2x upsampling.
        return self.transpose_conv5(self.out(x))

In [None]:
# Preview one training sample next to its label, both after the training transform.
image, label = train_dataset[2]

# Move the first tensor axis to the end so matplotlib can draw the arrays.
image_array = image.permute(1, 2, 0).numpy()
label_array = label.permute(1, 2, 0).numpy()

fig, axs = plt.subplots(1, 2, figsize=(10, 5))
for ax, array, title in zip(axs, (image_array, label_array), ('Image', 'Label')):
    ax.imshow(array)
    ax.set_title(title)
    ax.axis('off')

plt.show()

In [None]:
# Class id -> RGB colour used when rendering a predicted mask.
color_dict = {
    0: (0, 0, 0),
    1: (255, 0, 0),
    2: (0, 255, 0),
}


def mask_to_rgb(mask, color_dict):
    """Render a 2-D class-id mask as an ``(H, W, 3)`` uint8 colour image.

    Args:
        mask: 2-D array of class ids.
        color_dict: mapping from class id to an RGB triple.

    Returns:
        uint8 array in which every pixel carries the colour of its class;
        ids missing from ``color_dict`` stay black.
    """
    rgb = np.zeros((mask.shape[0], mask.shape[1], 3))
    for class_id, color in color_dict.items():
        rgb[mask == class_id] = color
    return rgb.astype(np.uint8)

In [None]:
# wandb.init(
#     project = 'Unet_polyp-Segmentation'
# )

In [None]:
# Train function for each epoch
def train(train_dataloader, valid_dataloader,optimizer, epoch):
    """Run one training epoch followed by one validation pass.

    Relies on the notebook-level ``model``, ``device``, ``loss_function``,
    ``dice_fn``, ``iou_fn``, ``train_loss_meter``, ``iou_meter`` and ``dice_meter``.

    Args:
        train_dataloader: iterable of ``(data, targets)`` training batches.
        valid_dataloader: iterable of ``(data, targets)`` validation batches.
        optimizer: optimizer updating the parameters of ``model``.
        epoch: zero-based epoch index, used for logging only.

    Returns:
        ``(train_loss_epoch, test_loss_epoch)``: the mean per-batch loss over the
        training batches and over the validation batches (0 for an empty loader).
    """
    print(f"Start epoch #{epoch+1}, learning rate for this epoch: {optimizer.param_groups[0]['lr']}")
    train_loss_epoch = 0
    test_loss_epoch = 0
    num_train_batches = 0
    num_valid_batches = 0

    model.train()
    for data, targets in tqdm(train_dataloader):
        n = data.shape[0]

        # Move the batch to the compute device.
        data, targets = data.to(device), targets.to(device)

        # Drop the channel axis (dim 1) and cast to integer class ids for the loss.
        targets = targets.squeeze(dim=1).long()

        optimizer.zero_grad()
        outputs = model(data)

        # Backpropagation, then apply the gradients.
        loss = loss_function(outputs, targets)
        loss.backward()
        optimizer.step()

        num_train_batches += 1
        with torch.no_grad():
            # Predicted class per pixel.
            mask = outputs.argmax(dim=1)
            train_loss_epoch += loss.item()

            # Store plain floats so the meters do not hold on to device tensors.
            dice_score = float(dice_fn(mask, targets))
            iou_score = float(iou_fn(mask, targets))

            train_loss_meter.update(loss.item(), n)
            iou_meter.update(iou_score, n)
            dice_meter.update(dice_score, n)

    print("Epoch: {}, train_loss: {}, IoU: {}, dice_score: {}".format(
            epoch, train_loss_meter.avg,iou_meter.avg, dice_meter.avg
        ))

    # Average over the batches actually seen; max() guards an empty loader.
    train_loss_epoch /= max(num_train_batches, 1)

    # Evaluate on the validation set.
    model.eval()
    with torch.no_grad():
        for data, targets in valid_dataloader:
            data, targets = data.to(device), targets.to(device)
            test_output = model(data)
            targets = targets.squeeze(dim=1).long()

            test_loss = loss_function(test_output, targets)
            test_loss_epoch += test_loss.item()
            num_valid_batches += 1

    # The validation loss is averaged over validation batches, not training batches.
    test_loss_epoch /= max(num_valid_batches, 1)

    return train_loss_epoch , test_loss_epoch

In [None]:
class AverageMeter(object):
    """Keeps the latest value together with a weighted running sum, count and mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set ``val``, ``avg``, ``sum`` and ``count`` back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the running mean."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [None]:
def save_model(model, optimizer, path):
    """Write the state dicts of ``model`` and ``optimizer`` to ``path``.

    The file holds a dict with the keys ``"model"`` and ``"optimizer"``.
    """
    torch.save(
        {"model": model.state_dict(), "optimizer": optimizer.state_dict()},
        path,
    )

def load_model(model, optimizer, path):
    """Restore ``model`` and ``optimizer`` from a checkpoint holding their state dicts.

    The checkpoint is mapped onto the device the model currently lives on, so a
    file saved on a GPU can also be loaded on a CPU-only machine.

    Args:
        model: module whose parameters are overwritten in place.
        optimizer: optimizer whose state is overwritten in place.
        path: checkpoint file with the keys ``"model"`` and ``"optimizer"``.

    Returns:
        The same ``(model, optimizer)`` objects, now restored.
    """
    first_param = next(model.parameters(), None)
    # Fall back to the CPU for a model without parameters.
    map_location = first_param.device if first_param is not None else "cpu"

    checkpoint = torch.load(path, map_location=map_location)
    model.load_state_dict(checkpoint["model"])
    optimizer.load_state_dict(checkpoint['optimizer'])
    return model, optimizer

In [None]:
# Use the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")

# Model (nn.Module.to returns the module itself).
model = UNet().to(device)

# Loss function and Adam optimizer.
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Metrics: macro-averaged Dice and IoU over the three classes.
dice_fn = torchmetrics.Dice(num_classes=3, average="macro").to(device)
iou_fn = torchmetrics.JaccardIndex(num_classes=3, task="multiclass", average="macro").to(device)

# Running averages read and updated by the training code.
loss_meter = AverageMeter()
dice_meter = AverageMeter()
iou_meter = AverageMeter()
train_loss_meter = AverageMeter()

In [None]:
# Checkpoint location; the current model and optimizer state is written there right away.
checkpoint_path = '/kaggle/working/unet_model.pth'
save_model(model, optimizer, checkpoint_path)

In [None]:
# Never store the W&B API key in the notebook. It is read from the WANDB_API_KEY
# environment variable; when that is unset, key=None leaves the credential
# lookup to wandb itself.
# NOTE(review): the key that used to be hardcoded here is exposed and should be revoked.
wandb.login(key=os.environ.get("WANDB_API_KEY"))
wandb.init(
    project = "PolypSegment1"
)
# Training loop: one train + validation pass per epoch; the checkpoint is
# overwritten whenever the validation loss improves.
train_loss_array = []
test_loss_array = []
last_loss = float("inf")  # best validation loss seen so far
for epoch in range(epochs):

    # Reset every meter so the averages printed by train() cover this epoch only.
    # train_loss_meter was previously left out and therefore averaged over all epochs.
    loss_meter.reset()
    train_loss_meter.reset()
    dice_meter.reset()
    iou_meter.reset()

    train_loss_epoch, test_loss_epoch = train(train_loader,
                                              valid_loader,
                                              optimizer, epoch)

    if test_loss_epoch < last_loss:
        save_model(model, optimizer, checkpoint_path)
        last_loss = test_loss_epoch

    train_loss_array.append(train_loss_epoch)
    test_loss_array.append(test_loss_epoch)
    wandb.log({"Train loss": train_loss_epoch, "Valid loss": test_loss_epoch})
    print("Epoch {}: loss: {:.4f}".format(epoch + 1,
                                        train_loss_array[-1]))

    

In [None]:
# Restore the saved checkpoint and make sure the model sits on the compute device.
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
model, optimizer = load_model(model, optimizer, "/kaggle/working/unet_model.pth")
model.to(device)

In [None]:
# Create the output directory; exist_ok keeps a re-run of this cell from failing.
os.makedirs("prediction", exist_ok=True)

In [None]:
# Predict a colour mask for every test image and write it to prediction/<file name>.
trainsize = 256
test_dir = "/kaggle/input/bkai-igh-neopolyp/test/test"
model.eval()
for file_name in os.listdir(test_dir):
    ori_img = cv2.cvtColor(cv2.imread(os.path.join(test_dir, file_name)), cv2.COLOR_BGR2RGB)
    ori_height, ori_width = ori_img.shape[:2]

    # Same preprocessing as validation: resize to the training resolution, normalise, tensorise.
    img = cv2.resize(ori_img, (trainsize, trainsize))
    input_img = val_transform(image=img)["image"].unsqueeze(0).to(device)

    with torch.no_grad():
        # (1, C, H, W) scores -> (H, W, C) numpy array
        scores = model(input_img).squeeze(0).cpu().numpy().transpose(1, 2, 0)

    # Resize the scores back to the original resolution (cv2 expects (width, height)),
    # then pick the best class per pixel.
    scores = cv2.resize(scores, (ori_width, ori_height))
    mask = np.argmax(scores, axis=2)

    mask_rgb = cv2.cvtColor(mask_to_rgb(mask, color_dict), cv2.COLOR_RGB2BGR)
    cv2.imwrite("prediction/{}".format(file_name), mask_rgb)

In [None]:
import numpy as np
import pandas as pd
import cv2
import os

def rle_to_string(runs):
    """Join the run-length values into one space-separated string."""
    return ' '.join(map(str, runs))

def rle_encode_one_mask(mask):
    """Run-length encode the foreground of a single-channel mask.

    Pixels with a value above 225 are foreground. The mask is flattened in
    row-major order and encoded as space-separated ``start length`` pairs with
    1-based start positions.

    Args:
        mask: array of pixel intensities; it is not modified.

    Returns:
        The encoded string; ``''`` when there is no foreground (including an
        empty mask).
    """
    foreground = mask.flatten() > 225

    # Pad with background on both sides so that every run has a start and an end
    # transition, including runs touching the first or last pixel.
    padded = np.concatenate(([False], foreground, [False]))

    # A transition at padded index i means position i + 1 (1-based, unpadded)
    # starts a run (even slots) or is the first position after it (odd slots).
    rle = np.where(padded[1:] != padded[:-1])[0] + 1

    # Convert every end position into a run length.
    rle[1::2] = rle[1::2] - rle[0::2]

    return ' '.join(str(value) for value in rle)

def rle2mask(mask_rle, shape=(3,3)):
    """Decode a ``start length`` run-length string into a binary uint8 mask.

    Args:
        mask_rle: space-separated ``start length`` pairs with 1-based starts.
        shape: shape used to reshape the flat mask before it is transposed.

    Returns:
        The transpose of the ``shape``-shaped mask, with 1 inside every run.
    """
    tokens = mask_rle.split()
    starts = np.asarray(tokens[0::2], dtype=int) - 1  # to 0-based positions
    lengths = np.asarray(tokens[1::2], dtype=int)

    flat = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for begin, run in zip(starts, lengths):
        flat[begin:begin + run] = 1
    return flat.reshape(shape).T

def mask2string(dir):
    """Run-length encode the first two RGB channels of every image in ``dir``.

    Args:
        dir: directory containing the mask images.

    Returns:
        Dict with two parallel lists: ``'ids'`` holds ``<file stem>_<channel>``
        for channel 0 and 1, ``'strings'`` holds the matching encodings.
    """
    ids, strings = [], []
    for file_name in os.listdir(dir):
        stem = file_name.split('.')[0]
        path = os.path.join(dir, file_name)
        print(path)
        # OpenCV loads BGR; reverse the channel axis to get RGB.
        rgb = cv2.imread(path)[:, :, ::-1]
        for channel in range(2):
            ids.append(f'{stem}_{channel}')
            strings.append(rle_encode_one_mask(rgb[:, :, channel]))
    return {'ids': ids, 'strings': strings}


# Encode every predicted mask and write the result to output.csv.
# The directory is passed straight to mask2string rather than through a
# module-level variable named `dir`, which would shadow the builtin.
MASK_DIR_PATH = '/kaggle/working/prediction'
res = mask2string(MASK_DIR_PATH)
df = pd.DataFrame({'Id': res['ids'], 'Expected': res['strings']}, columns=['Id', 'Expected'])

df.to_csv(r'output.csv', index=False)
print('Done')