In [7]:
import numpy as np 
from utils import get_images

# MNIST path
mnist_path = 'data/mnist_raw/'
x_train_num, y_train_num, x_test_num, y_test_num = get_images(mnist_path)

# The first n_train samples of the training arrays are used for training;
# whatever follows them is held out for validation.
n_train = 50000

# Every split is flattened to (samples, features) and stored as float32, so
# train, validation and test inputs share one dtype (previously the training
# split was float32 while validation/test were float64).
x_train = x_train_num[:n_train].reshape(n_train, -1).astype(np.float32)
y_train = y_train_num[:n_train].reshape(n_train, 1)

# Row counts are taken from the data instead of being hard-coded to 10000.
x_val = x_train_num[n_train:]
x_val = x_val.reshape(x_val.shape[0], -1).astype(np.float32)
y_val = y_train_num[n_train:].reshape(-1, 1)

# astype() already returns a new array, so no explicit copy is needed for x_test;
# y_test keeps its copy so it does not alias y_test_num.
x_test = x_test_num.reshape(x_test_num.shape[0], -1).astype(np.float32)
y_test = y_test_num.copy().reshape(-1, 1)


def normalise(x_mean, x_std, x_data):
    """
    Standardise x_data with the supplied statistics.

    x_mean: value subtracted from every element of x_data.
    x_std: value the centred data is divided by.
    x_data: array (or scalar) to transform.

    Returns (x_data - x_mean) / x_std; the input is not modified.
    """
    centred = x_data - x_mean
    return centred / x_std

# Statistics are taken from the training split only (a single scalar mean and
# a single scalar std over all its elements) and reused for every split.
x_mean = x_train.mean()
x_std = x_train.std()

# Apply the same transform to train, validation and test, in that order.
x_train, x_val, x_test = (
    normalise(x_mean, x_std, split) for split in (x_train, x_val, x_test)
)


In [8]:
def create_minibatches(mb_size, x, y, shuffle = True):
    '''
    Lazily yield (x_batch, y_batch) pairs of at most mb_size samples each.

    mb_size: number of samples per minibatch; the last batch may be smaller.
    x: array indexed by sample along axis 0 (e.g. #samples, 784).
    y: array indexed by sample along axis 0 (e.g. #samples, 1).
    shuffle: when True, x and y are reordered with one shared random
             permutation (drawn from the global NumPy RNG) before slicing.

    Returns a generator; the inputs themselves are never modified.
    Raises AssertionError when x and y hold a different number of samples.
    '''
    n_samples = x.shape[0]
    assert n_samples == y.shape[0], 'Error en cantidad de muestras'
    if shuffle:
        order = np.arange(n_samples)
        np.random.shuffle(order)
        # Fancy indexing produces shuffled copies and keeps x/y rows paired.
        x, y = x[order], y[order]
    starts = range(0, n_samples, mb_size)
    return ((x[s:s + mb_size], y[s:s + mb_size]) for s in starts)

class np_tensor(np.ndarray):
    """ndarray subclass whose instances accept extra attributes; the layers use it to attach `.grad`."""

#Linear class
class Linear():
    '''
    Fully connected layer: Z = W @ X + b, with samples stored as columns of X.
    '''
    def __init__(self,input_size,output_size):
        '''
        Initialise the parameters with Kaiming He scaling.

        W has shape (output_size, input_size): standard-normal draws divided by
        sqrt(input_size / 2). b has shape (output_size, 1) and starts at zero.
        Both are np_tensor views so a `.grad` attribute can be attached later.
        '''
        he_scale = np.sqrt(input_size/2)
        weights = np.random.randn(output_size, input_size) / he_scale
        bias = np.zeros((output_size,1))
        self.W = weights.view(np_tensor)
        self.b = bias.view(np_tensor)

    def __call__(self,X):
        '''Forward pass: affine map of X (b is broadcast across the columns).'''
        return self.W @ X + self.b

    def backward(self,X,Z):
        '''
        Back-propagate through the layer.

        X: the input that was fed to the forward pass.
        Z: the matching output; Z.grad must already hold dL/dZ.

        Stores dL/dX in X.grad, dL/dW in self.W.grad and dL/db in self.b.grad
        (the bias gradient is Z.grad summed over the batch axis).
        '''
        upstream = Z.grad
        X.grad = self.W.T @ upstream
        self.W.grad = upstream @ X.T
        self.b.grad = np.sum(upstream, axis=1, keepdims=True)

