In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchmetrics.functional import accuracy, f1_score


In [2]:
horrorDF = pd.read_excel(
    "../DATA/2조 괴담 파일.xlsx", skiprows=0, header=1, sheet_name="SY", usecols=[3]
)


In [3]:
horrorDF


Unnamed: 0,TEXT
0,오늘 학교에서 모의고사보는데 가위눌리다가 한쪽눈만 떠진상태로 교실에서 내 다리에 올...
1,그녀는 날 사랑한다... 안 사랑한다... 날 사랑한다... 안 사랑한다...날 사...
2,"어린애같지만, 난 항상 지하실에서 나갈때 내 뒤를 쳐다보면서 계단을 뛰어 올라가. ..."
3,생리혈에 덩어리 같은게 있을수 있다는건 나도 알아. 근데 그게 보통 움직이는거야?\n
4,"난 별 생각없이 ""우리 모두 다함께 손뼉쳐!"" 라고 외쳤어. 다락방에서 박수 소리를..."
...,...
994,희망봉근해에 출몰하는 네덜란드 동인도회사의 유령선을 플라잉더치맨이라고 부른다. 알류...
995,"T 씨가 잠에서 깨자, 1년이 경과해있었다.하지만 기억상실은 아니다.가족도, 친구도..."
996,아르메니아 쿠니크호수에는 딱 스와코 *오미와타리 같은 자연현상이 일어난다. 호반 교...
997,"적도상 3만 6000킬로에 해당하는 우주 공간에, 길이 50M의 거대 물체가 떠있..."


In [4]:
horrorSR = horrorDF["TEXT"].str.replace(r"[^ㄱ-ㅎㅏ-ㅣ가-힣a-zA-Z\d ]", "", regex=True)
horrorSR.info()


<class 'pandas.core.series.Series'>
RangeIndex: 999 entries, 0 to 998
Series name: TEXT
Non-Null Count  Dtype 
--------------  ----- 
999 non-null    object
dtypes: object(1)
memory usage: 7.9+ KB


In [5]:
sentences = horrorSR.to_list()
sentences[:10]


['오늘 학교에서 모의고사보는데 가위눌리다가 한쪽눈만 떠진상태로 교실에서 내 다리에 올라와있는 귀신봤어',
 '그녀는 날 사랑한다 안 사랑한다 날 사랑한다 안 사랑한다날 사 어 더 이상 떼어낼 팔다리가 없네 넌 처음부터 날 사랑하지 않았구나',
 '어린애같지만 난 항상 지하실에서 나갈때 내 뒤를 쳐다보면서 계단을 뛰어 올라가 난 잘못된 방향을 바라보고 있었었어',
 '생리혈에 덩어리 같은게 있을수 있다는건 나도 알아 근데 그게 보통 움직이는거야',
 '난 별 생각없이 우리 모두 다함께 손뼉쳐 라고 외쳤어 다락방에서 박수 소리를 들리기 전까진 난 내가 집에 혼자있는줄 알았어',
 '난 숲속에서 일회용 카메라를 발견했다 필름을 현상했을때 그곳엔 내가 처음으로 카메라를 발견했을때의 모습이 찍힌 사진 딱 한장만이 찍혀있었다',
 '우리 할머니는 심한 치매에 걸리셨다 매일 저녁 난 할머니가 2년전에 이미 돌아가셨다고 설명하지만 허사다',
 '버스 문이 닫히고 버스 기사의 웃음소리가 그의 뒤에서 울려퍼졌을때 지미는 자신의 실수를 알아챘다 왜 스쿨 버스가 한밤중에 달리고 있는거지',
 '저녁시간 아빠는 나에게 하루가 어땠는지 물었다 아빠의 얼굴이 약간 정중앙에서 벗어나있는것처럼 보인다는 사실을 깨달았을때 난 아빠의 어깨 너머로 도망가라고 입모양으로 말하고 있는 두려움에 빠진 엄마를 볼수 있었다',
 '아빠 나 무서운 꿈 꿨어 오 공주님 난 아빠가 아냐']

In [6]:
char_set = list(set("".join(sentences)))
char_to_id = {char: idx for idx, char in enumerate(char_set)}
id_to_char = {idx: char for idx, char in enumerate(char_set)}


In [7]:
X_data, y_data = [], []
sequence_length = 10

for sentence in sentences:
    for i in range(0, len(sentence) - sequence_length):
        x_str = sentence[i : i + sequence_length]
        y_str = sentence[i + 1 : i + sequence_length + 1]
        # print(i, x_str, '->', y_str)
        X_data.append([char_to_id[c] for c in x_str])
        y_data.append([char_to_id[c] for c in y_str])


In [8]:
X_data[0], y_data[0]


([676, 1054, 455, 218, 433, 778, 357, 455, 340, 1100],
 [1054, 455, 218, 433, 778, 357, 455, 340, 1100, 1014])

In [9]:
print(
    f"입력 데이터 size: {len(X_data)}, shape: {np.array(X_data).shape}, dim: {np.array(X_data).ndim}"
)
print(
    f"출력 데이터 size: {len(y_data)}, shape: {np.array(y_data).shape}, dim: {np.array(y_data).ndim}"
)


입력 데이터 size: 116572, shape: (116572, 10), dim: 2
출력 데이터 size: 116572, shape: (116572, 10), dim: 2


