### 1. 데이터 준비 및 전처리
- **압축 해제**: `mini_data.zip` 파일의 내용을 압축 해제하고 파일 구조를 확인합니다.
- **데이터 분리**: 이미지와 마스크를 연결하는 CSV 파일을 사용하여 이미지와 해당 마스크를 매칭하고, 이를 훈련 데이터와 검증 데이터로 9:1 비율로 분리합니다.

In [36]:
import os
import pandas as pd

data_path = '../DataSet/BeadSegmentation/'
# train.csv 파일 경로
csv_path = os.path.join(data_path, 'mini_data', 'train.csv')

# CSV 파일 로드
data_df = pd.read_csv(csv_path)
data_df.head()


Unnamed: 0,ImageId,MaskId
0,20230930_21h03m03s_B01_1.png,20230930_21h03m03s_B01_1_mask.png
1,20230930_21h03m03s_B01_2.png,20230930_21h03m03s_B01_2_mask.png
2,20230930_21h03m06s_B02_1.png,20230930_21h03m06s_B02_1_mask.png
3,20230930_21h03m06s_B02_2.png,20230930_21h03m06s_B02_2_mask.png
4,20230930_21h03m06s_B02_3.png,20230930_21h03m06s_B02_3_mask.png


### 데이터 분리 단계
- **데이터셋 셔플링**: 데이터를 무작위로 섞어서 모델이 특정 순서에 의존하지 않도록 합니다.
- **분리**: 전체 데이터셋을 90%는 훈련용으로, 나머지 10%는 검증용으로 분리합니다.
- **파일 저장**: 분리된 데이터를 각각의 폴더(훈련 및 검증)에 저장합니다.

In [37]:
from sklearn.model_selection import train_test_split

# 데이터셋을 훈련용과 검증용으로 분리 (90% 훈련, 10% 검증)
train_df, val_df = train_test_split(data_df, test_size=0.1, random_state=42)

# 훈련용과 검증용 데이터 프레임 확인
train_df.head(), val_df.head(), len(train_df), len(val_df)


(                         ImageId                             MaskId
 37  20230930_21h03m42s_B11_2.png  20230930_21h03m42s_B11_2_mask.png
 12  20230930_21h03m13s_B04_3.png  20230930_21h03m13s_B04_3_mask.png
 19  20230930_21h03m20s_B06_2.png  20230930_21h03m20s_B06_2_mask.png
 4   20230930_21h03m06s_B02_3.png  20230930_21h03m06s_B02_3_mask.png
 25  20230930_21h03m31s_B08_2.png  20230930_21h03m31s_B08_2_mask.png,
                          ImageId                             MaskId
 27  20230930_21h03m31s_B08_4.png  20230930_21h03m31s_B08_4_mask.png
 40  20230930_21h03m46s_B12_1.png  20230930_21h03m46s_B12_1_mask.png
 26  20230930_21h03m31s_B08_3.png  20230930_21h03m31s_B08_3_mask.png
 43  20230930_21h03m46s_B12_4.png  20230930_21h03m46s_B12_4_mask.png
 24  20230930_21h03m31s_B08_1.png  20230930_21h03m31s_B08_1_mask.png,
 43,
 5)

### 데이터셋 폴더 셋업

In [38]:
import os
import shutil

# 데이터 세트 유형 선택 ('mini_data' 또는 'original_data')
data_type = 'mini_data'  # 또는 'original_data'

# 데이터셋 경로 설정
dataset_path = os.path.join(data_path, data_type)

# 훈련 및 검증 데이터 폴더 생성
train_dir = os.path.join(data_path, 'train')
val_dir = os.path.join(data_path, 'val')

# 훈련 및 검증 폴더 내의 이미지와 마스크 폴더 생성
train_img_dir = os.path.join(train_dir, 'images')
train_mask_dir = os.path.join(train_dir, 'masks')
val_img_dir = os.path.join(val_dir, 'images')
val_mask_dir = os.path.join(val_dir, 'masks')

# 기존 폴더가 있을 경우 삭제
for dir_path in [train_dir, val_dir]:
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)
    os.makedirs(dir_path)

# 이미지와 마스크 폴더 생성
for sub_dir in [train_img_dir, train_mask_dir, val_img_dir, val_mask_dir]:
    os.makedirs(sub_dir)

