In [None]:
from google.colab import drive
drive.mount("/gdrive", force_remount=True)

Mounted at /gdrive


In [None]:
"""
0. 선정한 분야 비율별로 section 분류
        "Game": 0.17,             #게임
        "Science": 0.09,          #과학
        "Animal": 0.12,           #동물
        "Broadcast": 0.31,        #방송
        "InternetBroadcast": 0.31,#인터넷 방송
1. 10000개의 니아 파일에서 파일당 2개의 문장을 추출. 여기서 문장을 max_length를 고려해서 18 음절만 슬라이싱
2. 그 중에서 1문장은 Q, 1문장은 A로 설정(여기까지 loadNIAData 함수를 만들어서 load_dataset() 함수에서 튜플로 리턴 받음)
3. 기존의 훈련 데이터셋을 넣기 전에 니아 파일에서 추출한 1만개의 (Q, A) 데이터 셋을 먼저 넣기
4. 기존의 훈련 데이터셋을 넣고 이후 매커니즘은 동일
"""

from torch.utils.data import (DataLoader, TensorDataset)
from torch import nn
from tqdm import tqdm # 설명
import numpy as np
import torch
import os

class TransformerChat(nn.Module):

    def __init__(self, config):
        super().__init__()
        #transfomer 모델을 위한 내부 필드 초기화

        # 전체 단어(음절) 개수
        self.vocab_size = config["vocab_size"]

        # 단어(음절) 벡터 크기
        self.embedding_size = config['embedding_size']

        # Transformer의 Attention Head 개수 (중요)
        self.num_heads = config['num_heads']

        # Transformer Encoder의 Layer 수 - 대충 12~24개 (중요)
        self.num_encoder_layers = config['num_encoder_layers']

        # Transformer Decoder의 Layer 수 - 대충 6개 : 디코더에는 상대적으로 적게 쌓음. (중요)
        self.num_decoder_layers = config['num_decoder_layers']

        # 입력 Sequence의 최대 길이
        self.max_length = config['max_length']

        # Transformer 내부 FNN의 hidden 크기
        self.hidden_size = config['hidden_size']

        # Token Embedding Matrix 선언 : one-hot vector가 아닌 임베딩 벡터를 만들어서 입력을 수행. 최대 vocab 크기만큼 있어야함. 그렇게 임베딩 레이어도 초기화
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_size)

        # Transformer Encoder-Decoder 설계(선언) : 앞에서 정의한 파라미터 적용
        self.transformer = nn.Transformer(d_model=self.embedding_size, nhead=self.num_heads, num_encoder_layers=self.num_encoder_layers,
                                          num_decoder_layers=self.num_decoder_layers, dim_feedforward=self.hidden_size)

        # 입력 길이 L에 대한 (L X L) mask 생성: 이전 토큰들의 정보만을 반영하기 위한 mask
        #       [[1, -inf, -inf, -inf],
        #        [1,    1, -inf, -inf],
        #               ......
        #        [1,    1,    1,    1]]
        # 이곳을 채우세요.
        # 순차적인 생성을 위해 mask를 생성해서 masking 수행. 이는 디코더가 랭귀지 모델이기 때문에!
        self.mask = self.transformer.generate_square_subsequent_mask(self.max_length).cuda()

        # 전체 단어 분포로 변환하기 위한 linear : 인코더가 아닌 디코더의 맨 위층에 있는 output layer의 linear를 의미
        # 이곳을 채우세요.
        self.projection_layer = nn.Linear(self.embedding_size, self.vocab_size)

    def forward(self, enc_inputs, dec_inputs): #파라미터는 인코더의 입력으로 들어갈 것과 디코더의 입력으로 들어갈 것

        # enc_inputs: [batch, seq_len], dec_inputs: [batch, seq_len]
        # enc_input_features: [batch, seq_len, emb_size] -> [seq_len, batch, emb_size]  ==> 구조를 변형 이는 transformer가 [seq_len, batch, emb_size] 모양으로 받음(즉, transformer에는 문장이 세로로 들어감.)
        # 이곳을 채우세요.
        enc_input_features = self.embeddings(enc_inputs).transpose(0, 1)
        #이를 통해 임베딩 레이어와 연결

        # dec_input_features: [batch, seq_len, emb_size] -> [seq_len, batch, emb_size]  ==> 구조를 위처럼 바꿈
        # 이곳을 채우세요.
        dec_input_features = self.embeddings(dec_inputs).transpose(0, 1)

        # dec_output_features: [seq_len, batch, emb_size]
        dec_output_features = self.transformer(src=enc_input_features, tgt=dec_input_features, src_mask = self.mask, tgt_mask = self.mask)
        #디코더의 output을 리턴 이것도 구조는 위와 같음

        # hypothesis : [seq_len, batch, vocab_size]
        hypothesis = self.projection_layer(dec_output_features)
        # projection layer는 맨위의 linear로 vocab_size만큼 내뱉음.

        return hypothesis

