<a href="https://colab.research.google.com/github/deguc/Shannon/blob/main/004_CBOW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def onehot(x,k):
    """Return one-hot vectors of width ``k`` for the indices in ``x``.

    Every index in ``x`` selects the matching row of the ``k`` x ``k``
    identity matrix.
    """
    basis = np.identity(k)

    return basis[x]


def dataset(token,size,vocab_size):
    """Build (context window, next token) pairs with a sliding window.

    For every start position ``i`` in ``range(len(token) - size)`` the slice
    ``token[i:i+size]`` becomes one input row and ``token[i+size]`` is its
    target.

    Returns:
        A tuple ``(X, Y)``: the windows stacked row-wise with ``np.vstack``
        and the targets one-hot encoded to width ``vocab_size``.
    """
    n_windows = len(token) - size

    windows = [token[start:start+size] for start in range(n_windows)]
    targets = [token[start+size] for start in range(n_windows)]

    return np.vstack(windows),onehot(np.hstack(targets),vocab_size)


class DataLoader:
    """Mini-batch iterator over an ``(x, y)`` pair of arrays.

    Only whole batches are served: ``len(loader)`` is
    ``data_size // batch_size``, so trailing rows that do not fill a batch
    are not yielded. Both arrays are reordered with one shared random
    permutation whenever ``__next__`` is asked for the first batch.
    """

    def __init__(self,dataset,batch_size=10):

        inputs,targets = dataset

        self.x = inputs
        self.y = targets
        self.batch_size = batch_size
        self.data_size = inputs.shape[0]
        # index of the batch that __next__ hands out next
        self.cnt = 0

    def shuffle(self):
        """Reorder ``x`` and ``y`` with the same random permutation."""
        order = np.random.permutation(self.data_size)
        self.x = self.x[order]
        self.y = self.y[order]

    def get_idx(self,idx):
        """Return the ``(start, stop)`` row bounds of batch number ``idx``."""
        start = self.batch_size*idx

        return start,start + self.batch_size

    def __len__(self):
        return self.data_size // self.batch_size

    def __iter__(self):
        return self

    def __getitem__(self,idx):
        """Return batch ``idx`` without shuffling; IndexError if out of range."""
        if idx >= len(self) or idx < 0:
            raise IndexError('out of range')

        start,stop = self.get_idx(idx)

        return self.x[start:stop],self.y[start:stop]

    def __next__(self):

        # a pass starting at the first batch reshuffles the data
        if self.cnt == 0:
            self.shuffle()

        # all whole batches served: rewind for the next pass and stop
        if self.cnt >= len(self):
            self.cnt = 0
            raise StopIteration

        start,stop = self.get_idx(self.cnt)
        self.cnt += 1

        return self.x[start:stop],self.y[start:stop]


def zeros_ps(ps):
    """Return a new list with one zero array shaped like each entry of ``ps``."""
    return [np.zeros_like(p) for p in ps]


class Module:
    """Base class for layers: parameter list, gradient list and a mode flag."""

    def __init__(self):

        # parameters and their gradient buffers; both start out empty
        self.ps = []
        self.gs = []
        # stays None until something assigns a train/eval state
        self.train_flag = None


class Linear(Module):
    """Affine layer computing ``x @ W + b``.

    ``ps`` holds ``[W, b]`` and ``gs`` the matching gradient buffers. ``W``
    is drawn from a standard normal and divided by ``sqrt(d_in/2)``; ``b``
    starts at zero. The bias is always added in the forward pass; the
    ``biased`` flag only decides whether ``backward`` writes a bias gradient.
    """

    def __init__(self,d_in,d_out,biased=True):
        super().__init__()
        self.biased = biased

        scale = np.sqrt(d_in/2)
        weight = np.random.randn(d_in,d_out)/scale
        bias = np.zeros(d_out)

        self.ps = [weight,bias]
        self.gs = zeros_ps(self.ps)

        # input of the latest forward call, kept for backward
        self.inputs = None

    def __call__(self,x):

        self.inputs = x
        weight = self.ps[0]
        bias = self.ps[1]

        return x @ weight + bias

    def backward(self,dout):
        """Write the parameter gradients for ``dout``; return the input gradient."""
        grad_w = self.inputs.T @ dout
        self.gs[0][...] = grad_w

        if self.biased:
            grad_b = np.sum(dout,axis=0)
            self.gs[1][...] = grad_b

        return dout @ self.ps[0].T


