In [1]:
import pandas as pd
import warnings

# 모든 경고 메시지 무시
warnings.filterwarnings("ignore")

In [2]:
# 라벨 분포 확인용
def print_label_count(df):
    print(len(df))
    df["label_int"] = pd.cut(
        df["label"],
        bins=[x for x in range(6)],
        labels=[x for x in range(5)],
        right=False,
    )
    print(df.groupby("label_int")["id"].count())

In [3]:
from hanspell import passportKey, spell_checker
# passportKey 설정
passportKey.init()

In [5]:
sent = "맞춤법 틀리면 외 않되? 쓰고싶은대로쓰면돼지 되지야 "
spelled_sent = spell_checker.check(sent)

hanspell_sent = spelled_sent.checked
print(hanspell_sent)

맞춤법 틀리면 왜 안돼? 쓰고 싶은 대로 쓰면 되지 돼지야 


In [3]:
import pykospacing


# 띄어쓰기 교정
def spacing_text(df):
    spacing = pykospacing.Spacing()
    df["sentence_1"] = df["sentence_1"].map(spacing)
    df["sentence_2"] = df["sentence_2"].map(spacing)
    return df


# Speicial 토큰 처리
def replace_person_token(df):
    df["sentence_1"] = df["sentence_1"].str.replace("<PERSON>", "궯궯궯")
    df["sentence_2"] = df["sentence_2"].str.replace("<PERSON>", "궯궯궯")
    return df 


def recover_person_token(df):
    df["sentence_1"] = df["sentence_1"].str.replace("궯궯궯", "<PERSON>")
    df["sentence_2"] = df["sentence_2"].str.replace("궯궯궯", "<PERSON>")
    return df

In [4]:
# swap data
def agument_with_swap(df):
    df_swaped = df.rename(
        columns={"sentence_1": "sentence_2", "sentence_2": "sentence_1"}
    )
    return pd.concat([df, df_swaped])

In [5]:
# 데이터 증강
import random
import pickle
import re

wordnet = {}
with open("./wordnet.pickle", "rb") as f:
    wordnet = pickle.load(f)


# 한글만 남기고 나머지는 삭제
def get_only_hangul(line):
    parseText = re.compile("/ ^[ㄱ-ㅎㅏ-ㅣ가-힣]*$/").sub("", line)

    return parseText


########################################################################
# Synonym replacement
# Replace n words in the sentence with synonyms from wordnet
########################################################################
def synonym_replacement(words, n):
    new_words = words.copy()
    random_word_list = list(set([word for word in words]))
    random.shuffle(random_word_list)
    num_replaced = 0
    for random_word in random_word_list:
        synonyms = get_synonyms(random_word)
        if len(synonyms) >= 1:
            synonym = random.choice(list(synonyms))
            new_words = [synonym if word == random_word else word for word in new_words]
            num_replaced += 1
        if num_replaced >= n:
            break

    if len(new_words) != 0:
        sentence = " ".join(new_words)
        new_words = sentence.split(" ")

    else:
        new_words = ""

    return new_words


def get_synonyms(word):
    synomyms = []

    try:
        for syn in wordnet[word]:
            for s in syn:
                synomyms.append(s)
    except:
        pass

    return synomyms


########################################################################
# Random deletion
# Randomly delete words from the sentence with probability p
########################################################################
def random_deletion(words, p):
    if len(words) == 1:
        return words

    new_words = []
    for word in words:
        r = random.uniform(0, 1)
        if r > p:
            new_words.append(word)

    if len(new_words) == 0:
        rand_int = random.randint(0, len(words) - 1)
        return [words[rand_int]]

    return new_words


########################################################################
# Random swap
# Randomly swap two words in the sentence n times
########################################################################
def random_swap(words, n):
    new_words = words.copy()
    for _ in range(n):
        new_words = swap_word(new_words)

    return new_words


def swap_word(new_words):
    random_idx_1 = random.randint(0, len(new_words) - 1)
    random_idx_2 = random_idx_1
    counter = 0

    while random_idx_2 == random_idx_1:
        random_idx_2 = random.randint(0, len(new_words) - 1)
        counter += 1
        if counter > 3:
            return new_words

    new_words[random_idx_1], new_words[random_idx_2] = (
        new_words[random_idx_2],
        new_words[random_idx_1],
    )
    return new_words


