<a href="https://colab.research.google.com/github/Xdarii/memo/blob/main/deep_lab_transfert_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from glob import glob
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import scipy
from scipy import misc
import json
import matplotlib.pyplot as plt
from google.colab import drive
import numpy as np
import os


In [None]:
from google.colab import drive
drive.mount('/content/drive')
import os

Mounted at /content/drive


# Loading dataset

In [None]:
from pathlib import Path
from typing import Any, Callable, Optional

import numpy as np
from PIL import Image
from torchvision.datasets.vision import VisionDataset


class SegmentationDataset(VisionDataset):
    """A PyTorch dataset for image segmentation task.
    The dataset is compatible with torchvision transforms.
    The transforms passed would be applied to both the Images and Masks.
    """
    def __init__(self,
                 root: str,
                 image_folder: str,
                 mask_folder: str,
                 transforms: Optional[Callable] = None,
                 seed: int = None,
                 fraction: float = None,
                 subset: str = None,
                 image_color_mode: str = "rgb",
                 mask_color_mode: str = "grayscale") -> None:
        """
        Args:
            root (str): Root directory path.
            image_folder (str): Name of the folder that contains the images in the root directory.
            mask_folder (str): Name of the folder that contains the masks in the root directory.
            transforms (Optional[Callable], optional): A function/transform that takes in
            a sample and returns a transformed version.
            E.g, ``transforms.ToTensor`` for images. Defaults to None.
            seed (int, optional): Specify a seed for the train and test split for reproducible results. Defaults to None.
            fraction (float, optional): A float value from 0 to 1 which specifies the validation split fraction. Defaults to None.
            subset (str, optional): 'Train' or 'Test' to select the appropriate set. Defaults to None.
            image_color_mode (str, optional): 'rgb' or 'grayscale'. Defaults to 'rgb'.
            mask_color_mode (str, optional): 'rgb' or 'grayscale'. Defaults to 'grayscale'.
        Raises:
            OSError: If image folder doesn't exist in root.
            OSError: If mask folder doesn't exist in root.
            ValueError: If subset is not either 'Train' or 'Test'
            ValueError: If image_color_mode and mask_color_mode are either 'rgb' or 'grayscale'
        """
        super().__init__(root, transforms)
        image_folder_path = Path(self.root) / image_folder
        mask_folder_path = Path(self.root) / mask_folder
        if not image_folder_path.exists():
            raise OSError(f"{image_folder_path} does not exist.")
        if not mask_folder_path.exists():
            raise OSError(f"{mask_folder_path} does not exist.")

        if image_color_mode not in ["rgb", "grayscale"]:
            raise ValueError(
                f"{image_color_mode} is an invalid choice. Please enter from rgb grayscale."
            )
        if mask_color_mode not in ["rgb", "grayscale"]:
            raise ValueError(
                f"{mask_color_mode} is an invalid choice. Please enter from rgb grayscale."
            )

        self.image_color_mode = image_color_mode
        self.mask_color_mode = mask_color_mode

        if not fraction:
            self.image_names = sorted(image_folder_path.glob("*"))
            self.mask_names = sorted(mask_folder_path.glob("*"))
        else:
            if subset not in ["Train", "Test"]:
                raise (ValueError(
                    f"{subset} is not a valid input. Acceptable values are Train and Test."
                ))
            self.fraction = fraction
            self.image_list = np.array(sorted(image_folder_path.glob("*")))
            self.mask_list = np.array(sorted(mask_folder_path.glob("*")))
            if seed:
                np.random.seed(seed)
                indices = np.arange(len(self.image_list))
                np.random.shuffle(indices)
                self.image_list = self.image_list[indices]
                self.mask_list = self.mask_list[indices]
            if subset == "Train":
                self.image_names = self.image_list[:int(
                    np.ceil(len(self.image_list) * (1 - self.fraction)))]
                self.mask_names = self.mask_list[:int(
                    np.ceil(len(self.mask_list) * (1 - self.fraction)))]
            else:
                self.image_names = self.image_list[
                    int(np.ceil(len(self.image_list) * (1 - self.fraction))):]
                self.mask_names = self.mask_list[
                    int(np.ceil(len(self.mask_list) * (1 - self.fraction))):]

    def __len__(self) -> int:
        return len(self.image_names)

    def __getitem__(self, index: int) -> Any:
        image_path = self.image_names[index]
        mask_path = self.mask_names[index]
        with open(image_path, "rb") as image_file, open(mask_path,
                                                        "rb") as mask_file:
            image = Image.open(image_file)
            if self.image_color_mode == "rgb":
                image = image.convert("RGB").resize((224,224), Image.ANTIALIAS)
            elif self.image_color_mode == "grayscale":
                image = image.convert("L").resize((224,224), Image.ANTIALIAS)
            mask = Image.open(mask_file)
            if self.mask_color_mode == "rgb":
                mask = mask.convert("RGB").resize((224,224), Image.ANTIALIAS)
            elif self.mask_color_mode == "grayscale":
                mask = mask.convert("L").resize((224,224), Image.ANTIALIAS)
                # mask = np.array(mask)            
                # mask[mask > 0] = 1
            sample = {"image": image, "mask": mask}
            if self.transforms:
                sample["image"] = self.transforms(sample["image"])
                sample["mask"] = self.transforms(sample["mask"])
            return sample

