# Prep #1 : Download dataset
To download a dataset onto this Notebook, do either of the following.

## [Option1] Download once from Google Drive to your own PC and then upload to this Colab.
Here is the link to the google drive.

https://drive.google.com/file/d/1naLdgbRmBq_QhosmXJkdmG4Pi9SiIyoo/view?usp=share_link

## [Option2] Download directly from Google Drive to here
Run the command below

Ref: https://qiita.com/namakemono/items/c963e75e0af3f7eed732

In [None]:
!curl -sc /tmp/cookie "https://drive.google.com/uc?export=download&id=1naLdgbRmBq_QhosmXJkdmG4Pi9SiIyoo" > /dev/null
!CODE="$(awk '/_warning_/ {print $NF}' /tmp/cookie)"
!curl -Lb /tmp/cookie "https://drive.google.com/uc?export=download&confirm=${CODE}&id=1naLdgbRmBq_QhosmXJkdmG4Pi9SiIyoo" -o archive.zip
!unzip -q archive.zip

In [None]:
!unzip -q archive.zip

---

# Prep #2 : Doenload the pre-trained model
To download the model onto this Notebook, do either of the following.

## [Option1] Download once from Google Drive to your own PC and then upload to this Colab.
Here is the link to the google drive.

https://drive.google.com/file/d/14PtYuFZc-5sB2n9lLUDku8bgyEKSLZG5/view?usp=share_link

## [Option2] Download directly from Google Drive to here
Run the command below

Ref: https://qiita.com/namakemono/items/c963e75e0af3f7eed732

In [None]:
!curl -sc /tmp/cookie "https://drive.google.com/uc?export=download&id=14PtYuFZc-5sB2n9lLUDku8bgyEKSLZG5" > /dev/null
!CODE="$(awk '/_warning_/ {print $NF}' /tmp/cookie)"
!curl -Lb /tmp/cookie "https://drive.google.com/uc?export=download&confirm=${CODE}&id=14PtYuFZc-5sB2n9lLUDku8bgyEKSLZG5" -o drone_Trained.pth

---

# 1. Import libraries

In [None]:
import numpy as np 
import pandas as pd

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torchvision
import torch.nn.functional as F
from torch.autograd import Variable

from PIL import Image
import cv2
import albumentations as A

import statistics

import time
import os
from tqdm.notebook import tqdm

from torchsummary import summary
import segmentation_models_pytorch as smp

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
  print(torch.cuda.get_device_name())

# 2. Set nesesssary pathes
Specify the paths to the folder where training images are stored, model files, and the folder where output images will be stored.

In [None]:
IMAGE_PATH = "Custum_Dataset/Car/JPEGImages" #Path of the folder where drone images are stored (Either relative or absolute path is acceptable)
MASK_PATH = "Custum_Dataset/Car/SegmentationClass" #Path of the folder where the drone image mask image (correct label image) is stored (Either relative or absolute path is acceptable)
MODEL_PATH = "drone_Trained.pth" #Path to the trained model file (Either relative or absolute paths are acceptable)

# Create a folder to store the created model files and set the path to the variable
!mkdir -p Custum_Model
SAVE_PATH = 'Custum_Model'

# 3. Define a dataset

Create a list of image file names to be used in the study and check the number of images

In [None]:
def create_df():
    name = []
    for dirname, _, filenames in os.walk(IMAGE_PATH):
        for filename in filenames:
            name.append(filename.split('.')[0])
    
    return pd.DataFrame({'id': name}, index = np.arange(0, len(name)))

df = create_df()
print('Total num of images: ', len(df))

Then, split the whole dataset to 3 parts for training, validation and testing.

In [None]:
#10% of all data randomly isolated as test data
X_trainval, X_test = train_test_split(df['id'].values, test_size=0.1, random_state=19)
#15% of the rest data is randomly separated as validation data and the rest as training data
X_train, X_val = train_test_split(X_trainval, test_size=0.15, random_state=19)

print('Train Size   : ', len(X_train))
print('Val Size     : ', len(X_val))
print('Test Size    : ', len(X_test))

