In [1]:
import os 
import cv2
import torch
import torchvision
import PIL
import wandb
import natsort
import glob

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
import segmentation_models_pytorch as smp
import torch.optim.lr_scheduler as lr_scheduler
import albumentations as A

# from adamp import AdamP
from albumentations.pytorch.transforms import ToTensorV2
from torchmetrics.functional.classification import binary_jaccard_index, jaccard_index
from torchvision import transforms as T
from torch.utils.data import DataLoader, Dataset
from tqdm.autonotebook import tqdm
# from tqdm import tqdm
from sklearn.model_selection import train_test_split
from ipywidgets import interact
from torchvision.transforms.functional import to_pil_image
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from sklearn.model_selection import KFold, StratifiedKFold, GroupKFold
from pytorch_lightning.loggers import WandbLogger

pl.seed_everything(42)

Global seed set to 42


42

In [2]:
class UnetPlusPlus(pl.LightningModule):
    def __init__(self, args=None, optimizer='adam', scheduler='reducelr'):
        super().__init__()
        self.model = smp.UnetPlusPlus(
            encoder_name='efficientnet-b3',  # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
            encoder_weights="imagenet",  # use `imagenet` pre-trained weights for encoder initialization
            in_channels=3,  # model input channels (1 for gray-scale images, 3 for RGB, etc.)
            classes=1,  # model output channels (number of classes in your dataset)
        
        )
        self.args = args
        self.criterion = nn.BCEWithLogitsLoss()
        self.optimizer = optimizer
        self.scheduler = scheduler

    def forward(self, x):
        x = self.model(x)
        return x

In [3]:
def mask_to_rle(mask):
    flatten_mask = mask.flatten()
    if flatten_mask.max() == 0:
        return f'0 {len(flatten_mask)}'
    idx = np.where(flatten_mask!=0)[0]
    steps = idx[1:]-idx[:-1]
    new_coord = []
    step_idx = np.where(np.array(steps)!=1)[0]
    start = np.append(idx[0], idx[step_idx+1])
    end = np.append(idx[step_idx], idx[-1])
    length = end - start + 1
    for i in range(len(start)):
        new_coord.append(start[i])
        new_coord.append(length[i])
    new_coord_str = ' '.join(map(str, new_coord))
    return new_coord_str

In [4]:
class TestDataset(Dataset):
    def __init__(self, df, img_dir):
        self.df = df
        self.img_dir = img_dir
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        imname = row['img']
        image_path = os.path.join(self.img_dir,imname)
        
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = np.transpose(image, (2,0,1)).astype(np.float32)
        image = torch.Tensor(image) / 255.0
        
        return image,imname

In [9]:
# 프로젝트 경로
PROJECT_DIR = './'
os.chdir(PROJECT_DIR)

#데이터 경로
DATA_DIR = os.path.join(PROJECT_DIR, 'DATA') # 모든 데이터가 들어있는 폴더 경로
TEST_IMG_DIR = os.path.join(DATA_DIR, 'images') # 테스트 이미지가 들어있는 폴더 경로
TEST_CSV_FILE = os.path.join(DATA_DIR, 'testdf.csv') # 테스트 이미지 이름이 들어있는 CSV 경로
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [10]:
testdf = pd.read_csv(TEST_CSV_FILE)
test_dataset = TestDataset(testdf, TEST_IMG_DIR)
test_loader = DataLoader(dataset=test_dataset, batch_size=1,shuffle=False)

In [12]:
model = UnetPlusPlus()
model = model.load_from_checkpoint('./Models/jaccard_index_value=0.7554.ckpt')
model.to(DEVICE)
model.eval()

UnetPlusPlus(
  (model): UnetPlusPlus(
    (encoder): EfficientNetEncoder(
      (_conv_stem): Conv2dStaticSamePadding(
        3, 40, kernel_size=(3, 3), stride=(2, 2), bias=False
        (static_padding): ZeroPad2d((0, 1, 0, 1))
      )
      (_bn0): BatchNorm2d(40, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
      (_blocks): ModuleList(
        (0): MBConvBlock(
          (_depthwise_conv): Conv2dStaticSamePadding(
            40, 40, kernel_size=(3, 3), stride=[1, 1], groups=40, bias=False
            (static_padding): ZeroPad2d((1, 1, 1, 1))
          )
          (_bn1): BatchNorm2d(40, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
          (_se_reduce): Conv2dStaticSamePadding(
            40, 10, kernel_size=(1, 1), stride=(1, 1)
            (static_padding): Identity()
          )
          (_se_expand): Conv2dStaticSamePadding(
            10, 40, kernel_size=(1, 1), stride=(1, 1)
            (static_padd

In [13]:
file_list = [] # 이미지 이름 저장할 리스트
pred_list = [] # 마스크 저장할 리스트
class_list = [] # 클래스 이름 저장할 리스트 ('building')

model.eval()
with torch.no_grad():
    for batch_index, (image,imname) in tqdm(enumerate(test_loader)):
        image = image.to(DEVICE)
        logit_mask = model(image)
        pred_mask = torch.sigmoid(logit_mask) # logit 값을 probability score로 변경
        pred_mask = (pred_mask > 0.5) * 1.0 # 0.5 이상 확률 가진 픽셀값 1로 변환
        pred_rle = mask_to_rle(pred_mask.detach().cpu().squeeze(0)) # 마스크를 RLE 형태로 변경
        pred_list.append(pred_rle)
        file_list.append(imname[0])
        class_list.append("building")

0it [00:00, ?it/s]

In [None]:
# 예측 결과 데이터프레임 만들기
results = pd.DataFrame({'img_id':file_list,'class':class_list,'prediction':pred_list})

# sample_submission.csv와 같은 형태로 변형
sampledf = pd.read_csv(os.path.join(TEST_DIR, 'sample_submission.csv'))
sorter = list(sampledf['img_id'])
results = results.set_index('img_id')
results = results.loc[sorter].reset_index()
                       
# 결과 저장
results.to_csv('/content/drive/MyDrive/landmap/UnetPlusPlus_b3.csv', index=False)