<a href="https://colab.research.google.com/github/chechae/24w_audio/blob/main/KoBART.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import pandas as pd
import glob
from collections import defaultdict
from transformers import AutoModelForSeq2SeqLM,AutoTokenizer
from tokenizers import Tokenizer
from transformers import pipeline
import warnings
import os
from torch.utils.data import Dataset

In [None]:
from google.colab import drive, output
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
class TextStyleTransferDataset(Dataset):
    def __init__(self, df,tokenizer):
      self.df = df
      self.tokenizer = tokenizer

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        row=self.df.iloc[index]
        text1=row[0]
        text2=row[1]
        target_style_name = '표준어'

        # Tokenizer를 허깅페이스의 레포지토리에서 가져왔기 때문에
        # 테스트 코드를 참고해서 모델 입력 형식을 만들었다.
        encoder_text = f"{target_style_name} 말투로 변환:{text1}"
        decoder_text = f"{text2}{self.tokenizer.eos_token}"
        model_inputs = self.tokenizer(encoder_text, max_length=64, truncation=True)

        with self.tokenizer.as_target_tokenizer():
          labels = tokenizer(decoder_text, max_length=64, truncation=True)
        model_inputs['labels'] = labels['input_ids']
        del model_inputs['token_type_ids']

        return model_inputs
# 지난번에 만들어뒀던 데이터셋을 불러와서 데이터프레임을 만들어준다
def make_df(data_root):
    df = pd.read_csv(f'{data_root}/data.tsv',sep='\t')
    # 주로 쓰이는 방법 같지는 않지만,, Train Data와 Test Data를 8:2 비율로 나눠준다
    rate=int(len(df)*0.2)
    df_train,df_test = df[rate:],df[:rate]

    print(f'Train DataFrame length : {len(df_train)},Test DataFrame length : {len(df_test)}')
    return df_train,df_test

In [None]:
# 위 함수와 클래스를 종합해서 Dataset들을 반환해준다
def make_dataset(data_root,tokenizer):
    df_train,df_test = make_df(data_root)

    train_dataset = TextStyleTransferDataset(df_train,tokenizer)
    test_dataset = TextStyleTransferDataset(df_test,tokenizer)

    return train_dataset,test_dataset

## Ko-Bart

사투리 -> 표준어 번역 프로세싱

In [None]:
dir = "/kaggle/input/labeled-trainset" #경상도 텍스트 데이터

files = glob.glob(f"{dir}/*.json")
df = pd.DataFrame(index=range(2), columns=["사투리","표준어"])

for file in files:
    _dict = defaultdict(list)

    with open(file) as f:
        json_data = json.load(f)
        # JSON 파일의 utterance 키 값 아래에 리스트형태로 데이터가 존재했다.
        data = json_data.get("utterance")

        for item in data:
            # standard_form(표준어)와 dialect_form(사투리)가 일치하지 않는다면?
            standard = item.get("standard_form")
            dialect = item.get("dialect_form")
            if standard != dialect:
                _dict["표준어"].append(standard)
                _dict["사투리"].append(dialect)

    # 한 JSON 파일에서 데이터 추출이 끝나면 비어있던 데이터프레임에 추가
    append_df = pd.DataFrame(_dict)
    df = (append_df)
    df = pd.concat([df, append_df], ignore_index=True)

df.to_csv("data.tsv",sep="\t")

In [None]:
df.tail()

Unnamed: 0,사투리,표준어
0,,
1,,


## 전처리

In [None]:
import re

step1 = re.compile(r'&\w+&')
step2 = re.compile(r'\(*\)')
step3 = re.compile(r'\{[^}]*\}')
step4 = re.compile(r'\-[^}]*\-')

#예시 문장
text1 = "&name2& 님은 뭐~ {laughing} (()) 힘들 때 -위- 위로가 되어 준 노래 같은 게 있으신가요?"
text2 = "어~ {반가움} &name123&! (()) 반갑고 ㅋㅋ -오- 오랜만이다"

# 무식해보이지만 일단 확인해보자!
for text in [text1,text2]:
    after_step1=step1.sub('',text)
    after_step2=step2.sub('',after_step1)
    after_step3=step3.sub('',after_step2)
    after_step4=step4.sub('',after_step3)

    print(f"Text : {text}")
    print(f"Step 1 : {after_step1}")
    print(f"Step 2 : {after_step2}")
    print(f"Step 3 : {after_step3}")
    print(f"Step 4 : {after_step4}")
    print("\n\n")