class ReLU(Module):
    """Rectified linear unit: zeroes every element that is ``<= 0``."""

    def __init__(self):
        super().__init__()

        # boolean mask of the positions zeroed by the latest forward call
        self.mask = None

    def __call__(self,x):
        """Return a copy of ``x`` with all non-positive elements set to 0."""
        self.mask = x <= 0
        out = x.copy()
        out[self.mask] = 0

        return out

    def backward(self,dout):
        """Return ``dout`` with the positions masked in the forward pass zeroed.

        The result is a new array: like the forward pass, this works on a
        copy so the caller's gradient array is never modified in place.
        """
        dx = dout.copy()
        dx[self.mask] = 0

        return dx


class CBOW:
    """Context model: one Embedding per context position, then a Linear.

    The looked-up vectors of all ``ctx_size`` positions are concatenated and
    mapped to ``vocab_size`` outputs. Every Embedding is constructed around
    the same weight array ``W``, while each keeps its own gradient buffer.
    ``ps`` is the pair ``[parameters, gradients]`` collected from all layers
    in layer order.
    """

    def __init__(self,vocab_size,d_emb,ctx_size):

        self.ctx_size = ctx_size

        scale = np.sqrt(vocab_size/2)
        W = np.random.randn(vocab_size,d_emb)/scale

        self.emb = []
        for _ in range(ctx_size):
            self.emb.append(Embedding(W))

        self.aff = Linear(d_emb*ctx_size,vocab_size,biased=False)
        self.layers = self.emb + [self.aff]

        params,grads = [],[]
        for layer in self.layers:
            params.extend(layer.ps)
            grads.extend(layer.gs)

        self.ps = [params,grads]

    def __call__(self,x):
        """Forward pass; column ``i`` of ``x`` is fed to embedding ``i``."""
        looked_up = [e(x[:,i]) for i,e in enumerate(self.emb)]
        joined = np.hstack(looked_up)

        return self.aff(joined)

    def backward(self,dout):
        """Backpropagate ``dout`` through the Linear layer and the embeddings."""
        grad = self.aff.backward(dout)
        # undo the concatenation: one equal slice per context position
        pieces = np.split(grad,self.ctx_size,axis=-1)

        for layer,piece in zip(self.emb,pieces):
            layer.backward(piece)

    def pred(self,x):
        """Return the index of the largest output along the last axis."""
        scores = self(x)

        return np.argmax(scores,axis=-1)

    def train(self):

        for layer in self.layers:
            layer.train_flag = True

    def eval(self):

        for layer in self.layers:
            layer.train_flag = False


def softmax(x):
    """Softmax over the last axis of ``x``.

    The maximum along that axis is subtracted before exponentiating so that
    large inputs do not overflow.
    """
    shifted = x - np.max(x,axis=-1,keepdims=True)
    weights = np.exp(shifted)
    total = np.sum(weights,axis=-1,keepdims=True)

    return weights / total


def cross_entropy(y,t):
    """Cross entropy of predictions ``y`` against targets ``t``.

    Sums ``-t*log(y)`` over all elements and divides by ``y.shape[0]``.
    A small constant is added inside the log so that a zero in ``y`` does
    not produce ``-inf``.
    """
    eps = 1e-6
    log_y = np.log(y+eps)
    total = np.sum(t*log_y)

    return -total/y.shape[0]


class Loss:
    """Couples a model with an output activation and a loss function.

    Calling the object applies ``clf`` to the raw model output ``y``, keeps
    ``clf(y) - t`` as the gradient to send back, and returns
    ``loss(clf(y), t)``. ``backward`` passes the kept gradient to
    ``model.backward``.
    """

    def __init__(self,model,clf=softmax,loss=cross_entropy):

        self.model = model
        self.loss = loss
        self.clf = clf

        # written by __call__, consumed by backward
        self.dout = None

    def __call__(self,y,t):

        activated = self.clf(y)
        self.dout = activated - t
        value = self.loss(activated,t)

        return value

    def backward(self):
        grad = self.dout
        self.model.backward(grad)


class AdamW:
    """Adam optimizer with decoupled weight decay.

    Args:
        ps: pair ``[params, grads]`` of equally long lists of arrays. The
            parameter arrays are updated in place.
        lr: learning rate.
        beta1: decay rate of the first-moment (gradient mean) estimate.
        beta2: decay rate of the second-moment (squared gradient) estimate.
        weight_decay: decay coefficient; every step multiplies each
            parameter by ``1 - lr*weight_decay`` before the Adam update.
    """

    def __init__(self,ps,lr,beta1=0.25,beta2=0.9,weight_decay=0.1):

        self.ps = ps
        self.cache = (lr,beta1,beta2,weight_decay)
        # moment buffers: hs[0] first moments, hs[1] second moments
        self.hs = [
            zeros_ps(ps[0]),
            zeros_ps(ps[1])
        ]
        # number of steps taken so far, used for bias correction
        self.cnt = 0

    def __call__(self):
        """Perform one optimization step on every parameter array."""
        eps = 1e-6
        ps,gs = self.ps
        ms,vs = self.hs
        lr,b1,b2,w = self.cache
        self.cnt += 1
        n = self.cnt

        for p,g,m,v in zip(ps,gs,ms,vs):

            # exponential moving averages of the gradient and its square
            m[...] = b1*m + (1-b1)*g
            v[...] = b2*v + (1-b2)*g*g

            # bias-corrected estimates
            m0 = m / (1-b1**n)
            v0 = v / (1-b2**n)

            # decoupled weight decay: shrink the parameter itself, in place
            p -= lr*w*p

            p -= lr*m0/(np.sqrt(v0)+eps)


