In [None]:
#!pip install -q soynlp

In [None]:
import os
import pandas as pd
import tensorflow as tf
from transformers import BertTokenizer, TFBertForSequenceClassification
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import plot_model
import json
import numpy as np
import re
import urllib.request
from tqdm import tqdm
import tensorflow as tf
from transformers import BertTokenizer, TFBertModel
from soynlp.normalizer import *
import matplotlib.pyplot as plt
import seaborn as sns
import collections
from sklearn.feature_extraction.text import CountVectorizer
from wordcloud import WordCloud, STOPWORDS
import random

In [None]:
def convert_examples_to_features(examples, labels, max_seq_len, tokenizer):
    
    input_ids, attention_masks, token_type_ids, data_labels = [], [], [], []
    
    for example, label in tqdm(zip(examples, labels), total=len(examples)):
        # input_id는 워드 임베딩을 위한 문장의 정수 인코딩
        input_id = tokenizer.encode(example, 
                                    max_length=max_seq_len, 
                                    pad_to_max_length=True,
                                   )
        
        # attention_mask는 실제 단어가 위치하면 1, 패딩의 위치에는 0인 시퀀스
        padding_count = input_id.count(tokenizer.pad_token_id)
        attention_mask = [1] * (max_seq_len - padding_count) + [0] * padding_count
        
        # token_type_id은 세그먼트 인코딩
        token_type_id = [0] * max_seq_len
        
        assert len(input_id) == max_seq_len, "Error with input length {} vs {}".format(len(input_id), max_seq_len)
        assert len(attention_mask) == max_seq_len, "Error with attention mask length {} vs {}".format(len(attention_mask), max_seq_len)
        assert len(token_type_id) == max_seq_len, "Error with token type length {} vs {}".format(len(token_type_id), max_seq_len)
        
        input_ids.append(input_id)
        attention_masks.append(attention_mask)
        token_type_ids.append(token_type_id)
        data_labels.append(label)
    
    input_ids = np.array(input_ids, dtype=int)
    attention_masks = np.array(attention_masks, dtype=int)
    token_type_ids = np.array(token_type_ids, dtype=int)
    
    data_labels = np.asarray(data_labels, dtype=np.int32)
    
    return (input_ids, attention_masks, token_type_ids), data_labels

In [None]:
class TFBertForMultiClassClassification(tf.keras.Model):
    def __init__(self, model_name, num_classes, dropout_rate=0.3):
        super(TFBertForMultiClassClassification, self).__init__()
        self.bert = TFBertModel.from_pretrained(model_name, from_pt=True)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.classifier = tf.keras.layers.Dense(num_classes, 
                                                kernel_initializer=tf.keras.initializers.TruncatedNormal(0.02),
                                                activation='softmax', 
                                                name='classifier')
        
    def call(self, inputs, training=False):
        input_ids, attention_mask, token_type_ids = inputs
        outputs = self.bert(input_ids=input_ids, 
                            attention_mask=attention_mask,
                            token_type_ids=token_type_ids)
        cls_token = outputs[1]
        if training:
            cls_token = self.dropout(cls_token, training=training)
        prediction = self.classifier(cls_token)
        return prediction


In [None]:
from transformers import TFRobertaModel, RobertaTokenizer
from transformers import AutoModel, AutoTokenizer

class TFRobertaForMultiClassClassification(tf.keras.Model):
    def __init__(self, model_name, num_classes, dropout_rate=0.3):
        super(TFRobertaForMultiClassClassification, self).__init__()
        self.roberta = AutoTokenizer.from_pretrained(model_name, from_pt=True)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.classifier = tf.keras.layers.Dense(num_classes, 
                                                kernel_initializer=tf.keras.initializers.TruncatedNormal(0.02),
                                                activation='softmax', 
                                                name='classifier')
        
    def call(self, inputs, training=False):
        input_ids, attention_mask = inputs  # RoBERTa는 token_type_ids가 필요하지 않습니다.
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        cls_token = outputs[1]
        if training:
            cls_token = self.dropout(cls_token, training=training)
        prediction = self.classifier(cls_token)
        return prediction

In [None]:
def preprocess_sentence(sentence):
    emoticon_normalize(sentence)  # 이모티콘을 정규화합니다.
    repeat_normalize(sentence)    # 반복되는 문자를 정규화합니다.
    sentence = re.sub(r'([^a-zA-Zㄱ-ㅎ가-힣])', " ", sentence)  # 영문, 한글 및 자음/모음을 제외한 문자를 공백으로 치환합니다.
    sentence = re.sub(r'[" "]+', " ", sentence)  # 연속된 공백을 하나의 공백으로 치환합니다.
    sentence = sentence.strip()  # 문장의 양 끝에 있는 공백을 제거합니다.
    return sentence  # 전처리된 문장을 반환합니다.

