# 파이토치(PyTorch)로 구현하는 Word2Vec (CBOW + Negative Sampling)

본 노트북은 **"Efficient Estimation of Word Representations in Vector Space" (Mikolov et al., 2013)** 논문에서 제안된 Word2Vec 모델을 파이토치를 사용하여 밑바닥부터 구현합니다. 특히 대규모 데이터셋에서도 효율적인 학습이 가능한 **Negative Sampling** 기법을 적용한 **CBOW(Continuous Bag of Words)** 모델을 다룹니다.

## 1. Word2Vec 개요
Word2Vec은 단어를 컴퓨터가 이해할 수 있는 벡터로 변환하는 '단어 임베딩(Word Embedding)' 기술 중 하나입니다. 비슷한 맥락(Context)에서 등장하는 단어들은 비슷한 의미를 가진다는 '분포 가설(Distributional Hypothesis)'을 기반으로 합니다.

### CBOW (Continuous Bag of Words)
- 주변 단어들(Context)을 입력으로 받아 중심 단어(Target)를 예측하는 모델입니다.
- 예: "The fat cat sat on the mat" -> `{"The", "fat", "sat", "on"}`을 보고 `"cat"`을 예측.
- Skip-Gram 방식(중심 단어로 주변 단어 예측)에 비해 학습 속도가 빠르고, 빈도수가 높은 단어에 대해 더 나은 성능을 보이는 경향이 있습니다.

## 2. 핵심 구현 기법

### Negative Sampling (네거티브 샘플링)
기존의 Softmax 방식은 어휘 수($V$)가 많아지면 분모 계산 비용($O(V)$)이 너무 커져 학습이 불가능에 가깝습니다. 이를 해결하기 위해 Negative Sampling은 문제를 다중 분류에서 **이진 분류(Binary Classification)** 문제로 바꿉니다.
- **정답 단어(Positive Sample)**: 모델이 높은 확률을 출력하도록 학습
- **오답 단어(Negative Sample)**: 모델이 낮은 확률을 출력하도록 학습
- 보통 정답 1개당 5~20개의 오답 단어를 샘플링하여, 전체 어휘에 대해 계산하는 대신 소수의 단어에 대해서만 손실(Loss)을 계산하므로 연산량이 획기적으로 줄어듭니다.

### Subsampling (서브샘플링)
"the", "a", "is"와 같이 너무 자주 등장하는 고빈도 단어는 학습에 큰 도움이 되지 않을 수 있습니다. 따라서 이러한 단어들을 확률적으로 학습 데이터에서 제외(discard)하여 학습 속도를 높이고 희귀 단어의 벡터 품질을 개선합니다.

---
이제 `Salesforce/wikitext` 데이터셋을 사용하여 실제 구현을 진행해 보겠습니다.



In [None]:
import sys
import os
import pickle
import collections
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
import matplotlib.pyplot as plt

# 재현 가능성을 위한 시드 고정
torch.manual_seed(123)
np.random.seed(123)

# GPU/MPS(Mac) 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
print(f"사용 장치(Device): {device}")



In [None]:
class Config:
    # 데이터 설정
    dataset_name = "Salesforce/wikitext"
    dataset_config = "wikitext-2-raw-v1"
    min_count = 5  # 최소 빈도수 미만 단어는 제외 (UNK 처리)
    
    # 모델 하이퍼파라미터
    window_size = 5     # 윈도우 크기 (좌우 5단어)
    hidden_size = 100   # 임베딩 벡터 차원 수
    
    # 학습 설정
    batch_size = 2048
    max_epoch = 10
    learning_rate = 0.001
    
    # 네거티브 샘플링 설정
    negative_sample_size = 5
    power = 0.75        # 인기있는 단어의 샘플링 확률을 조절하는 지수

config = Config()



## 3. 데이터 로드 및 전처리 (Data Loading & Preprocessing)

`datasets` 라이브러리를 통해 **Wikitext-2** 데이터셋을 다운로드하고 전처리를 수행합니다.
1.  **로드**: Hugging Face Hub에서 데이터 다운로드.
2.  **토큰화**: 간단하게 공백 기준으로 분리하고 소문자로 변환합니다.
3.  **어휘 구축(Vocab Building)**: 단어(Word)를 고유한 정수 ID로 매핑하고, 빈도수가 낮은 단어는 `<UNK>`로 처리합니다.
4.  **서브샘플링**: 고빈도 단어를 확률적으로 제거합니다.



