In [1]:
import transformers
import re
import random
import numpy as np


class BERT_Augmentation():
    """Text augmentation driven by a masked-language-model ``fill-mask`` pipeline.

    Sentences are treated as whitespace-separated words (eojeol). Two
    augmentations are offered: replacing random interior words with the
    model's top prediction, and inserting predicted words at random positions.
    """

    def __init__(self, model_name='monologg/koelectra-base-v3-generator', seed=42):
        """Load the model/tokenizer and build the fill-mask pipeline.

        Args:
            model_name (str): Hugging Face model identifier to load.
            seed: Value passed to ``random.seed``. Note that this seeds the
                module-level ``random`` generator, so creating an instance
                resets the global random sequence.
        """
        self.model_name = model_name
        self.model = transformers.AutoModelForMaskedLM.from_pretrained(self.model_name)
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name)
        self.unmasker = transformers.pipeline("fill-mask", model=self.model, tokenizer=self.tokenizer)
        random.seed(seed)

    def _unmask(self, text):
        """Return the top-ranked filled-in sequence for ``text`` (which holds one mask)."""
        return self.unmasker(text)[0]['sequence']

    # Random replacement
    def random_masking_replacement(self, sentence, ratio=0.15):
        """Mask random interior words of the sentence and recover them with the model.

        Words are masked and recovered one at a time. The first and the last
        word are never masked, because predictions at the sentence boundaries
        were observed to be of poor quality.

        Args:
            sentence (str): Source sentence.
            ratio (float): Fraction of words to replace. The resulting count is
                capped at the number of interior words.

        Returns:
            str: The augmented sentence. Sentences with four words or fewer are
            returned unchanged to preserve quality.
        """
        tokens = sentence.split()

        # To preserve quality, sentences with 4 words or fewer are returned as-is.
        if len(tokens) <= 4:
            return sentence

        mask = self.tokenizer.mask_token

        # Never ask for more replacements than there are interior positions;
        # otherwise no unused position could ever be found.
        span = max(0, min(int(round(len(tokens) * ratio)), len(tokens) - 2))

        used = set()
        for _ in range(span):
            # Recomputed every round: the pipeline output is re-split below, so
            # the word count is not guaranteed to stay the same.
            candidates = [i for i in range(1, len(tokens) - 1) if i not in used]
            if not candidates:
                break
            idx = random.choice(candidates)
            used.add(idx)
            tokens[idx] = mask
            tokens = self._unmask(" ".join(tokens)).split()

        return " ".join(tokens).strip()

    # Random insertion
    def random_masking_insertion(self, sentence, ratio=0.15):
        """Insert model-predicted words at random positions of the sentence.

        A mask token is inserted before a randomly chosen existing word and
        filled in by the model; this is repeated ``round(word_count * ratio)``
        times. Because the mask is always placed before an existing word, a
        new word is never appended after the last one.

        Args:
            sentence (str): Source sentence.
            ratio (float): Fraction of the word count to insert as new words.

        Returns:
            str: The augmented sentence, stripped of surrounding whitespace.
        """
        span = int(round(len(sentence.split()) * ratio))
        mask = self.tokenizer.mask_token

        augmented = sentence
        for _ in range(span):
            tokens = augmented.split()
            # Guard against an empty token list, for which randint(0, -1) would raise.
            idx = random.randint(0, len(tokens) - 1) if tokens else 0
            tokens.insert(idx, mask)
            augmented = self._unmask(" ".join(tokens))

        augmented = augmented.replace("  ", " ")

        return augmented.strip()

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
import pandas as pd
# Build one shared augmenter and expose its two bound methods as plain
# module-level callables for the demo cells that follow.
BERT_aug = BERT_Augmentation()
random_masking_replacement, random_masking_insertion = (
    BERT_aug.random_masking_replacement,
    BERT_aug.random_masking_insertion,
)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to see activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.


In [14]:
# Demo: run the insertion augmentation once on a short sample sentence.
sentence = "언제부터 몸살 기운이 있었어요??"
ratio = 0.15
# NOTE: the printed label reads "random_insertion_replacement", but the function
# being called is random_masking_insertion.
print('random_insertion_replacement:', random_masking_insertion(sentence, ratio))

