In [None]:
!pip install lightning wandb

In [None]:
from os.path import exists
import pickle

okt_train_file = "okt_stem_train_dataset.pkl"
okt_test_file = "okt_stem_test_dataset.pkl"

# Both pickles are needed by the following cells. Fail here with a clear message
# instead of running into a NameError later when one of them is absent.
missing_files = [path for path in (okt_train_file, okt_test_file) if not exists(path)]
if missing_files:
    raise FileNotFoundError(
        f"Missing tokenized dataset file(s): {', '.join(missing_files)}"
    )

print(f"{okt_train_file} already exists")
# NOTE(security): pickle.load can execute arbitrary code; only load files that
# come from a trusted source.
with open(okt_train_file, "rb") as file:
    tokenized_train_dataset = pickle.load(file)
with open(okt_test_file, "rb") as file:
    tokenized_test_dataset = pickle.load(file)

In [None]:
# Peek at the first three entries of the tokenized training set.
tokenized_train_dataset[:3]

In [None]:
import matplotlib.pyplot as plt

# Length of every training review; each dataset item unpacks as (text, label).
train_text_lengths = [len(text) for text, _ in tokenized_train_dataset]

print('리뷰의 최대 길이 :', max(train_text_lengths))
print('리뷰의 평균 길이 :', sum(train_text_lengths) / len(tokenized_train_dataset))

# Histogram of the review lengths.
plt.hist(train_text_lengths, bins=50)
plt.xlabel('length of text')
plt.ylabel('number of text')
plt.show()

In [None]:
# Read the Korean stopword list, one entry per line. The encoding is passed
# explicitly because the platform default is not UTF-8 everywhere
# (assumes the file itself is UTF-8 encoded - confirm). Blank lines are skipped
# so that no empty-string stopword is created.
with open("kor_stopword.txt", "r", encoding="utf-8") as file:
    kor_stopwords = [line.strip() for line in file if line.strip()]

In [None]:
# Convert to a set so that the `in` checks in later cells are hash lookups.
kor_stopwords = set(kor_stopwords)  # What would happen if a list were used instead of a set?

In [None]:
# Remove stopwords from every training review; the labels are not needed here.
word2vec_train_datas = [
    [word for word in train_text if word not in kor_stopwords]
    for train_text, _ in tokenized_train_dataset
]

In [None]:
# Length of every review after stopword removal.
filtered_text_lengths = [len(text) for text in word2vec_train_datas]

print('리뷰의 최대 길이 :', max(filtered_text_lengths))
print('리뷰의 평균 길이 :', sum(filtered_text_lengths) / len(word2vec_train_datas))

# Histogram of the filtered review lengths.
plt.hist(filtered_text_lengths, bins=50)
plt.xlabel('length of text')
plt.ylabel('number of text')
plt.show()

In [None]:
from gensim.models import Word2Vec

# Train a CBOW model (sg=0) with 32-dimensional vectors on the filtered reviews.
CBOW_W2V = Word2Vec(
    sentences=word2vec_train_datas,
    vector_size=32,
    window=5,
    min_count=1,
    workers=4,
    sg=0,
)

In [None]:
# Inspect the container type and the shape of the learned CBOW vectors.
print(type(CBOW_W2V.wv.vectors))
print(CBOW_W2V.wv.vectors.shape)

In [None]:
# Nearest neighbours of "히어로" according to the CBOW model.
print(CBOW_W2V.wv.most_similar("히어로"))

In [None]:
# Train a skip-gram model (sg=1) with the same settings as the CBOW model.
SkipGram_W2V = Word2Vec(
    sentences=word2vec_train_datas,
    vector_size=32,
    window=5,
    min_count=1,
    workers=4,
    sg=1,
)

In [None]:
# Inspect the container type and the shape of the learned skip-gram vectors.
print(type(SkipGram_W2V.wv.vectors))
print(SkipGram_W2V.wv.vectors.shape)

In [None]:
# Vector-arithmetic queries on the CBOW embeddings: the raw "공포영화" vector,
# that vector minus "공포", and the difference plus "액션".
print(CBOW_W2V.wv.most_similar(CBOW_W2V.wv["공포영화"]))
print(CBOW_W2V.wv.most_similar(CBOW_W2V.wv["공포영화"]-CBOW_W2V.wv["공포"]))
print(CBOW_W2V.wv.most_similar(CBOW_W2V.wv["공포영화"]-CBOW_W2V.wv["공포"]+CBOW_W2V.wv["액션"]))

