## Install Libraries

In [None]:
%pip install -q segmentation-models-pytorch

## Import Libraries

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, random_split, DataLoader
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import os

import torch
import torch.nn as nn
import segmentation_models_pytorch as smp

In [None]:
!nvidia-smi -L

In [None]:
# Pick the compute device once: the first CUDA GPU when PyTorch can see one,
# otherwise fall back to the CPU. Later cells move the model and batches here.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare expression so the notebook echoes the selected device.
device

## Read data

In [None]:
images_path = "/kaggle/input/bkai-igh-neopolyp/train/train/"
TRAIN_DIR = '/kaggle/input/bkai-igh-neopolyp/train/train'

# Collect the full path of every file found anywhere below TRAIN_DIR.
image_path = []
for root, dirs, files in os.walk(TRAIN_DIR):
    image_path.extend(os.path.join(root, file) for file in files)

# Echo how many training images were found.
len(image_path)

In [None]:
TRAIN_MASK_DIR = '/kaggle/input/bkai-igh-neopolyp/train_gt/train_gt'

# Collect the full path of every file found anywhere below TRAIN_MASK_DIR.
mask_path = []
for root, dirs, files in os.walk(TRAIN_MASK_DIR):
    mask_path.extend(os.path.join(root, file) for file in files)

# Echo how many ground-truth masks were found.
len(mask_path)

