## Predict Test

In [1]:
import numpy as np
import torch
import torch.nn.functional as F
import sys
import os
import csv

from itertools import product
from tqdm import tqdm
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))

from NLP.etc_plugin import check_exists_cuda
from NLP.model_plugin import set_seed, prepare_data_and_categories, \
    define_datasets, reshape_to_1d, show_test_evaluation, MODEL_NAME

In [3]:
def aspect_evaluate_tests(device, main_category, test_aspect_klue_sets):
    """Run each per-aspect best model over its matching test dataset.

    For aspect index ``i`` the checkpoint stored at
    ``./NLP/{main_category}/model_aspect_{i}_best`` is loaded, moved to
    ``device``, switched to eval mode and run over
    ``test_aspect_klue_sets[i]`` in batches of 4 without shuffling.

    Args:
        device: Torch device the model and the input tensors are moved to.
        main_category: Category name used to build the checkpoint path.
        test_aspect_klue_sets: One dataset per aspect. Every batch produced
            from it must provide ``input_ids``, ``attention_mask``,
            ``token_type_ids`` and ``label`` entries.

    Returns:
        A tuple ``(infers, infer_labels)`` with one entry per aspect.
        ``infers[i]`` is a flat list of predicted class indices (argmax over
        the logits). ``infer_labels[i]`` holds one list of labels *per batch*
        and is intentionally left un-flattened.
        NOTE(review): the consumer of this shape is not visible from here;
        confirm it expects batch-grouped labels before flattening them.
    """
    infers = []
    infer_labels = []

    for i, test_set in enumerate(test_aspect_klue_sets):
        print("=============")
        print(i+1, ") Aspect Test ...")
        best_model_path = f'./NLP/{main_category}/model_aspect_{i}_best'

        best_model = AutoModelForSequenceClassification.from_pretrained(best_model_path)
        best_model.to(device)  # move the model to the target device
        best_model.eval()

        dataloader = DataLoader(test_set, batch_size=4, shuffle=False)

        predictions = []
        labels = []

        with torch.no_grad():
            for batch in tqdm(dataloader):
                outputs = best_model(
                    input_ids=batch['input_ids'].to(device),  # move inputs to the target device
                    attention_mask=batch['attention_mask'].to(device),
                    token_type_ids=batch['token_type_ids'].to(device)
                )
                batch_logits = outputs[0].detach().cpu().numpy()
                # Only the argmax is needed; softmax probabilities were never
                # consumed, so they are no longer computed. Extending a plain
                # list also keeps an empty dataloader from crashing, which
                # np.concatenate([]) would do.
                predictions.extend(np.argmax(batch_logits, axis=-1).tolist())
                labels.append(batch['label'].tolist())

        infers.append(predictions)
        infer_labels.append(labels)

    return infers, infer_labels

In [4]:
def sentiment_evaluate_tests(device, main_category, test_sentiment_klue_sets):
    """Run the best sentiment model over the sentiment test dataset.

    The checkpoint stored at ``./NLP/{main_category}/model_sentiment_best``
    is loaded, moved to ``device``, switched to eval mode and run over
    ``test_sentiment_klue_sets`` in batches of 4 without shuffling.

    Args:
        device: Torch device the model and the input tensors are moved to.
        main_category: Category name used to build the checkpoint path.
        test_sentiment_klue_sets: A single dataset. Every batch produced from
            it must provide ``input_ids``, ``attention_mask``,
            ``token_type_ids`` and ``label`` entries.

    Returns:
        A tuple ``(infers, infer_labels)`` of single-element lists.
        ``infers[0]`` is a flat list of predicted class indices (argmax over
        the logits). ``infer_labels[0]`` holds one list of labels *per batch*
        and is intentionally left un-flattened.
        NOTE(review): the consumer of this shape is not visible from here;
        confirm it expects batch-grouped labels before flattening them.
    """
    print("=============")
    print("Sentiment Test ...")
    best_model_path = f'./NLP/{main_category}/model_sentiment_best'

    best_model = AutoModelForSequenceClassification.from_pretrained(best_model_path)
    best_model.to(device)
    best_model.eval()

    dataloader = DataLoader(test_sentiment_klue_sets, batch_size=4, shuffle=False)

    predictions = []
    labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader):
            outputs = best_model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                token_type_ids=batch['token_type_ids'].to(device)
            )
            batch_logits = outputs[0].detach().cpu().numpy()
            # Only the argmax is needed; softmax probabilities were never
            # consumed, so they are no longer computed. Extending a plain
            # list also keeps an empty dataloader from crashing, which
            # np.concatenate([]) would do.
            predictions.extend(np.argmax(batch_logits, axis=-1).tolist())
            labels.append(batch['label'].tolist())

    return [predictions], [labels]

## Save Prediction Results

In [6]:
def _write_cp949_csv(file_path, rows):
    """Write ``rows`` to ``file_path`` as a cp949-encoded CSV file."""
    # Review text can contain characters cp949 cannot represent (emoji, for
    # example). With the default strict handler a single such character
    # aborts the whole export; backslashreplace keeps the file writable and
    # leaves a visible escape instead of silently dropping the character.
    with open(file_path, mode="w", newline="", encoding="cp949",
              errors="backslashreplace") as file:
        csv.writer(file).writerows(rows)


