## Import

In [1]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm

import albumentations as A # fast image agumentation library
from albumentations.pytorch.transforms import ToTensorV2 # 이미지 형 변환
#import torchvision.models as models

from sklearn.metrics import f1_score, accuracy_score
from sklearn.metrics import classification_report

import warnings
warnings.filterwarnings(action='ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# gpu 사용하기 위한 코드
# cuda가 설치되어 있으면 gpu

## Hyperparameter Setting

In [3]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':4,
    'SEED':41
}
# 이미지 사이즈, 이폭, 학습률, 배치사이즈, 시드 고정

## Fixed RandomSeed

In [4]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

## Data Load & Train/Validation Split

In [5]:
all_df = pd.read_csv('./train_bert.csv')

In [6]:
train_df, val_df, _, _ = train_test_split(all_df, all_df['cat3'], test_size=0.2, random_state=CFG['SEED'])
# train set, validation set 구별

## Label-Encoding

In [7]:
le = preprocessing.LabelEncoder()
le.fit(train_df['cat3'].values)
# 카테고리형 데이터를 수치형으로 변환하는 labelencoder

In [8]:
train_df['cat3'] = le.transform(train_df['cat3'].values)
val_df['cat3'] = le.transform(val_df['cat3'].values)
# cat3에 labelencoder를 적용하기

## CustomDataset

In [9]:
import torchvision
from PIL import Image


# Dataset 생성
class CustomDataset(Dataset):
    def __init__(self, img_path_list, text_vectors, label_list, transforms, infer=False):
        self.img_path_list = img_path_list
        self.text_vectors = text_vectors
        self.label_list = label_list
        self.transforms = transforms
        self.infer = infer

        shape_aug = torchvision.transforms.RandomResizedCrop(
            (224, 224), scale=(0.7, 1), ratio=(0.75, 1.3333333333333333))
        flop = torchvision.transforms.RandomHorizontalFlip()
        br = torchvision.transforms.ColorJitter(brightness=0.5)   #밝기
        contrast = torchvision.transforms.ColorJitter(contrast=0.5)     #대비

        self.randomtransform = torchvision.transforms.Compose([
            shape_aug, flop, br, contrast])

        
    def __getitem__(self, index):
        # NL
        token_vec = self.text_vectors[index]

        # Image 읽기
        img_path = self.img_path_list[index]
        image = Image.open(img_path)
        #image = cv2.imread(img_path)
        if not self.infer:
            image = self.randomtransform(image)
        image = np.array(image)
        
        if self.transforms is not None:
            image = self.transforms(image=image)['image'] # transforms(=image augmentation) 적용

        # Label
        if self.infer: # infer == True, test_data로부터 label "결과 추출" 시 사용
            return image, token_vec
        else: # infer == False
            label = self.label_list[index] # dataframe에서 label 가져와 "학습" 시 사용
            return image, token_vec, label
        
    def __len__(self):
        return len(self.img_path_list)

In [10]:
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

## Train

In [11]:
def score_function(real, pred):
    return f1_score(real, pred, average="weighted")

def validation(model, criterion, val_loader, device):
    model.eval() # nn.Module에서 train time과 eval time에서 수행하는 다른 작업을 수행할 수 있도록 switching 하는 함수
    
    model_preds = [] # 예측값
    true_labels = [] # 실제값
    
    val_loss = []
    
    with torch.no_grad():
        for img, text, label in tqdm(iter(val_loader)): # val_loader에서 img, text, label 가져옴
            img = img.float().to(device)
            text = text.to(device)
            label = label.type(torch.LongTensor) # label type을 LongTensor로 형변환, 추가하여 에러 해결
            label = label.to(device)
            
            model_pred = model(img, text)
            
            loss = criterion(model_pred, label) # 예측값, 실제값으로 손실함수 적용 -> loss 추출
            
            val_loss.append(loss.item()) # loss 출력, val_loss에 저장
            
            model_preds += model_pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += label.detach().cpu().numpy().tolist()
        
    test_weighted_f1 = score_function(true_labels, model_preds) # 실제 라벨값들과 예측한 라벨값들에 대해 f1 점수 계산
    return np.mean(val_loss), test_weighted_f1 # 각각 val_loss, val_score에 적용됨