random_insertion_replacement: 언제부터 이런 몸살 기운이 있었어요??


In [15]:
# Compare both augmentations on the same sample; `sentence` and `ratio` are the
# globals defined in the previous cell.
# The sample has only four whitespace-separated words, so
# random_masking_replacement takes its short-sentence early return and the
# sentence is printed unchanged.
print('random_masking_replacement:', random_masking_replacement(sentence, ratio))
# NOTE: this label reads "random_insertion_replacement" although
# random_masking_insertion is called.
print('random_insertion_replacement:', random_masking_insertion(sentence, ratio))

random_masking_replacement: 언제부터 몸살 기운이 있었어요??
random_insertion_replacement: 그런데 언제부터 몸살 기운이 있었어요??


In [20]:
def augment_dataframe(df, text_columns, augmentation_method, ratio=0.15, augmenter=None):
    """Append an augmented copy of every row of ``df`` and return the combined frame.

    Args:
        df (pd.DataFrame): Source data; it is not modified.
        text_columns (list[str]): Columns whose text is augmented. All other
            columns are copied unchanged into the augmented rows.
        augmentation_method (str): ``'replacement'`` or ``'insertion'``.
        ratio (float): Ratio forwarded to the augmentation method.
        augmenter: Optional object providing ``random_masking_replacement`` and
            ``random_masking_insertion``. When omitted, a new
            ``BERT_Augmentation`` is created, which loads the model; pass an
            existing instance to avoid reloading it on every call.

    Returns:
        pd.DataFrame: The original rows followed by their augmented copies,
        with a fresh 0..n-1 index.

    Raises:
        ValueError: If ``augmentation_method`` is not supported.
    """
    # Validate first so a typo fails before the model is loaded, and also
    # fails when the frame happens to be empty.
    if augmentation_method not in ('replacement', 'insertion'):
        raise ValueError("Unsupported augmentation method")

    if augmenter is None:
        augmenter = BERT_Augmentation()

    if augmentation_method == 'replacement':
        transform = augmenter.random_masking_replacement
    else:
        transform = augmenter.random_masking_insertion

    columns = list(text_columns)
    augmented = {column: [] for column in columns}

    # Iterate row by row, column by column, so the random generator is
    # consumed in the same order as a per-row loop would consume it.
    for values in zip(*(df[column] for column in columns)):
        for column, text in zip(columns, values):
            # Missing or non-text cells (e.g. NaN from an empty CSV field)
            # cannot be tokenized; keep them as they are.
            if isinstance(text, str):
                text = transform(text, ratio)
            augmented[column].append(text)

    augmented_df = df.copy()
    for column, values in augmented.items():
        augmented_df[column] = values

    combined_df = pd.concat([df, augmented_df], ignore_index=True)
    return combined_df

# CSV file paths (relative to the notebook's working directory)
input_file = 'custom_hospital_situation1.csv'
output_file = 'augmented_output.csv'

# Read the CSV file into a DataFrame.
# UTF-8 is tried first; if decoding fails, the file is re-read as EUC-KR.
try:
    df = pd.read_csv(input_file, encoding='utf-8')
except UnicodeDecodeError:
    df = pd.read_csv(input_file, encoding='euc-kr')

# Run the augmentation on the 'inputs' and 'response' columns
text_columns = ['inputs', 'response']
augmentation_method = 'replacement'  # 'replacement' or 'insertion'
ratio = 0.15  # augmentation ratio (rebinds the global `ratio` used by the demo cells)

# The result holds the original rows followed by their augmented copies.
augmented_df = augment_dataframe(df, text_columns, augmentation_method, ratio)
# 'utf-8-sig' writes a UTF-8 byte-order mark at the start of the file
# (presumably so spreadsheet tools detect the encoding — TODO confirm intent).
augmented_df.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"Augmented data saved to {output_file}")


Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.


Augmented data saved to augmented_output.csv


In [None]:
# Input data file read by the augmentation cell above: custom_hospital_situation1.csv