<a href="https://colab.research.google.com/github/UBDBD/Deep-Learning_Project/blob/main/Restore%20obfuscated%20Korean%20text%20for%20accurate%20meaning%20comprehension.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 라이브러리
import pandas as pd
import re
import torch
import torch.nn as nn
import torch.optim as optim
import pickle
import time

from g2pk import G2p
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import defaultdict

[nltk_data] Downloading package cmudict to /root/nltk_data...
[nltk_data]   Unzipping corpora/cmudict.zip.


In [None]:
# Select the compute device: use CUDA when PyTorch reports a usable GPU,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# G2P

## 발음 처리

In [None]:
# G2P conversion restricted to Hangul characters
class CustomG2p(G2p):
    """G2p variant that converts only precomposed Hangul syllables."""

    def g2p_only_korean(self, text):
        """Return *text* with each Hangul syllable replaced by its G2P output.

        Every character is handled on its own: characters in the range
        '가'-'힣' are passed one at a time to the inherited converter (by
        calling the instance), and every other character is copied unchanged.
        """
        pieces = []
        for char in text:
            if re.fullmatch(r'[가-힣]', char):
                pieces.append(self(char))
            else:
                pieces.append(char)
        return "".join(pieces)

In [None]:
# G2P processing helper
# The converter is created on first use and then shared by every call, so the
# G2p initialisation cost is paid once instead of once per sentence.
_g2p_converter = None


def g2p_process(text):
    """Return *text* with its Hangul syllables converted by CustomG2p.

    Args:
        text: The string to convert.

    Returns:
        A string in which Hangul syllables have been replaced by the output of
        ``CustomG2p.g2p_only_korean``; all other characters, spaces included,
        are left as they were.
    """
    global _g2p_converter
    if _g2p_converter is None:
        _g2p_converter = CustomG2p()

    # g2p_only_korean works character by character and passes spaces through
    # untouched, so splitting on ' ' and re-joining would give the same result.
    return _g2p_converter.g2p_only_korean(text)

## 학습 데이터 전처리


In [None]:
# Disabled cell: the training-data preprocessing below is wrapped in a string
# literal, so none of it executes. When enabled it reads ./Data/train.csv and
# replaces each 'input' entry of data_dict with its g2p_process() result.
# NOTE(review): the data_dict line is missing the closing quote after
# 'output (it should read 'output': ...); fix that before re-enabling.
'''
# 학습 데이터 전처리
data = pd.read_csv('./Data/train.csv')

data_dict = {'input': data['input'].tolist(), 'output: data['output'].tolist()}

for i in tqdm(range(len(data['input']))):
    input_text = data['input'][i]
    g2p_text = g2p_process(input_text)

    data_dict['input'][i] = g2p_text
'''

"\n# 학습 데이터 전처리\ndata = pd.read_csv('/content/drive/MyDrive/프로젝트/Deep-Learning/난독화된 한글 리뷰 복원_data/train.csv')\n\ndata_dict = {'input': data['input'].tolist(), 'output: data['output'].tolist()}\n\nfor i in tqdm(range(len(data['input']))):\n    input_text = data['input'][i]\n    g2p_text = g2p_process(input_text)\n\n    data_dict['input'][i] = g2p_text\n"

In [None]:
# Disabled cell: kept as a string literal so it does not run. When enabled it
# writes data_dict to ./Data/g2p_data.csv, the file the LSTM section reads.
# NOTE(review): to_csv is called without index=False, so the row index is
# presumably written as an extra unnamed column — confirm this is intended.
'''
# 전처리 저장
df = pd.DataFrame(data_dict)
df.to_csv('./Data/g2p_data.csv', encoding='utf-8-sig')
'''

"\n# 전처리 저장\ndf = pd.DataFrame(data_dict)\ndf.to_csv('/content/drive/MyDrive/프로젝트/Deep-Learning/난독화된 한글 리뷰 복원_data/g2p_data.csv', encoding='utf-8-sig')\n"

# LSTM


## 데이터 전처리