## Loading and Displaying

Change the `**data_path**`

In [None]:
from torchvision import transforms


data_path = 

image_datasets = {
    x: SegmentationDataset(data_path, 
                           "train/image", 
                           "train/label",
                            seed=100,
                            fraction=0.2,
                            subset=x,
                            transforms=transforms.Compose([transforms.ToTensor()]))  
    for x in ['Train', 'Test']
}
dataloaders = {
    x: DataLoader(image_datasets[x],
                    batch_size=8,
                    shuffle=True,
                    num_workers=2)
    for x in ['Train', 'Test']
}

In [None]:
import torchvision

samples = next(iter(dataloaders['Train']))
image, labels =samples['image'],samples['mask']
grid_labels = torchvision.utils.make_grid(labels, nrow=8)
grid_img = torchvision.utils.make_grid(image, nrow=8)


plt.figure(figsize=(45,45))
plt.imshow(grid_img.permute(1, 2, 0))

plt.figure(figsize=(45,45))
plt.imshow(grid_labels.permute(1, 2, 0)*255)

# Model deepLab

In [None]:
""" DeepLabv3 Model download and change the head for your prediction"""
from torchvision.models.segmentation.deeplabv3 import DeepLabHead
from torchvision import models


def createDeepLabv3(outputchannels=1):
    """DeepLabv3 class with custom head
    Args:
        outputchannels (int, optional): The number of output channels
        in your dataset masks. Defaults to 1.
    Returns:
        model: Returns the DeepLabv3 model with the ResNet101 backbone.
    """
    model = models.segmentation.deeplabv3_resnet50(pretrained=True,
                                                    progress=True)
    model.classifier = DeepLabHead(2048, outputchannels)
    # Set the model in training mode
    model.train()
    return model
model=createDeepLabv3(outputchannels=2)

Downloading: "https://download.pytorch.org/models/deeplabv3_resnet50_coco-cd0a2569.pth" to /root/.cache/torch/hub/checkpoints/deeplabv3_resnet50_coco-cd0a2569.pth


  0%|          | 0.00/161M [00:00<?, ?B/s]

# Metrics

In [None]:

def pixel_accuracy(output, mask):
    with torch.no_grad():
        output = torch.argmax(F.softmax(output, dim=1), dim=1)
        correct = torch.eq(output, mask).int()
        accuracy = float(correct.sum()) / float(correct.numel())
    return accuracy

def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    with torch.no_grad():
        pred_mask = F.softmax(pred_mask, dim=1)
        pred_mask = torch.argmax(pred_mask, dim=1)
        pred_mask = pred_mask.contiguous().view(-1)
        mask = mask.contiguous().view(-1)

        iou_per_class = []
        for clas in range(0, n_classes): #loop per pixel class
            true_class = pred_mask == clas
            true_label = mask == clas

            if true_label.long().sum().item() == 0: #no exist label in this loop
                iou_per_class.append(np.nan)
            else:
                intersect = torch.logical_and(true_class, true_label).sum().float().item()
                union = torch.logical_or(true_class, true_label).sum().float().item()

                iou = (intersect + smooth) / (union +smooth)
                iou_per_class.append(iou)
        return np.nanmean(iou_per_class)



# Apprentissage

Change the values

