## 시드 고정


In [None]:
import os
import random
import numpy as np
import torch

# 시드설정
SEED = 123


def seed_everything(seed=SEED):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True


seed_everything(SEED)

## 샘플 예제파일 다운로드


In [None]:
import urllib

url = "https://storage.googleapis.com/download.tensorflow.org/data/sarcasm.json"
urllib.request.urlretrieve(url, "sarcasm.json")

('sarcasm.json', <http.client.HTTPMessage at 0x7c652a41aa10>)

## 데이터 로드


In [None]:
import json
import pandas as pd

with open("sarcasm.json") as f:
    datas = json.load(f)

df = pd.DataFrame(datas)
df.head()

Unnamed: 0,article_link,headline,is_sarcastic
0,https://www.huffingtonpost.com/entry/versace-b...,former versace store clerk sues over secret 'b...,0
1,https://www.huffingtonpost.com/entry/roseanne-...,the 'roseanne' revival catches up to our thorn...,0
2,https://local.theonion.com/mom-starting-to-fea...,mom starting to fear son's web series closest ...,1
3,https://politics.theonion.com/boehner-just-wan...,"boehner just wants wife to listen, not come up...",1
4,https://www.huffingtonpost.com/entry/jk-rowlin...,j.k. rowling wishes snape happy birthday in th...,0


In [None]:
print(df.iloc[0])

article_link    https://www.huffingtonpost.com/entry/versace-b...
headline        former versace store clerk sues over secret 'b...
is_sarcastic                                                    0
Name: 0, dtype: object


## 토큰화 (Word Tokenization)

- get_tokenizer로 토크나이저 생성
- `basic_english`, `spacy`, `revtok`, `subword` 등 지정이 가능하나, 몇몇 토크나이저는 추가 라이브러리 설치가 필요합니다.


In [None]:
# torchtext 설치
# !pip install torchtext

In [None]:
from torchtext.data.utils import get_tokenizer

# 토큰 생성: 단어별로 쪼갠다.
tokenizer = get_tokenizer("basic_english") # 한글 전용 토크나이저도 있다. KoNLP

토큰화한 결과는 특수문자는 개별 토큰으로 처리, 모든 단어는 소문자로 처리됩니다.


In [None]:
tokenizer("Hi, my name is Teddy!!!")

['hi', ',', 'my', 'name', 'is', 'teddy', '!', '!', '!']

In [None]:
tokenizer("Hello, I would love to learn Python!")

['hello', ',', 'i', 'would', 'love', 'to', 'learn', 'python', '!']

In [None]:
tokenizer("안녕하세요? 한글 데이터에 대한 토큰 처리는 어떨까요??")

['안녕하세요', '?', '한글', '데이터에', '대한', '토큰', '처리는', '어떨까요', '?', '?']

## 단어사전 생성


In [None]:
from torchtext.vocab import build_vocab_from_iterator


def yield_tokens(sentences):
    for text in sentences:
        yield tokenizer(text)

`build_vocab_from_iterator` 를 활용하여 단어 사전을 생성합니다.

- `min_freq`: 최소 빈도의 토큰의 개수를 입력합니다.
- `max_tokens`: 최대 빈도 토큰의 수를 한정합니다. 빈도수 기준으로 산정합니다.


In [None]:
vocab = build_vocab_from_iterator(
    yield_tokens(df["headline"].tolist()),  # 텍스트 Iterator
    # 스페셜 토큰 - 모르는 단어 치환
    specials=["<UNK>"],
    min_freq=2,  # 최소 빈도 토큰(최소 2개 이상 나와야 쓰겠다)
    max_tokens=1000,  # 최대 토큰 개수
)

'''
 # 나머지 단어들은 어디로? 그럼 정보의 손실은?
 - 모든 단어를 토큰으로 쓰지는 않음. 모든 단어들이 다 중요하진 않다. 고려해야 할 단어의 수를 줄여주는 것, 연산도 줄여 효율적인 학습을 하고자 한다.
 일정 수준으로 만들어줌
 - 뽑는 것은 빈도수, 자주나오는 중요 단어를 위주로 설정하는 것. 나머지는 0의 값으로 채워짐.
'''

