# 1) Initial Setting

In [None]:
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0, path=path):
        self.path = path
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False
        
    def __call__(self, val_loss, model=None):
        if self.best_loss - val_loss > self.min_delta:
            torch.save(model.state_dict(), self.path)
            print(f'Model saved to: {self.path}')
            self.best_loss = val_loss
            self.counter = 0
        elif self.best_loss - val_loss < self.min_delta:
            self.counter += 1
            print(f"INFO: Early stopping counter {self.counter} of {self.patience}")
            if self.counter >= self.patience:
                print('INFO: Early stopping')
                self.early_stop = True

In [None]:
!pip install opencv-python
!pip install pytorch-ignite

In [None]:
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tqdm.autonotebook import tqdm

from sklearn.model_selection import train_test_split

from glob import glob
import torch
import torch.nn as nn
from torchvision import transforms as T
from torch.utils.data import Dataset, DataLoader
import torchvision
import os
import shutil

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'Currently using "{device}" device.')

In [None]:
seed = 42
batch_size = 4
image_size = 256
num_classes = 1
epochs = 30
path = r'seg_model.pth'

# 2) Dataset Preparation

#### Crack + Augmented Crack Image 합치기

In [None]:
path_crack = '/home/jovyan/aiclops_2023/crack_semantic_segmentation/crack_segmentation_dataset/images'
path_crack_aug = '/home/jovyan/aiclops_2023/crack_semantic_segmentation/crack_segmentation_dataset/augmented/images'

path_crack_total = '/home/jovyan/aiclops_2023/crack_semantic_segmentation/crack_segmentation_dataset/total_images/'

crack_imgs = os.listdir(path_crack)
crack_aug_imgs = os.listdir(path_crack_aug)

In [None]:
print(f'crack imgs: {crack_imgs}')
print(f'crack augmendted imgs : {crack_aug_imgs}')

In [None]:
for file in crack_imgs:
    file_path = os.path.join(path_crack, file)
    dir_path = os.path.join(path_crack_total, file)
    shutil.copy(file_path, dir_path)
# print(f"Copied file from {file_path} to {output_path}")


# 폴더2의 파일을 합친 폴더로 복사
for file in crack_aug_imgs:
    file_path = os.path.join(path_crack_aug, file)
    dir_path = os.path.join(path_crack_total, file)
    shutil.copy(file_path, dir_path)
    # print(f"Copied file from {file_path} to {output_path}")   

In [None]:
path_images = r'/home/jovyan/aiclops_2023/crack_semantic_segmentation/crack_segmentation_dataset/total_images/'
path_masks = r'/home/jovyan/aiclops_2023/crack_semantic_segmentation/crack_segmentation_dataset/total_masks/'

list_images = glob(path_images + '*.jpg')
list_masks = glob(path_masks + '*.jpg')

list_sorted_images = sorted([str(p) for p in list_images])
list_sorted_masks = sorted([str(p) for p in list_masks])

print(f'len(list_sorted_images): {len(list_sorted_images)}')

dataframe = pd.DataFrame({'images': list_sorted_images, 'masks': list_sorted_masks})
dataframe.sample(5)

#### Train, Test, Val Dataset 분리

In [None]:
## changed to define test size as % of dataset instead of set value
# test_size = 0.30* data size

train, test = train_test_split(dataframe, test_size=0.2, shuffle=True, random_state=seed)
train, valid = train_test_split(train, test_size=0.25, shuffle=True, random_state=seed)

print(f'Train size: {len(train)}, validation size: {len(valid)} and test size: {len(test)}')

In [None]:
train.loc[19521].squeeze(), train.loc[19521].squeeze().shape 

In [None]:
# Image Preprocessing
train_transforms = T.Compose([
    T.ToPILImage(),
    T.Resize((image_size, image_size)), # (256, 256)
    T.ToTensor()
])

In [None]:
class CrackDataset(Dataset):
    def __init__(self, dataset, transforms=train_transforms):
        self.dataset = dataset.reset_index(drop=True)
        self.transforms = transforms
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, ix):
        df_row = self.dataset.loc[ix].squeeze() # df_row: [(images) path_image, (masks) path_mask]
        path_image = df_row['images']
        path_mask = df_row['masks']
                
        # image
        image = cv2.imread(path_image)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_tensor = self.transforms(image).float()
        
        # mask
        mask = cv2.imread(path_mask)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
        mask = cv2.resize(mask, (image_size, image_size))
        _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY) # pixel 값이 0~255의 중간값인 127을 기준으로 흑, 백 처리
        mask_tensor = torch.as_tensor(mask[None], dtype=torch.float32)

        mask_tensor /= 255.
        
        return image_tensor, mask_tensor
    
    def collate_fn(self, batch):
        images, masks = tuple(zip(*batch))
        images = [img[None] for img in images]
        masks = [msk[None] for msk in masks]
        images, masks = [torch.cat(i).to(device) for i in [images, masks]]
        return images, masks

