In [1]:
import sys

sys.path.append('..')
from src.tools.print_sysinfo import print_env
print_env()

DATE : 2023-08-30
Pyton Version : 3.8.17
PyTorch Version : 1.12.1
OS : Linux 5.4.0-155-generic
CPU spec : x86_64
RAM spec : 503.73 GB
Device 0:
Name: NVIDIA A100-SXM4-40GB
Total Memory: 40536.1875 MB
Driver Version: 470.199.02
Device 1:
Name: NVIDIA A100-SXM4-40GB
Total Memory: 40536.1875 MB
Driver Version: 470.199.02
Device 2:
Name: NVIDIA A100-SXM4-40GB
Total Memory: 40536.1875 MB
Driver Version: 470.199.02
Device 3:
Name: NVIDIA DGX Display
Total Memory: 3911.875 MB
Driver Version: 470.199.02
Device 4:
Name: NVIDIA A100-SXM4-40GB
Total Memory: 40536.1875 MB
Driver Version: 470.199.02


In [2]:
import os
import cv2
from PIL import Image
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

from src.tools.rle_encoder import rle_encode
from src.data.dataset import SourceDataset, TargetDataset

# Prefer the CUDA device when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

  warn(f"Failed to load image Python extension: {e}")


cpu


In [3]:
# Check whether a CUDA GPU is usable from this kernel
gpu_available = torch.cuda.is_available()

# Look up the name of GPU 0, or use a placeholder string when there is none
if gpu_available:
    gpu_name = torch.cuda.get_device_name(0)
else:
    gpu_name = "No GPU available"

gpu_available, gpu_name

(False, 'No GPU available')

In [3]:
from torchvision.transforms import functional as F

