In [1]:
import math
from typing import Dict, Callable
from tensorflow.keras import backend as K
import tensorflow as tf
import numpy as np
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# NumPy arrays -> Keras: fit the model directly on in-memory NumPy arrays

def gen_model():
    """Build the uncompiled convolutional classifier used in this cell.

    The stack is: convolution (20 filters, kernel 5) -> 2x2 max-pool ->
    convolution (50 filters, kernel 5) -> 2x2 max-pool -> flatten ->
    dense(500, relu) -> dense(10, softmax).  The input shape is declared on
    the first convolution as ``(28, 28, 1)``.  Neither convolution is given
    an ``activation`` argument.

    Returns:
        A new ``tf.keras.models.Sequential`` instance; the caller is
        responsible for compiling it.
    """
    layers = tf.keras.layers
    stack = [
        layers.Conv2D(input_shape=(28, 28, 1), filters=20, kernel_size=5),
        layers.MaxPool2D(pool_size=2, strides=2),
        layers.Conv2D(filters=50, kernel_size=5),
        layers.MaxPool2D(pool_size=2, strides=2),
        layers.Flatten(),
        layers.Dense(500, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ]
    return tf.keras.models.Sequential(stack)

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Divide pixel values by 255, store them as float32 and append a trailing
# channel axis so the arrays match the model's `input_shape=(28, 28, 1)`.
# The test images receive exactly the same preprocessing as the training
# images; previously only `x_train` got the channel axis, which left
# `x_test` with a shape the model cannot consume.
x_train = (x_train / 255.0).astype(np.float32)[..., tf.newaxis]
x_test = (x_test / 255.0).astype(np.float32)[..., tf.newaxis]
# Labels are only cast to float32 here (no one-hot encoding), which is why the
# sparse variant of the cross-entropy loss is used below.
y_train, y_test = y_train.astype(np.float32), y_test.astype(np.float32)
model = gen_model()
opt = tf.keras.optimizers.SGD(0.01)
model.compile(opt,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x=x_train, y=y_train, epochs=1, batch_size=32)

Epoch 1/1
  864/60000 [..............................] - ETA: 6:31 - loss: 2.2820 - acc: 0.1250

In [None]:
# tf.data Dataset -> Keras: fit the model on tensors drawn from a tf.data iterator

def gen_model():
    """Build the uncompiled convolutional classifier used in this cell.

    An explicit ``InputLayer`` declares the ``(28, 28, 1)`` input, followed
    by: convolution (20 filters, kernel 5) -> 2x2 max-pool -> convolution
    (50 filters, kernel 5) -> 2x2 max-pool -> flatten -> dense(500, relu) ->
    dense(10, softmax).  Neither convolution is given an ``activation``
    argument.

    Returns:
        A new ``tf.keras.models.Sequential`` instance; the caller is
        responsible for compiling it.
    """
    kl = tf.keras.layers
    network_layers = [
        kl.InputLayer(input_shape=(28, 28, 1)),
        kl.Conv2D(filters=20, kernel_size=5),
        kl.MaxPool2D(pool_size=2, strides=2),
        kl.Conv2D(filters=50, kernel_size=5),
        kl.MaxPool2D(pool_size=2, strides=2),
        kl.Flatten(),
        kl.Dense(500, activation='relu'),
        kl.Dense(10, activation='softmax'),
    ]
    return tf.keras.models.Sequential(network_layers)

mnist = tf.keras.datasets.mnist
# Only the training split is used in this cell; the second tuple is discarded.
(x_train, y_train), _ = mnist.load_data()
# Divide pixels by 255, store as float32 and append a trailing channel axis.
x_train = (x_train / 255.0).astype(np.float32)[..., tf.newaxis]
# One-hot encode the labels into 10 classes (paired with the non-sparse loss below).
y_train = tf.one_hot(y_train, 10)
# Dataset pipeline: slice per example, shuffle with a buffer of 100, batch by 32.
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_ds = train_ds.shuffle(100).batch(32)
# The iterator's `get_next()` tensors are what gets passed to `fit`.
train_it = train_ds.make_one_shot_iterator()
xs, ys = train_it.get_next()
model = gen_model()
opt = tf.keras.optimizers.SGD(0.01)
# Number of whole batches of 32 in the training set.
steps = x_train.shape[0] // 32
model.compile(opt, loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(xs, ys, epochs=1, steps_per_epoch=steps)

In [None]:
# -----------

In [203]:
def _1cycle(iteration_idx:int,
            cyc_iterations:int,
            ramp_iterations:int,
            min_lr:float,
            max_lr:float):
    'TODO: docstring'
    if cyc_iterations % 2 != 1:
        raise ValueError('Even value for `cyc_iterations` implies asymetric step size')
    mid = (cyc_iterations - 1)/2
    if iteration_idx == mid: return max_lr
    elif iteration_idx == 0 or iteration_idx == (2 * mid): return min_lr
    elif iteration_idx < cyc_iterations: 
        mod = (iteration_idx % mid)
        numerator =  mod if iteration_idx < mid else mid - mod
        return min_lr + (numerator / mid) * (max_lr - min_lr)
    else:
        idx = iteration_idx - cyc_iterations
        ramp_max = min_lr
        ramp_min = min_lr * 1e-5
        return ramp_max - ((idx + 1) / ramp_iterations) * (ramp_max - ramp_min)

class OneCycleSchedulerCallback(tf.keras.callbacks.Callback):
    """Keras callback that applies the `_1cycle` schedule batch by batch.

    Before every batch the optimizer's ``lr`` variable is overwritten with
    the value `_1cycle` returns for the current iteration.  The iteration
    index is a counter owned by the callback: it starts at 0 and advances by
    one per finished batch, so it keeps increasing across epochs instead of
    restarting with Keras' per-epoch batch index.

    Attributes are plain instance fields:
        lr:  the rate most recently written to the optimizer.
        lrs: the rate used for each finished batch, in order.
        its: the iteration index of each finished batch, in order.
    """

    def __init__(self, cyc_iterations:int, ramp_iterations:int, min_lr:float, max_lr:float):
        """Store the schedule parameters that are forwarded to `_1cycle`.

        Args:
            cyc_iterations: length of the cycle in iterations.
            ramp_iterations: length of the final decay in iterations.
            min_lr: rate at the ends of the cycle; also the initial `lr`.
            max_lr: rate at the middle of the cycle.
        """
        # Let the Keras base class set up its own state, as
        # `LRFinderCallback` in this file already does.
        super().__init__()
        self.cyc_iterations = cyc_iterations
        self.ramp_iterations = ramp_iterations
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.lr = min_lr
        # Zero-based count of batches completed so far (not reset per epoch).
        self._iteration = 0
        self.lrs = []
        self.its = []

    def on_batch_begin(self, batch, logs=None):
        """Compute the rate for the upcoming batch and write it to the optimizer.

        Raises:
            ValueError: if the model's optimizer has no ``lr`` attribute.
        """
        if not hasattr(self.model.optimizer, 'lr'):
            raise ValueError('Optimizer must have a "lr" attribute.')
        # `_1cycle` is the schedule function defined in this file and expects
        # a zero-based index, so the counter is passed through unchanged.
        self.lr = _1cycle(self._iteration,
                          self.cyc_iterations,
                          self.ramp_iterations,
                          self.min_lr,
                          self.max_lr)
        K.set_value(self.model.optimizer.lr, self.lr)

    def on_batch_end(self, batch, logs=None):
        """Record the rate and iteration index of the finished batch."""
        self.lrs.append(self.lr)
        self.its.append(self._iteration)
        self._iteration += 1

    def plot_lr_vs_iteration(self):
        """Draw the recorded learning rates against their iteration index."""
        df = pd.DataFrame({'lr' : self.lrs, 'iteration': self.its})
        sns.lineplot(x='iteration', y='lr', data=df)
        
def _triangular_f(it:int, ss:int, min_lr:float, max_lr:float):
    'TODO: docstring'
    # calculate number of completed cycles
    cyc = math.floor(it / (ss * 2))
    # calculate number of completed iterations in current cycle
    it_cyc = it - (cyc * 2 * ss)
    # calculate distance from lr_max iteration
    mid_dist = math.fabs(it_cyc - ss)
    # scale lr difference
    scalar = mid_dist / ss
    return min_lr + (1 - scalar) * (max_lr - min_lr)

class LRFinderCallback(tf.keras.callbacks.Callback):
    """Keras callback that sweeps the learning rate with `_triangular_f`.

    Before each batch the optimizer's ``lr`` is set from the triangular
    schedule; after each batch the rate and batch index are recorded, and on
    every ``evaluate_mod``-th batch index the result of ``evaluate_fn()`` is
    recorded alongside the rate in use.
    """

    def __init__(self, step_size:int, min_lr:float, max_lr:float, evaluate_mod:int, evaluate_fn:Callable):
        """Store the sweep configuration and create empty history lists.

        Args:
            step_size: half-cycle length passed to `_triangular_f`.
            min_lr: lower bound of the sweep; also the initial `lr`.
            max_lr: upper bound of the sweep.
            evaluate_mod: `evaluate_fn` is called when the batch index is a
                multiple of this value.
            evaluate_fn: zero-argument callable whose return value is
                appended to `val_loss`.
        """
        super().__init__()
        # Sweep configuration.
        self.step_size = step_size
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.lr = min_lr
        # Evaluation hook.
        self.evaluate_mod = evaluate_mod
        self.evaluate_fn = evaluate_fn
        # Histories filled in by `on_batch_end`.
        self.lrs, self.its = [], []
        self.val_lrs, self.val_loss = [], []

    def on_batch_begin(self, batch, logs=None):
        """Set the optimizer's rate for the batch index found in ``logs['batch']``.

        Raises:
            ValueError: if the model's optimizer has no ``lr`` attribute.
        """
        optimizer = self.model.optimizer
        if not hasattr(optimizer, 'lr'):
            raise ValueError('Optimizer must have a "lr" attribute.')
        self.lr = _triangular_f(logs['batch'], self.step_size, self.min_lr, self.max_lr)
        K.set_value(optimizer.lr, self.lr)

    def on_batch_end(self, batch, logs=None):
        """Record the finished batch and, periodically, an evaluation result."""
        self.lrs.append(self.lr)
        batch_idx = logs['batch']
        self.its.append(batch_idx)
        if batch_idx % self.evaluate_mod != 0:
            return
        # NOTE(review): presumably `evaluate_fn` should return a single number
        # so the result can be plotted - confirm against the caller.
        self.val_lrs.append(self.lr)
        self.val_loss.append(self.evaluate_fn())

    def plot_lr_vs_iteration(self):
        """Draw the recorded learning rates against the recorded batch indices."""
        history = pd.DataFrame({'lr' : self.lrs, 'iteration': self.its})
        sns.lineplot(data=history, x='iteration', y='lr')

    def plot_lr_vs_val_acc(self):
        """Draw the recorded `evaluate_fn` results against the learning rate.

        Despite the method name, the values plotted are whatever was stored
        in `val_loss`, shown on an axis labelled ``loss``.
        """
        history = pd.DataFrame({'lr' : self.val_lrs, 'loss': self.val_loss})
        sns.lineplot(data=history, x='lr', y='loss')

In [None]:
def gen_model():
    """Build the uncompiled convolutional classifier trained in this cell.

    Layers, in order: ``InputLayer`` with shape ``(28, 28, 1)``, convolution
    (20 filters, kernel 5), 2x2 max-pool, convolution (50 filters, kernel 5),
    2x2 max-pool, flatten, dense(500, relu), dense(10, softmax).  Neither
    convolution is given an ``activation`` argument.

    Returns:
        A new ``tf.keras.models.Sequential`` instance; the caller is
        responsible for compiling it.
    """
    keras_layers = tf.keras.layers
    feature_extractor = [
        keras_layers.InputLayer(input_shape=(28, 28, 1)),
        keras_layers.Conv2D(filters=20, kernel_size=5),
        keras_layers.MaxPool2D(pool_size=2, strides=2),
        keras_layers.Conv2D(filters=50, kernel_size=5),
        keras_layers.MaxPool2D(pool_size=2, strides=2),
    ]
    classifier_head = [
        keras_layers.Flatten(),
        keras_layers.Dense(500, activation='relu'),
        keras_layers.Dense(10, activation='softmax'),
    ]
    return tf.keras.models.Sequential(feature_extractor + classifier_head)

def gen_mnist_iterator(x, y, bs):
    """Wrap image/label arrays in a batched one-shot `tf.data` iterator.

    Args:
        x: image array; it is divided by 255, cast to float32 and given a
            trailing axis of length 1.
        y: labels; one-hot encoded into 10 classes.
        bs: batch size.

    Returns:
        The one-shot iterator of a dataset that slices ``(x, y)`` per
        example, shuffles with a buffer of 100 and batches by ``bs``.
    """
    images = (x / 255.0).astype(np.float32)
    images = images[..., tf.newaxis]
    labels = tf.one_hot(y, 10)
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(100)
    dataset = dataset.batch(bs)
    return dataset.make_one_shot_iterator()

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Batched tensors for training (batch size 32) and evaluation (batch size 25).
train_xs, train_ys = gen_mnist_iterator(x_train, y_train, 32).get_next()
test_xs, test_ys = gen_mnist_iterator(x_test, y_test, 25).get_next()

model = gen_model()
opt = tf.keras.optimizers.SGD(0.01)
# Whole batches per pass over each split, matching the batch sizes above.
train_steps = math.floor(x_train.shape[0] / 32)
test_steps = math.floor(x_test.shape[0] / 25)
# Alternative callback: learning-rate sweep with periodic evaluation.  The
# `test_*` values above are only needed by this configuration.
#cb = LRFinderCallback(step_size=10,
#                      min_lr=0.1,
#                      max_lr=1,
#                      evaluate_mod=10000,
#                      evaluate_fn= lambda: model.evaluate(test_xs, test_ys, steps=test_steps))

# `_1cycle` raises ValueError for an even `cyc_iterations` (the rising and
# falling halves must have equal length), so the cycle length has to be odd.
cb = OneCycleSchedulerCallback(cyc_iterations=50001,
                               ramp_iterations=10000,
                               min_lr=0.1,
                               max_lr=1)
model.compile(opt, 
              loss='categorical_crossentropy',
              metrics=['accuracy'])
model.fit(train_xs, train_ys, epochs=1, steps_per_epoch=train_steps, callbacks=[cb])

Epoch 1/1
 254/1875 [===>..........................] - ETA: 7:39 - loss: 0.4393 - acc: 0.8643