# Word Embedding

본 ipython notebook은 [DIYA](https://blog.diyaml.com/) 회원들의 자연어처리 스터디를 위해, 아래의 자료를 바탕으로 만들어졌습니다.
* [Stanford CS224N Assignment 2](http://web.stanford.edu/class/cs224n/assignments/a2.pdf)
* [ratsgo님의 한국어 임베딩 튜토리얼](https://ratsgo.github.io/embedding/)
* [딥 러닝을 이용한 자연어 처리 입문](https://wikidocs.net/book/2155)
* [Implementing word2vec in PyTorch (skip-gram model)](https://towardsdatascience.com/implementing-word2vec-in-pytorch-skip-gram-model-e6bae040d2fb)
* [Pytorch Global Vectors for Word Representation](https://github.com/kefirski/pytorch_GloVe/blob/master/GloVe/glove.py)

본 실습의 구성은 다음과 같습니다.
1. [Integer Encoding](#Integer-Encoding)
2. [Word2Vec 구현](#Word2Vec)
3. [GloVe 구현](#GloVe)
4. [비교 및 시각화](#Visualization)

In [0]:
# 한글 폰트 설치
!apt -qq -y install fonts-nanum
 
# matplotlib 한글 폰트 설정
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import seaborn as sns

sns.set_style('whitegrid')
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath)
plt.rc('font', family='NanumBarunGothic')
mpl.font_manager._rebuild()
plt.rcParams['figure.figsize'] = [15, 10]

런타임을 다시 시작한 뒤, 위 셀을 한번 더 실행해주세요 :)

In [0]:
"""
전처리한 Naver Sentiment Movie Corpus의 일부를 다운로드합니다.
불용어는 적용하지 않고, 꼬꼬마(KKMA)의 형태소 분석 기능만을 이용해 전처리하였습니다.
"""
import requests

# Get file link from google drive
file_share_link = "https://drive.google.com/open?id=1r1CtcMOQ7sUNma2V5vqrRxrNZojqCQrY"
file_id = file_share_link[file_share_link.find("=") + 1:]
file_download_link = "https://docs.google.com/uc?export=download&id=" + file_id

# Download file
data = requests.get(file_download_link)
filename = 'nsmc_ratings.txt'
with open(filename, 'wb') as f:
    f.write(data.content)

## Integer Encoding

In [0]:
# 텍스트 형태의 말뭉치를 이중 리스트로 변환합니다.
corpus = []
with open(filename, 'r') as f:
    for line in f.readlines():
        corpus.append(line.strip(' \n').split(' '))

# 샘플 데이터를 출력합니다.
print(corpus[:10])

In [0]:
"""TODO
vocab 변수가 말뭉치에 존재하는 각기 다른 형태소들을 (distinct words) key로,
해당 형태소가 말뭉치 안에 몇 개 들어있는지를 value로 가지도록 구현해주세요.
"""
vocab = {}

for sentence in corpus:
    for word in sentence:
        # TODO
        pass


# Tests
assert len(vocab) == 19161, "형태소의 총 개수는 19161개가 되어야합니다."
assert max(vocab.values()) == 21432, "가장 자주 등장하는 형태소의 등장 횟수는 21432번이 되어야합니다."
print("All tests passed")

In [0]:
"""
각 형태소의 등장 빈도를 꺾은선 그래프로 표현합니다.
"""
import matplotlib
import matplotlib.pyplot as plt
matplotlib.rcParams['figure.figsize'] = [10, 8]

vocab_sorted = sorted(vocab.items(), key=lambda x: x[1], reverse=True)
freqs = list(zip(*vocab_sorted))[1]

plt.subplot(3, 1, 1)
plt.plot(freqs)
plt.subplot(3, 1, 2)
plt.plot(freqs[:200])
plt.subplot(3, 1, 3)
plt.plot(freqs[3000:])

In [0]:
"""TODO
word2idx 변수가 등장빈도가 높은 순서대로 각 형태소를 key로,
해당 형태소의 순위를 (가장 높은 순서가 0순위입니다) value로 가지도록 하되
상위 1%와 하위 70%는 key에서 제거해주세요.
즉, 높은 순서대로 정렬하였을 때 순위가 1% 초과, 30% 미만에 속하는 형태소들만 key로 가지도록 구현해주시면 됩니다.
"""
from collections import OrderedDict
import pickle


word2idx = OrderedDict({})
# TODO


# Tests
assert len(word2idx) == 4982, "word2idx의 총 길이는 4982가 되어야합니다."
assert list(word2idx.keys())[0] == '우리', "word2idx의 첫번째 key는 '우리'입니다."
assert list(word2idx.keys())[-1] == '객', "word2idx의 마지막 key는 '객'입니다."
print("All tests passed")

# Save dict
with open('word2idx.pkl', 'wb') as handle:
    pickle.dump(word2idx, handle, protocol=pickle.HIGHEST_PROTOCOL)

## Word2Vec

In [0]:
"""
Skip-gram based Word2Vec를 위한 데이터셋을 생성합니다.
"""
import torch
import random
from torch.utils.data import Dataset

WINDOW_SIZE = 4
UNK = len(word2idx)  # 하나의 UNK token을 사용합니다.


class SkipGramData(Dataset):
    def __init__(self, corpus, word2idx, num_negatives=10):
        self.corpus = [doc for doc in corpus if len(doc) >= 2]
        self.word2idx = word2idx
        self.num_negatives = num_negatives

    def __len__(self):
        # document의 개수를 출력합니다.
        return len(self.corpus)

    def __getitem__(self, idx):
        doc = self.corpus[idx]
        center_idx = random.randint(0, len(doc) - 1)
        center_word = doc[center_idx]
        if center_word in self.word2idx:
            center = self.word2idx[center_word]
        else:
            center = UNK
        
        # positive sample
        lidx = max(0, center_idx - WINDOW_SIZE)
        ridx = min(len(doc), center_idx + WINDOW_SIZE + 1)
        pos_words = doc[lidx:center_idx] + doc[center_idx + 1:ridx]
        context_word = random.choice(pos_words)
        if context_word in self.word2idx:
            pos = self.word2idx[context_word]
        else:
            pos = UNK

        # negative samples
        negs = []
        for _ in range(self.num_negatives):
            doc_idx = random.randint(0, len(self.corpus) - 1)
            # same document
            if doc_idx == idx:
                doc = self.corpus[idx]
                neg_words = doc[:lidx + 1] + doc[ridx:]
                neg_word = random.choice(neg_words)
            # different document
            else:
                doc = self.corpus[doc_idx]
                neg_word = random.choice(doc)
            
            if neg_word in self.word2idx:
                negs.append(self.word2idx[neg_word])
            else:
                negs.append(UNK)

        return torch.LongTensor([center, pos] + negs)

Word2Vec에서 center word $C$가 주어졌을 때, outside word (context word) $O$가 가지는 조건부 확률 분포는 아래와 같습니다.

$$
P(O=o | C=c ) = \frac{\exp(u_o^T v_c)}{\sum_{w \in \text{Vocab}}\exp(u_w^T v_c)}
$$

이때 negative sampling을 하는 경우, 아래의 목적함수를 최소화하는 것으로 위 조건부 확률을 간접적으로 최대화할 수 있습니다.

$$
\mathbf{\it{J}}_\text{neg-sample}(v_c, o, U) = -\log(\sigma(u_o^T v_c)) - \sum_{k=1}^K \log(\sigma(-u_k^T v_c))
$$

In [0]:
"""TODO
Word2Vec class를 구현해주세요.
위 수식을 참조하여,
forward() 함수를 통해 앞서 작성한 SkipGramData 데이터셋으로부터 하나의 배치가 들어왔을 때
해당 배치의 negative sampling loss (J_neg-sample)를 반환하도록 해주세요.
loss는 스칼라값이어야 합니다.
"""
import torch.nn as nn


class Word2Vec(nn.Module):
    def __init__(self, vocab_dim, embed_dim=10):
        super().__init__()
        self.W_center = nn.Embedding(vocab_dim, embed_dim)
        self.W_context = nn.Embedding(vocab_dim, embed_dim)
    
    def embed(self, idx):
        return self.W_center(idx)

    def forward(self, samples):
        # TODO
        # Note: input의 형태에 대해서는 아래 셀을 참조해주세요.
        pass

In [0]:
"""
Word2Vec를 학습시킵니다.
"""
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
import numpy as np

learning_rate = 1e-3
epochs = 100  # 가능하다면 수렴할 때까지 더 늘리시면 좋습니다 :)
batch_size = 64

data = SkipGramData(corpus, word2idx, num_negatives=10)
dataloader = DataLoader(data, batch_size=batch_size, shuffle=True, drop_last=True)
model = Word2Vec(len(word2idx) + 1, embed_dim=10)
optim = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

pbar = tqdm(total=epochs * len(dataloader))
for epoch in range(epochs):
    avg_loss = []
    for samples in dataloader:
        model.zero_grad()
        loss = model(samples)
        loss.backward()
        optim.step()
        
        avg_loss.append(loss.item())
        pbar.update(1)

    if (epoch + 1) % 10 == 0:
        print('Loss for epoch {}: {}'.format(
            epoch + 1,
            np.mean(avg_loss)
        ))

# Save model
torch.save(model, 'word2vec.pth')

## GloVe

In [0]:
"""
Co-occurence Matrix를 생성합니다.
"""
num_words = len(word2idx) + 1
M = np.zeros((num_words, num_words))

for doc in corpus:
    current_idx = 0
    doc_len = len(doc)
    while current_idx < doc_len:
        lidx = max(current_idx - WINDOW_SIZE, 0)
        ridx = min(current_idx + WINDOW_SIZE + 1, doc_len)
        context_words = doc[lidx:current_idx] + doc[current_idx + 1:ridx]
        center_word = doc[current_idx]
        if center_word in word2idx:
            center_idx = word2idx[center_word]
        else:
            center_idx = UNK
            
        for context_word in context_words:
            if context_word in word2idx:
                context_idx = word2idx[context_word]
            else:
                context_idx = UNK
            M[context_idx, center_idx] += 1
        
        current_idx += 1

In [0]:
"""
GloVe 모델을 위한 데이터셋을 생성합니다.
"""
class GloVeData(Dataset):
    def __init__(self, corpus, word2idx):
        self.corpus = [word for word in doc for doc in corpus]
        self.word2idx = word2idx

    def __len__(self):
        # 전체 단어의 개수를 출력합니다.
        return len(self.corpus)

    def __getitem__(self, idx):
        word = self.corpus[idx]
        if word in self.word2idx:
            return torch.LongTensor([self.word2idx[word]])
        else:
            return torch.LongTensor([UNK])

GloVe 모델에서의 손실함수는 아래와 같습니다. 여기서 $X_{ij}$는 $i$번째 단어와 $j$번째 단어가 동시에 출현한 (co-occurence) 횟수입니다.

$$
\begin{gather}
\sum_{i, j \in \text{Vocab}} f(X_{ij})(w_i^T \tilde{w}_j + b_i + \tilde{b}_j - \log{X_{ij}})^2 \\
f(X_{ij}) = \begin{cases}
    (x/x_{max})^\alpha & \text{if } x < x_{max} \\
    1 & \text{otherwise}
\end{cases} 
\end{gather}
$$

In [0]:
"""TODO
GloVe class를 구현해주세요.
위 수식을 참조하여,
수식에서의 f에 해당하는 weight_fn()을 구현하고
forward() 함수를 통해 하나의 배치가 들어왔을 때
해당 배치의 loss를 반환하도록 해주세요.
loss는 스칼라값이어야 합니다.
"""
class GloVe(nn.Module):
    def __init__(self, cooccurence, embed_dim=10, x_max=100, alpha=0.75):
        super().__init__()
        self.x_max = x_max
        self.alpha = alpha
        self.X = torch.FloatTensor(cooccurence + 1.0)

        self.W_in = nn.Embedding(len(self.X), embed_dim)
        self.W_out = nn.Embedding(len(self.X), embed_dim)
        self.b_in = nn.Embedding(len(self.X), 1)
        self.b_out = nn.Embedding(len(self.X), 1)

    def embed(self, idx):
        return self.W_in(idx) + self.W_out(idx)

    def weight_fn(self, x):
        # TODO
        pass

    def forward(self, sample_in, sample_out):
        # TODO
        # Note: input의 형태에 대해서는 아래 셀을 참조해주세요.
        pass

In [0]:
"""
GloVe를 학습시킵니다.
"""
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
import numpy as np

learning_rate = 1e-3
epochs = 10 # 가능하다면 수렴할 때까지 더 늘리시면 좋습니다 :)
batch_size = 64  # 2의 배수가 되도록 설정해주세요

data = GloVeData(corpus, word2idx)
dataloader = DataLoader(data, batch_size=batch_size, shuffle=True, drop_last=True)
model = GloVe(M, embed_dim=10)
optim = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

pbar = tqdm(total=epochs * len(dataloader))
for epoch in range(epochs):
    avg_loss = []
    for samples in dataloader:
        model.zero_grad()
        loss = model(
            samples[:int(batch_size / 2)].squeeze(-1),
            samples[int(batch_size / 2):].squeeze(-1)
        )
        loss.backward()
        optim.step()
        
        avg_loss.append(loss.item())
        pbar.update(1)

    print('Loss for epoch {}: {}'.format(
        epoch + 1,
        np.mean(avg_loss)
    ))

# Save model
torch.save(model, 'glove.pth')

## Visualization

In [0]:
"""
학습한 Word2Vec과 GloVe 모델들이 생성하는 임베딩을 비교해봅시다.
"""
import matplotlib.pyplot as plt
from sklearn.decomposition import TruncatedSVD


# Load models
with open('word2idx.pkl', 'rb') as handle:
    word2idx = pickle.load(handle)
w2v = torch.load('word2vec.pth')
glove = torch.load('glove.pth')

# Create embeddings
embed_w2v = {}
embed_glove = {}
for k, v in word2idx.items():
    idx = torch.LongTensor([v])
    embed_w2v[k] = w2v.embed(idx)
    embed_glove[k] = glove.embed(idx)


def close_words(embed_dict, query, k=20):
    assert query in embed_dict, "해당 형태소가 임베딩에 없습니다."
    query = embed_dict[query]
    keys = list(embed_dict.keys())
    dists = []
    for key in keys:
        value = embed_dict[key]
        cosine_sim = (query * value).sum() / query.norm() / value.norm()
        dists.append(cosine_sim.item())
    sim_words = [x for _, x in reversed(sorted(zip(dists, keys)))][:k + 1]
    vectors = torch.stack([query[0].detach()] + [embed_dict[x][0].detach() for x in sim_words])
    return vectors, sim_words

In [0]:
# '멋지'와 가까운 형태소들을 찾아봅니다.
target = '멋지'
vectors_w2v, labels_w2v = close_words(embed_w2v, target)
vectors_glove, labels_glove = close_words(embed_glove, target)

# 2차원으로 차원을 축소합니다.
svd = TruncatedSVD(n_components=2, n_iter=10)
reduced_w2v = svd.fit_transform(np.asarray(vectors_w2v))
reduced_w2v -= reduced_w2v[0]
reduced_glove = svd.transform(np.asarray(vectors_glove))
reduced_glove -= reduced_glove[0]

# Word2Vec
plt.scatter(0, 0, marker='x', color='red', label='word2vec')
for idx, word in enumerate(labels_w2v[1:]):
    x, y = reduced_w2v[idx]
    plt.scatter(x, y, marker='x', color='red')
    plt.text(x, y, word, fontsize=15)

# GloVe
plt.scatter(0, 0, marker='o', color='blue', label='glove')
for idx, word in enumerate(labels_glove[1:]):
    x, y = reduced_glove[idx]
    plt.scatter(x, y, marker='o', color='blue')
    plt.text(x, y, word, fontsize=15)

plt.legend()