Define usual Dataset and Data transformation in PyTorch way.
In addition, here we need to implement the method called "convert_rgb_to_value" in terms of converting the original 3D mask images to 2D gray-scaled images, because the output shape of the model is 2D.

In [None]:
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

t_train = A.Compose([
    A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST), 
    A.HorizontalFlip(), 
    A.VerticalFlip(), 
    A.GridDistortion(p=0.2), 
    A.RandomBrightnessContrast((0,0.5),(0,0.5)),
    A.GaussNoise()])

t_val = A.Compose([
    A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST), 
    A.HorizontalFlip(),
    A.GridDistortion(p=0.2)])

# Definition of mapping between the color and class in annotation data
mapping = {
    (0, 0, 0): 0,
    (150, 143, 9): 1,
}

# Convert 3D annotation data to 2D data (convert from RGB values defined in mapping to value values)
def convert_rgb_to_value(target):
    h = target.shape[0]#画像の高さの取得
    w = target.shape[1]#画像の横幅の取得
    target = target.permute(2,0,1).contiguous()#テンソルの形を変換(H,W,C)->(C,H,W)
    mask = torch.empty(h, w, dtype=torch.long)#(H,W)の2次元型を用意
  
    for k in mapping:#マップで定義したクラスの数繰り返し検出する
        idx = (target==torch.tensor(k, dtype=torch.uint8).unsqueeze(1).unsqueeze(2))#targetのある画素がmappingに定義し現在対象となっている値Kと一致すればTrueを返すテンソルidxを生成(C,H,W) 
                                                                                #(RGB値しか持たないtorch.tensor(k, dtype=torch.uint8)に対してunsueezeを2回行うことで比較可能な3次元形式に変形している)
        validx = (idx.sum(0) == 3)#validx:RGB値すべてが一致した場合True,一致しない場合Falseの2次元テンソル（H,W）
        mask[validx] = torch.tensor(mapping[k], dtype=torch.long)#validxがTrueだった場所に現在のKのRGB値を持つvalue値をmaskに代入
  
    return mask

class DroneDataset(Dataset):
    
    def __init__(self, img_path, mask_path, X, mean, std, transform=None):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
        self.mean = mean
        self.std = std


    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        img = cv2.imread(os.path.join(self.img_path, self.X[idx] + '.jpg'))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(os.path.join(self.mask_path, self.X[idx] + '.png'))
        
        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug['image'])
            mask = aug['mask']
        
        if self.transform is None:
            img = Image.fromarray(img)
        
        t = T.Compose([T.ToTensor(), T.Normalize(self.mean, self.std)])
        img = t(img)
        mask = torch.from_numpy(mask).long()

        mask = convert_rgb_to_value(mask)        
 
        return img, mask

train_set = DroneDataset(IMAGE_PATH, MASK_PATH, X_train, mean, std, t_train)
val_set = DroneDataset(IMAGE_PATH, MASK_PATH, X_val, mean, std, t_val)

Then, DataLoader for both training and validation is created as usual.

In [None]:
batch_size= 4 

generator=torch.Generator()
generator.manual_seed(0)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, generator=generator)

val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)

# 4. Define a model

Loading the pre-trained model.

Note: Although we only want to recognize two classes, "cars" and "other," the base pre-trained model has been trained to classify 24 classes, so the number of classes is set to 24 to match the base model.

In [None]:
model = smp.Unet('mobilenet_v2', encoder_weights='imagenet', classes=24, activation=None, encoder_depth=5, decoder_channels=[256, 128, 64, 32, 16])
model.load_state_dict(torch.load(MODEL_PATH))

# If you do not use this pre-trained model, define the model as follows. Note that there are two classes. Cars" and "Other.
#model = smp.Unet('mobilenet_v2', encoder_weights='imagenet', classes=2, activation=None, encoder_depth=5, decoder_channels=[256, 128, 64, 32, 16])

# 5. Define training and validation methods

Define methods to calculate metrics.

