In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset archive
# from it and write checkpoints to it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy the Pascal-part archive from Drive into /content and extract it.
# `unzip -n` never overwrites files that already exist.
# NOTE(review): the unzip path is relative, so this presumably relies on the
# working directory being /content — confirm.
!cp /content/drive/MyDrive/test/Pascal-part.zip /content/
!unzip -n Pascal-part.zip 

In [None]:
!pip install -q evaluate
!pip install -q git+https://github.com/huggingface/transformers.git
!pip install -q albumentations pytorch-lightning
!pip install wandb
!wandb login

In [None]:
import os
import numpy as np
import cv2
import evaluate
import matplotlib.pyplot as plt
import pandas as pd
from transformers import MaskFormerForInstanceSegmentation
import torch
import pytorch_lightning as pl
from typing import Dict
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [None]:

# MaskFormerImageProcessor is not part of the notebook's import cell; import it
# here so this cell runs on a fresh kernel.
from transformers import MaskFormerImageProcessor

# Resizing, rescaling and normalisation are disabled in the processor: the
# albumentations pipelines already resize and normalise the images.
preprocessor = MaskFormerImageProcessor(ignore_index=0, reduce_labels=False, do_resize=False, do_rescale=False, do_normalize=False)

In [None]:
# Work from the extracted dataset directory so that relative paths such as
# 'train_id.txt' / 'val_id.txt' used below resolve.
%cd /content/Pascal-part/

In [None]:
import albumentations as A



# Training pipeline: random horizontal/vertical flips, then resize to the
# configured resolution and normalise with the ADE statistics.
train_transform = A.Compose([
    A.HorizontalFlip(p=0.4),
    A.VerticalFlip(p=0.4),
    A.Resize(height=Config.height, width=Config.width),
    A.Normalize(mean=ADE_MEAN, std=ADE_STD),
])