def save_prediction_results(main_category, category_with_original_aspects,
                            test_ASP_datas, test_ASP_labels, asp_infers,
                            test_SEN_data, test_SEN_labels, sen_infers):
    """Save test-set predictions, ground truth and review text as CSV files.

    Three files are written under ``./NLP/{main_category}/`` (the directory
    must already exist):

    * ``test_asp_prediction_results.csv`` - one ``id, sentence, prediction``
      row per (sentence, predicted aspect) pair.
    * ``test_asp_correct_results.csv`` - one ``id, sentence, correct`` row
      per (sentence, ground-truth aspect) pair.
    * ``test_sen_results.csv`` - one ``sentence, prediction, correct`` row
      per sentiment sample.

    ``id`` is the 1-based position of the sentence in ``test_ASP_datas[0]``,
    so rows in the two aspect files can be matched on it.

    Args:
        main_category: Category name used to build the output directory.
        category_with_original_aspects: Aspect names; index ``j`` names the
            aspect that ``asp_infers[j]`` / ``test_ASP_labels[j]`` refer to.
        test_ASP_datas: Per-aspect sentence collections; only the first one
            is read to obtain the sentences.
        test_ASP_labels: ``test_ASP_labels[j][i]`` is truthy when sentence
            ``i`` really has aspect ``j``.
        asp_infers: ``asp_infers[j][i]`` is truthy when aspect ``j`` was
            predicted for sentence ``i``.
        test_SEN_data: Sentences of the sentiment test set.
        test_SEN_labels: Ground-truth sentiment label per sentence.
        sen_infers: Sentiment predictions; only ``sen_infers[0]`` is read.
    """
    output_dir = f'./NLP/{main_category}/'

    # --- Aspect ---
    sentences = test_ASP_datas[0]
    aspect_count = len(category_with_original_aspects)

    prediction_rows = [['id', 'sentence', 'prediction']]
    correct_rows = [['id', 'sentence', 'correct']]

    for i in range(len(sentences)):
        sentence = sentences[i]
        for j in range(aspect_count):
            aspect = category_with_original_aspects[j]
            # A sentence with several aspects yields several rows sharing
            # the same id (the list of aspects is unnested).
            if asp_infers[j][i]:
                prediction_rows.append([i + 1, sentence, aspect])
            if test_ASP_labels[j][i]:
                correct_rows.append([i + 1, sentence, aspect])

    _write_cp949_csv(output_dir + 'test_asp_prediction_results.csv', prediction_rows)
    _write_cp949_csv(output_dir + 'test_asp_correct_results.csv', correct_rows)

    # --- Sentiment ---
    sen_rows = [['sentence', 'prediction', 'correct']]
    for i in range(len(test_SEN_data)):
        sen_rows.append([test_SEN_data[i], sen_infers[0][i], test_SEN_labels[i]])

    _write_cp949_csv(f'./NLP/{main_category}/test_sen_results.csv', sen_rows)

## Main

In [None]:
if __name__ == '__main__':
    # Script entry point: for every main category, evaluate the saved best
    # aspect and sentiment models on the test split, print the scores and
    # export the predictions to CSV.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    check_exists_cuda(device)

    set_seed()
    main_categories = [
        '스킨케어',
        '헤어_바디케어',
        '메이크업_뷰티소품',
        '남성화장품'
    ]

    # The tokenizer depends only on MODEL_NAME, not on the category, so it is
    # loaded once instead of once per loop iteration.
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    for main_category in main_categories:
        print(f"================={main_category}====================")
        # Only the test split is used here; the train/validation splits are discarded.
        _, _, tests, category_with_original_aspects = prepare_data_and_categories(main_category)

        test_ASP_datas, test_ASP_labels, test_SEN_data, test_SEN_labels = define_datasets(
            tests, main_category, category_with_original_aspects)

        # Flatten to 1-D; 'aspect' returns a list of datasets, 'sentiment' returns a single one
        test_aspect_klue_sets = reshape_to_1d('aspect', test_ASP_datas, test_ASP_labels, tokenizer)
        test_sentiment_klue_sets = reshape_to_1d('sentiment', test_SEN_data, test_SEN_labels, tokenizer)

        # Aspect model test
        asp_infers, asp_infer_labels = aspect_evaluate_tests(device, main_category, test_aspect_klue_sets)

        # Print the aspect test scores
        show_test_evaluation('aspect', asp_infers, asp_infer_labels, category_with_original_aspects)

        # Sentiment model test
        sen_infers, sen_infer_labels = sentiment_evaluate_tests(device, main_category, test_sentiment_klue_sets)

        # Print the sentiment test scores
        show_test_evaluation('sentiment', sen_infers, sen_infer_labels)

        print("================================================")

        # Save the test results separately
        save_prediction_results(main_category, category_with_original_aspects,
                                test_ASP_datas, test_ASP_labels, asp_infers,
                                test_SEN_data, test_SEN_labels, sen_infers)