In [None]:
class TrainDataset(Dataset):
    """Image / mask pairs for training.

    Every file name found in ``img_dir`` is expected to exist in
    ``label_dir`` as well; the mask is read from the file with the same name.

    Args:
        img_dir: Directory holding the input images.
        label_dir: Directory holding the colour-coded ground-truth masks.
        resize: ``(width, height)`` handed to ``cv2.resize`` for both image
            and mask, or ``None`` to keep the original size.
        transform: Optional Albumentations-style callable invoked as
            ``transform(image=..., mask=...)`` and returning a mapping with
            ``"image"`` and ``"mask"`` entries.
    """

    def __init__(self, img_dir, label_dir, resize=None, transform=None):
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.resize = resize
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping (and therefore any seeded split) reproducible.
        self.images = sorted(os.listdir(self.img_dir))

    def __len__(self):
        """Return the number of files listed in ``img_dir``."""
        return len(self.images)

    def read_mask(self, mask_path):
        """Read a colour mask and convert it to a class-index mask.

        Pixels whose HSV value falls in one of the two red ranges become
        class 1, pixels in the green range become class 2, everything else
        stays 0.

        Args:
            mask_path: Path of the colour-coded mask image.

        Returns:
            ``np.uint8`` array of shape ``(H, W, 1)`` with values 0, 1 or 2.

        Raises:
            FileNotFoundError: If OpenCV cannot read ``mask_path``.
        """
        image = cv2.imread(mask_path)
        if image is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")
        if self.resize is not None:
            image = cv2.resize(image, self.resize, interpolation=cv2.INTER_AREA)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

        # Red wraps around the hue axis, so it needs two ranges.
        lower_red1 = np.array([0, 100, 20])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([160, 100, 20])
        upper_red2 = np.array([179, 255, 255])

        lower_mask_red = cv2.inRange(image, lower_red1, upper_red1)
        upper_mask_red = cv2.inRange(image, lower_red2, upper_red2)

        # inRange yields 0/255; collapse to the class index.
        red_mask = lower_mask_red + upper_mask_red
        red_mask[red_mask != 0] = 1

        green_mask = cv2.inRange(image, (36, 25, 25), (70, 255, 255))
        green_mask[green_mask != 0] = 2

        # The hue ranges are disjoint, so OR-ing never mixes the two labels.
        full_mask = cv2.bitwise_or(red_mask, green_mask)
        full_mask = np.expand_dims(full_mask, axis=-1)
        full_mask = full_mask.astype(np.uint8)

        return full_mask

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the file at position ``idx``.

        The image is returned in RGB channel order.

        Raises:
            FileNotFoundError: If the image or its mask cannot be read.
        """
        img_path = os.path.join(self.img_dir, self.images[idx])
        label_path = os.path.join(self.label_dir, self.images[idx])
        image = cv2.imread(img_path)  # OpenCV loads BGR
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.read_mask(label_path)
        if self.resize is not None:
            image = cv2.resize(image, self.resize, interpolation=cv2.INTER_AREA)
        if self.transform:
            # Albumentations only accepts keyword arguments, and geometric
            # augmentations must be applied to image and mask together.
            transformed = self.transform(image=image, mask=label)
            image = transformed["image"]
            label = transformed["mask"]

        return image, label
    
class TestDataset(Dataset):
    """Unlabelled images for inference.

    Args:
        img_dir: Directory holding the test images.
        resize: ``(width, height)`` handed to ``cv2.resize``, or ``None`` to
            keep the original size.
        transform: Optional Albumentations-style callable invoked as
            ``transform(image=...)`` and returning a mapping with an
            ``"image"`` entry.
    """

    def __init__(self, img_dir="path/to/data", resize = None, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.resize = resize
        # os.listdir returns entries in arbitrary order; sort for stable runs.
        self.images = sorted(os.listdir(self.img_dir))

    def __len__(self):
        """Return the number of files listed in ``img_dir``."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, height, width, name)`` for the file at ``idx``.

        ``height`` and ``width`` are the size of the image *before* resizing,
        and ``name`` is the file name without its extension.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image.
        """
        img_path = os.path.join(self.img_dir, self.images[idx])
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        height, width = image.shape[:2]
        if self.resize is not None:
            image = cv2.resize(image, self.resize, interpolation=cv2.INTER_AREA)
        if self.transform:
            transformed = self.transform(image=image)
            image = transformed["image"]
        # splitext drops whatever extension the file has; slicing off a fixed
        # five characters was only right for ".jpeg".
        name = os.path.splitext(os.path.basename(img_path))[0]
        return image, height, width, name

### Hyperparameters

In [None]:
# Size passed as `resize` to the dataset classes, which forward it to cv2.resize.
img_resize = (256, 256)
# Number of samples per batch for the train / validation DataLoaders.
batch_size = 32
# NOTE(review): not referenced anywhere in the visible cells (no optimizer is built here).
learning_rate = 1e-3
# Weight of the cross-entropy term in the combined loss; only referenced by a
# commented-out line in the validation cell.
alpha = 0.5

In [None]:
# Base dataset of resized image / mask pairs. No transform is attached here:
# augmentation and tensor conversion are added later by wrapping the train /
# validation splits in AugmentDataset.
dataset = TrainDataset(img_dir= TRAIN_DIR,
                             label_dir= TRAIN_MASK_DIR,
                             resize= img_resize,
                             transform = None)

## Augmentation

In [None]:
class AugmentDataset(Dataset):
    """Wrap a dataset, apply a transform, and optionally repeat it.

    The wrapper reports ``len(dataset) * length_multiplier`` samples; index
    ``i`` maps to ``dataset[i % len(dataset)]``, so with a random transform
    every repetition yields a differently augmented copy.

    Args:
        dataset: Indexable dataset returning ``(image, label)`` pairs.
        transform: Optional callable invoked as
            ``transform(image=..., mask=...)`` that returns a mapping with
            ``"image"`` and ``"mask"`` entries.
        length_multiplier: How many times the wrapped dataset is repeated.
    """

    def __init__(self, dataset, transform=None, length_multiplier=1):
        self.dataset = dataset
        self.transform = transform
        self.length_multiplier = length_multiplier

    def __getitem__(self, idx):
        """Return the (optionally transformed) sample at position ``idx``.

        Raises:
            IndexError: If ``idx`` lies outside ``[-len(self), len(self))``.
        """
        total = len(self)
        # Without this check the modulo below wraps every index, so plain
        # iteration (`for x in ds`, `list(ds)`) would never terminate.
        if not -total <= idx < total:
            raise IndexError(
                f"index {idx} is out of range for dataset of length {total}"
            )
        actual_idx = idx % len(self.dataset)
        image, label = self.dataset[actual_idx]
        if self.transform:
            transformed = self.transform(image=image, mask=label)
            # NOTE(review): assumes the transform returns torch tensors and a
            # channel-last (H, W, C) mask, e.g. a pipeline ending in ToTensorV2.
            image = transformed['image'].float()
            label = transformed['mask'].float()
            label = label.permute(2, 0, 1)  # (H, W, C) -> (C, H, W)
        return image, label

    def __len__(self):
        """Return the virtual length after repetition."""
        return len(self.dataset) * self.length_multiplier

In [None]:
# Channel statistics used by both pipelines (the usual ImageNet values).
NORMALIZE_MEAN = (0.485, 0.456, 0.406)
NORMALIZE_STD = (0.229, 0.224, 0.225)

# Training pipeline: random flips and mild colour jitter, then normalisation
# and conversion to a tensor.
train_transformation = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    # `always_apply=False` was dropped: it is the default and the argument is
    # deprecated in recent Albumentations releases.
    A.RandomGamma(gamma_limit=(70, 130), p=0.2),
    A.RGBShift(p=0.3, r_shift_limit=10, g_shift_limit=10, b_shift_limit=10),
    A.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
    ToTensorV2(),
])

# Validation / test pipeline: deterministic, normalisation and tensor
# conversion only.
val_transformation = A.Compose([
    A.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
    ToTensorV2(),
])
# train_transformation = A.Compose([
#     A.HorizontalFlip(p=0.5),
#     A.VerticalFlip(p=0.5),
#     A.RandomGamma (gamma_limit=(70, 130), always_apply=False, p=0.2),
#     A.RGBShift(p=0.3, r_shift_limit=10, g_shift_limit=10, b_shift_limit=10),
#     A.OneOf([A.Blur(), A.GaussianBlur(), A.GlassBlur(), A.MotionBlur(), A.GaussNoise(), A.Sharpen(), A.MedianBlur(), A.MultiplicativeNoise()], p=0.35),
#     A.CoarseDropout(p=0.2, max_height=15, max_width=15, fill_value=255),
#     A.RandomSnow(snow_point_lower=0.1, snow_point_upper=0.15, brightness_coeff=1.5, p=0.09),
# #     A.RandomShadow(p=0.1),
#     A.ShiftScaleRotate(p=0.15, border_mode=cv2.BORDER_CONSTANT, shift_limit=0.15, scale_limit=0.15),
#     A.RandomCrop((img_resize, img_resize)),
#     A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
#     ToTensorV2(),
# ])
class UnNormalize(object):
    """Undo a per-channel ``(x - mean) / std`` normalisation.

    Args:
        mean: Per-channel means that were subtracted.
        std: Per-channel standard deviations that were divided by.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """Return an un-normalised copy of ``tensor``.

        Args:
            tensor (Tensor): Normalised image of size (C, H, W).

        Returns:
            Tensor: A new tensor holding ``tensor * std + mean`` per channel.
            The input is left untouched, so calling this repeatedly on the
            same batch cannot accumulate the shift.
        """
        restored = tensor.clone()
        for channel, m, s in zip(restored, self.mean, self.std):
            # Inverse of the normalisation step `t.sub_(m).div_(s)`.
            channel.mul_(s).add_(m)
        return restored