In [None]:
def preprocessing(df):
    # 중복 제거 
    df = df.drop_duplicates(subset=['conversation']) 
    
    # 문장 정규화
    df['conversation'] = df['conversation'].apply(preprocess_sentence)
    
    # 결측치 제거
    df = df.dropna(subset=['conversation'])
    
    return df

In [None]:
def class_text_to_num(df):
    # 레이블 값을 숫자로 매핑
    label_mapping = {
        '협박 대화': 0,
        '갈취 대화': 1,
        '직장 내 괴롭힘 대화': 2,
        '기타 괴롭힘 대화': 3
    }

    df['class'] = df['class'].map(label_mapping)
    return df 

In [None]:
def aug(x,y,classs):
    def random_deletion(words, p=0.1):
        if len(words) == 1:
            return words

        new_words = []
        for word in words:
            r = random.uniform(0, 1)
            if r > p:
                new_words.append(word)

        if len(new_words) == 0:
            rand_int = random.randint(0, len(words)-1)
            return [words[rand_int]]

        return ''.join(new_words)

    def swap_word(new_words):
        n = 5
        for _ in range(n):
            random_idx_1 = random.randint(0, len(new_words)-1)
            random_idx_2 = random_idx_1
            counter = 0

            while random_idx_2 == random_idx_1:
                random_idx_2 = random.randint(0, len(new_words)-1)
                counter += 1
                if counter > 3:
                    return new_words

            new_words[random_idx_1], new_words[random_idx_2] = new_words[random_idx_2], new_words[random_idx_1]
        return ' '.join(new_words)

    def random_swap(words):
        new_words = list()
        for word in words:
            new_words.append(swap_word(word.split()))

        return new_words
    df = pd.concat([x,y],axis=1).reset_index(drop=True)
    df_rd = df[df['class']==classs].copy()
    df_rd['conversation'] = df_rd['conversation'].apply(random_deletion)
    df_rs = df[df['class']==classs].copy()
    df_rs['conversation'] = random_swap(df_rs['conversation'].values)
    
    df_concated = pd.concat([df, df_rs])
    return df_concated.loc[:,['conversation']] , df_concated['class']

# Run

In [None]:

# data load 
train = pd.read_csv(train_path,index_col=0)

# preprocessing
train = preprocessing(train)


# 라벨 숫자 변환
train = class_text_to_num(train)

# 기타 에서 돈 단어 제거
train['conversation'] = train[['class','conversation']].apply(lambda x : x[1].replace('돈','') if x[0] == 3 else x[1], axis=1)
# 협박 에서 돈 단어 제거 
train['conversation'] = train[['class','conversation']].apply(lambda x : x[1].replace('돈','') if x[0] == 0 else x[1], axis=1)


# 모델 load 
if model_name == 'klue/bert-base':
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = TFBertForMultiClassClassification(model_name, class_num)
elif model_name == 'klue/roberta-small':
    tokenizer = AutoModel.from_pretrained(model_name)    
    model = TFBertForMultiClassClassification(model_name, class_num)

train_x, val_x, train_y, val_y = train_test_split(
    train.drop('class',axis=1), train['class'], test_size=0.2, random_state=42 , stratify=train['class']
)
# Aug
train_x, train_y = aug(train_x, train_y, 0)
train_x, train_y = aug(train_x, train_y, 1)
train_x, train_y = aug(train_x, train_y, 3)

# 토크나이저
train_X, train_Y = convert_examples_to_features(
    train_x['conversation'], train_y, 
    max_seq_len=max_len, tokenizer=tokenizer
)
val_X, val_Y = convert_examples_to_features(
    val_x['conversation'], val_y, 
    max_seq_len=max_len, tokenizer=tokenizer
)

# 옵티마이저, loss
optimizer = tf.keras.optimizers.Adam(learning_rate= lr)
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# model compile
model.compile(optimizer=optimizer, loss=loss, metrics = ['accuracy'])

from tensorflow.keras.callbacks import LearningRateScheduler

