# beomi/KcELECTRA-base

In [None]:
# Learning Device list Check 
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [None]:
# %run KcELECTRA-base.version.py       # 필요한 라이브러리 설치
# !pip freeze                          # 설치된 라이브러리 확인

In [None]:
import re
import torch
import pandas as pd

# GPU 설정
# 없을 시 CPU
# CPU로 뜨지만, GPU 잘만 돌아감
device = torch.device('cpu')
print("device:", device)

In [None]:
# 엑셀 파일에서 데이터프레임 읽기
df = pd.read_excel(r"C:\Users\GJAISCHOOL\Desktop\X_filter\Algorithm\dataset\sample_data(100).xlsx")

print(df.shape)
df.head()

In [None]:
null_idx = df[df.label.isnull()].index                             # 해당 index에 null 값 확인
df.loc[null_idx, "Sentence"]                                       # null 값이 존재한 인덱스의 Sentence 값 불러오기

# lable은 Sentence의 가장 끝 문장열로 설정
df.loc[null_idx, "label"] = df.loc[null_idx, "Sentence"].apply(lambda x: x[-1])

# Sentence는 "\t" 앞부분까지의 문자열로 설정
df.loc[null_idx, "Sentence"] = df.loc[null_idx, "Sentence"].apply(lambda x: x[-2])

In [None]:
train_data = df.sample(frac=0.8, random_state=42)                 # train(80%), test(20%) 셋 구분 
test_data = df.drop(train_data.index)                             # 랜덤으로 샘플링(랜덤으로 숫자 배치)

# 데이터셋 갯수 확인
print('중복 제거 전 학습 데이터셋 : {}'.format(len(train_data)))
print('중복 제거 전 테스트 데이터셋 : {}'.format(len(test_data)))

# 중복 데이터 제거
train_data.drop_duplicates(subset=["Sentence"], inplace=True)
test_data.drop_duplicates(subset=["Sentence"], inplace=True)

# 데이터셋 갯수 확인
print('중복 제거 후 학습 데이터셋 : {}'.format(len(train_data)))
print('중복 제거 후 테스트 데이터셋 : {}'.format(len(test_data)))

In [None]:
from transformers import ElectraTokenizer, ElectraForSequenceClassification, TFElectraForSequenceClassification
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
import torch

# KcELECTRA 모델 및 토크나이저 로드
Kc_model = "beomi/KcELECTRA-base"
Kc_tokenizer = AutoTokenizer.from_pretrained(Kc_model)

In [None]:
tokenized_train_sentences = Kc_tokenizer( 
    list(train_data["Sentence"]),           # train_data 리스트화
    return_tensors="pt",                    # pytorch의 tensor 형태로 변환
    max_length=128,                         # 최대 토큰길이 설정
    padding=True,                           # 학습 시 빈 값을 0으로 대체
    truncation=True,                        # 초과 토큰 제거
    add_special_tokens=True                 # 추가 토큰 설정
)

In [None]:
tokenized_test_sentences = Kc_tokenizer(
    list(test_data["Sentence"]),           # test_data 리스트화
    return_tensors="pt",
    max_length=128,
    padding=True,
    truncation=True,
    add_special_tokens=True
)

In [None]:
"""
CLS(분류 토큰)   : 시작을 알릴 때 사용 \n
SEP(구분자 토큰) : 서로 다를 때 구분할 사용할 토큰 \n
MASK            : 특정 토큰이 마스크화, 모델이 예측하도록 훈련 \n
PAD             : 일관된 길이로 동일하게 만듦
                                                            """

In [None]:
print(tokenized_train_sentences[0])        # 0번째 문장에 해당하는 객체
print("="*200) 
print(tokenized_train_sentences[0].tokens) # 0번째 문장에 토큰의 목록
print("="*200)
print(tokenized_train_sentences[0].ids)    # 0번째 문장에 대한 고유 ID

In [None]:
class CurseDataset(torch.utils.data.Dataset):                                        # 학습을 위한 데이터셋 만들기     
    def __init__(self, encoding, labels):                                            # pytorch에서 학습할 수 있게 데이터셋 생성
        self.encodings = encoding                                                    # Feature Data / 'Sentence'
        self.labels = labels                                                         # Target Data / 'Label'
    
    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} # self.encodings 딕셔너리 내의 값 중에 val를 torch.tensor로 변환해 하여
        item["labels"] = torch.tensor(self.labels[idx])                             # key: toch.tensor(val[ix])라는 item 딕셔너리 형성
        return item                                                                 # 새로운 labels 키 값에 value torch.tensor(self.labels[idx]) 쌍을 생성
    
    def __len__(self):
        return len(self.labels)                                                     # self.labels 길이 반환

In [None]:
# train_set, test_set에 대한 데이터셋을 각각 생성
train_label = train_data["label"].values
test_label = test_data["label"].values

train_dataset = CurseDataset(tokenized_train_sentences, train_label)
test_dataset = CurseDataset(tokenized_test_sentences, test_label)

In [None]:
Kc_model = AutoModelForSequenceClassification.from_pretrained(Kc_model, num_labels=2)   # 사전 학습된 모델 찾아오기
Kc_model.to(device)                                                                       # num_labels 평가지표 확인

