## 전처리(Preprocess), Vocab생성

In [1]:
#-*- coding:utf-8 -*-

from eunjeon import Mecab
from collections import Counter
import json

tagger = Mecab() # morphological analyzer (Mecab wrapper imported from eunjeon)

def read_txt(path_to_file, encoding=None):
    """Read a labelled review file into parallel text and label lists.

    The first line of the file is skipped (treated as a header). For every
    other line, the first character is parsed as the integer label, the
    second character is skipped as a separator, and the review text starts
    at the third character. Lines whose first character is a double quote
    are skipped entirely.

    The text is truncated depending on the raw line length (the original
    author tuned these limits for GPU memory; adjust to your hardware):

    * more than 301 characters -> ``line[2:300]``
    * more than 151 characters -> ``line[2:150]``
    * otherwise               -> ``line[2:30]``

    The trailing line terminator is removed from the returned text, so a
    review with no content comes back as ``''`` and can be detected by
    ``remove_empty_review``.

    Args:
        path_to_file: Path of the text file to read.
        encoding: Text encoding passed to ``open``. ``None`` (the default)
            keeps the platform default, exactly as before.

    Returns:
        A ``(txt_ls, label_ls)`` tuple: list of review strings and list of
        integer labels, index-aligned.

    Raises:
        ValueError: If a non-skipped line does not start with a digit
            (for example a blank line).
    """
    txt_ls = []
    label_ls = []

    with open(path_to_file, encoding=encoding) as f:
        next(f, None)  # skip the header line; tolerate an empty file
        for line in f:  # stream the file instead of loading every line at once
            label = line[:1]
            if label == '"':
                continue

            # Length thresholds are evaluated on the raw line (terminator
            # included) so the truncation points match the original ones.
            if len(line) > 301:
                txt = line[2:300]
            elif len(line) > 151:
                txt = line[2:150]
            else:
                txt = line[2:30]

            label_value = int(label)
            # Only short lines can still carry the '\n'; drop it so that empty
            # reviews really have length 0.
            txt_ls.append(txt.rstrip('\n'))
            label_ls.append(label_value)
    return txt_ls, label_ls

def remove_empty_review(X, Y):
    """Drop zero-length reviews, and their labels, from both lists in place.

    Args:
        X: List of reviews; an entry is removed when ``len(entry) == 0``.
        Y: List of labels, index-aligned with ``X``.

    Returns:
        The same ``X`` and ``Y`` list objects after the deletions.
    """
    # Find every empty position first, then delete from the back so that
    # earlier indices stay valid while the lists shrink.
    empty_positions = [pos for pos, review in enumerate(X) if len(review) == 0]

    for pos in reversed(empty_positions):
        del X[pos]
        del Y[pos]

    return X, Y

def text2pos(text):
    """Turn a sentence into a list of ``'word/tag'`` strings.

    The module-level ``tagger`` is asked for the ``(word, pos)`` pairs of
    ``text``; each pair is rendered as ``'{word}/{pos}'``.
    """
    return ['{}/{}'.format(word, pos) for word, pos in tagger.pos(text)]

def make_vocab(save_path, vocab_size = 60000,
               train_path='/rate_train.txt', count_path='/rep2cnt.txt') :
    """Build the vocabulary from the training data and save it as JSON.

    Every training text is converted to ``'word/tag'`` tokens with
    ``text2pos`` and the tokens are counted. Tokens are added to the
    vocabulary from most to least frequent until it holds ``vocab_size``
    entries (the two special entries ``<PAD>`` = 0 and ``<UNK>`` = 1
    included). The full frequency table is also written to ``count_path`` as
    tab-separated ``token<TAB>count`` lines.

    Args:
        save_path: Where the vocabulary (token -> index dict) is written.
        vocab_size: Maximum number of vocabulary entries.
        train_path: Training file read with ``read_txt``. Only training data
            is used to build the vocabulary.
        count_path: Where the token frequency table is written.
    """
    print('Make Vocab ...')
    vocab = {'<PAD>':0, '<UNK>':1}
    x_train, _ = read_txt(train_path)

    rep_counter = Counter()
    for text in x_train :
        rep_counter.update(text2pos(text)) # count each 'word/tag' token

    # The count file is opened in a `with` block so it is always flushed and
    # closed; the original left the handle open.
    with open(count_path, 'w') as of :
        for rep, cnt in rep_counter.most_common() : # highest frequency first
            if len(vocab) < vocab_size : # stop growing once the cap is reached
                vocab[rep] = len(vocab)
            of.write('{}\t{}\n'.format(rep, cnt))

    print('Vocab Size : {}'.format(len(vocab)))
    print('Saving Vocab to {}...'.format(save_path))
    with open(save_path, 'w') as outfile:
        json.dump(vocab, outfile)

