In [1]:
import tensorflow as tf
from tfkan.layers import Conv2DKAN, DenseKAN
from keras.layers import GlobalAveragePooling2D

import numpy as np
from matplotlib import pyplot as plt

In [2]:
# Fetch the Fashion-MNIST train/test splits (images and integer labels)
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
# Divide pixel values by 255, append a trailing channel axis, and store as float32
x_train = (x_train[..., np.newaxis] / 255.0).astype(np.float32)
x_test = (x_test[..., np.newaxis] / 255.0).astype(np.float32)

#### To call `update_grid_from_samples()` in user-defined training logic

In [3]:
# KAN classifier assembled layer by layer:
# two strided KAN convolutions (with layer normalization in between),
# global average pooling, and a KAN dense head followed by softmax.
kan = tf.keras.models.Sequential()
kan.add(Conv2DKAN(filters=8, kernel_size=5, strides=2, padding='valid', kan_kwargs={'grid_size': 3}))
kan.add(tf.keras.layers.LayerNormalization())
kan.add(Conv2DKAN(filters=16, kernel_size=5, strides=2, padding='valid', kan_kwargs={'grid_size': 3}))
kan.add(GlobalAveragePooling2D())
kan.add(DenseKAN(10, grid_size=3))
kan.add(tf.keras.layers.Softmax())

# Create the weights for 28x28 single-channel inputs, then show the layer table
kan.build(input_shape=(None, 28, 28, 1))
kan.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2dkan (Conv2DKAN)       (None, 12, 12, 8)         1658      
                                                                 
 layer_normalization (Layer  (None, 12, 12, 8)         16        
 Normalization)                                                  
                                                                 
 conv2dkan_1 (Conv2DKAN)     (None, 4, 4, 16)          24416     
                                                                 
 global_average_pooling2d (  (None, 16)                0         
 GlobalAveragePooling2D)                                         
                                                                 
 dense_kan (DenseKAN)        (None, 10)                1290      
                                                                 
 softmax (Softmax)           (None, 10)                0

In [4]:
def train_kan(
    model,
    x_train,
    y_train,
    x_valid=None,
    y_valid=None,
    epochs: int=5,
    learning_rate: float=1e-3,
    batch_size: int=128,
    verbose: int=1
):
    """Train `model` with a hand-written loop and refresh KAN grids after every epoch.

    Each epoch iterates over the training data in the given order (no shuffling),
    applying one Adam step per batch on the sparse categorical cross-entropy loss.
    After the epoch, every layer exposing `update_grid_from_samples` has that
    method called with the activations obtained by pushing the last training
    batch through the preceding layers. This assumes `model.layers` forms a
    simple linear chain (as in a `Sequential` model).

    Args:
        model: Keras model to train; called as `model(x, training=...)`.
        x_train: Training inputs, sliceable along the first axis.
        y_train: Integer class labels for `x_train`.
        x_valid: Optional validation inputs.
        y_valid: Optional validation labels. Validation runs only when both
            `x_valid` and `y_valid` are given.
        epochs: Number of passes over the training data.
        learning_rate: Learning rate passed to the Adam optimizer.
        batch_size: Batch size for both the training and validation datasets.
        verbose: Print running training metrics every `verbose` steps;
            values <= 0 disable the per-step printout.

    Returns:
        The same `model` object, trained in place.
    """
    # build optimizer
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    # build datasets
    train_set = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size)
    if x_valid is not None and y_valid is not None:
        valid_set = tf.data.Dataset.from_tensor_slices((x_valid, y_valid)).batch(batch_size)
    else:
        valid_set = None

    # define loss function
    loss_func = tf.keras.losses.SparseCategoricalCrossentropy()

    # define metrics (built once, reset at the start of each use)
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
    if valid_set is not None:
        valid_loss = tf.keras.metrics.Mean(name='valid_loss')
        valid_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='valid_accuracy')

    step = 0
    # training loop
    for epoch in range(epochs):
        # reset metrics (`reset_state` is the non-deprecated spelling of `reset_states`)
        train_loss.reset_state()
        train_accuracy.reset_state()

        # remembers the most recent training inputs for the grid update below
        last_batch = None

        for x_batch, y_batch in train_set:
            with tf.GradientTape() as tape:
                y_pred = model(x_batch, training=True)
                loss = loss_func(y_batch, y_pred)
                loss = tf.reduce_mean(loss)
            # update weights
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            train_loss(loss)
            train_accuracy(y_batch, y_pred)
            step += 1
            last_batch = x_batch

            if verbose > 0 and step % verbose == 0:
                # overwrite the same console line with the updated metrics
                print(f"[EPCOH: {epoch+1:3d} / {epochs:3d}, STEP: {step:6d}]: \
train_loss: {train_loss.result():.4f}, train_accuracy: {train_accuracy.result():.4f}", end='\r')

        # callback after each epoch: call update_grid_from_samples on every
        # layer that provides it, feeding each layer the output of the previous one.
        # Skipped when the training set produced no batch (nothing to sample from).
        if last_batch is not None:
            grid_samples = last_batch
            for layer in model.layers:
                if hasattr(layer, 'update_grid_from_samples'):
                    layer.update_grid_from_samples(grid_samples)
                grid_samples = layer(grid_samples)

        # eval on validation set
        if valid_set is not None:
            valid_loss.reset_state()
            valid_accuracy.reset_state()
            for x_val_batch, y_val_batch in valid_set:
                y_val_pred = model(x_val_batch, training=False)
                valid_loss(tf.reduce_mean(loss_func(y_val_batch, y_val_pred)))
                valid_accuracy(y_val_batch, y_val_pred)
            print(f"[EPCOH: {epoch+1:3d} / {epochs:3d}, STEP: {step:6d}]: \
train_loss: {train_loss.result():.4f}, train_accuracy: {train_accuracy.result():.4f}, \
valid_loss: {valid_loss.result():.4f}, valid_accuracy: {valid_accuracy.result():.4f}")
        else:
            # finish the '\r'-terminated progress line
            print()

    return model

