In [1]:
import os
import cv2
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
def rle_decode(mask_rle, shape):
    s = mask_rle.split()
    starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
    starts -= 1
    ends = starts + lengths
    img = np.zeros(shape[0]*shape[1], dtype=np.uint8)
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape)

# RLE 인코딩 함수
#이진 이미지 마스크를 런 길이 인코딩(연속된 픽셀의 길이를 나타냄)으로 표현
def rle_encode(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

In [None]:
#이미지 데이터 명암대비 조정
def contrast_adjustment(image, alpha, beta):
    # 명암 대비 조정 적용
    adjusted_image = cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
    return adjusted_image

In [None]:
#이미지 데이터 노이즈 제거(가우시안 필터링)
def image_noise_removal(image, kernel_size=3, sigma=0):
    # 가우시안 필터링을 사용하여 이미지 노이즈 제거
    filtered_image = cv2.GaussianBlur(image, (kernel_size, kernel_size), sigma)

    return filtered_image

In [4]:
class SatelliteDataset(Dataset):
    #데이터셋 초기화
    def __init__(self, csv_file, transform=None, infer=False):
        self.data = pd.read_csv(csv_file)
        for i in range(len(self.data.iloc[:,1] )):
          self.data.iloc[:,1][i]=self.data.iloc[:,1][i].replace("./", "/")
        self.data.iloc[:,1] = "./drive/MyDrive/sw_ai" + self.data.iloc[:,1]
        self.transform = transform
        self.infer = infer

    def __len__(self):
        return len(self.data)


    #주어진 인덱스에 해당하는 데이터 샘플 반환
    def __getitem__(self, idx):
        #이미지 파일 로드, RGB형식으로 변환
        img_path = self.data.iloc[idx, 1]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        #명암대비 조정 적용 (alpha는 명암조정비율, beta는 명암조정 상수)
        image = contrast_adjustment(image, alpha=1.5, beta=10)
         # 이미지 노이즈 제거
        image = image_noise_removal(image, kernel_size=3, sigma=0)

        if self.infer:
            if self.transform:
                image = self.transform(image=image)['image']
            return image

        mask_rle = self.data.iloc[idx, 2] #csv파일에서 런-길이 인코딩값 가져옴(문자열)
        mask = rle_decode(mask_rle, (image.shape[0], image.shape[1])) #런-길이 인코딩된 마스크를 이진 이미지 마스크로 디코딩

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        return image, mask

In [5]:
transform = A.Compose(
    [
        A.Resize(224, 224),
        A.Normalize(),
        ToTensorV2()
    ]
)

dataset = SatelliteDataset(csv_file='./drive/MyDrive/sw_ai/train.csv', transform=transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)



In [6]:
import torch.nn as nn
import torchvision.models

# HRNet
1. 다양한 해상도 특성 캡쳐가능
2. 고해상도 특성 유지
3. ResnetUNet보다는 무거움
4. 고해상도 이미지 분석 작업이라면 더 유리

In [None]:
import torchvision.models as models

class HRNetUNet(nn.Module):
    def __init__(self, n_class):
        super(HRNetUNet, self).__init__()

        # HRNet 모델 로드
        self.base_model = models.segmentation.hrnet.hrnetv2_hrnet18(pretrained=True)
        self.base_layers = list(self.base_model.children())

        # Encoder 부분
        self.layer0 = nn.Sequential(*self.base_layers[:3])  # size=(N, 64, x.H/2, x.W/2)
        self.layer1 = self.base_layers[3]  # size=(N, 64, x.H/4, x.W/4)
        self.layer2 = self.base_layers[4]  # size=(N, 18, x.H/4, x.W/4)
        self.layer3 = self.base_layers[5]  # size=(N, 36, x.H/8, x.W/8)
        self.layer4 = self.base_layers[6]  # size=(N, 72, x.H/16, x.W/16)

        # Decoder 부분
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        self.conv_up3 = convrelu(36 + 72, 72, 3, 1)  # 36채널과 72채널을 concat하고 72채널의 필터를 적용
        self.conv_up2 = convrelu(18 + 72, 36, 3, 1)  # 18채널과 72채널을 concat하고 36채널의 필터를 적용
        self.conv_up1 = convrelu(64 + 36, 36, 3, 1)  # 64채널과 36채널을 concat하고 36채널의 필터를 적용
        self.conv_up0 = convrelu(64 + 36, 18, 3, 1)  # 64채널과 36채널을 concat하고 18채널의 필터를 적용

        self.conv_original_size0 = convrelu(3, 64, 3, 1)  # 입력 이미지의 3채널에 64채널의 필터를 적용
        self.conv_original_size1 = convrelu(64, 64, 3, 1)  # 64채널에 64채널의 필터를 적용
        self.conv_original_size2 = convrelu(64 + 18, 64, 3, 1)  # 64채널과 18채널을 concat하고 64채널의 필터를 적용

        self.conv_last = nn.Conv2d(64, n_class, 1)  # 64채널을 n_class 채널로 변환하는 1x1 컨볼루션 필터

    def forward(self, input):
        x_original = self.conv_original_size0(input)  # 입력 이미지에 3채널 필터 적용
        x_original = self.conv_original_size1(x_original)  # 이전 계층의 출력에 64채널 필터 적용

        layer0 = self.layer0(input)  # 입력 이미지에 대한 첫 번째 레이어 적용
        layer1 = self.layer1(layer0)  # layer0의 출력에 대한 두 번째 레이어 적용
        layer2 = self.layer2(layer1)  # layer1의 출력에 대한 세 번째 레이어 적용
        layer3 = self.layer3(layer2)  # layer2의 출력에 대한 네 번째 레이어 적용
        layer4 = self.layer4(layer3)  # layer3의 출력에 대한 다섯 번째 레이어 적용

        x = self.upsample(layer4)  # layer4의 출력을 2배로 업샘플링
        x = torch.cat([x, layer3], dim=1)  # layer4의 출력과 layer3의 출력을 채널 방향으로 concat
        x = self.conv_up3(x)  # concat된 텐서에 convrelu 연산 적용

        x = self.upsample(x)  # 이전 계층의 출력을 2배로 업샘플링
        x = torch.cat([x, layer2], dim=1)  # 이전 계층의 출력과 layer2의 출력을 채널 방향으로 concat
        x = self.conv_up2(x)  # concat된 텐서에 convrelu 연산 적용

        x = self.upsample(x)  # 이전 계층의 출력을 2배로 업샘플링
        x = torch.cat([x, layer1], dim=1)  # 이전 계층의 출력과 layer1의 출력을 채널 방향으로 concat
        x = self.conv_up1(x)  # concat된 텐서에 convrelu 연산 적용

        x = self.upsample(x)  # 이전 계층의 출력을 2배로 업샘플링
        x = torch.cat([x, layer0], dim=1)  # 이전 계층의 출력과 layer0의 출력을 채널 방향으로 concat
        x = self.conv_up0(x)  # concat된 텐서에 convrelu 연산 적용

        x = self.upsample(x)  # 이전 계층의 출력을 2배로 업샘플링
        x = torch.cat([x, x_original], dim=1)  # 이전 계층의 출력과 원본 이미지의 출력을 채널 방향으로 concat
        x = self.conv_original_size2(x)  # concat된 텐서에 convrelu 연산 적용

        out = self.conv_last(x)  # conv_last를 통해 최종 출력을 얻음

        return out

## 기존 ResNetUNet에 스킵연결 적용

# ResNetUNet과의 차이점
디코더 부분에 스킵연결 레이어를 추가하여 인코더의 특징 맵을 디코더로 직접 전달함,
 디코더는 인코더의 저수준 특징과 고수준 특징을 결합하여 더 정확한 분할 결과를 얻을 수 있음.


In [None]:
class ResNetUNetSkip(nn.Module):
    def __init__(self, n_class):
        super(ResNetUNetSkip, self).__init__()

        # ResNet 모델 로드
        self.base_model = models.resnet34(pretrained=True)
        self.base_layers = list(self.base_model.children())

        # 인코더 레이어
        self.layer0 = nn.Sequential(*self.base_layers[:3])
        self.layer1 = nn.Sequential(*self.base_layers[3:5])
        self.layer2 = self.base_layers[5]
        self.layer3 = self.base_layers[6]
        self.layer4 = self.base_layers[7]

        # 디코더 레이어
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        self.dec4 = convrelu(512, 256, 3, 1)
        self.dec3 = convrelu(256 + 256, 256, 3, 1)
        self.dec2 = convrelu(256 + 128, 128, 3, 1)
        self.dec1 = convrelu(128 + 64, 64, 3, 1)
        self.dec0 = convrelu(64, 64, 3, 1)

        self.final_conv = nn.Conv2d(64, n_class, 1)

    def forward(self, input):
        # 인코더
        x0 = self.layer0(input)
        x1 = self.layer1(x0)
        x2 = self.layer2(x1)
        x3 = self.layer3(x2)
        x4 = self.layer4(x3)

        # 디코더
        x = self.upsample(x4)
        x = torch.cat([x, x3], dim=1)
        x = self.dec4(x)

        x = self.upsample(x)
        x = torch.cat([x, x2], dim=1)
        x = self.dec3(x)

        x = self.upsample(x)
        x = torch.cat([x, x1], dim=1)
        x = self.dec2(x)

        x = self.upsample(x)
        x = torch.cat([x, x0], dim=1)
        x = self.dec1(x)

        x = self.dec0(x)

        out = self.final_conv(x)

        return out

def convrelu(in_channels, out_channels, kernel_size, padding):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding),
        nn.ReLU(inplace=True)
    )

