In [1]:
import torch
import numpy as np

# Build a 7x3 weight matrix from a NumPy range and wrap it in a float32 leaf
# tensor; requires_grad=True makes autograd track operations on it.
W = torch.tensor(
    np.arange(21).reshape(7, 3),
    dtype=torch.float32,
    requires_grad=True,
)

# Row indices to look up (row 0 is requested twice).
idx = torch.tensor([1, 0, 3, 0])

# Embedding 클래스 구현
class Embedding:
    """Embedding lookup layer with a hand-written backward pass.

    Attributes:
        params: One-element list holding the weight matrix ``W``.
        grads: One-element list holding a gradient buffer shaped like ``W``.
        idx: Indices passed to the most recent ``forward`` call (``None``
            until ``forward`` has been called).
    """

    def __init__(self, W):
        self.params = [W]
        self.grads = [torch.zeros_like(W)]
        self.idx = None

    def forward(self, idx):
        """Return the rows of ``W`` selected by ``idx`` and remember ``idx``."""
        self.idx = idx
        W, = self.params
        return W[idx]

    def backward(self, dout):
        """Scatter-add ``dout`` into the gradient buffer.

        The buffer is reset to zero first, then every slice of ``dout`` is
        added to the row named by the matching entry of the cached indices.
        Repeated indices accumulate, whatever the shape of the index tensor.

        Args:
            dout: Upstream gradient with the same shape as the output of the
                preceding ``forward`` call.

        Returns:
            None. The result is written in place into ``self.grads[0]``.
        """
        dW, = self.grads
        # The buffer is plain storage for a manual gradient; keep autograd out.
        with torch.no_grad():
            dW.zero_()
            # Flatten the indices so 1-D and N-D lookups share one code path.
            rows = torch.as_tensor(self.idx, dtype=torch.long, device=dW.device).reshape(-1)
            # index_add_ requires matching dtype/device, so align dout first.
            src = dout.detach().to(dtype=dW.dtype, device=dW.device)
            src = src.reshape(rows.shape + dW.shape[1:])
            # Unlike `dW[rows] += src`, index_add_ sums duplicate indices.
            dW.index_add_(0, rows, src)
        return None

# Build the layer around the weight matrix.
embedding = Embedding(W)

# Forward pass: look up the rows selected by idx.
out = embedding.forward(idx)
print("Forward output:", out, sep="\n")

# Upstream gradient of ones, shaped like the forward output.
dout = torch.ones_like(out)

# Backward pass: accumulate dout into the layer's gradient buffer.
embedding.backward(dout)
print("Gradient of W:", embedding.grads[0], sep="\n")


Forward output:
tensor([[ 3.,  4.,  5.],
        [ 0.,  1.,  2.],
        [ 9., 10., 11.],
        [ 0.,  1.,  2.]], grad_fn=<IndexBackward0>)
Gradient of W:
tensor([[2., 2., 2.],
        [1., 1., 1.],
        [0., 0., 0.],
        [1., 1., 1.],
        [0., 0., 0.],
        [0., 0., 0.],
        [0., 0., 0.]])


In [2]:
import torch
import torch.nn as nn