## Run!!

In [12]:
from efficientnet_pytorch import EfficientNet
class CustomModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(CustomModel, self).__init__()
        #self.resnet = resnet50(weights=ResNet50_Weights.DEFAULT).to(device)
        self.effnet = EfficientNet.from_pretrained('efficientnet-b0')      
        for params in self.effnet.parameters():
            params.requires_grad = True
        
        self.img_extract = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(1280, 128),
        )

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(1024, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, num_classes)
        )

        self.nlp_extract = nn.Sequential(
            nn.Linear(4096, 1024), # 선형회귀. 4096개의 입력으로 2048개의 출력
        )

    def forward(self, img, text):
        img_feature = self.effnet.extract_features(img)
        img_feature = self.img_extract(img_feature)
        #text_feature = self.nlp_extract(text)

        #feature = img_feature + text_feature

        # feature = torch.cat([img_feature, text_feature], axis=1) # 2개 연결(4096 + 768)
        #output = self.classifier(feature) # classifier 적용
        #self.softmax(output)
        return img_feature

In [13]:
from transformers import BertModel

class CustomBertModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(CustomBertModel, self).__init__()
        self.model_bert = BertModel.from_pretrained("kykim/bert-kor-base")

        # Classifier
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(768, num_classes),
        )

    def forward(self, img, text):
        x = self.model_bert(
            input_ids=text[:,0,0,:512],
            attention_mask=text[:,1,0,:512],
            token_type_ids=text[:,2,0,:512],
        ).last_hidden_state

        x = x.transpose(1,2)

        x = self.classifier(x)
        
        return x

In [14]:
bert_model = CustomBertModel(128)
bert_model.load_state_dict(torch.load('./bert_80percent.pt'))
bert_model.eval()

vision_model = CustomModel(128)
vision_model.load_state_dict(torch.load('./temp.pt'))
vision_model.eval()

Loaded pretrained weights for efficientnet-b0