In [None]:
# The same three vector-arithmetic queries, this time on the skip-gram embeddings.
print(SkipGram_W2V.wv.most_similar(SkipGram_W2V.wv["공포영화"]))
print(SkipGram_W2V.wv.most_similar(SkipGram_W2V.wv["공포영화"]-SkipGram_W2V.wv["공포"]))
print(SkipGram_W2V.wv.most_similar(SkipGram_W2V.wv["공포영화"]-SkipGram_W2V.wv["공포"]+SkipGram_W2V.wv["액션"]))

In [None]:
# Persist both trained models to disk.
CBOW_W2V.save("CBOW_W2V.model")
SkipGram_W2V.save("SkipGram_W2V.model")

In [None]:
# Reload the skip-gram model from the file written by the save cell.
SkipGram_W2V = Word2Vec.load("SkipGram_W2V.model")

In [None]:
# Sanity-check the reloaded skip-gram model. The query vector is taken from the
# same model: CBOW and skip-gram vectors are trained independently, so a CBOW
# vector has no meaningful neighbours in the skip-gram space.
SkipGram_W2V.wv.most_similar(SkipGram_W2V.wv["공포영화"])

In [None]:
from collections import Counter
# Count how often each token occurs across all training reviews.
token_counter = Counter(
    token for tokens, _ in tokenized_train_dataset for token in tokens
)

min_count = 1
# Ids 0 and 1 are reserved for the padding and the unknown-token entries.
vocab = {"[PAD]": 0, "[UNK]": 1}
vocab_idx = 2

# Register every non-stopword token that occurs strictly more than `min_count`
# times, assigning consecutive ids in first-seen order.
for token, count in token_counter.items():
    if count <= min_count or token in kor_stopwords:
        continue
    vocab[token] = vocab_idx
    vocab_idx += 1

In [None]:
# Check whether a regular word and the two special tokens exist as keys in the
# skip-gram model.
print("공포영화" in SkipGram_W2V.wv.key_to_index)
print("[UNK]" in SkipGram_W2V.wv.key_to_index)
print("[PAD]" in SkipGram_W2V.wv.key_to_index)

In [None]:
# Compare the index of the same token in the skip-gram model and in our own `vocab`.
print(SkipGram_W2V.wv.key_to_index["공포영화"])
print(vocab["공포영화"])

In [None]:
import numpy as np

# Build exactly one embedding row per vocabulary entry, in vocabulary order, so
# that row i of the stacked matrix belongs to the token with id i (ids were
# assigned consecutively in insertion order when `vocab` was built).
embedding_dim = SkipGram_W2V.wv.vectors.shape[1]
embedding_list = []

for token, idx in vocab.items():
    if token == "[PAD]":
        # Padding contributes nothing: all-zero vector.
        embedding_list.append(np.zeros(embedding_dim, dtype=np.float32))
    elif token == "[UNK]":
        embedding_list.append(np.random.uniform(-1, 1, embedding_dim).astype(np.float32))
    elif token in SkipGram_W2V.wv:
        # Membership is tested on the same model the vector is read from.
        embedding_list.append(SkipGram_W2V.wv[token])
    else:
        # Report the token, but still add a row for it: skipping it would shift
        # every following row away from its vocabulary id.
        print(token)
        embedding_list.append(np.random.uniform(-1, 1, embedding_dim).astype(np.float32))

In [None]:
# Stack the collected vectors into one 2-D lookup matrix (one row per vector).
# The spelling "loopup" is kept because SentimentClassifier reads this name.
embedding_loopup_matrix = np.vstack(embedding_list)

In [None]:
import torch
import random
import numpy as np

