In [1]:
import numpy as np

In [704]:
class Variable():
    """NumPy array wrapper that records operations for reverse-mode autodiff.

    Attributes:
        data:  the wrapped ``np.ndarray``.
        grad:  gradient accumulator with the same shape as ``data``.
        T:     a ``Variable`` built on transposed *views* of ``data``/``grad``.
        fn:    ``None`` for leaves, otherwise ``[backward_op, *inputs]``.
        child: Variables that were computed from this one.
        ready: set during backward once this node has been processed.
    """

    def __init__(self, data, T=None, grad=None, copy=True):
        """Wrap ``data``.

        Args:
            data: an ``np.ndarray`` (exact type required).
            T:    pre-built transpose Variable; created automatically if None.
            grad: gradient buffer to use; zeros like ``data`` if None.
            copy: store a copy of ``data`` (True) or the array itself (False).

        Raises:
            AttributeError: if ``data`` is not an ``np.ndarray``.
        """
        if data is None or type(data) is not np.ndarray:
            raise AttributeError('Wrong data type')

        self.data = data.copy() if copy else data
        if grad is None:
            grad = np.zeros_like(self.data)
        self.grad = grad
        if T is None:
            # Passing ``self`` as the transpose's own T stops the recursion.
            T = Variable(self.data.T, self, self.grad.T, copy=False)
        self.T = T
        self.fn = None
        self.child = []
        self.ready = False

    def zero_grad(self):
        """Zero the gradient in place and forget the recorded children."""
        # ``[...]`` works for any rank, including the 0-d loss Variable.
        self.grad[...] = 0.0
        self.child = []
        self.ready = False

    def __repr__(self):
        return 'Variable(\n{}\n)\n'.format(str(self.data))

    def __str__(self):
        return str(self.data)

    # ------------------------------------------------------------------ helpers
    def _as_variable(self, b):
        """Return ``b`` unchanged if it is a Variable, else lift it to one
        shaped like ``self.data``."""
        if isinstance(b, Variable):
            return b
        return Variable(np.ones_like(self.data) * b)

    @staticmethod
    def _attach(out, backward_op, *inputs):
        """Record on ``out`` how to back-propagate, and register ``out`` as a
        child of every Variable input."""
        out.fn = [backward_op, *inputs]
        for v in inputs:
            if isinstance(v, Variable):
                v.child.append(out)
        return out

    @staticmethod
    def _class_index(target):
        """Build the (row, class) index pair selecting one class per row."""
        labels = np.asarray(target).reshape(-1)
        return np.arange(labels.shape[0]), labels

    # --------------------------------------------------------------- arithmetic
    def __add__(self, b):
        """Element-wise ``self + b``; ``b`` may be a Variable or a scalar/array."""
        b = self._as_variable(b)
        return Variable._attach(Variable(self.data + b.data), Variable.__grad_add__, self, b)

    def __grad_add__(self, a, b):
        a.grad += self.grad
        b.grad += self.grad

    def __sub__(self, b):
        """Element-wise ``self - b``; ``b`` may be a Variable or a scalar/array."""
        b = self._as_variable(b)
        return Variable._attach(Variable(self.data - b.data), Variable.__grad_sub__, self, b)

    def __grad_sub__(self, a, b):
        a.grad += self.grad
        b.grad -= self.grad

    def __mul__(self, b):
        """Element-wise ``self * b``; ``b`` may be a Variable or a scalar/array."""
        b = self._as_variable(b)
        return Variable._attach(Variable(self.data * b.data), Variable.__grad_mul__, self, b)

    def __grad_mul__(self, a, b):
        a.grad += b.data * self.grad
        b.grad += a.data * self.grad

    def __matmul__(self, b):
        """Matrix product ``self @ b`` where ``b`` is a Variable."""
        return Variable._attach(Variable(np.matmul(self.data, b.data)), Variable.__grad_matmul__, self, b)

    def __grad_matmul__(self, a, b):
        a.grad += np.matmul(self.grad, b.data.T)
        b.grad += np.matmul(a.data.T, self.grad)

    def tanh(self):
        """Element-wise hyperbolic tangent."""
        return Variable._attach(Variable(np.tanh(self.data)), Variable.__grad_tanh__, self)

    def __grad_tanh__(self, a):
        # d tanh(x)/dx = 1 - tanh(x)^2, and self.data already holds tanh(x).
        a.grad += self.grad * (1 - (self.data ** 2))

    # --------------------------------------------------------------------- loss
    def crossentropy(self, target):
        """Summed softmax cross-entropy of the rows of ``self`` (logits).

        Args:
            target: class indices, one per row, as a Variable or ndarray of
                any shape that flattens to ``(rows,)``.

        Returns:
            A 0-d Variable holding the summed negative log-likelihood.
        """
        if isinstance(target, Variable):
            target = target.data
        # ``np.int`` was removed in NumPy 1.24; builtin ``int`` is the same dtype.
        target = target.astype(int)

        probs = self.softmax(1)
        # Pair row k with class target[k]; the previous tuple(zip(...)) index
        # was only correct for a single row.
        picked = probs[Variable._class_index(target)]
        c = Variable(np.array(np.sum(-np.log(picked))))
        return Variable._attach(c, Variable.__grad_corssentropy, self, target)

    def __grad_corssentropy(self, a, target):
        y = np.zeros_like(a.grad)
        y[Variable._class_index(target)] = 1.0
        # Scale by the upstream gradient so the loss can be weighted/composed.
        a.grad += self.grad * (a.softmax(1) - y)

    def softmax(self, dim):
        """Return softmax of ``data`` along axis ``dim`` as a plain ndarray."""
        # Subtracting the max leaves the result unchanged but prevents overflow.
        shifted = self.data - np.max(self.data, axis=dim, keepdims=True)
        exp_data = np.exp(shifted)
        return exp_data / np.sum(exp_data, axis=dim, keepdims=True)

    def argsoftmax(self):
        """Return the most probable class per row as an ``(rows, 1)`` Variable."""
        s = self.softmax(1)
        return Variable(np.argmax(s, axis=1).reshape(-1, 1))

    # ----------------------------------------------------------------- backward
    def backward(self, backward_grad):
        """Seed this node's gradient and back-propagate through the graph.

        Args:
            backward_grad: Variable, ndarray or scalar with the shape of
                ``self.data``.

        Raises:
            AttributeError: if the shapes differ.
        """
        if isinstance(backward_grad, Variable):
            backward_grad = backward_grad.data
        backward_grad = np.asarray(backward_grad)

        if backward_grad.shape != self.data.shape:
            raise AttributeError('Wrong backward grad shape {} != {}'.format(backward_grad.shape, self.data.shape))

        # Copy into the existing buffer: keeps ``self.T.grad`` (a view) in sync
        # and avoids aliasing the caller's array.
        self.grad[...] = backward_grad
        self.__backward()

    def __backward(self):
        if self.fn is None:
            return

        # This node's gradient is complete only once every child has been
        # processed; otherwise wait for a later visit.
        self.ready = all(child.ready for child in self.child)
        if not self.ready:
            return

        backward_op, *inputs = self.fn
        backward_op(self, *inputs)

        # Recurse once per distinct input: for ``h * h`` the op has already
        # accumulated both contributions into ``h.grad``.
        visited = set()
        for v in inputs:
            if isinstance(v, Variable) and id(v) not in visited:
                visited.add(id(v))
                v.__backward()