In [None]:
def pixel_accuracy(output, mask):
    with torch.no_grad():
        output = torch.argmax(F.softmax(output, dim=1), dim=1)
        correct = torch.eq(output, mask).int()
        accuracy = float(correct.sum()) / float(correct.numel())
    return accuracy

def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    with torch.no_grad():
        pred_mask = F.softmax(pred_mask, dim=1)
        pred_mask = torch.argmax(pred_mask, dim=1)
        pred_mask = pred_mask.contiguous().view(-1)
        mask = mask.contiguous().view(-1)

        iou_per_class = []
        for clas in range(0, n_classes): #loop per pixel class
            true_class = pred_mask == clas
            true_label = mask == clas

            if true_label.long().sum().item() == 0: #no exist label in this loop
                iou_per_class.append(np.nan)
            else:
                intersect = torch.logical_and(true_class, true_label).sum().float().item()
                union = torch.logical_or(true_class, true_label).sum().float().item()

                iou = (intersect + smooth) / (union +smooth)
                iou_per_class.append(iou)
        return np.nanmean(iou_per_class)

Define the method to get current learning rate from a optimizer.

In [None]:
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

Define the fit function to train and validate a model.

In [None]:
def fit(epochs, model, train_loader, val_loader, criterion, optimizer, scheduler):
    torch.cuda.empty_cache()
    train_losses = []
    test_losses = []
    val_iou = []; val_acc = []
    train_iou = []; train_acc = []
    lrs = []
    min_loss = np.inf
    decrease = 1 ; not_improve=0

    model.to(device)
    fit_time = time.time()
    for e in range(epochs):
        since = time.time()
        running_loss = 0
        iou_score = 0
        accuracy = 0
        #training loop
        model.train()
        
        for i, data in enumerate(tqdm(train_loader)):
            #training phase
            image_tiles, mask_tiles = data
            
            image = image_tiles.to(device); mask = mask_tiles.to(device);

            #forward
            output = model(image)
            loss = criterion(output, mask)

            #evaluation metrics
            iou_score += mIoU(output, mask)
            accuracy += pixel_accuracy(output, mask)

            #backward
            loss.backward()
            optimizer.step() #update weight img = Image.fromarray(img)
            lrs.append(get_lr(optimizer))
            scheduler.step() 
            
            running_loss += loss.item()
            
        else:
            model.eval()
            test_loss = 0
            test_accuracy = 0
            val_iou_score = 0
            #validation loop
            with torch.no_grad():
                for i, data in enumerate(tqdm(val_loader)):
                    #reshape to 9 patches from single image, delete batch size
                    image_tiles, mask_tiles = data
                    
                    image = image_tiles.to(device); mask = mask_tiles.to(device);
                    output = model(image)
                    #evaluation metrics
                    val_iou_score +=  mIoU(output, mask)
                    test_accuracy += pixel_accuracy(output, mask)
                    #loss
                    loss = criterion(output, mask)                                  
                    test_loss += loss.item()
            
            #calculatio mean for each batch
            train_losses.append(running_loss/len(train_loader))
            test_losses.append(test_loss/len(val_loader))


            if min_loss > (test_loss/len(val_loader)):
                print('Loss Decreasing.. {:.3f} >> {:.3f} '.format(min_loss, (test_loss/len(val_loader))))
                min_loss = (test_loss/len(val_loader))
                decrease += 1
                if decrease % 5 == 0:
                    print('saving model...')
                    torch.save(model.state_dict(), SAVE_PATH + '/Unet-Mobilenet_v2_mIoU-{:.3f}.pth'.format(val_iou_score/len(val_loader)))
                    

            if (test_loss/len(val_loader)) > min_loss:
                not_improve += 1
                min_loss = (test_loss/len(val_loader))
                print(f'Loss Not Decrease for {not_improve} time')
                #if not_improve == 7:
                    #print('Loss not decrease for 7 times, Stop Training')
                    #break
            
            #iou
            val_iou.append(val_iou_score/len(val_loader))
            train_iou.append(iou_score/len(train_loader))
            train_acc.append(accuracy/len(train_loader))
            val_acc.append(test_accuracy/ len(val_loader))
            print("Epoch:{}/{}..".format(e+1, epochs),
                  "Train Loss: {:.3f}..".format(running_loss/len(train_loader)),
                  "Val Loss: {:.3f}..".format(test_loss/len(val_loader)),
                  "Train mIoU:{:.3f}..".format(iou_score/len(train_loader)),
                  "Val mIoU: {:.3f}..".format(val_iou_score/len(val_loader)),
                  "Train Acc:{:.3f}..".format(accuracy/len(train_loader)),
                  "Val Acc:{:.3f}..".format(test_accuracy/len(val_loader)),
                  "Time: {:.2f}m".format((time.time()-since)/60))
        
    history = {'train_loss' : train_losses, 'val_loss': test_losses,
               'train_miou' :train_iou, 'val_miou':val_iou,
               'train_acc' :train_acc, 'val_acc':val_acc,
               'lrs': lrs}
    print('Total time: {:.2f} m' .format((time.time()- fit_time)/60))
    return history