# Define the learning rate schedule function
def lr_schedule(epoch):
    if epoch == 0:
        return lr
    elif epoch == 1 :
        return 0.00005 # 5e-5 , 0.00005
    elif epoch == 2 :
        return 0.000001
    elif epoch == 3 :
        return 0.000005
    elif epoch == 4 :
        return 0.0000001
    elif epoch == 5 :
        return 0.0000005
    elif epoch == 6 :
        return 0.00000001
    else:
        return 0.00000001

# Create the LearningRateScheduler callback
lr_scheduler = LearningRateScheduler(lr_schedule)
    
# Train
history = model.fit(
    train_X, train_Y, 
    validation_data=(val_X,val_Y),
    epochs=epochs, 
    batch_size=batch_size, 
    callbacks=[lr_scheduler],
)

- 수정사항 :
    1. stratify=train['class'] 으로 변경

# 하이퍼 파라미터

In [None]:
train_path = './train.csv'
val_path = './val.csv'
model_name = 'klue/bert-base' # klue/roberta-large klue/bert-base
class_num = 4 
max_len = 200
lr = 5e-5
batch_size = 2


epochs = 10

In [None]:
import gc

# 메모리 해제
gc.collect()

# 결과 분석

In [None]:
# 데이터 기타 제외 가해자만 남기기 
# 직장 제외 데이터 증강(랜덤 스위치), 데이터 삭제
# 기타 에서 '돈' 키워드 제거 - 상승
# 0번(협박)만 증강 2번
# 제출 : 0.8275

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

In [None]:
# 데이터 기타 제외 가해자만 남기기 
# 직장 제외 데이터 증강(랜덤 스위치), 데이터 삭제
# 기타 에서 '돈' 키워드 제거 - 상승
# 제출 : 0.8575

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

In [None]:
# 데이터 기타 제외 가해자만 남기기 
# 직장 제외 데이터 증강(랜덤 스위치), 데이터 삭제
# 제출 : 0.845

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

In [None]:
# 데이터 기타 제외 가해자만 남기기 
# 직장 제외 데이터 증강(랜덤 스위치)
# 제출 : 0.76

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

In [None]:
# 데이터 기타 제외 가해자만 남기기
# 제출 : 0.805

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

In [None]:
Real Accuracy: 0.8987
              precision    recall  f1-score   support

     협박 0       0.89      0.88      0.88       179
     갈취 1       0.88      0.89      0.89       195
     직장 2       1.00      0.87      0.93       194
     기타 3       0.85      0.96      0.90       202

    accuracy                           0.90       770
   macro avg       0.90      0.90      0.90       770
weighted avg       0.90      0.90      0.90       770

In [None]:
# 데이터 전체 가해자만 남기기

from sklearn.metrics import classification_report, f1_score, confusion_matrix
from sklearn.metrics import accuracy_score

# 실제 예측값 생성
real_predictions = model.predict(val_X)

# 예측값을 레이블로 변환
real_predicted_labels = np.argmax(real_predictions, axis=1)

# 정확도 계산
real_accuracy = accuracy_score(val_Y, real_predicted_labels)
print(f"Real Accuracy: {real_accuracy:.4f}")

# 분류 보고서 생성
real_report = classification_report(val_Y, real_predicted_labels, target_names=[f"Class {i}" for i in range(4)])
print(real_report)

# F1 스코어 계산
real_f1 = f1_score(val_Y, real_predicted_labels, average='weighted')
print(f"\nWeighted F1 Score (based on real predictions): {real_f1:.4f}")

Real Accuracy: 0.8651
              precision    recall  f1-score   support

     협박(0)       0.92      0.82      0.87       178
     갈취(1)       0.77      0.93      0.84       195
     직장(2)       0.91      0.95      0.93       194
     기타(3)       0.89      0.76      0.82       204

    accuracy                           0.87       771
   macro avg       0.87      0.87      0.87       771
weighted avg       0.87      0.87      0.86       771


- 협박(Class: 협박)


상황: 협박 클래스의 경우 정밀도와 재현율이 비교적 높은 편입니다. 그러나 재현율이 약간 낮은 편이며, 이는 실제 협박 케이스 중에서 일부를 놓치고 있다는 의미입니다.
해결 방법: 재현율을 높이기 위해 실제 협박 사례가 누락되지 않도록 `데이터를 추가`하는 것이 좋을 수 있습니다. 협박 클래스에 대한 추가 데이터를 확보하거나 수집하여 모델이 협박을 더 잘 인식하고 예측하도록 돕는 것이 가능합니다.

- 갈취(Class: 갈취)

