In [117]:
import numpy as np
from sklearn import datasets

X0, Y = datasets.load_iris(return_X_y=True)
Y0 = np.eye(np.max(Y)+1)[Y]

In [200]:
from math import sqrt

class Gradients:
    def __init__(self, grads):
        self.grads = grads
    
    def apply(self, fn):
        if isinstance(self.grads, dict):
            for k, v in self.grads.items():
                self.grads[k] = fn(k, v)
        elif isinstance(self.grads, list):
            self.grads = list(map(fn, self.grads))
    
    def apply_arg(self, op, arg):
        if isinstance(self.grads, dict):
            for k, v in self.grads.items():
                self.grads[k] = op(k, v, arg[k])
        elif isinstance(self.grads, list):
            self.grads = [op(self.grads[i], arg[i]) for i in range(len(self.grads))]
    
    def applied(self, fn):
        if isinstance(self.grads, dict):
            new_grd = {}
            for k, v in self.grads.items():
                new_grd[k] = fn(k, v)
        elif isinstance(self.grads, list):
            new_grd = list(map(fn, self.grads))
        return Gradients(new_grd)
    
    def applied_arg(self, op, arg):
        if isinstance(self.grads, dict):
            new_grd = {}
            for k, v in self.grads.items():
                new_grd[k] = op(k, v, arg[k])
        elif isinstance(self.grads, list):
            new_grd = [op(self.grads[i], arg[i]) for i in range(len(self.grads))]
        return Gradients(new_grd)
      
    def __getitem__(self, item):
        return self.grads[item]

class AffineLayer:
    def __init__(self, dim_in, dim_out, method='Xavier'):
        vW = 2/(dim_in+dim_out)
        if method == 'He':
            vW *= 2
        vW = sqrt(vW)
        self.W = np.random.normal(0, vW, (dim_in, dim_out))
        self.B = np.random.normal(0, vW, (1, dim_out))
    
    def forward(self, X):
        return np.matmul(X, self.W)+self.B
    
    def save_forward(self, X):
        self.pX = X
        return self.forward(X)
    
    def backward(self, grad):
        dW = np.matmul(self.pX.T, grad)
        dB = np.sum(grad, axis=0, keepdims=True)
        
        next_grad = np.matmul(grad, self.W.T)
        
        return Gradients([dW, dB]), next_grad
    
    def update(self, u_grads):
        self.W -= u_grads[0]
        self.B -= u_grads[1]

class DropOutlayer:
    def __init__(self, rate):
        self.rate = rate
    
    def clear_temp(self):
        self.pFilter = None
    
    def forward(self, X):
        return X
    
    def save_forward(self, X):
        filt = np.random.binomial(1, self.rate, X.shape)
        self.pFilter = filt
        return filt*X
    
    def backward(self, grad):
        return self.pFilter*grad

class SigmoidLayer:
    def __init__(self):
        self.pY = None
        
    def clear_temp(self):
        self.pY = None
    
    def forward(self, X):
        X = np.exp(X)
        return X/(X+1)
    
    def save_forward(self, X):
        Y = self.forward(X)
        self.pY = Y
        return Y
    
    def backward(self, grad):
        return self.pY*(1-self.pY)*grad

class SoftmaxLayer:
    def __init__(self):
        self.pY = None
    
    def clear_temp(self):
        self.pY = None
    
    def forward(self, X):
        X = np.exp(X)
        return X/np.sum(X, axis=1, keepdims=True)
    
    def save_forward(self, X):
        Y = self.forward(X)
        self.pY = Y
        return Y
    
    def backward(self, grad):
        Y = self.pY
        gPY = Y[:, np.newaxis]*Y[:,:, np.newaxis]
        PP = np.zeros_like(gPY)
        for i in range(PP.shape[0]):
            PP[i,:,:] = np.diag(Y[i])
        return np.matmul(grad[:, np.newaxis], PP-gPY).squeeze()

In [119]:
class DNN:
    def __init__(self):
        self.layers = []
    
    def add(self, layer):
        self.layers.append(layer)
    
    def forward(self, X):
        for layer in self.layers:
            X = layer.forward(X)
        return X
    
    def save_forward(self, X):
        for layer in self.layers:
            X = layer.save_forward(X)
        return X
    

class CrossEntropyLoss:
    def loss(self, target, predict):
        return -np.mean(target*np.log(predict))
    
    def loss_grad(self, target, predict):
        grad = -target/predict/target.shape[0]
        return self.loss(target, predict), grad
    

In [212]:

class NNOptimizer:
    def __init__(self, learning_rate):
        self.rate = learning_rate
        self._lr_multiplier = lambda x: x*self.rate
    
    def loss(self, model: DNN, X, Y, loss):
        return loss.loss(Y, model.forward(X))
    
    def fit(self, model: DNN, X, Y, loss):
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        mul_lr = lambda x: x*self.rate
        
        for layer in reversed(model.layers):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                local_g.apply(mul_lr)
                layer.update(local_g)
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)

