In [1]:
import autopep8
import isort
from yapf.yapflib.yapf_api import FormatCode


# @register_cell_magic
def fmt(line, cell):
   """
   My formatter cell magic comannd.
   Please install autopep8, isort and yapf before using this magic command.
   !pip install autopep8 isort yapf
   """
   ret = isort.code(cell)
   ret = autopep8.fix_code(ret)
   print(FormatCode(ret, style_config='pep8')[0])


 ## Import


In [2]:
import copy
import glob
import os
import random

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision as tv
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from torch.optim import lr_scheduler

from pytorchtools import EarlyStopping

In [3]:
random_seed = 3141

np.random.seed(random_seed)
torch.manual_seed(random_seed)
if torch.cuda.device_count() > 1:
    torch.cuda.manual_seed_all(random_seed) # for multiple GPU
else:
    torch.cuda.manual_seed(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

 # 1.Confirm dataset

In [4]:
img_paths = []  # Image path
class_ids = []  # Class ID according to the image path
class_names = []  # Class name

target = "mix" #Target includes ham, chicken, mix

img_dirs = sorted(glob.glob('C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/train/**'))
val_dirs = sorted(glob.glob('C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/val/**'))
test_dirs = sorted(glob.glob('C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/test/**'))


for class_id, img_dir in enumerate(img_dirs):

    class_names.append(img_dir.split(os.sep)[-1])

    tmp_img_paths = sorted(glob.glob(f'{img_dir}/*'))
    print(class_names[-1], len(tmp_img_paths))
    for img_path in tmp_img_paths:
        img_paths.append(img_path)
        class_ids.append(class_id)

print('Total', len(img_paths))



contact 3598
no_contact 5241
Total 8839


 # 2.Transform

In [6]:
from torchvision.transforms import ToTensor, CenterCrop, Compose

train_transform = Compose([
    ToTensor(), 
    CenterCrop([360, 360]),

])


val_transform = Compose([
    ToTensor(), 
    CenterCrop([360, 360]),
    
])


test_transform = Compose([
    ToTensor(), 
    CenterCrop([360, 360]),
    
])


In [7]:
from torchvision.datasets import ImageFolder



img_dirs = 'C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/train/'
val_dirs = 'C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/val/'
test_dirs = 'C:/Users/eugene/Desktop/research_eugene/annotation/rename/' + target + '/test/'


train_dataset = ImageFolder(
    root=img_dirs,
    transform=train_transform
)

val_dataset = ImageFolder(
    root=val_dirs,
    transform=val_transform
)


test_dataset = ImageFolder(
    root=test_dirs,
    transform=val_transform
)

 # 3.CNN model

In [9]:
def def_model():
    """CNNモデルを定義し返す関数

    Returns
    -------
    model : torchvision.models
        CNNモデル
    """

    # for efficientnet_b0
    # model = tv.models.efficientnet_b0(pretrained=True)
    # model.classifier._modules['1'] = torch.nn.Linear(1280, 2) 


    # for efficientnet_b4
    model = tv.models.efficientnet_b4(pretrained=False) 
    model.classifier._modules['1'] = torch.nn.Linear(1792, 2) 

    

    if torch.cuda.is_available():
        model = model.cuda()
    return model


 # 4.Train model & Test model

In [None]:
early_stopping = EarlyStopping(patience=10, verbose=True, delta=0.005)

In [12]:
import torchvision
from torchvision import utils as vutils
 
 
def save_image_tensor(input_tensor: torch.Tensor, filename):
    """
    save tensor as image
    :param input_tensor: the tensor to be saved
    :param filename: name to be saved
    """
    assert (len(input_tensor.shape) == 4 and input_tensor.shape[0] == 1)
    # copy
    input_tensor = input_tensor.clone().detach()
    # to cpu
    input_tensor = input_tensor.to(torch.device('cpu'))
    vutils.save_image(input_tensor, filename)

def train_model(model, loader, criterion, optimizer, pla_lr_scheduler):
    model.train()
    train_loss = 0.0
    for data in loader:
        inputs, labels = data

        if torch.cuda.is_available():
            inputs = inputs.cuda()
            labels = labels.cuda()

        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        outputs = torch.nn.functional.softmax(outputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    valid_loss = (train_loss / len(loader))      
    pla_lr_scheduler.step(valid_loss)

    print(f"Training loss: {train_loss / len(loader):.6f}")
   
    return model


def test_model(model, loader, criterion):
    model.eval()
    test_loss = 0.0
    all_labels = []
    all_predicted = []
    error_path = 'C:/Users/eugene/Desktop/research_eugene/error_img/'

    with torch.no_grad():
        for data in loader:
            inputs, labels = data

            all_labels.append(labels.numpy())

            if torch.cuda.is_available():
                inputs = inputs.cuda()
                labels = labels.cuda()

            # model prediction
            outputs = model(inputs)
            outputs = torch.nn.functional.softmax(outputs)
            test_loss += criterion(outputs, labels)
            _, predicted = torch.max(outputs.data, 1) 
            if torch.cuda.is_available():
                predicted = predicted.cpu()
            all_predicted.append(predicted.numpy())

    # Evaluation
    labels = all_labels[0]
    predicted = all_predicted[0]
    for idx in range(1, len(all_labels)):
        labels = np.hstack((labels, all_labels[idx]))
        predicted = np.hstack((predicted, all_predicted[idx]))

    accuracy = accuracy_score(labels, predicted)
    f1 = f1_score(labels, predicted, average='macro')
    cm = confusion_matrix(labels, predicted)

    # early_stopping needs the validation loss to check if it has decresed, 
    # and if it has, it will make a checkpoint of the current model
    early_stopping(test_loss/len(loader), model)
    
    
    print(
        f"Accuracy: {accuracy:.3f}, F1: {f1:.3f}, Loss: {test_loss/len(loader):.6f}"
    )
    print("Confusion matrix")
    print(cm)

 # 5.Learning


In [None]:
num_epochs = 200
batch_size = 16
lr = 1e-3



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

val_loader = torch.utils.data.DataLoader(val_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

test_loader = torch.utils.data.DataLoader(test_dataset,
                                           batch_size=batch_size,
                                           shuffle=False)


model = def_model().cuda()
criterion = torch.nn.CrossEntropyLoss().cuda() 
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

pla_lr_scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=3, verbose=True)


for epoch in range(num_epochs):


    print(f"Epoch: {epoch+1}")
    model = train_model(model, train_loader, criterion, optimizer, pla_lr_scheduler = pla_lr_scheduler)
    test_model(model, val_loader, criterion)
    torch.save(model.state_dict(), 'C:/Users/eugene/Desktop/research_eugene/model/model' + str(epoch+1) +'.pth')
    
  

    
    if early_stopping.early_stop:
        print("Early stopping")
        break

    
print("Training finished")

test_model(model, test_loader, criterion)