########################################################################
# Random insertion
# Randomly insert n words into the sentence
########################################################################
def random_insertion(words, n):
    new_words = words.copy()
    for _ in range(n):
        add_word(new_words)

    return new_words


def add_word(new_words):
    synonyms = []
    counter = 0
    while len(synonyms) < 1:
        if len(new_words) >= 1:
            random_word = new_words[random.randint(0, len(new_words) - 1)]
            synonyms = get_synonyms(random_word)
            counter += 1
        else:
            random_word = ""

        if counter >= 10:
            return

    random_synonym = synonyms[0]
    random_idx = random.randint(0, len(new_words) - 1)
    new_words.insert(random_idx, random_synonym)


def EDA(sentence, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, num_aug=9):
    sentence = get_only_hangul(sentence)
    words = sentence.split(" ")
    words = [word for word in words if word != ""]
    num_words = len(words)

    augmented_sentences = []
    num_new_per_technique = int(num_aug / 3) + 1  # 3가지 증강 기법에 맞춰서 수정

    n_sr = max(1, int(alpha_sr * num_words))
    n_ri = max(1, int(alpha_ri * num_words))
    n_rs = max(1, int(alpha_rs * num_words))

    # sr: Synonym replacement
    for _ in range(num_new_per_technique):
        a_words = synonym_replacement(words, n_sr)
        augmented_sentences.append(" ".join(a_words))

    # ri: Random insertion
    for _ in range(num_new_per_technique):
        a_words = random_insertion(words, n_ri)
        augmented_sentences.append(" ".join(a_words))

    # rs: Random swap
    for _ in range(num_new_per_technique):
        a_words = random_swap(words, n_rs)
        augmented_sentences.append(" ".join(a_words))

    # Hangul cleanup and shuffle
    augmented_sentences = [
        get_only_hangul(sentence) for sentence in augmented_sentences
    ]
    random.shuffle(augmented_sentences)

    # Limit the number of augmentations to num_aug
    if num_aug >= 1:
        augmented_sentences = augmented_sentences[:num_aug]
    else:
        keep_prob = num_aug / len(augmented_sentences)
        augmented_sentences = [
            s for s in augmented_sentences if random.uniform(0, 1) < keep_prob
        ]

    # Original sentence 포함
    augmented_sentences.append(sentence)

    return augmented_sentences

In [6]:
# EDA 적용 함수
def apply_eda(df, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, num_aug=2):
    def conditional_EDA(row, column_name):
        if row["label"] >= 1:  
            return EDA(
                row[column_name], alpha_sr, alpha_ri, alpha_rs, num_aug)
        else:
            return [row[column_name]]

    df["sentence_1"] = df.apply(lambda row: conditional_EDA(row, "sentence_1"), axis=1)
    df = df.explode("sentence_1").reset_index(drop=True)

    df["sentence_2"] = df.apply(lambda row: conditional_EDA(row, "sentence_2"), axis=1)
    df = df.explode("sentence_2").reset_index(drop=True)
    return df

In [7]:
def make(df, df_name):
    df = spacing_text(df)
    df = replace_person_token(df)
    df = apply_eda(df)
    df = recover_person_token(df)
    df = agument_with_swap(df)
    df = df.drop_duplicates()
    df.to_csv(f"./{df_name}.csv")
    return df

In [17]:
# 테스트용
train_02 = pd.read_csv("../data/processed/train_02.csv", encoding="UTF-8")
train_02.dtypes

id               object
source           object
sentence_1       object
sentence_2       object
label           float64
binary-label    float64
dtype: object

In [53]:
train = pd.read_csv("../data/raw/train.csv", encoding="UTF-8")
dev = pd.read_csv("../data/raw/dev.csv", encoding="UTF-8")

train = make(train, "train_v2")
dev = make(dev, "dev_v2")

In [54]:
print_label_count(train)
print_label_count(dev)

68286
label_int
0     7422
1    15066
2    12362
3    18678
4    13906
Name: id, dtype: int64
5094
label_int
0     176
1    1122
2    1222
3    1212
4    1163
Name: id, dtype: int64