class EmbeddingDot:
    """Embedding lookup followed by a row-wise dot product, with manual backward.

    Attributes:
        embed: ``nn.Embedding`` holding a float32 copy of ``W``.
        params: List of the embedding's parameters.
        grads: List of gradient buffers, one per entry of ``params``.
        cache: ``(h, target_W)`` from the most recent ``forward`` call.
        idx: Indices from the most recent ``forward`` call.
    """

    def __init__(self, W):
        # Copy W into a float32 tensor so the caller's data is never aliased.
        weight = torch.as_tensor(W, dtype=torch.float32).detach().clone()
        # The weight is frozen: this class computes its gradient by hand in
        # backward() instead of relying on autograd.
        self.embed = nn.Embedding.from_pretrained(weight, freeze=True)
        # parameters() is a one-shot generator; materialise it so it can be
        # iterated again after the grads list has been built from it.
        self.params = list(self.embed.parameters())
        self.grads = [torch.zeros_like(param) for param in self.params]
        self.cache = None
        self.idx = None

    def forward(self, h, idx):
        """Return the dot product of each row of ``h`` with the row of the
        weight matrix selected by the matching entry of ``idx``.

        Args:
            h: Tensor whose rows are multiplied with the looked-up rows;
                assumed to be (batch, dim) -- TODO confirm with callers.
            idx: Integer tensor of row indices, one per row of ``h``.

        Returns:
            Tensor with one score per row (sum over dim 1).
        """
        target_W = self.embed(idx)
        out = torch.sum(target_W * h, dim=1)

        # Keep what backward() needs.
        self.cache = (h, target_W)
        self.idx = idx
        return out

    def backward(self, dout):
        """Back-propagate ``dout`` through the dot product and the lookup.

        The weight gradient buffer ``self.grads[0]`` is zeroed and then
        receives ``dout * h`` scatter-added at the cached indices, so repeated
        indices accumulate.

        Args:
            dout: Upstream gradient with one value per forward output.

        Returns:
            Gradient with respect to ``h`` (``dout * target_W``).
        """
        h, target_W = self.cache
        dout = dout.reshape(-1, 1)  # column vector so it broadcasts over dim 1

        dW, = self.grads
        with torch.no_grad():
            dtarget_W = (dout * h).to(dtype=dW.dtype, device=dW.device)
            dW.zero_()
            dW.index_add_(0, self.idx.reshape(-1).to(dW.device), dtarget_W)

        dh = dout * target_W
        return dh


In [4]:
import torch

# Vocabulary used by the sampling demos below.
words = ['you', 'say', 'goodbye', 'i', 'hello', '.']

# Draw a single word uniformly at random.
random_idx = torch.randint(0, len(words), (1,))
print(words[random_idx.item()])

# Draw five distinct words by taking a prefix of a random permutation.
indices = torch.randperm(len(words))[:5]
print([words[i] for i in indices.tolist()])

# Draw one word according to an explicit probability distribution.
p = torch.tensor([0.5, 0.1, 0.05, 0.2, 0.05, 0.1])
chosen_idx = torch.multinomial(p, 1)
print(words[chosen_idx.item()])

# Raise a distribution to the 0.75 power ...
p = torch.tensor([0.7, 0.29, 0.01])
new_p = p ** 0.75

# ... and renormalise so the probabilities sum to 1 again.
new_p = new_p / new_p.sum()
print(new_p)


say
['say', 'hello', '.', 'goodbye', 'i']
you
tensor([0.6420, 0.3315, 0.0265])


In [5]:
import torch
import numpy as np
import collections

# Negative Sampling 클래스 구현 (PyTorch 버전)
class UnigramSampler:
    """Draws negative samples from a power-smoothed unigram distribution.

    Word IDs in ``corpus`` are counted, the counts are raised to ``power`` and
    normalised into ``word_p``. ``get_negative_sample`` then draws
    ``sample_size`` distinct IDs per target, never returning the target itself.

    Attributes:
        sample_size: Number of negatives drawn per target.
        vocab_size: Largest word ID in the corpus plus one (0 if empty).
        word_p: Float32 tensor of length ``vocab_size`` that sums to 1.
        device: Device on which ``word_p`` and the samples live.
    """

    def __init__(self, corpus, power, sample_size, device='cpu'):
        self.sample_size = sample_size
        self.device = device

        # bincount counts every ID in one pass and accepts arrays, lists and
        # tensors alike. A dict keyed by tensor elements would hash them by
        # identity and count nothing.
        word_ids = torch.as_tensor(corpus, dtype=torch.long).reshape(-1)
        counts = torch.bincount(word_ids)
        # Length is max ID + 1, so non-contiguous IDs keep their own slot.
        self.vocab_size = counts.numel()

        word_p = counts.to(dtype=torch.float32, device=self.device) ** power
        self.word_p = word_p / word_p.sum()  # normalise to a distribution

    def get_negative_sample(self, target):
        """Draw ``sample_size`` negative word IDs for every entry of ``target``.

        Args:
            target: 1-D sequence/tensor of word IDs to exclude, one per row.

        Returns:
            int64 tensor of shape (len(target), sample_size). Within a row the
            IDs are distinct and never equal to that row's target.
        """
        target = torch.as_tensor(target, dtype=torch.long, device=self.word_p.device).reshape(-1)
        batch_size = target.shape[0]
        if batch_size == 0:
            return torch.zeros((0, self.sample_size), dtype=torch.int64, device=self.word_p.device)

        # One copy of the distribution per row, with that row's target zeroed
        # out so it can never be drawn.
        p = self.word_p.repeat(batch_size, 1)
        p[torch.arange(batch_size, device=p.device), target] = 0

        # multinomial samples each row independently and does not need the
        # rows to sum to one, so no re-normalisation is required.
        return torch.multinomial(p, self.sample_size, replacement=False)