#ReLU class
class ReLU():
    '''
    Element-wise rectified linear unit, max(0, z).
    '''
    def __call__(self,Z):
        '''Forward pass: clamp negative entries of Z to zero.'''
        return np.maximum(0,Z)

    def backward(self,Z,a):
        '''
        Back-propagate through the activation.

        Z: the pre-activation input of the forward pass.
        a: the matching output; a.grad must already hold dL/da.

        Stores dL/dZ in Z.grad: a copy of a.grad with the entries where
        Z <= 0 set to zero. a.grad itself is left untouched.
        '''
        blocked = Z <= 0
        passed_grad = a.grad.copy()
        passed_grad[blocked] = 0
        Z.grad = passed_grad

#Sequential class
class Sequential_layers():
    '''
    Container that chains layers and caches every activation for back-propagation.
    '''
    def __init__(self,layers):
        '''
        layers: list of layer objects (e.g. Linear, ReLU). Each must be callable
                for the forward pass and expose backward(input, output).
        '''
        self.layers = layers
        self.x = None
        # Activations keyed 'L0' (network input) ... 'Ln' (output of layer n).
        self.outputs = {}

    def __call__(self,X):
        '''
        Forward pass. Runs X through every layer in order, remembering the
        input and each layer's output in self.outputs, and returns the final
        output.
        '''
        self.x = X
        self.outputs['L0'] = self.x
        for i, layer in enumerate(self.layers,1):
            self.x = layer(self.x)
            self.outputs['L' + str(i)] = self.x
        return self.x

    def backward(self):
        '''
        Back-propagate through the layers in reverse order using the
        activations cached by the most recent forward pass. The `.grad` of the
        final output must have been set beforehand (softmaxXEntropy does this).
        '''
        for i in reversed(range(len(self.layers))):
            self.layers[i].backward(self.outputs['L' + str(i)],self.outputs['L' + str(i+1)])

    def update(self,learning_rate=1e-3):
        '''
        One plain gradient-descent step on every layer that owns parameters.

        Layers without a `W` attribute (activations such as ReLU) are skipped;
        this replaces a hard-coded ReLU type check so any parameter-free layer
        can be used. W and b are rebound to new arrays rather than modified in
        place.
        '''
        for layer in self.layers:
            if not hasattr(layer, 'W'): continue
            layer.W = layer.W - learning_rate*layer.W.grad
            layer.b = layer.b - learning_rate*layer.b.grad

    def predict(self,X):
        '''
        Predicted class index for each sample (column) of X.

        The arg-max is taken along the class axis (axis 0). Without an axis the
        scores were flattened, so a batch produced a single meaningless index.
        For a single-column input the result is still a scalar, as before; for
        a batch it is a 1-D array with one class per sample.

        Note: this runs a forward pass, so it overwrites the cached activations.
        '''
        scores = np.asarray(self(X))
        if scores.ndim < 2:
            return np.argmax(scores)
        classes = np.argmax(scores, axis=0)
        return classes[0] if classes.shape[0] == 1 else classes

In [9]:
# cost function
def softmaxXEntropy(x, y):
    """
    Softmax + cross-entropy loss for a batch of class scores.

    x: scores of shape (classes, batch). Must accept attribute assignment
       (e.g. an np_tensor), because the gradient is stored in x.grad.
    y: integer class labels, one per sample (e.g. shape (batch, 1)).

    Returns (preds, cost):
      preds: softmax probabilities, same shape as x.
      cost:  mean cross-entropy over the batch.

    Side effect: x.grad is set to (softmax - one_hot). This gradient is NOT
    divided by the batch size although the cost is, so it is the gradient of
    the summed loss. That scaling is kept on purpose, since existing learning
    rates are tuned to it.

    Numerical stability: the column-wise maximum is subtracted before
    exponentiating (softmax is invariant to that shift), so large scores no
    longer overflow to inf/NaN. The cost is computed from log-softmax directly,
    so a probability that underflows to 0 no longer produces log(0) = inf.
    """
    batch_size = x.shape[1]
    labels = y.reshape(-1)
    cols = np.arange(batch_size)

    shifted = x - x.max(axis=0, keepdims=True)
    exp_scores = np.exp(shifted)
    sum_exp = exp_scores.sum(axis=0)
    probs = exp_scores / sum_exp
    preds = probs.copy()

    # Cost: -log softmax of the true class = log(sum_exp) - shifted score.
    log_y_hat = shifted[labels, cols] - np.log(sum_exp)
    cost = np.sum(-log_y_hat) / batch_size

    # Gradient w.r.t. the scores: softmax minus the one-hot target (dl/dx).
    probs[labels, cols] -= 1
    x.grad = probs

    return preds, cost

