In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [12]:
class Dense:
    def __init__(self, units, input_size, activation, name):
        self.W = tf.Variable(tf.random.normal([units, input_size]) * tf.math.sqrt(1/input_size), name=(name + '_W'))
        self.b = tf.Variable(tf.zeros([units,1]), name=(name + '_b'))
        self.activation = activation

    def forward(self, X, training):
        Z = tf.linalg.matmul(self.W,X) + self.b
        A = self.activation(Z)
        return A

    def get_vars(self):
        return [self.W, self.b]

In [13]:
class Batch_norm:
    def __init__(self, units, input_size, activation, name):
        self.W = tf.Variable(tf.random.normal([units, input_size]) * tf.math.sqrt(1/input_size), name=(name + '_W')) 
        self.activation = activation

        self.gamma = tf.Variable(tf.ones([units, 1]), name=(name + '_gamma'))
        self.beta = tf.Variable(tf.ones([units, 1]), name=(name + '_beta'))
        self.mu_test = tf.Variable(tf.zeros([units,1]))
        self.sigma_test = tf.Variable(tf.ones([units,1]))

    def forward(self, X, training):
        e = 10**-8

        Z = tf.linalg.matmul(self.W,X)

        if training:
            mu = tf.math.reduce_mean(Z, axis=1, keepdims=True)
            sigma = tf.math.reduce_variance(Z - mu, axis=1, keepdims=True)
            sigma = tf.math.sqrt(sigma + e)

            self.mu_test.assign(0.95*self.mu_test + 0.05*mu)
            self.sigma_test.assign(0.95*self.sigma_test + 0.05*sigma)
        else:
            mu = self.mu_test
            sigma = self.sigma_test

        Z = (Z - mu) / sigma
        Z = self.gamma * Z + self.beta

        A = self.activation(Z)
        return A

    def get_vars(self):
        return [self.W, self.gamma, self.beta]

In [14]:
class Dropout:
    def __init__(self, keep_prob):
        self.keep_prob = keep_prob

    def forward(self, X, training):
        if not training:
            return X
        X = tf.nn.dropout(X, 1-self.keep_prob)
        # D = tf.random.uniform(tf.shape(X)) < self.keep_prob
        # D = tf.cast(D, dtype=tf.float32)
        # X = X * D
        # X = X / self.keep_prob
        return X

    def get_vars(self):
        return None

In [15]:
class Softmax:
    def __init__(self, classes, input_size, name):
        self.classes = classes

        self.W = tf.Variable(tf.random.normal([classes, input_size]) * tf.math.sqrt(1/input_size), name=(name + '_W'))
        self.b = tf.Variable(tf.zeros([classes,1]), name=(name + '_b'))

    def forward(self, X, training):
        Z = tf.linalg.matmul(self.W,X) + self.b

        T = tf.math.exp(Z)
        # A = T / tf.math.reduce_sum(T, axis=0)
        return A

    def get_vars(self):
        return [self.W, self.b]

In [11]:
class Model:
    def __init__(self, layer_names, units, input_size, cost_function):
        self.layers = self._create_layers(layer_names, units, input_size)
        self.cost = cost_function

    @tf.function
    def forward(self, X, training=False):
        for layer in self.layers:
            X = layer.forward(X, training)
        return X

    @tf.function
    def train(self, epochs, X_batches_train, Y_batches_train, X_dev, Y_dev, optimizer, callbacks=[]):
        for epoch in tf.range(epochs):
            #training
            cost_train = 0.
            i = 0
            for X_train, Y_train in zip(X_batches_train, Y_batches_train):       
                cost = self.__training_step__(X_train, Y_train, optimizer)
                cost_train = (cost_train * i + cost) / (i+1)
                i += 1

            #validation
            prediction = self.forward(X_dev, training=False)
            cost_dev = self.cost(Y_dev, prediction)

            for callback in callbacks:
                callback(epoch, cost_train, cost_dev)         

    @tf.function
    def __training_step__(self, X_train, Y_train, optimizer):
        with tf.GradientTape() as tape:
            variables = self.__get_variables__()
            tape.watch(variables)
            prediction = self.forward(X_train, training=True)
            cost = self.cost(Y_train, prediction)
            grads = tape.gradient(cost, variables)
        optimizer.apply_gradients(zip(grads, variables))
        return cost

    def __get_variables__(self):
        variables = []
        for layer in self.layers:
            var = layer.get_vars()
            if x != None:
                variables += var
        return variables

    def __create_layers__(layer_names, units, input_size):
        units.insert(0, input_size)
        i = 1

        for layer_name in layer_names:
            if layer_name == 'dense':
                layer = Dense(units[i], units[i-1], tf.nn.sigmoid, 'Dense' + str(i+1))
                layers.append(layer)
                i += 1

            elif layer_name == 'batch_norm':
                layer = Batch_norm(units[i], units[i-1], tf.nn.sigmoid, 'Batch_norm' + str(i+1))
                layers.append(layer)
                i += 1

            elif layer_name == 'dropout':
                layer = Dropout(keep_prob)
                layers.append(layer)

            elif layer_name == 'softmax':  
                layer = Softmax(units[i], units[i-1], 'Dense' + str(i+1))
                layers.append(layer)
                i += 1

        return layers 