# Optional, heavier augmentations (not used by the datasets built in this cell).
# A.Compose expects ONE list of transforms; passing them as separate positional
# arguments would bind the later ones to bbox_params/keypoint_params/... instead.
# The imgaug-backed IAA* transforms no longer exist in current albumentations,
# so their native replacements are used (GaussNoise, Perspective, Sharpen).
optional_transforms = A.Compose(
    [
        A.ShiftScaleRotate(scale_limit=0.5, rotate_limit=(-8, 8),  # 15
                           shift_limit=0.1, p=1, border_mode=0, value=0.0),
        A.GaussNoise(p=0.2),
        A.Perspective(p=0.5),
        # Exactly one contrast/brightness style adjustment, applied 90% of the time.
        A.OneOf(
            [
                A.CLAHE(p=1),
                A.RandomBrightnessContrast(p=1),
                A.RandomGamma(p=1),
            ],
            p=0.9,
        ),
        # Exactly one sharpen/blur style adjustment, applied 90% of the time.
        A.OneOf(
            [
                A.Sharpen(p=1),
                A.Blur(blur_limit=3, p=1),
                A.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),
        # Exactly one colour adjustment, applied 90% of the time.
        A.OneOf(
            [
                A.RandomBrightnessContrast(p=1),
                A.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
)

# Evaluation pipeline: resize and normalise only, no random augmentation.
test_transform = A.Compose(
    [
        A.Resize(height=Config.height, width=Config.width),
        A.Normalize(mean=ADE_MEAN, std=ADE_STD),
    ]
)

# The id files are given as relative paths, resolved against the current directory.
train_dataset = ImageSegmentationDataset(
    transform=train_transform,
    id_path='train_id.txt',
)
test_dataset = ImageSegmentationDataset(
    transform=test_transform,
    id_path='val_id.txt',
)

In [None]:


def collate_fn(batch):
  """Turn a list of dataset samples into one batch for the model.

  Each sample is read positionally: field 0 is the image, field 1 the
  segmentation map, field 2 the original image and field 3 the original
  segmentation map. The first two go through the module-level `preprocessor`;
  the last two are attached to the result untouched.
  """
  # Transpose [sample0, sample1, ...] into per-field tuples.
  columns = [*zip(*batch)]

  encoded = preprocessor(
    columns[0],
    segmentation_maps=columns[1],
    return_tensors="pt",
  )
  # Keep the un-processed data alongside the tensors for metric computation.
  encoded["original_images"] = columns[2]
  encoded['original_segmentation_maps'] = columns[3]
  return encoded

# Only the training loader shuffles its samples.
train_dataloader = DataLoader(
    train_dataset,
    collate_fn=collate_fn,
    batch_size=Config.train_batch_size,
    shuffle=True,
)
# NOTE: the evaluation loader reuses Config.train_batch_size as its batch size.
test_dataloader = DataLoader(
    test_dataset,
    collate_fn=collate_fn,
    batch_size=Config.train_batch_size,
    shuffle=False,
)

In [None]:
#PyTorch
# Default weights of the Tversky index (alpha scales false positives, beta
# scales false negatives) and the focal exponent applied to (1 - Tversky).
ALPHA = 0.5
BETA = 0.5
GAMMA = 1

class FocalTverskyLoss(torch.nn.Module):
    """Focal Tversky loss computed over all elements of the batch at once.

    The logits are passed through a sigmoid, predictions and targets are
    flattened, and the loss is ``(1 - Tversky) ** gamma`` with
    ``Tversky = (TP + smooth) / (TP + alpha * FP + beta * FN + smooth)``.
    """

    def __init__(self, weight=None, size_average=True):
        # `weight` and `size_average` are accepted but not used by this loss.
        super(FocalTverskyLoss, self).__init__()

    def forward(self, inputs, targets, smooth=1, alpha=ALPHA, beta=BETA, gamma=GAMMA):
        """Return the scalar focal Tversky loss.

        Args:
            inputs: raw logits (a sigmoid is applied here).
            targets: mask with the same number of elements as ``inputs``;
                float, integer and boolean tensors are accepted.
            smooth: additive smoothing term for numerator and denominator.
            alpha: weight of the false-positive term.
            beta: weight of the false-negative term.
            gamma: focal exponent.
        """
        #comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # Flatten with reshape: unlike view it also works on non-contiguous
        # tensors (e.g. transposed or sliced views).
        inputs = inputs.reshape(-1)
        # Cast so boolean masks work too: `1 - targets` is not defined for bool tensors.
        targets = targets.reshape(-1).to(inputs.dtype)

        #True Positives, False Positives & False Negatives
        TP = (inputs * targets).sum()
        FP = ((1 - targets) * inputs).sum()
        FN = (targets * (1 - inputs)).sum()

        Tversky = (TP + smooth) / (TP + alpha*FP + beta*FN + smooth)
        FocalTversky = (1 - Tversky)**gamma

        return FocalTversky



In [None]:
class SegModel(pl.LightningModule):
    """LightningModule that fine-tunes a pretrained MaskFormer checkpoint.

    Hyper-parameters are read from the notebook-level ``Config`` object, and
    predictions are post-processed with the notebook-level ``preprocessor``.
    Two optimizers are configured: one with a linear warm-up schedule and one
    with ReduceLROnPlateau (Lightning 1.x multi-optimizer API).
    """

    def __init__(self, train_dataloader=None, validation_dataloader=None):
        """Store the data loaders and build the metric and the model.

        Args:
            train_dataloader: loader handed back by ``train_dataloader()``.
            validation_dataloader: loader handed back by ``val_dataloader()``.
        """
        super(SegModel, self).__init__()
        self.train_loader = train_dataloader
        self.val_loader = validation_dataloader

        self.args = Config
        self.lr = Config.lr
        # id2label has to be assigned before it is printed or given to the model.
        self.id2label = self.args.id2label
        print(self.id2label)

        self.metric = evaluate.load("mean_iou")

        self.model = MaskFormerForInstanceSegmentation.from_pretrained(
            "facebook/maskformer-swin-small-ade",
            id2label=self.id2label,
            ignore_mismatched_sizes=True,
            mask_feature_size=512,
        )
        # NOTE(review): computed as batch_size * max_epochs, not
        # batches-per-epoch * max_epochs — confirm this is intended.
        self.train_batches = Config.train_batch_size*Config.max_epochs
        self.warmup_batches = int(self.train_batches*0.3)

    def forward(self, *args, **kwargs):
        """Delegate to the wrapped MaskFormer model.

        Arguments are unpacked before the call; forwarding the ``args`` tuple
        itself would hand the model a tuple as its first positional argument.
        """
        return self.model(*args, **kwargs)

    def train_dataloader(self):
        """Return the training loader given to the constructor."""
        return self.train_loader

    def val_dataloader(self):
        """Return the validation loader given to the constructor."""
        return self.val_loader

    def transfer_batch_to_device(self, batch, device, dataloader_idx):
        """Leave the batch where it is; the steps move tensors explicitly."""
        #can't stack and move whole batch to device, because
        #we need original images and original seg. maps (inconsistent size inside batch)
        return batch

    def create_optimizer(self):
        """Build a fresh AdamW optimizer over all parameters."""
        return torch.optim.AdamW(self.parameters(), lr=self.lr, weight_decay=0.1)

    def lr_warmup_config(self):
        """Optimizer config whose learning rate ramps up linearly per step."""
        def warmup(step):
            """
            This method will be called for ceil(warmup_batches/accum_grad_batches) times,
            warmup_steps has been adjusted accordingly
            """
            if self.args.warmup_steps <= 0:
                factor = 1
            else:
                # Linear ramp from 0 to 1, capped at 1 once warmup_steps is reached.
                factor = min(step / self.args.warmup_steps, 1)
            return factor

        opt1 = self.create_optimizer()
        return {
            'frequency': self.warmup_batches,
            'optimizer': opt1,
            'lr_scheduler': {
                'scheduler': torch.optim.lr_scheduler.LambdaLR(opt1, warmup),
                'interval': 'step',
                'frequency': 1,
                'name': 'lr/warmup'
            },
        }

    def lr_decay_config(self):
        """Optimizer config whose learning rate is reduced on a metric plateau."""
        opt2 = self.create_optimizer()
        return {
            'frequency': self.train_batches - self.warmup_batches,
            'optimizer': opt2,
            'lr_scheduler': {
                'scheduler': torch.optim.lr_scheduler.ReduceLROnPlateau(
                    opt2, 'min',
                    factor=self.args.lrdecay_factor,
                    patience=self.args.lrdecay_patience,
                    threshold=self.args.lrdecay_threshold,
                    threshold_mode='rel', verbose=False,
                ),
                'interval': 'epoch',
                'frequency': 1,
                'monitor': self.args.lrdecay_monitor,
                # strict=False: do not fail when the monitored metric is missing.
                'strict': False,
                'name': 'lr/reduce_on_plateau',
            }
        }

    def configure_optimizers(self):
        """Return the warm-up config followed by the decay config."""
        return (
            self.lr_warmup_config(),
            self.lr_decay_config()
        )

    def training_step(self, batch, batch_idx, optimizer_idx):
        """Run one training step and return the loss computed by the model."""
        # The batch stays on its original device (see transfer_batch_to_device),
        # so the tensors the model needs are moved here.
        outputs = self.model(
            pixel_values=batch["pixel_values"].to(self.device),
            mask_labels=[labels.to(self.device) for labels in batch["mask_labels"]],
            class_labels=[labels.to(self.device) for labels in batch["class_labels"]],
        )
        loss = outputs.loss
        self.log("train_loss", loss, logger=True, on_epoch=True)
        return loss

    def get_metrics(self, batch, outputs):
        """Compute the mean-IoU metric dict for one batch.

        Predictions are resized to each original image's (height, width) and
        compared against the original segmentation maps stored in the batch.
        """
        original_images = batch["original_images"]

        target_sizes = [(image.shape[0], image.shape[1]) for image in original_images]

        predicted_segmentation_maps = preprocessor.post_process_semantic_segmentation(
            outputs, target_sizes=target_sizes)
        ground_truth_segmentation_maps = batch["original_segmentation_maps"]
        metrics = self.metric.compute(
            references=ground_truth_segmentation_maps,
            predictions=predicted_segmentation_maps,
            num_labels=self.args.num_labels,
            ignore_index=self.args.ignore_index,
        )
        return metrics

    def validation_step(self, batch, batch_idx):
        """Run inference on one validation batch and log IoU and accuracy."""
        self.log('step', batch_idx, prog_bar=True)

        outputs = self.model(
            pixel_values=batch["pixel_values"].to(self.device),
        )
        metrics = self.get_metrics(outputs=outputs, batch=batch)
        iou = metrics['mean_iou']
        acc = metrics['mean_accuracy']
        self.log('val_iou', iou, logger=True, prog_bar=True, on_epoch=True, on_step=True)
        self.log('val_acc', acc, logger=True)

    # TODO: an unfinished `def test` stub (a syntax error) was removed from the
    # end of this class; add a `test_step` here if a test loop is needed.

In [None]:
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.callbacks import ModelCheckpoint

# Drive folder used both for checkpoints and as Lightning's default root dir.
drive_dir = '/content/drive/MyDrive/test/'

wandb_logger = WandbLogger(project='mil_test', entity='sapsapfear')
lr_monitor = LearningRateMonitor(logging_interval='epoch')
# No `monitor` is configured for the checkpoint callback
# (alternative: ModelCheckpoint(monitor="val_iou")).
checkpoint_callback = ModelCheckpoint(dirpath=drive_dir)

model = SegModel(
    train_dataloader=train_dataloader,
    validation_dataloader=test_dataloader,
)

trainer = pl.Trainer(
    logger=wandb_logger,
    gpus=1,
    precision=16,
    default_root_dir=drive_dir,
    callbacks=[lr_monitor, checkpoint_callback],
    log_every_n_steps=1,
    min_epochs=1,
    # Validation runs every second epoch.
    check_val_every_n_epoch=2,
)

# Resume from a checkpoint previously saved to Drive.
trainer.fit(model, ckpt_path='/content/drive/MyDrive/test/epoch=51-step=28.ckpt')