In [1]:
# 라이브러리 설치
import os
os.system("pip -q install --upgrade transformers")

0

In [2]:
# 라이브러리 임포트
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, DataCollatorForSeq2Seq, Seq2SeqTrainingArguments, Seq2SeqTrainer, pipeline
from torch.utils.data import Dataset
import pandas as pd

In [None]:
# 스타일맵
StyleMap = {
    'formal': '문어체',
    'informal': '구어체',
    'android': '안드로이드',
    'azae': '아재',
    'chat': '채팅',
    'choding': '초등학생',
    'emoticon': '이모티콘',
    'enfp': 'enfp',
    'gentle': '신사',
    'halbae': '할아버지',
    'halmae': '할머니',
    'joongding': '중학생',
    'king': '왕',
    'naruto': '나루토',
    'seonbi': '선비',
    'sosim': '소심한',
    'translator': '번역기'
}

In [None]:
# 부모 클래스
class ModelBase:
  """
  부모 클래스.
  사용자 측에서 생성할 것을 권장하지 않음.
  """
  def __init__(self):
    self._styleMap = StyleMap

  def Help():
    pass

In [None]:
# 모델 학습 클래스
class TrainProcess(ModelBase):
  """
  모델 학습 클래스
  """
  def __init__(
      self,
      model_name: str = "gogamza/kobart-base-v2"
  ):
    super().__init__()
    self.__trainDF = None
    self.__testDF = None
    self.__trainDS = None
    self.__testDS = None
    self.__tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.__model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    self.__DataCollator = DataCollatorForSeq2Seq(tokenizer = self.__tokenizer, model = self.__model)
    self.__trainer = None




  def SplitTrainTestData(
      self,
      dataset_path: str,
      nan_allow: int = 2,
      test_size: float = 0.1,
      random_state: int = 1945,
      show_result: bool = True
  ):
    """
    데이터 프레임을 학습/테스트 데이터로 분리

    dataset_path : 데이터셋 경로
    nan_allow : 한 행의 결측치 허용 개수
    test_size : 테스트 데이터 크기
    random_state : 랜덤 시드
    show_result : 결과 출력
    """
    # 데이터 불러오기
    data = pd.read_csv(dataset_path, sep = "\t")

    # 결측치 허용범위를 초과하는 행은 제거
    data = data[data.notna().sum(axis=1) >= nan_allow]

    # 학습, 테스트 데이터 분리
    self.__trainDF, self.__testDF = train_test_split(
        data,
        test_size = test_size,
        random_state = random_state
    )

    # 결과 출력
    if show_result:
      print(f"학습 데이터: {self.__trainDF.shape}, 테스트 데이터: {self.__testDF.shape}\n")

    # 자기 자신 객체를 반환
    return self




  def SetDatasets(
      self,
      max_length: int = 64,
      truncation: bool = True,
      clear_dataframe_after_dataset: bool = True,
      show_result: bool = True
  ):
    """
    학습을 위한 데이터셋 설정

    max_length : 최대 토큰 길이
    truncation : 최대 길이 초과 시 잘라내기
    clear_dataframe_after_dataset : 데이터셋 생성 후 데이터 프레임 참조 해제
    show_result : 결과 출력
    """
    # 데이터셋 생성
    self.__trainDS = self.TextStyleDataset(
        self.__trainDF,
        self.__tokenizer,
        max_length,
        truncation,
        self._styleMap
    )
    self.__testDS = self.TextStyleDataset(
        self.__testDF,
        self.__tokenizer,
        max_length,
        truncation,
        self._styleMap
    )

    # 불필요한 데이터 프레임 참조 해제
    if clear_dataframe_after_dataset:
      self.__trainDF = None
      self.__testDF = None

    # 결과 출력
    if show_result:
      print("학습/테스트 데이터셋 생성.\n")

    # 자기 자신 객체를 반환
    return self




  def SetTrainingArguments(
      self,
      output_dir = "./data/text-transfer-smilegate-bart-eos/dummy/",
      overwrite_output_dir = True,
      num_train_epochs = 24,
      per_device_train_batch_size = 16,
      per_device_eval_batch_size = 16,
      eval_steps = 500,
      warmup_steps = 300,
      prediction_loss_only = True,
      evaluation_strategy = "steps",
      save_strategy = "no",
      save_steps = 0,
      save_total_limit = 0,
      logging_dir = None,
      logging_strategy = "no",
      do_eval = False,
      do_predict = False,
      load_best_model_at_end = False,
      metric_for_best_model = 'eval_loss',
      show_result: bool = True
  ):
    """
    학습 준비
    """
    # Training Arguments
    trainingArgs = Seq2SeqTrainingArguments(
        output_dir = output_dir,
        overwrite_output_dir = overwrite_output_dir,
        num_train_epochs = num_train_epochs,
        per_device_train_batch_size = per_device_train_batch_size,
        per_device_eval_batch_size = per_device_eval_batch_size,
        eval_steps = eval_steps,
        warmup_steps = warmup_steps,
        prediction_loss_only = prediction_loss_only,
        eval_strategy = evaluation_strategy,
        save_strategy = save_strategy,
        save_steps = save_steps,
        save_total_limit = save_total_limit,
        logging_dir = logging_dir,
        logging_strategy = logging_strategy,
        do_eval = do_eval,
        do_predict = do_predict,
        load_best_model_at_end = load_best_model_at_end,
        metric_for_best_model = metric_for_best_model
    )

    # 결과 출력
    if show_result:
      print("Training Arguments 설정 완료.")
      if output_dir != None:
        print(" 모델 학습 시 저장 경로: ", output_dir)

    # Trainer
    self.__trainer = Seq2SeqTrainer(
        model = self.__model,
        args = trainingArgs,
        data_collator = self.__DataCollator,
        train_dataset = self.__trainDS,
        eval_dataset = self.__testDS,
    )

    # 결과 출력
    if show_result:
      print("Trainer 설정 완료\n")

    # 자기 자신 객체를 반환
    return self




  def Train(
      self,
      clear_data_after_training: bool = True,
      show_result: bool = True
  ):
    """
    모델 학습

    clear_data_after_training : 학습 후 사용하지 않는 데이터 참조 해제
    show_result : 결과 출력
    """
    # 학습
    self.__trainer.train()

    # 불필요한 데이터 참조 해제
    if clear_data_after_training:
      self.ClearData()

    # 결과 출력
    if show_result:
      print("학습 완료.\n")

    # 자기 자신 객체를 반환
    return self




  def SaveModel(
      self,
      path: str,
      show_result: bool = True
  ):
    """
    학습된 모델 저장

    path : 저장 경로
    show_result : 결과 출력
    """
    if self.__trainer == None:
      print("SaveModel() 오류. 저장할 모델이 없음.\n")
    else:
      self.__trainer.save_model(path)
      if show_result:
        print(f"학습한 모델 저장.\n저장 경로: {path}\n")

    # 자기 자신 객체를 반환
    return self




  def ClearData(self):
    """
    학습된 모델, 스타일맵을 제외한 모든 데이터 참조 해제
    """
    self.__trainDF = None
    self.__testDF = None
    self.__trainDS = None
    self.__testDS = None
    self.__tokenizer = None
    self.__model = None
    self.__DataCollator = None




  def Help():
    """
    도움말 출력
    """
    print(
"""
모델 생성 순서
1. SplitTrainTestData : 데이터 프레임을 학습/테스트 데이터로 분리
2. SetDatasets : 학습을 위한 데이터셋 설정
3. SetTrainingArguments : 학습 준비
4. Train : 모델 학습

선택 사항
SaveModel : 학습된 모델 저장
ClearData : 학습된 모델, 스타일맵을 제외한 모든 데이터 참조 해제

모든 함수는 자기 자신의 객체를 반환.
매개변수에 대해서는 함수 설명 참고.
"""
    )




  class TextStyleDataset(Dataset):
    """
    학습을 위한 데이터셋 클래스.
    사용자 측에서 생성할 것을 권장하지 않음.
    """
    def __init__(
        self,
        data,
        tokenizer,
        max_length,
        truncation,
        style_map
    ):
      self.__data = data
      self.__tokenizer = tokenizer
      self.__maxLength = max_length
      self.__truncation = truncation
      self.__StyleMap = style_map

    def __len__(self):
      return len(self.__data)

    def __getitem__(self, index):
      # 무작위 스타일 2개 추출
      row = self.__data.iloc[index, :].dropna().sample(2)

      # 인코더 텍스트 설정
      encoder_text = f"{self.__StyleMap[row.index[1]]} 말투로 변환:{row[0]}"

      # 디코더 텍스트 설정
      decoder_text = f"{row[1]}{self.__tokenizer.eos_token}"

      # 인코더 설정
      model_inputs = self.__tokenizer(
          encoder_text,
          max_length = self.__maxLength,
          truncation = self.__truncation
      )

      # 디코더 설정
      with self.__tokenizer.as_target_tokenizer():
        labels = self.__tokenizer(
            decoder_text,
            max_length = self.__maxLength,
            truncation = self.__truncation
        )
      model_inputs['labels'] = labels['input_ids']

      # 불필요한 요소 제거
      del model_inputs['token_type_ids']

      # 반환
      return model_inputs

In [None]:
# 텍스트 변환 클래스
class ModelPipeline(ModelBase):
  """
  텍스트 변환 클래스
  """
  def __init__(
      self,
      model_name: str = "gogamza/kobart-base-v2",
      model_path: str = "./data/text-transfer-smilegate-bart-eos/"
  ):
    super().__init__()
    self.__pipeline = pipeline(
        'text2text-generation',
        model = model_path,
        tokenizer = model_name
    )




  def GenerateText(
      self,
      text: str,
      target_style: str,
      num_return_sequences: int = 5,
      max_new_tokens: int = 256
  ):
    """
    스타일 변환
    """
    # 텍스트 생성
    out = self.__pipeline(
        f"{self._styleMap[target_style]} 말투로 변환:{text}",
        num_return_sequences = num_return_sequences,
        max_new_tokens = max_new_tokens
    )

    # 반환
    return [x['generated_text'] for x in out]




  def Help():
    """
    도움말 출력
    """
    print(
"""
GenerateText : 텍스트를 입력해서 스타일 변환
"""
    )