# Toy corpus of word IDs plus the sampler hyper-parameters
# (smoothing exponent and number of negatives per target).
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3])
power, sample_size = 0.75, 2

# Build the sampler on the CPU.
sampler = UnigramSampler(corpus=corpus, power=power, sample_size=sample_size, device='cpu')

# Batch of target word IDs.
target = torch.tensor([1, 3, 0], dtype=torch.long, device='cpu')

# Draw negatives for every target and show them.
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)


tensor([[2, 3],
        [2, 0],
        [1, 2]])


In [7]:
import torch
import torch.nn as nn

class NegativeSamplingLoss(nn.Module):
    """Negative-sampling loss built on a binary cross-entropy over logits.

    For each target the loss is the summed BCE of one positive score
    (label 1) and ``sample_size`` negative scores (label 0), where a score is
    the dot product between ``h`` and an embedding row.

    Args:
        W: 2-D float tensor used to initialise the (trainable) embedding.
        corpus: Sequence of word IDs handed to ``UnigramSampler``.
        power: Exponent handed to ``UnigramSampler``.
        sample_size: Number of negative samples per target.
    """

    def __init__(self, W, corpus, power=0.75, sample_size=5):
        super().__init__()
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)

        # BCEWithLogitsLoss fuses the sigmoid with the BCE loss.
        self.loss_fn = nn.BCEWithLogitsLoss(reduction='sum')

        # Embedding initialised from W and left trainable.
        self.embed = nn.Embedding.from_pretrained(W, freeze=False)

    def forward(self, h, target):
        """Return the summed positive + negative BCE loss for a batch.

        Args:
            h: Hidden vectors, one row per target; assumed (batch, dim) --
                TODO confirm with callers.
            target: Integer tensor of positive word IDs, one per row of ``h``.

        Returns:
            Scalar loss tensor.
        """
        # The sampler draws on its own device; move the IDs there and bring
        # the samples back to where the model lives.
        negative_sample = self.sampler.get_negative_sample(target.to(self.sampler.device))
        negative_sample = negative_sample.to(h.device)

        # Positive examples: label 1.
        positive_score = torch.sum(self.embed(target) * h, dim=1)
        loss = self.loss_fn(positive_score, torch.ones_like(positive_score))

        # Negative examples: label 0. All sample_size columns are scored in a
        # single batched lookup; with reduction='sum' this is the same total
        # as summing the loss column by column.
        negative_score = torch.sum(self.embed(negative_sample) * h.unsqueeze(1), dim=2)
        loss = loss + self.loss_fn(negative_score, torch.zeros_like(negative_score))

        return loss

    def backward(self, dout=1):
        """No-op kept for interface compatibility; returns None.

        Gradients are produced by autograd when ``.backward()`` is called on
        the tensor returned by ``forward``.
        """
        return None


In [8]:
import torch
import torch.nn as nn

