In [None]:
# %pip install -qqqU wandb transformers lightning albumentations torchmetrics torchinfo
# %pip install -qqq requests gradio

In [None]:
# %pip install opencv-python
# %pip install numpy
# %pip install matplotlib

### Importing the necessary Python libraries

This project relies on several libraries that must be imported before we can carry out the task.
These libraries include:
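* `os` -- for building file paths and setting environment variables
* `zipfile` -- for extracting the downloaded dataset archive
* `platform` -- for detecting the operating system (used to choose the number of data-loading workers)
* `warnings` -- for controlling warning messages
* `glob` -- for listing the image and mask files of the dataset
* `dataclass` -- for defining the configuration classes
* `wandb` -- for logging experiments to Weights & Biases
* `opencv-python` -- (imported as `cv`) this library helps us read and manipulate image data
* `requests` -- this library helps us download our dataset from the internet
* `numpy` -- for array manipulation
* `matplotlib` -- this library helps us plot our image data
* `torch` -- this is the library used to train the model on the dataset
* `Albumentations` -- for augmentation of the image data
* `Transformers` -- to load the transformer model
* `PyTorch Lightning` -- to simplify and structure the training code
* `torchmetrics` -- for evaluating the model's performance
* `torchinfo` -- for printing a summary of the model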

In [None]:
import os
import zipfile
import platform
import warnings
from glob import glob
from dataclasses import dataclass
import wandb

In [None]:
# Configure PyTorch's CUDA caching allocator. This runs before `import torch`
# so the allocator sees the setting when it initialises.
# PyTorch expects `option:value` pairs (a colon, not `=`), and documents that
# `max_split_size_mb` must be greater than 20, so the previous
# 'max_split_size_mb=8' was not a valid setting.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:32'

In [None]:

import cv2 as cv 
import requests
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import albumentations as A
from albumentations.pytorch import ToTensorV2

from transformers import SegformerForSemanticSegmentation

import lightning.pytorch as pl
from lightning.pytorch.loggers import WandbLogger
from lightning.pytorch.callbacks import LearningRateMonitor, ModelCheckpoint

from torchmetrics import MeanMetric
from torchmetrics.classification import MulticlassF1Score

from torchinfo import summary

In [None]:
# Select PyTorch's 'high' float32 matmul precision mode (trades some precision
# for speed on hardware that supports it).
torch.set_float32_matmul_precision('high')

# cuBLAS workspace configuration; ":16:8" is one of the values PyTorch
# documents for deterministic cuBLAS behaviour.
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"

# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [None]:
# wandb.login()

In [None]:
@dataclass(frozen=True)
class DatasetConfig:
    """Immutable dataset settings shared by the data pipeline.

    Attributes are read at class level throughout the notebook
    (e.g. ``DatasetConfig.NUM_CLASSES``).
    """

    # Number of segmentation classes (including background index 0).
    NUM_CLASSES: int = 4
    # Size every image and mask is resized to before batching.
    IMAGE_SIZE: tuple[int, int] = (288, 288)
    # Per-channel normalisation statistics passed to A.Normalize.
    MEAN: tuple = (0.485, 0.456, 0.406)
    STD: tuple = (0.229, 0.224, 0.225)
    # Download location of the zipped dataset.
    URL: str = r"https://www.dropbox.com/scl/fi/r0685arupp33sy31qhros/dataset_UWM_GI_Tract_train_valid.zip?rlkey=w4ga9ysfiuz8vqbbywk0rdnjw&dl=1"
    # Root folder of the extracted dataset. The default is the author's local
    # folder; set the UWM_GI_DATASET_PATH environment variable (before this
    # cell runs) to point at the dataset on another machine without editing
    # the notebook.
    # Alternative default: os.path.join(os.getcwd(), 'dataset_UWM_GI_Tract_train_valid')
    DATASET_PATH: str = os.environ.get(
        "UWM_GI_DATASET_PATH",
        r"C:\Users\ANING\OneDrive\Documents\python\dataset_UWM_GI_Tract_train_valid",
    )
    

In [None]:
class DatasetSegmentation(Dataset):
    """Dataset of (image, mask) pairs for semantic segmentation.

    Args:
        is_training: When truthy, random augmentations are applied before
            normalisation; otherwise only normalisation + tensor conversion.
        mean: Per-channel mean passed to ``A.Normalize``.
        std: Per-channel standard deviation passed to ``A.Normalize``.
        img_size: Target size handed to ``cv.resize`` for images and masks.
        images: Sequence of image file paths.
        masks: Sequence of mask file paths, index-aligned with ``images``.
    """

    def __init__(self,is_training,mean,std,img_size,images,masks):
        self.img_size = img_size
        self.images = images
        self.masks = masks
        self.mean = mean
        self.std = std
        self.is_training = is_training
        # Built last: transformations() reads is_training and img_size.
        self.transforms = self.transformations(self.mean,self.std)

    def __len__(self):
        """Return the number of samples (one per image path)."""
        return len(self.images)

    def load_images(self,image,depth= 0):
        """Read a file with OpenCV and resize it to ``self.img_size``.

        Args:
            image: Path of the file to read.
            depth: OpenCV imread flag (defaults to 0).

        Returns:
            The resized array; converted from BGR to RGB when ``depth`` is
            ``cv.IMREAD_COLOR``.

        Raises:
            FileNotFoundError: If OpenCV could not read the file.
        """
        path = image
        image = cv.imread(path, depth)
        if image is None:
            # cv.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image file: {path!r}")

        if depth == cv.IMREAD_COLOR:
            # cvtColor returns a new array; the result must be kept, otherwise
            # the image stays in BGR order.
            image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        # Nearest-neighbour keeps mask values as exact class indices.
        return cv.resize(image, self.img_size, interpolation=cv.INTER_NEAREST)

    def transformations(self,mean,std):
        """Build the albumentations pipeline applied to every sample.

        Augmentations are added only when ``self.is_training`` is truthy;
        normalisation and tensor conversion are always applied.
        """
        transformers = []

        if self.is_training:
            transformers.extend([
                A.HorizontalFlip(p=0.5), A.VerticalFlip(p=0.5),
                A.ShiftScaleRotate(scale_limit=0.12, rotate_limit=0.15, shift_limit=0.12, p=0.5),
                A.RandomBrightnessContrast(p=0.5),
                A.CoarseDropout(max_holes=8, max_height=self.img_size[1]//20, max_width=self.img_size[0]//20, min_holes=5, fill_value=0, mask_fill_value=0, p=0.5)
            ])

        transformers.extend([
                A.Normalize(mean=mean, std=std, always_apply=True),
                ToTensorV2(always_apply=True),  # (H, W, C) --> (C, H, W)
            ])

        return A.Compose(transformers)

    def __getitem__(self, index):
        """Return the transformed ``(image, mask)`` pair at ``index``.

        The mask is cast to ``torch.long``.
        """
        image = self.load_images(self.images[index],depth=cv.IMREAD_COLOR)
        mask = self.load_images(self.masks[index],depth=cv.IMREAD_GRAYSCALE)
        transformed = self.transforms(image = image,mask = mask)
        image,mask = transformed['image'] , transformed['mask'].to(torch.long)
        return image,mask


    


        

In [None]:
# Ask PyTorch to release cached GPU memory it is not currently using.
torch.cuda.empty_cache()

In [None]:

class SegmentationDataset(pl.LightningDataModule):
    """Lightning data module that builds the train / validation loaders.

    File lists are discovered under ``DatasetConfig.DATASET_PATH`` in
    ``<split>/images/*.png`` and ``<split>/masks/*.png``.

    Args:
        num_classes: Number of segmentation classes (stored on the instance).
        img_size: Size each image and mask is resized to.
        ds_mean: Per-channel normalisation mean.
        ds_std: Per-channel normalisation standard deviation.
        batch_size: Batch size used by both loaders.
        num_workers: Worker processes used by both loaders.
        pin_memory: Passed to both loaders.
        shuffle_validation: Whether the validation loader shuffles.
    """

    def __init__(self,num_classes=7,img_size=(288, 288),ds_mean=(0.485, 0.456, 0.406),ds_std=(0.229, 0.224, 0.225),batch_size=32,num_workers=12, pin_memory=False,shuffle_validation=False,) :

        super().__init__()
        self.num_classes = num_classes
        self.mean = ds_mean
        self.batch_size = batch_size
        self.std  = ds_std
        self.img_size = img_size
        self.shuffle_validation = shuffle_validation
        self.pin_memory = pin_memory
        self.num_workers = num_workers

    @staticmethod
    def _paired_files(split):
        """Return sorted (images, masks) path lists for one dataset split.

        Raises:
            ValueError: If the split has a different number of images and
                masks; pairs are matched by sorted position, so a mismatch
                would silently misalign them.
        """
        images = sorted(glob(os.path.join(DatasetConfig.DATASET_PATH, split, 'images', r'*.png')))
        masks = sorted(glob(os.path.join(DatasetConfig.DATASET_PATH, split, 'masks', r'*.png')))
        if len(images) != len(masks):
            raise ValueError(
                f"'{split}' split has {len(images)} images but {len(masks)} masks "
                f"under {DatasetConfig.DATASET_PATH!r}"
            )
        return images, masks

    def setup(self,*args,**kwargs):
        """Create ``self.train_ds`` and ``self.valid_ds`` from the files on disk."""
        train_images, train_masks = self._paired_files('train')
        valid_images, valid_masks = self._paired_files('valid')

        self.train_ds = DatasetSegmentation(is_training=True,mean=self.mean,std=self.std,images=train_images,masks=train_masks,img_size=self.img_size)
        self.valid_ds = DatasetSegmentation(is_training=False,mean=self.mean,std=self.std,images=valid_images,masks=valid_masks,img_size=self.img_size)

    def train_dataloader(self):
        """Shuffled training loader; the last incomplete batch is dropped."""
        return DataLoader(self.train_ds,batch_size=self.batch_size,drop_last=True,num_workers=self.num_workers,pin_memory=self.pin_memory,shuffle=True,)

    def val_dataloader(self):
        """Validation loader; keeps every sample and honours ``pin_memory``."""
        return DataLoader(self.valid_ds,batch_size=self.batch_size,drop_last=False,num_workers=self.num_workers,pin_memory=self.pin_memory,shuffle=self.shuffle_validation)

        


       

In [None]:
# Small data module used only to eyeball samples: one image per batch and
# loading in the main process (num_workers=0).
sd = SegmentationDataset(
    num_classes=DatasetConfig.NUM_CLASSES,
    img_size=DatasetConfig.IMAGE_SIZE,
    ds_mean=DatasetConfig.MEAN,
    ds_std=DatasetConfig.STD,
    batch_size=1,
    num_workers=0,
    shuffle_validation=True,
)

# Build the train / validation datasets from the files on disk.
sd.setup()

# Despite the names, these are DataLoader objects, not single batches.
train_batch = sd.train_dataloader()
valid_batch = sd.val_dataloader()

In [None]:
# Sanity-check the shape of a single training batch. Looping over the whole
# loader just to print shapes walks the entire training set and floods the
# cell output with one line per sample.
images, masks = next(iter(train_batch))
print(masks.shape)

In [None]:
# Maps each integer class index in a segmentation mask to the colour triple
# used when rendering it (default colour map of display_image_and_mask).
index_to_color = {
    0 : (0,0,0),
    1 : (0,0,255),
    2 : (0,255,0),
    3 : (255,0,0),
}

In [None]:
def mask_to_label(msks,color_map):
    mask = np.zeros(msks.shape[:2] + (3,))
    img_mask = np.squeeze(msks)

    for k in color_map.keys():
        mask[k == img_mask] = color_map[k]

    return np.float32(mask)/255

In [None]:
def overlay_mask(image,mask):
    """Blend a colour mask on top of an image and clip the result to [0, 1].

    Both inputs are converted RGB -> BGR, combined with ``cv.addWeighted``
    (image weight 1.0, mask weight 0.7, offset 0.0) and converted back to RGB.

    Args:
        image: RGB image array.
        mask: RGB colour-mask array to overlay on ``image``.

    Returns:
        The blended RGB array with values clipped to ``[0.0, 1.0]``.
    """
    image_weight, mask_weight, offset = 1.0, 0.7, 0.0

    bgr_image = cv.cvtColor(image, cv.COLOR_RGB2BGR)
    bgr_mask = cv.cvtColor(mask, cv.COLOR_RGB2BGR)

    blended_bgr = cv.addWeighted(bgr_image, image_weight, bgr_mask, mask_weight, offset)
    blended_rgb = cv.cvtColor(blended_bgr, cv.COLOR_BGR2RGB)

    return np.clip(blended_rgb, 0.0, 1.0)

In [None]:
def display_image_and_mask(*, images, masks, color_map=index_to_color):
    """Show each image next to its colour mask and the overlay of the two.

    One three-panel figure is drawn per sample.

    Args:
        images: Batch of images indexed along the first axis; each item is
            passed to ``imshow`` and ``overlay_mask``.
        masks: Batch of class-index masks aligned with ``images``.
        color_map: Mapping from class index to colour used for rendering.

    Returns:
        None. Figures are displayed with ``plt.show()``.
    """
    titles = ("GT Image", "Color Mask", "Overlayed Mask")

    for idx in range(images.shape[0]):
        image = images[idx]

        # Create RGB segmentation map from grayscale segmentation map.
        rgb_gt_mask = mask_to_label(masks[idx], color_map=color_map)

        # Create the overlayed image.
        overlayed_image = overlay_mask(image, rgb_gt_mask)

        # Explicit axes: each panel is drawn exactly once on its own subplot.
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))
        panels = (image, rgb_gt_mask, overlayed_image)
        for ax, title, panel in zip(axes, titles, panels):
            ax.set_title(title)
            ax.imshow(panel)
            ax.axis("off")

        fig.tight_layout()
        plt.show()

    return

In [None]:
def denormalize(tensors, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Undo channel-wise normalisation and clamp the result to [0, 1].

    The input is left untouched: a new tensor is returned, so calling this
    twice on the same batch (e.g. re-running a cell) gives the same result.

    Args:
        tensors: Floating-point tensor indexed as (batch, channel, H, W).
        mean: Per-channel mean that was subtracted during normalisation.
        std: Per-channel standard deviation that was divided by.

    Returns:
        A new tensor equal to ``tensors * std + mean`` per channel, clamped
        to ``[0.0, 1.0]``.
    """
    # Reshape the statistics to (1, C, 1, 1) so they broadcast over the batch.
    mean_t = torch.as_tensor(mean, dtype=tensors.dtype, device=tensors.device).view(1, -1, 1, 1)
    std_t = torch.as_tensor(std, dtype=tensors.dtype, device=tensors.device).view(1, -1, 1, 1)

    return torch.clamp(tensors * std_t + mean_t, min=0.0, max=1.0)

In [None]:
# Visualise the first batch from the validation loader only.
for images, masks in valid_batch:

    # Undo normalisation and move channels last so matplotlib can show the images.
    images = denormalize(images, mean=DatasetConfig.MEAN, std=DatasetConfig.STD).permute(0, 2, 3, 1).numpy()
    masks  = masks.numpy()
    
    display_image_and_mask(images=images, masks=masks)

    # One batch is enough for a visual check.
    break

In [None]:
def get_model(model_name,num_classes):
    """Load a pretrained SegFormer segmentation model.

    Args:
        model_name: Identifier passed to ``from_pretrained``.
        num_classes: Forwarded as ``num_labels``.

    Returns:
        The ``SegformerForSemanticSegmentation`` instance returned by
        ``from_pretrained`` (called with ``ignore_mismatched_sizes=True``).
    """
    load_options = {
        "num_labels": num_classes,
        "ignore_mismatched_sizes": True,
    }
    return SegformerForSemanticSegmentation.from_pretrained(model_name, **load_options)

In [None]:

class SegmentationModel(pl.LightningModule):
    """Lightning wrapper around a SegFormer semantic-segmentation model.

    Args:
        model_name: Pretrained checkpoint identifier passed to ``get_model``.
        num_classes: Number of segmentation classes.
    """

    def __init__(self, model_name,num_classes):
        super().__init__()
        self.model = get_model(model_name,num_classes)
        # Validation F1, logged as "valid/f1".
        self.valid_f1 = MulticlassF1Score(num_classes=num_classes, average="macro")

    def forward(self,data):
        """Return per-class logits resized to the input's spatial size.

        The model's logits are bilinearly interpolated to ``data.shape[-2:]``.
        """
        output = self.model(pixel_values=data, return_dict=True)
        # The output field is named "logits" (the original "logists" key
        # does not exist).
        upsampled_logits = F.interpolate(
            output["logits"],
            size=data.shape[-2:],
            mode="bilinear",
            align_corners=False,
        )
        return upsampled_logits

    def training_step(self, batch, *args,**kwargs):
        """Compute and log the cross-entropy loss for one training batch."""
        data, target = batch
        logits = self(data)
        # cross_entropy takes raw logits (N, C, H, W) with an index target
        # (N, H, W); nll_loss would need log-probabilities, and flattening
        # only the target breaks the shape match.
        loss = F.cross_entropy(logits, target)
        self.log("train/loss", loss, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, *args, **kwargs):
        """Compute the validation loss and update / log the F1 score."""
        data, target = batch
        logits = self(data)
        loss = F.cross_entropy(logits, target)

        self.valid_f1(logits, target)
        # Logged under the names that the notebook's checkpoint callback
        # monitors and uses in its filename.
        self.log("valid/loss", loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log("valid/f1", self.valid_f1, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def on_train_epoch_end(self):
        """Log the index of the epoch that just finished."""
        self.log("epoch", float(self.current_epoch))

    def configure_optimizers(self):
        """Return the optimizer: plain SGD over the model parameters.

        NOTE(review): TrainingConfig declares AdamW / INIT_LR / WEIGHT_DECAY,
        but none of those settings are read here - confirm which is intended.
        """
        return torch.optim.SGD(self.model.parameters(), lr=0.1)

In [None]:
@dataclass(frozen=True)
class TrainingConfig:
    """Training hyper-parameters, read at class level (``TrainingConfig.X``).

    Declared as a frozen dataclass for consistency with ``DatasetConfig``.
    """

    BATCH_SIZE:      int = 48 # 8
    NUM_EPOCHS:      int = 100
    INIT_LR:       float = 3e-4
    # No worker processes on Windows; elsewhere one per CPU. os.cpu_count()
    # can return None, which DataLoader does not accept, so fall back to 0.
    NUM_WORKERS:     int = 0 if platform.system() == "Windows" else (os.cpu_count() or 0)

    OPTIMIZER_NAME:  str = "AdamW"
    WEIGHT_DECAY:  float = 1e-4
    USE_SCHEDULER:  bool = True # Use learning rate scheduler?
    SCHEDULER:       str = "MultiStepLR" # Name of the scheduler to use.
    MODEL_NAME:str = "nvidia/segformer-b4-finetuned-ade-512-512"

In [None]:
# Model to be trained, built from the configured pretrained checkpoint.
model = SegmentationModel(
    model_name=TrainingConfig.MODEL_NAME,
    num_classes=DatasetConfig.NUM_CLASSES,
)


# Data module used for training (separate from the preview one above);
# memory pinning is enabled only when a CUDA device is available.
data_module = SegmentationDataset(
    num_classes=DatasetConfig.NUM_CLASSES,
    img_size=DatasetConfig.IMAGE_SIZE,
    ds_mean=DatasetConfig.MEAN,
    ds_std=DatasetConfig.STD,
    batch_size=TrainingConfig.BATCH_SIZE,
    num_workers=TrainingConfig.NUM_WORKERS,
    pin_memory=torch.cuda.is_available(),
)




In [None]:
# Checkpoint callback that tracks the highest "valid/f1".
# The LightningModule must log "valid/f1" (monitored) and "valid/loss" (used
# in the filename) via self.log, otherwise the callback cannot find them.
model_checkpoint = ModelCheckpoint(
    monitor="valid/f1",
    mode="max",
    filename="checkpoint{epoch:03d}-vloss_{valid/loss:.4f}_vf1_{valid/f1:.4f}",
    auto_insert_metric_name=False,
)

In [None]:
# Trainer with automatic accelerator / device / strategy selection,
# 16-bit mixed precision and the checkpoint callback defined above.
trainer = pl.Trainer(
    accelerator="auto", 
    devices="auto",  
    strategy="auto",  
    max_epochs=TrainingConfig.NUM_EPOCHS,  
    enable_model_summary=True,  
    precision="16-mixed",  
    callbacks=[model_checkpoint]
 
)

# Start training
trainer.fit(model, data_module)

In [3]:
# Scratch cell (not part of the pipeline): builds the path of the dataset zip
# in the current working directory and shows its first character.
import os
dataset_path = os.path.join(os.getcwd(),'dataset_UWM_GI_Tract_train_valid.zip')
dataset_path[0]

'c'

In [16]:
# Scratch cell (not part of the pipeline): demonstrates that list.extend
# appends each element of the given list in place.
p = ['l']
p.extend(['p','o'])

In [17]:
# Scratch cell: display the list built in the previous cell.
p

['l', 'p', 'o']