
## 202211866 최대승
## 13주차 기계학습 과제 수행에 앞선 학습데이터 준비 과정입니다.

* NIA AI-Hub에서 다운로드 받은 뷰티 카테고리의 json데이터를 활용하기로 함
* 코랩의 data 폴더에 json파일을 130개를 저장함
* 각 json파일에서 댓글 문장 (content)만 전부 추출함
* 댓글 문장을 추출할 때마다 전처리를 거침
* 전처리 과정
  1. 모든 공백을 &lt;SP&gt;로 변환
  2. 음절 단위마다 띄어쓰기 추가
  3. 띄어쓰기 추가로 인해 변형된 < S P >를 다시 &lt;SP&gt; 로 복원
* 댓글 문장 두 개를 질문과 응답으로 짝을 지어서 1만개의 짝을 만들어야 해서, 댓글 문장이 2만 개 필요했음.
* 댓글 문장을 총 2만 개 추출할 때까지 추출함.
* 2만 개의 전처리된 문장을 확보하면, 두 개씩 짝 지음
* 짝 지어진 문장 중 첫번째 문장 뒤에 \t 추가
* 짝 지어진 문장 중 두번째 문장 뒤에 &lt;/S&gt; 추가
* 짝 지어진 문장들을 txt파일에 저장하고, 훈련데이터로 사용
* vocab데이터는 실습과정에서 썼던 데이터를 그대로 사용


## 제가 사용한 130개의 json데이터, 이를 사용하여 준비한 학습데이터(1만개의 질문-응답 쌍)는 과제 제출할 때 같이 제출하겠습니다.

In [1]:
from google.colab import drive
drive.mount("/gdrive")

Mounted at /gdrive


In [3]:
from tqdm import tqdm

In [2]:
import os
import json

# 전처리 과정
def ask_spaces(texts):
    processed_texts = []
    for text in texts:
        # 공백을 <SP>로 변환
        text = text.replace(" ", "<SP>")
        # 음절 단위로 분리
        syllables = " ".join(text)  # 한글자씩 분리
        syllables = syllables.replace("< S P >", "<SP>")  # "< S P >"를 "<SP>"로 복원
        processed_texts.append(syllables)
    return processed_texts


# 설정
folder_path = "/gdrive/My Drive/ML/13w/data/"  # JSON 파일들이 들어 있는 폴더 경로
output_file = "/gdrive/My Drive/ML/13w/train.txt"  # 결과 파일 경로
target_sentence_count = 20000  # 목표 문장 개수 (질문 1만개, 대답1만개)

# 문장을 저장할 리스트
all_processed_texts = []

# 폴더 내 JSON 파일 순회
for file_name in os.listdir(folder_path):
    # JSON 파일만 처리
    if file_name.endswith(".json"):
        file_path = os.path.join(folder_path, file_name)
        with open(file_path, "r", encoding="utf-8") as file:
            data = json.load(file)

        # content 필드에서 텍스트 추출
        texts = [item["content"] for item in data["SJML"]["text"]]

        # 전처리 수행
        processed_texts = ask_spaces(texts)
        all_processed_texts.extend(processed_texts)

    # 목표 문장 개수에 도달하면 중단
    if len(all_processed_texts) >= target_sentence_count:
        break

# 최종 문장 리스트를 목표 개수로 자르기
all_processed_texts = all_processed_texts[:target_sentence_count]

# 결과를 두 개씩 짝짓기
paired_texts = []
for i in range(0, len(all_processed_texts) - 1, 2):  # 두 개씩 묶음
    first = all_processed_texts[i]+"\t"
    second = all_processed_texts[i+1]+" </S>"
    paired_texts.append(f"{first}{second}")

# 결과를 txt 파일로 저장
with open(output_file, "w", encoding="utf-8") as out_file:
    out_file.write("\n".join(paired_texts))

print(f"전처리 완료: 총 {len(paired_texts)}개의 쌍이 저장되었습니다.")


전처리 완료: 총 9182개의 쌍이 저장되었습니다.