In [7]:
def convrelu(in_channels, out_channels, kernel, padding):
  return nn.Sequential(
    nn.Conv2d(in_channels, out_channels, kernel, padding=padding),
    nn.ReLU(inplace=True), #활성화 함수도 변경해볼 것
  )


class ResNetUNet(nn.Module):
  def __init__(self, n_class):
    super().__init__()

    self.base_model = torchvision.models.resnet34(pretrained=True)
    self.base_layers = list(self.base_model.children())

    self.layer0 = nn.Sequential(*self.base_layers[:3]) # size=(N, 64, x.H/2, x.W/2)
    self.layer0_1x1 = convrelu(64, 64, 1, 0)
    self.layer1 = nn.Sequential(*self.base_layers[3:5]) # size=(N, 64, x.H/4, x.W/4)
    self.layer1_1x1 = convrelu(64, 64, 1, 0)
    self.layer2 = self.base_layers[5]  # size=(N, 128, x.H/8, x.W/8)
    self.layer2_1x1 = convrelu(128, 128, 1, 0)
    self.layer3 = self.base_layers[6]  # size=(N, 256, x.H/16, x.W/16)
    self.layer3_1x1 = convrelu(256, 256, 1, 0)
    self.layer4 = self.base_layers[7]  # size=(N, 512, x.H/32, x.W/32)
    self.layer4_1x1 = convrelu(512, 512, 1, 0)

    self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

    self.conv_up3 = convrelu(256 + 512, 512, 3, 1)
    self.conv_up2 = convrelu(128 + 512, 256, 3, 1)
    self.conv_up1 = convrelu(64 + 256, 256, 3, 1)
    self.conv_up0 = convrelu(64 + 256, 128, 3, 1)

    self.conv_original_size0 = convrelu(3, 64, 3, 1)
    self.conv_original_size1 = convrelu(64, 64, 3, 1)
    self.conv_original_size2 = convrelu(64 + 128, 64, 3, 1)

    self.conv_last = nn.Conv2d(64, n_class, 1)

  def forward(self, input):
    x_original = self.conv_original_size0(input)
    x_original = self.conv_original_size1(x_original)

    layer0 = self.layer0(input)
    layer1 = self.layer1(layer0)
    layer2 = self.layer2(layer1)
    layer3 = self.layer3(layer2)
    layer4 = self.layer4(layer3)

    layer4 = self.layer4_1x1(layer4)
    x = self.upsample(layer4)
    layer3 = self.layer3_1x1(layer3)
    x = torch.cat([x, layer3], dim=1)
    x = self.conv_up3(x)

    x = self.upsample(x)
    layer2 = self.layer2_1x1(layer2)
    x = torch.cat([x, layer2], dim=1)
    x = self.conv_up2(x)

    x = self.upsample(x)
    layer1 = self.layer1_1x1(layer1)
    x = torch.cat([x, layer1], dim=1)
    x = self.conv_up1(x)

    x = self.upsample(x)
    layer0 = self.layer0_1x1(layer0)
    x = torch.cat([x, layer0], dim=1)
    x = self.conv_up0(x)

    x = self.upsample(x)
    x = torch.cat([x, x_original], dim=1)
    x = self.conv_original_size2(x)

    out = self.conv_last(x)

    return out