# Seed the NumPy, Python and PyTorch random number generators.
# NOTE(review): this runs after the Word2Vec training and after the random
# [UNK] vector was drawn in earlier cells, so those steps are not covered.
np.random.seed(0)
random.seed(0)
torch.manual_seed(0)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class SentimentClassifier(nn.Module):
    """Feed-forward sentiment classifier on top of pretrained word embeddings.

    A batch of token-id sequences is embedded, each sequence is flattened into
    one vector, and two linear layers turn it into two class logits.

    Args:
        vocab_size: Number of vocabulary entries. It must equal the number of
            rows of the embedding matrix, otherwise some token ids would have
            no embedding row.
        embedding_matrix: Optional 2-D array-like of shape
            (vocab_size, embedding_dim). When omitted, the notebook-level
            ``embedding_loopup_matrix`` is used.
        max_len: Number of token ids per input sequence; has to match the
            padding length used by the dataset.

    Raises:
        ValueError: If ``vocab_size`` differs from the number of embedding rows.
    """

    def __init__(self, vocab_size, embedding_matrix=None, max_len=100):
        super().__init__()
        if embedding_matrix is None:
            embedding_matrix = embedding_loopup_matrix
        # clone() guarantees the trainable weights never share memory with the source.
        weights = torch.as_tensor(embedding_matrix, dtype=torch.float32).clone()
        if vocab_size != weights.shape[0]:
            raise ValueError(
                f"vocab_size ({vocab_size}) does not match the number of "
                f"embedding rows ({weights.shape[0]})"
            )

        # freeze=False: the pretrained vectors are fine-tuned during training.
        self.embedding = nn.Embedding.from_pretrained(weights, freeze=False)
        # Size of one flattened sequence, derived from the matrix so that
        # embeddings of any dimension (e.g. 100-d GloVe) work unchanged.
        self.flat_features = weights.shape[1] * max_len
        self.fc1 = nn.Linear(self.flat_features, 100)
        self.fc2 = nn.Linear(100, 2)

    def forward(self, x):
        """Return the two class logits for a batch of token-id sequences."""
        x = self.embedding(x)
        x = x.view(-1, self.flat_features)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
import lightning as pl