In [None]:
dataset_train = CrackDataset(train)

# image
sample_image_tensor = dataset_train[3584][0]
plt.subplot(121)
plt.imshow(dataset_train[3584][0].cpu().detach().numpy().transpose(1,2,0)) # C x H x W ➡️ H x W x C

# mask
sample_mask_tensor = dataset_train[3584][1]
plt.subplot(122)
plt.imshow(dataset_train[3584][1].cpu().detach().numpy().transpose(1,2,0), cmap='gray') # C x H x W ➡️ H x W x C

In [None]:
def calculate_iou(pred_mask, true_mask):
    # print(f'🔥 true_mask: {true_mask}')
     

    pred_mask = torch.from_numpy(pred_mask)
    pred_mask.squeeze_(dim=2) 
    pred_mask = torch.clamp(pred_mask, min=0)
    # print(f'🔥 pred_mask: {pred_mask}')
    pred_mask = pred_mask.numpy()
    pred_mask = np.round(255 * pred_mask)
    # print(f'🔥 pred_mask: {pred_mask}')

    pred_mask = pred_mask.astype(np.uint8)
    _, pred_mask = cv2.threshold(pred_mask, 127, 255, cv2.THRESH_BINARY)
    # print(f'🔥 pred_mask: {pred_mask}')
    pred_mask = torch.from_numpy(pred_mask)
    true_mask = torch.from_numpy(true_mask)

    # print(f'⭐ pred_mask: {pred_mask}')
    # print(f'⭐ true_mask: {true_mask}')
    pred_mask = (pred_mask >= 255).bool()
    true_mask = (true_mask >= 255).bool()
    print(f'🔥 pred_mask: {torch.sum(pred_mask)}')

    intersection = torch.logical_and(pred_mask, true_mask)
    union = torch.logical_or(pred_mask, true_mask)
    print(torch.sum(intersection))
    print(torch.sum(union))
    iou = torch.sum(intersection) / torch.sum(union)
    # # print(f'✅ pred_mask: {pred_mask}, \n true_mask: {true_mask}')

    # intersection = torch.logical_and(pred_mask, true_mask)
    # union = torch.logical_or(pred_mask, true_mask)
    # iou = torch.sum(intersection) / torch.sum(union)
    
    return iou


In [None]:
def calculate_dice(pred_mask, true_mask):    
    
    pred_mask = torch.from_numpy(pred_mask)
    pred_mask.squeeze_(dim=2) 
    pred_mask = pred_mask.numpy()
    pred_mask = np.round( 255 * pred_mask)
    pred_mask = pred_mask.astype(np.uint8)
    _, pred_mask = cv2.threshold(pred_mask, 127, 255, cv2.THRESH_BINARY)
    pred_mask = torch.from_numpy(pred_mask)
    true_mask = torch.from_numpy(true_mask)
    
    pred_mask = (pred_mask >= 255).float()
    true_mask = (true_mask >= 255).float()

    
    
    intersection = torch.sum(pred_mask * true_mask)
    union = torch.sum(pred_mask) + torch.sum(true_mask)

    dice = 2 * intersection / (union + 1e-8)
    return dice

In [None]:
@torch.no_grad()
def validate_test_image(model, dataset):
    idx = np.random.randint(len(dataset))
    dataset = dataset.reset_index(drop=True)
    df_row = dataset.loc[idx].squeeze()
    
    # image
    image = cv2.imread(df_row['images'])
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = train_transforms(image).unsqueeze(0).to(device)
    #print(f'image_tensor.dtype: {image_tensor.dtype}')
    #print(f'image.shape: {image_tensor.shape}')
    #print('--------------')

# [Original Version]
# -------------------------------------------------------------

    # Mask 
    mask = cv2.imread(df_row['masks'])
    mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    mask = cv2.resize(mask, (image_size, image_size))
    _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
    

    model.eval()
    predicted_mask = model(image_tensor)
    predicted_mask = predicted_mask['out'][0].cpu().detach().numpy().transpose(1,2,0)
# -------------------------------------------------------------

# [New Version]
# -------------------------------------------------------------

    # mask
    # mask = cv2.imread(df_row['masks'])  
    # mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    # mask = cv2.resize(mask, (image_size, image_size))
    # _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
    # mask = torch.from_numpy(mask)
    #print(f'mask.dtype: {mask.dtype}')
    #print(f'mask.shape: {mask.shape}')
    #print('--------------')

    
    # ⭐ Inference
    # model.eval()
    # predicted_mask = model(image_tensor)
    # predicted_mask = predicted_mask['out'][0].cpu().detach().numpy().transpose(1,2,0)

    # predicted_mask = torch.from_numpy(predicted_mask) 
    # predicted_mask.squeeze_(dim=2) 
   
    # predicted_mask = predicted_mask.numpy()
    # predicted_mask = np.round( 255 * predicted_mask)
    # predicted_mask = predicted_mask.astype(np.uint8)

    # _, predicted_mask = cv2.threshold(predicted_mask, 127, 255, cv2.THRESH_BINARY)
    # predicted_mask = torch.from_numpy(predicted_mask)