vocab.set_default_index(vocab["<UNK>"])

In [None]:
# 전체 단어사전의 개수 출력
len(vocab)

1000

In [None]:
# string -> index
stoi = vocab.get_stoi()
# index -> string
itos = vocab.get_itos()

In [None]:
itos[0]

'<UNK>'

In [None]:
itos[15]

'trump'

In [None]:
stoi["trump"]

15

In [None]:
sample_sentence = "Hello, I am Teddy. Nice to meet you!!"

In [None]:
tokenizer(sample_sentence)

['hello', ',', 'i', 'am', 'teddy', '.', 'nice', 'to', 'meet', 'you', '!', '!']

In [None]:
vocab(tokenizer(sample_sentence))  # 0은 빈도수가 낮거나 사전에 없는 단어

[0, 7, 50, 0, 0, 11, 0, 2, 423, 20, 141, 141]

## Dataset 분할


In [None]:
from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(
    df["headline"],
    df["is_sarcastic"],
    stratify=df["is_sarcastic"],
    test_size=0.2,
    random_state=SEED,
)

## Dataset 생성


In [None]:
from torch.utils.data import DataLoader, Dataset


class CustomDataset(Dataset):
    def __init__(self, texts, labels, vocab, tokenizer):
        super().__init__()
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text = self.texts.iloc[idx]
        label = self.labels.iloc[idx]
        return self.vocab(self.tokenizer(text)), label

In [None]:
# Custom Dataset 생성
train_ds = CustomDataset(x_train, y_train, vocab=vocab, tokenizer=tokenizer)
valid_ds = CustomDataset(x_test, y_test, vocab=vocab, tokenizer=tokenizer)

In [None]:
# 1개의 데이터 추출
text, label = next(iter(train_ds))
len(text), label

(10, 1)

In [None]:
# iterator 생성
iterator = iter(train_ds)

In [None]:
# Next 로 순회하면서 1개씩 출력
next(iterator)

([0, 0, 0, 262, 142, 214, 925, 186, 32, 0], 1)

In [None]:
text

[0, 0, 0, 262, 142, 214, 925, 186, 32, 0]

## DataLoader 생성


GPU 를 설정합니다


In [None]:
# CUDA 사용 가능 여부 확인
if torch.backends.mps.is_built():
    # mac os mps 지원 체크
    device = torch.device("mps" if torch.backends.mps.is_built() else "cpu")
else:
    # cuda 사용 가능한지 체크
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


DataLoader 를 생성합니다


In [None]:
from torch.nn.utils.rnn import pad_sequence


def collate_batch(batch, max_sequence_length):
    label_list, text_list = [], []

    for text, label in batch:
        # 최대 문장길이를 넘어가는 단어는 제거합니다.
        processed_text = torch.tensor(text[:max_sequence_length], dtype=torch.int64) # 최대 길이를 넘어가면 슬라이싱
        text_list.append(processed_text)
        label_list.append(label)

    label_list = torch.tensor(label_list, dtype=torch.int64)

    # padding을 주어 짧은 문장에 대한 길이를 맞춥니다.
    text_list = pad_sequence(text_list, batch_first=True, padding_value=0) # 패딩값을 줘서 부족한 길이는 채우기(맞추기)

    return text_list.to(device), label_list.to(device)

In [None]:
# 한 문장에 최대 포함하는 단어의 개수를 지정합니다. (예시. 120 단어)
MAX_SEQUENCE_LENGTH = 120

train_loader = DataLoader(
    train_ds,
    batch_size=32,
    shuffle=True,
    collate_fn=lambda x: collate_batch(x, MAX_SEQUENCE_LENGTH),
)

validation_loader = DataLoader(
    valid_ds,
    batch_size=32,
    shuffle=False,
    collate_fn=lambda x: collate_batch(x, MAX_SEQUENCE_LENGTH),
)

