In [1]:
class MatMul:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.x = None

    def forward(self, x):
        W, = self.params
        out = np.dot(x, W)
        self.x = x
        return out

    def backward(self, dout):
        W, = self.params
        dx = np.dot(dout, W.T)
        dW = np.dot(self.x.T, dout)
        self.grads[0][...] = dW
        # self.grads[0] = np.zeros_like(W)
        # self.grads[0][...] 로 값을 update해준다.
        # 그냥 self.grads[0] = dw하면 메모리 주소를 참조하게 됨
        return dx

In [3]:
import sys
sys.path.append('..')
import numpy as np
from common.layers import MatMul

# sample data - Contexts
c0 = np.array([[1, 0, 0, 0, 0, 0, 0]]) # 2d!! for mini-batch
c1 = np.array([[0, 0, 1, 0, 0, 0, 0]])

# initialization of Weights
W_in = np.random.randn(7, 3) 
W_out = np.random.randn(3, 7)

# layers
in_layer0 = MatMul(W_in)
in_layer1 = MatMul(W_in)
out_layer = MatMul(W_out)

# forward
## 1. 1st Fully Connected Layer (flatten)
h0 = in_layer0.forward(c0)
h1 = in_layer0.forward(c1)
## 2. hidden layer (average)
h = 0.5 * (h0 + h1)
## 3. 2nd Fully Connected Layer (logits)
s = out_layer.forward(h)

print(s)

[[ 0.78435961  0.12663617  0.12511096 -0.63885019 -1.63886509  1.18232562
  -0.47736658]]


In [4]:
def softmax(x):
    '''
    - 2차원 입력의 경우:
    먼저, 각 행에서 최대값을 빼줍니다(x - x.max(axis=1, keepdims=True)). 이는 수치적으로 안정된 소프트맥스 계산을 위한 기법으로, 오버플로우를 방지합니다.
    그 다음, np.exp(x)를 사용하여 각 요소의 지수 값을 계산합니다.
    마지막으로, 각 행의 합으로 각 요소를 나누어 정규화합니다(x /= x.sum(axis=1, keepdims=True)).

    - 1차원 입력의 경우:
    최대값을 빼줍니다(x - np.max(x)). 이는 2차원 입력의 경우와 마찬가지로 수치적 안정성을 위함입니다.
    np.exp(x)를 사용하여 지수 값을 계산하고, 이를 전체 합으로 나누어 정규화합니다(np.exp(x) / np.sum(np.exp(x))).
    '''
    if x.ndim == 2:
        x = x - x.max(axis=1, keepdims=True)
        x = np.exp(x)
        x /= x.sum(axis=1, keepdims=True)
    elif x.ndim == 1:
        x = x - np.max(x)
        x = np.exp(x) / np.sum(np.exp(x))

    return x

def cross_entropy_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)
        
    # 정답 데이터가 원핫 벡터일 경우 정답 레이블 인덱스로 변환
    if t.size == y.size:
        t = t.argmax(axis=1)
             
    batch_size = y.shape[0]

    return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

class SoftmaxWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.y = None  # softmax의 출력
        self.t = None  # 정답 레이블

    def forward(self, x, t):
        self.t = t
        self.y = softmax(x)

        # 정답 레이블이 원핫 벡터일 경우 정답의 인덱스로 변환
        if self.t.size == self.y.size:
            self.t = self.t.argmax(axis=1)

        loss = cross_entropy_error(self.y, self.t)
        return loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = self.y.copy()
        # y - t (t는 label이잖아 그래서 1임)
        dx[np.arange(batch_size), self.t] -= 1
        dx *= dout
        dx = dx / batch_size

        return dx

def preprocess(text):
    text = text.lower()
    text = text.replace('.', ' .')
    words = text.split(' ')

    word_to_id = {}
    id_to_word = {}
    for word in words:
        if word not in word_to_id:
            new_id = len(word_to_id)
            word_to_id[word] = new_id
            id_to_word[new_id] = word

    corpus = np.array([word_to_id[w] for w in words]) # encoding

    return corpus, word_to_id, id_to_word

# 1. preprocess

In [6]:
aimport sys
sys.path.append('..')
from common.util import preprocess