class FisheyeTransform:
    """Callable transform that resamples an image through a radial fisheye model.

    For every output pixel the radius ``r`` to ``center`` is turned into an
    angle ``theta = arctan(r / focal_length)``, distorted as
    ``theta_d = theta + k * theta**3`` and the input is sampled (via
    ``cv2.remap``) at ``center + offset * theta_d / (r / focal_length)``.

    Args:
        k: Cubic distortion coefficient.
        center: ``(x, y)`` distortion centre in pixels. ``None`` means the
            image centre ``(cols // 2, rows // 2)``.
        focal_length: Divisor applied to the pixel radius before ``arctan``.
            The default ``1.0`` reproduces the historical behaviour.

    NOTE(review): with ``focal_length=1.0`` the radius is measured in raw
    pixels, so ``arctan`` is already close to pi/2 a few pixels away from the
    centre and almost every output pixel samples a tiny neighbourhood of the
    centre. A value on the order of the image half-width is probably what is
    wanted -- confirm before relying on the default.
    """

    def __init__(self, k=0.5, center=None, focal_length=1.0):
        """Store the distortion coefficient, centre and focal length."""
        self.k = k
        self.center = center
        self.focal_length = focal_length

    def __call__(self, image):
        """Warp ``image`` (anything ``np.array`` accepts) and return a PIL image."""
        image = np.array(image)
        transformed_image = self.fisheye_transform(image, self.k, self.center)
        return F.to_pil_image(transformed_image)

    @staticmethod
    def _distortion_maps(rows, cols, k, center, focal_length=1.0):
        """Build the float32 ``(map_x, map_y)`` sampling grids for ``cv2.remap``.

        Vectorised equivalent of a per-pixel loop over ``rows x cols``; the
        pixel at ``center`` (radius 0) maps to itself.
        """
        ys, xs = np.indices((rows, cols), dtype=np.float64)
        dx = xs - center[0]
        dy = ys - center[1]

        r = np.sqrt(dy ** 2 + dx ** 2) / focal_length
        theta = np.arctan(r)
        theta_d = theta + k * theta ** 3

        # scale = theta_d / r, except at the centre where r == 0 and the
        # scale is defined as 1 to avoid a division by zero.
        scale = np.ones_like(r)
        np.divide(theta_d, r, out=scale, where=r != 0)

        map_x = (center[0] + dx * scale).astype(np.float32)
        map_y = (center[1] + dy * scale).astype(np.float32)
        return map_x, map_y

    def fisheye_transform(self, image, k, center, focal_length=None):
        """Apply the fisheye resampling to a NumPy image.

        Args:
            image: ``(H, W)`` or ``(H, W, C)`` array.
            k: Cubic distortion coefficient.
            center: ``(x, y)`` centre in pixels, or ``None`` for the image centre.
            focal_length: Radius divisor; ``None`` uses ``self.focal_length``.

        Returns:
            The remapped image. Bilinear interpolation is used and samples
            falling outside the input use OpenCV's constant border.
        """
        # shape[:2] instead of a 3-way unpack so grayscale (2-D) images work.
        rows, cols = image.shape[:2]
        if center is None:
            center = (cols // 2, rows // 2)
        if focal_length is None:
            focal_length = self.focal_length

        map_x, map_y = self._distortion_maps(rows, cols, k, center, focal_length)
        return cv2.remap(image, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)


In [10]:
from albumentations import (
    HorizontalFlip, IAAPerspective, ShiftScaleRotate, CLAHE, RandomRotate90,
    Transpose, ShiftScaleRotate, Blur, OpticalDistortion, GridDistortion, HueSaturationValue,
     GaussNoise, MotionBlur, MedianBlur, PiecewiseAffine,
    Sharpen, Emboss, RandomBrightnessContrast, Flip, OneOf, Compose
)
from albumentations.core.transforms_interface import DualTransform

# Here's the fisheye_transform function:
def fisheye_transform(image, k):
    """Radially resample ``image`` with the division model ``r_src = r / (1 + k * r**2)``.

    Each output pixel at radius ``r`` from the image centre
    ``(width / 2, height / 2)`` takes the value of the input pixel that lies
    in the same direction at radius ``r_src``. The radius is normalised by
    the half-diagonal of the image, so ``r`` runs from 0 at the centre to
    about 1 in the corners and ``k`` has the same effect at every resolution.
    (Using raw pixel radii makes ``r / (1 + k * r**2)`` smaller than one
    pixel for ``k = 0.5``, which collapses the whole output onto the few
    pixels around the centre.)

    Sampling is nearest-neighbour (coordinates are truncated to integers), so
    the function is safe for label masks as well as images.

    Args:
        image: ``(H, W)`` or ``(H, W, C)`` NumPy array.
        k: Distortion coefficient applied to the normalised squared radius.

    Returns:
        Array with the same shape and dtype as ``image``. Output pixels whose
        source location falls outside the input (or is undefined because
        ``1 + k * r**2`` is zero) are left at 0.
    """
    height, width = image.shape[:2]
    corrected_image = np.zeros_like(image)
    if height == 0 or width == 0:
        return corrected_image

    fx, fy = width / 2, height / 2
    r_max = np.hypot(fx, fy)  # half-diagonal: normalises the radius to ~[0, 1]

    rows, cols = np.indices((height, width), dtype=np.float64)
    dy = rows - fy
    dx = cols - fx
    r2 = (dx * dx + dy * dy) / (r_max * r_max)

    with np.errstate(divide='ignore', invalid='ignore'):
        # Scaling the offset by 1 / (1 + k r^2) moves the sample point along
        # the same ray to the corrected radius, without any trigonometry.
        shrink = 1.0 / (1.0 + k * r2)
        src_y = fy + dy * shrink
        src_x = fx + dx * shrink
        # inf/nan coordinates compare False and are therefore dropped here.
        valid = (src_y >= 0) & (src_y < height) & (src_x >= 0) & (src_x < width)

    # Valid coordinates are non-negative, so the integer cast truncates
    # exactly like the scalar int() conversion.
    src_i = src_y[valid].astype(np.intp)
    src_j = src_x[valid].astype(np.intp)
    corrected_image[valid] = image[src_i, src_j]
    return corrected_image

class FisheyeAug(A.DualTransform):
    """Albumentations dual transform that runs ``fisheye_transform`` on image and mask.

    Args:
        k: Distortion coefficient forwarded to ``fisheye_transform``.
        p: Probability passed to the ``A.DualTransform`` constructor.
    """

    def __init__(self, k=0.5, p=0.5):
        super().__init__(p=p)
        self.k = k

    def apply(self, img, **params):
        """Warp the image with the stored coefficient."""
        warped_img = fisheye_transform(img, self.k)
        return warped_img

    def apply_to_mask(self, mask, **params):
        """Warp the mask with the same coefficient as the image."""
        warped_mask = fisheye_transform(mask, self.k)
        return warped_mask
    
def get_training_augmentation():
    """Build the randomised albumentations pipeline used for training.

    Order of operations:
      1. resize to 224x224,
      2. optional noise / blur / geometric / sharpening-contrast groups,
      3. hue-saturation jitter and the fisheye warp (both with p=1.0),
      4. ``A.Normalize`` followed by ``ToTensorV2``.

    ``A.Normalize`` is deliberately applied last, just before the tensor
    conversion: the colour and noise transforms above it (``GaussNoise``,
    ``HueSaturationValue``, ``RandomBrightnessContrast``, the blurs) operate
    on ordinary pixel intensities, and feeding them mean/std-standardised
    floats (which include negative values) distorts their effect.

    Returns:
        An ``albumentations.Compose`` instance.
    """
    train_transform = [
        A.Resize(224, 224),
        OneOf([
            GaussNoise(always_apply=True),
        ], p=0.2),
        OneOf([
            MotionBlur(p=0.2),
            MedianBlur(blur_limit=3, p=0.1, always_apply=True),
            Blur(blur_limit=3, p=0.1, always_apply=True),
        ], p=0.2),
        ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.2, rotate_limit=15, p=0.2),
        OneOf([
            OpticalDistortion(p=0.3),
            GridDistortion(p=0.1),
            PiecewiseAffine(p=0.3),
        ], p=0.2),
        OneOf([
            Sharpen(always_apply=True, p=1.0),
            Emboss(always_apply=True, p=1.0),
            RandomBrightnessContrast(always_apply=True, p=1.0),
        ], p=0.3),
        HueSaturationValue(always_apply=True, p=1.0),
        FisheyeAug(k=0.5, p=1.0),
        # Standardise only after every pixel-level augmentation has run.
        A.Normalize(always_apply=True),
        ToTensorV2()
    ]
    return Compose(train_transform)