In [None]:
import json
import glob
import random

#데이터 로딩은 조금 복잡

# 어휘사전(vocabulary) 생성 함수
def load_vocab(file_dir):
  #응답을 넣으면 인덱스로, 인덱스를 넣으면 응답을 리턴
  #해당 파일에 pad와 unk를 이미 넣어놔서 여기서는 굳이 처리할 필요가 없음.
    with open(file_dir,'r',encoding='utf8') as vocab_file:
        char2idx = {}
        idx2char = {}
        index = 0
        for char in vocab_file:
            char = char.strip()
            char2idx[char] = index
            idx2char[index] = char
            index+=1

    return char2idx, idx2char

# 문자 입력열을 인덱스로 변환하는 함수
def convert_data2feature(config, input_sequence, char2idx, decoder_input=False):

    # 고정 길이 벡터 생성 ; 최대 길이만큼하고 일단 먼저 pad로 채움
    input_features = np.zeros(config["max_length"], dtype=np.int)

    if decoder_input:
        # Decoder Input은 Target Sequence에서 Right Shift
        # Target Sequence :         ["안","녕","하","세","요", "</S>" ] : 디코더의 출력
        # Decoder Input Sequence :  ["<S>", "안","녕","하","세","요"]   : 디코더의 입력
        # 디코더에서의 input에서는 start symbol과 end symbol을 굉장히 많이 씀. 그래서 한 칸씩 뒤로 밀어야 함. 형식에 맞게 변형해야함!
        # 이곳을 채우세요.
        input_sequence = " ".join(["<S>"] + input_sequence.split()[:-1])#처음에 <S>를 넣고 그 뒤에 입력을 같이 넣어줌

    for idx,token in enumerate(input_sequence.split()):
        if token in char2idx.keys(): # 한 음절씩 token으로 받아서 index 받아오기.
            input_features[idx] = char2idx[token]
        else:                       # 사전 내에 없으면 UNK
            input_features[idx] = char2idx['<UNK>']

    return input_features

