# Predict

In [1]:
import numpy as np
import random
import os
import math
from itertools import product

from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler


from glob import glob
import pandas as pd
import cv2
from tqdm.auto import tqdm
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt


import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
# from torch.utils.tensorboard import SummaryWriter

from torch.autograd import Variable

import torchvision.models as models
from torchvision import transforms

import albumentations as A
import albumentations.pytorch

from sklearn.metrics import accuracy_score

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using {device} device")

Using cuda device


In [3]:
CFG = {
    'IMG_SIZE':256,
    'EPOCHS':50,
    'PATIENCE':10,
    'class':14
}

In [4]:
Train_path = '/home/lab17/jupyter_home/Data/product_image/Training/'
Valid_path = '/home/lab17/jupyter_home/Data/product_image/Validation/'
Test_path = '/home/lab17/jupyter_home/git/test_img/'
save_graph_path = './result/'
save_model_path = '/home/lab17/jupyter_home/saved_models/'

# Load Data

### train

In [5]:
def get_train_data(data_dir):
#     img_path_list = []
    label_list = []
    label_name_list = []
    
    image_path = os.path.join(data_dir, 'dessert')
    
    for product_name in os.listdir(image_path):
        product_path = os.path.join(image_path, product_name)
        if os.path.isdir(product_path):
#             get image path
#             img_path_list.extend(glob(os.path.join(product_path, '*.jpg')))
#             img_path_list.extend(glob(os.path.join(product_path, '*.png')))
            label = list(product_name[:5])
            name = product_name[6:]
            
            # get label
            label_list.append(''.join(label))
            label_name_list.append(name)
                
#     return img_path_list, label_list
    return label_list, label_name_list

In [6]:
label_list, label_name_list = get_train_data(Train_path)

In [7]:
# encoder
le = preprocessing.LabelEncoder()
targets = le.fit_transform(label_list)
print('--targets\n' , targets)

label_encoder = {key:val for key, val in zip(label_list, targets)}
print(label_encoder)

--targets
 [12 10  2  8  0  3 11  9  4  7 13  5  6  1]
{'55701': 12, '45661': 10, '35211': 2, '45659': 8, '25222': 0, '35584': 3, '55034': 11, '45660': 9, '35585': 4, '45658': 7, '55702': 13, '45030': 5, '45657': 6, '25228': 1}


In [8]:
# decoder
label_decoder = {v: k for k, v in label_encoder.items()}
label_name_decoder = {key : value for key, value in zip(label_list, label_name_list)}

### validation

In [9]:
def get_valid_data(data_dir):
    img_valid_list = []
    label_valid_list = []
    
    image_path = os.path.join(data_dir, 'dessert')
    
    for product_name in os.listdir(image_path):
        product_path = os.path.join(image_path, product_name)
        if os.path.isdir(product_path):
            # get image path
            img_valid_list.extend(glob(os.path.join(product_path, '*.jpg')))
            img_valid_list.extend(glob(os.path.join(product_path, '*.png')))
            label = list(product_name[:5])
            
            # get label
            label_valid_list.append(''.join(label))
                
    return img_valid_list, label_valid_list

In [10]:
def valid_data_blanced(img, label):
    x = []
    y = []
    
    for i in range(CFG['class']):
        _img = img[(i * 15): ((i + 1) * 15)]
        _label = label[i]
        
        for img_product in _img:
            x.append(img_product)
            y.append(_label)
            
    return x, y

In [11]:
img_valid_list, label_valid_list = get_valid_data(Valid_path)
x_valid, y_valid = valid_data_blanced(img_valid_list, label_valid_list)
len(label_valid_list)

14

In [12]:
le2 = preprocessing.LabelEncoder()
targets_y = le2.fit_transform(y_valid)
targets_y_t = torch.as_tensor(targets_y)
one_hot_valid_y = F.one_hot(targets_y_t)
one_hot_valid_y.shape

torch.Size([210, 14])

### test

In [13]:
def get_test_data(data_dir):
    img_path_list = []
    label_list = []
    
    image_path = data_dir
    
#     for product in os.listdir(image_path):

    # get image path
    img_path_list.extend(glob(os.path.join(image_path, '*.jpg')))
    img_path_list.extend(glob(os.path.join(image_path, '*.png')))
    label_list = [ip[len('/home/lab17/jupyter_home/git/test_img/'):-6] for ip in img_path_list]

    # get label
