# 모델학습

## Import package

In [1]:
# %pip install -U git+https://github.com/qubvel/segmentation_models.pytorch

In [3]:
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'  # 중복로드시 에러 무시

In [None]:
import sys
print(sys.version)
print("현재 가상 환경 경로:", sys.prefix)
print(sys.executable)

3.8.16 (default, Mar  2 2023, 03:21:46) 
[GCC 11.2.0]
현재 가상 환경 경로: /opt/anaconda3/envs/cuda11.8-pytorch2.0.0-py3.8
/opt/anaconda3/envs/cuda11.8-pytorch2.0.0-py3.8/bin/python


In [None]:
# %pip install albumentations
# %pip install segmentation_models_pytorch
# %pip install matplotlib
# %pip install pandas
# %pip install scikit-learn
# %pip install torch torchvision torchaudio
# %pip install numpy
# %pip install opencv-python
# # (서버 환경이나 GUI가 필요없다면 opencv-python-headless를 사용할 수도 있습니다.)

In [2]:
import torch
import albumentations as albu
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset

import segmentation_models_pytorch as smp
import segmentation_models_pytorch.utils

import numpy as np
import cv2
import matplotlib.pyplot as plt
import shutil
import glob
import pandas as pd
import os
import random

os.environ['CUDA_VISIBLE_DEVICES'] = '0'

  from .autonotebook import tqdm as notebook_tqdm
  check_for_updates()


In [3]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed = 42
seed_everything(seed)

In [4]:
print(torch.cuda.is_available())

True


### Set and Initialize File Paths

In [5]:
def rebuild_dir(target_path):
    if os.path.exists(target_path):
        shutil.rmtree(target_path)
        os.makedirs(target_path)
    else:
        os.makedirs(target_path)

In [6]:
workspace_path = os.getcwd()
dataset_name = 'Full_Image'
dataset_path=os.path.join(workspace_path, dataset_name)

auto_labeling_path = os.path.join(dataset_path, 'cropped')

In [7]:
TG_images_name = []
TG_labels_name = []

for i in range(1, 15):
    if i == 8:
        pass
    else:
        TG_images_name.append(str(i)+'_TIFF_Cropped')
        TG_labels_name.append(str(i)+'_JSON_Cropped')

AKITA_images_name = []
AKITA_labels_name = []

for i in range(1, 7):
    AKITA_images_name.append('AKITA.'+str(i)+'_TIFF_Cropped')
    AKITA_labels_name.append('AKITA.'+str(i)+'_JSON_Cropped')

STZ_images_name = []
STZ_labels_name = []

STZ_images_name.append('B6 STZ.01_TIFF_Cropped')
STZ_labels_name.append('B6 STZ.01_JSON_Cropped')

STZ_images_name.append('B6 STZ.02_TIFF_Cropped')
STZ_labels_name.append('B6 STZ.02_JSON_Cropped')

for i in range(1, 7):
    if i == 5:
        pass
    else:
        STZ_images_name.append('STZ.50mpk.0'+str(i)+'_TIFF_Cropped')
        STZ_labels_name.append('STZ.50mpk.0'+str(i)+'_JSON_Cropped')

In [8]:
TG_imgs = []
TG_labels = []
for name in TG_images_name:
    TG_imgs.append(os.path.join(auto_labeling_path,name))
for name in TG_labels_name:
    TG_labels.append(os.path.join(auto_labeling_path,name))

In [9]:
AKITA_imgs = []
AKITA_labels = []
for name in AKITA_images_name:
    AKITA_imgs.append(os.path.join(auto_labeling_path,name))
for name in AKITA_labels_name:
    AKITA_labels.append(os.path.join(auto_labeling_path,name))

In [10]:
STZ_imgs = []
STZ_labels = []
for name in STZ_images_name:
    STZ_imgs.append(os.path.join(auto_labeling_path,name))
for name in STZ_labels_name:
    STZ_labels.append(os.path.join(auto_labeling_path,name))

In [11]:
tg_img_cnt = 0
tg_label_cnt = 0

akita_img_cnt = 0
akita_label_cnt = 0

stz_img_cnt = 0
stz_label_cnt = 0

for i in range(len(TG_imgs)):
    tg_img_cnt += len(os.listdir(TG_imgs[i]))
    tg_label_cnt += len(os.listdir(TG_labels[i]))