In [None]:
x, y = next(iter(train_loader))
x = x.to(device)
y = y.to(device)

# x, y의 shape 확인
# (batch_size, seq_length), (batch_size)
x.shape, y.shape

(torch.Size([32, 18]), torch.Size([32]))

In [None]:
# train_loader의 it
iterator = iter(train_loader)

In [None]:
x, y = next(iterator)
x.shape, y.shape

(torch.Size([32, 21]), torch.Size([32]))

In [None]:
x[0]

tensor([ 0,  3,  0, 19,  0,  0,  7,  0, 12,  0,  6,  9,  0,  3,  0,  0,  0,  0,
         0,  0,  0], device='cuda:0')

In [None]:
x[2]

tensor([191,   0, 129,   1,   0,   0,   1,   6,   0,   0,  32, 204,   1,  21,
         52,   2,   0, 644,   1,   5, 883], device='cuda:0')

## Embedding Layer


In [None]:
import torch.nn as nn

In [None]:
NUM_VOCAB = len(vocab)
NUM_VOCAB

1000

In [None]:
x, y = next(iter(train_loader))
x = x.to(device)
y = y.to(device)

x.shape, y.shape
# (batch_size, seq_length), (batch_size) 한 문장에 들어간 단어의 개수

(torch.Size([32, 20]), torch.Size([32]))

`nn.Embedding()` 생성


In [None]:
# Embedding: (vocab_size, embedding_dim)
EMBEDDING_DIM = 30  # Dimension을 30 차원으로 설정(hyper-parameter)
embedding = nn.Embedding(len(vocab), EMBEDDING_DIM).to(device)

`nn.Embedding()` 의 입출력 shape 에 대한 이해


In [None]:
# x : (batch_size, seq_length)
embedding_out = embedding(x)
embedding_out.shape
# embedding_out: (batch_size, seq_length, embedding_dim) 한 단어당 30차원이 추가(맵핑)

torch.Size([32, 20, 30])

In [None]:
embedding_out[0][0]

tensor([-0.6517, -0.1083,  0.1789,  1.9432, -0.1924,  0.8609, -2.1043, -0.8672,
        -0.5252, -0.2743, -1.2242,  0.4111,  3.1094, -0.9358,  0.1438,  1.3130,
         1.8931, -0.2015,  1.1094, -0.9413, -1.8549,  0.4594, -0.6027, -1.7381,
        -0.6008,  1.4465,  0.4170, -1.2727, -0.5886,  2.0518], device='cuda:0',
       grad_fn=<SelectBackward0>)

## LSTM Layer

- 참고 링크: https://teddylee777.github.io/pytorch/pytorch-lstm/


In [None]:
from IPython.display import Image

Image(url="https://teddylee777.github.io/images/2023-03-05/lstm-shapes-01.png", width=700)

In [None]:
EMBEDDING_DIM = 30  # input_size: embedding_dim(임베딩 차원)
HIDDEN_SIZE = 64  # hidden_size: 추출할 특성의 수(hyper-parameter)
NUM_LAYERS = 1  # LSTM Stacking Layer 수
BIDIRECTIONAL = 1  # 양방향 특성 추출: True(2), False(1)

BATCH_SIZE = x.size(0)
SEQ_LENGTH = x.size(1)
print("BATCH_SIZE: ", BATCH_SIZE)
print("SEQ_LENGTH: ", SEQ_LENGTH)

BATCH_SIZE:  32
SEQ_LENGTH:  20


In [None]:
lstm = nn.LSTM(
    input_size=EMBEDDING_DIM, hidden_size=HIDDEN_SIZE, batch_first=True, device=device
)
lstm

LSTM(30, 64, batch_first=True)

In [None]:
# initial weights 초기화(지금 들어가는 단어가 첫 단어다)
h_0 = torch.zeros(NUM_LAYERS * BIDIRECTIONAL, SEQ_LENGTH, HIDDEN_SIZE).to(device)
c_0 = torch.zeros(NUM_LAYERS * BIDIRECTIONAL, SEQ_LENGTH, HIDDEN_SIZE).to(device)