def convert_token_to_idx(vocab, tokens):
    """Map each token to its vocabulary index.

    Tokens that are not keys of ``vocab`` are mapped to ``vocab['<UNK>']``.

    Args:
        vocab: Dict from token string to integer index.
        tokens: Iterable of token strings.

    Returns:
        List of indices, one per token, in the same order.
    """
    return [
        vocab[token] if token in vocab else vocab['<UNK>']
        for token in tokens
    ]

def make_dataset(save_path, vocab_path):
    """Tokenise and index the train/test files and save them as one JSON file.

    For each of the ``train`` and ``test`` splits, ``/rate_<split>.txt`` is
    read with ``read_txt``, empty reviews are dropped, and every review is
    stored as ``(text, tokens, idx, answer)``. The tuples are written by
    ``json.dump``, so they are stored as JSON arrays.

    Args:
        save_path: Output path of the dataset JSON.
        vocab_path: Path of the vocabulary JSON written by ``make_vocab``.
    """
    with open(vocab_path) as vocab_file:
        vocab = json.load(vocab_file)

    all_data = {}
    for split in ('train', 'test'):
        print('{} processing...'.format(split))
        reviews, labels = read_txt('/rate_{}.txt'.format(split))
        reviews, labels = remove_empty_review(reviews, labels)

        rows = []
        for review, label in zip(reviews, labels):
            tokens = text2pos(review)
            rows.append((review, tokens, convert_token_to_idx(vocab, tokens), label))
        all_data[split] = rows

    n_train = len(all_data['train'])
    n_test = len(all_data['test'])
    print('#Train data : {}, #Test data : {}'.format(n_train, n_test))

    with open(save_path, 'w') as outfile:
        json.dump(all_data, outfile)

if __name__ == '__main__':
    # Output locations: vocabulary first, then the indexed dataset that is
    # built with that vocabulary.
    vocab_path, dataset_path = '/vocab.json', '/dataset.json'

    make_vocab(vocab_path)
    make_dataset(dataset_path, vocab_path)


Make Vocab ...
Vocab Size : 60000
Saving Vocab to /vocab.json...
train processing...
test processing...
#Train data : 139879, #Test data : 6097


## Model (모델)

In [1]:
import torch.nn as nn
import torch
from torch.autograd import Variable
import torch.nn.functional as F
import random