In [4]:
from torch.utils.data import (DataLoader, TensorDataset)
from torch import nn
from tqdm import tqdm
import numpy as np
import torch
import os

class TransformerChat(nn.Module):

    def __init__(self, config):
        super().__init__()

        # 전체 단어(음절) 개수
        self.vocab_size = config["vocab_size"]

        # 단어(음절) 벡터 크기
        self.embedding_size = config['embedding_size']

        # Transformer의 Attention Head 개수
        self.num_heads = config['num_heads']

        # Transformer Encoder의 Layer 수
        self.num_encoder_layers = config['num_encoder_layers']

        # Transformer Decoder의 Layer 수
        self.num_decoder_layers = config['num_decoder_layers']

        # 입력 Sequence의 최대 길이
        self.max_length = config['max_length']

        # Transformer 내부 FNN 크기
        self.hidden_size = config['hidden_size']

        # Token Embedding Matrix 선언
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_size)

        # Transformer Encoder-Decoder 설계(선언)
        self.transformer = nn.Transformer(d_model=self.embedding_size, nhead=self.num_heads, num_encoder_layers=self.num_encoder_layers,
                                          num_decoder_layers=self.num_decoder_layers, dim_feedforward=self.hidden_size)

        # 입력 길이 L에 대한 (L X L) mask 생성: 이전 토큰들의 정보만을 반영하기 위한 mask
        #       [[1, -inf, -inf, -inf],
        #        [1,    1, -inf, -inf],
        #               ......
        #        [1,    1,    1,    1]]
        self.mask = self.transformer.generate_square_subsequent_mask(self.max_length).cuda()

        # 출력층 선언

        # 전체 단어 분포로 변환하기 위한 linear
        # 이곳을 채우세요.
        self.projection_layer = nn.Linear(self.embedding_size, self.vocab_size)

    def forward(self, enc_inputs, dec_inputs):

        # enc_inputs: [batch, seq_len], dec_inputs: [batch, seq_len]
        # enc_input_features: [batch, seq_len, emb_size] -> [seq_len, batch, emb_size]
        # 이곳을 채우세요.
        enc_input_features = self.embeddings(enc_inputs).transpose(0, 1)

        # dec_input_features: [batch, seq_len, emb_size] -> [seq_len, batch, emb_size]
        # 이곳을 채우세요.
        dec_input_features = self.embeddings(dec_inputs).transpose(0, 1)

        # dec_output_features: [seq_len, batch, emb_size]
        dec_output_features = self.transformer(src=enc_input_features, tgt=dec_input_features, src_mask = self.mask, tgt_mask = self.mask)

        # hypothesis : [seq_len, batch, vocab_size]
        hypothesis = self.projection_layer(dec_output_features)

        return hypothesis

In [5]:
# 어휘사전(vocabulary) 생성 함수
def load_vocab(file_dir):

  with open(file_dir,'r',encoding='utf8') as vocab_file:
      char2idx = {}
      idx2char = {}
      index = 0
      for char in vocab_file:
          char = char.strip()
          char2idx[char] = index
          idx2char[index] = char
          index+=1

  return char2idx, idx2char

# 문자 입력열을 인덱스로 변환하는 함수
def convert_data2feature(config, input_sequence, char2idx, decoder_input=False):

    # 고정 길이 벡터 생성
    input_features = np.zeros(config["max_length"], dtype=np.int64)

    if decoder_input:
        # Decoder Input은 Target Sequence에서 Right Shift
        # Target Sequence: ["안", "녕", "하", "세", "요", "</S>"]
        # Decoder Input Sequence: ["<S>", "안", "녕", "하", "세", "요"]
        input_sequence = " ".join(["<S>"] + input_sequence.split())

    # 입력 시퀀스 토큰 분리 및 길이 제한
    tokens = input_sequence.split()[:config["max_length"]]

    # 토큰을 인덱스로 변환하여 고정 길이 벡터에 저장
    for idx, token in enumerate(tokens):
        if token in char2idx:
            input_features[idx] = char2idx[token]
        else:
            input_features[idx] = char2idx.get('<UNK>', 0)  # '<UNK>'가 없으면 기본값 0

    return input_features


