# Business Default Model 

## Import packages 

In [None]:
import numpy as np 

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import Model

import tensorflow_datasets as tfds

## Custom layers 

In [None]:
class L1_ActivityRegularization(keras.layers.Layer):
    """Pass-through layer that registers an L1 penalty on its inputs.

    The inputs are returned unchanged. As a side effect, every call adds
    ``rate * sum(|inputs|)`` to the layer's losses through ``add_loss``.

    Args:
        rate: Multiplier applied to the summed absolute activations.
    """

    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        # Sum of absolute activations over every element of the input tensor.
        l1_norm = tf.reduce_sum(tf.math.abs(inputs))
        # Register the scaled penalty; it depends on the inputs, so it is
        # recomputed on each call.
        self.add_loss(self.rate * l1_norm)
        return inputs

class residualBlock(keras.layers.Layer):
    """Two-layer dense residual block with L1 activity penalties.

    With ``x`` the input, the block computes::

        y1 = ReLU(x . w1 + b1)
        y2 = ReLU(y1 . w2 + b2)
        y  = ReLU(y2 + x)

    Both hidden activations ``y1`` and ``y2`` are passed through an
    ``L1_ActivityRegularization(1e-3)`` layer. The weight matrices are
    square (last input dimension on both sides), so the output has the same
    last dimension as the input.
    """

    def __init__(self,):
        super().__init__()
        # One activity regularizer per hidden activation.
        self.r1 = L1_ActivityRegularization(1e-3)
        self.r2 = L1_ActivityRegularization(1e-3)

    def build(self, input_shape):
        """Create the two kernel/bias pairs, sized from the last input axis."""
        width = input_shape[-1]
        kernel_shape = (width, width)
        bias_shape = (width,)

        # Creation order (w1, b1, w2, b2) is kept so the layer's weight list
        # has the same ordering as before.
        self.w1 = self.add_weight(
            shape=kernel_shape, initializer="random_normal", trainable=True
        )
        self.b1 = self.add_weight(
            shape=bias_shape, initializer="random_normal", trainable=True
        )
        self.w2 = self.add_weight(
            shape=kernel_shape, initializer="random_normal", trainable=True
        )
        self.b2 = self.add_weight(
            shape=bias_shape, initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        """Apply both affine+ReLU stages, then the skip connection."""
        first_pre = tf.matmul(inputs, self.w1) + self.b1
        first_act = self.r1(tf.nn.relu(first_pre))

        second_pre = tf.matmul(first_act, self.w2) + self.b2
        second_act = self.r2(tf.nn.relu(second_pre))

        # Skip connection followed by a final ReLU.
        return tf.nn.relu(second_act + inputs)

## Declare Model 

In [None]:
class BankModel(Model):
    """Classifier: residual block -> Dense(64, relu) -> Dense(10, softmax).

    The model owns its loss (categorical cross-entropy) and metrics and
    overrides ``train_step`` / ``test_step``, so ``compile`` only needs an
    optimizer. Labels are expected in one-hot form, since both the loss and
    the accuracy metric are the categorical variants.

    The optimized objective is the cross-entropy plus every loss registered
    through ``add_loss`` (the L1 activity penalties created inside
    ``residualBlock``). The ``loss`` metric reports that same objective in
    both training and evaluation.
    """

    def __init__(self,):
        super().__init__()

        # Declare model layers
        self.layer_1 = residualBlock()
        self.layer_2 = keras.layers.Dense(64, activation="relu")
        self.layer_3 = keras.layers.Dense(10, activation="softmax")

        # Declare loss and metrics
        self.loss_cc = tf.keras.losses.CategoricalCrossentropy()
        self.acc = tf.keras.metrics.CategoricalAccuracy()
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        """List of the model's metrics.

        We make sure the loss tracker is listed as part of `model.metrics`
        so that `fit()` and `evaluate()` are able to `reset()` the loss tracker
        at the start of each epoch and at the start of an `evaluate()` call.
        """
        return [self.loss_tracker, self.acc]

    def call(self, inputs, training=None):
        """Run the forward pass and return the softmax class probabilities.

        Args:
            inputs: Batch of feature vectors.
            training: Accepted for Keras API compatibility; unused here
                because none of the layers branch on the training flag.
        """
        x = self.layer_1(inputs)
        x = self.layer_2(x)
        y = self.layer_3(x)
        return y

    def _objective(self, y, yh):
        """Return cross-entropy plus all losses registered via ``add_loss``.

        Must be called right after a forward pass so that ``self.losses``
        holds the penalties produced by that pass.
        """
        loss = self.loss_cc(y, yh)
        extra_losses = self.losses
        if extra_losses:
            # Without this term the activity regularizers would be computed
            # but never influence the gradients.
            loss = loss + tf.add_n(extra_losses)
        return loss

    def train_step(self, data):
        """Run one optimization step on a ``(features, one_hot_labels)`` batch.

        Returns:
            Dict with the running ``"acc"`` and ``"loss"`` metric values.
        """
        X, y = data

        with tf.GradientTape() as tape:
            # Call the model itself (not its sub-layers one by one): this is
            # the pattern Keras documents for custom steps, and it keeps
            # `self.losses` in sync with this forward pass.
            yh = self(X, training=True)
            loss = self._objective(y, yh)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update metrics (includes the metric that tracks the loss)
        self.loss_tracker.update_state(loss)
        self.acc.update_state(y, yh)

        # Return a dict mapping metric names to current value
        results = {"acc": self.acc.result()}
        results["loss"] = self.loss_tracker.result()
        return results

    def test_step(self, data):
        """Evaluate one ``(features, one_hot_labels)`` batch without training.

        Returns:
            Dict with the running ``"acc"`` and ``"loss"`` metric values.
        """
        X, y = data

        yh = self(X, training=False)

        # Same objective as in training, so `loss` and `val_loss` are
        # directly comparable.
        loss = self._objective(y, yh)
        self.loss_tracker.update_state(loss)
        self.acc.update_state(y, yh)

        # Return a dict mapping metric names to current value
        results = {"acc": self.acc.result()}
        results["loss"] = self.loss_tracker.result()
        return results

    def build_graph(self, raw_shape):
        """Wrap the forward pass in a functional ``Model`` for inspection.

        Args:
            raw_shape: Per-example input shape (without the batch axis).

        Returns:
            A functional model built from a symbolic input, e.g. for calling
            ``.summary()``.
        """
        x = tf.keras.layers.Input(shape=raw_shape)
        return Model(inputs=[x], outputs=self.call(x))

## Load data for testing model 

In [None]:
# Fetch the MNIST train/test splits as (image, label) pairs, together with
# the dataset metadata object.
mnist_splits, ds_info = tfds.load(
    name='mnist',
    split=['train', 'test'],
    as_supervised=True,
    with_info=True,
    shuffle_files=True,
)
ds_train, ds_test = mnist_splits

In [None]:
def prep_img(image, label):
    """Convert one example to a flat, scaled image and a one-hot label.

    The image is cast to ``float32``, squeezed, reshaped to a vector of
    length ``28*28`` and divided by 255. The label is one-hot encoded with
    depth 10.

    Returns:
        Tuple ``(flat_image, one_hot_label)``.
    """
    pixels = tf.cast(image, tf.float32)
    pixels = tf.squeeze(pixels)
    flat = tf.reshape(pixels, (28*28,))
    one_hot_label = tf.one_hot(label, 10)
    return flat / 255., one_hot_label

In [None]:
# Training input pipeline: preprocess -> cache -> shuffle over the whole
# split -> batches of 64 -> prefetch.
ds_train = (
    ds_train
    .map(prep_img, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Evaluation input pipeline: preprocess -> batches of 64 -> cache -> prefetch.
# No shuffling here, and the cache is placed after batching.
ds_test = (
    ds_test
    .map(prep_img, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .cache()
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Sanity check: print the feature and label shapes of a single training batch.
for t in ds_train.take(1):
    X, y = t
    print(X.shape, y.shape, sep="\n")

## Test model 

In [None]:
# Flattened 28x28 image -> 784 features per example.
input_shape = (28 * 28,)

# Only the optimizer is passed to compile(): the model defines its own loss
# and metrics inside its custom train/test steps.
model = BankModel()
model.compile(optimizer="rmsprop")

# Build a functional view of the model purely to print a layer summary.
graph_view = model.build_graph(input_shape)
graph_view.summary()

In [None]:
# Train for 5 epochs, evaluating on the test split after each epoch.
history = model.fit(
    ds_train,
    validation_data=ds_test,
    epochs=5,
)