class NewsClassification(nn.Module):
    """Embedding -> (bi)LSTM -> dot-product attention -> linear classifier.

    Args:
        opts: Dict with the keys ``embed_size``, ``hidden_size``,
            ``n_layers``, ``dropout``, ``n_category`` and ``bidirectional``.
        vocab_size: Number of rows of the embedding table.
        padding_index: Token index used for padding (its embedding is kept
            at zero by ``nn.Embedding``).
    """

    def __init__(self, opts, vocab_size, padding_index=0) :
        super(NewsClassification, self).__init__()
        # 1) embedding
        embed_size = opts['embed_size']
        hidden_size = opts['hidden_size']
        lstm_layers = opts['n_layers']
        self.embed = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_size,
            padding_idx=padding_index
        )

        # 2) LSTM
        # NOTE(review): the dropout rate is only stored here; no dropout layer
        # is built or applied anywhere in this class. Confirm whether it was
        # meant to be used before wiring it in (it would change training).
        self.dropout = opts['dropout']
        n_category = opts['n_category']
        bidirectional = opts['bidirectional'] # True: both directions, False: forward only
        self.lstm = nn.LSTM(embed_size, hidden_size, lstm_layers, batch_first=True, bidirectional=bidirectional)
        # A bidirectional LSTM concatenates both directions, doubling the output width.
        input_size = 2 * hidden_size if bidirectional else hidden_size

        # 3) Linear (fully connected) classification layer
        self.linear = nn.Linear(input_size, n_category)

    def attention_net(self, lstm_output, final_state, pad_mask=None):
        """Pool the LSTM outputs with dot-product attention.

        Args:
            lstm_output: Tensor of shape (batch, maxlen, features).
            final_state: Query tensor of shape (batch, features); the legacy
                shape (1, batch, features) is also accepted.
            pad_mask: Optional (batch, maxlen) tensor, non-zero/True at
                positions that must receive no attention.

        Returns:
            Tensor of shape (batch, features): attention-weighted sum of
            ``lstm_output`` over the time axis.
        """
        # Only squeeze a real leading layer axis. Squeezing a (1, features)
        # query would drop the batch axis and make the bmm below fail for a
        # batch of one.
        hidden = final_state.squeeze(0) if final_state.dim() == 3 else final_state
        attn_weights = torch.bmm(lstm_output, hidden.unsqueeze(2)).squeeze(2) # (batch, maxlen)
        if pad_mask is not None:
            # Most negative finite value instead of -inf: masked positions
            # still get zero weight, but a fully masked row cannot become NaN.
            attn_weights = attn_weights.masked_fill(
                pad_mask.bool(), torch.finfo(attn_weights.dtype).min)
        soft_attn_weights = F.softmax(attn_weights, 1)
        new_hidden_state = torch.bmm(lstm_output.transpose(1, 2), soft_attn_weights.unsqueeze(2)).squeeze(2) # (batch, features)

        return new_hidden_state

    def forward(self, sent_tensor, sent_mask):
        """Compute class logits for a batch of index sequences.

        Args:
            sent_tensor: LongTensor (batch, maxlen) of token indices, padded
                at the end of each row (as ``BatchGen`` produces).
            sent_mask: Tensor (batch, maxlen), non-zero/True at padded
                positions, or ``None`` when the batch has no padding. Only
                the number of unmasked positions per row is used, so the
                padding is assumed to be trailing.

        Returns:
            Tensor (batch, n_category) of unnormalised class scores.
        """
        sent_emb = self.embed(sent_tensor)
        batch_size, max_len = sent_tensor.size(0), sent_tensor.size(1)

        if sent_mask is None:
            lengths = torch.full((batch_size,), max_len, dtype=torch.long,
                                 device=sent_tensor.device)
        else:
            # Number of real tokens per row; an all-padding row is treated as
            # a single (zero-embedded) token so packing stays valid.
            lengths = sent_mask.eq(0).sum(dim=1).clamp(min=1).to(sent_tensor.device)

        # Pack by true length so that padding influences neither direction of
        # the LSTM; the lengths tensor has to live on the CPU for packing.
        packed = nn.utils.rnn.pack_padded_sequence(
            sent_emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, _ = self.lstm(packed)
        output, _ = nn.utils.rnn.pad_packed_sequence(
            packed_output, batch_first=True, total_length=max_len) # (batch, maxlen, features)

        # Query = output at the last real token of each sequence (previously
        # the last column, which is a PAD position for shorter sentences).
        last_index = (lengths - 1).view(-1, 1, 1).expand(-1, 1, output.size(2))
        final_state = output.gather(1, last_index).squeeze(1) # (batch, features)

        positions = torch.arange(max_len, device=sent_tensor.device)
        pad_mask = positions.unsqueeze(0) >= lengths.unsqueeze(1) # True at padding

        # With attention
        attn_output = self.attention_net(output, final_state, pad_mask)
        logits = self.linear(attn_output)

        # Without attention
        #logits = self.linear(final_state)
        return logits

## 학습 (Train)

In [None]:
#-*- coding:utf-8 -*-
import random
import json
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F

# Hyper-parameters and paths shared by the model and the training loop.
opts = {
    # --- model ---
    # Embedding dimension (size of the vector each token is mapped to).
    'embed_size': 200,
    # Hidden dimension (size of the intermediate LSTM vectors).
    'hidden_size': 250,
    # Number of stacked LSTM layers.
    'n_layers': 2,
    # Dropout rate.
    'dropout': 0.6,
    # Use a bidirectional LSTM.
    'bidirectional': True,
    # Number of target classes.
    'n_category': 6,

    # --- train ---
    # Vocabulary JSON path.
    'vocab_path': '/vocab.json',
    # Dataset JSON path.
    'data_path': '/dataset.json',
    # Train on the GPU when True, on the CPU when False.
    'use_gpu': True,
    # Number of training epochs.
    'epochs': 50,
    # Batch size.
    'batch_size': 5,
    # Learning rate.
    'learning_rate': 0.001,
}

class BatchGen:
    """Split a dataset into mini-batches and yield them as padded tensors.

    Each sample is indexable as ``(text, tokens, idx, answer)``. Unless
    ``evaluation`` is true, the samples are shuffled once at construction.
    """

    def __init__(self, raw_data, batch_size, evaluation=False):
        self.batch_size = batch_size
        self.eval = evaluation

        samples = raw_data
        if not evaluation:
            # Training data is visited in random order; evaluation data keeps
            # its original order.
            order = list(range(len(samples)))
            random.shuffle(order)
            samples = [samples[pos] for pos in order]

        # Consecutive chunks of `batch_size` samples: [[batch1], [batch2], ...]
        self.data = [
            samples[start:start + batch_size]
            for start in range(0, len(samples), batch_size)
        ]

    def __len__(self):
        # Number of batches, not number of samples.
        return len(self.data)

    def __iter__(self):
        for batch in self.data:
            text_list = [sample[0] for sample in batch]
            token_list = [sample[1] for sample in batch]
            index_lists = [sample[2] for sample in batch]

            n_rows = len(batch)
            # Width of the batch = length of its longest index sequence.
            max_len = max(len(ids) for ids in index_lists)
            answer_tensor = torch.LongTensor([sample[3] for sample in batch])  # (batch,)

            # Start from an all-zero (PAD index) matrix and copy each sequence
            # into the front of its row, e.g. for lengths 5, 1, 3:
            #   1, 2, 3, 4, 5
            #   6, 0, 0, 0, 0
            #   7, 8, 9, 0, 0
            sentence_tensor = torch.zeros(n_rows, max_len, dtype=torch.long)
            for row, ids in enumerate(index_lists):
                sentence_tensor[row, :len(ids)] = torch.LongTensor(ids)

            # Set exactly where the entry is 0, i.e. at padded positions.
            sentence_mask = sentence_tensor == 0

            yield [text_list, token_list, sentence_tensor, sentence_mask, answer_tensor]

def load_data(vocab_path, data_path):
    """Load the vocabulary and dataset JSON files.

    Args:
        vocab_path: Path of the vocabulary JSON.
        data_path: Path of the dataset JSON holding ``'train'`` and
            ``'test'`` entries.

    Returns:
        ``(vocab_size, train_data, test_data)`` where ``vocab_size`` is the
        number of entries in the vocabulary.
    """
    loaded = []
    # Vocabulary first, then the dataset; each file is closed before the next
    # one is opened.
    for path in (vocab_path, data_path):
        with open(path) as json_file:
            loaded.append(json.load(json_file))

    vocab, splits = loaded
    return len(vocab), splits['train'], splits['test']


if __name__ == '__main__' :
    # Training script: load the data, then for every epoch train on shuffled
    # batches, report the average loss and evaluate accuracy on the test set.
    print('[Program Start...]')
    print(opts)
    vocab_size, train_data, test_data = load_data(opts['vocab_path'], opts['data_path'])

    model = NewsClassification(opts, vocab_size)
    if opts['use_gpu'] : # move the model to the GPU when requested
        model = model.cuda()

    optimizer = torch.optim.Adam(model.parameters(), opts['learning_rate'])
    criterion = nn.CrossEntropyLoss()

    best_acc = 0.
    for epoch in range(opts['epochs']) :
        model.train()
        train_loss = 0
        # A new generator per epoch, so the data is reshuffled every epoch.
        train_batch = BatchGen(train_data, opts['batch_size'])

        for batch in train_batch :
            # batch = [texts, tokens, sentence tensor, padding mask, answers]
            sent_tensor, sent_mask, ans_tensor = batch[2], batch[3], batch[4]
            if opts['use_gpu'] :
                sent_tensor = sent_tensor.cuda()
                sent_mask = sent_mask.cuda()
                ans_tensor = ans_tensor.cuda()

            # Call the module itself rather than .forward() so that
            # nn.Module hooks are honoured.
            logits = model(sent_tensor, sent_mask)

            loss = criterion(logits, ans_tensor)
            train_loss += loss.item()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print('Epoch : {}, Average Loss : {}'.format(epoch+1, train_loss/len(train_batch)))

        test_batch = BatchGen(test_data, opts['batch_size'], evaluation=True)
        model.eval()
        n_correct = 0. # number of correct predictions
        # No gradients are needed for evaluation; disabling autograd avoids
        # building a graph for every test batch.
        with torch.no_grad() :
            for batch in test_batch :
                sent_tensor, sent_mask, ans_tensor = batch[2], batch[3], batch[4]
                if opts['use_gpu'] :
                    sent_tensor = sent_tensor.cuda()
                    sent_mask = sent_mask.cuda()
                    ans_tensor = ans_tensor.cuda()

                logits = model(sent_tensor, sent_mask)
                # Softmax is monotonic, so the arg-max of the raw logits is
                # the predicted class.
                predict = logits.argmax(dim=1)

                n_correct += (predict == ans_tensor.long()).sum().item()

        acc = n_correct / len(test_data) * 100.

        if acc > best_acc :
            best_acc = acc
            print('Test Accuracy : {} [new best acc]'.format(acc))
        else :
            print('Test Accuracy : {}'.format(acc))

        # Save the trained model parameters (disabled)
        #params = model.state_dict()
        #torch.save(params, "model_" + str(epoch) + ".prm", pickle_protocol = 4)