# 데이터 읽기 함수
def load_dataset(config):

    # 어휘사전 읽어오기
    char2idx, idx2char = load_vocab(config['vocab_file'])

    file_dir = config['train_file']
    data_file = open(file_dir,'r',encoding='utf8').readlines()

    # 데이터를 저장하기 위한 리스트 생성
    enc_inputs, dec_inputs, dec_outputs = [], [], []

    for line in tqdm(data_file):

        line = line.strip().split('\t')

        input_sequence = line[0]
        output_sequence = line[1]

        enc_inputs.append(convert_data2feature(config, input_sequence, char2idx))
        dec_inputs.append(convert_data2feature(config, output_sequence, char2idx, True))
        dec_outputs.append(convert_data2feature(config, output_sequence, char2idx))

    # 전체 데이터를 저장하고 있는 리스트를 텐서 형태로 변환
    enc_inputs = torch.tensor(enc_inputs, dtype=torch.long)
    dec_inputs = torch.tensor(dec_inputs, dtype=torch.long)
    dec_outputs = torch.tensor(dec_outputs, dtype=torch.long)

    return enc_inputs, dec_inputs, dec_outputs, char2idx, idx2char

In [6]:
def train(config):

    # Transformer Seq2Seq 모델 객체 생성
    model = TransformerChat(config).cuda()

    # 데이터 읽기
    enc_inputs, dec_inputs, dec_outputs, word2idx, idx2word = load_dataset(config)

    # TensorDataset/DataLoader를 통해 배치(batch) 단위로 데이터를 나누고 셔플(shuffle)
    train_features = TensorDataset(enc_inputs, dec_inputs, dec_outputs)
    train_dataloader = DataLoader(train_features, shuffle=True, batch_size=config["batch_size"])

    # 크로스엔트로피 손실 함수
    loss_func = nn.CrossEntropyLoss()

    # 옵티마이저 함수 지정
    optimizer = torch.optim.Adam(model.parameters(), lr=config["learn_rate"])

    for epoch in range(config["epoch"] + 1):

        for (step, batch) in enumerate(train_dataloader):

            # 학습 모드 셋팅
            model.train()

            # batch = (enc_inputs[step], dec_inputs[step], dec_outputs)*batch_size
            # .cuda()를 통해 메모리에 업로드
            batch = tuple(t.cuda() for t in batch)

            # 역전파 변화도 초기화
            optimizer.zero_grad()

            enc_inputs, dec_inputs, dec_outputs = batch

            # hypothesis: [seq_len, batch, vocab_size] -> [seq_len*batch, vocab_size]
            # 이곳을 채우세요.
            hypothesis = model(enc_inputs, dec_inputs).view(-1, config["vocab_size"])

            # labels: [batch, seq_len] -> [seq_len, batch] -> [seq_len(max_length)*batch]
            labels = dec_outputs.transpose(0, 1)
            labels = labels.reshape(config["max_length"]*dec_inputs.size(0))

            # 비용 계산 및 역전파 수행: cross_entopy 내부에서 labels를 원핫벡터로 변환 (골드레이블은 항상 1차원으로 입력)
            loss = loss_func(hypothesis, labels)
            loss.backward()
            optimizer.step()

            # 10 배치마다 중간 결과 출력
            if (step+1)% 10 == 0:
                print("Current Step : {0:d} / {1:d}\tCurrent Loss : {2:f}".format(step+1, int(len(enc_inputs) / config['batch_size']), loss.item()))
                # 생성 문장을 확인하기 위한 함수 호출
                # do_test(config, model, word2idx, idx2word)

        # 에폭마다 가중치 저장
        torch.save(model.state_dict(), os.path.join(config["output_dir"], "epoch_{0:d}.pt".format(epoch)))

