# CNN2RNN 모델

### 모델 이전까지의 코드는 Baseline(SimpleCNN)과 동일

In [1]:
import numpy as np
import random
import os
import math

from glob import glob
import pandas as pd
import cv2
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

from torch.autograd import Variable

import torchvision.models as models
from torchvision import transforms

In [2]:
# 파이토치 메모리 삭제 코드
torch.cuda.empty_cache()

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [4]:
CFG = {
    'IMG_SIZE':128,
    'EPOCHS':10,
    'LEARNING_RATE':2e-2,
    'BATCH_SIZE':16,
    'SEED':41
}

In [5]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [6]:
def get_train_data(data_dir):
    img_path_list = []
    label_list = []
    for case_name in os.listdir(data_dir):
        current_path = os.path.join(data_dir, case_name)
        if os.path.isdir(current_path):
            # get image path
            img_path_list.extend(glob(os.path.join(current_path, 'image', '*.jpg')))
            img_path_list.extend(glob(os.path.join(current_path, 'image', '*.png')))
            
            # get label
            label_df = pd.read_csv(current_path+'/label.csv')
            label_list.extend(label_df['leaf_weight'])
                
    return img_path_list, label_list

def get_test_data(data_dir):
    # get image path
    img_path_list = glob(os.path.join(data_dir, 'image', '*.jpg'))
    img_path_list.extend(glob(os.path.join(data_dir, 'image', '*.png')))
    img_path_list.sort(key=lambda x:int(x.split('/')[-1].split('.')[0]))
    return img_path_list

In [7]:
all_img_path, all_label = get_train_data('./data/open/train')
test_img_path = get_test_data('./data/open/test')

In [8]:
# Train : Validation = 0.8 : 0.2 Split
train_len = int(len(all_img_path)*0.8)

train_img_path = all_img_path[:train_len]
train_label = all_label[:train_len]

vali_img_path = all_img_path[train_len:]
vali_label = all_label[train_len:]

In [9]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, train_mode=True, transforms=None):
        self.transforms = transforms
        self.train_mode = train_mode
        self.img_path_list = img_path_list
        self.label_list = label_list

    def __getitem__(self, index):
        img_path = self.img_path_list[index]
        # Get image data
        image = cv2.imread(img_path)
        if self.transforms is not None:
            image = self.transforms(image)

        if self.train_mode:
            label = self.label_list[index]
            return image, label
        else:
            return image
    
    def __len__(self):
        return len(self.img_path_list)

In [10]:
train_transform = transforms.Compose([
                    transforms.ToTensor(),
                    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
                    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
                    ])

test_transform = transforms.Compose([
                    transforms.ToTensor(),
                    transforms.Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])),
                    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
                    ])

