# Practice on Keras Functional API

In [1]:
# imports
import keras
from keras import layers
from keras import backend as K
import numpy as np

In [2]:
# mnist dataset
# loading training MNIST dataset
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# reshaping and normalizing the images
x_train = x_train.reshape(60000, 784).astype('float32')/255  
x_test = x_test.reshape(10000, 784).astype('float32')/255
# one-hot encoding of labels
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)


In [3]:
# building the model
def build_model():
    """
    Build a simple MLP model for MNIST classification.
    :return: A Keras Model with inputs and outputs defined.
    """
    inputs = keras.Input(shape=(784,), name='input')  # input layer
    x = layers.Dense(64, activation='relu', name='hidden1')(inputs)  # hidden layer
    x = layers.Dense(64, activation='relu', name='hidden2')(x)  # hidden layer
    outputs = layers.Dense(10, activation='softmax', name='output')(x)  # output layer
    return keras.Model(inputs=inputs, outputs=outputs)

model_ce = build_model()
model_fl = build_model()
model_rl = build_model()

model_rl.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input (InputLayer)          [(None, 784)]             0         
                                                                 
 hidden1 (Dense)             (None, 64)                50240     
                                                                 
 hidden2 (Dense)             (None, 64)                4160      
                                                                 
 output (Dense)              (None, 10)                650       
                                                                 