class SentimentClassifierPL(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a sentiment classifier.

    Every batch is expected to unpack as ``(inputs, labels)``. Training uses
    cross-entropy loss; validation and test additionally collect the per-batch
    results and log the averaged loss and the accuracy at the end of the epoch.

    Args:
        sentiment_classifier: The ``nn.Module`` mapping inputs to class logits.
    """

    def __init__(self, sentiment_classifier):
        super().__init__()
        self.model = sentiment_classifier
        self.loss = nn.CrossEntropyLoss()

        # Per-batch (loss, outputs, labels) tuples, consumed at epoch end.
        self.validation_step_outputs = []
        self.test_step_outputs = []
        # The wrapped module is already stored through the state dict, so it is
        # excluded from the hyperparameters instead of being saved a second time.
        self.save_hyperparameters(ignore=["sentiment_classifier"])

    def _compute_batch(self, batch):
        """Run the model on one batch and return (loss, outputs, labels)."""
        inputs, labels = batch
        outputs = self.model(inputs)
        return self.loss(outputs, labels), outputs, labels

    def _log_epoch_summary(self, step_outputs, loss_name, accuracy_name):
        """Log mean loss and accuracy over the collected batches, then reset them."""
        if not step_outputs:
            # Nothing was collected; torch.stack would fail on an empty list.
            return
        avg_loss = torch.stack([loss for loss, _, _ in step_outputs]).mean()
        self.log(loss_name, avg_loss)

        all_outputs = torch.cat([outputs for _, outputs, _ in step_outputs])
        all_labels = torch.cat([labels for _, _, labels in step_outputs])
        all_preds = all_outputs.argmax(dim=1)
        accuracy = (all_preds == all_labels).float().mean()
        self.log(accuracy_name, accuracy)
        step_outputs.clear()

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss of one batch."""
        loss, _, _ = self._compute_batch(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss of one batch and keep its results."""
        loss, outputs, labels = self._compute_batch(batch)
        self.log("val_loss", loss)
        self.validation_step_outputs.append((loss, outputs, labels))
        return loss, outputs, labels

    def on_validation_epoch_end(self):
        """Log ``avg_val_loss`` and ``val_accuracy`` for the finished epoch."""
        self._log_epoch_summary(self.validation_step_outputs, "avg_val_loss", "val_accuracy")

    def test_step(self, batch, batch_idx):
        """Log the test loss of one batch and keep its results."""
        loss, outputs, labels = self._compute_batch(batch)
        self.log("test_loss", loss)
        self.test_step_outputs.append((loss, outputs, labels))
        return loss, outputs, labels

    def on_test_epoch_end(self):
        """Log ``avg_test_loss`` and ``test_accuracy`` for the finished epoch."""
        self._log_epoch_summary(self.test_step_outputs, "avg_test_loss", "test_accuracy")

    def configure_optimizers(self):
        """Return an Adam optimizer over the wrapped model's parameters."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
        return optimizer

In [None]:
from torch.utils.data import Dataset, DataLoader

class SentimentDataset(Dataset):
    """Dataset turning (tokens, label) pairs into fixed-length id tensors.

    Args:
        data: Indexable collection whose items hold the token sequence at
            position 0 and the label (convertible with ``int``) at position 1.
        vocab: Mapping from token to integer id.
        max_len: Length every id sequence is truncated or padded to.
    """

    PAD_ID = 0  # id used to fill sequences shorter than max_len
    UNK_ID = 1  # id used for tokens that are not in the vocabulary

    def __init__(self, data, vocab, max_len=100):
        self.data = data
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(token_ids, label)`` as int64 tensors for one example."""
        tokens = self.data[index][0]
        label = int(self.data[index][1])

        token_ids = [self.vocab.get(token, self.UNK_ID) for token in tokens]
        # Truncate long sequences, then right-pad short ones up to max_len.
        token_ids = token_ids[:self.max_len]
        token_ids += [self.PAD_ID] * (self.max_len - len(token_ids))

        return (
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor(label, dtype=torch.long),
        )

In [None]:
import wandb
from lightning.pytorch.loggers import WandbLogger

# Log in to Weights & Biases (wandb).
wandb.login()

def check_performance(vocab, train_data, test_data, wandb_log_name,
                      max_epochs=1, batch_size=64, num_workers=4, accelerator="auto"):
    """Train a SentimentClassifier once and log its metrics to wandb.

    Args:
        vocab: Token-to-id mapping used by the datasets and to size the model.
        train_data: Training examples passed to SentimentDataset.
        test_data: Held-out examples, used for validation and for the final test.
        wandb_log_name: Name of the wandb run.
        max_epochs: Number of training epochs.
        batch_size: Batch size of all data loaders.
        num_workers: Worker processes per data loader.
        accelerator: Lightning accelerator; "auto" picks a GPU when one is
            available and falls back to the CPU otherwise.
    """
    wandb_logger = WandbLogger(project="NLP_test", name=wandb_log_name, group="Lec02")

    try:
        model = SentimentClassifier(len(vocab))
        pl_model = SentimentClassifierPL(model)

        train_loader = DataLoader(SentimentDataset(train_data, vocab),
                                  batch_size=batch_size, shuffle=True, num_workers=num_workers)
        # NOTE(review): the test split doubles as the validation split, so the
        # validation metrics are not an independent estimate.
        val_loader = DataLoader(SentimentDataset(test_data, vocab),
                                batch_size=batch_size, shuffle=False, num_workers=num_workers)
        test_loader = DataLoader(SentimentDataset(test_data, vocab),
                                 batch_size=batch_size, shuffle=False, num_workers=num_workers)

        trainer = pl.Trainer(max_epochs=max_epochs,
                             accelerator=accelerator,
                             logger=wandb_logger)

        trainer.fit(model=pl_model,
                    train_dataloaders=train_loader,
                    val_dataloaders=val_loader)

        # Pass the model explicitly so the weights currently in memory are
        # evaluated rather than an implicitly selected checkpoint.
        trainer.test(model=pl_model, dataloaders=test_loader)
    finally:
        # Close the wandb run even when training or testing fails.
        wandb.finish()

In [None]:
# Train and evaluate with the vocabulary built in this notebook; the last
# argument is the wandb run name.
check_performance(vocab, tokenized_train_dataset, tokenized_test_dataset, "okt_stem_vocab_with_SKipGram")

### 실습 문제
아래는 사전 학습된 GloVe embedding을 다운로드하고 처리하는 코드이다.

`glove` dictionary는 단어를 key로, 해당 단어의 GloVe vector를 value로 가진다.

이를 이용하여 embedding lookup matrix를 만들고, 기존에 사용한 모델의 embedding layer에 적용하여 학습을 진행하시오.

(기존에 사용한 모델은 embedding의 크기가 32이므로, GloVe vector의 크기(여기서는 100)에 맞게 수정하여야 함)

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip         # https://github.com/stanfordnlp/GloVe
!unzip glove.6B.zip

glove_path = "glove.6B.100d.txt"

# Map every word to its GloVe vector. Each line is split on whitespace into the
# word followed by the vector components.
glove = {}
with open(glove_path, "r", encoding="utf-8") as file:
    for line in file:
        word, *vector = line.split()
        # Parse the components as floats; without an explicit dtype NumPy
        # would store them as an array of strings.
        glove[word] = np.asarray(vector, dtype=np.float32)