unorm = UnNormalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))

In [None]:
# Define the split sizes
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Randomly split the dataset
generator = torch.Generator().manual_seed(7)
train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=generator)

train_dataset = AugmentDataset(train_dataset, transform=train_transformation, length_multiplier=2)
val_dataset = AugmentDataset(val_dataset, transform=val_transformation)
# test_dataset = AugmentDataset(test_dataset, transform=val_transformation)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Unlabelled test images, resized like the training data and passed through the
# deterministic validation pipeline (normalisation + tensor conversion).
test_dataset = TestDataset(img_dir="/kaggle/input/bkai-igh-neopolyp/test/test", resize=img_resize, transform=val_transformation)

In [None]:
# One image per batch, in dataset order: the prediction loop calls .item() on
# the per-sample height / width and reads name[0], which relies on batch size 1.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Echo the number of samples in each split (the train count includes the
# repetition applied by AugmentDataset's length_multiplier).
len(train_dataset), len(val_dataset), len(test_dataset)

In [None]:
def show_images(images, labels=None, title=""):
    """Plot a row of normalised image tensors, optionally overlaid with masks.

    Args:
        images: Sequence of normalised (C, H, W) image tensors.
        labels: Optional sequence of mask tensors, one per image, drawn
            semi-transparently on top of the image.
        title: Figure title.
    """
    fig, axes = plt.subplots(1, len(images), figsize=(15, 5))
    if len(images) == 1:
        # plt.subplots returns a bare Axes for a single column.
        axes = [axes]
    for i, img in enumerate(images):
        # Un-normalise a copy so the caller's batch is never modified, and
        # clamp to the [0, 1] range imshow expects for float RGB data.
        img = unorm(img.clone()).clamp(0, 1)
        img = img.permute(1, 2, 0).cpu().numpy()
        axes[i].imshow(img)
        if labels is not None:
            axes[i].imshow(labels[i].squeeze().cpu().numpy(), alpha=0.5, cmap='jet')
        axes[i].axis('off')
    fig.suptitle(title)
    plt.show()
    