In [696]:
import torch

def testGrad():
    """Compare Variable's gradients with PyTorch autograd on a small network.

    Builds random inputs and weights, runs the same two-step tanh recurrence
    followed by a summed cross-entropy loss through both implementations, and
    prints one True/False per tensor (x, h, u, w, v).

    Returns:
        bool: True when every gradient matches ``torch.allclose``.
    """
    n = 1
    c = 5
    x = np.random.random((n, 2))
    h = np.random.random((n, 16))
    y = np.random.randint(0, c, n)

    u = np.random.random((2, 16))
    w = np.random.random((16, 16))
    v = np.random.random((16, c))

    def equation(*args):
        # Works for both Variable and torch.Tensor: only ``@``, ``+`` and
        # ``.tanh()`` are used.
        x, h, u, w, v = args
        h1 = ((x @ u) + (h @ w)).tanh()
        return ((x @ u) + (h1 @ w)).tanh() @ v

    arrays = [x, h, u, w, v]
    Vs = [Variable(a) for a in arrays]
    Ts = [torch.tensor(a, requires_grad=True) for a in arrays]

    Y = Variable(y)
    # Class indices go straight to an integer tensor; the previous detour via
    # ``np.float`` fails on NumPy >= 1.24 where that alias no longer exists.
    _Y = torch.tensor(y, dtype=torch.long)

    A = equation(*Vs).crossentropy(Y)
    _A = torch.nn.CrossEntropyLoss(reduction='sum')(equation(*Ts), _Y)

    print(A, _A)

    A.backward(np.array(1))
    _A.backward()

    matches = []
    for V, T in zip(Vs, Ts):
        same = torch.allclose(torch.tensor(V.grad), T.grad)
        print(same)
        matches.append(same)

    return all(matches)