In [None]:
# 3 - 2 학습 파라미터 설정
training_args = TrainingArguments(
    output_dir = './results',           # 학습결과 저장경로
    num_train_epochs=10,                # 학습 epoch 설정
    per_device_train_batch_size = 8,    # train batch_size 설정
    per_device_eval_batch_size = 64,    # test batch_size 설정
    logging_dir = './logs',             # 학습 log 저장경로 / 손실 값 저장
    logging_steps=500,                  # 학습 log 기록 단위 / 500번 마다 출력
    save_total_limit = 2,               # 학습결과 저장 최대갯수
)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# 학습과정에서 사용할 평가지표를 위한 함수 설정
def compute_metrics(pred):
  labels = pred.label_ids
  preds = pred.predictions.argmax(-1)
  # 정밀도, 재현율, f1 구하기 
  precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='binary')
  # 정확도 구하기
  acc = accuracy_score(labels, preds)
  return{
      'accuracy': acc,
      'f1': f1,
      'precision': precision,
      'recall': recall
  }

In [None]:
# Trainer 모듈을 사용해 모델의 학습을 컨트롤하은 trainer를 생성
trainer = Trainer(
    model = Kc_model,                    # 학습하고자하는 Transformers model
    args=training_args,                  # 위에서 정의한 학습 피라미터 설정
    train_dataset=train_dataset,         # 학습 데이터셋
    eval_dataset=test_dataset,           # 평가 데이터셋
    compute_metrics=compute_metrics,     # 평가지표
)

In [None]:
trainer.train()

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Evaluate on the test dataset
results = trainer.predict(test_dataset)

# Get the true labels and predicted logits
true_labels = results.label_ids
predicted_logits = results.predictions.argmax(-1)

# Calculate the confusion matrix
conf_mat = confusion_matrix(true_labels, predicted_logits)

# Plot the confusion matrix using seaborn
plt.figure(figsize=(8, 6))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=['P', 'N'], yticklabels=['T', 'F'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()


In [None]:
def sentence_predict(sent):
    # 평가모드로 변경
    Kc_model.eval()

    # 입력된 문장 토크나이징
    tokenized_sent = Kc_tokenizer(
        sent,
        return_tensors="pt",
        truncation=True,
        add_special_tokens=True,
        max_length=128
    )

    # 모델이 위치한 GPU로 이동
    # tokenized_sent.to(deivce)

    # 예측
    with torch.no_grad():
        outputs = Kc_model(
            input_ids=tokenized_sent["input_ids"],
            attention_mask=tokenized_sent["attention_mask"],
            token_type_ids=tokenized_sent["token_type_ids"]
        )

    # 결과 return
    logits = outputs[0]
    logits = logits.detach().cpu()
    result = logits.argmax(-1)
    sentence = True
    input_sentence = tokenized_sent["input_ids"]
    if result == 0:
        sentence = True

    elif result == 1:
        sentence = False
    return sentence, input_sentence


sentence_predict("씨발")[0]

In [None]:
# 'RNN_2' 모듈을 가져오기 위한 경로를 sys.path에 추가
import sys
sys.path.append(r'C:\Users\GJAISCHOOL\Desktop\X_filter\Algorithm\RNN')
 
# 'RNN' 모듈을 import
from RNN import *

In [None]:
def badword_find(sent):
    badword_df = pd.read_excel(r'C:\Users\GJAISCHOOL\Desktop\X_filter\Algorithm\dataset\word_list.xlsx')
    found_bad_word = False  # 입력 문장에 단어가 발견되었는지를 나타내는 플래그
    for idx, row in badword_df.iterrows():
        if row["WORD"] in sent:
            # 'WORD'가 입력 문장에 포함된 경우
            new_word = row["대체어"]
            if not pd.isnull(new_word):
                RNN_result = sentence_generation(new_word, 2) + " "
                sent = sent.replace(row["WORD"], RNN_result)
                found_bad_word = True
            else:
                sent = sent.replace(row["WORD"], "*" * len(row["WORD"]))
                found_bad_word = True
    
    if not found_bad_word:
        # result = "@" * len(input_sentence)
        sent = "혐오 표현입니다."
    return sent

# 테스트
input_sentence = "엄마가 죽었으면 좋겠어"
speak_result = badword_find(input_sentence)
print(speak_result)

In [None]:
def speak_pre(sent):
    sentence = sentence_predict(sent)
    if sentence[0] == True:
        return sent
    elif sentence[0] == False:
        return badword_find(sent)
    
def speak(sent):
    speak_pre(sent)
    sentence = sentence_predict(sent)
    if sentence == True:
        return sent
    elif sent == "혐오 표현입니다.":
        return sent
    elif sentence == False:
        return badword_find(sent)

In [None]:
import re
import kss
from pykospacing import Spacing
from soynlp.normalizer import repeat_normalize

# 띄어쓰기 설정
spacing = Spacing()

In [None]:
# 특수문자 제거
def cleanse(text):
    pattern = re.compile(r'\s+')
    text = re.sub(pattern, ' ', text)
    text = re.sub('[^가-힣ㄱ-ㅎㅏ|a-zA-Z0-9]','', text)
    return text

In [None]:
def clean_and_repeat_normalize(text):
    cleansed_text = cleanse(text)                                             # 특수문자 제거
    normalized_text = repeat_normalize(cleansed_text, num_repeats=2)          # 중복문자 제거
    input_data = re.sub(r'\d', '', normalized_text)                           # 제거
    normalized_text = spacing(input_data)                                     # 띄어쓰기 보정 

    return normalized_text

clean_and_repeat_normalize("아버지가방에들어가신다")

In [None]:
def final_output():
    input_text = clean_and_repeat_normalize(input())
    sentences = kss.split_sentences(input_text)

    sentences_list = []
    for sentence in sentences:
        speak(sentence)
        sentences_list.append(sentence) 

    long_test = ' '.join(sentences_list)
    print(long_test)
    return long_test

In [None]:
# 씨1@발 이게 맞아?
final_output()