class CBOW(nn.Module):
    """CBOW model trained with a negative-sampling loss.

    The context embeddings are averaged into a hidden vector, which is scored
    against the output embedding by ``NegativeSamplingLoss``.

    Args:
        vocab_size: Number of rows of both embedding tables.
        hidden_size: Embedding dimension.
        window_size: Not used by this class; kept for interface compatibility.
        corpus: Sequence of word IDs handed to ``NegativeSamplingLoss``.
        sample_size: Number of negative samples per target.
        power: Exponent handed to ``NegativeSamplingLoss``.

    Attributes:
        params: List of the input and output embedding parameters.
        grads: Read-only view of the current ``.grad`` of every entry in
            ``params`` (``None`` entries until autograd has run).
    """

    def __init__(self, vocab_size, hidden_size, window_size, corpus, sample_size=5, power=0.75):
        super().__init__()

        # Embedding tables.
        self.in_embedding = nn.Embedding(vocab_size, hidden_size)
        self.out_embedding = nn.Embedding(vocab_size, hidden_size)

        # Negative-sampling loss over the output embedding.
        self.ns_loss = NegativeSamplingLoss(self.out_embedding.weight, corpus, power=power, sample_size=sample_size)
        # The loss module builds its own nn.Embedding from the tensor it is
        # given. Point that embedding at our Parameter object so there is a
        # single output weight: parameters() lists it once and its .grad is
        # the one autograd fills in.
        self.ns_loss.embed.weight = self.out_embedding.weight

        # Convenience list of the trainable tensors.
        self.params = list(self.in_embedding.parameters()) + list(self.out_embedding.parameters())

    @property
    def grads(self):
        """Current autograd gradients, aligned with ``self.params``."""
        return [param.grad for param in self.params]

    def forward(self, contexts, target):
        """Return the negative-sampling loss for a batch.

        Args:
            contexts: Integer tensor of context word IDs; dim 1 is averaged
                away, so presumably (batch, n_context) -- TODO confirm.
            target: Integer tensor of target word IDs, one per batch row.

        Returns:
            Scalar loss tensor.
        """
        # Mean of the context embeddings.
        h = self.in_embedding(contexts).mean(dim=1)

        # Call the module (not .forward) so nn.Module hooks still run.
        return self.ns_loss(h, target)

    def backward(self, dout=1):
        """No-op kept for interface compatibility; returns None.

        Gradients are produced by autograd when ``.backward()`` is called on
        the loss returned by ``forward``; read them through ``grads`` or
        ``param.grad``. ``NegativeSamplingLoss.backward`` returns nothing to
        propagate, so there is no manual gradient to apply here.
        """
        return None


In [9]:
import torch
import torch.nn as nn

class CBOW(nn.Module):
    """CBOW model trained with a negative-sampling loss.

    NOTE(review): this cell re-defines the ``CBOW`` class from the previous
    cell; once both cells have run, this later definition is the one in
    effect.

    The context embeddings are averaged into a hidden vector, which is scored
    against the output embedding by ``NegativeSamplingLoss``.

    Args:
        vocab_size: Number of rows of both embedding tables.
        hidden_size: Embedding dimension.
        window_size: Not used by this class; kept for interface compatibility.
        corpus: Sequence of word IDs handed to ``NegativeSamplingLoss``.
        sample_size: Number of negative samples per target.
        power: Exponent handed to ``NegativeSamplingLoss``.

    Attributes:
        params: List of the input and output embedding parameters.
        grads: Read-only view of the current ``.grad`` of every entry in
            ``params`` (``None`` entries until autograd has run).
    """

    def __init__(self, vocab_size, hidden_size, window_size, corpus, sample_size=5, power=0.75):
        super().__init__()

        # Embedding tables.
        self.in_embedding = nn.Embedding(vocab_size, hidden_size)
        self.out_embedding = nn.Embedding(vocab_size, hidden_size)

        # Negative-sampling loss over the output embedding.
        self.ns_loss = NegativeSamplingLoss(self.out_embedding.weight, corpus, power=power, sample_size=sample_size)
        # The loss module builds its own nn.Embedding from the tensor it is
        # given. Point that embedding at our Parameter object so there is a
        # single output weight: parameters() lists it once and its .grad is
        # the one autograd fills in.
        self.ns_loss.embed.weight = self.out_embedding.weight

        # Convenience list of the trainable tensors.
        self.params = list(self.in_embedding.parameters()) + list(self.out_embedding.parameters())

    @property
    def grads(self):
        """Current autograd gradients, aligned with ``self.params``."""
        return [param.grad for param in self.params]

    def forward(self, contexts, target):
        """Return the negative-sampling loss for a batch.

        Args:
            contexts: Integer tensor of context word IDs; dim 1 is averaged
                away, so presumably (batch, n_context) -- TODO confirm.
            target: Integer tensor of target word IDs, one per batch row.

        Returns:
            Scalar loss tensor.
        """
        # Mean of the context embeddings.
        h = self.in_embedding(contexts).mean(dim=1)

        # Call the module (not .forward) so nn.Module hooks still run.
        return self.ns_loss(h, target)

    def backward(self, dout=1):
        """No-op kept for interface compatibility; returns None.

        Gradients are produced by autograd when ``.backward()`` is called on
        the loss returned by ``forward``; read them through ``grads`` or
        ``param.grad``. ``NegativeSamplingLoss.backward`` returns nothing to
        propagate, so there is no manual gradient to apply here.
        """
        return None