for i in range(len(AKITA_imgs)):
    akita_img_cnt += len(os.listdir(AKITA_imgs[i]))
    akita_label_cnt += len(os.listdir(AKITA_labels[i]))

for i in range(len(STZ_imgs)):
    stz_img_cnt += len(os.listdir(STZ_imgs[i]))
    stz_label_cnt += len(os.listdir(STZ_labels[i]))

print(tg_img_cnt, tg_label_cnt)
print(akita_img_cnt, akita_label_cnt)
print(stz_img_cnt, stz_label_cnt)

1604 1604
357 357
550 550


In [12]:
DATA_DIR = os.path.join(dataset_path,'Autolabeling_dataset')

x_dataset_dir = os.path.join(DATA_DIR, 'imgs')
y_dataset_dir = os.path.join(DATA_DIR, 'labels')
rebuild_dir(x_dataset_dir)
rebuild_dir(y_dataset_dir)

x_train_data_dir = os.path.join(DATA_DIR, 'train')
y_train_data_dir = os.path.join(DATA_DIR, 'train_labels')
rebuild_dir(x_train_data_dir)
rebuild_dir(y_train_data_dir)

x_test_data_dir = os.path.join(DATA_DIR, 'test')
y_test_data_dir = os.path.join(DATA_DIR, 'test_labels')
rebuild_dir(x_test_data_dir)
rebuild_dir(y_test_data_dir)

ckpt_path = os.path.join(DATA_DIR,'ckpt')
if not os.path.exists(ckpt_path):
    os.makedirs(ckpt_path,exist_ok=True)

In [13]:
# 파일 병합 및 이름 변경
def merge_and_rename_files(src_dir, dst_dir, prefix):
    for filename in os.listdir(src_dir):
        src_file_path = os.path.join(src_dir, filename)
        if os.path.isfile(src_file_path):
            # 새로운 파일명 생성
            new_filename = f"{prefix}_{filename}"
            dst_file_path = os.path.join(dst_dir, new_filename)
            # 파일 복사
            shutil.copy2(src_file_path, dst_file_path)

In [14]:
for i in range(len(TG_imgs)):
    merge_and_rename_files(TG_imgs[i], x_dataset_dir, TG_images_name[i])
    merge_and_rename_files(TG_labels[i], y_dataset_dir, TG_images_name[i])

for i in range(len(AKITA_imgs)):
    merge_and_rename_files(AKITA_imgs[i], x_dataset_dir, AKITA_images_name[i])
    merge_and_rename_files(AKITA_labels[i], y_dataset_dir, AKITA_images_name[i])

for i in range(len(STZ_imgs)):
    merge_and_rename_files(STZ_imgs[i], x_dataset_dir, STZ_images_name[i])
    merge_and_rename_files(STZ_labels[i], y_dataset_dir, STZ_images_name[i])

In [15]:
def permutation_train_test_split(data,label , valid_size=0.2, shuffle=True, random_state=seed):
    data_len=len(data)
    print(f'전체 데이터수 : {data_len}')
    valid_num=int(data_len*valid_size)
    train_num=data_len-valid_num

    if shuffle:
        shuffled=np.random.permutation(data_len)
        data=data[shuffled]
        label=label[shuffled]
        x_train=data[:train_num]
        y_train=label[:train_num]
        x_valid=data[train_num:]
        y_valid=label[train_num:]
    else:
        x_train=data[:train_num]
        y_train=label[:train_num]
        x_valid=data[train_num:]
        y_valid=label[train_num:]

    return x_train, y_train, x_valid, y_valid

# Train, Validation 파일 나누기 (8:2)
X_path=np.array(glob.glob(x_dataset_dir+"/*.png"))
Y_path=np.array(glob.glob(y_dataset_dir+"/*.png"))
x_train, y_train, x_valid, y_valid=permutation_train_test_split(X_path,Y_path,valid_size=0.1,shuffle=True,random_state=seed)

print('훈련 데이터 수 = img : ',len(x_train),', label : ',len(y_train))
print('검증 데이터 수 = img : ',len(x_valid),', label : ',len(y_valid))

전체 데이터수 : 2511
훈련 데이터 수 = img :  2260 , label :  2260
검증 데이터 수 = img :  251 , label :  251


In [16]:
for i in x_train:
    shutil.copy2(i,x_train_data_dir)
for i in y_train:
    shutil.copy2(i,y_train_data_dir)
