In [None]:
import tensorflow as tf

In [None]:
from tensorflow.keras.layers import Dense, BatchNormalization
from tensorflow.keras.models import Sequential

In [None]:
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
diabetes = load_diabetes()

data = diabetes['data']
targets = diabetes['target']

In [None]:
X_train,X_val,y_train,y_val = train_test_split(data, targets, test_size = 0.1)

#### Dummy Model

In [None]:
model = Sequential([
    Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(64,activation='relu'),
    tf.keras.layers.BatchNormalization(),
    Dense(64, activation='relu'),
    Dense(64, activation='relu'),
    Dense(1)  
])

In [None]:
model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mae']
)

### Custom Callback
##### we will use logs dictionary to access the loss and metric value

In [None]:
class customCallback(tf.keras.callbacks.Callback):
    
    def on_train_batch_end(self,batch,logs = None):
        if batch % 2 is 0:
            print( f"\n After Batch {batch}, loss is {logs['loss']}" )

    def on_test_batch_end(self,batch,logs=None):
        if batch % 2 is 0:
            print(f"\n After Batch {batch}, loss is {logs['loss']} ")

    def on_epoch_end(self, epoch, logs = None):
        print(f"Epoch {epoch}, Mean Absolute Error is {logs['mae']}, Loss is {logs['loss']}")
        
    def on_predict_batch_end(self, batch, logs=None):
        print(f"Finished Prediction on Batch {batch}")

In [None]:
history = model.fit(X_train, y_train, epochs=10, callbacks=[customCallback()], verbose = 0, batch_size=2**6)

In [None]:
model.evaluate(X_val,y_val, callbacks=[customCallback()], verbose=0, batch_size=10)

In [None]:
model.predict(X_val,batch_size=10, callbacks=[customCallback()], verbose=False )

### We will define a custom callback to reduce the learning rate w.r.t to # of Epochs

##### It is going to have a more complex custom callback

In [None]:
lr_schedule = [
    (5,0.05), (10,0.03), (15,0.02), (20,0.01)
]
# we will get new learning rate using this function by comparing to list above.
def get_new_learning_rate(epoch, lr):
    for i in lr_schedule:
        if epoch in i:
            lr = i[1]

    return lr

In [None]:
class Learning_rate_scheduler( tf.keras.callbacks.Callback ):
     def __init__(self, new_lr):
        super(Learning_rate_scheduler, self).__init__
        #adding new learning rate function to our callback
        self.new_lr = new_lr
    
     def on_epoch_begin(self, epoch, logs=None):
        #we will check if our optimizer has learning rate option or not
        try:
            curr_rate = tf.keras.backend.get_value(self.model.optimizer.lr)

            #calling auxillary function to get scheduled learning rate, we have passed the function as parameter which is new_lr

            scheduled_rate = self.new_lr(epoch, curr_rate)

            tf.keras.backend.set_value(self.model.optimizer.lr, scheduled_rate)

            print(f"Learning Rate for Epoch {epoch} is {tf.keras.backend.get_value(self.model.optimizer.lr)}")

        except Exception as E:
            print(f'{E}\n Most Probably your optimizer do not have learing rate option.')

In [None]:
model = Sequential([
    
    Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(64,activation='relu'),
    tf.keras.layers.BatchNormalization(),
    Dense(64, activation='relu'),
    Dense(64, activation='relu'),
    Dense(1)        
])

In [None]:
model.compile(loss='mse',
                optimizer="adam",
                metrics=['mae', 'mse'])

In [None]:
model.fit(X_train, y_train, epochs=25, batch_size=64, callbacks=[Learning_rate_scheduler(get_new_learning_rate)], verbose=False)