In [None]:
# Load the G2P-processed data and split it into a training part and a
# held-out part (test_size=0.2); the fixed random_state keeps the split
# reproducible between runs.
g2p_data = pd.read_csv('./Data/g2p_data.csv')
train_data, test_data = train_test_split(
    g2p_data,
    random_state=42,
    test_size=0.2,
)

In [None]:
# Syllable splitting helper
def split_syllables(text):
    """Break *text* into a list holding one element per character."""
    return [symbol for symbol in text]

In [None]:
# Tokenise both text columns into per-character lists.
# `assign` returns a new DataFrame instead of writing columns into the object
# handed back by train_test_split, which can trigger pandas'
# SettingWithCopyWarning (the write may land on a temporary copy).
train_data = train_data.assign(
    input_syllables=train_data['input'].apply(split_syllables),
    output_syllables=train_data['output'].apply(split_syllables),
)

In [None]:
# Character vocabulary
# Index 0 is left unassigned because the batches are padded with 0. The space
# character is pinned to index 1; the encoding and decoding code falls back to
# it for unknown characters and indices.
# Plain dicts are used on purpose: with a defaultdict, any stray
# `char2index[ch]` lookup for an unseen character would silently add an index
# that has no entry in index2char.
char2index = {' ': 1}
index2char = {1: ' '}

# Assign the next free index to every character not seen before, scanning the
# input sequences first and the output sequences second.
for text in pd.concat([train_data['input_syllables'], train_data['output_syllables']]):
    for char in text:
        if char not in char2index:
            index = len(char2index) + 1
            char2index[char] = index
            index2char[index] = char

In [None]:
# Dataset definition
class SyllableDataset(Dataset):
    """Dataset of (input, target) character sequences encoded as index tensors."""

    def __init__(self, data):
        """Copy the two tokenised columns of *data* into plain Python lists."""
        self.inputs = data['input_syllables'].tolist()
        self.targets = data['output_syllables'].tolist()

    def __len__(self):
        """Return the number of (input, target) pairs."""
        return len(self.inputs)

    @staticmethod
    def _encode(sequence):
        """Map each character to its vocabulary index as a long tensor.

        Characters missing from the module-level ``char2index`` are encoded
        with the index of the space character.
        """
        return torch.tensor(
            [char2index.get(symbol, char2index[' ']) for symbol in sequence],
            dtype=torch.long,
        )

    def __getitem__(self, idx):
        """Return the encoded pair at *idx*, or a list of pairs for a list of indices."""
        if not isinstance(idx, list):
            source_seq = self.inputs[idx]
            target_seq = self.targets[idx]
            return self._encode(source_seq), self._encode(target_seq)

        return [self[position] for position in idx]



In [None]:
# Batch padding helper
def collate_fn(batch):
    """Pad a batch of (input, target) tensor pairs and move them to ``device``.

    Inputs and targets are padded independently with 0 up to the longest
    sequence on their own side, giving two batch-first tensors.
    """
    source_seqs, target_seqs = zip(*batch)

    def _pad_and_move(sequences):
        padded = pad_sequence(sequences, batch_first=True, padding_value=0)
        return padded.to(device)

    return _pad_and_move(source_seqs), _pad_and_move(target_seqs)

In [None]:
# Build the training dataset and its batched loader
batch_size = 64

# Renumber the rows 0..n-1, discarding the previous index
train_data = train_data.reset_index(drop=True)
train_dataset = SyllableDataset(train_data)
# Use the batch_size setting above rather than repeating the literal
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

## 모델 학습

In [None]:
# Model definition
class SyllableLSTM(nn.Module):
    """Bidirectional LSTM tagger that emits one vocabulary distribution per position."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
        """Create the embedding, the bidirectional LSTM and the output projection.

        Args:
            vocab_size: Number of embedding rows and of output classes.
            embedding_dim: Size of each character embedding.
            hidden_dim: Hidden size of the LSTM in each direction.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Forward and backward hidden states are concatenated, hence 2 * hidden_dim
        self.fc = nn.Linear(2 * hidden_dim, vocab_size)
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, x):
        """Return log-probabilities over the vocabulary for every position of *x*."""
        hidden_states, _ = self.lstm(self.embedding(x))
        return self.softmax(self.fc(hidden_states))