for i in x_valid:
    shutil.copy2(i,x_test_data_dir)
for i in y_valid:
    shutil.copy2(i,y_test_data_dir)

### Define dataset

In [17]:
class Dataset(BaseDataset):

    CLASSES = ['x' for x in range(255)]
    CLASSES.append('cell')

    def __init__(
            self,
            images_dir,
            masks_dir,
            classes=None,
            augmentation=None,
            preprocessing=None,
    ):
        self.ids = sorted(os.listdir(images_dir))
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        # convert str names to class values on masks
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):

        # read data
        image = cv2.imread(self.images_fps[i])
        mask = cv2.imread(self.masks_fps[i], 0)

        # extract certain classes from mask (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')

        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        # apply preprocessing
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self):
        return len(self.ids)

### Augment dataset

In [None]:
def get_training_augmentation():
    train_transform = [
        albu.HorizontalFlip(p=0.5),
        albu.VerticalFlip(p=0.5),

        albu.ShiftScaleRotate(shift_limit=0.05, scale_limit=(0.3, 0.9), rotate_limit=90, p=0.2, border_mode=cv2.BORDER_REPLICATE),

        albu.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=0.5),

        albu.OneOf(
            [
                albu.CLAHE(p=1),
                # albu.RandomBrightness(p=1),
                # 오류 발생 원인: RandomBrightness → 수정 필요!
                albu.RandomBrightnessContrast(p=1),  # ✅ 수정된 부분
                
                albu.RandomGamma(p=1),
            ],
            p=0.6,
        ),

        albu.OneOf(
            [
                albu.Blur(blur_limit=3, p=1),
                albu.MotionBlur(blur_limit=3, p=1),
                albu.GaussianBlur(blur_limit=3, p=1),
            ],
            p=0.6,
        ),

        albu.OneOf(
            [
                # albu.RandomBrightness(p=1),
                albu.RandomBrightnessContrast(p=1),  # ✅ 수정된 부분
                
                albu.HueSaturationValue(p=1),
            ],
            p=0.6,
        ),
        albu.OneOf([
            albu.GaussNoise(var_limit=(10.0, 50.0), p=1),
            albu.ISONoise(p=1),
        ], p=0.3),

        # --- 항상 마지막에 크기 고정 ---
        albu.Resize(512, 512, always_apply=True),
    ]
    return albu.Compose(train_transform)


def get_validation_augmentation():
    """Add paddings to make image shape divisible by 32"""
    test_transform = [
        # albu.PadIfNeeded(512, 512)
        
        albu.Resize(512, 512, always_apply=True),
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):

    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return albu.Compose(_transform)

## Train model

### Define encoder, activation function, model, hyper parameter

In [19]:
ENCODER = 'efficientnet-b2'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['cell']
ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multiclass segmentation
DEVICE = 'cuda'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# # DirectML 디바이스 생성
# dml = torch_directml.device()
# DEVICE = dml
MODEL = 'Unet++'
EPOCH = 30
# create segmentation model with pretrained encoder
if MODEL == 'Unet':
    model = smp.Unet(
        encoder_name=ENCODER,
        encoder_weights=ENCODER_WEIGHTS,
        classes=len(CLASSES),
        activation=ACTIVATION,
    )
elif MODEL == 'Unet++':
    model = smp.UnetPlusPlus(
        encoder_name=ENCODER,
        encoder_weights=ENCODER_WEIGHTS,
        classes=len(CLASSES),
        activation=ACTIVATION,
    )
elif MODEL == 'FPN':
    model = smp.FPN(
        encoder_name=ENCODER,
        encoder_weights=ENCODER_WEIGHTS,
        classes=len(CLASSES),
        activation=ACTIVATION,
    )
else:
    print('No model!')
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [20]:
loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

optimizer = torch.optim.AdamW([
    dict(params=model.parameters(), lr=0.0001),
])

### Set dataloader (원본+증대 데이터)

In [21]:
# create epoch runners
# it is a simple loop of iterating over dataloader`s samples
train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

### Train

In [None]:
import os
import torch
from sklearn.model_selection import KFold
from torch.utils.data import ConcatDataset, DataLoader, Subset

EPOCH = 20
BATCH_SIZE = 8
k_folds = 4

train_logs_list, valid_logs_list = [], []
max_score = 0