"""
0. 선정한 분야 비율별로 section 분류
        "Game": 0.17,             #게임
        "Science": 0.09,          #과학
        "Animal": 0.12,           #동물
        "Broadcast": 0.31,        #방송
        "InternetBroadcast": 0.31,#인터넷 방송
1. 10000개의 니아 파일에서 파일당 2개의 문장을 추출. 여기서 문장을 max_length를 고려해서 18 음절만 슬라이싱
   만약 이 때 data 폴더 내에 Game 폴더 내의 2천개의 파일이 있다면 랜덤으로 0.17*10000인 1700개의 파일을 선택해서 가져옵니다.
2. 그 중에서 1문장은 Q, 1문장은 A로 설정(여기까지 loadNIAData 함수를 만들어서 load_dataset() 함수에서 튜플로 리턴 받음)
3. 기존의 훈련 데이터셋을 넣기 전에 니아 파일에서 추출한 1만개의 (Q, A) 데이터 셋을 먼저 넣기
4. 기존의 훈련 데이터셋을 넣고 이후 매커니즘은 동일
"""
# 니아 데이터 전처리하고 읽기 함수
def loadNIAData():
    print("니아 데이터 추출해서 전처리하는 함수 실행")
    domain_ratios = {
        "Game": 0.17,             #게임
        "Science": 0.09,          #과학
        "Animal": 0.12,           #동물
        "Broadcast": 0.31,        #방송
        "InternetBroadcast": 0.31,#인터넷 방송
    }

    # JSON 파일이 있는 폴더의 경로 설정
    folder_paths = {
        "Game": "/gdrive/MyDrive/Colab/Week13/data/Game/*.json",
        "Science": "/gdrive/MyDrive/Colab/Week13/data/Science/*.json",
        "Animal": "/gdrive/MyDrive/Colab/Week13/data/Animal/*.json",
        "Broadcast": "/gdrive/MyDrive/Colab/Week13/data/Broadcast/*.json",
        "InternetBroadcast": "/gdrive/MyDrive/Colab/Week13/data/InternetBroadcast/*.json",
    }

    # 데이터를 저장할 리스트
    Qdatas = []
    Adatas = []

    # 각 도메인에 대해 JSON 파일을 불러오고 "sentence" 추출
    for domain, ratio in domain_ratios.items():
        json_files = glob.glob(folder_paths.get(domain,""))  # 도메인에 해당하는 JSON 파일 목록 가져오기
        # 파일 수에 비례하도록 해당 도메인의 데이터 수 계산
        num_datas_to_select = int(ratio * 10000)
        # 데이터를 무작위로 선택 (num_datas_to_select를 데이터 리스트의 길이 미만으로 조정)
        selected_json_files = random.sample(json_files, num_datas_to_select)

        # 경로내 json 파일을 열고 작업 수행
        for json_file in selected_json_files:
            with open(json_file, "r", encoding="utf8") as file:
                data = json.load(file)

                #해당 json 파일에 대해서 첫 번째 sentence를 추출
                first_entity = data.get("named_entity", [{}])[0]
                first_sentence_obj = first_entity.get("content", [{}])[0]
                first_sentence = first_sentence_obj.get("sentence", "")
                first_sentence = first_sentence[:18]
                process_first_sentence = ''

                for char in first_sentence:
                    # 현재 문자가 한글이면
                    if char.isspace():
                        process_first_sentence += '<SP> '
                    else :
                        process_first_sentence+=(char + ' ')

                #해당 json 파일에 대해서 두 번째 sentence를 추출
                second_entity = data.get("named_entity", [{}])[1]
                second_sentence_obj = second_entity.get("content", [{}])[0]
                second_sentence = second_sentence_obj.get("sentence", "")
                second_sentence = second_sentence[:18]
                process_second_sentence = ''

                for char in second_sentence:
                    # 현재 문자가 한글이면
                    if char.isspace():
                        process_second_sentence += '<SP> '
                    else :
                        process_second_sentence+=(char + ' ')
                process_second_sentence+='</S>'
                Qdatas.append(process_first_sentence)
                Adatas.append(process_second_sentence)
    return Qdatas, Adatas