In [None]:
embedding_out.shape

torch.Size([32, 20, 30])

In [None]:
# 임베딩 레이어 Output, 초기화 (hidden_state, cell_state)
lstm_out, (hidden, cell) = lstm(embedding_out)

# (batch_size, seq_length, hidden_size)
lstm_out.shape

torch.Size([32, 20, 64])

In [None]:
# (num_layers * bidirectional, batch_size, hidden_size)
# (num_layers * bidirectional, batch_size, hidden_size)
hidden.shape, cell.shape

(torch.Size([1, 32, 64]), torch.Size([1, 32, 64]))

## Embedding -> LSTM 의 입출력 이해


In [None]:
def EmbeddingLSTM(
    x, vocab_size, embedding_dim, hidden_size, bidirectional, num_layers, device
):
    """
    x             : 데이터 입력 (batch_size, seq_length)
    vocab_size    : 단어사전의 개수
    embedding_dim : 임베딩 차원
    hidden_size   : 특성추출의 개수(hyper-parameter)
    bidirectional : 양방향 특성 추출: 양방향(True), 단방향(False)
    num_layers    : Stacking LSTM 레이어 수, 기본: 1
    """
    x = x.to(device)
    batch_size = x.size(0)

    print(f"===== Part1. 입력(x) =====\n")
    print(f"입력(x)의 차원(batch_size({batch_size}), seq_length({x.size(1)}))")
    print(f"{x.shape}\n")

    embedding = nn.Embedding(vocab_size, embedding_dim, device=device)
    embedding_out = embedding(x)
    print(f"===== Part2. Embedding =====\n")
    print(
        f"(batch_size({batch_size}), seq_length({x.size(1)}), embedding_dim({embedding_dim}))"
    )
    print(f"{embedding_out.shape}")

    lstm = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_size,
        num_layers=num_layers,
        bidirectional=bidirectional,
        batch_first=True,
        device=device,
    )

    bidi = 2 if bidirectional else 1

    out, (h, c) = lstm(embedding_out)
    print()
    print(f"===== Part3. LSTM =====\n")
    print("out, (h, c) = lstm(x)\n")
    print("LSTM output")
    print(
        f"(batch_size({x.size(0)}), seq_length({x.size(1)}), hidden_size({hidden_size})*bidirectional({bidi}))"
    )
    print(f"{out.shape}\n")
    print("===" * 8)
    print("\n(hidden, cell) state\n")
    print(
        f"(num_layers({num_layers})*bidirectional({bidi}), batch_size({batch_size}), hidden_size({hidden_size}))"
    )
    print(f"{h.shape}\n")
    print("===" * 8)

In [None]:
EmbeddingLSTM(
    x,
    vocab_size=len(vocab),
    embedding_dim=30,
    hidden_size=64,
    bidirectional=False,
    num_layers=2,
    device=device,
)

===== Part1. 입력(x) =====

입력(x)의 차원(batch_size(32), seq_length(20))
torch.Size([32, 20])

===== Part2. Embedding =====

(batch_size(32), seq_length(20), embedding_dim(30))
torch.Size([32, 20, 30])

===== Part3. LSTM =====

out, (h, c) = lstm(x)

LSTM output
(batch_size(32), seq_length(20), hidden_size(64)*bidirectional(1))
torch.Size([32, 20, 64])


(hidden, cell) state

(num_layers(2)*bidirectional(1), batch_size(32), hidden_size(64))
torch.Size([2, 32, 64])



## 모델


In [None]:
from tqdm import tqdm  # Progress Bar 출력
import numpy as np
import torch.nn as nn
import torch.optim as optim