In [None]:
# Training configuration
# One class per vocabulary entry plus one extra slot for index 0, which the
# collate function uses as the padding value.
vocab_size = 1 + len(char2index)
embedding_dim, hidden_dim = 128, 256
num_layers = 2
num_epochs = 10

lstm_model = SyllableLSTM(vocab_size, embedding_dim, hidden_dim, num_layers)
lstm_model = lstm_model.to(device)
# Positions whose target is 0 (padding) do not contribute to the loss
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(lstm_model.parameters(), weight_decay=1e-5, lr=0.001)

In [None]:
# Model training (disabled): the loop below is wrapped in a string literal and
# therefore does not run; the weights are loaded from disk further down.
# When enabled it makes num_epochs passes over train_loader. Inputs and targets
# are padded separately, so each batch's output and target are first cut to
# their common length before the loss is computed. The value printed per epoch
# is the sum of the batch losses, not their mean.
'''
torch.cuda.empty_cache()

for epoch in range(num_epochs):
    total_loss = 0
    for input_batch, target_batch in tqdm(train_loader):
        optimizer.zero_grad()
        output = lstm_model(input_batch)

        # 길이 맞춰서 자르기
        min_len = min(output.size(1), target_batch.size(1))
        output = output[:, :min_len, :].contiguous()
        target_batch = target_batch[:, :min_len].contiguous()

        loss = criterion(output.reshape(-1, vocab_size), target_batch.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss:.2f}')
    '''

"\ntorch.cuda.empty_cache()\n\nfor epoch in range(num_epochs):\n    total_loss = 0\n    for input_batch, target_batch in tqdm(train_loader):\n        optimizer.zero_grad()\n        output = lstm_model(input_batch)\n\n        # 길이 맞춰서 자르기\n        min_len = min(output.size(1), target_batch.size(1))\n        output = output[:, :min_len, :].contiguous()\n        target_batch = target_batch[:, :min_len].contiguous()\n\n        loss = criterion(output.reshape(-1, vocab_size), target_batch.reshape(-1))\n        loss.backward()\n        optimizer.step()\n        total_loss += loss.item()\n\n    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss:.2f}')\n    "

In [None]:
# Model saving (disabled): kept as a string literal so it does not run.
# When enabled it writes the model's state_dict to ./LSTM_model and pickles
# char2index, converted to a plain dict first, to ./char2index.pkl.
'''
torch.save(lstm_model.state_dict(), './LSTM_model')
with open('./char2index.pkl', 'wb') as f:
    pickle.dump(dict(char2index), f)
'''

"\ntorch.save(lstm_model.state_dict(), '/content/drive/MyDrive/프로젝트/Deep-Learning/LSTM_model')\nwith open('/content/drive/MyDrive/프로젝트/Deep-Learning/char2index.pkl', 'wb') as f:\n    pickle.dump(dict(char2index), f)\n"

In [None]:
# Load the saved vocabulary and model weights
# NOTE: unpickling can execute arbitrary code; only load a char2index.pkl that
# was produced by this notebook.
with open('./char2index.pkl', 'rb') as f:
    char2index_loaded = pickle.load(f)

# Use a plain dict: the previous defaultdict returned len(vocabulary) + 1 for
# any unknown key, which is outside the embedding table, and stored it silently.
char2index = dict(char2index_loaded)
# Rebuild the reverse mapping and the vocabulary size from the loaded file so
# that encoding, decoding and the model dimensions all come from one source.
index2char = {index: char for char, index in char2index.items()}
vocab_size = len(char2index) + 1

lstm_model = SyllableLSTM(vocab_size, embedding_dim, hidden_dim, num_layers).to(device)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
lstm_model.load_state_dict(torch.load('./LSTM_model', map_location=device))

<All keys matched successfully>

# 모델 평가

