In [1]:
from google.colab import drive
drive.mount("/gdrive", force_remount=True)

Mounted at /gdrive


In [2]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import (DataLoader, TensorDataset)
from sklearn.metrics import accuracy_score

class SpacingRNN(nn.Module):

    def __init__(self, config):
        super(SpacingRNN, self).__init__()

        # 전체 음절 개수
        self.eumjeol_vocab_size = config["eumjeol_vocab_size"]

        # 음절 임베딩 사이즈
        self.embedding_size = config["embedding_size"]

        # RNN 히든 사이즈
        self.hidden_size = config["hidden_size"]

        # 분류할 라벨의 개수
        self.number_of_labels = config["number_of_labels"]

        # 임베딩층: 랜덤 초기화 후 fine-tuning
        # 이곳을 채우세요.
        self.embedding = nn.Embedding(num_embeddings=self.eumjeol_vocab_size, embedding_dim = self.embedding_size, padding_idx=0)

        self.dropout = nn.Dropout(config["dropout"])

        # RNN layer
        # 이곳을 채우세요.
        self.bi_lstm = nn.LSTM(input_size = self.embedding_size, hidden_size = self.hidden_size, num_layers=3, batch_first=True,bidirectional=True)

        # fully_connected layer를 통하여 출력 크기를 number_of_labels에 맞춰줌
        # 이곳을 채우세요.
        self.linear = nn.Linear(in_features=self.hidden_size*2, out_features=self.number_of_labels)

    def forward(self, inputs):
        # (batch_size, max_length) -> (batch_size, max_length, embedding_size)
        eumjeol_inputs = self.embedding(inputs)

        # hidden_outputs, hidden_states = self.bi_gru(eumjeol_inputs)
        hidden_outputs, hidden_states = self.bi_lstm(eumjeol_inputs)

        # (batch_size, max_length, hidden_size*2)
        hidden_outputs = self.dropout(hidden_outputs)

        # (batch_size, max_length, hidden_size*2) -> (batch_size, max_length, number_of_labels)
        hypothesis = self.linear(hidden_outputs)

        return hypothesis

In [3]:
# 데이터를 읽어 리스트에 저장
def read_datas(file_path):
    with open(file_path, "r", encoding="utf8") as inFile:
        lines = inFile.readlines()
    datas = []
    for line in lines:
        # 입력 문장을 \t으로 분리
        pieces = line.strip().split("\t")
        # 입력 문자열을 음절 단위로 분리
        eumjeol_sequence, label_sequence = pieces[0].split(), pieces[1].split()
        datas.append((eumjeol_sequence, label_sequence))
    return datas

# 데이터를 읽고 각각의 딕셔너리 생성
def read_vocab_data(eumjeol_vocab_data_path):
    label2idx, idx2label = {"<PAD>":0, "B":1, "I":2}, {0:"<PAD>", 1:"B", 2:"I"}
    eumjeol2idx, idx2eumjeol = {}, {}

    with open(eumjeol_vocab_data_path, "r", encoding="utf8") as inFile:
        lines = inFile.readlines()

    for line in lines:
        eumjeol = line.strip()
        eumjeol2idx[eumjeol] = len(eumjeol2idx)
        idx2eumjeol[eumjeol2idx[eumjeol]] = eumjeol

    return eumjeol2idx, idx2eumjeol, label2idx, idx2label

def load_dataset(config):
    datas = read_datas(config["input_data"])
    eumjeol2idx, idx2eumjeol, label2idx, idx2label = read_vocab_data(config["eumjeol_vocab"])

    # 음절 데이터, 각 데이터의 실제 길이, 라벨 데이터를 담을 리스트
    eumjeol_features, eumjeol_feature_lengths, label_features = [], [], []

    for eumjeol_sequence, label_sequence in datas:
        eumjeol_feature = [eumjeol2idx[eumjeol] for eumjeol in eumjeol_sequence]
        label_feature = [label2idx[label] for label in label_sequence]

        # 음절 sequence의 실제 길이
        eumjeol_feature_length = len(eumjeol_feature)

        # 모든 입력 데이터를 고정된 길이로 맞춰주기 위한 padding 처리
        # 이곳을 채우세요.
        eumjeol_feature += [0] * (config["max_length"] - eumjeol_feature_length)
        label_feature += [0] * (config["max_length"] - eumjeol_feature_length)

        # 변환한 데이터를 각 리스트에 저장
        eumjeol_features.append(eumjeol_feature)
        eumjeol_feature_lengths.append(eumjeol_feature_length)
        label_features.append(label_feature)

    # 변환한 데이터를 Tensor 객체에 담아 반환
    eumjeol_features = torch.tensor(eumjeol_features, dtype=torch.long)
    eumjeol_feature_lengths = torch.tensor(eumjeol_feature_lengths, dtype=torch.long)
    label_features = torch.tensor(label_features, dtype=torch.long)

    return eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol, label2idx, idx2label