# 훈련용 데이터 복사
for idx, row in train_df.iterrows():
    # 원본 이미지와 마스크 경로
    src_image_path = os.path.join(dataset_path, 'images', row['ImageId'])
    src_mask_path = os.path.join(dataset_path, 'masks', row['MaskId'])
    # 대상 이미지와 마스크 경로
    dst_image_path = os.path.join(train_img_dir, row['ImageId'])
    dst_mask_path = os.path.join(train_mask_dir, row['MaskId'])
    
    # 파일 복사
    shutil.copy(src_image_path, dst_image_path)
    shutil.copy(src_mask_path, dst_mask_path)

# 검증용 데이터 복사
for idx, row in val_df.iterrows():
    # 원본 이미지와 마스크 경로
    src_image_path = os.path.join(dataset_path, 'images', row['ImageId'])
    src_mask_path = os.path.join(dataset_path, 'masks', row['MaskId'])
    # 대상 이미지와 마스크 경로
    dst_image_path = os.path.join(val_img_dir, row['ImageId'])
    dst_mask_path = os.path.join(val_mask_dir, row['MaskId'])
    
    # 파일 복사
    shutil.copy(src_image_path, dst_image_path)
    shutil.copy(src_mask_path, dst_mask_path)

# 확인
os.listdir(train_img_dir), os.listdir(train_mask_dir), os.listdir(val_img_dir), os.listdir(val_mask_dir)