class TextClassificationModel(nn.Module):
    def __init__(
        self,
        num_classes,
        vocab_size,
        embedding_dim,
        hidden_size,
        num_layers,
        bidirectional=True,
        drop_prob=0.1,
    ):
        super(TextClassificationModel, self).__init__()
        self.num_classes = num_classes
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = 2 if bidirectional else 1

        self.embedding = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embedding_dim
        )

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )

        self.dropout = nn.Dropout(drop_prob)

        self.relu = nn.ReLU()

        self.fc = nn.Linear(hidden_size * self.bidirectional, hidden_size)
        self.output = nn.Linear(hidden_size, num_classes)

    def init_hidden_and_cell_state(self, batch_size, device):
        # LSTM 입력시 초기 Cell 에 대한 가중치 초기화를 진행합니다.
        # (num_layers*bidirectional, batch_size, hidden_size)
        self.hidden_and_cell = (
            torch.zeros(
                self.num_layers * self.bidirectional, batch_size, self.hidden_size
            ).to(device),
            torch.zeros(
                self.num_layers * self.bidirectional, batch_size, self.hidden_size
            ).to(device),
        )

    def forward(self, x):
        x = self.embedding(x)
        output, (h, c) = self.lstm(x, self.hidden_and_cell)
        # (batch_size, seq_length, hidden_size*bidirectional)
        # last sequence 의 (batch_size, hidden_size*bidirectional)
        h = output[:, -1, :] # 아웃풋의 마지막 cell의 값. 마지막 셀의 출력값만 가져온 것(히든이나 셀을 쓸 수도 있다.)
        o = self.dropout(h)
        o = self.relu(self.fc(o))
        o = self.dropout(o)
        return self.output(o)

In [None]:
config = {
    "num_classes": 2,
    "vocab_size": len(vocab),
    "embedding_dim": 16,
    "hidden_size": 32,
    "num_layers": 2,
    "bidirectional": True,
}

model = TextClassificationModel(**config)
model.to(device)

TextClassificationModel(
  (embedding): Embedding(1000, 16)
  (lstm): LSTM(16, 32, num_layers=2, batch_first=True, bidirectional=True)
  (dropout): Dropout(p=0.1, inplace=False)
  (relu): ReLU()
  (fc): Linear(in_features=64, out_features=32, bias=True)
  (output): Linear(in_features=32, out_features=2, bias=True)
)

## 손실 함수 및 옵티마이저 정의


In [None]:
# loss 정의: CrossEntropyLoss
loss_fn = nn.CrossEntropyLoss()

# 옵티마이저 정의: bert.paramters()와 learning_rate 설정
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
from tqdm import tqdm


def fit(model, data_loader, loss_fn, optimizer, device, phase="train"):
    if phase == "train":
        # 모델을 훈련모드로 설정합니다. training mode 일 때 Gradient 가 업데이트 됩니다. 반드시 train()으로 모드 변경을 해야 합니다.
        model.train()
    else:
        # model.eval()은 모델을 평가모드로 설정을 바꾸어 줍니다.
        model.eval()

    # loss와 accuracy 계산을 위한 임시 변수 입니다. 0으로 초기화합니다.
    running_loss = 0
    corr = 0

    # 예쁘게 Progress Bar를 출력하면서 훈련 상태를 모니터링 하기 위하여 tqdm으로 래핑합니다.
    prograss_bar = tqdm(
        data_loader, leave=False, unit="batch", total=len(data_loader), mininterval=1
    )

    # mini-batch 학습을 시작합니다.
    for txt, lbl in prograss_bar:
        # image, label 데이터를 device에 올립니다.
        txt, lbl = txt.to(device), lbl.to(device)

        optimizer.zero_grad()
        # 누적 Gradient를 초기화 합니다. 반드시 필요!
        with torch.set_grad_enabled(phase == "train"):
            model.init_hidden_and_cell_state(len(txt), device)
            # Forward Propagation을 진행하여 결과를 얻습니다.
            output = model(txt)

            # 손실함수에 output, label 값을 대입하여 손실을 계산합니다.
            loss = loss_fn(output, lbl)

            if phase == "train":
                # 오차역전파(Back Propagation)을 진행하여 미분 값을 계산합니다.
                loss.backward()

                # 계산된 Gradient를 업데이트 합니다.
                optimizer.step()

        # output 의 뉴런별 확률 값을 sparse vector 로 변환합니다.
        pred = output.argmax(axis=1)

        # 정답 개수를 카운트 합니다.
        corr += (lbl == pred).sum().item()

        # 이를 누적한 뒤 Epoch 종료시 전체 데이터셋의 개수로 나누어 평균 loss를 산출합니다.
        running_loss += loss.item()

    # 누적된 정답수를 전체 개수로 나누어 주면 정확도가 산출됩니다.
    acc = corr / len(data_loader.dataset)

    # 평균 손실(loss)과 정확도를 반환합니다.
    # train_loss, train_acc
    return running_loss / len(data_loader), acc