In [None]:
model = ResNetUNet(1).to(device)

# loss function과 optimizer 정의
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# training loop
for epoch in range(20):  # 10 에폭 동안 학습합니다.
    model.train()
    epoch_loss = 0
    for images, masks in tqdm(dataloader):
        images = images.float().to(device)
        masks = masks.float().to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks.unsqueeze(1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f'Epoch {epoch+1}, Loss: {epoch_loss/len(dataloader)}')

Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 91.8MB/s]
 38%|███▊      | 169/447 [2:31:43<4:25:04, 57.21s/it]

In [None]:
test_dataset = SatelliteDataset(csv_file='./drive/MyDrive/sw_ai/test.csv', transform=transform, infer=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)

In [None]:
with torch.no_grad():
    model.eval()
    result = []
    for images in tqdm(test_dataloader):
        images = images.float().to(device)

        outputs = model(images)
        masks = torch.sigmoid(outputs).cpu().numpy()
        masks = np.squeeze(masks, axis=1)
        masks = (masks > 0.35).astype(np.uint8) # Threshold = 0.35

        for i in range(len(images)):
            mask_rle = rle_encode(masks[i])
            if mask_rle == '': # 예측된 건물 픽셀이 아예 없는 경우 -1
                result.append(-1)
            else:
                result.append(mask_rle)

In [None]:
submit = pd.read_csv('./drive/MyDrive/sw_ai/sample_submission.csv')
submit['mask_rle'] = result

In [None]:
submit.to_csv('./submit13.csv', index=False)

In [None]:
# 해상도 계산 함수
def measure_resolution(image_path):
  image = Imagee.open(image_path)
  width, height = image.size
  print(width)
  print(height)
  return width * height