# Deterministic preprocessing (no random augmentation): resize, normalise, to tensor.
transform = A.Compose([A.Resize(224, 224), A.Normalize(), ToTensorV2()])

# Randomised pipeline applied to the training split.
augmentation = get_training_augmentation()

# Training split: augmented samples, reshuffled by the loader.
train_dataset = SourceDataset(
    csv_file='train_source.csv',
    transform=augmentation,
    is_training=True,
)
train_loader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=4,
)

# Validation split: deterministic preprocessing, fixed order.
# NOTE(review): is_training=True is passed here as well -- presumably so the
# dataset also returns masks; verify against SourceDataset.
valid_dataset = SourceDataset(
    csv_file='val_source.csv',
    transform=transform,
    is_training=True,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)

In [11]:
def double_conv(in_channels, out_channels):
    """Return a Conv3x3 -> ReLU -> Conv3x3 -> ReLU block.

    Both convolutions use ``padding=1``, so the spatial size of the input is
    preserved; the first maps ``in_channels`` to ``out_channels`` and the
    second keeps ``out_channels``.
    """
    first_conv = nn.Conv2d(in_channels, out_channels, 3, padding=1)
    first_act = nn.ReLU(inplace=True)
    second_conv = nn.Conv2d(out_channels, out_channels, 3, padding=1)
    second_act = nn.ReLU(inplace=True)
    return nn.Sequential(first_conv, first_act, second_conv, second_act)