In [10]:
class Adam:
    def __init__(self, learning_rate, beta_v = 0.9, beta_s = 0.999):
        self.v = []
        self.s = []
        self.iteration = 0
        self.learning_rate = learning_rate
        self.beta_v = beta_v
        self.beta_s = beat_s

    def apply_gradients(self, grads_vars):
        self.iteration += 1
        grads, variables = grads_vars

        for i in tf.range(tf.size(grads)):
            if tf.size(self.v) <= i:
                self.v.append(tf.Variable(tf.zeros(tf.shape(grads[i]))))
            if tf.size(self.s) <= i:
                self.s.append(tf.Variable(tf.zeros(tf.shape(grads[i]))))

            self.v[i].assign(self.beta_v*self.v[i] + (1-self.beta_v)*grads[i])
            v_corrected = self.v[i] / (1 - tf.math.pow(self.beta_v,self.iteration))

            self.s[i].assign(self.beta_s*self.s[i] + (1-self.beta_s)* tf.math.square(grads[i]))
            s_corrected = self.s[i] / (1 - tf.math.pow(self.beta_s,self.iteration))

            change = self.learning_rate * (v_corrected/(tf.math.sqrt(s_corrected) + 10**-8))

            variables[i].assign_add(-change)

In [30]:
# def training_step_template(X_train_batches, Y_train_batches, X_test, Y_test, layers, optymizer, learning_rate):
#     n = X_train_batches.shape[0]
#     cost_train = 0.
#     for i in tf.range(n):
#         X = X_train_batches[i].to_tensor()
#         Y = Y_train_batches[i].to_tensor()
#         i = tf.cast(i, tf.float32)
#         with tf.GradientTape() as tape:
#             params = get_vars(layers)
#             tape.watch(params)
#             A = forward_pass(X, layers, training=True)
#             cost = Cost(A, Y)
#             cost_train = (cost_train * i + cost) / (i+1)
#             grads = tape.gradient(cost, params)
#         optymizer.update_params(params, grads, learning_rate)
#     A_test = forward_pass(X_test, layers, False)
#     cost_test = Cost(A_test, Y_test)
#     cost_train = tf.math.reduce_mean(cost_train)
#     return (cost_train, cost_test)

In [16]:
def Loss(A, Y):
    loss = tf.math.log(A)
    loss = Y * loss
    loss = -tf.math.reduce_sum(loss, axis=0)
    return loss

In [17]:
def Cost(A, Y):
    losses = Loss(A, Y)
    cost = tf.reduce_mean(losses)
    return cost

In [18]:
def create_batches(data, batch_size):
    mini_batches = []
    n = int(data.shape[1] / batch_size)
    for i in range(n):
        mini_batches.append(data[:,i*batch_size:(i+1)*batch_size])
    mini_batches.append(data[:,-(data.shape[1] % batch_size):])
    return mini_batches

In [19]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train / 255
x_test = x_test / 255
y_train = tf.one_hot(y_train, depth=10, axis=0)
y_test = tf.one_hot(y_test, depth=10, axis=0)

x_train = tf.reshape(x_train, [784, -1])
x_train = tf.cast(x_train, tf.float32)

x_test = tf.reshape(x_test, [784, -1])
x_test = tf.cast(x_test, tf.float32)

x_train = x_test[:,:10000].numpy()
y_train = y_train[:,:10000].numpy()

x_test = x_test[:,:1000]
y_test = y_test[:,:1000]


x_train_batches = create_batches(x_train, batch_size)
y_train_batches = create_batches(y_train, batch_size)

# x_train_batches = tf.ragged.constant(x_train_batches)
# y_train_batches = tf.ragged.constant(y_train_batches)

NameError: name 'batch_size' is not defined

In [24]:
layer_names = ['dense', 'batch_norm', 'dense', 'softmax']
hidden_units = [16,8,8,10]
input_size = 784
hidden_units.insert(0, input_size)

learning_rate = tf.constant(0.001, dtype=tf.float32)
keep_prob = 0.9

image_height = 28
image_width = 28

m_train = x_train.shape[1]
m_test = x_test.shape[1]

batch_size = 256

In [31]:
layers = create_layers()
costs = []
optimizer = Adam()

training_step = tf.function(training_step_template)

In [32]:
print(training_step(x_train_batches, y_train_batches, x_test, y_test, layers, optimizer, learning_rate))

TypeError: in user code:

    <ipython-input-30-e397c0f141f2>:18 training_step_template  *
        cost_train = (cost_train * i + cost) / (i+1)
    d:\Programming\NeuralNetworks\venv\lib\site-packages\tensorflow\python\ops\math_ops.py:984 binary_op_wrapper
        return func(x, y, name=name)
    d:\Programming\NeuralNetworks\venv\lib\site-packages\tensorflow\python\ops\math_ops.py:1283 _mul_dispatch
        return gen_math_ops.mul(x, y, name=name)
    d:\Programming\NeuralNetworks\venv\lib\site-packages\tensorflow\python\ops\gen_math_ops.py:6091 mul
        _, _, _op, _outputs = _op_def_library._apply_op_helper(
    d:\Programming\NeuralNetworks\venv\lib\site-packages\tensorflow\python\framework\op_def_library.py:503 _apply_op_helper
        raise TypeError(

    TypeError: Input 'y' of 'Mul' Op has type float32 that does not match type int32 of argument 'x'.


In [None]:
for i in tf.range(1000, dtype=tf.float32):
    cost = training_step(x_train_batches, y_train_batches, x_test, y_test, layers, optimizer, learning_rate)
    costs.append(cost)
    if i%100 == 0:
        print('Epoch {0}: {1}'.format(i, cost))

plt.plot(costs)
plt.show()

In [None]:
prediction = forward_pass(x_train, layers, False)
a = tf.argmax(prediction, axis=0)
y = tf.argmax(y_train, axis=0)

b = a==y
b = tf.cast(b, tf.float32)
b = tf.reduce_sum(b)
print(b/10000)