In [None]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import timm
import pandas as pd
import random
import os
from tqdm import tqdm
import torchvision.transforms as T
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.data import ConcatDataset
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau, OneCycleLR, CosineAnnealingWarmRestarts, StepLR
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import StratifiedKFold
import cv2

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_base_dir = '/data/ephemeral/home/data/train'
test_base_dir = '/data/ephemeral/home/data/test'


traindata_info_file = "/data/ephemeral/home/data/train.csv"

testdata_info_file = "/data/ephemeral/home/data/test.csv"

train_data = pd.read_csv(traindata_info_file)

test_data = pd.read_csv(testdata_info_file)

x = train_data['image_path']
y = train_data['target']

train_data.head(3)

In [None]:
class Train():
    def __init__(self, model, device, train_loader, val_loader, epochs, optimizer, criterion, scheduler, 
                 early_stop = False, patience_limit = None, best_val_loss = float('inf'), best_model = None):
        self.model = model
        self.device = device
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.epochs = epochs
        self.optimizer = optimizer
        self.criterion = criterion
        self.scheduler = scheduler
        self.early_stop = early_stop
        self.patience_limit = patience_limit
        self.best_val_loss = best_val_loss
        self.best_model = best_model

    def train(self):
        patience_check = 0
        self.model.to(self.device)
        for epoch in range(self.epochs):
            running_loss = 0.0

            # 모델 학습
            torch.cuda.empty_cache()
            self.model.train()
            for idx, (images, labels) in enumerate(tqdm(self.train_loader, desc=f"Epoch {epoch+1}/{self.epochs}")):
                images, labels = images.to(self.device), labels.to(self.device)
                
                outputs = self.model(images)
                
                loss = self.criterion(outputs, labels)
                
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                
                running_loss += loss.item()
            
            train_loss = running_loss / len(self.train_loader)

            # 모델 평가
            torch.cuda.empty_cache()
            self.model.eval()
            correct = 0
            total = 0
            running_val_loss = 0.0
            with torch.no_grad():
                for inputs, labels in self.val_loader:
                    inputs, labels = inputs.to(self.device), labels.to(self.device)
                    
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)
                    running_val_loss += loss.item()
                    _, pred = torch.max(outputs, 1)
                    
                    total += labels.size(0)
                    correct += (pred == labels).sum().item()

                accuracy = 100 * correct / total
                val_loss = running_val_loss / len(self.val_loader)
                
            print(f'Epoch {epoch + 1}/{self.epochs}, Test_Loss: {train_loss:.4f}, Val_Loss: {val_loss:.4f}, Accuracy: {accuracy:.2f}%')

            # 조기 종료
            if self.early_stop and self.patience_limit is not None:
                if val_loss > self.best_val_loss:
                    patience_check += 1
                    if patience_check >= self.patience_limit:
                        break
                else: 
                    self.best_val_loss = val_loss
                    patience_check = 0
                    torch.save(self.model.state_dict(), '/data/ephemeral/home/Dongook/model_dw/res50d_e120.pt')

            self.scheduler.step(val_loss)

In [None]:
transform = A.Compose([
        A.Resize(224,224),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ])