Text : &name2& 님은 뭐~ {laughing} (()) 힘들 때 -위- 위로가 되어 준 노래 같은 게 있으신가요?
Step 1 :  님은 뭐~ {laughing} (()) 힘들 때 -위- 위로가 되어 준 노래 같은 게 있으신가요?
Step 2 :  님은 뭐~ {laughing}  힘들 때 -위- 위로가 되어 준 노래 같은 게 있으신가요?
Step 3 :  님은 뭐~   힘들 때 -위- 위로가 되어 준 노래 같은 게 있으신가요?
Step 4 :  님은 뭐~   힘들 때  위로가 되어 준 노래 같은 게 있으신가요?



Text : 어~ {반가움} &name123&! (()) 반갑고 ㅋㅋ -오- 오랜만이다
Step 1 : 어~ {반가움} ! (()) 반갑고 ㅋㅋ -오- 오랜만이다
Step 2 : 어~ {반가움} !  반갑고 ㅋㅋ -오- 오랜만이다
Step 3 : 어~  !  반갑고 ㅋㅋ -오- 오랜만이다
Step 4 : 어~  !  반갑고 ㅋㅋ  오랜만이다





## 데이터 정제

In [None]:
df = pd.read_csv(f'/kaggle/working/data.tsv',sep='\t')

# data.tsv를 불러왔더니 nan 값이 들어있어 일단 삭제해줬다.
df = df.dropna()

# 아까 지정해뒀던 정규표현식들
step1 = re.compile(r'&\w+&')
step2 = re.compile(r'\(*\)')
step3 = re.compile(r'\{[^}]*\}')
step4 = re.compile(r'\-[^}]*\-')


def make_cleaned_text(text):
    after_step1=step1.sub('',text)
    after_step2=step2.sub('',after_step1)
    after_step3=step3.sub('',after_step2)
    after_step4=step4.sub('',after_step3)

    return after_step4

for idx, row in df.iterrows():
    df.at[idx, '사투리'] = make_cleaned_text(row['사투리'])
    df.at[idx, '표준어'] = make_cleaned_text(row['표준어'])

df = df.iloc[:, 1:]

df.to_csv('/kaggle/working/data_fixed.tsv', index=False, sep='\t')

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/data.tsv'

In [None]:
df.tail()

Unnamed: 0,표준어,사투리
20,고 나중에 조금 들어가도 되겠다.,고 나중에 쫌 들어가도 되겠다.
21,웃긴 얘기는 제가 진짜 솔직히 조금 생각이 안 나네요.,웃긴 얘기는 제가 진짜 솔직히 쫌 생각이 안 나네요.
22,어~ 저 어느 순간 내가 이 경쟁에서 살고 있으면서도 이 경쟁 사회에 대해서 너무 ...,어~ 저 어느 순간 내가 이 경쟁에서 살고 있으면서도 이 경쟁 사회에 대해서 너무 ...
23,어~ 그런 거에서 제가 조금 깨달음을 얻었습니다.,어~ 그런 거에서 제가 쫌 깨달음을 얻었습니다.
24,그 속도도 내가 인제 조금 조절할 수 있는 내 삶에 쪼끔 내가,그 속도도 내가 인제 쫌 조절할 수 있는 내 삶에 쪼끔 내가


## 모델 파라미터 설정

In [None]:
from transformers import AutoModelForSeq2SeqLM,AutoTokenizer

# 모델 학습 후에 가중치 저장할 폴더
model_path='/kaggle/working/saved_model'
# 모델 초기 가중치 로드할 곳
model_name = "gogamza/kobart-base-v2"
# 데이터셋 파일
data_root='/kaggle/working'

# 내가 학습한 모델 가중치의 유무에 따라 분기 처리
if os.path.exists(f'{model_path}/pytorch_model.bin'):
    print("Use Customized Model")
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
else:
    print("Use Pretrained Model")
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)


# Training Arguments

args = {
    'num_train_epochs': 20,
    'per_device_train_batch_size': 32,
    'per_device_eval_batch_size': 32,
    'overwrite_output_dir': True,
    'eval_steps': 5,
    'save_steps': 5,
    'warmup_steps': 5,
    'evaluation_strategy': "steps",
    'prediction_loss_only': True,
    'save_total_limit': 3
}

You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.


Use Pretrained Model


You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.


In [None]:
row=df.iloc[0]
text1=row[0]
text2=row[1]
print(text1);print(text2)

코로나 땜에 조금 이렇게
코로나 땜에 쫌 이렇게


In [None]:
from torch.utils.data import Dataset

class TextStyleTransferDataset(Dataset):
    def __init__(self, df,tokenizer):
        self.df = df
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        row=self.df.iloc[index]
        text1=row[0] #표준어
        text2=row[1] #사투리
        target_style_name = '표준어'

        # Tokenizer를 허깅페이스의 레포지토리에서 가져왔기 때문에
        # 테스트 코드를 참고해서 모델 입력 형식을 만들었다.
        encoder_text = f"{target_style_name} 말투로 변환:{text1}"
        decoder_text = f"{text2}{self.tokenizer.eos_token}"
        model_inputs = self.tokenizer(encoder_text, max_length=64, truncation=True)

        with self.tokenizer.as_target_tokenizer():
            labels = tokenizer(decoder_text, max_length=64, truncation=True)
        model_inputs['labels'] = labels['input_ids']
        del model_inputs['token_type_ids']

        return model_inputs

