https://www.juliabloggers.com/tensorflow-2-0-building-simple-classifier-using-low-level-apis/

In [1]:
# IMPORT LIBRARIES
import numpy as np
import pandas as pd
import tensorflow as tf

from enum import Enum
from sklearn.datasets import load_iris
from typing import Callable, Iterable, List, Tuple

In [3]:
# DEFINE USEFUL CONSTANT

class HyperParams(Enum):
    ACTIVATION     = tf.nn.relu
    BATCH_SIZE     = 5
    EPOCHS         = 500
    HIDDEN_NEURONS = 10
    NORMALIZER     = tf.nn.softmax
    OUTPUT_NEURONS = 3
    OPTIMIZER      = tf.keras.optimizers.Adam

In [4]:
HyperParams.ACTIVATION

<function tensorflow.python.ops.gen_nn_ops.relu>

In [5]:
# LOAD DATA in X and Y
iris = load_iris()
xdat = iris.data
ydat = iris.target

In [10]:
xdat.shape

(150, 4)

In [6]:
# NICE (MAYBE USELESS) METHOD TO PARTITION DATA
class Data:

    def __init__(self, xdat: np.ndarray, ydat: np.ndarray, ratio: float = 0.3) -> Tuple:
        self.xdat  = xdat
        self.ydat  = ydat
        self.ratio = ratio

    def partition(self) -> None:
        scnt = self.xdat.shape[0] / np.unique(self.ydat).shape[0]
        ntst = int(self.xdat.shape[0] * self.ratio / (np.unique(self.ydat)).shape[0])
        idx  = np.random.choice(np.arange(0, self.ydat.shape[0] / np.unique(self.ydat).shape[0], dtype = int), ntst, replace = False)
        for i in np.arange(1, np.unique(self.ydat).shape[0]):
            idx = np.concatenate((idx, np.random.choice(np.arange((scnt * i), scnt * (i + 1), dtype = int), ntst, replace = False)))

        self.xtrn = self.xdat[np.where(~np.in1d(np.arange(0, self.ydat.shape[0]), idx))[0], :]
        self.ytrn = self.ydat[np.where(~np.in1d(np.arange(0, self.ydat.shape[0]), idx))[0]]
        self.xtst = self.xdat[idx, :]
        self.ytst = self.ydat[idx]

    def to_tensor(self, depth: int = 3) -> None:
        self.xtrn = tf.convert_to_tensor(self.xtrn, dtype = np.float32) 
        self.xtst = tf.convert_to_tensor(self.xtst, dtype = np.float32)
        self.ytrn = tf.convert_to_tensor(tf.one_hot(self.ytrn, depth = depth))
        self.ytst = tf.convert_to_tensor(tf.one_hot(self.ytst, depth = depth))
    
    def batch(self, num: int = 16) -> None:
        try:
            size = self.xtrn.shape[0] / num
            if self.xtrn.shape[0] % num != 0:
                sizes = [tf.floor(size).numpy().astype(int) for i in range(num)] + [self.xtrn.shape[0] % num]
            else:
                sizes = [tf.floor(size).numpy().astype(int) for i in range(num)]

            self.xtrn_batches = tf.split(self.xtrn, num_or_size_splits = sizes, axis = 0)
            self.ytrn_batches = tf.split(self.ytrn, num_or_size_splits = sizes, axis = 0)

            num = int(self.xtst.shape[0] / sizes[0])
            if self.xtst.shape[0] % sizes[0] != 0:
                sizes = [sizes[i] for i in range(num)] + [self.xtst.shape[0] % sizes[0]]
            else:
                sizes = [sizes[i] for i in range(num)]

            self.xtst_batches = tf.split(self.xtst, num_or_size_splits = sizes, axis = 0)
            self.ytst_batches = tf.split(self.ytst, num_or_size_splits = sizes, axis = 0)
        except:
            self.xtrn_batches = [self.xtrn]
            self.ytrn_batches = [self.ytrn]
            self.xtst_batches = [self.xtst]
            self.ytst_batches = [self.ytst]

In [7]:
# DATA CLEANING AND DATA PROCESSING
data = Data(xdat, ydat)
data.partition()
data.to_tensor()
data.batch(HyperParams.BATCH_SIZE.value)

In [8]:
# Define a layer y = σ(x ⋅ W + b)
class Dense:

    def __init__(self, i: int, o: int, f: Callable[[tf.Tensor], tf.Tensor], initializer: Callable = tf.random.normal) -> None:
        self.w = tf.Variable(initializer([i, o]))
        self.b = tf.Variable(initializer([o]))
        self.f = f

    def __call__(self, x: tf.Tensor) -> tf.Tensor:
        if callable(self.f):
            return self.f(tf.add(tf.matmul(x, self.w), self.b))
        else:
            return tf.add(tf.matmul(x, self.w), self.b)