In [None]:
def fill_white(img):
    img_np = np.array(img)

    # 각 채널에 대해 Canny 엣지 검출 적용
    edges_r = cv2.Canny(img_np[:, :, 0], 50, 150)
    edges_g = cv2.Canny(img_np[:, :, 1], 50, 150)
    edges_b = cv2.Canny(img_np[:, :, 2], 50, 150)

    # 세 채널의 엣지를 결합하여 하나의 이미지로 생성
    edges_combined = np.maximum(np.maximum(edges_r, edges_g), edges_b)

    # 모폴로지 연산을 위한 커널 생성
    kernel = np.ones((3, 3), np.uint8)  
    # 엣지 이미지를 닫기 위한 모폴로지 연산 적용
    closed_edges = cv2.morphologyEx(edges_combined, cv2.MORPH_CLOSE, kernel)

    filled_image_rgb = cv2.cvtColor(closed_edges, cv2.COLOR_GRAY2RGB)

    inverted_edges = cv2.bitwise_not(closed_edges)

    # 흰색 배경 이미지 생성
    filled_image_rgb = np.full_like(img_np, 255)  
    # 배경이 흰색인 상태에서 검은색 선을 포함한 inverted_edges를 복사
    filled_image_rgb[inverted_edges == 0] = [0, 0, 0] 

    transform = A.Compose([
        A.LongestMaxSize(256),
        A.PadIfNeeded(256, 256, border_mode=cv2.BORDER_CONSTANT, value=(255, 255, 255)),
        A.RandomCrop(224, 224,p=1.0),
        A.Affine(scale=(0.5, 1.5), p=0.5),
        A.CoarseDropout(max_holes=4, max_height=30, max_width=30, fill_value=255, p=0.5),
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ])

    transformed = transform(image=filled_image_rgb)
    
    return transformed['image']


In [None]:
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, is_aug = False):
        self.image_paths = image_paths
        self.labels = labels
        self.is_aug = is_aug

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(os.path.join(train_base_dir, image_path)).convert('RGB')
        if not self.is_aug:
            image = np.array(image)
            image = transform(image=image)['image']
        else:
            image = fill_white(image)
        
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return image, label

In [None]:
x_train = []
y_train = []
for i in range(len(train_data)):
    x_train.append(os.path.join(train_base_dir, train_data['image_path'].iloc[i]))
    y_train.append(train_data['target'].iloc[i])

In [None]:
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size = 0.2, stratify= y_train, random_state = 42)

print(len(train_data))
print(len(x_train))
print(len(x_val))

In [None]:
train_dataset = CustomDataset(x_train, y_train, is_aug = False)
aug_dataset = CustomDataset(x_train, y_train, is_aug = True)
val_dataset = CustomDataset(x_val, y_val, is_aug = False)

print(len(train_dataset))
print(len(aug_dataset))
print(len(val_dataset))

dataset = ConcatDataset([train_dataset, aug_dataset])
print(len(dataset))

In [None]:
train_loader = DataLoader(dataset, 128, num_workers=4, shuffle=True)
val_loader = DataLoader(val_dataset, 128, shuffle=False)

In [None]:
model = timm.create_model('resnet50d', pretrained = True, num_classes = 500)


epochs = 40
criterion = nn.CrossEntropyLoss(label_smoothing = 0.1)
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

scheduler = StepLR(10, 0.1)
trainer = Train(model, device = device, train_loader = train_loader, val_loader = val_loader, epochs = epochs,
                    optimizer = optimizer, criterion = criterion, scheduler = scheduler, early_stop= True, patience_limit=5)
trainer.train()

In [None]:
b_model = timm.create_model('resnet50d', pretrained = True, num_classes = 500)

b_model.load_state_dict(torch.load('/data/ephemeral/home/Dongook/model_dw/res50d_e120.pt'))

In [None]:
class TestDataset(Dataset):
    def __init__(self, image_paths):
        self.image_paths = image_paths

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = Image.open(os.path.join(test_base_dir, image_path)).convert('RGB')
        image = np.array(image)
        image = transform(image=image)['image']
        return image
test_dataset = TestDataset(list(test_data['image_path']))

In [None]:
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
print(len(test_loader))

In [None]:
b_model.to(device)
b_model.eval()
    
predictions = []
with torch.no_grad():  
    for images in tqdm(test_loader):
        images = images.to(device)
            
        logits = b_model(images)
        logits = F.softmax(logits, dim=1)
        preds = logits.argmax(dim=1)
            
        predictions.extend(preds.cpu().detach().numpy())

In [None]:
test_data1 = pd.read_csv(testdata_info_file)

test_data1['target'] = predictions
test_data1 = test_data1.reset_index().rename(columns={"index": "ID"})
test_data1

In [None]:
test_data1.to_csv("/data/ephemeral/home/Dongook/output/output_res50d.csv", index=False)
test_data1