# def visualize_samples(train_loader, val_loader, test_loader, unorm, num_samples=3):
#     # Visualize train samples
#     train_images, train_labels = next(iter(train_loader))
#     show_images(train_images[:num_samples], train_labels[:num_samples], title="Train Samples")

#     # Visualize validation samples
#     val_images, val_labels = next(iter(val_loader))
#     show_images(val_images[:num_samples], val_labels[:num_samples], title="Validation Samples")

#     # Visualize test samples
#     test_images, _, _, _ = next(iter(test_loader))
#     show_images(test_images[:num_samples], title="Test Samples")

# visualize_samples(train_loader, val_loader, test_loader, unorm)

## Define Model

In [None]:
import segmentation_models_pytorch as smp

# U-Net++ with a ResNet-152 encoder: 3 input channels (RGB) and 3 output
# classes, matching the label values 0 / 1 / 2 produced by
# TrainDataset.read_mask.
# NOTE(review): `encoder_weights` is left at the library default because the
# line below is commented out — confirm whether that triggers a pretrained
# weight download that the checkpoint loaded later makes redundant.
model = smp.UnetPlusPlus(
    encoder_name="resnet152",        
    # encoder_weights="imagenet",     
    in_channels=3,                  
    classes=3     
)

In [None]:
# checkpoint = torch.load('/kaggle/input/kaggle-notebook/model.pth')
# model.load_state_dict(checkpoint['model'])
# model.to(device)

In [None]:
# Class index -> RGB colour used when rendering a predicted mask.
color_dict = {
    0: (0, 0, 0),
    1: (255, 0, 0),
    2: (0, 255, 0),
}


def mask_to_rgb(mask, color_dict):
    """Render a 2-D class-index mask as an ``(H, W, 3)`` uint8 colour image.

    Pixels whose value is not a key of ``color_dict`` stay black.
    """
    height, width = mask.shape[0], mask.shape[1]
    rgb = np.zeros((height, width, 3))
    for class_id, color in color_dict.items():
        rgb[mask == class_id] = color
    return rgb.astype(np.uint8)

In [None]:
# del train_dataset
# del val_dataset
# del test_dataset

## Train

Define the loss functions (cross-entropy and Dice)

In [None]:
# Pixel-wise cross-entropy; not used by the evaluation cells below, which
# report the Dice term only.
criterion_ce = nn.CrossEntropyLoss()
# Multiclass Dice loss from segmentation_models_pytorch; the evaluation cells
# call it with the model output and (N, H, W) integer labels.
criterion_dice = smp.losses.DiceLoss(mode='multiclass')

In [None]:
# model = smp.UnetPlusPlus(
#     encoder_name="resnet34",        
#     # encoder_weights="imagenet",     
#     in_channels=3,                  
#     classes=3     
# )
# Restore the trained weights. `map_location=device` lets a checkpoint that was
# saved on a GPU load on a CPU-only machine too, matching the CPU fallback
# chosen when `device` was defined.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a
# trusted source.
checkpoint = torch.load('/kaggle/input/unetplusplus-resnet152/model.pth', map_location=device)
model.load_state_dict(checkpoint['model'])
model = model.to(device)