In [None]:
import time

# 최대 Epoch을 지정합니다.
num_epochs = 5

min_loss = np.inf

STATE_DICT_PATH = "LSTM-Text-Classification.pth"

# Epoch 별 훈련 및 검증을 수행합니다.
for epoch in range(num_epochs):
    # Model Training
    # 훈련 손실과 정확도를 반환 받습니다.
    start = time.time()
    train_loss, train_acc = fit(
        model, train_loader, loss_fn, optimizer, device, phase="train"
    )

    # 검증 손실과 검증 정확도를 반환 받습니다.
    val_loss, val_acc = fit(
        model, validation_loader, loss_fn, optimizer, device, phase="eval"
    )

    # val_loss 가 개선되었다면 min_loss를 갱신하고 model의 가중치(weights)를 저장합니다.
    if val_loss < min_loss:
        print(
            f"[INFO] val_loss has been improved from {min_loss:.5f} to {val_loss:.5f}. Saving Model!"
        )
        min_loss = val_loss
        torch.save(model.state_dict(), STATE_DICT_PATH)

    time_elapsed = time.time() - start
    # Epoch 별 결과를 출력합니다.
    print(
        f"[Epoch{epoch+1:02d}] time: {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s \t loss: {train_loss:.5f}, acc: {train_acc:.5f} | val_loss: {val_loss:.5f}, val_acc: {val_acc:.5f}"
    )



[INFO] val_loss has been improved from inf to 0.58071. Saving Model!
[Epoch01] time: 0m 5s 	 loss: 0.66644, acc: 0.59250 | val_loss: 0.58071, val_acc: 0.69787




[INFO] val_loss has been improved from 0.58071 to 0.48387. Saving Model!
[Epoch02] time: 0m 4s 	 loss: 0.52523, acc: 0.73993 | val_loss: 0.48387, val_acc: 0.76844




[INFO] val_loss has been improved from 0.48387 to 0.44555. Saving Model!
[Epoch03] time: 0m 5s 	 loss: 0.45286, acc: 0.79192 | val_loss: 0.44555, val_acc: 0.79240




[INFO] val_loss has been improved from 0.44555 to 0.42525. Saving Model!
[Epoch04] time: 0m 4s 	 loss: 0.40831, acc: 0.81556 | val_loss: 0.42525, val_acc: 0.80101


                                          

[Epoch05] time: 0m 4s 	 loss: 0.37758, acc: 0.83124 | val_loss: 0.42685, val_acc: 0.81299




## 저장한 가중치 로드


In [None]:
# 모델에 저장한 가중치를 로드합니다.
model.load_state_dict(torch.load(STATE_DICT_PATH))

<All keys matched successfully>

## 최종 검증성능 측정


In [None]:
# 최종 검증 손실(validation loss)와 검증 정확도(validation accuracy)를 산출합니다.
final_loss, final_acc = fit(
    model, validation_loader, loss_fn, optimizer, device, phase="eval"
)
print(f"\nevaluation loss: {final_loss:.5f}, evaluation accuracy: {final_acc:.5f}")

                                          


evaluation loss: 0.42525, evaluation accuracy: 0.80101