# 만들어뒀던 데이터셋을 불러와서 데이터프레임을 만들어준다
def make_df(data_root):
    df = pd.read_csv(f'{data_root}/data.tsv',sep='\t')
    # 주로 쓰이는 방법 같지는 않지만,, Train Data와 Test Data를 8:2 비율로 나눠준다
    rate=int(len(df)*0.1)
    df_train,df_test = df[rate:],df[:rate]

    print(f'Train DataFrame length : {len(df_train)},Test DataFrame length : {len(df_test)}')
    return df_train,df_test

def make_dataset(df):
    df_train,df_test = df

    train_dataset = TextStyleTransferDataset(df_train,tokenizer)
    test_dataset = TextStyleTransferDataset(df_test,tokenizer)

    return train_dataset,test_dataset

## 학습

In [None]:
from transformers import Seq2SeqTrainingArguments,Seq2SeqTrainer,\
                         DataCollatorForSeq2Seq
warnings.filterwarnings("ignore")


df = make_df(data_root)
train_dataset,test_dataset=make_dataset(df)

data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer, model=model
)


training_args = Seq2SeqTrainingArguments(
    **args,
    output_dir=model_path,
    )

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)


Train DataFrame length : 23,Test DataFrame length : 2


In [None]:
print(trainer)

<transformers.trainer_seq2seq.Seq2SeqTrainer object at 0x7c07f0623a30>


In [None]:
# 모델 학습 진행
try:
    trainer.train()
except Exception as e:
    print(f"Failed to train model caused by {e}")


Step,Training Loss,Validation Loss
5,No log,6.862717
10,No log,6.115303
15,No log,6.396122
20,No log,6.634148


Non-default generation parameters: {'forced_eos_token_id': 1}
Non-default generation parameters: {'forced_eos_token_id': 1}
Non-default generation parameters: {'forced_eos_token_id': 1}
Non-default generation parameters: {'forced_eos_token_id': 1}


In [None]:
try:
    trainer.save_model(model_path)
    print("Model saved successfully.")
except Exception as e:
    print(f"Failed to save model caused by {e}")

Non-default generation parameters: {'forced_eos_token_id': 1}


Model saved successfully.


## Test (Validation)

AI hub에 있는 텍스트 데이터에는 테스트용(라벨 없는거)이 따로 없긴 하다.
그래서 이전에 그냥 trainset에서 8:2로 split시켜서 validation용으로 퉁치긴 했지만
일단 나중에 오디오 -> 텍스트(STT)실행하고 나서 어차피 데이터는 바꿔서 껴야되기 때문에 그때 validation도 상황에 맞게 데이터 변경해야함


In [None]:
from transformers import pipeline

nlg_pipeline=pipeline('text2text-generation',model=model_path,tokenizer=model_name)

You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.
You passed along `num_labels=3` with an incompatible id to label map: {'0': 'NEGATIVE', '1': 'POSITIVE'}. The number of labels wil be overwritten to 2.


In [None]:
def generate_text(pipe, text, num_return_sequences, max_length):
    target_style_name = "표준어"
    text = f"{target_style_name} 말투로 변환:{text}"
    out = pipe(text, num_return_sequences=num_return_sequences, max_length=max_length)
    #num_return_sequences의 값에 따라서 반환되는 텍스트의 개수가 바뀐다. 만약 3으로 지정했다면 길이가 3인 리스트에 담겨서 값이 반환될 것임!
    return [x['generated_text'] for x in out]


print("Write 'q' to exit")
while True:
    src_text=input("Dialect to translate(입력받을 사투리) : ")
    if src_text == 'q':
        break
    target_text_ko=generate_text(nlg_pipeline,src_text,num_return_sequences=1,max_length=64)[0]
    print(f"Translated Standard (표준어로 출력): {target_text_ko}")

Write 'q' to exit


Dialect to translate(입력받을 사투리) :  밥 많이 뭇나


Translated Standard (표준어로 출력): 조금 더 게 조금 더 기다려


Dialect to translate(입력받을 사투리) :  머라카노


Translated Standard (표준어로 출력): 조금 더 게 뭐 조금


Dialect to translate(입력받을 사투리) :  코로나 땜에 쫌 그렇긴했었지


Translated Standard (표준어로 출력): 조금 더 게요.


Dialect to translate(입력받을 사투리) :  q