original_dataset = Dataset(
    x_train_data_dir,
    y_train_data_dir,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

augmented_dataset = Dataset(
    x_train_data_dir,
    y_train_data_dir,
    augmentation=get_training_augmentation(),  # 여기에 증대 적용
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

kfold = KFold(n_splits=k_folds, shuffle=True)

for fold, (train_ids, valid_ids) in enumerate(kfold.split(original_dataset)):
    print(f'FOLD {fold}')
    print('--------------------------------')

    train_original_subset = Subset(original_dataset, train_ids)
    train_augmented_subset = Subset(augmented_dataset, train_ids)
    valid_subset = Subset(original_dataset, valid_ids)

     # 원본 데이터셋과 증대된 데이터셋을 합친 훈련 데이터셋
    train_combined_subset = ConcatDataset([train_original_subset, train_augmented_subset])

    # 데이터 로더 생성
    train_loader = DataLoader(train_combined_subset, batch_size=BATCH_SIZE, shuffle=True)
    valid_loader = DataLoader(valid_subset, batch_size=BATCH_SIZE, shuffle=False)
    
    for epoch in range(EPOCH):
        print(f'\nEpoch: {epoch}')

        train_logs = train_epoch.run(train_loader)
        valid_logs = valid_epoch.run(valid_loader) 
        train_logs_list.append(train_logs)
        valid_logs_list.append(valid_logs)

        if max_score < valid_logs['iou_score']:
            max_score = valid_logs['iou_score']
            torch.save(model, os.path.join(ckpt_path,'{}_model.pth'.format(dataset_name)))
            print('Model saved!')

    print(f'Finished fold {fold}')
    print('--------------------------------')

print('Training and evaluation completed for all folds.')

FOLD 0
--------------------------------

Epoch: 0
train:  28%|██████████████████████████████████████▏                                                                                                | 120/424 [01:09<02:53,  1.75it/s, dice_loss - 0.8949, iou_score - 0.2245]

## Test

### Load best model and dataset

In [None]:
# load best saved checkpoint
best_model = torch.load(os.path.join(ckpt_path,'{}_model.pth'.format(dataset_name)))

In [None]:
sample_path = os.path.join(ckpt_path,dataset_name)
sample_prediction = os.path.join(sample_path,'prediction')
sample_compare = os.path.join(sample_path,'compare(original-label-img)')
rebuild_dir(sample_path)
rebuild_dir(sample_prediction)
rebuild_dir(sample_compare)

### 테스트

In [None]:
test_dataset_vis = Dataset(
    x_test_data_dir,
    y_test_data_dir,
    classes=CLASSES,
)

In [None]:
# create test dataset
test_dataset = Dataset(
    x_test_data_dir,
    y_test_data_dir,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [None]:
# # evaluate model on test set
test_epoch = smp.utils.train.ValidEpoch(
    model=best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
)

logs = test_epoch.run(test_dataloader)
print("Evaluation on Test Data: ")
print(f"Mean IoU Score: {logs['iou_score']:.4f}")
print(f"Mean Dice Loss: {logs['dice_loss']:.4f}")

In [None]:
def draw_contours_on_image(image, mask, color):
    """Mask의 테두리를 주어진 색상으로 이미지에 그립니다."""
    contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    image_with_contours = image.copy()
    cv2.drawContours(image_with_contours, contours, -1, color, 2)
    return image_with_contours

def count_pixels_in_mask(mask):
    """마스크 안의 픽셀 수를 반환합니다."""
    return np.sum(mask)

In [None]:
for n in range(0, len(test_dataset)):
    image_vis = test_dataset_vis[n][0].astype('uint8')
    image_vis = cv2.cvtColor(image_vis, cv2.COLOR_BGR2RGB)
    image, gt_mask = test_dataset[n]

    gt_mask = gt_mask.squeeze()

    x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
    pr_mask = best_model.predict(x_tensor)
    pr_mask = (pr_mask.squeeze().cpu().numpy().round())

    red_contoured = draw_contours_on_image(image_vis, (gt_mask * 255).astype(np.uint8), (255, 0, 0))
    blue_contoured = draw_contours_on_image(image_vis, (pr_mask * 255).astype(np.uint8), (0, 0, 255))
    combined_contoured = draw_contours_on_image(red_contoured, (pr_mask * 255).astype(np.uint8), (0, 0, 255))

    red_pixels_count = count_pixels_in_mask(gt_mask)
    blue_pixels_count = count_pixels_in_mask(pr_mask)
    
    intersection = np.logical_and(gt_mask, pr_mask)
    union = np.logical_or(gt_mask, pr_mask)
    iou_score = np.sum(intersection) / np.sum(union)
    
    fig, axes = plt.subplots(1, 6, figsize=(21, 7), facecolor='white')

    axes[0].set_title('original image', fontsize=10, pad=10)
    axes[0].axis('off')
    axes[0].imshow(image_vis)

    axes[1].set_title('Ground Truth', fontsize=10, pad=10)
    axes[1].axis('off')
    axes[1].imshow(cv2.cvtColor(gt_mask, cv2.COLOR_GRAY2RGB))

    axes[2].set_title('prediction', fontsize=10, pad=10)
    axes[2].axis('off')
    axes[2].imshow(cv2.cvtColor(pr_mask, cv2.COLOR_GRAY2RGB))

    axes[3].set_title('label with red contours', fontsize=10, pad=10)
    axes[3].axis('off')
    axes[3].imshow(red_contoured)

    axes[4].set_title('prediction with blue contours', fontsize=10, pad=10)
    axes[4].axis('off')
    axes[4].imshow(blue_contoured)

    axes[5].set_title('combined contours on original', fontsize=10, pad=10)
    axes[5].axis('off')
    axes[5].imshow(combined_contoured)

    # 각 subplot에 텍스트 추가
    axes[3].text(10, 10, f'Red pixels: {red_pixels_count}', color='white', fontsize=12, bbox=dict(facecolor='red', alpha=0.5))
    axes[4].text(10, 10, f'Blue pixels: {blue_pixels_count}', color='white', fontsize=12, bbox=dict(facecolor='blue', alpha=0.5))
    axes[5].text(10, 10, f'IoU score: {iou_score:.4f}', color='white', fontsize=12, bbox=dict(facecolor='green', alpha=0.5))

    plt.savefig(os.path.join(sample_compare, 'sample_compare_%05d.png' % n))
    cv2.imwrite(os.path.join(sample_prediction, '%05d.png' % n), cv2.cvtColor(pr_mask, cv2.COLOR_GRAY2RGB))

    plt.show()

## Show and save result

In [None]:
train_logs_df = pd.DataFrame(train_logs_list)
valid_logs_df = pd.DataFrame(valid_logs_list)
train_logs_df.T

In [None]:
plt.figure(figsize=(20,8))
plt.plot(train_logs_df.index.tolist(), train_logs_df.iou_score.tolist(), lw=3, label = 'Train')
plt.plot(valid_logs_df.index.tolist(), valid_logs_df.iou_score.tolist(), lw=3, label = 'Valid')
plt.xlabel('Epochs', fontsize=20)
plt.ylabel('IoU Score', fontsize=20)
plt.title('IoU Score Plot', fontsize=20)

# y축의 틱 간격을 0.1로 조절
plt.yticks(np.arange(0, 1.1, 0.1))

plt.legend(loc='best', fontsize=16)
plt.grid()
plt.savefig(os.path.join(sample_path,'iou_score_plot.png'))
plt.show()

In [None]:
plt.figure(figsize=(20,8))
plt.plot(train_logs_df.index.tolist(), train_logs_df.dice_loss.tolist(), lw=3, label = 'Train')
plt.plot(valid_logs_df.index.tolist(), valid_logs_df.dice_loss.tolist(), lw=3, label = 'Valid')
plt.xlabel('Epochs', fontsize=20)
plt.ylabel('Dice Loss', fontsize=20)
plt.title('Dice Loss Plot', fontsize=20)

# y축의 틱 간격을 0.1로 조절합니다.
plt.yticks(np.arange(0, max(train_logs_df.dice_loss.max(), valid_logs_df.dice_loss.max()) + 0.1, 0.1))

plt.legend(loc='best', fontsize=16)
plt.grid()
plt.savefig(os.path.join(sample_path,'dice_loss_plot.png'))
plt.show()

In [None]:
print('{} done'.format(dataset_name))

In [None]:
train_logs_df

In [None]:
train_logs_df.tail()

In [None]:
valid_logs_df.tail()

In [None]:
train_logs_df.to_csv(os.path.join(sample_path,'train_{}_{}epoch.csv'.format(MODEL,EPOCH)))
valid_logs_df.to_csv(os.path.join(sample_path,'valid_{}_{}epoch.csv'.format(MODEL,EPOCH)))