CustomModel(
  (effnet): EfficientNet(
    (_conv_stem): Conv2dStaticSamePadding(
      3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False
      (static_padding): ZeroPad2d((0, 1, 0, 1))
    )
    (_bn0): BatchNorm2d(32, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
    (_blocks): ModuleList(
      (0): MBConvBlock(
        (_depthwise_conv): Conv2dStaticSamePadding(
          32, 32, kernel_size=(3, 3), stride=[1, 1], groups=32, bias=False
          (static_padding): ZeroPad2d((1, 1, 1, 1))
        )
        (_bn1): BatchNorm2d(32, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
        (_se_reduce): Conv2dStaticSamePadding(
          32, 8, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_se_expand): Conv2dStaticSamePadding(
          8, 32, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_project_conv): Conv2dStaticSamePadding(

## Inference

In [15]:
test_df = pd.read_csv('./test.csv')

character_remove = ["<br>", "<br />", "*", "※", "<strong>", "<strong/>", "-",]

from transformers import BertTokenizerFast
tokenizer_bert = BertTokenizerFast.from_pretrained("kykim/bert-kor-base")

MAX_LEN = 512
test_vectors = []
character_remove = ["<br>", "<br />", "*", "※", "<strong>", "<strong/>", "-",]

for i, ov in enumerate(test_df['overview']):
    for char_remove in character_remove:
        ov = ov.replace(char_remove, "")

    ov = ov.split('.')
    ov = '. [SEP]'.join(ov)
    ov = ov.split('\n')
    ov = '. [SEP]'.join(ov)
    ov = "[CLS] " + ov

    inputs = tokenizer_bert(ov, return_tensors='pt')
    sz = len(inputs['input_ids'][0])
    if sz < 512:
        ov512 = ov + "[PAD]"*(MAX_LEN-sz)
        inputs = tokenizer_bert(ov512, return_tensors='pt')

    vec = np.array([
        np.array(inputs['input_ids'][:,:512]),
        np.array(inputs['attention_mask'][:,:512]),
        np.array(inputs['token_type_ids'][:,:512]),
    ])

    test_vectors.append(vec)

test_vectors = np.array(test_vectors)
print(test_vectors.shape)

val_vectors = []
for i, ov in enumerate(val_df['overview']):
    for char_remove in character_remove:
        ov = ov.replace(char_remove, "")
        
    ov = ov.split('.')
    ov = '. [SEP]'.join(ov)
    ov = ov.split('\n')
    ov = '. [SEP]'.join(ov)
    ov = "[CLS] " + ov

    inputs = tokenizer_bert(ov, return_tensors='pt')
    sz = len(inputs['input_ids'][0])
    if sz < 512:
        ov512 = ov + "[PAD]"*(MAX_LEN-sz)
        inputs = tokenizer_bert(ov512, return_tensors='pt')

    vec = np.array([
        np.array(inputs['input_ids'][:,:512]),
        np.array(inputs['attention_mask'][:,:512]),
        np.array(inputs['token_type_ids'][:,:512]),
    ])
    val_vectors.append(vec)

val_vectors = np.array(val_vectors)
print(val_vectors.shape)

Token indices sequence length is longer than the specified maximum sequence length for this model (553 > 512). Running this sequence through the model will result in indexing errors


(7280, 3, 1, 512)
(3398, 3, 1, 512)


In [16]:
test_dataset = CustomDataset(test_df['img_path'].values, test_vectors, None, test_transform, True)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

val_dataset = CustomDataset(val_df['img_path'].values, val_vectors, val_df['cat3'].values, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0) # 6

In [17]:
def inference_val(model, test_loader, deivce):
    model.to(device)
    model.eval()
    
    model_preds = np.array([])
    
    with torch.no_grad():
        for img, text, label in tqdm(iter(test_loader)):
            img = img.float().to(device)
            text = text.to(device)
            
            model_pred = model(img, text)
            
            if len(model_preds) == 0:
                model_preds=model_pred.detach().cpu().numpy()
            else:
                model_preds = np.append(model_preds, model_pred.detach().cpu().numpy(), axis=0)
    # img, text에 따른 예측값들을 model_preds 배열에 넣어 리턴
    return model_preds

In [18]:
b_preds = inference_val(bert_model, val_loader, device)
v_preds = inference_val(vision_model, val_loader, device)

  3%|▎         | 27/850 [00:10<05:27,  2.52it/s]


KeyboardInterrupt: 

100%|██████████| 850/850 [01:06<00:00, 12.82it/s]


In [None]:
weight_b = 0.79
weight_v = 0.21

preds_index = []
for b,v in zip(b_preds, v_preds):
    preds = weight_b * b + weight_v * v

    index = np.argmax(preds)
    preds_index.append(index)

test_weighted_f1 = score_function(val_dataset.label_list, preds_index) # 실제 라벨값들과 예측한 라벨값들에 대해 f1 점수 계산
print(test_weighted_f1)

0.8168537254921074


# TEST SET

In [19]:
def inference_test(model, test_loader, deivce):
    model.to(device)
    model.eval()
    
    model_preds = np.array([])
    
    with torch.no_grad():
        for img, text in tqdm(iter(test_loader)):
            img = img.float().to(device)
            text = text.to(device)
            
            model_pred = model(img, text)
            
            if len(model_preds) == 0:
                model_preds=model_pred.detach().cpu().numpy()
            else:
                model_preds = np.append(model_preds, model_pred.detach().cpu().numpy(), axis=0)
    # img, text에 따른 예측값들을 model_preds 배열에 넣어 리턴
    return model_preds

In [20]:
test_b_preds = inference_test(bert_model, test_loader, device)
test_v_preds = inference_test(vision_model, test_loader, device)

100%|██████████| 1820/1820 [08:34<00:00,  3.54it/s]
100%|██████████| 1820/1820 [01:58<00:00, 15.31it/s]


In [21]:
weight_b = 0.79
weight_v = 0.21

preds_index = []
for b,v in zip(test_b_preds, test_v_preds):
    preds = weight_b * b + weight_v * v

    index = np.argmax(preds)
    preds_index.append(index)

NameError: name 'weight_b' is not defined

## Submission

In [None]:
submit = pd.read_csv('./sample_submission.csv')

In [None]:
submit['cat3'] = le.inverse_transform(preds_index)

In [None]:
submit.to_csv('./submit_ensemble.csv', index=False)
# 제출 파일로 저장