def acurracy(x, y, mb_size, net=None):
    """
    Fraction of samples whose highest-scoring class equals the label.

    x: inputs indexed by sample along axis 0; each minibatch is transposed to
       (features, batch) before it is passed to the network.
    y: integer labels, one per sample (e.g. shape (samples, 1)).
    mb_size: number of samples evaluated per forward pass.
    net: network to evaluate (a callable returning (classes, batch) scores).
         When omitted, the module-level `model` is used, which was the only
         behaviour before this argument existed.

    Batches are taken in order (no shuffling): accuracy does not depend on
    sample order, so shuffling only cost an extra copy of the data.
    """
    if net is None:
        net = model  # backward-compatible fallback to the notebook-level model
    correct = 0
    total = 0
    for x_mb, y_mb in create_minibatches(mb_size, x, y, shuffle=False):
        pred = net(x_mb.T.view(np_tensor))
        correct += np.sum(np.argmax(pred,axis=0) == y_mb.reshape(-1))
        total += pred.shape[1]
    return correct/total

def train(model,epochs,mb_size=128,learning_rate=1e-3):
    """
    Train `model` with minibatch gradient descent.

    model: network exposing __call__, backward() and update(learning_rate).
    epochs: number of full passes over the training data.
    mb_size: minibatch size.
    learning_rate: step size handed to model.update.

    Reads the module-level x_train / y_train for training and x_val / y_val
    for validation. After every epoch it prints the cost of the LAST minibatch
    of that epoch and the validation accuracy of the model being trained (the
    accuracy used to be computed on the global `model`, whatever network was
    passed in).
    """
    for epoch in range(epochs):
        for x, y in create_minibatches(mb_size,x_train,y_train):
            scores = model(x.T.view(np_tensor))
            _,cost = softmaxXEntropy(scores,y)
            model.backward()
            model.update(learning_rate) # similar to the optimizer step in PyTorch
        print(f'costo: {cost} ,acurracy: {acurracy(x_val,y_val,mb_size,model)}')

In [10]:
# Seed the global NumPy RNG: the weight initialisation (np.random.randn) and
# the minibatch shuffling (np.random.shuffle) both draw from it, so without a
# seed no two runs of this notebook are comparable.
seed = 42
np.random.seed(seed)

# 784 inputs -> 200 hidden units (ReLU) -> 10 class scores.
model = Sequential_layers([Linear(784,200),ReLU(),Linear(200,10)])
mb_size = 512
# The gradient produced by softmaxXEntropy is summed over the minibatch, not
# averaged, so the effective step size grows with mb_size.
learning_rate = 1e-4
epochs = 20

In [11]:
# Run the training loop with the hyper-parameters configured in the previous cell.
train(model, epochs, mb_size=mb_size, learning_rate=learning_rate)

costo: 0.29620508837672005 ,acurracy: 0.9109
costo: 0.2854749359085084 ,acurracy: 0.927
costo: 0.26910484429787174 ,acurracy: 0.9374
costo: 0.23812861902122404 ,acurracy: 0.9456
costo: 0.17455924045087476 ,acurracy: 0.9505
costo: 0.15852763888610846 ,acurracy: 0.9537
costo: 0.1878869276145521 ,acurracy: 0.9558
costo: 0.1323894972017735 ,acurracy: 0.9581
costo: 0.1494196775341326 ,acurracy: 0.9597
costo: 0.13179302831067716 ,acurracy: 0.9605
costo: 0.1155244925501145 ,acurracy: 0.9622
costo: 0.10347332158937236 ,acurracy: 0.9637
costo: 0.11917215050447852 ,acurracy: 0.9653
costo: 0.10407535905162035 ,acurracy: 0.9651
costo: 0.09431414500068903 ,acurracy: 0.9666
costo: 0.08913059829026054 ,acurracy: 0.968
costo: 0.10941372092214004 ,acurracy: 0.9677
costo: 0.08801542089020027 ,acurracy: 0.9689
costo: 0.10668923334088748 ,acurracy: 0.9687
costo: 0.1089836856674366 ,acurracy: 0.9694