In [None]:
# 데이터셋 로드
print("데이터셋 다운로드 및 로드 중...")
ds = load_dataset(config.dataset_name, config.dataset_config)

def build_vocab(dataset, min_count=5):
    '''
    데이터셋에서 단어 사전을 구축하고 ID로 변환합니다.
    '''
    counter = collections.Counter()
    
    # Train, Validation, Test 데이터를 모두 사용하여 단어 집계
    for split in ['train', 'validation', 'test']:
        for line in dataset[split]['text']:
            words = line.strip().lower().split()
            counter.update(words)
            
    # min_count 이상인 단어만 유지
    word_to_id = {'<unk>': 0}
    id_to_word = {0: '<unk>'}
    
    # 빈도수 순으로 정렬하여 ID 부여 (옵션)
    sorted_words = sorted(counter.items(), key=lambda x: x[1], reverse=True)
    
    valid_words = [w for w, c in sorted_words if c >= min_count]
    for w in valid_words:
        new_id = len(word_to_id)
        word_to_id[w] = new_id
        id_to_word[new_id] = w
        
    print(f"전체 단어 수(Total Words): {sum(counter.values())}")
    print(f"전체 고유 단어 수(Unique Words): {len(counter)}")
    print(f"사전 크기(Vocab Size) (min_count={min_count} 적용): {len(word_to_id)}")
    
    return word_to_id, id_to_word, counter

word_to_id, id_to_word, raw_counter = build_vocab(ds, config.min_count)

def convert_to_ids(dataset, split, word_to_id):
    '''
    텍스트 데이터를 ID 리스트(Corpus)로 변환합니다.
    '''
    corpus = []
    unk_id = word_to_id['<unk>']
    
    for line in dataset[split]['text']:
        words = line.strip().lower().split()
        if not words: continue # 빈 줄 건너뛰기
        # 줄바꿈 처리는 <eos>를 넣기도 하지만 여기선 단순화하여 문장들을 쭉 이어붙입니다.
        ids = [word_to_id.get(w, unk_id) for w in words]
        corpus.extend(ids)
        
    return np.array(corpus)

corpus_train = convert_to_ids(ds, 'train', word_to_id)
print(f"학습 말뭉치(Corpus) 길이: {len(corpus_train)}")

# 서브샘플링 (Paper 식 적용)
# P(discard) = 1 - sqrt(t / f(w))  (t는 임계값, 보통 1e-5, f(w)는 단어 빈도 비율)
def subsample(corpus, counter, total_tokens, threshold=1e-5):
    subsampled_corpus = []
    drop_count = 0
    
    # 각 단어 ID별 확률 미리 계산
    probs = {}
    tokens_count = sum(counter.values())
    
    for w_id in set(corpus):
        if w_id == 0: continue # UNK는 유지
        word = id_to_word[w_id]
        freq = counter[word]
        f_w = freq / tokens_count
        p_keep = np.sqrt(threshold / f_w) + (threshold / f_w)
        
        # Mikolov paper formula slightly varies across implementations
        # PyTorch word2vec implementation uses: p_discard = 1 - sqrt(t/f_w)
        # Here we use: P(keep) = (sqrt(f_w / t) + 1) * (t / f_w) ? No, let's use the simple one.
        # P(keep) = sqrt(t / f_w) (approx)
        
        # Let's use the formula from the paper more strictly:
        # P(w_i) = 1 - sqrt(t / f(w_i)) is prob of discarding.
        # So P(keep) = sqrt(t / f(w_i))
        # Clipping to 1.0
        
        probs[w_id] = min(1.0, np.sqrt(threshold / f_w))

    for w_id in corpus:
        if w_id == 0:
            subsampled_corpus.append(w_id)
            continue
            
        prob = probs.get(w_id, 1.0)
        if np.random.rand() < prob:
            subsampled_corpus.append(w_id)
        else:
            drop_count += 1
            
    print(f"서브샘플링 후 말뭉치 길이: {len(subsampled_corpus)} (삭제됨: {drop_count})")
    return np.array(subsampled_corpus)