# -------------------------------------------------------------


    #print(predicted_mask)
    #print(f'predicted_mask.dtype: {predicted_mask.dtype}')    
    #print(f'predicted_mask.shape: {predicted_mask.shape}')   
        
  
    plt.figure(figsize=(8, 4))
    plt.subplot(131)
    plt.title('Original image')
    plt.imshow(image)
    
    plt.subplot(132)
    plt.title('Original mask')
    plt.imshow(mask, cmap='gray')
    
    plt.subplot(133)
    plt.title('Predicted mask')
    plt.imshow(predicted_mask, cmap='gray')
    
    plt.tight_layout()
    plt.show()
    plt.pause(0.001)
    
    print(f'image shape: {image.shape}, label mask shape: {mask.shape}, predicted mask shape: {predicted_mask.shape}')
    
  
    IoU = calculate_iou(predicted_mask, mask)
    dice = calculate_dice(predicted_mask, mask)

    print(f'✅ IoU: {IoU}')
    print(f'🎲 dice: {dice}')

    return IoU, dice

In [None]:
train_dataset = CrackDataset(train)
valid_dataset = CrackDataset(valid)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=train_dataset.collate_fn, drop_last=True) # batch_size = 4
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=valid_dataset.collate_fn, drop_last=True) # batch_size = 4

In [None]:
from torchvision.models.segmentation.deeplabv3 import DeepLabHead

def get_model(output_channels=1, unfreeze=True):
    model = torchvision.models.segmentation.deeplabv3_resnet101(pretrained=True, progress=False)
    
    for param in model.parameters():
        param.requires_grad = unfreeze # True
    
    model.classifier = DeepLabHead(2048, output_channels) # output_channels = 1
    
    return model.to(device)

model = get_model()  # set output_channels = 3 if we work with colored masks

In [None]:
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)  # if unfreeze=True -> 1e-4, 1e-5, so not to ruin good init w
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, min_lr=1e-6, factor=0.1)
early = EarlyStopping()

In [None]:
def train_one_batch(batch, model, criterion, optimizer):
    images, masks = batch
    optimizer.zero_grad()
    predicted_masks = model(images)['out']
    # print(f'✅ predicted_masks.shape =', predicted_masks.shape)
    # print(f'✅ predicted_masks=', predicted_masks)
    # print(f'🔥 mask = {masks}' )
    
    loss = criterion(predicted_masks, masks)
    loss.backward()
    optimizer.step()
    
    return loss.item()

@torch.no_grad()

def validate_one_batch(batch, model, criterion):
    images, masks = batch
    prediction_masks = model(images)['out']
    loss = criterion(prediction_masks, masks) # 4 x 1 x 256 x 256 (batch x C x H x W)
    return loss.item()

In [None]:
train_losses, valid_losses = [], []
ious, dices = [], []

In [None]:
for epoch in range(epochs):

    print(f'Epoch {epoch + 1}/{epochs}')
    epoch_train_losses, epoch_valid_losses = [], []
    
    # 🏋️ Training
    model.train()
    for _, batch in enumerate(tqdm(train_dataloader, leave=False)):
        batch_train_loss = train_one_batch(batch, model, criterion, optimizer)
        epoch_train_losses.append(batch_train_loss)

    epoch_train_loss = np.array(epoch_train_losses).mean()
    train_losses.append(epoch_train_loss)

    print(f'Train loss: {epoch_train_loss:.4f}.')
    
    # ⭐ Inference
    model.eval()
    for i, batch in enumerate(tqdm(valid_dataloader, leave=False)):
        batch_valid_loss = validate_one_batch(batch, model, criterion)
        epoch_valid_losses.append(batch_valid_loss)
        
    epoch_valid_loss = np.array(epoch_valid_losses).mean()
    valid_losses.append(epoch_valid_loss)
    print(f'Valid loss: {epoch_valid_loss:.4f}.')
    print('-'*50)

    # Score & Early Stopping
    IoU, dice = validate_test_image(model, test)
    ious.append(IoU), dices.append(dice)
    
    print(f'✅ IoU: {IoU}')
    print(f'🎲 dice: {dice}')

    print(f'1️⃣ IoU List: {ious}')
    print(f'2️⃣ dice List: {dices}')
    scheduler.step(epoch_valid_loss)
    early(epoch_valid_loss, model=model)
    if early.early_stop:
        print(f'Validation loss did not improve for {early.patience} epochs. Training stopped.')
        model.load_state_dict(torch.load(path))
        break

<hr></hr>