## 필요한 패키지 및 기본 제공 함수 (Requirements)

In [None]:
!git clone https://github.com/HaeunYu/text_classification_2022.git

In [None]:
!pip install PyKomoran
!pip install nltk

In [None]:
import nltk

# sent_tokenize() needs the Punkt sentence-boundary models. Recent NLTK
# releases look up the 'punkt_tab' resource instead of the older pickled
# 'punkt' one, so fetch both to keep the notebook working on either version.
nltk.download('punkt')
nltk.download('punkt_tab')

In [None]:
import json
import torch
from tqdm import trange
import numpy as np
import random
from nltk import sent_tokenize
from PyKomoran import *

In [None]:
def load_data(path):
    """Load and return the JSON document stored at ``path``.

    The file is decoded explicitly as UTF-8: the news data contains Korean
    text, and relying on the platform default encoding fails on systems whose
    locale is not UTF-8.

    Args:
        path: Path of the JSON file to read.

    Returns:
        The deserialized JSON content (whatever ``json.load`` produces).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy's global generator and PyTorch's CPU
    generator with ``seed``; when CUDA is available, the generators of all
    CUDA devices are seeded as well.

    Args:
        seed: Integer seed value shared by all generators.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # Nothing more to do on CPU-only machines.
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed_all(seed)
# end of set_seed

## 데이터 로드 및 전처리 (Preprocess)



1.   PyKomoran을 이용하여 train data와 test data를 형태소 분석 (품사 종류와 상관없이 모든 형태소를 이용함)
2.   형태소 분석한 train data를 이용하여 word2idx 구축
3.   train data의 label들을 이용하여 label2idx 구축
4.   word2idx와 label2idx를 이용하여 train data와 test data의 word와 label들을 index로 변환



In [None]:
# Locations of the train/test JSON files inside the cloned repository.
train_path = "./text_classification_2022/newsdata_train.json"
test_path = "./text_classification_2022/newsdata_test.json"
# Topic labels; a label's position in this list is used to build labels2idx.
label_list = ['IT', '경제', '문화', '스포츠', '정치']

# Parse both JSON files into Python objects.
train_data = load_data(train_path)
test_data = load_data(test_path)

# Morphological analyzer shared by vocabulary building and feature conversion.
# NOTE(review): "EXP" presumably selects one of PyKomoran's bundled models — confirm.
komoran = Komoran("EXP")

In [None]:
# Exercise 2-1
# Build and return word2idx, labels2idx and idx2labels.

def create_w2i_l2i_i2l(data, label_list, komoran):
    """Build the vocabulary and label lookup tables from the training data.

    Tokenization mirrors ``convert_examples_to_features``: every document is
    split into sentences with ``sent_tokenize`` and every sentence is analyzed
    with ``komoran.get_plain_text``; the whitespace-separated pieces of that
    output are the vocabulary entries (no filtering by part of speech).

    Args:
        data: Iterable of documents, each a mapping with a ``"content"`` text
            field.
        label_list: Sequence of label names; a label's position becomes its
            index.
        komoran: Analyzer object exposing ``get_plain_text(sentence)``.

    Returns:
        A ``(word2idx, labels2idx, idx2labels)`` tuple of dicts. ``word2idx``
        always contains the special tokens ``'<PAD>'`` and ``'<UNK>'``, which
        ``convert_examples_to_features`` looks up by name.
    """
    # Reserve the special tokens first so they get the lowest indices.
    word2idx = {'<PAD>': 0, '<UNK>': 1}

    for doc in data:
        for sentence in sent_tokenize(doc["content"]):
            for token in komoran.get_plain_text(sentence).split(' '):
                if token not in word2idx:
                    # Next free index == current vocabulary size.
                    word2idx[token] = len(word2idx)

    labels2idx = {label: idx for idx, label in enumerate(label_list)}
    idx2labels = {idx: label for label, idx in labels2idx.items()}

    return word2idx, labels2idx, idx2labels

In [None]:
# Build the lookup tables from the training split only; test-time words that
# are missing from word2idx are mapped to '<UNK>' during feature conversion.
word2idx, labels2idx, idx2labels = create_w2i_l2i_i2l(train_data, label_list, komoran)

In [None]:
def convert_examples_to_features(data, word2idx, labels2idx, komoran, max_length=512):
    """Turn raw documents into fixed-length index sequences and label indices.

    Each document's ``"content"`` is split into sentences, every sentence is
    analyzed with ``komoran.get_plain_text`` and the space-separated pieces are
    mapped through ``word2idx`` (unknown pieces map to ``'<UNK>'``). The
    resulting sequence is right-padded with ``'<PAD>'`` or truncated so that it
    has exactly ``max_length`` entries.

    Args:
        data: Iterable of documents with ``"content"`` and ``"topic"`` fields.
        word2idx: Mapping from token to index; must contain ``'<UNK>'`` and
            ``'<PAD>'`` whenever they are needed.
        labels2idx: Mapping from topic name to label index.
        komoran: Analyzer object exposing ``get_plain_text(sentence)``.
        max_length: Length of every returned index sequence.

    Returns:
        ``(input_ids, labels)``: a list of index lists and a list of label
        indices, aligned by position.
    """
    input_ids = []
    labels = []

    for doc in data:
        token_ids = []
        for sentence in sent_tokenize(doc["content"]):
            for morph in komoran.get_plain_text(sentence).split(' '):
                key = morph if morph in word2idx else '<UNK>'
                token_ids.append(word2idx[key])

        # Positive shortfall -> pad on the right; negative -> cut the tail.
        shortfall = max_length - len(token_ids)
        if shortfall > 0:
            token_ids = token_ids + [word2idx['<PAD>']] * shortfall
        elif shortfall < 0:
            token_ids = token_ids[:max_length]

        input_ids.append(token_ids)
        labels.append(labels2idx[doc["topic"]])

    return input_ids, labels


def make_dataset(input_ids, labels):
    """Wrap index sequences and label indices in a ``TensorDataset``.

    Args:
        input_ids: Nested list of token indices (rows of equal length).
        labels: List of label indices, one per row of ``input_ids``.

    Returns:
        A ``torch.utils.data.TensorDataset`` of two ``torch.long`` tensors.
    """
    id_tensor = torch.tensor(input_ids, dtype=torch.long)
    label_tensor = torch.tensor(labels, dtype=torch.long)
    return torch.utils.data.TensorDataset(id_tensor, label_tensor)

In [None]:
# Convert the training documents to padded/truncated index sequences (length 512)
# and wrap them in a TensorDataset.
train_inputs, train_labels = convert_examples_to_features(train_data, word2idx, labels2idx, komoran, max_length=512)
train_dataset = make_dataset(train_inputs, train_labels)

# Same conversion for the test documents, reusing the lookup tables built from
# the training data.
test_inputs, test_labels = convert_examples_to_features(test_data, word2idx, labels2idx, komoran, max_length=512)
test_dataset = make_dataset(test_inputs, test_labels)

## 모델 학습 (Train)

In [None]:
# torch.permute 사용 예시


In [None]:
# Exercise 2-2
# Implement the model with torch.nn.Embedding and torch.nn.Conv1d.

class CNN(torch.nn.Module):
    """1-D convolutional text classifier with three parallel filter widths.

    Token indices are embedded, passed through three ``Conv1d`` layers with
    different kernel sizes, max-pooled over the sequence dimension and
    concatenated before a dropout + linear classification layer.

    Args:
        vocab_size: Number of rows in the embedding table.
        output_dim: Number of output classes (size of the logit vector).
    """

    # Embedding size and kernel widths are free hyper-parameters chosen for
    # this reference solution; NUM_FILTERS is fixed by the 3 * 30 input width
    # of the classification layer.
    EMBED_DIM = 100
    NUM_FILTERS = 30
    KERNEL_SIZES = (3, 4, 5)

    def __init__(self, vocab_size, output_dim):
        super(CNN, self).__init__()

        self.word_embed = torch.nn.Embedding(vocab_size, self.EMBED_DIM)
        self.conv_layer1 = torch.nn.Conv1d(self.EMBED_DIM, self.NUM_FILTERS,
                                           kernel_size=self.KERNEL_SIZES[0])
        self.conv_layer2 = torch.nn.Conv1d(self.EMBED_DIM, self.NUM_FILTERS,
                                           kernel_size=self.KERNEL_SIZES[1])
        self.conv_layer3 = torch.nn.Conv1d(self.EMBED_DIM, self.NUM_FILTERS,
                                           kernel_size=self.KERNEL_SIZES[2])

        self.dropout = torch.nn.Dropout(0.1)
        self.fc = torch.nn.Linear(3 * self.NUM_FILTERS, output_dim, bias=True)

    # Exercise 2-3
    # Implement forward() using the layers defined in exercise 2-2.

    def forward(self, inputs):
        """Compute class logits for a batch of token-index sequences.

        Args:
            inputs: Long tensor of shape ``(batch, seq_len)``; ``seq_len`` must
                be at least the largest kernel size.

        Returns:
            Float tensor of shape ``(batch, output_dim)`` with unnormalized
            class scores.
        """
        # Conv1d expects (batch, channels, length), so move the embedding
        # dimension in front of the sequence dimension.
        embedded = self.word_embed(inputs).permute(0, 2, 1)

        # Permute each conv output back to (batch, length, filters) so that
        # max(1) pools over sequence positions and leaves one value per filter.
        conv1 = self.conv_layer1(embedded).permute(0, 2, 1)
        conv1 = torch.nn.functional.relu(conv1.max(1)[0])  # max pooling

        conv2 = self.conv_layer2(embedded).permute(0, 2, 1)
        conv2 = torch.nn.functional.relu(conv2.max(1)[0])  # max pooling

        conv3 = self.conv_layer3(embedded).permute(0, 2, 1)
        conv3 = torch.nn.functional.relu(conv3.max(1)[0])  # max pooling

        # (batch, 3 * NUM_FILTERS)
        x = torch.cat([conv1, conv2, conv3], dim=1)

        output = self.fc(self.dropout(x))

        return output

In [None]:
def train(model, train_dataset, args):
    """Train ``model`` in place with Adam and cross-entropy loss.

    The random generators are reseeded with 42 at the start. The model is
    moved to ``args["device"]``, trained for the requested number of epochs
    and left in evaluation mode. Every 10th epoch the summed batch loss of
    that epoch is printed.

    Args:
        model: ``torch.nn.Module`` mapping a batch of inputs to class logits.
        train_dataset: Dataset whose items are ``(input, label)`` pairs.
        args: Dict with the keys ``"train_batch_size"``,
            ``"num_train_epochs"``, ``"device"`` and ``"learning_rate"``.

    Raises:
        KeyError: If one of the required keys is missing from ``args``.
    """
    set_seed(42)

    train_batch_size = args["train_batch_size"]
    num_train_epochs = args["num_train_epochs"]
    device = args["device"]
    learning_rate = args["learning_rate"]

    # optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Loss function
    criterion = torch.nn.CrossEntropyLoss()

    train_dataLoader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)

    train_iterator = trange(num_train_epochs, desc="Epoch")

    print("\n***** Running training *****")
    print("  Num examples = {}".format(len(train_dataset)))
    print("  Num Epochs = {}".format(num_train_epochs))
    print("  Train Batch size = {}".format(train_batch_size))
    print("  Device = ", device)

    model.to(device)
    model.train(True)
    model.zero_grad()
    for epoch in train_iterator:
        # Running sum of the batch losses of this epoch. Kept in its own
        # variable so the per-batch loss tensor below cannot overwrite it.
        epoch_loss = 0.0
        for batch in train_dataLoader:
            input_vector = batch[0].to(device)
            label = batch[1].to(device)
            predict = model(input_vector)

            loss = criterion(predict, label)
            epoch_loss += loss.item()

            loss.backward()
            optimizer.step()
            model.zero_grad()

        if (epoch + 1) % 10 == 0:
            print("\n********** Train Result **********")
            print("  Epoch / Total Epoch : {} / {}".format(epoch + 1, num_train_epochs))
            print("  Loss : {:.4f}".format(epoch_loss))

    model.train(False)
# end of train

In [None]:
# Exercise 2-4
# Set the dimensions needed to construct the model: one embedding row per
# vocabulary entry and one output logit per topic label.

vocab_size = len(word2idx)
output_dim = len(label_list)


# Training hyper-parameters consumed by train() and evaluate().
args = dict()
args["train_batch_size"] = 64
args["device"] = torch.device("cuda" if torch.cuda.is_available() else "cpu")
args["learning_rate"] = 0.0005
args["num_train_epochs"] = 500

model = CNN(vocab_size, output_dim)

train(model, train_dataset, args)

## 모델 학습 후 평가 (Evaluation)

In [None]:
def evaluate(model, test_dataset, args, news_num=900, news_offset=800):
    """Evaluate ``model`` on ``test_dataset`` and report accuracy plus one sample.

    Args:
        model: ``torch.nn.Module`` mapping a batch of inputs to class logits.
        test_dataset: Dataset whose items are ``(input, label)`` pairs.
        args: Dict providing the evaluation device under ``"device"``.
        news_num: Number of the news item whose prediction is returned as the
            sample.
        news_offset: Number of the first item of the test set; the sample is
            taken at position ``news_num - news_offset``. The default matches
            the previously hard-coded value of 800.

    Returns:
        ``(accuracy, sample_result)`` where ``accuracy`` is the fraction of
        correctly classified examples and ``sample_result`` is a dict with the
        keys ``"pred"`` and ``"label"`` for the selected item.

    Raises:
        ValueError: If ``test_dataset`` is empty.
        IndexError: If the selected sample lies outside the test set.
    """
    test_dataLoader = torch.utils.data.DataLoader(test_dataset, shuffle=False, batch_size=1)

    device = args["device"]

    print("***** Running evaluation *****")
    print("  Num examples = {}".format(len(test_dataset)))
    print("  Test Batch size = 1")

    # Inputs are moved to `device` below, so the model has to live there too
    # (e.g. when evaluate() is called without a preceding train()).
    model.to(device)
    model.eval()

    # Collect per-batch arrays and concatenate once at the end instead of
    # re-allocating with np.append on every step.
    batch_logits = []
    batch_labels = []
    for batch in test_dataLoader:
        input_vector = batch[0].to(device)

        with torch.no_grad():
            predict = model(input_vector)

        batch_logits.append(predict.detach().cpu().numpy())
        batch_labels.append(batch[1].numpy())

    if not batch_logits:
        raise ValueError("test_dataset is empty; nothing to evaluate")

    pred = np.argmax(np.concatenate(batch_logits, axis=0), axis=1)
    label = np.concatenate(batch_labels, axis=0)

    sample_idx = news_num - news_offset
    if not 0 <= sample_idx < len(label):
        raise IndexError(
            "news_num {} is outside the test set (valid range: {} to {})".format(
                news_num, news_offset, news_offset + len(label) - 1))
    sample_result = {"pred": pred[sample_idx], "label": label[sample_idx]}

    # Divide by the real number of examples rather than a fixed 200.
    accuracy = (pred == label).sum() / len(label)

    return accuracy, sample_result

In [None]:
# Evaluate the trained model; news_num selects which item is returned as the sample.
accuracy, sample_result = evaluate(model, test_dataset, args, news_num=900)

print("\n********** Total Test Result **********")
print("  Accuracy {}".format(accuracy))
print("  Sample pred : {}".format(sample_result["pred"]))
print("  Sample label : {}".format(sample_result["label"]))


# Write the same summary to CNN_result.txt in the working directory.
with open("CNN_result.txt", "w") as fw :
  fw.write("********** Total Test Result **********")
  fw.write("\n  Accuracy {}".format(accuracy))
  fw.write("\n  Sample pred : {}".format(sample_result["pred"]))
  fw.write("\n  Sample label : {}".format(sample_result["label"]))

## 전체 코드

In [None]:
# main
# End-to-end run: load data, build lookup tables, convert to tensors, train, evaluate.

# Locations of the train/test JSON files inside the cloned repository.
train_path = "./text_classification_2022/newsdata_train.json"
test_path = "./text_classification_2022/newsdata_test.json"
# Topic labels used to build the label lookup tables.
label_list = ['IT', '경제', '문화', '스포츠', '정치']

train_data = load_data(train_path)
test_data = load_data(test_path)

# Morphological analyzer used for tokenization.
komoran = Komoran("EXP")

# Training hyper-parameters consumed by train() and evaluate().
args = dict()
args["train_batch_size"] = 64
args["device"] = torch.device("cuda" if torch.cuda.is_available() else "cpu")
args["learning_rate"] = 0.0005
args["num_train_epochs"] = 500

# Lookup tables are built from the training split only.
word2idx, labels2idx, idx2labels = create_w2i_l2i_i2l(train_data, label_list, komoran)

# Convert both splits to index sequences of length 512 and wrap them as datasets.
train_inputs, train_labels = convert_examples_to_features(train_data, word2idx, labels2idx, komoran, max_length=512)
train_dataset = make_dataset(train_inputs, train_labels)

test_inputs, test_labels = convert_examples_to_features(test_data, word2idx, labels2idx, komoran, max_length=512)
test_dataset = make_dataset(test_inputs, test_labels)

# Model dimensions: vocabulary size for the embedding, number of labels for the output.
input_dim = len(word2idx)
output_dim = len(label_list)

model = CNN(input_dim, output_dim)

train(model, train_dataset, args)

# news_num selects which item evaluate() returns as the sample.
accuracy, sample_result = evaluate(model, test_dataset, args, news_num=900)

print("\n********** Total Test Result **********")
print("  Accuracy {}".format(accuracy))
print("  Sample pred : {}".format(sample_result["pred"]))
print("  Sample label : {}".format(sample_result["label"]))