text = 'You say goodbye and I say hello.'
corpus, word_to_id, id_to_word = preprocess(text)
print(corpus)
print(id_to_word)

[0 1 2 3 4 1 5 6]
{0: 'you', 1: 'say', 2: 'goodbye', 3: 'and', 4: 'i', 5: 'hello', 6: '.'}


# 2. Context/Target seperation
![image.png](attachment:5ca87ee7-1332-4055-8c42-755b13d93798.png)

In [35]:
def create_contexts_target(corpus, window_size=1): # corpus는 전체 문장에 대한 id값(idx)
    target = corpus[window_size:-window_size] # 전과 후에 단어가 있는게 보장될라면, window_size부터~ -window_size
    
    contexts = []
    for idx in range(window_size, len(corpus)-window_size): # target에 대한 것!
        cs = []
        for t in range(-window_size, window_size+1): # 한 target에 대해서, window크기만큼 왼쪽 ~ 오른쪽
            if t == 0:
                continue
            cs.append(corpus[idx + t]) # [idx+0]은 target이라서 t=0일때 pass하는거임 (idx가 target을 나타낸다)
            #print(f"{idx}, {t}, {cs}")
        #print()
        contexts.append(cs)
        
    return np.array(contexts), np.array(target)

contexts, target = create_contexts_target(corpus, window_size=1)
print(contexts)
print(target)

[[0 2]
 [1 3]
 [2 4]
 [3 1]
 [4 5]
 [1 6]]
[1 2 3 4 1 5]


# 3. 원핫 표현으로 변환
![image.png](attachment:a41f086d-3f71-469e-b8a5-a31eba5b3acd.png)

In [33]:
def convert_one_hot(corpus, vocab_size):
    '''원핫 표현으로 변환

    :param corpus: target/contexts 단어 ID 목록(1차원 또는 2차원 넘파이 배열)
    :param vocab_size: 어휘 수 -> 고정 길이 벡터 위함!!
    :return: 원핫 표현(2차원 또는 3차원 넘파이 배열)
    '''
    N = corpus.shape[0] # target(1차원) or contexts(2차원) 개수

    if corpus.ndim == 1:  # target이라면 (N,) -> (N, vocab_size)
        one_hot = np.zeros((N, vocab_size), dtype=np.int32) 
        for idx, word_id in enumerate(corpus):
            one_hot[idx, word_id] = 1  # [2] -> [0 0 1 0 0 ...]

    elif corpus.ndim == 2: # contexts라면 (N, win*2) -> (N, win*2, vocab_size)
        C = corpus.shape[1] # win*2 (그니까 contexts 개수)
        one_hot = np.zeros((N, C, vocab_size), dtype=np.int32) # context내의 어휘 당 vocab_size 고정길이벡터
        for idx_0, word_ids in enumerate(corpus): # contexts IDs "한 줄"씩(N,C) [0,2] [1,3]...
            for idx_1, word_id in enumerate(word_ids): # 한 줄안에 IDs 각각 변환(vocab_size)
                one_hot[idx_0, idx_1, word_id] = 1     # [0, 0, 2(2번째,해당context)]
    return one_hot

In [36]:
vocab_size = len(word_to_id) # 전체 vocab 크기 (corpus랑 다름)
target = convert_one_hot(target, vocab_size)
contexts = convert_one_hot(contexts, vocab_size)
print(target)

[[0 1 0 0 0 0 0]
 [0 0 1 0 0 0 0]
 [0 0 0 1 0 0 0]
 [0 0 0 0 1 0 0]
 [0 1 0 0 0 0 0]
 [0 0 0 0 0 1 0]]


# CBOW 신경망(model)
![image.png](attachment:be2aa153-c4e2-48de-8682-9b6b3e838dde.png)

In [43]:
import sys
sys.path.append('..')
import numpy as np
from common.layers import MatMul, SoftmaxWithLoss # 위에서 다 함