상황: 갈취 클래스의 경우 재현율이 높은 편으로, 대부분의 갈취 사례를 모델이 예측하는 것으로 보입니다. 그러나 정밀도가 상대적으로 낮아, 모델이 갈취로 잘못 예측하는 경우가 있을 수 있습니다.
해결 방법: 정밀도를 향상시키기 위해 모델이 갈취로 잘못 예측하는 경우를 줄일 필요가 있습니다. 이를 위해 추가 데이터 수집 대신 모델의 하이퍼파라미터를 조정하거나 `데이터 전처리`를 통해 모델이 갈취 클래스를 더 잘 구분하도록 돕는 것이 중요할 수 있습니다.

- 직장(Class: 직장)

상황: 직장 클래스의 경우 정밀도와 재현율이 높은 편으로, 모델이 직장 사례를 예측하는 데 잘 성공하고 있습니다.
해결 방법: 현재 상태에서는 특별히 추가적인 데이터나 조치가 필요하지 않아 보입니다. 모델이 직장 클래스를 잘 예측하고 있으므로 유사한 성능을 유지하는 것이 중요합니다.

- 기타(Class: 기타)

상황: 기타 클래스의 경우 정밀도와 재현율이 상대적으로 높은 편입니다. 그러나 재현율이 낮은 편으로, 실제 기타 사례 중에서 일부를 놓치고 있다는 의미입니다.
해결 방법: 재현율을 높이기 위해 기타 클래스의 실제 케이스가 누락되지 않도록 데이터를 추가하는 것이 도움이 될 수 있습니다. `기타 클래스에 대한 더 많은 다양한 예시를 모델이 학습하게 함으로써 모델의 일반화 능력을 향상`시킬 수 있습니다.


# submission

In [None]:
def convert_examples_to_features_test(examples, max_seq_len, tokenizer):
    
    input_ids, attention_masks, token_type_ids = [], [], []
    
    for example in tqdm(examples, total=len(examples)):
        # input_id는 워드 임베딩을 위한 문장의 정수 인코딩
        input_id = tokenizer.encode(example, max_length=max_seq_len, 
                                    pad_to_max_length=True)
        
        # attention_mask는 실제 단어가 위치하면 1, 패딩의 위치에는 0인 시퀀스
        padding_count = input_id.count(tokenizer.pad_token_id)
        attention_mask = [1] * (max_seq_len - padding_count) + [0] * padding_count
        
        # token_type_id은 세그먼트 인코딩
        token_type_id = [0] * max_seq_len
        
        assert len(input_id) == max_seq_len, "Error with input length {} vs {}".format(len(input_id), max_seq_len)
        assert len(attention_mask) == max_seq_len, "Error with attention mask length {} vs {}".format(len(attention_mask), max_seq_len)
        assert len(token_type_id) == max_seq_len, "Error with token type length {} vs {}".format(len(token_type_id), max_seq_len)
        
        input_ids.append(input_id)
        attention_masks.append(attention_mask)
        token_type_ids.append(token_type_id)
    
    input_ids = np.array(input_ids, dtype=int)
    attention_masks = np.array(attention_masks, dtype=int)
    token_type_ids = np.array(token_type_ids, dtype=int)
    
    return (input_ids, attention_masks, token_type_ids)


In [None]:
def test_pipeline(model, tokenizer, max_len):
    
    test_data_path = "./data/test.json"
    with open(test_data_path, "r", encoding="utf-8") as json_file:
        test_data = json.load(json_file)
        
    test = pd.DataFrame({'file_name':list(test_data.keys()), 'conversation': list(test_data.values())})
    test['conversation'] = test['conversation'].apply(lambda x : x['text'])
    
    # preprocessing
    test['conversation'] = test['conversation'].apply(preprocess_sentence)
    
    # 토크나이저
    test_X = convert_examples_to_features_test(
        test['conversation'],
        max_seq_len=max_len, 
        tokenizer=tokenizer
    )
    
    # 예측
    predictions = model.predict(test_X)
    test_class_probabilities = tf.nn.softmax(predictions, axis=-1).numpy() # [[0.13297564 0.8358507  0.00801584 0.02315779]]
    test_predicted_class = np.argmax(test_class_probabilities, axis=1) # [ 1 ]
    
    return test_predicted_class


In [None]:
predictions = test_pipeline(model, tokenizer, max_len)

In [None]:
predictions

In [None]:
submission = pd.read_csv('./data/submission.csv')
submission['class'] = predictions
submission.to_csv('submissions/submission.csv')