class FPN_UNet_Dropout(nn.Module):
    """U-Net style encoder with an FPN-like top-down path and dropout.

    The encoder is four ``double_conv`` stages (64, 128, 256, 512 channels),
    each followed by dropout, with 2x max-pooling between the first three.
    The top-down path upsamples with transposed convolutions and adds a 1x1
    "lateral" projection of the matching encoder feature map. A 3x3 head at
    each of the three pyramid levels produces per-class logits.

    Because the input is halved three times and doubled three times, its
    height and width should be multiples of 8; otherwise the element-wise
    additions in the top-down path receive mismatched shapes.

    Args:
        num_classes: Number of output channels of every head (default 13).
        in_channels: Number of channels of the input image (default 3).
        dropout_p: Dropout probability after each encoder stage (default 0.5).
    """

    def __init__(self, num_classes=13, in_channels=3, dropout_p=0.5):
        super().__init__()

        # Encoder (downsampling path); a dropout layer follows every stage
        self.dconv_down1 = double_conv(in_channels, 64)
        self.dropout1 = nn.Dropout(dropout_p)
        self.dconv_down2 = double_conv(64, 128)
        self.dropout2 = nn.Dropout(dropout_p)
        self.dconv_down3 = double_conv(128, 256)
        self.dropout3 = nn.Dropout(dropout_p)
        self.dconv_down4 = double_conv(256, 512)
        self.dropout4 = nn.Dropout(dropout_p)

        self.maxpool = nn.MaxPool2d(2)

        # Upward path and lateral connections for FPN
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.lateral3 = nn.Conv2d(256, 256, kernel_size=1)

        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.lateral2 = nn.Conv2d(128, 128, kernel_size=1)

        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.lateral1 = nn.Conv2d(64, 64, kernel_size=1)

        # FPN heads for each pyramid level
        self.fpn_out3 = nn.Conv2d(256, num_classes, kernel_size=3, padding=1)
        self.fpn_out2 = nn.Conv2d(128, num_classes, kernel_size=3, padding=1)
        self.fpn_out1 = nn.Conv2d(64, num_classes, kernel_size=3, padding=1)

    def forward(self, x):
        """Return ``(out1, out2, out3)`` logits at 1x, 1/2x and 1/4x input resolution."""
        # Encoder: the un-dropped feature maps are kept for the lateral
        # connections, dropout is applied only on the way down.
        conv1 = self.dconv_down1(x)
        x = self.maxpool(self.dropout1(conv1))

        conv2 = self.dconv_down2(x)
        x = self.maxpool(self.dropout2(conv2))

        conv3 = self.dconv_down3(x)
        x = self.maxpool(self.dropout3(conv3))

        x = self.dropout4(self.dconv_down4(x))

        # Upward path with lateral connections
        x = self.upconv3(x)
        conv3 = self.lateral3(conv3)
        p3 = torch.add(x, conv3)  # Element-wise addition
        out3 = self.fpn_out3(p3)

        x = self.upconv2(p3)
        conv2 = self.lateral2(conv2)
        p2 = torch.add(x, conv2)
        out2 = self.fpn_out2(p2)

        x = self.upconv1(p2)
        conv1 = self.lateral1(conv1)
        p1 = torch.add(x, conv1)
        out1 = self.fpn_out1(p1)

        # Note: You can return combined results or individual FPN layer outputs based on the use case.
        return out1, out2, out3

class FPN_UNet_FC(nn.Module):
    """Segmentation model that fuses the three ``FPN_UNet_Dropout`` outputs.

    Every pyramid output is bilinearly resized to 224x224, the three
    13-channel maps are concatenated along the channel axis and a 1x1
    convolution reduces the 39 channels back to 13.
    """

    def __init__(self):
        super().__init__()
        self.fpn_unet = FPN_UNet_Dropout()
        self.upsample = nn.Upsample(size=(224, 224), mode='bilinear', align_corners=True)
        # 3 pyramid levels x 13 channels are concatenated before the fusion conv
        self.conv1x1 = nn.Conv2d(3 * 13, 13, kernel_size=1)

    def forward(self, x):
        """Return the fused 13-channel, 224x224 logits for ``x``."""
        pyramid_logits = self.fpn_unet(x)

        # Bring every level to the common 224x224 resolution (order: out1, out2, out3)
        resized_levels = [self.upsample(level) for level in pyramid_logits]

        # Stack the levels channel-wise and fuse them with the 1x1 convolution
        stacked = torch.cat(resized_levels, dim=1)
        return self.conv1x1(stacked)


In [8]:
import math

def compute_iou(pred, target, num_classes):
    """Compute the per-class intersection-over-union of two label tensors.

    Args:
        pred: Tensor of predicted class indices (any shape).
        target: Tensor of ground-truth class indices with the same number of
            elements as ``pred``.
        num_classes: Total number of classes. Only indices
            ``0 .. num_classes - 2`` are scored; the last index is skipped
            because it is treated as the background class.

    Returns:
        A list of ``num_classes - 1`` Python floats. An entry is ``nan`` when
        the class occurs in neither ``pred`` nor ``target`` (empty union), so
        callers can leave it out of an average.
    """
    # reshape (not view) so non-contiguous inputs, e.g. transposed tensors,
    # are flattened instead of raising.
    pred = pred.reshape(-1)
    target = target.reshape(-1)

    iou_list = []
    # For classes excluding the background (the last class index)
    for cls in range(num_classes - 1):
        pred_inds = pred == cls
        target_inds = target == cls
        intersection = (pred_inds & target_inds).sum().float()
        union = (pred_inds | target_inds).sum().float()
        if union == 0:
            # Class absent from both prediction and ground truth: undefined IoU
            iou_list.append(float('nan'))
        else:
            iou_list.append((intersection / union).item())
    return iou_list