In [None]:
# Restoration (inference) helper
def restored_lstm(model, input_text):
    """Restore *input_text* with *model*, producing one character per input character.

    Args:
        model: Callable module that maps a (1, length) tensor of character
            indices to per-position scores over the vocabulary. It is switched
            to evaluation mode.
        input_text: The text to restore.

    Returns:
        The restored string. Characters missing from ``char2index`` are encoded
        as a space, and predicted indices missing from ``index2char`` are
        decoded as a space. An empty input returns an empty string.
    """
    model.eval()

    # An empty input would become a zero-length sequence, which recurrent
    # layers are not expected to accept, so answer it without calling the model.
    if not input_text:
        return ''

    space_index = char2index[' ']
    input_encoded = [char2index.get(char, space_index) for char in input_text]
    input_tensor = torch.tensor(input_encoded, dtype=torch.long).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(input_tensor)
        predicted_indices = output.argmax(dim=2).squeeze(0).tolist()

    # There is exactly one prediction per input position, so the result already
    # has the same length as the input and needs no padding or trimming.
    return "".join(index2char.get(idx, ' ') for idx in predicted_indices)

In [None]:
# Evaluation metric helper
def evaluate_model(preds, targets):
    """Compute position-wise character precision, recall and F1.

    For each (prediction, target) pair, positions are compared up to the
    shorter of the two lengths. Precision divides the total number of matches
    by the total prediction length, recall by the total target length. Any
    ratio with a zero denominator is reported as 0.0.
    """
    matched = 0
    pred_total = 0
    target_total = 0

    for pred, target in zip(preds, targets):
        # zip stops at the shorter sequence, so only shared positions count
        matched += sum(p == t for p, t in zip(pred, target))
        pred_total += len(pred)
        target_total += len(target)

    precision = matched / pred_total if pred_total > 0 else 0.0
    recall = matched / target_total if target_total > 0 else 0.0

    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    return {'precision': precision, 'recall': recall, 'f1': f1}

In [None]:
# Evaluate the model on the held-out split
lstm_preds = []
lstm_targets = []

# Walk the two text columns in lockstep; tqdm needs the total because zip
# has no length of its own.
text_pairs = zip(test_data['input'], test_data['output'])
for input_text, target_text in tqdm(text_pairs, total=len(test_data)):
    lstm_out = restored_lstm(lstm_model, input_text)

    lstm_preds.append(lstm_out)
    lstm_targets.append(target_text)

lstm_scores = evaluate_model(lstm_preds, lstm_targets)

print('')
print(f'Model: {lstm_scores}')


100%|██████████| 2253/2253 [00:10<00:00, 220.36it/s]


LSTM: {'precision': 0.9485883591431552, 'recall': 0.9482631365919859, 'f1': 0.9484257199872361}





In [None]:
# Smoke test: restore one obfuscated review and time the whole pipeline
input_text = '녀뮨넒뭅 만죡숭러윤 효템뤼에오. 푸싸눼 옰면 콕 츄쩐학꼬 싶은 콧쉰웨오. 췌꾜윕뉘댜! ㅎㅎ 당음웨 또 옭 컷 갗았요.'

# The measured wall-clock interval covers both the G2P pass and the LSTM call
lstm_start_time = time.time()

g2p_text = g2p_process(input_text)
restored_text = restored_lstm(lstm_model, g2p_text)

lstm_end_time = time.time()
lstm_eval_time = lstm_end_time - lstm_start_time

# Show the raw input, the intermediate G2P text, the restored text and the
# elapsed time in seconds
print(f'input: {input_text}')
print(f'g2p: {g2p_text}')
print(f'output: {restored_text}')
print(f'time: {lstm_eval_time}')

input: 녀뮨넒뭅 만죡숭러윤 효템뤼에오. 푸싸눼 옰면 콕 츄쩐학꼬 싶은 콧쉰웨오. 췌꾜윕뉘댜! ㅎㅎ 당음웨 또 옭 컷 갗았요.
g2p: 녀뮨넘뭅 만죡숭러윤 효템뤼에오. 푸싸눼 올면 콕 츄쩐학꼬 십은 콛쉰웨오. 췌꾜윕뉘댜! ㅎㅎ 당음웨 또 옥 컫 갇앋요.
output: 너무너무 만족스러운 호텔이에요. 부산에 오면 꼭 추천하고 싶은 곳이에요. 최고입니다! ㅎㅎ 다음에 또 올 것 같아요.
time: 1.8878514766693115