# corpus_train = subsample(corpus_train, raw_counter, sum(raw_counter.values()))
# (학습 시간을 위해 서브샘플링은 코드는 제공하되 기본 실행에서는 주석 처리하거나 건너뜁니다. Wikitext는 크지 않으므로 다 써도 됩니다.)



## 4. 파이토치 Dataset 정의

CBOW 학습을 위해 슬라이딩 윈도우 방식으로 데이터를 제공합니다.
- `__getitem__`: 중심 단어(Target)와 주변 단어들(Context)을 반환합니다.
- `window_size`만큼 앞뒤의 단어를 잘라 Context로 만듭니다.



In [None]:
class Word2VecDataset(Dataset):
    def __init__(self, corpus, window_size=5):
        self.corpus = corpus
        self.window_size = window_size
        self.data_size = len(corpus)
        
    def __len__(self):
        # 윈도우 크기만큼 양 끝을 제외한 모든 단어가 중심 단어가 될 수 있습니다.
        return self.data_size - 2 * self.window_size

    def __getitem__(self, idx):
        # idx는 실제 말뭉치(corpus)에서의 인덱스가 아니라, 
        # 가능한 중심 단어들의 리스트에서의 인덱스라고 봅니다.
        # 따라서 실제 corpus 인덱스는 idx + window_size 입니다.
        center_idx = idx + self.window_size
        target = self.corpus[center_idx]
        
        # Context 추출: 중심 단어의 앞뒤 window_size 만큼
        start_idx = center_idx - self.window_size
        end_idx = center_idx + self.window_size + 1
        
        # 중심 단어 제외하고 연결
        context = np.concatenate((
            self.corpus[start_idx : center_idx],
            self.corpus[center_idx + 1 : end_idx]
        ))
        
        return torch.from_numpy(context).long(), torch.tensor(target).long()

# 데이터셋 인스턴스 생성
dataset = Word2VecDataset(corpus_train, window_size=config.window_size)
dataloader = DataLoader(dataset, batch_size=config.batch_size, shuffle=True, num_workers=0)



## 5. Negative Sampler 구현

학습 시 매 배치의 정답(Target) 단어마다 오답(Negative) 단어를 샘플링해주는 클래스입니다.
- 단어 빈도수의 0.75승($P(w)^{0.75}$)에 비례하여 샘플링 확률을 설정합니다. 이는 희귀 단어가 조금 더 자주 선택되도록 보정하는 역할을 합니다.
- `get_negative_sample`: 배치의 타겟 단어들에 대해 오답 샘플을 반환합니다.



In [None]:
class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size
        
        # 단어 빈도 계산
        counts = collections.Counter(corpus)
        vocab_size = len(word_to_id) # 전체 사전 크기 기준
        
        self.word_p = np.zeros(vocab_size)
        for w_id, count in counts.items():
            self.word_p[w_id] = count
            
        # 확률 분포 보정 (Power 0.75)
        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)
        
        # PyTorch의 multinomial 사용을 위해 텐서로 변환
        self.word_p_torch = torch.from_numpy(self.word_p).float()

    def get_negative_sample(self, target_batch):
        '''
        target_batch: (batch_size,)
        return: (batch_size, sample_size)
        '''
        batch_size = target_batch.shape[0]
        
        # torch.multinomial을 사용하여 가중치 기반 샘플링 (복원 추출)
        negative_sample = torch.multinomial(self.word_p_torch, batch_size * self.sample_size, replacement=True)
        
        # 결과 형태 변환
        negative_sample = negative_sample.view(batch_size, self.sample_size)
        
        return negative_sample

sampler = UnigramSampler(corpus_train, config.power, config.negative_sample_size)
# 샘플링 확률 테이블도 GPU로 옮겨두면 빠릅니다.
sampler.word_p_torch = sampler.word_p_torch.to(device)



## 6. CBOW 모델 아키텍처 (Negative Sampling 포함)

CBOW 모델은 두 개의 임베딩 레이어로 구성됩니다.
1.  **`in_embed`**: 입력 단어(Context)들의 벡터 표현 (실제 우리가 사용할 단어 벡터).
2.  **`out_embed`**: 출력 단어(Target) 예측을 위한 가중치 행렬 (학습 보조용).