class SDGOptimizer(NNOptimizer):
    def __init__(self, learning_rate, batch_size=10):
        super().__init__(learning_rate)
        self.bsize = batch_size
        
    def fit(self, model: DNN, X, Y, loss):
        sample = np.random.choice(X.shape[0], self.bsize, replace=False)
        X, Y = X[sample], Y[sample]
        
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        
        for layer in reversed(model.layers):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                local_g.apply(self._lr_multiplier)
                layer.update(local_g)
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)
    
class MomentumOptimizer(NNOptimizer):
    def __init__(self, learning_rate, momentum=0.9):
        super().__init__(-learning_rate)
        self.momentum = momentum
        self.vel = {}
        
    def _calc_update_gradient(self, idx, local_grad):
        if idx not in self.vel:
            self.vel[idx] = local_grad.applied(np.zeros_like)
        local_grad.apply(self._lr_multiplier)
        self.vel[idx].apply_arg(lambda prev, new: prev*self.momentum-new, local_grad)
        return self.vel[idx]
        
    def fit(self, model: DNN, X, Y, loss):        
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        
        for idx, layer in enumerate(reversed(model.layers)):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                layer.update(self._calc_update_gradient(idx, local_g))
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)

class AdaGradOptimizer(NNOptimizer):
    def __init__(self, learning_rate=0.01):
        super().__init__(learning_rate)
        self.h = {}
    
    def _calc_update_gradient(self, idx, local_grad):
        if idx not in self.h:
            self.h[idx] = local_grad.applied(np.zeros_like)
        self.h[idx].apply_arg(lambda prev, new: prev+new**2, local_grad)
        local_grad.apply_arg(lambda x, h: x*self.rate/np.sqrt(h), self.h[idx])
        return local_grad
    
    def fit(self, model: DNN, X, Y, loss):        
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        
        for idx, layer in enumerate(reversed(model.layers)):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                layer.update(self._calc_update_gradient(idx, local_g))
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)

class RMSPropOptimizer(NNOptimizer):
    def __init__(self, learning_rate, fp=0.9, epsi=1e-5):
        super().__init__(learning_rate)
        self.h = {}
        self.fp = fp
        self.eps = epsi
    
    def _calc_update_gradient(self, idx, local_grad):
        if idx not in self.h:
            self.h[idx] = local_grad.applied(np.zeros_like)
        self.h[idx].apply_arg(lambda prev, new: self.fp*prev+(1-self.fp)*new**2, local_grad)
        local_grad.apply_arg(lambda x, h: x*self.rate/(np.sqrt(h)+self.eps), self.h[idx])
        return local_grad
        
    def fit(self, model: DNN, X, Y, loss):        
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        
        for idx, layer in enumerate(reversed(model.layers)):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                layer.update(self._calc_update_gradient(idx, local_g))
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)

class AdamOptimizer(NNOptimizer):
    def __init__(self, learning_rate, b1=0.9, b2=0.999, epsi=1e-5):
        super().__init__(learning_rate)
        self.m = {}
        self.v = {}
        self.b1 = b1
        self.b2 = b2
        self.eps = epsi
        
    def _to_update_grad(self, m, v):
        M = m/(1-self.b1)
        V = v/(1-self.b2)
        return self.rate*M/(np.sqrt(V)+self.eps)
        
    def _calc_update_gradient(self, idx, local_grad):
        if idx not in self.h:
            self.m[idx] = local_grad.applied(np.zeros_like)
            self.v[idx] = local_grad.applied(np.zeros_like)
        self.m[idx].apply_arg(lambda prev, new: self.b1*prev+(1-self.b1)*new, local_grad)
        self.v[idx].apply_arg(lambda prev, new: self.b2*prev+(1-self.b2)*new**2, local_grad)
        
        return self.m[idx].applied_arg(self._to_update_grad, self.v[idx])
    
    def fit(self, model: DNN, X, Y, loss):
        l, grad = loss.loss_grad(Y, model.save_forward(X))
        
        for idx, layer in enumerate(reversed(model.layers)):
            if hasattr(layer, 'update'):
                local_g, grad = layer.backward(grad)
                layer.update(self._calc_update_gradient(idx, local_g))
            else:
                grad = layer.backward(grad)
            if hasattr(layer, 'clear_temp'):
                layer.clear_temp()
        return l, self.loss(model, X, Y, loss)

In [213]:
model = DNN()
model.add(AffineLayer(4, 5))
model.add(SigmoidLayer())
model.add(DropOutlayer(0.8))
model.add(AffineLayer(5, 3))
model.add(SoftmaxLayer())

ce = CrossEntropyLoss()
opt = RMSPropOptimizer(0.01, 0.8)
opt.loss(model, X0, Y0, ce)

0.4645503118529852

In [216]:
for it in range(5000):
    _, last_loss = opt.fit(model, X0, Y0, ce)
    
print(last_loss)

0.015345795364334886


In [217]:
np.mean(np.argmax(model.forward(X0), axis=1)==Y)

0.98