# 데이터 읽기 함수
def load_dataset(config):
    #니아 데이터셋 전처리
    Qdatas, Adatas = loadNIAData()
    print("니아 데이터 전처리 끝!\n니아 데이터 로딩 후에 train.txt 데이터 로딩 시작!")
    # 어휘사전 읽어오기
    char2idx, idx2char = load_vocab(config['vocab_file'])

    file_dir = config['train_file']#Q \t A로 구성
    data_file = open(file_dir,'r',encoding='utf8').readlines()

    # 데이터를 저장하기 위한 리스트 생성
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for i in range(len(Qdatas)):
      input_sequence = Qdatas[i]
      output_sequence = Adatas[i]

      #convert_data2feature로 하나씩 갖고 와서 모든 데이터를 feature로 전환해서 append할 것.  인코더의 input, 디코더의 input/output을 디자인
      enc_inputs.append(convert_data2feature(config, input_sequence, char2idx))
      dec_inputs.append(convert_data2feature(config, output_sequence, char2idx, True))
      dec_outputs.append(convert_data2feature(config, output_sequence, char2idx))

    for line in tqdm(data_file):#tqdm!

        line = line.strip().split('\t')#\t으로 한 줄을 분류

        input_sequence = line[0]
        output_sequence = line[1]

        #convert_data2feature로 하나씩 갖고 와서 모든 데이터를 feature로 전환해서 append할 것.  인코더의 input, 디코더의 input/output을 디자인
        enc_inputs.append(convert_data2feature(config, input_sequence, char2idx))
        dec_inputs.append(convert_data2feature(config, output_sequence, char2idx, True))
        dec_outputs.append(convert_data2feature(config, output_sequence, char2idx))

    # 전체 데이터를 저장하고 있는 리스트를 텐서 형태로 변환
    enc_inputs = torch.tensor(enc_inputs, dtype=torch.long)
    dec_inputs = torch.tensor(dec_inputs, dtype=torch.long)
    dec_outputs = torch.tensor(dec_outputs, dtype=torch.long)

    return enc_inputs, dec_inputs, dec_outputs, char2idx, idx2char


    # 어휘사전 읽어오기
    char2idx, idx2char = load_vocab(config['vocab_file'])

    file_dir = config['train_file']#Q \t A로 구성
    data_file = open(file_dir,'r',encoding='utf8').readlines()

    # 데이터를 저장하기 위한 리스트 생성
    enc_inputs, dec_inputs, dec_outputs = [], [], []

    for line in tqdm(data_file):#tqdm!

        line = line.strip().split('\t')#\t으로 한 줄을 분류

        input_sequence = line[0]
        output_sequence = line[1]

        #convert_data2feature로 하나씩 갖고 와서 모든 데이터를 feature로 전환해서 append할 것.  인코더의 input, 디코더의 input/output을 디자인
        enc_inputs.append(convert_data2feature(config, input_sequence, char2idx))
        dec_inputs.append(convert_data2feature(config, output_sequence, char2idx, True))
        dec_outputs.append(convert_data2feature(config, output_sequence, char2idx))

    # 전체 데이터를 저장하고 있는 리스트를 텐서 형태로 변환
    enc_inputs = torch.tensor(enc_inputs, dtype=torch.long)
    dec_inputs = torch.tensor(dec_inputs, dtype=torch.long)
    dec_outputs = torch.tensor(dec_outputs, dtype=torch.long)

    return enc_inputs, dec_inputs, dec_outputs, char2idx, idx2char

In [None]:
# 텐서를 리스트로 변환하는 함수
def tensor2list(input_tensor):
    return input_tensor.cpu().detach().numpy().tolist()

def do_test(config, model, word2idx, idx2word, input_sequence="오늘 약속있으세요?"):

    # 평가 모드 셋팅
    model.eval()

    # 입력된 문자열의 음절을 공백 단위 토큰으로 변환. 실제 공백은 <SP>로 변환된 상태 : "오늘 약속" -> "오 늘 <SP> 약 속"
    # 따라서 입력된 문자열도 입력 형태를 맞춰주기.
    input_sequence = " ".join([e if e != " " else "<SP>" for e in input_sequence])

    # 텐서 변환: [1, seq_len]
    enc_inputs = torch.tensor([convert_data2feature(config, input_sequence, word2idx)], dtype=torch.long).cuda()

    # 디코딩할 때! input_ids : [1, seq_len] -> 첫번째 디코더 입력 "<S>" 만들기 --> start symbol
    dec_inputs = torch.tensor([convert_data2feature(config, "", word2idx, True)], dtype=torch.long).cuda()

    # 시스템 응답 문자열 초기화
    response = ''

    # 최대 입력 길이 만큼 Decoding Loop
    for decoding_step in range(config['max_length']-1):

        # dec_outputs: [vocab_size] - 인코더와 디코더의 입력으로 디코더의 출력 리턴
        dec_outputs = model(enc_inputs, dec_inputs)[decoding_step, 0, :]

        # 가장 큰 출력을 갖는 인덱스 얻어오기
        dec_output_idx = np.argmax(tensor2list(dec_outputs))

        # 생성된 토큰은 dec_inputs에 추가 (첫번째 차원은 배치)
        dec_inputs[0][decoding_step+1] = dec_output_idx

        #계속 루핑을 돌면서 디코더 출력을 계속 리턴

        # end symbol인 </S> 심볼 생성 시, Decoding 종료
        if idx2word[dec_output_idx] == "</S>":
            break

        # 생성된 토큰을 인덱스를 통해서 추가
        response += idx2word[dec_output_idx]

    # <SP>를 공백으로 변환한 후 응답 문자열 출력
    print(response.replace("<SP>", " "))