(['20230930_21h03m13s_B04_2.png',
  '20230930_21h03m13s_B04_4.png',
  '20230930_21h03m20s_B06_3.png',
  '20230930_21h03m50s_B13_2.png',
  '20230930_21h03m35s_B09_1.png',
  '20230930_21h03m35s_B09_3.png',
  '20230930_21h03m06s_B02_3.png',
  '20230930_21h03m39s_B10_3.png',
  '20230930_21h03m09s_B03_3.png',
  '20230930_21h03m50s_B13_3.png',
  '20230930_21h03m46s_B12_3.png',
  '20230930_21h03m06s_B02_4.png',
  '20230930_21h03m42s_B11_1.png',
  '20230930_21h03m20s_B06_4.png',
  '20230930_21h03m39s_B10_2.png',
  '20230930_21h03m06s_B02_1.png',
  '20230930_21h03m20s_B06_1.png',
  '20230930_21h03m50s_B13_1.png',
  '20230930_21h03m42s_B11_3.png',
  '20230930_21h03m09s_B03_2.png',
  '20230930_21h03m06s_B02_2.png',
  '20230930_21h03m42s_B11_4.png',
  '20230930_21h03m35s_B09_2.png',
  '20230930_21h03m03s_B01_2.png',
  '20230930_21h03m16s_B05_4.png',
  '20230930_21h03m13s_B04_1.png',
  '20230930_21h03m35s_B09_4.png',
  '20230930_21h03m42s_B11_2.png',
  '20230930_21h03m16s_B05_3.png',
  '20230930_21

###  백본을 활용한 UNet 모델 구현

In [39]:
import torch
from torch import nn
from torchvision.models import efficientnet_b4

class UNetWithEfficientNetB4(nn.Module):
    def __init__(self, num_classes):
        super(UNetWithEfficientNetB4, self).__init__()
        self.backbone = efficientnet_b4(pretrained=True).features
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(1792, 256, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(64, num_classes, kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.backbone(x)
        x = self.decoder(x)
        return x

# Initialize model
num_classes = 1  # Since we are doing binary segmentation
model = UNetWithEfficientNetB4(num_classes=num_classes)

# Create a dummy input tensor (batch_size, num_channels, height, width)
dummy_input = torch.randn(1, 3, 256, 256)  # Example input size

# Forward pass through the model
output = model(dummy_input)

# Print the output shape
print("Output shape:", output.shape)




Output shape: torch.Size([1, 1, 128, 128])


### 데이터로더 구현

In [40]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image, ImageOps
import os
import numpy as np

class SegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None, target_transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.target_transform = target_transform
        self.images = sorted(os.listdir(image_dir))  # Make sure files are sorted

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.image_dir, img_name)
        mask_path = os.path.join(self.mask_dir, img_name.replace('.png', '_mask.png'))
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")  # Assuming mask is in grayscale

        # Apply histogram equalization
        if self.transform:
            image = self.transform(image)
        
        if self.target_transform:
            mask = self.target_transform(mask)

        return image, mask

# Histogram equalization and normalization for images
def enhance_image(image):
    image_np = np.array(image)
    image_eq = ImageOps.equalize(Image.fromarray(image_np))
    return image_eq

# Transforms
transform = transforms.Compose([
    transforms.Lambda(enhance_image),
    transforms.Resize((448, 448)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

target_transform = transforms.Compose([
    transforms.Resize((448, 448)),
    transforms.ToTensor()
])

# Create datasets
train_dataset = SegmentationDataset(train_img_dir, train_mask_dir, transform, target_transform)
val_dataset = SegmentationDataset(val_img_dir, val_mask_dir, transform, target_transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)


### 학습 및 검증

In [47]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class DiceBCELoss(nn.Module):
    def __init__(self):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):
        # Sigmoid activation to bring inputs to a probability space (0, 1)
        inputs = torch.sigmoid(inputs)
        
        # Flatten the tensors to calculate Dice Loss
        inputs_flat = inputs.view(-1)
        if targets.shape[1] == 3:  # RGB 채널 체크
            targets = targets[:, 0, :, :].unsqueeze(1)  # 첫 번째 채널을 선택하고 채널 차원을 유지
        targets_flat = targets.view(-1)
        
        # Dice coefficient
        intersection = (inputs_flat * targets_flat).sum()
        dice_coefficient = (2. * intersection + smooth) / (inputs_flat.sum() + targets_flat.sum() + smooth)
        dice_loss = 1 - dice_coefficient
        
        # Binary Cross Entropy Loss
        bce_loss = F.binary_cross_entropy(inputs_flat, targets_flat, reduction='mean')
        
        # Combined loss
        combined_loss = bce_loss + dice_loss
        
        return combined_loss


In [48]:
# CUDA 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNetWithEfficientNetB4(num_classes=1).to(device)
criterion = DiceBCELoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)



In [49]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import os
from datetime import datetime

def dice_coefficient(pred, target):
    smooth = 1.
    pred = torch.sigmoid(pred)
    pred = (pred > 0.5).float()
    if target.shape[1] == 3:  # RGB 채널 체크
        target = target[:, 0, :, :].unsqueeze(1)  # 첫 번째 채널을 선택하고 채널 차원을 유지
    intersection = (pred * target).sum(dim=(1, 2, 3))
    dice = (2. * intersection + smooth) / (pred.sum(dim=(1, 2, 3)) + target.sum(dim=(1, 2, 3)) + smooth)
    return dice.mean()


def train_epoch(model, loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    dice_scores = []
    for images, masks in loader:
        images = images.to(device)
        masks = masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        dice_score = dice_coefficient(outputs, masks)
        dice_scores.append(dice_score.item())
    avg_loss = running_loss / len(loader)
    avg_dice = np.mean(dice_scores)
    return avg_loss, avg_dice

def validate_epoch(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    dice_scores = []
    with torch.no_grad():
        for images, masks in loader:
            images = images.to(device)
            masks = masks.to(device)
            outputs = model(images)
            loss = criterion(outputs, masks)
            dice_score = dice_coefficient(outputs, masks)
            dice_scores.append(dice_score.item())
            running_loss += loss.item()
    avg_loss = running_loss / len(loader)
    avg_dice = np.mean(dice_scores)
    return avg_loss, avg_dice

# 로깅 준비
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_name = f"UNet_EffiNet_{timestamp}"
summary_path = f"./summary/{model_name}.csv"
os.makedirs(os.path.dirname(summary_path), exist_ok=True)

best_dice = 0.0
results = []
num_epochs = 30

for epoch in range(num_epochs):
    train_loss, train_dice = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_dice = validate_epoch(model, val_loader, criterion, device)
    
    # 결과 저장
    results.append([epoch + 1, train_loss, train_dice, val_loss, val_dice])
    
    # 모델 저장
    if val_dice > best_dice:
        best_dice = val_dice
        checkpoint_filename = f"{model_name}_epoch_{epoch+1}_{timestamp}.pth"
        checkpoint_path = f"/home/club8080/Downloads/trained_model/{checkpoint_filename}"
        torch.save(model.state_dict(), checkpoint_path)
    
    # CSV 파일로 결과 저장
    results_df = pd.DataFrame(results, columns=['Epoch', 'Train Loss', 'Train Dice', 'Val Loss', 'Val Dice'])
    results_df.to_csv(summary_path, index=False)
    
    print(f"Epoch {epoch+1}: Train Loss = {train_loss}, Train Dice = {train_dice}, Val Loss = {val_loss}, Val Dice = {val_dice}")


RuntimeError: The size of tensor a (200704) must match the size of tensor b (802816) at non-singleton dimension 0