def compute_mIoU(preds, labels, num_classes=13):
    """Average the per-class IoUs from ``compute_iou``, ignoring undefined (NaN) classes.

    Args:
        preds: Tensor of predicted class indices.
        labels: Tensor of ground-truth class indices.
        num_classes: Total number of classes, forwarded to ``compute_iou``.

    Returns:
        The mean of the non-NaN class IoUs as a float, or ``nan`` when no
        class could be scored at all (previously this case raised
        ``ZeroDivisionError``).
    """
    iou_list = compute_iou(preds, labels, num_classes)
    valid_iou_list = [iou for iou in iou_list if not math.isnan(iou)]
    if not valid_iou_list:
        # Nothing to average: the mean is undefined rather than zero.
        return float('nan')
    mIoU = sum(valid_iou_list) / len(valid_iou_list)
    return mIoU


In [12]:
# 1. Build the model
model = FPN_UNet_FC()
#model.load_state_dict(torch.load('path_to_pretrained_model.pth'))
model.to(device)

# 2. Data preparation (kept brief here; the loaders are built in an earlier cell)
#train_loader, val_loader = prepare_target_domain_dataloaders()

# 3. Training setup
criterion = nn.CrossEntropyLoss()  # CrossEntropyLoss is used as an example
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)  # small learning rate

# 4. Training
num_epochs = 100
train_losses = []
train_mIoUs = []
val_mIoUs = []

# Early-stopping settings
patience = 10  # stop after 10 consecutive epochs without improvement
no_improve_epochs = 0  # number of consecutive epochs without improvement
best_mIoU = 0.0  # best validation mIoU seen so far
best_model_path = 'best_model.pth'
best_model_saved = False  # True once this run has written a checkpoint

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    total_iou = 0.0
    num_batches = 0
    num_iou_batches = 0

    for images, masks in tqdm(train_loader):
        images = images.float().to(device)
        masks = masks.long().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        _, predicted = outputs.max(1)
        batch_mIoU = compute_mIoU(predicted, masks)
        # Skip batches whose mIoU is not a number so they cannot poison the running mean.
        if not math.isnan(batch_mIoU):
            total_iou += batch_mIoU
            num_iou_batches += 1

    avg_loss = total_loss / max(num_batches, 1)
    avg_train_mIoU = total_iou / max(num_iou_batches, 1)
    train_losses.append(avg_loss)
    train_mIoUs.append(avg_train_mIoU)
    print(f"Epoch {epoch + 1} - Training Loss: {avg_loss:.4f}, Training mIoU: {avg_train_mIoU:.4f}")

    # 5. Validation
    model.eval()
    total_iou = 0.0
    num_val_batches = 0
    with torch.no_grad():
        for images, masks in tqdm(valid_loader):
            images = images.float().to(device)
            masks = masks.long().to(device)

            outputs = model(images)
            _, predicted = outputs.max(1)
            batch_mIoU = compute_mIoU(predicted, masks)
            if not math.isnan(batch_mIoU):
                total_iou += batch_mIoU
                num_val_batches += 1

    # compute_mIoU yields one score per batch, so average over batches
    # (dividing by the image count would understate the metric by roughly
    # the batch size and make it incomparable with the training mIoU).
    avg_mIoU = total_iou / max(num_val_batches, 1)
    val_mIoUs.append(avg_mIoU)
    print(f"Epoch {epoch + 1}, mIoU: {avg_mIoU:.4f}")

    # Early-stopping check
    if avg_mIoU > best_mIoU:
        best_mIoU = avg_mIoU
        # Save the best model so far
        torch.save(model.state_dict(), best_model_path)
        best_model_saved = True
        no_improve_epochs = 0
    else:
        no_improve_epochs += 1
        if no_improve_epochs >= patience:
            print("Early stopping triggered!")
            break

# Restore the best weights whether training stopped early or ran every epoch.
# The flag guards against loading a missing file (no epoch ever improved) or a
# stale checkpoint left behind by a previous run.
if best_model_saved:
    model.load_state_dict(torch.load(best_model_path, map_location=device))

  0%|          | 0/138 [00:00<?, ?it/s]