def test(config):

    # 어휘사전 읽어오기
    word2idx, idx2word = load_vocab(config['vocab_file'])

    # Transformer Seq2Seq 모델 객체 생성
    model = TransformerChat(config).cuda()

    # 학습한 모델 파일로부터 가중치 불러옴
    model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["trained_model_name"])))

    while(True):
        input_sequence = input("문장을 입력하세요. (종료는 exit을 입력하세요.) : ")
        if input_sequence == 'exit':
            break
        do_test(config, model, word2idx, idx2word, input_sequence)

In [None]:
def train(config):

    # Transformer Seq2Seq 모델 객체 생성
    model = TransformerChat(config).cuda()

    # 데이터 읽기 : load_dataset()
    enc_inputs, dec_inputs, dec_outputs, word2idx, idx2word = load_dataset(config)

    # TensorDataset/DataLoader를 통해 배치(batch) 단위로 데이터를 나누고 셔플(shuffle)
    train_features = TensorDataset(enc_inputs, dec_inputs, dec_outputs)
    train_dataloader = DataLoader(train_features, shuffle=True, batch_size=config["batch_size"])

    # 크로스엔트로피 손실 함수 : vocab만큼 나가니까 멀티 클래스 분류!
    loss_func = nn.CrossEntropyLoss()

    # 옵티마이저 함수 지정
    optimizer = torch.optim.Adam(model.parameters(), lr=config["learn_rate"])

    for epoch in range(config["epoch"] + 1):

        for (step, batch) in enumerate(train_dataloader):

            # 학습 모드 셋팅
            model.train()

            # batch = (enc_inputs[step], dec_inputs[step], dec_outputs)*batch_size
            # .cuda()를 통해 메모리에 업로드
            batch = tuple(t.cuda() for t in batch)

            # 역전파 변화도 초기화
            optimizer.zero_grad()

            enc_inputs, dec_inputs, dec_outputs = batch

            # hypothesis: [seq_len, batch, vocab_size] -> [seq_len*batch, vocab_size]
            hypothesis = model(enc_inputs, dec_inputs).view(-1, config['vocab_size']) # 모델에서 hypothesis를 리턴.
            #그런데 크로스 엔트로피에게는 2차원으로 펴서 비교하게 해야함 따라서 view() 메서드 이용. 이 때 파라미터를 보면 두 번째 차원은 vocab_size만큼 하도록 지정하고 첫 번째 차원은 알아서 수행하도록(seq_len*batch) 설정

            # labels: [batch, seq_len] -> [seq_len, batch] -> [seq_len(max_length)*batch]
            labels = dec_outputs.transpose(0, 1) # 디코더의 output을 transpose해서 차원을 바꾸기
            labels = labels.reshape(config["max_length"]*dec_inputs.size(0))# 바뀐 차원을 예측과 형식이 맞도록 정답을 reshape, dec_inputs.size(0)는 batch_size와 동일
            #즉 정답 인덱스를 1차원으로 만들기

            # 비용 계산 및 역전파 수행: cross_entopy 내부에서 labels를 원핫벡터로 변환 (골드레이블은 항상 1차원으로 입력)
            loss = loss_func(hypothesis, labels)
            #hypothesis는 1차원이였는데 cross_entropy 내부에서 labels를 1차원인 원핫벡터로 알아서 만들어줌.
            loss.backward()
            optimizer.step()

            # 200 배치마다 중간 결과 출력
            if (step+1)% 200 == 0:
                print("Current Step : {0:d} / {1:d}\tCurrent Loss : {2:f}".format(step+1, int(len(enc_inputs) / config['batch_size']), loss.item()))
                # 생성 문장을 확인하기 위한 함수 호출
                # do_test(config, model, word2idx, idx2word)

        # 에폭마다 가중치 저장
        torch.save(model.state_dict(), os.path.join(config["output_dir"], "epoch_{0:d}.pt".format(epoch)))