#     label_list.append(''.join(label))
                
    return img_path_list, label_list

In [14]:
test_img_path, test_label_list = get_test_data(Test_path)

In [15]:
# test data
test_x = test_img_path
# 레이블을 one-hot-vector로 변환
test_y = [label_encoder[key] for key in test_label_list]
test_targets = torch.as_tensor(test_y)
one_hot_test_y = F.one_hot(test_targets)

In [16]:
print([f'{i}, {y}' for i, y in zip(test_label_list, test_y)])

['55034, 11', '45661, 10', '25228, 1', '25222, 0', '45659, 8', '55701, 12', '45030, 5', '35211, 2', '45660, 9', '45659, 8', '35585, 4', '55702, 13', '25222, 0', '55702, 13', '35211, 2', '45030, 5', '45661, 10', '35584, 3', '25222, 0', '55701, 12', '35211, 2', '35211, 2', '55701, 12', '35584, 3', '45660, 9', '55702, 13', '25222, 0', '45657, 6', '45657, 6', '55701, 12', '45657, 6']


# Model

In [17]:
class AlbumentationsCustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, train_mode=True, transforms=None):
        self.transforms = transforms
        self.train_mode = train_mode
        self.img_path_list = img_path_list
        self.label_list = label_list

    def __getitem__(self, index):
        img_path = self.img_path_list[index]
        # Get image data
        image = cv2.imread(img_path)
        
        # By default OpenCV uses BGR color space for color images,
        # so we need to convert the image to RGB color space.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.train_mode:
#             image = image.astype(np.int16)
            augmented = self.transforms(image=image)
            image = augmented['image']
            label = self.label_list[index]
            return image, label
        else:
            augmented = self.transforms(image=image)
            image = augmented['image']
            label = self.label_list[index]
            return image, label
    
    def __len__(self):
        return len(self.img_path_list)
    
A_test_transform = albumentations.Compose([
                                    A.Resize(256, 256),
                                    A.Normalize(mean=(0.744859, 0.735139, 0.711357), std=(0.100712, 0.120692, 0.167998)),  
#                                     A.pytorch.transforms.ToTensor(),
                                    A.pytorch.transforms.ToTensorV2(transpose_mask=True),
                                ])

A_vali_dataset = AlbumentationsCustomDataset(x_valid, one_hot_valid_y, train_mode=True, transforms=A_test_transform)
A_vali_loader = DataLoader(A_vali_dataset, batch_size = 5, shuffle=False, num_workers=0, collate_fn=None)

A_test_dataset = AlbumentationsCustomDataset(test_x, one_hot_test_y, train_mode=False, transforms=A_test_transform)
A_test_loader = DataLoader(A_test_dataset, batch_size = 4, shuffle=False, num_workers=0, collate_fn=None)

In [18]:
class ResNet50(torch.nn.Module):
    def __init__(self):
        super(ResNet50, self).__init__()
        model = models.resnet50(pretrained=True)
        modules = list(model.children())[:-1]
        self.feature_extract = nn.Sequential(*modules)
        self.fc1 = nn.Linear(2048, 1000)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(1000,CFG['class'])
#         self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.feature_extract(x)
        # x = x.mean(dim=(-2, -1))
        # (batch, 2048, 4, 4)
        x = torch.squeeze(x)
        x = self.relu(self.fc1(x))
        out = self.fc2(x)
#         out = self.softmax(x)
        
        return out

In [19]:
class EfficientNetb4(torch.nn.Module):
    def __init__(self):
        super(EfficientNetb4, self).__init__()
        model = models.efficientnet_b4(pretrained=True)
        modules = list(model.children())[:-1]
        self.feature_extract = nn.Sequential(*modules)
        self.fc1 = nn.Linear(1792, 1000)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(1000, CFG['class'])
#         self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.feature_extract(x)
        # (batch, 1792, 1, 1)
        x = torch.squeeze(x)
        x = self.relu(self.fc1(x))
        out = self.fc2(x)
#         out = self.softmax(x)
        
        return out

In [20]:
class RegNet(torch.nn.Module):
    def __init__(self):
        super(RegNet, self).__init__()
        model = models.regnet_y_16gf(pretrained=True)
        modules = list(model.children())[:-1]
        self.feature_extract = nn.Sequential(*modules)
        self.fc1 = nn.Linear(3024, 1000)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(1000, CFG['class'])
