In [1]:
import os

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"  # Arrange GPU devices starting from 0
os.environ["CUDA_VISIBLE_DEVICES"]= "1"  # Set the GPU 1 to use

from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math
#여기까지 pytorch 기본 아래로는 따로 추가
from torch.utils.data import Dataset, DataLoader
from konlpy.tag import Okt
from tokenizers.implementations import SentencePieceBPETokenizer
#from pykospacing import Spacing
import import_ipynb
import SetData
import mtDataset
import model_class as mc
from SentencepieceTokenizer import sentencepiece_tokenizer

importing Jupyter notebook from SetData.ipynb
importing Jupyter notebook from mtDataset.ipynb
importing Jupyter notebook from model_class.ipynb
importing Jupyter notebook from SentencepieceTokenizer.ipynb


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def load_model(model, load_path, model_name, device):
    # state_dict를 불러옵니다.
    model.load_state_dict(torch.load(load_path + model_name + '_state_dict.pt'))  

    model = model.to(device)
    return model

In [3]:
#en 토크나이저
corpus = '/etc/jupyterhub/pythonex/Paper_2023/Data/source.txt'

sentencepiece_tokenizer = SentencePieceBPETokenizer(add_prefix_space = True)
sentencepiece_tokenizer.train(
    files = corpus,
    vocab_size = 15004,
    min_frequency = 2,
    special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']
)
vocab = sentencepiece_tokenizer.get_vocab()
print(len(vocab))
#print(sorted(vocab, key=lambda x: vocab[x]))

#sentencepiece_tokenizer.save_model('vocab', 'source_bpe')

#토크나이저를 통해 문장을 토큰으로 변환하는 함수
def morphs(sentence):
    return sentencepiece_tokenizer.encode(sentence).tokens

#kr 토크나이저
corpus = '/etc/jupyterhub/pythonex/Paper_2023/Data/target.txt'

sentencepiece_tokenizer_target = SentencePieceBPETokenizer(add_prefix_space = True)
sentencepiece_tokenizer_target.train(
    files = corpus,
    vocab_size = 15004,
    min_frequency = 2,
    special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']
)
vocab = sentencepiece_tokenizer_target.get_vocab()
print(len(vocab))
#print(sorted(vocab, key=lambda x: vocab[x]))

#sentencepiece_tokenizer_target.save_model('vocab', 'target_bpe')

#토크나이저를 통해 문장을 토큰으로 변환하는 함수
def morphs_target(sentence):
    return sentencepiece_tokenizer_target.encode(sentence).tokens


15004


15004





In [4]:
#DEVICE =  [torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), torch.device("cuda:1" if torch.cuda.is_available() else "cpu")]
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(DEVICE)

cuda


In [5]:
# 사전 내 특수 단어의 index를 각각 미리 정의합니다.
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3

# 데이터의 파일 정보
train_file_path = "/etc/jupyterhub/pythonex/Paper_2023/Data/train.txt"
valid_file_path = "/etc/jupyterhub/pythonex/Paper_2023/Data/valid.txt"

data_pairs = SetData.load_file(train_file_path)
train_dataset = mtDataset.MTDataset(data_pairs)
data_pairs = SetData.load_file(valid_file_path)
valid_dataset = mtDataset.MTDataset(data_pairs)

In [6]:
#데이터를 구하고 처리합니다.
from torchtext.vocab import build_vocab_from_iterator
from typing import Iterable, List

SRC_LANGUAGE = 'ko'
TGT_LANGUAGE = 'en'

token_transform = {}
vocab_transform = {}

#토크나이저 생성 후 정의한 토큰화 함수를 딕셔너리에 저장합니다.
token_transform[SRC_LANGUAGE] = morphs
token_transform[TGT_LANGUAGE] = morphs_target 

val_iter = SetData.make_iter(valid_file_path)

# 토큰 목록을 생성하기 위한 헬퍼(helper) 함수
def yield_tokens(data_iter: Iterable, language: str) -> List[str]:
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}
    
    for data_sample in data_iter:                    
        yield token_transform[language](data_sample[language_index[language]])