Total params: 55050 (215.04 KB)
Trainable params: 55050 (215.04 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [4]:
optimizer = keras.optimizers.SGD()
ce_loss = keras.losses.CategoricalCrossentropy()
metrics = [keras.metrics.CategoricalAccuracy()]
# compiling the model
model_ce.compile(optimizer=optimizer, loss=ce_loss, metrics=metrics)
# training the model
model_ce.fit(x_train, y_train, batch_size=64, epochs=6, validation_split=0.2)
# evaluating the model
test_scores = model_ce.evaluate(x_test, y_test, verbose=2)
print('Test loss:', test_scores[0])
print('Test accuracy:', test_scores[1])

# saving the model
path = './weights/mnist_ce_model.keras'
model_ce.save(path)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6
313/313 - 0s - loss: 0.2506 - categorical_accuracy: 0.9280 - 319ms/epoch - 1ms/step
Test loss: 0.2505910396575928
Test accuracy: 0.9279999732971191


In [5]:
focal_loss = keras.losses.CategoricalFocalCrossentropy()
# compiling the model
optimizer = keras.optimizers.SGD()
model_fl.compile(optimizer=optimizer, loss=focal_loss, metrics=metrics)
# training the model
model_fl.fit(x_train, y_train, batch_size=64, epochs=6, validation_split=0.2)
# evaluating the model
test_scores = model_fl.evaluate(x_test, y_test, verbose=2)
print('Test loss:', test_scores[0])
print('Test accuracy:', test_scores[1])
# saving the model
path = './weights/mnist_focal_model.keras'
model_fl.save(path)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6
313/313 - 0s - loss: 0.0504 - categorical_accuracy: 0.8923 - 311ms/epoch - 992us/step
Test loss: 0.050351034849882126
Test accuracy: 0.892300009727478


In [6]:
# # defining focal  loss function
# # FL(p_t) = -alpha_t * (1-p_t)^gamma * log(p_t)
# def focal_loss(alpha=0.25, gamma=2.0):
#     """
#     Focal Loss, Keras styles
#     :param alpha: Weighting factor for the positive class, typically in the range [0, 1].
#     :param gamma: Focusing parameter to down-weight well-classified examples, typically in the range [0, 5].
#     :return: A callable focal_loss_fixed(y_true, y_pred) to be used as a Keras loss function.
#     """
#     def focal_loss_fixed(y_true, y_pred):
#         """
#         The actual loss computation.
#         :param y_true: Ground truth labels, shape of [batch_size, num_classes].
#         :param y_pred: Predicted class probabilities, shape of [batch_size, num_classes].
#         :return: 
#         """
#         # Clip the prediction value to prevent NaN's and Inf's
#         y_pred = K.clip(y_pred, K.epsilon(), 1. - K.epsilon())
#         # Calculate Cross Entropy
#         cross_entropy = -y_true * K.log(y_pred)
#         # Calculate Focal Loss
#         loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy
#         # Compute mean loss in batch
#         return K.mean(K.sum(loss, axis=-1))
#     return focal_loss_fixed


In [19]:
# defining rational loss function
# RL(p_t) = - 1/p_t * log(p_t)
def rational_loss():
    """
    Rational Loss for multi-class classification, Keras style.
    RL(p_t) = - 1/p_t * log(p_t), where p_t is the probability associated with the true class.

    :return: A callable rational_loss_fixed(y_true, y_pred) to be used as a Keras loss function.
    """
    def rational_loss_fixed(y_true, y_pred):
        """
        The actual loss computation.
        :param y_true: Ground truth labels, shape of [batch_size, num_classes].
        :param y_pred: Predicted class probabilities, shape of [batch_size, num_classes].
        :return: A scalar representing the mean rational loss over the batch.
        """
        # Clip the prediction value to prevent NaN's and Inf's
        y_pred = K.clip(y_pred, K.epsilon(), 1. - K.epsilon())
        # Calculate Cross Entropy
        cross_entropy = -y_true * K.log(y_pred)
        # Compute the Rational Loss
        loss = (1 / y_pred) * cross_entropy
        # Average the loss over the batch
        return K.mean(K.sum(loss, axis=-1))
    # TODO: use builtin categorical crossentropy
    # TODO: plot the function
    

    return rational_loss_fixed


In [20]:
# compiling the model
rational_loss = rational_loss()
optimizer = keras.optimizers.SGD()
model_rl.compile(optimizer=optimizer, loss=rational_loss, metrics=metrics)
# training the model
history = model_rl.fit(x_train, y_train, batch_size=64, epochs=6, validation_split=0.2)
# evaluating the model
test_scores = model_rl.evaluate(x_test, y_test, verbose=2)
print('Test loss:', test_scores[0])
print('Test accuracy:', test_scores[1])
# saving the model
path = './weights/mnist_rational_model.keras'
model_rl.save(path)

Epoch 1/6
Epoch 2/6
Epoch 3/6
Epoch 4/6
Epoch 5/6
Epoch 6/6
313/313 - 0s - loss: 144901712.0000 - categorical_accuracy: 0.1010 - 377ms/epoch - 1ms/step
Test loss: 144901712.0
Test accuracy: 0.10100000351667404


In [9]:
# rebuilding the models
model_ce = build_model()
model_fl = build_model()
model_rl = build_model()


In [10]:
# imbalance 
def create_imbalanced_data(x, y, imbalance_rate=0.5):
    """
    Create an imbalanced dataset based on a given probability distribution.
    The probability for class d is given by: P(d) = 0.5^d / 2*(1 - 0.5^10)

    :param x: Features, shape of [total_samples, feature_dim].
    :param y: One-hot encoded labels, shape of [total_samples, num_classes].
    :param imbalance_rate: Base rate for the exponential decay of class frequency (default 0.5).
    :return: Tuple of imbalanced features and labels, shapes of [selected_samples, feature_dim] and [selected_samples, num_classes].
    """
    total_samples = len(y)
    a = imbalance_rate
    normalization_factor = 2 * (1 - a**10)

    indices_by_class = [np.where(y[:, d] == 1)[0] for d in range(10)]
    selected_indices = []

    for d in range(10):
        probability_d = (a**d) / normalization_factor
        frequency = int(total_samples * probability_d)
        np.random.shuffle(indices_by_class[d]) # Shuffle to ensure random selection
        selected_indices.extend(indices_by_class[d][:frequency])

    return x[selected_indices], y[selected_indices]


x_train_imbalanced, y_train_imbalanced = create_imbalanced_data(x_train, y_train)


In [15]:
# compiling the model
model_ce.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.CategoricalCrossentropy(), metrics=metrics)
model_fl.compile(optimizer=keras.optimizers.SGD(), loss=keras.losses.CategoricalFocalCrossentropy(), metrics=metrics)