#         self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.feature_extract(x)
        # (batch, 3024, 1, 1)
        x = torch.squeeze(x)
        x = self.relu(self.fc1(x))
        out = self.fc2(x)
#         out = self.softmax(x)
        
        return out

In [21]:
def score_function(real, pred):
    score = accuracy_score(real, pred)
    return score

# Inference

In [22]:
pred_ensemble = []
batch_size = 34

#---------
model_name = 'ResNet50'
model_lr = '0.0001'
model_optim = 'adam'
model_sch = 'CosineAnnealing' 
#---------

model_test = ResNet50().to(device)
model_test.load_state_dict(torch.load('/home/lab17/jupyter_home/saved_models/{}_{}_{}_{}_example.pth'.format(model_name, model_lr, model_optim, model_sch)))
model_test.eval()

criterion = torch.nn.CrossEntropyLoss()

# test_loss = []
# f_pred = []

# for img, label in tqdm(iter(A_vali_loader)):
#     img, label = img.float().to(device), label.float().to(device)
    
#     # Data -> Model -> Output
#     logit = model_test(img)
#     logit = torch.squeeze(logit)
    
#     # Calc loss
#     loss = criterion(logit, label)

#     test_loss.append(loss.item())
#     f_pred.extend(logit.argmax(1).detach().cpu().numpy().tolist())
    
# print('val loss :' ,np.mean(test_loss))
# print('val acc :', score_function(targets_y, f_pred))

test_loss = []
f_pred = []

for img, label in tqdm(iter(A_test_loader)):
    img, label = img.float().to(device), label.float().to(device)
    
    # Data -> Model -> Output
    logit = model_test(img)
    logit = torch.squeeze(logit)
    
    # Calc loss
    loss = criterion(logit, label)

    test_loss.append(loss.item())
    f_pred.extend(logit.argmax(1).detach().cpu().numpy().tolist())
    
print('test loss :' ,np.mean(test_loss))
print('test acc :', score_function(test_y, f_pred))

  0%|          | 0/8 [00:00<?, ?it/s]

test loss : 1.6851365715265274
test acc : 0.41935483870967744


- patience 너무커서 과적합,?

In [23]:
f_result = [label_name_decoder[label_decoder[result]] for result in f_pred]

In [24]:
print('test이미지 번호 | 정답 | 예측')
for img, res in zip(test_img_path, f_result):
    if label_name_decoder[img[-11:-6]]==res:
        print(f'o {img[-11:-4]}, {label_name_decoder[img[-11:-6]]}, {res}')
    else:
        print(f'x {img[-11:-4]}, {label_name_decoder[img[-11:-6]]}, {res}')

test이미지 번호 | 정답 | 예측
x 55034_1, 돌트로피칼666G, 돌황도666G
o 45661_1, 씨제이)쁘티첼(요거젤리블루베리), 씨제이)쁘티첼(요거젤리블루베리)
x 25228_1, 대만)파인애플케익184G, 대만)망고케익184g
x 25222_3, 대만)망고케익184g, 돌황도666G
x 45659_2, 씨제이)쁘티첼(요거젤리딸기), 돌황도666G
o 55701_3, 쁘띠첼요거젤리밀감, 쁘띠첼요거젤리밀감
o 45030_2, 돌황도666G, 돌황도666G
x 35211_2, 매일유업)데르뜨130G, 돌황도666G
x 45660_2, 씨제이)쁘티첼(요거젤리화이트코코), 씨제이)쁘티첼(요거젤리밀감)
x 45659_1, 씨제이)쁘티첼(요거젤리딸기), 대만)망고케익184g
o 35585_1, 매일데르뜨감귤90G, 매일데르뜨감귤90G
x 55702_2, 쁘띠첼요거젤리복숭아, 씨제이)쁘티첼(요거젤리복숭아)
x 25222_1, 대만)망고케익184g, 돌황도666G
x 55702_3, 쁘띠첼요거젤리복숭아, 씨제이)쁘티첼(요거젤리복숭아)
x 35211_1, 매일유업)데르뜨130G, 돌황도666G
o 45030_1, 돌황도666G, 돌황도666G
x 45661_2, 씨제이)쁘티첼(요거젤리블루베리), 돌황도666G
x 35584_2, 매일데르뜨파인애플90G, 돌황도666G
x 25222_4, 대만)망고케익184g, 돌황도666G
x 55701_2, 쁘띠첼요거젤리밀감, 씨제이)쁘티첼(요거젤리밀감)
x 35211_4, 매일유업)데르뜨130G, 씨제이)쁘티첼(요거젤리딸기)
o 35211_3, 매일유업)데르뜨130G, 매일유업)데르뜨130G
o 55701_1, 쁘띠첼요거젤리밀감, 쁘띠첼요거젤리밀감
o 35584_1, 매일데르뜨파인애플90G, 매일데르뜨파인애플90G
o 45660_1, 씨제이)쁘티첼(요거젤리화이트코코), 씨제이)쁘티첼(요거젤리화이트코코)
x 55702_1, 쁘띠첼요거젤리복숭아, 씨제이)쁘티첼(요거젤리복숭아)
o 25222_2, 대만)망고케익184g, 대만)망