In [None]:
if(__name__=="__main__"):

    root_dir = "/gdrive/MyDrive/Colab/Week13/"
    config = {"mode": "test",
              "vocab_file": os.path.join(root_dir, "vocab.txt"),
              "train_file": os.path.join(root_dir, "train.txt"),
              "trained_model_name":"epoch_{}.pt".format(10),
              "output_dir":output_dir,
              "epoch": 10,
              "learn_rate":0.00005,
              "num_encoder_layers": 6,
              "num_decoder_layers": 6,
              "num_heads": 4,
              "max_length": 20,
              "batch_size": 128,
              "embedding_size": 256,
              "hidden_size": 512,
              "vocab_size": 4427
            }

    if(config["mode"] == "train"):
        train(config)
    else:
        test(config)

"""
0. 선정한 분야 비율별로 section 분류
        "Game": 0.17,             #게임
        "Science": 0.09,          #과학
        "Animal": 0.12,           #동물
        "Broadcast": 0.31,        #방송
        "InternetBroadcast": 0.31,#인터넷 방송
1. 10000개의 니아 파일에서 파일당 2개의 문장을 추출. 여기서 문장을 max_length를 고려해서 18 음절만 슬라이싱
2. 그 중에서 1문장은 Q, 1문장은 A로 설정(여기까지 loadNIAData 함수를 만들어서 load_dataset() 함수에서 튜플로 리턴 받음)
3. 기존의 훈련 데이터셋을 넣기 전에 니아 파일에서 추출한 1만개의 (Q, A) 데이터 셋을 먼저 넣기
4. 기존의 훈련 데이터셋을 넣고 이후 매커니즘은 동일
"""

문장을 입력하세요. (종료는 exit을 입력하세요.) : 안녕?


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  input_features = np.zeros(config["max_length"], dtype=np.int)


안녕하세요
문장을 입력하세요. (종료는 exit을 입력하세요.) : 밥 먹었어??
아니 아직 안먹었어
문장을 입력하세요. (종료는 exit을 입력하세요.) : 배가 너무 고파
ㅋㅋㅋㅋㅋㅋㅋ
문장을 입력하세요. (종료는 exit을 입력하세요.) : exit


'\n0. 선정한 분야 비율별로 section 분류\n        "Game": 0.17,             #게임\n        "Science": 0.09,          #과학\n        "Animal": 0.12,           #동물\n        "Broadcast": 0.31,        #방송\n        "InternetBroadcast": 0.31,#인터넷 방송\n1. 10000개의 니아 파일에서 파일당 2개의 문장을 추출. 여기서 문장을 max_length를 고려해서 18 음절만 슬라이싱\n2. 그 중에서 1문장은 Q, 1문장은 A로 설정(여기까지 loadNIAData 함수를 만들어서 load_dataset() 함수에서 튜플로 리턴 받음)\n3. 기존의 훈련 데이터셋을 넣기 전에 니아 파일에서 추출한 1만개의 (Q, A) 데이터 셋을 먼저 넣기\n4. 기존의 훈련 데이터셋을 넣고 이후 매커니즘은 동일\n'