**Forward 과정**:
1.  Context 단어들의 벡터를 조회하여 평균(Mean)을 구합니다 ($h$).
2.  **Positive Score**: 중심 단어(Target)의 벡터와 $h$의 내적(Dot Product).
3.  **Negative Score**: 오답 단어(Negative Samples)들의 벡터와 $h$의 내적.
4.  **Loss 계산**: LogSigmoid 함수를 이용하여 Positive는 1에 가깝게, Negative는 0에 가깝게 학습합니다.
    - Loss = $- \log(\sigma(Score_{pos})) - \sum \log(\sigma(-Score_{neg}))$



In [None]:
class CBOW(nn.Module):
    def __init__(self, vocab_size, hidden_size):
        super(CBOW, self).__init__()
        
        # 입력 임베딩 (W_in): 단어의 분산 표현
        self.in_embed = nn.Embedding(vocab_size, hidden_size)
        # 출력 임베딩 (W_out): 네거티브 샘플링 시 가중치 역할
        self.out_embed = nn.Embedding(vocab_size, hidden_size)
        
        # 가중치 초기화 (작은 정규분포 값)
        self.in_embed.weight.data.normal_(0, 0.01)
        self.out_embed.weight.data.normal_(0, 0.01)
        
    def forward(self, contexts, targets, negative_samples):
        '''
        contexts: (batch_size, 2*window_size)
        targets: (batch_size)
        negative_samples: (batch_size, sample_size)
        '''
        
        # 1. Context Hidden Layer (입력의 평균)
        # (batch, 2*window, hidden) -> (batch, hidden)
        h = self.in_embed(contexts)
        h = torch.mean(h, dim=1)
        
        # 2. Positive Sample Score (정답과의 유사도)
        # target_w: (batch, hidden)
        target_w = self.out_embed(targets)
        # score_pos: (batch,) - 각 배치의 내적값
        score_pos = torch.sum(h * target_w, dim=1)
        
        # 3. Negative Sample Score (오답과의 유사도)
        # neg_w: (batch, sample_size, hidden)
        neg_w = self.out_embed(negative_samples)
        
        # 배치 행렬 곱(bmm)을 위해 h의 차원 변경: (batch, hidden) -> (batch, hidden, 1)
        # neg_w: (batch, sample_size, hidden)
        # 결과: (batch, sample_size, 1) -> squeeze -> (batch, sample_size)
        score_neg = torch.bmm(neg_w, h.unsqueeze(2)).squeeze(2)
        
        # 4. Loss Function (Negative Sampling Loss)
        # Positive는 1이 되도록 (logsigmoid(x)), Negative는 0이 되도록 (logsigmoid(-x))
        # LogSigmoid(x) = log(1 / (1 + exp(-x)))
        loss_pos = -torch.nn.functional.logsigmoid(score_pos).mean()
        loss_neg = -torch.nn.functional.logsigmoid(-score_neg).sum(dim=1).mean()
        
        return loss_pos + loss_neg