In [10]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from dataset import ptb
from common.util import create_contexts_target
from torch import nn, optim

# Hyper-parameters.
window_size, hidden_size = 5, 100
batch_size, max_epoch = 100, 1

# Load the 'train' split through the project's ptb loader; the vocabulary
# size is the number of entries in word_to_id.
corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)

# Build the (contexts, target) pairs for the chosen window size.
contexts, target = create_contexts_target(corpus, window_size)

# Wrap both arrays as int64 tensors and serve them in shuffled mini-batches.
contexts_tensor = torch.tensor(contexts, dtype=torch.long)
target_tensor = torch.tensor(target, dtype=torch.long)
dataset = TensorDataset(contexts_tensor, target_tensor)
data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

# 모델 정의 (SkipGram)
class SkipGram(nn.Module):
    """Scores a target word against the mean of its context embeddings.

    NOTE(review): despite the class name, ``forward`` averages the context
    embeddings and scores the target against that mean, which is the same
    computation the ``CBOW`` class in this notebook performs.
    """

    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.in_embedding = nn.Embedding(vocab_size, hidden_size)
        self.out_embedding = nn.Embedding(vocab_size, hidden_size)

    def forward(self, contexts, target):
        """Return one dot-product score per batch row.

        Args:
            contexts: Integer tensor of context word IDs; dim 1 is averaged.
            target: Integer tensor of target word IDs, one per batch row.
        """
        # Average the context embeddings over dim 1.
        context_mean = self.in_embedding(contexts).mean(dim=1)
        target_vec = self.out_embedding(target)
        return (target_vec * context_mean).sum(dim=1)

# Instantiate the model together with its optimiser and loss function.
model = SkipGram(vocab_size=vocab_size, hidden_size=hidden_size)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
loss_fn = nn.BCEWithLogitsLoss(reduction='sum')

# 훈련 루프
def train(model, data_loader, optimizer, loss_fn, max_epoch):
    """Train ``model`` and return the summed loss of every epoch.

    Args:
        model: Module called as ``model(contexts, target)`` returning scores.
        data_loader: Iterable yielding ``(contexts, target)`` batches.
        optimizer: Optimizer updating the model's parameters.
        loss_fn: Callable ``loss_fn(score, label)`` returning a scalar tensor.
        max_epoch: Number of passes over ``data_loader``.

    Returns:
        list[float]: one entry per epoch, the sum of the batch losses.
    """
    model.train()
    loss_history = []
    for epoch in range(max_epoch):
        total_loss = 0.0
        for contexts, target in data_loader:
            optimizer.zero_grad()

            # Forward pass and loss.
            score = model(contexts, target)
            # NOTE(review): every example is labelled positive (all ones);
            # no negative examples are used in this loop.
            correct_label = torch.ones_like(score)
            loss = loss_fn(score, correct_label)
            total_loss += loss.item()

            # Backward pass and parameter update.
            loss.backward()
            optimizer.step()

        loss_history.append(total_loss)
        print(f"Epoch {epoch+1}/{max_epoch}, Loss: {total_loss:.4f}")

    return loss_history

# Run training and keep the per-epoch losses; they are local to train(), so
# the plot below has to use the returned history.
loss_history = train(model, data_loader, optimizer, loss_fn, max_epoch)

# Plot the loss per epoch (markers keep a single-epoch run visible).
plt.plot(range(len(loss_history)), loss_history, marker="o")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()


KeyboardInterrupt: 