def trainer(model,loss,optimizer,data,epochs=100):
    """Run the training loop and return the mean batch loss of each epoch.

    Per batch: forward pass, loss evaluation, ``loss.backward()``, then one
    ``optimizer()`` step. ``model.train()`` is called at the start of every
    epoch and ``model.eval()`` at its end.
    """
    history = []

    for _ in range(epochs):

        model.train()
        running = 0

        for x,t in data:

            running += loss(model(x),t)
            loss.backward()
            optimizer()

        history.append(running/len(data))
        model.eval()

    return history


def disp_loss(loss):
    """Plot the values in ``loss`` on labelled axes and show the figure."""
    plt.plot(loss)
    plt.xlabel('epochs')
    plt.ylabel('cross entropy')
    plt.title('Loss Function')
    plt.show()

class Tokenizer:
    """Vocabulary over the elements of ``text``, with ids in first-seen order."""

    def __init__(self,text):

        # element -> id, numbered by first appearance
        self.vocab = {w: i for i,w in enumerate(dict.fromkeys(text))}

        # id -> element
        self.dic = {i: w for w,i in self.vocab.items()}

        # the whole input as an array of ids
        self.token = np.array([self.vocab[w] for w in text])

        self.vocab_size = len(self.vocab)

    def encode(self,text):
        """Return the list of ids for ``text``; unknown elements raise KeyError."""
        return [self.vocab[w] for w in text]

    def decode(self,encoded):
        """Return the concatenation of the elements stored for the given ids."""
        return ''.join(self.dic[i] for i in encoded)


class Embedding(Module):
    """Lookup layer that returns rows of the weight array ``W`` by index."""

    def __init__(self,W):
        super().__init__()

        self.ps = [W]
        self.gs = zeros_ps(self.ps)

        # indices of the latest forward call, needed by backward
        self.idx = None

    def __call__(self,idx):

        (weight,) = self.ps
        self.idx = idx

        return weight[idx]

    def backward(self,dout):
        """Rebuild the weight gradient from ``dout``.

        The buffer is cleared first; ``np.add.at`` then accumulates ``dout``
        at the stored indices, so an index that occurs several times
        receives the sum of its contributions.
        """
        (grad,) = self.gs
        grad[...] = 0
        np.add.at(grad,self.idx,dout)

def generator(model,tokenizer,text,ctx_size,new_size):
    """Generate ``new_size`` tokens that continue ``text`` and decode them.

    Each step passes the last ``ctx_size`` ids as a one-row array to
    ``model.pred``, takes the first predicted id and appends it to the
    running context. Only the newly generated ids are decoded and returned.
    """
    context = tokenizer.encode(text)
    generated = []

    for _ in range(new_size):
        window = np.array([context[-ctx_size:]])
        next_id = model.pred(window)[0]
        context += [next_id]
        generated.append(next_id)

    return tokenizer.decode(generated)


# Print arrays with two decimals and without scientific notation.
np.set_printoptions(precision=2,suppress=True)

# Training corpus. Tokenizer iterates it element by element, so every
# character (spaces included) becomes a vocabulary entry.
text = 'これやこの いくもかえるも わかれては しるもしらぬも おうさかのせき'

tokenizer = Tokenizer(text)
token = tokenizer.token
vocab_size = tokenizer.vocab_size
d_emb = 4*vocab_size
ctx_size = 5

# NOTE: from here on the name `dataset` is bound to the (X, Y) tuple and no
# longer to the function that produced it.
dataset = dataset(token,size=ctx_size,vocab_size=vocab_size)
data = DataLoader(dataset,batch_size=5)


epochs = 100
model = CBOW(vocab_size,d_emb,ctx_size)
loss = Loss(model)
optimizer = AdamW(model.ps,lr=0.01)
# ls collects one averaged loss value per epoch.
ls = trainer(model,loss,optimizer,data,epochs)

# Prompt of exactly ctx_size characters; extend it by 8 generated tokens.
text = 'これやこの'
pred = generator(model,tokenizer,text,ctx_size,new_size=8)
print(pred)
#disp_loss(ls)