class SimpleCBOW:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size

        # 가중치 초기화 ~N(표준편차가 0.01)
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f') # 32비트 부동소수점 속도 빠르다

        # 계층
        self.in_layer0 = MatMul(W_in) # 1st context
        self.in_layer1 = MatMul(W_in) # 2nd context
        self.out_layer = MatMul(W_out)
        self.loss_layer = SoftmaxWithLoss()

        # 모든 layer의 가중치와 기울기를 순서대로 리스트에 모음
        layers = [self.in_layer0, self.in_layer1, self.out_layer]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layers.grads

        # word embedding(단어의 분산표현)은 W_in
        self.word_vecs = W_in

    def forward(self, contexts, target): # (context경우의수, windowsize+1, vocab)
        h0 = self.in_layer0.forward(contexts[:, 0]) # 모든 contexts 경우의 수에 대해 첫번째 context vector를 완전연결계층 (context경우의수전체, 첫window)
        h1 = self.in_layer1.forward(contexts[:, 1]) # context0 - target - context1 이라고 치면 이번엔 "context1"을 통과시킴. (이해 안되면 밑에 코드 참고)
        # average
        h = (h1 + h2) * 0.5 # 그러면 모든 contexts 경우의 수만큼 남겠지 
        # logit
        score = self.out_layer.forward(h)
        # loss
        loss = self.loss_layer.forward(score, target)
        return loss        

    def backward(self, dout=1):
        # loss
        ds = self.loss_layer.backward(dout)
        # logit(matmul)
        da = self.out_layer.backward(ds)
        # average
        da *= 0.5 # 순전파 시에 h1+h2에 0.5 곱해준걸, da에 곱해줌
        self.in_layer1.backward(da) # 덧셈은 그대로 흘림
        self.in_layer0.backward(da) # 덧셈은 그대로 흘림
        return None # 굳이 return 안해도 알아서 내부에서 layer.params랑 layer.grads랑 업데이트 됨 (self.grads[0][...] = dW)

In [31]:
test = np.random.randint(1, 10, (6,2,7))
print(test)
print()
print(test[:,0])

[[[1 8 2 2 5 6 4]
  [2 6 9 2 9 2 3]]

 [[2 8 4 2 6 4 3]
  [7 4 4 9 6 9 7]]

 [[2 3 4 5 5 7 9]
  [2 5 9 9 3 7 7]]

 [[2 5 8 2 7 6 8]
  [1 1 9 5 8 5 1]]

 [[9 8 7 3 8 8 2]
  [5 6 9 8 6 8 1]]

 [[9 2 5 5 1 1 8]
  [8 2 7 9 9 8 8]]]

[[1 8 2 2 5 6 4]
 [2 8 4 2 6 4 3]
 [2 3 4 5 5 7 9]
 [2 5 8 2 7 6 8]
 [9 8 7 3 8 8 2]
 [9 2 5 5 1 1 8]]


# Skip-gram

In [41]:
import sys
sys.path.append('..')
import numpy as np
from common.layers import MatMul, SoftmaxWithLoss

class SimpleSkipGram:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size
        
        # 1. 가중치 초기화
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f')

        # 2. 계층 생성
        self.in_layer = MatMul(W_in)
        self.out_layer = MatMul(W_out)
        # window만큼 output/loss 층!!
        self.loss_layer1 = SoftmaxWithLoss()
        self.loss_layer2 = SoftmaxWithLoss()

        # 3. 모든 가중치와 기울기를 리스트에 모은다.
        layers = [self.in_layer, self.out_layer]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        # 4. 인스턴스 변수에 단어의 분산 표현을 저장한다.
        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = self.in_layer.forward(target)
        s = self.out_layer.forward(h)
        # 여기선 평균내지 않는다. 어차피 target에 대한거 하나라서
        l1 = self.loss_layer1.forward(s, contexts[:, 0]) 
        l2 = self.loss_layer2.forward(s, contexts[:, 1])
        loss = l1 + l2 # 합한다!
        return loss

    def backward(self, dout=1):
        # loss 합 -> loss 각자 -> affine 출력 -> affine 입력 -> 입력 
        dl1 = self.loss_layer1.backward(dout)
        dl2 = self.loss_layer2.backward(dout)
        ds = dl1 + dl2 # 2개(window*2)로 쪼갠거 총 손실 기울기
        dh = self.out_layer.backward(ds) # 두번째 완열계
        self.in_layer.backward(dh) # 첫번쨰 완열계
        return None