In [4]:
def train(config):
    # RNN 모델 객체 생성
    model = SpacingRNN(config).cuda()

    # 데이터 읽기
    eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol, label2idx, idx2label = load_dataset(config)

    # 학습 데이터를 batch 단위로 추출하기 위한 DataLoader 객체 생성
    train_features = TensorDataset(eumjeol_features, eumjeol_feature_lengths, label_features)
    train_dataloader = DataLoader(train_features, shuffle=True, batch_size=config["batch_size"])

    # 크로스엔트로피 비용 함수, padding은 계산하지 않음
    # 이곳을 채우세요.
    loss_func = nn.CrossEntropyLoss()


    # 모델 학습을 위한 optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(config["epoch"]):

        model.train()
        costs = []

        for step, batch in enumerate(train_dataloader):

            # 역전파 단계를 실행하기 전에 변화도를 0으로 변경
            optimizer.zero_grad()

            batch = tuple(t.cuda() for t in batch)

            # 음절 데이터, 각 데이터의 실제 길이, 라벨 데이터
            inputs, input_lengths, labels = batch[0], batch[1], batch[2]

            # 모델 출력 결과 얻어오기
            hypothesis = model(inputs)

            # hypothesis : (batch_size, max_length, number_of_labels) -> (batch_size*max_length, number_of_labels)
            # labels : (batch_size, max_length) -> (batch_size*max_length, )
            # 이곳을 채우세요.

            cost = loss_func(hypothesis.reshape(-1, len(label2idx)), labels.flatten())

            cost.backward()
            optimizer.step()

            # batch 단위 cost 값 저장
            costs.append(cost.data.item())

        torch.save(model.state_dict(), os.path.join(output_dir, "epoch_{0:d}.pt".format(epoch + 1)))

        # epoch 별로 평균 loss 값과 정확도 출력
        print("Average cost : {}".format(np.mean(costs)))

In [5]:
# 모델 출력 라벨 sequence와 정답 라벨 sequence를 기반으로
# 모델 출력 문장과 정답 문장 출력
def make_sentence(inputs, predicts, labels, idx2eumjeol, idx2label):

    predict_sentence, correct_sentence = "", ""

    for index in range(len(inputs)):
        eumjeol = idx2eumjeol[inputs[index]]
        correct_label = idx2label[labels[index]]
        predict_label = idx2label[predicts[index]]

        # 시작 음절인 경우 공백을 추가해줄 필요가 없음
        if (index == 0):
            predict_sentence += eumjeol
            correct_sentence += eumjeol
            continue

        # "B" 태그인 경우 어절의 시작 음절이므로 앞에 공백을 추가
        if (predict_label == "B"):
            predict_sentence += " "
        predict_sentence += eumjeol

        # "B" 태그인 경우 어절의 시작 음절이므로 앞에 공백을 추가
        if (correct_label == "B"):
            correct_sentence += " "
        correct_sentence += eumjeol

    return predict_sentence, correct_sentence

# 텐서를 리스트로 변환하는 함수
def tensor2list(input_tensor):
    return input_tensor.cpu().detach().numpy().tolist()

