In [352]:
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.autograd import Variable
from torch import optim
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.dropout import Dropout

import time


from PIL import Image
from tqdm.auto import tqdm

import random
import numpy as np
import os
import cv2

from sklearn.model_selection import train_test_split

from matplotlib import pyplot as plt

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import timm

import pandas as pd
import csv

import wandb

In [353]:
CFG = {
    'IMG_SIZE':512,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':32,
    'SEED':3
}

In [None]:
wandb.init(
    project="Final Project", 
    entity="aitech4_cv3",
    name='classification_resnet50',
    config = {
        "lr" : CFG['LEARNING_RATE'],
        "epoch" : CFG['EPOCHS'],
        "batch_size" : CFG['BATCH_SIZE'],
    }
    )

config = wandb.config

In [354]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

In [355]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [356]:
data_file = pd.read_csv('/opt/ml/data_csv/data.csv')
label = data_file['info']
data =  data_file.drop(['info',"Unnamed: 0"], axis=1)
X_train, y_train, X_test, y_test = train_test_split(data, label, test_size=0.2, random_state=CFG['SEED'])


In [357]:
train_transform = A.Compose([A.RandomResizedCrop(CFG['IMG_SIZE'],CFG['IMG_SIZE'],(0.5,0.5),(1,1)),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])


val_transform = A.Compose([
                            A.RandomResizedCrop(CFG['IMG_SIZE'],CFG['IMG_SIZE'],(0.5,0.5),(1,1)),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])



test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

In [358]:
X_train = X_train.reset_index()
X_test = X_test.reset_index()
y_train = y_train.reset_index()
y_test = y_test.reset_index()

In [359]:
class CustomDataset(Dataset):
    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths['filepath']
        self.labels = labels['info']
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = self.img_paths[index]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        
        if self.transforms is not None:
            image = self.transforms(image=image)['image']
            
        
        if self.labels is not None:
            label = self.labels[index]
            return image, label
        else:
            return image
    
    def __len__(self):
        return len(self.img_paths)

In [360]:
train_dataset = CustomDataset(X_train, X_test, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(y_train, y_test, val_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [361]:


class BaseModel(nn.Module):
    def __init__(self, num_classes=1):
        super(BaseModel, self).__init__()
        self.backbone = timm.create_model(model_name='resnet34', pretrained=True)
        self.fc = nn.Linear(1000,num_classes)
        #self.classifier1 = nn.Linear(256, num_classes)
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.fc(x)
        x = nn.Sigmoid()(x)
        return x

In [362]:
def calculate_accuracy(y_pred, y):
    top_pred = torch.round(y_pred)
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

In [363]:
def validation(model, criterion, test_loader, device):
    model.eval()
    
    model_preds = []
    true_labels = []
    
    val_loss = []
    val_acc = []
    
    with torch.no_grad():
        for img, label in iter(test_loader):
            img, label = img.float().to(device), label.float().to(device).reshape(-1,1)
            
            model_pred = model(img)

            loss = criterion(model_pred, label)
            
            val_loss.append(loss.item())

            val_acc.append(calculate_accuracy(model_pred, label).item())
            
            true_labels += label.detach().cpu().numpy().tolist()

    return np.mean(val_loss), np.mean(val_acc)

In [364]:
def time_of_epoch(start, end):
    time_per_epoch = end - start
    time_per_epoch_min = int(time_per_epoch / 60)
    time_per_epoch_sec = int(time_per_epoch -(time_per_epoch_min*60))
    return time_per_epoch_min, time_per_epoch_sec

In [367]:
def train(model, optimizer, train_loader, test_loader, scheduler, device):
    model.to(device)

    criterion = nn.BCELoss().to(device)

    best_score = 0
    best_model = None

    for epoch in range(1, CFG["EPOCHS"]+1):
        model.train()
        start_time = time.monotonic()
        train_loss = []
        epoch_acc = 0
        for step,(img, label) in enumerate(train_loader): #tqdm(iter(train_loader)

            img, label = img.float().to(device), label.float().to(device)
            optimizer.zero_grad()
            label = label.view(-1,1)

            model_pred = model(img)

            acc = calculate_accuracy(model_pred, label)

            loss = criterion(model_pred, label)
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

            if (step + 1) % 5 == 0:
                print(f'Epoch [{epoch}], Step [{step+1}], Train Loss : [{round(loss.item(),4):.5f}] Train acc : [{acc:.5f}]')

        end_time = time.monotonic()
        epoch_min, epoch_sec = time_of_epoch(start_time, end_time)
        train_loss_m = np.mean(train_loss)
        epoch_acc += acc.item()
        val_loss, val_acc = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{train_loss_m:.5f}] Val Loss : [{val_loss:.5f}] Val acc : [{val_acc:.5f}],Time : {epoch_min}m {epoch_sec}s')
        
        if scheduler is not None:
            scheduler.step()
            
        if best_score < val_acc:
            best_model = model
            best_score = val_acc
            print(f"save_best_pth EPOCH {epoch}")
            torch.save(model.state_dict(),"../../model_save_dir/best.pth")
        
    return best_model


In [368]:
model = BaseModel()
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = None
infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

Epoch [1], Step [5], Train Loss : [0.56700] Train acc : [0.71875]
Epoch [1], Step [10], Train Loss : [0.38140] Train acc : [0.81250]
Epoch [1], Step [15], Train Loss : [0.42520] Train acc : [0.84375]
Epoch [1], Step [20], Train Loss : [0.46230] Train acc : [0.71875]
Epoch [1], Step [25], Train Loss : [0.34590] Train acc : [0.84375]
Epoch [1], Train Loss : [0.50589] Val Loss : [1.10370] Val acc : [0.72356],Time : 0m 33s
save_best_pth EPOCH 1
Epoch [2], Step [5], Train Loss : [0.15340] Train acc : [0.93750]
Epoch [2], Step [10], Train Loss : [0.36660] Train acc : [0.84375]
Epoch [2], Step [15], Train Loss : [0.33620] Train acc : [0.87500]
Epoch [2], Step [20], Train Loss : [0.30120] Train acc : [0.81250]
Epoch [2], Step [25], Train Loss : [0.25290] Train acc : [0.90625]
Epoch [2], Train Loss : [0.33035] Val Loss : [0.53539] Val acc : [0.78846],Time : 0m 32s
save_best_pth EPOCH 2


In [369]:
test_img_paths = "테스트 이미지 경로"

In [370]:
class CustomTestDataset(Dataset):
    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = self.img_paths

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        
        if self.transforms is not None:
            image = self.transforms(image=image)['image']
        return image
    
    def __len__(self):
        return len(self.img_paths)

In [380]:
test_dataset = CustomTestDataset(test_img_paths, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [392]:
def inference(model, img_path, device):
    model = BaseModel()
    model.load_state_dict(torch.load("/opt/ml/model_save_dir/best.pt", map_location=device))
    model = model.to(device)
    
    model_preds = []
    img_paths = []
    img_paths.append(img_path)
    
    model.eval()
    with torch.no_grad():
        for img in iter(test_loader):
            img = img.float().to(device)
            model_pred = model(img)
            model_preds.append(torch.round(model_pred).detach().cpu().numpy())
    
    print('Done.')
    return model_preds[0][0]

In [393]:
preds = inference(infer_model, test_img_paths, device)
print(preds)

Done.
[1.]