testGrad()

1.6278228097737124 tensor(1.6278, dtype=torch.float64, grad_fn=<NllLossBackward>)
True
True
True
True
True


True

In [697]:
class RNN():
    """Elman-style recurrent network built from ``Variable`` parameters.

    Parameters U (input->hidden), W (hidden->hidden), V (hidden->output) and
    the biases b (hidden) and c (output) are drawn uniformly from [-1, 1).
    """

    def __init__(self, in_channels, out_channels, hidden_channels):
        def uniform(rows, cols):
            return Variable(np.random.uniform(-1, 1, (rows, cols)))

        # Creation order is kept so the random stream is consumed identically.
        self.U = uniform(in_channels, hidden_channels)
        self.W = uniform(hidden_channels, hidden_channels)
        self.V = uniform(hidden_channels, out_channels)
        self.b = uniform(1, hidden_channels)
        self.c = uniform(1, out_channels)
        self.h = None

    def forward(self, x):
        """Run the network over the sequence ``x``.

        Returns:
            (outputs, hidden_states): two lists with one entry per time step.
            ``self.h`` is left at the last hidden state.
        """
        self.h = None
        outputs, hidden_states = [], []

        for x_t in x:
            pre_activation = self.b + (x_t @ self.U)
            if self.h is not None:
                # No recurrent term on the first step.
                pre_activation = pre_activation + (self.h @ self.W)

            self.h = pre_activation.tanh()
            hidden_states.append(self.h)
            outputs.append(self.c + (self.h @ self.V))

        return outputs, hidden_states

    def zero_grad(self):
        """Call ``zero_grad`` on every parameter."""
        for param in (self.U, self.W, self.V, self.b, self.c):
            param.zero_grad()

    def step(self, lr=1e-1):
        """Plain gradient-descent update: ``data -= lr * grad``."""
        for param in (self.U, self.W, self.V, self.b, self.c):
            param.data -= lr * param.grad

In [698]:
class pytorchRNN():
    """PyTorch twin of ``RNN`` initialised from an existing model's weights.

    Each of ``model.U/W/V/b/c`` has its ``.data`` array copied into a tensor
    with ``requires_grad=True``.
    """

    _PARAM_NAMES = ('U', 'W', 'V', 'b', 'c')

    def __init__(self, model):
        for name in self._PARAM_NAMES:
            source = getattr(model, name)
            setattr(self, name, torch.tensor(source.data, requires_grad=True))

    def forward(self, x):
        """Run the recurrence over ``x`` (items expose an ndarray as ``.data``).

        Returns:
            (outputs, hidden_states): two lists with one tensor per time step.
        """
        self.h = None
        outputs, hidden_states = [], []

        for x_t in x:
            projected = torch.tensor(x_t.data, requires_grad=True) @ self.U
            a = self.b + projected
            if self.h is not None:
                a += (self.h @ self.W)
            self.h = a.tanh()
            hidden_states.append(self.h)
            outputs.append(self.c + (self.h @ self.V))

        return outputs, hidden_states

    def zero_grad(self):
        """Zero all gradients; does nothing until U has a gradient."""
        if self.U.grad is None:
            return
        with torch.no_grad():
            for name in self._PARAM_NAMES:
                grad = getattr(self, name).grad
                # W gets no gradient for one-step sequences, so it alone is optional.
                if name == 'W' and grad is None:
                    continue
                grad.zero_()

    def step(self, lr=1e-1):
        """In-place gradient-descent update; does nothing until U has a gradient."""
        if self.U.grad is None:
            return
        with torch.no_grad():
            for name in self._PARAM_NAMES:
                param = getattr(self, name)
                if name == 'W' and param.grad is None:
                    continue
                param -= lr * param.grad