```
    NUM_INPUT_CHANNELS = 3
    NUM_OUTPUT_CHANNELS = 2

    NUM_EPOCHS = 100

    LEARNING_RATE = 1e-3
    MOMENTUM = 0.9
    BATCH_SIZE = 8

    save_dir=os.path.join(cdirectory,'model_weight_vege')

    data_path =
    



In [None]:
NUM_INPUT_CHANNELS = 3
NUM_OUTPUT_CHANNELS = 2

NUM_EPOCHS = 100

LEARNING_RATE = 1e-3
MOMENTUM = 0.9
BATCH_SIZE = 8


save_dir=os.path.join(cdirectory,'model_weight_vege')

data_path = 

In [None]:
import csv
import copy
import time
from tqdm import tqdm
import torch
import numpy as np
import os 
from torchvision import transforms
import torch.nn.functional as F
def train_model(model, criterion, dataloaders, optimizer, metrics, bpath, num_epochs=3):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = 1e10
    # Use gpu if available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    count=0
    # Initialize the log file for training and testing loss and metrics
    fieldnames = ['epoch', 'Train_loss', 'Test_loss'] + \
        [f'Train_{m}' for m in metrics.keys()] + \
        [f'Test_{m}' for m in metrics.keys()]
    with open(os.path.join(bpath, 'log.csv'), 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
 
    for epoch in range(1, num_epochs+1):
        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)
        # Each epoch has a training and validation phase
        # Initialize batch summary
        batchsummary = {a: [0] for a in fieldnames}
 
        for phase in ['Train', 'Test']:
          
            if phase == 'Train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode
 
            # Iterate over data.
            for sample in tqdm(iter(dataloaders[phase])):
                inputs = sample['image'].to(device)
                masks = sample['mask'].to(device)
                # zero the parameter gradients
                optimizer.zero_grad()
 
                # track history if only in train
                with torch.set_grad_enabled(phase == 'Train'):

                    outputs = model(inputs)
                    predicted_tensor =outputs['out']

                    loss = criterion(predicted_tensor, masks.to(torch.long).squeeze(1))
                    # print('-----' * 5)
                    for name, metric in metrics.items():
                        if name == ' mIoU':
                    #         # Use a classification threshold of 0.1
                            batchsummary[f'{phase}_{name}'].append(
                                metric(predicted_tensor, masks,n_classes=2))
                        else:
                            batchsummary[f'{phase}_{name}'].append(
                                metric(predicted_tensor, masks))
 
                    # backward + optimize only if in training phase
                    if phase == 'Train':
                        loss.backward()
                        optimizer.step()

            batchsummary['epoch'] = epoch
            epoch_loss = loss
            batchsummary[f'{phase}_loss'] = epoch_loss.item()
            print('{} Loss: {:.4f}'.format(
                phase, loss))
        for field in fieldnames[3:]:
            batchsummary[field] = np.mean(batchsummary[field])
        print(batchsummary)
        with open(os.path.join(bpath, 'log.csv'), 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerow(batchsummary)
            # deep copy the model
            if phase == 'Test' and loss < best_loss:
                best_loss = loss
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save(model.state_dict(), os.path.join(save_dir, "model_best_vege.pth"))

 
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Lowest Loss: {:4f}'.format(best_loss))
 
    # load best model weights
    model.load_state_dict(best_model_wts)

    return model

if __name__=='__main__':





    if not os.path.isdir(save_dir):
      os.makedirs(save_dir)


    train_dataset = {
        x: SegmentationDataset(data_path, 
                              "train/image", 
                              "train/label",
                                seed=100,
                                fraction=0.2,
                                subset=x,
                                transforms=transforms.Compose([transforms.ToTensor()]))  
        for x in ['Train', 'Test']
    }
    train_dataloader = {
        x: DataLoader(train_dataset[x],
                        batch_size=8,
                        shuffle=True,
                        num_workers=2)
        for x in ['Train', 'Test']
    }
    # Specify the loss function
    # criterion = torch.nn.MSELoss(reduction='mean')
    # Specify the optimizer with a lower learning rate
    # optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    # Specify the evaluation metrics
    metrics = {'pixel_accuracy': pixel_accuracy, 'mIoU': mIoU}

 
    # =======================================
    # ========== if you already have a model ======================
    # model.load_state_dict(torch.load(Path(save_dir) / "model_best_vege.pth",map_location=torch.device('cpu')))
    # model.train()

    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)    

    model=train_model(model, criterion, train_dataloader, optimizer, metrics, save_dir, NUM_EPOCHS)
    torch.save(model.state_dict(), os.path.join(save_dir, "model_best.pth"))


# Inference

In [None]:
from pathlib import Path
model = SegNet(input_channels=3,
                        output_channels=2)

model.load_state_dict(torch.load(Path(save_dir) / "model_best_vege.pth",map_location=torch.device('cpu')))
model.eval()

In [None]:
from torchvision.utils import make_grid as makeg
gr=makeg(masked,nrow=8)
grid_img=makeg(d['image'])
grid_gt=makeg(d['mask'])

plt.figure(figsize=(45,45))
plt.imshow(grid_img.permute(1, 2, 0))
plt.figure(figsize=(45,45))
plt.imshow(gr.permute(1, 2, 0)*255)
plt.figure(figsize=(45,45))
plt.imshow(grid_gt.permute(1, 2, 0)*255)