In [14]:
rational_loss = rational_loss()
optimizer = keras.optimizers.SGD()
model_rl.compile(optimizer=optimizer, loss=rational_loss, metrics=metrics)


TypeError: rational_loss_fixed() missing 2 required positional arguments: 'y_true' and 'y_pred'

In [16]:
print("Training on imbalanced data:")
print("Cross Entropy:")
model_ce.fit(x_train_imbalanced, y_train_imbalanced, epochs=10, batch_size=32)
print("Focal Loss:")
model_fl.fit(x_train_imbalanced, y_train_imbalanced, epochs=10, batch_size=32)
print("Rational Loss:")
model_rl.fit(x_train_imbalanced, y_train_imbalanced, epochs=10, batch_size=32)

Training on imbalanced data:
Cross Entropy:
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Focal Loss:
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Rational Loss:


RuntimeError: You must compile your model before training/testing. Use `model.compile(optimizer, loss)`.

In [17]:
# Balanced data
print(model_ce.evaluate(x_test, y_test))
print(model_fl.evaluate(x_test, y_test))
print(model_rl.evaluate(x_test, y_test))

# Imbalanced data
x_test_imbalanced, y_test_imbalanced = create_imbalanced_data(x_test, y_test)
print(model_ce.evaluate(x_test_imbalanced, y_test_imbalanced))
print(model_fl.evaluate(x_test_imbalanced, y_test_imbalanced))
print(model_rl.evaluate(x_test_imbalanced, y_test_imbalanced))

[0.6644544005393982, 0.7811999917030334]
[0.2201809138059616, 0.7008000016212463]


RuntimeError: You must compile your model before training/testing. Use `model.compile(optimizer, loss)`.

In [18]:
def accuracy_by_bins(model, x, y):
    """
    Calculate and print the accuracy of the given model for specific bins of classes.
    The bins are defined as: 0-1, 2-7, 8-9.

    :param model: Trained Keras model to evaluate.
    :param x: Input features, shape of [num_samples, feature_dim].
    :param y: One-hot encoded labels, shape of [num_samples, num_classes].
    """
    predictions = model.predict(x).argmax(axis=-1)
    true_labels = y.argmax(axis=-1)
    bins = [(0, 1), (2, 7), (8, 9)]
    for bin_start, bin_end in bins:
        mask = (true_labels >= bin_start) & (true_labels <= bin_end) 
        bin_accuracy = np.mean(predictions[mask] == true_labels[mask])
        print(f"Accuracy for bin {bin_start}-{bin_end}: {bin_accuracy}")

print("Accuracy by bins for balanced data:")
print("Cross Entropy:")
accuracy_by_bins(model_ce, x_test, y_test)
print("Focal Loss:")
accuracy_by_bins(model_fl, x_test, y_test)
print("Rational Loss:")
accuracy_by_bins(model_rl, x_test, y_test)

Accuracy by bins for balanced data:
Cross Entropy:
Accuracy for bin 0-1: 0.9929078014184397
Accuracy for bin 2-7: 0.9010504913588614
Accuracy for bin 8-9: 0.19868885526979324
Focal Loss:
Accuracy for bin 0-1: 0.991016548463357
Accuracy for bin 2-7: 0.8309047780413419
Accuracy for bin 8-9: 0.004034291477559254
Rational Loss:
Accuracy for bin 0-1: 0.17494089834515367
Accuracy for bin 2-7: 0.17705862419518809
Accuracy for bin 8-9: 0.0