# 토큰들이 어휘집(vocab)에 인덱스 순서대로 잘 삽입되어 있는지 확인합니다
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:    
    # 학습용 데이터 반복자(iterator)
    train_iter = SetData.make_iter(train_file_path)
    # torchtext의 Vocab(어휘집) 객체 생성
    vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln),
                                                    min_freq=1,
                                                    specials=special_symbols,
                                                    special_first=True)

# UNK_IDX를 기본 인덱스로 설정합니다. 이 인덱스는 토큰을 찾지 못하는 경우에 반환됩니다.
# 만약 기본 인덱스를 설정하지 않으면 어휘집(Vocabulary)에서 토큰을 찾지 못하는 경우
# RuntimeError가 발생합니다.
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[ln].set_default_index(UNK_IDX)

In [7]:
print(len(vocab_transform['ko']))
print(len(vocab_transform['en']))

14994
14961


In [8]:
print(vocab_transform['ko']['목적'])

2023


In [9]:
from torch.nn.utils.rnn import pad_sequence

# 순차적인 작업들을 하나로 묶는 헬퍼 함수
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

# BOS/EOS를 추가하고 입력 순서(sequence) 인덱스에 대한 텐서를 생성하는 함수
def tensor_transform(token_ids: List[int]):
    return torch.cat((torch.tensor([BOS_IDX]),
                      torch.tensor(token_ids),
                      torch.tensor([EOS_IDX])))

# 출발어(src)와 도착어(tgt) 원시 문자열들을 텐서 인덱스로 변환하는 변형(transform)
text_transform = {}
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    text_transform[ln] = sequential_transforms(token_transform[ln], # 토큰화(Tokenization)
                                               vocab_transform[ln], # 수치화(Numericalization)
                                               tensor_transform) # BOS/EOS를 추가하고 텐서를 생성

In [10]:
#greedy 탐색
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len-1):
        memory = memory.to(DEVICE)
        tgt_mask = (generate_square_subsequent_mask(ys.size(0))
                    .type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys,
                        torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        if next_word == EOS_IDX:
            break
    return ys

# 입력 문장을 도착어로 번역하는 함수
def translate(model: torch.nn.Module, src_sentence: str):
    model.eval()
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(
        model,  src, src_mask, max_len=num_tokens * 5, start_symbol=BOS_IDX).flatten()
    print(list(tgt_tokens.cpu().numpy()))
    text = " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")    
    return checker(text)    

def checker(text):
    replace_text = text.replace("▁", "")
    return replace_text

In [11]:
#학습하는 동안, 모델이 예측하는 동안 정답(이후 출현하는 단어)을 보지 못하도록 하는 후속 단어 마스크(subsequent word mask)
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

#출발어와 도착어의 패딩(padding) 토큰들 또한 숨겨야 합니다. 아래에 두 가지 모두를 처리할 함수
def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [12]:
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
NUM_ENCODER_LAYERS = 6
NUM_DECODER_LAYERS = 6

transformer = mc.Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

#transformer, optimizer = load_model('models', '/mt-model-full-v8k', DEVICE)
transformer = load_model(transformer, 'models', '/mt-model-old-v15k_v15k', DEVICE)
transformer.eval()

Seq2SeqTransformer(
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=512, out_features=512, bias=True)
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
        (1): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_feature

In [None]:
#transformer
#spacing = Spacing()
trans_text = translate(transformer, '제1조(목적) 이 법은 10ㆍ27법난과 관련하여 피해를 입은 자와 불교계의 명예를 회복시켜줌으로써 인권신장과 국민화합에 이바지함을 목적으로 한다.')
print(trans_text)

In [12]:
# 단어 순서 개념(notion)을 토큰 임베딩에 도입하기 위한 위치 인코딩(positional encoding)을 위한 헬퍼 모듈(Module)
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# 입력 인덱스의 텐서를 해당하는 토큰 임베딩의 텐서로 변환하기 위한 헬퍼 모듈(Module)
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Seq2Seq 신경망
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)