In [699]:
def toBinaray(x, digits, complement=False):
    """Return the ``digits`` low bits of ``x`` as floats, least significant first.

    With ``complement=True`` a negative ``x`` is first shifted by ``2**digits``;
    whatever sign remains is then dropped via ``abs``.
    """
    if complement and x < 0:
        x = x + (1 << digits)
    magnitude = abs(x)
    # Zero-padded binary text, most significant bit first.
    msb_first = format(magnitude, "0" + str(digits) + "b")
    lsb_first = msb_first[::-1][:digits]
    return [float(int(ch)) for ch in lsb_first]

def toNumber(b, complement=False):
    """Decode a least-significant-first bit sequence into an integer.

    With ``complement=True`` the last element is treated as a two's-complement
    sign bit and weighted negatively.
    """
    sign_bit = None
    if complement:
        sign_bit, b = b[-1], b[:-1]

    value = 0
    for position, bit in enumerate(b):
        value += int(bit) << position

    if complement:
        value -= int(sign_bit) * (1 << len(b))
    return value

def BinaryDataset(digits=8):
    """Endless generator of binary-addition samples.

    Each item is ``(inputs, targets)``: two lists of length ``digits``, least
    significant bit first. ``inputs[k]`` is a (1, 2) Variable holding bit k of
    both operands and ``targets[k]`` is a (1, 1) Variable holding bit k of
    their sum. Operands are drawn below ``2**(digits-1)``.
    """
    upper = 1 << (digits - 1)
    while True:
        a, b = np.random.randint(0, upper, 2)
        total = a + b
        # Rows become time steps after the transpose.
        operand_bits = np.array([toBinaray(a, digits), toBinaray(b, digits)]).T
        sum_bits = np.array([toBinaray(total, digits)]).T
        inputs = [Variable(operand_bits[k:k+1, :]) for k in range(digits)]
        targets = [Variable(sum_bits[k:k+1, :]) for k in range(digits)]
        yield inputs, targets

In [705]:
# NumPy-autograd RNN: 2 input channels, 2 output channels, 16 hidden channels.
model = RNN(2, 2, 16)
# PyTorch reference network initialised from copies of ``model``'s weights, so
# both implementations start from the same parameters.
model2 = pytorchRNN(model)

In [706]:
# Train the NumPy RNN (`model`) and its PyTorch twin (`model2`) on the same
# samples, one sequence per iteration, and report both side by side.
epoch_size = 20000
report_every = 1000  # iterations per progress line; also the averaging window
error = 0
error2 = 0

dataset = BinaryDataset(8)
criterion2 = torch.nn.CrossEntropyLoss(reduction='sum')