In [10]:
class CharDataset(Dataset):
    def __init__(
        self,
        X_data,
        y_data,
    ) -> None:
        super().__init__()
        self.in_text = torch.FloatTensor(X_data)
        self.out_text = torch.FloatTensor(y_data)

    def __len__(self):
        return len(self.out_text)

    def __getitem__(self, index):
        return self.in_text[index], self.out_text[index]


In [11]:
class CharGRU(nn.Module):
    def __init__(
        self, dict_size, embedding_dim, hidden_size, sequence_length, n_layers, dropout
    ) -> None:
        super().__init__()
        # self.embedding = nn.Embedding(dict_size, embedding_dim)
        self.gru = nn.GRU(
            dict_size, hidden_size, n_layers, dropout=dropout, batch_first=True
        )
        self.linear = nn.Linear(hidden_size, dict_size)
        self.dropout = nn.Dropout(dropout)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        # self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()

    def forward(self, text):
        # embedded = self.embedding(text)
        output, _ = self.gru(text)
        # last_output = output[:, -1, :]
        last_output = self.dropout(output)
        return self.linear(last_output)


In [12]:
batch_size = 64
dict_size = len(char_to_id)

hidden_size = 64

embedding_dim = 128

n_layer = 10

lr = 0.001


learningDS = CharDataset(X_data, y_data)
learningDL = DataLoader(dataset=learningDS, batch_size=batch_size, shuffle=True)


device = "cuda" if torch.cuda.is_available() else "cpu"

classifier = CharGRU(
    dict_size, embedding_dim, hidden_size, sequence_length, n_layer, 0.5
).to(device)

criterion = nn.CrossEntropyLoss().to(device)

optimizer = optim.AdamW(classifier.parameters(), 0.001)

schduler = optim.lr_scheduler.StepLR(optimizer, 5, 0.1)


print(len(learningDL))


1822


In [13]:
from torchinfo import summary

summary(classifier)


Layer (type:depth-idx)                   Param #
CharGRU                                  --
├─GRU: 1-1                               478,080
├─Linear: 1-2                            81,510
├─Dropout: 1-3                           --
Total params: 559,590
Trainable params: 559,590
Non-trainable params: 0

In [14]:
from tqdm import tqdm


def learning(
    epoch, epochs, model, dataLoader, criterion, optimizer, device, mode="train"
):
    if mode == "train":
        model.train()
        is_train = True
    elif mode == "test":
        model.eval()
        is_train = False
    else:
        raise ValueError("Invalid mode. Must be 'train' or 'test'")

    loss_list = []
    acc_list = []
    f1_list = []
    running_loss = 0.0

    with torch.set_grad_enabled(is_train):
        pbar = tqdm(enumerate(dataLoader), total=len(dataLoader))
        for step, (in_text, out_text) in pbar:
            in_text, out_text = in_text.to(device), out_text.to(device)
            oh_in_text = F.one_hot(in_text.long(), dict_size).float()
            oh_out_text = F.one_hot(out_text.long(), dict_size).float()
            pre_text = model(oh_in_text)
            loss = criterion(pre_text, oh_out_text)
            running_loss += loss.item()
            acc_list.append(
                accuracy(
                    pre_text.argmax(2),
                    out_text,
                    task="multiclass",
                    num_classes=dict_size,
                )
            )
            f1_list.append(
                f1_score(
                    pre_text.argmax(2),
                    out_text,
                    task="multiclass",
                    num_classes=dict_size,
                    average="macro",
                )
            )
            if is_train:
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
                optimizer.step()
                if step % 10 == 9:
                    pbar.set_description(
                        f"Epoch [{epoch + 1}/{epochs}], 횟수 [{step + 1}/{len(dataLoader)}], Loss: {running_loss / 10:.4f}"
                    )
                    loss_list.append(running_loss)
                    running_loss = 0.0
    return np.mean(loss_list), np.mean(acc_list), np.mean(f1_list)


In [15]:
def predict(model, word):
    for char in word:
        if char in char_to_id.keys():
            model.eval()
            with torch.no_grad():
                pred_text = (
                    F.one_hot(torch.tensor(char_to_id[char]), dict_size)
                    .unsqueeze(0)
                    .float()
                    .to(device)
                )
                pred_num = model(pred_text).argmax(1).item()
                result = id_to_char[pred_num]
        else:
            result = " "
    return result


In [16]:
epochs = 1
scoreList = [[], []]
for epoch in range(epochs):
    loss, acc, f1 = learning(
        epoch,
        epochs,
        classifier,
        learningDL,
        criterion,
        optimizer,
        device,
        mode="train",
    )
    print(
        f"Epoch {epoch + 1} 학습 종료 ===> 손실: {loss:.4f}, 정확도: {acc:.4f}, f1 점수: {f1:.4f}"
    )
    start = "난"
    make_sentence = ""
    for _ in range(100):
        make_sentence += start
        next_char = predict(classifier, start)
        start = next_char
    print(make_sentence)
    scoreList[0].append(loss), scoreList[1].append(acc)


Epoch [1/1], 횟수 [1820/1822], Loss: 0.0142: 100%|██████████| 1822/1822 [02:32<00:00, 11.97it/s]


Epoch 1 학습 종료 ===> 손실: 0.1665, 정확도: 0.0462, f1 점수: 0.0021
난 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 


In [17]:
predict(classifier, "난")


' '

In [18]:
start = "나"
make_sentence = ""
for _ in range(100):
    make_sentence += start
    next_char = predict(classifier, start)
    start = next_char
print(make_sentence)


나 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 그 