def test(config):
    # 데이터 읽기
    eumjeol_features, eumjeol_feature_lengths, label_features, eumjeol2idx, idx2eumjeol, label2idx, idx2label = load_dataset(config)

    # 평가 데이터를 batch 단위로 추출하기 위한 DataLoader 객체 생성
    test_features = TensorDataset(eumjeol_features, eumjeol_feature_lengths, label_features)
    test_dataloader = DataLoader(test_features, shuffle=False, batch_size=1)

    # RNN 모델 객체 생성
    model = SpacingRNN(config).cuda()
    # 사전학습한 모델 파일로부터 가중치 불러옴
    model.load_state_dict(torch.load(os.path.join(config["output_dir_path"], config["model_name"])))

    # 모델의 출력 결과와 실제 정답값을 담을 리스트
    total_hypothesis, total_labels = [], []

    for step, batch in enumerate(test_dataloader):

        model.eval()
        batch = tuple(t.cuda() for t in batch)

        # 음절 데이터, 각 데이터의 실제 길이, 라벨 데이터
        inputs, input_lengths, labels = batch[0], batch[1], batch[2]

        # 모델 평가
        hypothesis = model(inputs)

        # (batch_size, max_length, number_of_labels) -> (batch_size, max_length)
        hypothesis = torch.argmax(hypothesis, dim=-1)

        # batch_size가 1이기 때문
        input_length = tensor2list(input_lengths[0])
        input = tensor2list(inputs[0])[:input_length]
        label = tensor2list(labels[0])[:input_length]
        hypothesis = tensor2list(hypothesis[0])[:input_length]

        # 출력 결과와 정답을 리스트에 저장
        total_hypothesis += hypothesis
        total_labels += label

        if (step < 10):
            # 정답과 모델 출력 비교
            predict_sentence, correct_sentence = make_sentence(input, hypothesis, label, idx2eumjeol, idx2label)
            print("정답 : " + correct_sentence)
            print("출력 : " + predict_sentence)
            print()

    # 정확도 출력
    print("Accuracy : {}".format(accuracy_score(total_labels, total_hypothesis)))

In [6]:
if(__name__=="__main__"):
    root_dir = "/gdrive/MyDrive/colab/12주실습"
    output_dir = os.path.join(root_dir, "output")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    config = {"mode": "train",
              "model_name":"epoch_{0:d}.pt".format(5),
              "input_data":os.path.join(root_dir, "train.txt"),
              "output_dir_path":output_dir,
              "eumjeol_vocab": os.path.join(root_dir, "eumjeol_vocab.txt"),
              "label_vocab": os.path.join(root_dir, "label_vocab.txt"),
              "eumjeol_vocab_size": 2458,
              "embedding_size": 100,
              "hidden_size": 100,
              "max_length": 920,
              "number_of_labels": 3,
              "epoch":10,
              "batch_size":64,
              "dropout":0.3
              }

    if(config["mode"] == "train"):
        train(config)
    else:
        test(config)

Average cost : 0.12184748251604129
Average cost : 0.02240825753328921
Average cost : 0.020814270535601847
Average cost : 0.013541539747833828
Average cost : 0.00865658468130646
Average cost : 0.007199218590991406
Average cost : 0.006224489782618571
Average cost : 0.005489708059997876
Average cost : 0.004838078766118122
Average cost : 0.004325315459407395


In [9]:
if(__name__=="__main__"):
    root_dir = "/gdrive/MyDrive/colab/12주실습"
    output_dir = os.path.join(root_dir, "output")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    config = {"mode": "test",
              "model_name":"epoch_{0:d}.pt".format(5),
              "input_data":os.path.join(root_dir, "train.txt"),
              "output_dir_path":output_dir,
              "eumjeol_vocab": os.path.join(root_dir, "eumjeol_vocab.txt"),
              "label_vocab": os.path.join(root_dir, "label_vocab.txt"),
              "eumjeol_vocab_size": 2458,
              "embedding_size": 100,
              "hidden_size": 100,
              "max_length": 920,
              "number_of_labels": 3,
              "epoch":10,
              "batch_size":64,
              "dropout":0.3
              }

    if(config["mode"] == "train"):
        train(config)
    else:
        test(config)

정답 : 부인이 정성들여 키운 진이 독살되었다는 것은 연박사에게도 매우 가슴아픈 일일 것이다.
출력 : 부인이 정성들여키운진이 독살되었다는 것은 연박사에게도 매우가슴아픈 일일 것이다.

정답 : 이같은 그의 가난은 어려서만이 아니라 자란 뒤에도 늙을 때까지 줄기차게 계속되었다.
출력 : 이 같은 그의 가난은 어려서만이 아니라 자란 뒤에도 늙을 때까지 줄기차게 계속되었다.

정답 : 어느 물리학자의 머리 속.
출력 : 어느 물리학자의 머리속.

정답 : 말씨며 움직임이 세련되어 있었다.
출력 : 말씨며 움 직임이 세련되어 있었다.

정답 : "워싱턴과 뉴욕에서 차출한 인원까지 모두 12 명이네."
출력 : "워싱턴과 뉴욕에서 차출한 인원까지 모두 12명이네."

정답 : "그 사람을 아직도 사랑하나요?"
출력 : "그 사람을 아직도 사랑하나요?"