In [7]:
# 텐서를 리스트로 변환하는 함수
def tensor2list(input_tensor):
    return input_tensor.cpu().detach().numpy().tolist()

def do_test(config, model, word2idx, idx2word, input_sequence="오늘 약속있으세요?"):

    # 평가 모드 셋팅
    model.eval()

    # 입력된 문자열의 음절을 공백 단위 토큰으로 변환. 공백은 <SP>로 변환: "오늘 약속" -> "오 늘 <SP> 약 속"
    input_sequence = " ".join([e if e != " " else "<SP>" for e in input_sequence])

    # 텐서 변환: [1, seq_len]
    enc_inputs = torch.tensor([convert_data2feature(config, input_sequence, word2idx)], dtype=torch.long).cuda()

    # input_ids : [1, seq_len] -> 첫번째 디코더 입력 "<S>" 만들기
    dec_inputs = torch.tensor([convert_data2feature(config, "", word2idx, True)], dtype=torch.long).cuda()

    # 시스템 응답 문자열 초기화
    response = ''

    # 최대 입력 길이 만큼 Decoding Loop
    for decoding_step in range(config['max_length']-1):

        # dec_outputs: [vocab_size]
        dec_outputs = model(enc_inputs, dec_inputs)[decoding_step, 0, :]
        # 가장 큰 출력을 갖는 인덱스 얻어오기
        dec_output_idx = np.argmax(tensor2list(dec_outputs))

        # 생성된 토큰은 dec_inputs에 추가 (첫번째 차원은 배치)
        dec_inputs[0][decoding_step+1] = dec_output_idx

        # </S> 심볼 생성 시, Decoding 종료
        if idx2word[dec_output_idx] == "</S>":
            break

        # 생성 토큰 추가
        response += idx2word[dec_output_idx]

    # <SP>를 공백으로 변환한 후 응답 문자열 출력
    print(response.replace("<SP>", " "))

def test(config):

    # 어휘사전 읽어오기
    word2idx, idx2word = load_vocab(config['vocab_file'])

    # Transformer Seq2Seq 모델 객체 생성
    model = TransformerChat(config).cuda()

    # 학습한 모델 파일로부터 가중치 불러옴
    model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["trained_model_name"])))

    while(True):
        input_sequence = input("문장을 입력하세요. (종료는 exit을 입력하세요.) : ")
        if input_sequence == 'exit':
            break
        do_test(config, model, word2idx, idx2word, input_sequence)

In [8]:
def do_test (config, model, word2idx, idx2word, input_sequence = "오늘 약속있으세요?"):
  model.eval()
  input_sequence = " ".join([e if e != " " else "<SP>" for e in input_sequence])
  enc_inputs = torch.tensor([convert_data2feature(config, input_sequence, word2idx)], dtype=torch.long).cuda()
  dec_inputs = torch.tensor([convert_data2feature(config, "", word2idx, True)], dtype=torch.long).cuda()
  response = ''
  for decoding_step in range(config['max_length']-1):
    dec_outputs = model(enc_inputs, dec_inputs)[decoding_step, 0, :]
    dec_output_idx = np.argmax(tensor2list(dec_outputs))
    dec_inputs[0][decoding_step+1] = dec_output_idx
    if idx2word[dec_output_idx] == "</S>":
      break
    response += idx2word[dec_output_idx]

  print(response.replace("<SP>", ""))


In [9]:
def test(config):

  word2idx, idx2word = load_vocab(config['vocab_file'])
  model = TransformerChat(config).cuda()
  model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["trained_model_name"])))
  while(True):
    input_sequence = input("문장을 입력하세요. (종료는 exit을 입력하세요.) : ")
    if input_sequence == 'exit':
      break
    do_test (config, model, word2idx, idx2word, input_sequence)