# 6. Run the training

Define the hyper-parameters.

In [None]:
max_lr = 1e-3
epoch = 1
weight_decay = 1e-4

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=max_lr, weight_decay=weight_decay)
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epoch,
                                            steps_per_epoch=len(train_loader))

Let's go run!

In [None]:
history = fit(epoch, model, train_loader, val_loader, criterion, optimizer, sched)

Save the trained model if needed.

In [None]:
torch.save(model.state_dict(), SAVE_PATH + '/drone_Custum_Trained.pth')

---
## Note: This cell is specially processed for this lecture.
Originally, we would like to run several hundred epochs of model training, but this is difficult due to time constraints, so we will replace the model with one that has already been trained for several dozen epochs and proceed with the process after this.

In [None]:
!curl -sc /tmp/cookie "https://drive.google.com/uc?export=download&id=1JXPHg4brau1T93z79VNr4VqLeCEx2CcW" > /dev/null
!CODE="$(awk '/_warning_/ {print $NF}' /tmp/cookie)"
!curl -Lb /tmp/cookie "https://drive.google.com/uc?export=download&confirm=${CODE}&id=1JXPHg4brau1T93z79VNr4VqLeCEx2CcW" -o drone_Custum_Trained.pth
model.load_state_dict(torch.load('drone_Custum_Trained.pth'))

---

# 7. Measure the accuracy of trained model with test image set

In [None]:
reverse_mapping = {1: (150, 143, 9),
                   0: (0, 0, 0),
}      

def visualize(temp):
    r = temp.copy()
    g = temp.copy()
    b = temp.copy()
    for l in range(0,len(reverse_mapping)):
        r[temp==l]=reverse_mapping[l][0]
        g[temp==l]=reverse_mapping[l][1]
        b[temp==l]=reverse_mapping[l][2]

    rgb = np.zeros((temp.shape[1], temp.shape[2],3))

    rgb[:,:,0] = (r)
    rgb[:,:,1] = (g)
    rgb[:,:,2] = (b)
    return rgb

In [None]:
class DroneTestDataset(Dataset):
    
    def __init__(self, img_path, mask_path, X, transform=None):
        self.img_path = img_path
        self.mask_path = mask_path
        self.X = X
        self.transform = transform
      
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        img = cv2.imread(os.path.join(self.img_path, self.X[idx] + '.jpg'))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(os.path.join(self.mask_path, self.X[idx] + '.png'))
        filename = str(self.X[idx])
        
        if self.transform is not None:
            aug = self.transform(image=img, mask=mask)
            img = Image.fromarray(aug['image'])
            mask = aug['mask']
        
        if self.transform is None:
            img = Image.fromarray(img)
        
        mask = torch.from_numpy(mask).long()

        mask = convert_rgb_to_value(mask)
        
        return img, mask, filename

In [None]:
t_test = A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST)
test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_test, transform=t_test)