In [6]:
# Run the custom training loop, validating on the test split after every epoch
kan = train_kan(
    model=kan,
    x_train=x_train,
    y_train=y_train,
    x_valid=x_test,
    y_valid=y_test,
    epochs=5,
    learning_rate=1e-3,
    batch_size=128,
    verbose=1,
)

[EPCOH:   1 /   5, STEP:    469]: train_loss: 1.0307, train_accuracy: 0.6370, valid_loss: 0.7191, valid_accuracy: 0.7340
[EPCOH:   2 /   5, STEP:    938]: train_loss: 0.6334, train_accuracy: 0.7650, valid_loss: 0.6371, valid_accuracy: 0.7665
[EPCOH:   3 /   5, STEP:   1407]: train_loss: 0.5713, train_accuracy: 0.7890, valid_loss: 0.5890, valid_accuracy: 0.7844
[EPCOH:   4 /   5, STEP:   1876]: train_loss: 0.5364, train_accuracy: 0.8031, valid_loss: 0.5555, valid_accuracy: 0.7962
[EPCOH:   5 /   5, STEP:   2345]: train_loss: 0.5114, train_accuracy: 0.8130, valid_loss: 0.5318, valid_accuracy: 0.8097


#### To use `update_grid_from_samples()` in TensorFlow callbacks

In [8]:
# Fresh KAN classifier with the same layer stack, assembled layer by layer:
# strided KAN convolution -> layer normalization -> strided KAN convolution
# -> global average pooling -> KAN dense head -> softmax.
kan = tf.keras.models.Sequential()
kan.add(Conv2DKAN(filters=8, kernel_size=5, strides=2, padding='valid', kan_kwargs={'grid_size': 3}))
kan.add(tf.keras.layers.LayerNormalization())
kan.add(Conv2DKAN(filters=16, kernel_size=5, strides=2, padding='valid', kan_kwargs={'grid_size': 3}))
kan.add(GlobalAveragePooling2D())
kan.add(DenseKAN(10, grid_size=3))
kan.add(tf.keras.layers.Softmax())

# Create the weights for 28x28 single-channel inputs, then show the layer table
kan.build(input_shape=(None, 28, 28, 1))
kan.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2dkan_2 (Conv2DKAN)     (None, 12, 12, 8)         1658      
                                                                 
 layer_normalization_1 (Lay  (None, 12, 12, 8)         16        
 erNormalization)                                                
                                                                 
 conv2dkan_3 (Conv2DKAN)     (None, 4, 4, 16)          24416     
                                                                 
 global_average_pooling2d_1  (None, 16)                0         
  (GlobalAveragePooling2D)                                       
                                                                 
 dense_kan_1 (DenseKAN)      (None, 10)                1290      
                                                                 
 softmax_1 (Softmax)         (None, 10)               

In [9]:
# define update grid callback
class UpdateGridCallback(tf.keras.callbacks.Callback):
    """Keras callback that refreshes KAN layer grids at the start of each epoch.

    Before every epoch except the first, a small batch of input samples is
    pushed through the model layer by layer; each layer that exposes
    `update_grid_from_samples` has that method called with the activations
    arriving at it. This assumes `self.model.layers` forms a simple linear
    chain (as in a `Sequential` model).

    Args:
        x_samples: Optional array of input samples to draw the batch from.
            When omitted, the notebook-level `x_train` variable is used,
            which matches calling `UpdateGridCallback()` with no arguments.
        num_samples: How many leading samples to take for the grid update.
    """

    def __init__(self, x_samples=None, num_samples: int=128):
        super().__init__()
        self.x_samples = x_samples
        self.num_samples = num_samples

    def on_epoch_begin(self, epoch, logs=None):
        """
        update grid before new epoch begins
        """
        # the first epoch starts from the initial grid, so there is nothing to refresh
        if epoch == 0:
            return

        # fall back to the notebook-level training inputs when no samples were given
        source = x_train if self.x_samples is None else self.x_samples
        x_batch = source[:self.num_samples]

        for layer in self.model.layers:
            if hasattr(layer, 'update_grid_from_samples'):
                layer.update_grid_from_samples(x_batch)
            # the output of this layer is the input seen by the next one
            x_batch = layer(x_batch)

In [10]:
# Configure the built-in Keras training loop
kan.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=['accuracy'],
)
# Train with the grid-update callback attached; it acts at the start of
# every epoch after the first
kan.fit(
    x=x_train,
    y=y_train,
    batch_size=128,
    epochs=5,
    validation_data=(x_test, y_test),
    callbacks=[UpdateGridCallback()],
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0xffff1841ed60>