In [10]:
if(__name__=="__main__"):

    root_dir = "/gdrive/My Drive/ML/13w/"
    output_dir = os.path.join(root_dir, "output")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    config = {"mode": "train",
              "vocab_file": os.path.join(root_dir, "vocab.txt"),
              "train_file": os.path.join(root_dir, "train.txt"),
              "trained_model_name":"epoch_{}.pt".format(10),
              "output_dir":output_dir,
              "epoch": 10,
              "learn_rate":0.00005,
              "num_encoder_layers": 6,
              "num_decoder_layers": 6,
              "num_heads": 4,
              "max_length": 20,
              "batch_size": 128,
              "embedding_size": 256,
              "hidden_size": 512,
              "vocab_size": 4427
            }

    if(config["mode"] == "train"):
        train(config)
    else:
        test(config)

100%|██████████| 9182/9182 [00:00<00:00, 36312.98it/s]
  enc_inputs = torch.tensor(enc_inputs, dtype=torch.long)


Current Step : 10 / 1	Current Loss : 7.229414
Current Step : 20 / 1	Current Loss : 6.801976
Current Step : 30 / 1	Current Loss : 6.342338
Current Step : 40 / 1	Current Loss : 6.204791
Current Step : 50 / 1	Current Loss : 5.941228
Current Step : 60 / 1	Current Loss : 6.005876
Current Step : 70 / 1	Current Loss : 5.576653
Current Step : 10 / 1	Current Loss : 5.382502
Current Step : 20 / 1	Current Loss : 5.320533
Current Step : 30 / 1	Current Loss : 5.165909
Current Step : 40 / 1	Current Loss : 5.157838
Current Step : 50 / 1	Current Loss : 4.933252
Current Step : 60 / 1	Current Loss : 5.015621
Current Step : 70 / 1	Current Loss : 4.958889
Current Step : 10 / 1	Current Loss : 4.767728
Current Step : 20 / 1	Current Loss : 4.498380
Current Step : 30 / 1	Current Loss : 4.635081
Current Step : 40 / 1	Current Loss : 4.725713
Current Step : 50 / 1	Current Loss : 4.634593
Current Step : 60 / 1	Current Loss : 4.617172
Current Step : 70 / 1	Current Loss : 4.478467
Current Step : 10 / 1	Current Loss

In [11]:
if(__name__=="__main__"):

    root_dir = "/gdrive/My Drive/ML/13w/"
    output_dir = os.path.join(root_dir, "output")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    config = {"mode": "test",
              "vocab_file": os.path.join(root_dir, "vocab.txt"),
              "train_file": os.path.join(root_dir, "train.txt"),
              "trained_model_name":"epoch_{}.pt".format(10),
              "output_dir":output_dir,
              "epoch": 10,
              "learn_rate":0.00005,
              "num_encoder_layers": 6,
              "num_decoder_layers": 6,
              "num_heads": 4,
              "max_length": 20,
              "batch_size": 128,
              "embedding_size": 256,
              "hidden_size": 512,
              "vocab_size": 4427
            }

    if(config["mode"] == "train"):
        train(config)
    else:
        test(config)

  model.load_state_dict(torch.load(os.path.join(config["output_dir"], config["trained_model_name"])))


문장을 입력하세요. (종료는 exit을 입력하세요.) : 입고 있는 상의 정보 알려주세요!
저는아니이이이이아니다이
문장을 입력하세요. (종료는 exit을 입력하세요.) : 옷 색깔이 참 이뻐요
저는아니이이이이아니다이
문장을 입력하세요. (종료는 exit을 입력하세요.) : 청바지가 잘 어울리네요
저는아니이이이이이아니다
문장을 입력하세요. (종료는 exit을 입력하세요.) : 니트가 따뜻해보여요
저는아니이이이이아니다이
문장을 입력하세요. (종료는 exit을 입력하세요.) : 베이지색 바지
저는아니이이이이아니다이
문장을 입력하세요. (종료는 exit을 입력하세요.) : 대학생을 위한 티셔츠 정보
저는이이이아니다이이이
문장을 입력하세요. (종료는 exit을 입력하세요.) : 검정색 자켓이 멋져
저는아니이이이이아니다이
문장을 입력하세요. (종료는 exit을 입력하세요.) : exit