In [9]:
# PERFORM FEED FORWARD
layer = Dense(4, 2, tf.nn.relu)
# GET THE OUTPUT OF THIS NEWLY CREATED LAYER
# IF FED WITH ONE INPUT DATAPOINT FROM THE TRAINSET
# REMEMBER! EAGER EXECUTION! WE GET THE OUTPUT RIGHT NOW
layer(data.xtrn[1:2, :])
#> tf.Tensor([[12.937485  0.      ]], shape=(1, 2), dtype=float32)

<tf.Tensor: id=79, shape=(1, 2), dtype=float32, numpy=array([[5.5676966, 0.       ]], dtype=float32)>

In [14]:
# FEED WITH 4 INPUT DATAPOINTS...
# AND ...
# YOU'LL GET 4 OUTPUT DAATAPOINTS!!!
# MAGIC
layer(data.xtrn[1:5, :])
#> tf.Tensor(
#> [[12.937484  0.      ]
#>  [12.557415  0.      ]
#>  [13.761768  0.      ]
#>  [14.996015  0.      ]], shape=(4, 2), dtype=float32)

<tf.Tensor: id=115, shape=(4, 2), dtype=float32, numpy=
array([[5.567697 , 0.       ],
       [6.1614122, 0.       ],
       [6.9433584, 0.       ],
       [5.930968 , 0.       ]], dtype=float32)>

In [15]:
# MORE POWER -> WE WANT A CHAIN OF LAYERS NOW!
class Chain:

    def __init__(self, layers: List[Iterable[Dense]]) -> None:
        self.layers = layers
    
    def __call__(self, x: tf.Tensor) -> tf.Tensor:
        self.out = x; self.params = []
        for l in self.layers:
            self.out = l(self.out)
            self.params.append([l.w, l.b])
        
        self.params = [j for i in self.params for j in i]
        return self.out

    def backward(self, inputs: tf.Tensor, targets: tf.Tensor) -> None:
        # HERE WE LEARN :D -> THE MAGIC IS HERE
        grads = self.grad(inputs, targets)
        self.optimize(grads, 0.001)
    
    def loss(self, preds: tf.Tensor, targets: tf.Tensor) -> tf.Tensor:
        return tf.reduce_mean(
            tf.keras.losses.categorical_crossentropy(
                targets, preds
            )
        )
        
    def grad(self, inputs: tf.Tensor, targets: tf.Tensor) -> List:
        with tf.GradientTape() as g:
            error = self.loss(self(inputs), targets)
        
        return g.gradient(error, self.params)

    def optimize(self, grads: List[tf.Tensor], rate: float) -> None:
        opt = HyperParams.OPTIMIZER.value(learning_rate = rate)
        opt.apply_gradients(zip(grads, self.params))

In [17]:
# CREATE A MODEL...
#YES .. OUR MODEL IS A TRIVIAL CHAIN OF DENSE LAYERS...
# TWO LAYERS PRECISELY
model = Chain([
    Dense(data.xtrn.shape[1], HyperParams.HIDDEN_NEURONS.value, HyperParams.ACTIVATION),
    Dense(HyperParams.HIDDEN_NEURONS.value, HyperParams.OUTPUT_NEURONS.value, HyperParams.NORMALIZER)
])

In [19]:
# TRY TO FEED SOMETHING...
# REMEMBER! EAGER EXECUTION! WE GET THE OUTPUT RIGHT NOW
model(data.xtrn[1:2, :])
#> tf.Tensor([[2.8523763e-16 1.8208168e-15 1.0000000e+00]], shape=(1, 3), dtype=float32)

<tf.Tensor: id=253, shape=(1, 3), dtype=float32, numpy=array([[9.5460939e-01, 9.1180876e-09, 4.5390658e-02]], dtype=float32)>

In [20]:
model(data.xtrn[1:5, :])
#> tf.Tensor(
#> [[2.8523763e-16 1.8208168e-15 1.0000000e+00]
#>  [4.9846957e-16 8.1282060e-16 1.0000000e+00]
#>  [6.2472026e-16 1.2082151e-15 1.0000000e+00]
#>  [1.8308374e-17 2.8908239e-17 1.0000000e+00]], shape=(4, 3), dtype=float32)

<tf.Tensor: id=267, shape=(4, 3), dtype=float32, numpy=
array([[9.5460939e-01, 9.1180707e-09, 4.5390647e-02],
       [9.7212243e-01, 1.5073410e-09, 2.7877590e-02],
       [9.6991414e-01, 5.6262484e-10, 3.0085811e-02],
       [9.6266270e-01, 6.0332490e-09, 3.7337299e-02]], dtype=float32)>