In [None]:
import matplotlib.pyplot as plt

# Evaluate on the validation set and, for every sample, plot the input image,
# its ground-truth mask and the predicted mask side by side.
model.eval()
val_loss = 0
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)
        # (N, 1, H, W) float mask -> (N, H, W) integer class indices
        labels = labels.squeeze(dim=1).long()

        outputs = model(images)
        loss_dice = criterion_dice(outputs, labels)
        print(loss_dice.item())
        # Weight by batch size so the final figure is a per-sample average.
        val_loss += loss_dice.item() * images.size(0)

        # Show images, outputs, and labels
        _, preds = torch.max(outputs, 1)  # arg-max over the class axis
        images = images.cpu()
        labels = labels.cpu()
        preds = preds.cpu()
        for i in range(images.size(0)):
            plt.figure(figsize=(15, 5))
            plt.subplot(1, 3, 1)
            # The batch is still normalised; undo that on a copy and clamp to
            # [0, 1] so imshow renders true colours instead of clipping.
            display = unorm(images[i].clone()).clamp(0, 1)
            plt.imshow(display.permute(1, 2, 0))
            plt.title("Input Image")
            plt.axis('off')

            # Fixed vmin/vmax so a class index maps to the same grey level in
            # both mask panels, whichever classes happen to be present.
            plt.subplot(1, 3, 2)
            plt.imshow(labels[i], cmap='gray', vmin=0, vmax=2)
            plt.title("Ground Truth")
            plt.axis('off')

            plt.subplot(1, 3, 3)
            plt.imshow(preds[i], cmap='gray', vmin=0, vmax=2)
            plt.title("Predicted Output")
            plt.axis('off')

            plt.show()

print(f"Loss: {val_loss/len(val_loader.dataset):.10f}")

In [None]:
# Validation pass without plotting: accumulate the Dice loss only (the
# cross-entropy term and the alpha-weighted sum are not used here).
model.eval()
val_loss = 0
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        # Move to the device, drop the channel axis, cast to class indices.
        labels = labels.to(device).squeeze(dim=1).long()

        outputs = model(images)
        loss_dice = criterion_dice(outputs, labels)
        print(loss_dice.item())
        # Sum weighted by batch size; normalised by the dataset size below.
        val_loss = val_loss + loss_dice.item() * images.size(0)
print(f"Loss: {val_loss/len(val_loader.dataset):.10f}")

In [None]:
!mkdir prediction

## Get Testset Prediction

In [None]:
import cv2
import numpy as np

def Smoothed_img(image):
    """Blur a colour mask and snap every pixel back to a pure colour.

    After a 15x15 Gaussian blur, pixels whose channel 2 exceeds 100 become
    ``[0, 0, 255]``, pixels whose channel 1 exceeds 100 become ``[0, 255, 0]``
    (this wins when both hold), and all remaining pixels become black.
    The input array is not modified.
    """
    blurred = cv2.GaussianBlur(image, (15, 15), 0)
    is_red = blurred[:, :, 2] > 100
    is_green = blurred[:, :, 1] > 100

    # Start from black and paint the two classes; green is painted last so it
    # takes precedence where the blurred regions overlap.
    cleaned = np.zeros_like(blurred)
    cleaned[is_red] = [0, 0, 255]
    cleaned[is_green] = [0, 255, 0]
    return cleaned