정답 : "나한테 걸린 건 행복한 거예요. 나를 훔치려는 사내들이 많아요. 무슨 말인지 알아요?"
출력 : "나한테 걸린 건행복한 거예요. 나를 훔치려는 사내들이 많아요. 무슨 말인지 알아요?"

정답 : 김광민은 집에 돌아와 있었으나 그 얘기를 바로 주남 마을과 녹동 마을에서 일어난 일이었기 때문에 자세히 들을 수 있었다.
출력 : 김광민은 집에 돌아와 있었으나 그 얘기를 바로 주남마을 과 녹동마을에서 일어난 일이었기 때문에 자세히 들을 수 있었다.

정답 : 미리 이야기해 두는 게 좋을 것 같아."
출력 : 미리이야기해두는 게 좋을 것 같아."

정답 : "부인의 입장에서 남편의 자살을 얘기하는 말투로는 너무 냉정하다는 생각 안 드세요?"
출력 : "부인의 입장에서 남편의 자살을 얘기하는 말투로는 너무냉정하다는 생각안드세요?"

Accuracy : 0.9201309448473932


#코드 개선
먼저 epoch = 10, num_layer = 3으로 변경 0.907 -> 0.92

bi-lstm 이용하여 test 했을 때 대부분의 띄어쓰기는 옳게 되었으나 모델은 "은, 는, 이, 가" 등의 단어가 주격 조사등의 조사로 사용될 때와 그렇지 않을 때의 구분을 옳게 하지 못하는 경우가 다수 존재

따라서 test 문장들의 띄어쓰기를 제거하고 이를 konlpy의 Kkma 형태소 분석기를 이용하여 아래와 같이 문장 성분을 출력 및 저장한다.

이후 test 문장들에 "은 는 이 가" 가 발견될 경우 해당 문장에서 조사로 쓰였는지 아닌지를 판단하여 lstm에서 출력된 문장들을 한번 더 수정해주는 방식을 아이디어로 생각 하였다.

In [None]:
from konlpy.tag import Kkma
from konlpy.utils import pprint

testD = read_datas("/gdrive/MyDrive/colab/12주실습/test.txt")
kkma = Kkma()

for i, content in enumerate(testD):
  
  pprint(kkma.pos(''.join(content[0])))
  # print(content[0])
  print("")


# pprint(kkma.pos())

[('그러', 'VV'),
 ('고', 'ECE'),
 ('보', 'VXV'),
 ('니', 'ECD'),
 ('경리', 'NNG'),
 ('는', 'JX'),
 ('윤', 'NNG'),
 ('보', 'XSN'),
 ('혜의', 'NNG'),
 ('근황', 'NNG'),
 ('에', 'JKM'),
 ('대하', 'VV'),
 ('어', 'ECS'),
 ('알', 'VV'),
 ('는', 'ETD'),
 ('것', 'NNB'),
 ('이', 'JKS'),
 ('없', 'VA'),
 ('어', 'ECD'),
 ('보이', 'VV'),
 ('었', 'EPT'),
 ('다', 'EFN'),
 ('.', 'SF')]

[('이제', 'NNG'),
 ('7', 'NR'),
 ('년', 'NNM'),
 ('대', 'NNG'),
 ('환란', 'NNG'),
 ('이', 'JKS'),
 ('눈앞', 'NNG'),
 ('에', 'JKM'),
 ('닥쳐오', 'VV'),
 ('았', 'EPT'),
 ('습니다', 'EFN'),
 ('.', 'SF')]

[('이', 'NNG'),
 ('관찰자', 'NNG'),
 ('에게', 'JKM'),
 ('장치', 'NNG'),
 ('(', 'SS'),
 ('예', 'NNG'),
 ('를', 'JKO'),
 ('들', 'VV'),
 ('면', 'ECE'),
 ('90', 'NR'),
 ('의', 'JKG'),
 ('경사각', 'NNG'),
 ('을', 'JKO'),
 ('가지', 'VV'),
 ('ㄴ', 'ETD'),
 ('두', 'MDN'),
 ('거울', 'NNG'),
 (')', 'SS'),
 ('를', 'JKO'),
 ('제공', 'NNG'),
 ('하', 'XSV'),
 ('여', 'ECS'),
 (',', 'SP'),
 ('같', 'VA'),
 ('은', 'ETD'),
 ('시각', 'NNG'),
 ('에', 'JKM'),
 ('A', 'OL'),
 ('와', 'JC'),
 ('B', 'OL'),
 ('두', 'MDN'),
 ('곳