In [21]:
# CUSTOM DEFINITION OF MODEL ACCURACY...
# THE ACCURACY IS WHAT WE WANT.
# FUCK THE STANDARDIZATION OF KERAS
# WE ARE DIFFERENT
def accuracy(y, yhat):
    # BUT IN THE END LET?S IMPLEMENT A STANDARD 
    # ACCURACY XD
    # LET'S BE CREATIVE ANOTHER DAY
    j = 0; correct = []
    for i in tf.argmax(y, 1):
        if i == tf.argmax(yhat[j]):
            correct.append(1)
        
        j += 1
    
    num = tf.cast(tf.reduce_sum(correct), dtype = tf.float32)
    den = tf.cast(y.shape[0], dtype = tf.float32)
    return num / den

In [22]:
# TRAINING PROCEDURE
epoch_trn_loss = []
epoch_tst_loss = []
epoch_trn_accy = []
epoch_tst_accy = []
'''
As you can see, we have three loops, two of which are 
inner-loops for the minibatches on both training and 
testing datasets. Needless to say, the minibatches used 
in the testing dataset above are not really necessary, 
since we can have a single batch for validation. 
However, we have them for purpose of comparing the 
performance of the optimization algorithm on both single 
batch and three minibatches. 
'''
for j in range(HyperParams.EPOCHS.value):
    # TRIVIAL LISTS WHERE TO COLLECT THE LOSS FOR EACH DATAPOINT
    trn_loss = []; trn_accy = []
    for i in range(len(data.xtrn_batches)):
        # ALL THE MAGIC HAPPEN IN BACKWARD METHOD.... :D
        model.backward(data.xtrn_batches[i], data.ytrn_batches[i])
        ypred = model(data.xtrn_batches[i])
        trn_loss.append(model.loss(ypred, data.ytrn_batches[i]))
        trn_accy.append(accuracy(data.ytrn_batches[i], ypred))

    trn_err = tf.reduce_mean(trn_loss).numpy()
    trn_acy = tf.reduce_mean(trn_accy).numpy()

    tst_loss = []; tst_accy = []
    for i in range(len(data.xtst_batches)):
        ypred = model(data.xtst_batches[i])
        tst_loss.append(model.loss(ypred, data.ytst_batches[i]))
        tst_accy.append(accuracy(data.ytst_batches[i], ypred))
    
    # GET THE OVERALL LOSS AND ACCURACY 
    tst_err = tf.reduce_mean(tst_loss).numpy()
    tst_acy = tf.reduce_mean(tst_accy).numpy()
    
    epoch_trn_loss.append(trn_err)
    epoch_tst_loss.append(tst_err)
    epoch_trn_accy.append(trn_acy)
    epoch_tst_accy.append(tst_acy)
    
    if j % 20 == 0:
        print("Epoch: {0:4d} \t Training Error: {1:.4f} \t Testing Error: {2:.4f} \t Accuracy Training: {3:.4f} \t Accuracy Testing: {4:.4f}".format(j, trn_err, tst_err, trn_acy, tst_acy))
        

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Epoch:    0 	 Training Error: 4.9966 	 Testing Error: 3.6393 	 Accuracy Training: 0.6667 	 Accuracy Testing: 0.7619
Epoch:   20 	 Training Error: 3.4939 	 Testing Error: 2.5276 	 Accuracy Training: 0.6667 	 Accuracy Testing: 0.7619
Epoch:   40 	 Training Error: 2.3230 	 Testing Error: 1.6882 	 Accuracy Training: 0.6667 	 Accuracy Testing: 0.7619
Epoch:   60 	 Training Error: 1.4973 	 Testing Error: 1.0984 	 Accuracy Training: 0.6667 	 Accuracy Testing: 0.7619
Epoch:   80 	 Training Error: 0.8956 	 Testing Error: 0.7481 	 Accuracy Training: 0.6571 	 Accuracy Testing: 0.7619
Epoch:  100 	 Training Error: 0.7920 	 Testing Error: 0.7211 	 Accuracy Training: 0.6571 	 Accuracy Testing: 0.7619
Epoch:  120 	 Training Error: 0.7674 	 Testing Error: 0.6982 	 Accuracy Training: 0.6571 	 Accuracy Testing: 0.7619
Epoch:  140 	 Training Error: 0.7366 	 Testing Error: 0.6754 	 Accuracy Training: 0.6571 	 Acc

In [23]:
# Finally, the following tabularizes the statistics 
# we obtained from model estimation.
df = pd.DataFrame({
    "trn_loss" : epoch_trn_loss,
    "trn_accy" : epoch_trn_accy,
    "tst_loss" : epoch_tst_loss,
    "tst_accy" : epoch_tst_accy
})

df.to_csv("../tf2_output_normal_initializer_batch_size_" + str(HyperParams.BATCH_SIZE.value) + ".csv")