for epoch in range(epoch_size):
    x, y = next(dataset)

    model.zero_grad()
    model2.zero_grad()

    output, hs = model.forward(x)
    output2, hs2 = model2.forward(x)

    # Torch label tensors, built once per sample and shared by loss and metric.
    targets2 = [torch.tensor(y_t.data[0,]).long() for y_t in y]

    loss = [o_t.crossentropy(y_t) for o_t, y_t in zip(output, y)]
    loss2 = [criterion2(o_t, t_t) for o_t, t_t in zip(output2, targets2)]

    # Back-propagate from the last time step to the first: a hidden state only
    # propagates once every node computed from it has been processed.
    for step_loss in reversed(loss):
        step_loss.backward(np.array(1))
    for step_loss in reversed(loss2):
        step_loss.backward(retain_graph=True)

    model.step(1e-2)
    model2.step(1e-2)

    # Count wrongly predicted bits for each model.
    error += np.count_nonzero([np.all(o_t.argsoftmax().data != y_t.data) for o_t, y_t in zip(output, y)])
    error2 += sum([(torch.max(o_t, 1)[1] != t_t).sum().item() for o_t, t_t in zip(output2, targets2)])
    if (epoch+1) % report_every == 0:
        print('[{:5d}] error : {} - {}, loss : {} - {}'.format(epoch+1, error / report_every, error2 / report_every, sum([l.data for l in loss]), float(sum(loss2))))
        error = 0
        error2 = 0

[ 1000] error : 3.464 - 3.47, loss : 6.787096388418801 - 6.4107264009101605
[ 2000] error : 2.275 - 2.18, loss : 6.112918221100947 - 7.724017438327864
[ 3000] error : 1.062 - 0.878, loss : 0.3218426310338222 - 0.3245673247588545
[ 4000] error : 0.062 - 0.054, loss : 0.08897429517948031 - 0.0706128636280523
[ 5000] error : 0.0 - 0.0, loss : 0.040026431988435196 - 0.04500407209151014
[ 6000] error : 0.0 - 0.0, loss : 0.02236197762822727 - 0.04026761795416345
[ 7000] error : 0.0 - 0.0, loss : 0.015575632767256702 - 0.01658517600234699
[ 8000] error : 0.0 - 0.0, loss : 0.03005009823441702 - 0.02003097368407758
[ 9000] error : 0.0 - 0.0, loss : 0.021729447287012562 - 0.024331664871079628
[10000] error : 0.0 - 0.0, loss : 0.006876849113934807 - 0.014882849798210529
[11000] error : 0.0 - 0.0, loss : 0.012847021414003275 - 0.009033295059531099
[12000] error : 0.0 - 0.0, loss : 0.010358946705204052 - 0.014153454555267508
[13000] error : 0.0 - 0.0, loss : 0.01685293693257689 - 0.0122285187632442

In [710]:
# Evaluate both trained models on 50-bit additions (training used 8 bits) and
# report the average number of wrong bits per sample.
eval_size = 1000
error = 0
error2 = 0

eval_dataset = BinaryDataset(50)

for sample_idx in range(eval_size):
    x, y = next(eval_dataset)

    output, _ = model.forward(x)
    # No gradients are needed here, so skip building the torch autograd graph.
    with torch.no_grad():
        output2, _ = model2.forward(x)

    error += np.count_nonzero([np.all(output[i].argsoftmax().data != y[i].data) for i in range(len(y))])
    error2 += sum([(torch.max(output2[i], 1)[1] != torch.tensor(y[i].data[0,]).long()).sum().item() for i in range(len(y))])

    # Predicted bits as plain floats (used by the optional debug print below).
    # ``.item()`` is used because float() on an array with ndim > 0 is
    # deprecated since NumPy 1.25.
    output = [float(o.argsoftmax().data.item()) for o in output]
    output2 = [float(torch.max(o, 1)[1].item()) for o in output2]

    x = np.concatenate([v.data for v in x]).T
    y = np.concatenate([v.data for v in y]).T

    # Every forward pass appends nodes to the parameters' ``child`` lists;
    # zero_grad() clears them so the recorded graphs can be freed.
    model.zero_grad()

    #print('[{:4d}] {:d} + {:d} = {:d}, m1:{}, m2:{}'.format(sample_idx, toNumber(x[0,:]), toNumber(x[1, :]), toNumber(y[0,:]), toNumber(output), toNumber(output2)))
print('m1 error : {}, m2 error : {}'.format(error/eval_size, error2/eval_size))

m1 error : 0.0, m2 error : 0.0