In [None]:
# Predict a mask for every test image and write it to prediction/<name>.png at
# the image's original resolution.
model.eval()
with torch.no_grad():
    for image, h, w, name in test_loader:
        outputs = model(image.to(device))
        # Batch size is 1: squeeze drops the batch axis, leaving (classes, H, W).
        outputs = outputs.squeeze().cpu().numpy()
        # Most likely class per pixel.
        outputs = np.argmax(outputs, axis=0)
        # Class indices -> RGB colours.
        outputs = mask_to_rgb(outputs, color_dict)
        # h / w arrive as one-element tensors holding the original image size.
        w = w.item()
        h = h.item()
        new_size = (w, h)  # cv2.resize takes (width, height)
        resized_image = cv2.resize(outputs, new_size, interpolation=cv2.INTER_CUBIC)
        # Swaps the first and last channel: the RGB mask becomes BGR, the order
        # cv2.imwrite expects (the flag name reads the other way round, but the
        # operation is the same).
        resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
        # Interpolation leaves blended colours at region borders; snap every
        # pixel back to pure red / green / black.
        resized_image = Smoothed_img(resized_image)
        # `name` is a one-element batch of strings.
        cv2.imwrite(f"prediction/{name[0]}.png", resized_image)
        

In [None]:
def rle_to_string(runs):
    """Join run-length values into a single space-separated string."""
    return ' '.join(str(x) for x in runs)


def rle_encode_one_mask(mask):
    """Run-length encode one channel of a mask image.

    Values above 225 count as foreground. The mask is flattened in row-major
    order and encoded as ``"start length start length ..."`` with 1-based
    start positions.

    Args:
        mask: Array of pixel intensities; it is not modified.

    Returns:
        The encoded string, or ``""`` when there is no foreground pixel
        (including for an empty array).
    """
    # flatten() returns a copy, so the caller's array is never touched.
    foreground = mask.flatten() > 225
    # Pad both ends with background so every run has a rising and a falling
    # edge, even when it touches the first or last pixel (and so an empty mask
    # needs no special case).
    padded = np.concatenate(([False], foreground, [False]))
    # Position k in the difference marks a change between pixel k-1 and pixel
    # k of the unpadded data; +1 turns that into a 1-based pixel position.
    rle = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Edges alternate start, end, start, end ...; turn each end into a length.
    rle[1::2] -= rle[::2]

    return rle_to_string(rle)

def rle2mask(mask_rle, shape=(3,3)):
    """Decode a ``"start length ..."`` run-length string into a 0/1 mask.

    Start positions are 1-based. The runs are painted into a flat buffer of
    ``shape[0] * shape[1]`` pixels, which is reshaped to ``shape`` and then
    transposed before being returned.
    """
    tokens = mask_rle.split()
    starts = np.asarray(tokens[0::2], dtype=int) - 1  # to 0-based
    lengths = np.asarray(tokens[1::2], dtype=int)
    ends = starts + lengths

    flat = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for begin, stop in zip(starts, ends):
        flat[begin:stop] = 1
    return flat.reshape(shape).T

def mask2string(dir):
    """Run-length encode every predicted mask image in a directory.

    Each image contributes two rows: ``<name>_0`` for the red channel and
    ``<name>_1`` for the green channel, where ``<name>`` is the file name
    without its extension.

    Args:
        dir: Directory containing the predicted mask images.

    Returns:
        ``{'ids': [...], 'strings': [...]}`` with one entry per
        (image, channel) pair, in sorted file-name order.

    Raises:
        ValueError: If an entry of ``dir`` cannot be read as an image.
    """
    ids = []
    strings = []
    # Sorted so the rows come out in the same order on every run.
    for file_name in sorted(os.listdir(dir)):
        path = os.path.join(dir, file_name)
        img = cv2.imread(path)
        if img is None:
            # Fail loudly: silently skipping would drop rows from the result.
            raise ValueError(f"Could not read prediction image: {path}")
        img = img[:, :, ::-1]  # BGR -> RGB, so channel 0 is red and 1 is green
        # splitext keeps dots inside the name; split('.')[0] would cut there.
        image_id = os.path.splitext(file_name)[0]
        for channel in range(2):
            ids.append(f'{image_id}_{channel}')
            strings.append(rle_encode_one_mask(img[:, :, channel]))
    return {
        'ids': ids,
        'strings': strings,
    }


# Encode every saved prediction and write the submission file.
MASK_DIR_PATH = '/kaggle/working/prediction'
# Pass the path directly rather than rebinding the builtin `dir` at module level.
res = mask2string(MASK_DIR_PATH)
df = pd.DataFrame({'Id': res['ids'], 'Expected': res['strings']})

df.to_csv(r'output.csv', index=False)