# Inference2

In [25]:
# sorted(os.listdir('/home/lab17/jupyter_home/saved_models/'))
file_list = sorted(glob('/home/lab17/jupyter_home/saved_models/*.pth'))
print(len(file_list))
sorted(glob('/home/lab17/jupyter_home/saved_models/*.pth'))

33


['/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.0001_Lamb_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.0001_adam_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.0001_nadam_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.0001_rmsprop_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.001_Lamb_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.001_adam_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.001_nadam_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_0.001_rmsprop_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/EfficientNetb4_1e-05_rmsprop_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved_models/RegNet_0.0001_Lamb_CosineAnnealing_example.pth',
 '/home/lab17/jupyter_home/saved

In [26]:
file_list=['/home/lab17/jupyter_home/saved_models/RegNet_0.001_rmsprop_CosineAnnealing_example.pth']

In [27]:
pred_ensemble = []
batch_size = 34

for m_path in file_list:
    if 'ResNet50' in m_path:
        model_test = ResNet50().to(device)
    elif 'EfficientNetb4' in m_path:
        model_test = EfficientNetb4().to(device)
    elif 'RegNet' in m_path:
        model_test = RegNet().to(device)    

    model_test.load_state_dict(torch.load(m_path))
    model_test.eval()

    criterion = torch.nn.CrossEntropyLoss()

    test_loss = []
    f_pred = []

#     for img, label in tqdm(iter(A_vali_loader)):
#         img, label = img.float().to(device), label.float().to(device)

#         # Data -> Model -> Output
#         logit = model_test(img)
#         logit = torch.squeeze(logit)

#         # Calc loss
#         loss = criterion(logit, label)

#         test_loss.append(loss.item())
#         f_pred.extend(logit.argmax(1).detach().cpu().numpy().tolist())

#     print('val loss :' ,np.mean(test_loss))
#     print('val acc :', score_function(targets_y, f_pred))

#     test_loss = []
#     f_pred = []

    for img, label in tqdm(iter(A_test_loader)):
        img, label = img.float().to(device), label.float().to(device)

        # Data -> Model -> Output
        logit = model_test(img)
        logit = torch.squeeze(logit)

        # Calc loss
        loss = criterion(logit, label)

        test_loss.append(loss.item())
        f_pred.extend(logit.argmax(1).detach().cpu().numpy().tolist())
        
    print(m_path[len('/home/lab17/jupyter_home/saved_models/'):])

    print('test loss :' ,np.mean(test_loss))
    print('test acc :', score_function(test_y, f_pred))

  0%|          | 0/8 [00:00<?, ?it/s]

RegNet_0.001_rmsprop_CosineAnnealing_example.pth
test loss : 4.247981650754809
test acc : 0.3870967741935484


In [28]:
# f_pred = []
# pred_prob = []

# image_data = Image.open('home/lab17/jupyter_home/Data/product_test/img.jpg')

# image_transform = transforms.Compose([
#     transforms.Resize(size=256),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.744859, 0.735139, 0.711357],
#                          std=[0.100712, 0.120692, 0.167998])
# ])

# x = image_transform(image_data)
# pred = model_test(x)
# pred_prob.extend(pred.detach().cpu().numpy())
# f_pred.extend(pred.argmax(1).detach().cpu().numpy().tolist())

# label_decoder = {val:key for key, val in zip(range(CFG['class']), sorted(label_list))}

# f_result = [label_decoder[result] for result in f_pred]

# print(f_result)