In [11]:
train_dataset = CustomDataset(train_img_path, train_label, train_mode=True, transforms=train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

vali_dataset = CustomDataset(vali_img_path, vali_label, train_mode=True, transforms=test_transform)
vali_loader = DataLoader(vali_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# 모델

In [12]:
# ----------------------- MODEL-------------------------------

# CNN을 사용한 이미지 Feature 추출 모델
class CNNFeatureExtracter(torch.nn.Module):
    def __init__(self):
        super(CNNFeatureExtracter, self).__init__()
        self.layer1 = torch.nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer2 = torch.nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer3 = torch.nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer4 = torch.nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=4, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        

    def forward(self, x):
        # Simple CNN Model (Batch, 3, 128, 128 -> Batch, 64, 7, 7)
        # (Batch, 3, 128, 128)
        x = self.layer1(x)
        # (Batch, 8, 64, 64)
        x = self.layer2(x)
        # (Batch, 16, 32, 32)
        x = self.layer3(x)
        # (Batch, 32, 16, 16)
        x = self.layer4(x)
        # (Batch, 64, 7, 7) -> Flatten (Batch, 64*7*7(=3136))
        x = torch.flatten(x, start_dim=1)
        # (Batch, 3136, 1) 
        out = x.unsqueeze(dim=2)
        return out

In [13]:
# 추출한 이미지의 Feature를 입력하여 RNN 형태의 LSTM 모델에 입력
class LSTM(nn.Module):

    def __init__(self, num_classes, input_size, hidden_size, num_layers, bidirectional):
        super(LSTM, self).__init__()
        
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bidirectional = bidirectional
        
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True, bidirectional=bidirectional).to(device)
        
        self.fc = nn.Linear(hidden_size *2, num_classes)

    def forward(self, x):
        h_0 = Variable(torch.zeros(
            self.num_layers *2, x.size(0), self.hidden_size)).to(device)
        
        c_0 = Variable(torch.zeros(
            self.num_layers *2, x.size(0), self.hidden_size)).to(device)
        
        # Propagate input through LSTM
        # 길이 3136의 하나의 tensor가 입력
        # input_size = feature의 크기
        # 이미지 하나를 flatten으로 1차원으로 뽑고서 unsqueeze로 (3136, 1) 형태로 만들어서 입력
        ula, (h_out, _) = self.lstm(x, (h_0, c_0))
        
        h_out = h_out.view(-1, self.hidden_size*2)
        
        out = self.fc(h_out)
        
        return out

In [14]:
# 하나의 모델로 합침
class CNN2RNN(nn.Module):
    def __init__(self, num_classes, input_size, hidden_size, num_layers, bidirectional):
        super(CNN2RNN, self).__init__()
        self.cnn = CNNFeatureExtracter()
        self.rnn = LSTM(num_classes, input_size, hidden_size, num_layers, bidirectional)
        
    def forward(self, img):
        cnn_output = self.cnn(img)
        output = self.rnn(cnn_output)
        
        return output

In [15]:
def train(model, optimizer, train_loader, vali_loader, scheduler, device):
    model.to(device)
    

    # Loss Function
    criterion = nn.L1Loss().to(device)
    best_mae = 9999
    
    for epoch in range(1,CFG["EPOCHS"]+1):
        model.train()
        train_loss = []
        for img, label in tqdm(iter(train_loader)):
            img, label = img.float().to(device), label.float().to(device)
            
            optimizer.zero_grad()

            # Data -> Model -> Output
            logit = model(img)
            # Calc loss
            loss = criterion(logit.squeeze(0), label)

            # backpropagation
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            
        if scheduler is not None:
            scheduler.step()
            
        # Evaluation Validation set
        vali_mae = validation(model, vali_loader, criterion, device)
        
        print(f'Epoch [{epoch}] Train MAE : [{np.mean(train_loss):.5f}] Validation MAE : [{vali_mae:.5f}]\n')
        
        # Model Saved
        if best_mae > vali_mae:
            best_mae = vali_mae
            torch.save(model.state_dict(), './data/cnn2rnn_model.pth')
            print('Model Saved.')

In [16]:
def validation(model, vali_loader, criterion, device):
    model.eval() # Evaluation
    vali_loss = []
    with torch.no_grad():
        for img, label in tqdm(iter(vali_loader)):
            img, label = img.float().to(device), label.float().to(device)

            logit = model(img)
            loss = criterion(logit.squeeze(0), label)
            
            vali_loss.append(loss.item())

    vali_mae_loss = np.mean(vali_loss)
    return vali_mae_loss

In [17]:
model = CNN2RNN(
    num_classes = 1,
    num_layers = 2,
    hidden_size = 128,
    input_size = 1,
    bidirectional = True
).to(device)


optimizer = torch.optim.SGD(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = None

train(model, optimizer, train_loader, vali_loader, scheduler, device)

  0%|          | 0/80 [00:00<?, ?it/s]

  return F.l1_loss(input, target, reduction=self.reduction)
  return F.l1_loss(input, target, reduction=self.reduction)


  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [1] Train MAE : [80.32989] Validation MAE : [77.61573]

Model Saved.


  return F.l1_loss(input, target, reduction=self.reduction)


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [2] Train MAE : [78.44612] Validation MAE : [75.76459]

Model Saved.


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [3] Train MAE : [76.18626] Validation MAE : [72.27522]

Model Saved.


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [4] Train MAE : [72.65837] Validation MAE : [69.97212]

Model Saved.


  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [5] Train MAE : [70.89802] Validation MAE : [70.02403]



  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [6] Train MAE : [70.98118] Validation MAE : [70.14015]



  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [7] Train MAE : [71.03716] Validation MAE : [69.99130]



  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [8] Train MAE : [70.99818] Validation MAE : [70.18553]



  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [9] Train MAE : [70.92582] Validation MAE : [70.08327]



  0%|          | 0/80 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch [10] Train MAE : [71.01691] Validation MAE : [70.05852]



In [18]:
def predict(model, test_loader, device):
    model.eval()
    model_pred = []
    with torch.no_grad():
        for img in tqdm(iter(test_loader)):
            img = img.float().to(device)

            pred_logit = model(img)
            pred_logit = pred_logit.squeeze(1).detach().cpu()

            model_pred.extend(pred_logit.tolist())
    return model_pred

In [22]:
test_dataset = CustomDataset(test_img_path, None, train_mode=False, transforms=test_transform)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# Validation Score가 가장 뛰어난 모델을 불러옵니다.
checkpoint = torch.load('./data/cnn2rnn_model.pth')
model = CNN2RNN(num_classes = 1,
    num_layers = 2,
    hidden_size = 128,
    input_size = 1,
    bidirectional = True).to(device)
model.load_state_dict(checkpoint)

# Inference
preds = predict(model, test_loader, device)

  0%|          | 0/29 [00:00<?, ?it/s]

### 양방향 LSTM이기 때문에 결과값이 2배가 되서 나왔는데, 이를 추가적으로 처리하여 절반으로 줄여야 함.
### loss 값이 70정도로 충분하지 못하기 때문에 추가적인 처리를 진행하지 않고 해당 모델은 스킵하고 다음 모델에서 추가적인 처리 작업 추가 예정

In [27]:
len(preds)

920