In [None]:
def predict_image_mask_miou(model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    model.eval()
    t = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    image = t(image)
    model.to(device); image=image.to(device)
    mask = mask.to(device)
    with torch.no_grad():
        
        image = image.unsqueeze(0)
        mask = mask.unsqueeze(0)
        
        output = model(image)
        score = mIoU(output, mask)
        masked = torch.argmax(output, dim=1)
        masked = masked.cpu().squeeze(0)
    return masked, score

def predict_image_mask_pixel(model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    model.eval()
    t = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    image = t(image)
    model.to(device); image=image.to(device)
    mask = mask.to(device)
    with torch.no_grad():
        
        image = image.unsqueeze(0)
        mask = mask.unsqueeze(0)
        
        output = model(image)
        acc = pixel_accuracy(output, mask)
        masked = torch.argmax(output, dim=1)
        masked = masked.cpu().squeeze(0)
    return masked, acc

In [None]:
def miou_score(model, test_set):
    score_iou = []
    for i in tqdm(range(len(test_set))):
        img, mask ,filename= test_set[i]
        pred_mask, score = predict_image_mask_miou(model, img, mask)
        pred_mask = pred_mask.cpu().numpy().copy()
        pred_mask = pred_mask.reshape(1,pred_mask.shape[0],pred_mask.shape[1])
        cv2.imwrite(os.path.join(SAVE_PATH,filename + ".png"), visualize(pred_mask),[cv2.IMWRITE_PNG_COMPRESSION,9])#ここで画像を保存
        score_iou.append(score)
    mean_iou = statistics.mean(score_iou)
    return mean_iou

def pixel_acc(model, test_set):
    accuracy = []
    for i in tqdm(range(len(test_set))):
        img, mask,filename = test_set[i]
        pred_mask, acc = predict_image_mask_pixel(model, img, mask)
        accuracy.append(acc)
    mean_acc = statistics.mean(accuracy)
    return mean_acc

In [None]:
mob_miou = miou_score(model, test_set)
print("miou: " + str(mob_miou))
mob_acc = pixel_acc(model, test_set)
print("acc: " + str(mob_acc))

# 8. Infer a single image randomly

In [None]:
def mIoU(pred_mask, mask, smooth=1e-10, n_classes=23):
    with torch.no_grad():
        pred_mask = F.softmax(pred_mask, dim=1)
        pred_mask = torch.argmax(pred_mask, dim=1)
        pred_mask = pred_mask.contiguous().view(-1)
        mask = mask.contiguous().view(-1)

        iou_per_class = []
        for clas in range(0, n_classes): #loop per pixel class
            true_class = pred_mask == clas
            true_label = mask == clas

            if true_label.long().sum().item() == 0: #no exist label in this loop
                iou_per_class.append(np.nan)
            else:
                intersect = torch.logical_and(true_class, true_label).sum().float().item()
                union = torch.logical_or(true_class, true_label).sum().float().item()

                iou = (intersect + smooth) / (union +smooth)
                iou_per_class.append(iou)
        return np.nanmean(iou_per_class)

In [None]:
def predict_image_mask_miou(model, image, mask, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    model.eval()
    t = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    image = t(image)
    model.to(device); image=image.to(device)
    mask = mask.to(device)
    with torch.no_grad():
        
        image = image.unsqueeze(0)
        mask = mask.unsqueeze(0)
        
        output = model(image)
        score = mIoU(output, mask)
        masked = torch.argmax(output, dim=1)
        masked = masked.cpu().squeeze(0)
    return masked, score

In [None]:
import random

t_test = A.Resize(704, 1056, interpolation=cv2.INTER_NEAREST)
test_set = DroneTestDataset(IMAGE_PATH, MASK_PATH, X_test, transform=t_test)

index = random.randint(0, len(test_set))
img, mask, filename = test_set[index]
pred_mask, score = predict_image_mask_miou(model, img, mask)
pred_mask = pred_mask.cpu().numpy().copy()
pred_mask = pred_mask.reshape(1,pred_mask.shape[0],pred_mask.shape[1])
print(filename)


import matplotlib.pyplot as plt
plt.figure(figsize=(20,16))
plt.subplot(1,2,1)
plt.imshow(img)
plt.subplot(1,2,2)
plt.imshow(visualize(pred_mask))