# 모델 초기화
vocab_size = len(word_to_id)
model = CBOW(vocab_size, config.hidden_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
print(model)



## 7. 학습 (Training)

준비된 데이터로 모델을 학습시킵니다.
- 매 스텝마다 `UnigramSampler`를 통해 새로운 Negative Sample을 생성하여 모델에 전달합니다.
- Loss가 점차 감소하는 것을 확인합니다.



In [None]:
loss_history = []
start_time = time.time()

print(f"학습 시작 (총 에폭: {config.max_epoch})...")
model.train()

for epoch in range(config.max_epoch):
    total_loss = 0
    cnt = 0
    
    for contexts, targets in dataloader:
        # 데이터를 GPU/Device로 이동
        contexts = contexts.to(device)
        targets = targets.to(device)
        
        # 배치에 맞는 네거티브 샘플 생성
        negative_samples = sampler.get_negative_sample(targets)
        negative_samples = negative_samples.to(device)
        
        # Forward 및 Backward
        optimizer.zero_grad()
        loss = model(contexts, targets, negative_samples)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        cnt += 1
        
        # 로그 출력 (1000 step 마다)
        if cnt % 1000 == 0:
            elapsed = time.time() - start_time
            print(f"\rEpoch {epoch+1}/{config.max_epoch} | Step {cnt}/{len(dataloader)} | Loss {total_loss/cnt:.4f} | Time {elapsed:.1f}s", end='')
            
    avg_loss = total_loss / cnt
    loss_history.append(avg_loss)
    print(f"\n[Epoch {epoch+1} 완료] Avg Loss: {avg_loss:.4f}")

print("학습 완료!")

# 손실 그래프 그리기
plt.plot(loss_history)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.show()



## 8. 모델 평가 및 분석

학습된 단어 벡터(`in_embed`)를 사용하여 단어 간의 유사도와 유추(Analogy) 문제를 풀어봅니다.
- **`most_similar`**: 주어진 단어와 코사인 유사도(Cosine Similarity)가 가장 높은 단어들을 찾습니다.
- **`analogy`**: "King - Man + Queen = ?" 같은 벡터 연산을 수행합니다.



In [None]:
def get_word_vec(model, word, word_to_id):
    if word not in word_to_id:
        return None
    # 모델의 in_embed 가중치를 가져옵니다 (Distributed Representation)
    word_id = word_to_id[word]
    vec = model.in_embed.weight.data[word_id].cpu().numpy()
    return vec

def most_similar(query, top=5):
    if query not in word_to_id:
        print(f"'{query}' 단어를 사전에 찾을 수 없습니다.")
        return
    
    # 1. 쿼리 벡터 가져오기
    query_id = word_to_id[query]
    query_vec = model.in_embed.weight.data[query_id].unsqueeze(0) # (1, hidden)
    
    # 2. 전체 단어 벡터와의 코사인 유사도 계산
    # (효율을 위해 정규화된 행렬곱 사용)
    W = model.in_embed.weight.data # (vocab, hidden)
    
    # 벡터 정규화 (L2 Norm으로 나눔)
    query_norm = query_vec / (query_vec.norm(dim=1, keepdim=True) + 1e-8)
    W_norm = W / (W.norm(dim=1, keepdim=True) + 1e-8)
    
    # 유사도 계산: (vocab, )
    similarities = torch.mm(W_norm, query_norm.t()).squeeze()
    
    # 3. 상위 top 개 추출
    # argsort는 오름차순이므로 뒤집어야 함, 혹은 topk 사용
    top_vals, top_idxs = torch.topk(similarities, k=top+1) # 자기 자신 포함되므로 +1
    
    print(f"\n[Query: {query}]")
    count = 0
    for val, idx in zip(top_vals, top_idxs):
        idx = idx.item()
        if id_to_word[idx] == query: continue # 자기 자신 제외
        print(f"  {id_to_word[idx]}: {val:.4f}")
        count += 1
        if count >= top: break

def analogy(a, b, c, top=5):
    # a:b = c:?  ==>  b - a + c = ?
    for w in (a, b, c):
        if w not in word_to_id:
            print(f"'{w}' 단어 없음")
            return
            
    vec_a = model.in_embed.weight.data[word_to_id[a]]
    vec_b = model.in_embed.weight.data[word_to_id[b]]
    vec_c = model.in_embed.weight.data[word_to_id[c]]
    
    target_vec = vec_b - vec_a + vec_c
    
    # 정규화된 코사인 유사도 검색
    W = model.in_embed.weight.data
    W_norm = W / (W.norm(dim=1, keepdim=True) + 1e-8)
    target_vec_norm = target_vec / (target_vec.norm() + 1e-8)
    
    similarities = torch.mm(W_norm, target_vec_norm.unsqueeze(1)).squeeze()
    top_vals, top_idxs = torch.topk(similarities, k=top+3) # 입력 단어들이 나올 수 있으므로 넉넉히
    
    print(f"\n[Analogy: {a}:{b} = {c}:?]")
    count = 0
    for val, idx in zip(top_vals, top_idxs):
        idx = idx.item()
        word = id_to_word[idx]
        if word in (a, b, c): continue
        print(f"  {word}: {val:.4f}")
        count += 1
        if count >= top: break

# 테스트
print("유사 단어 검색 결과:")
most_similar('good')
most_similar('car')
most_similar('apple')
most_similar('run')

print("\n유추(Analogy) 테스트 결과 :")
analogy('king', 'man', 'queen')
analogy('walk', 'walking', 'run')
analogy('take', 'took', 'go')