In [None]:
import tensorflow as tf
from tensorflow.keras import Sequential,losses,mixed_precision
from tensorflow.keras.layers import InputLayer,Dropout, Dense, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.layers import RandomFlip,RandomRotation,RandomZoom,RandomTranslation,RandomBrightness,RandomContrast
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import load_model
from IPython.display import display, HTML
import matplotlib.pyplot as plt
import numpy as np

import os
# All relative paths used below (dataset folder, cache files, saved models) are
# resolved against this working directory. The location can be overridden with
# the IMAGE_CLASSIFICATION_DIR environment variable so the notebook also runs
# on machines where the default path does not exist.
os.chdir(os.environ.get('IMAGE_CLASSIFICATION_DIR', '/tf-acno-projects/image-classification/'))

In [None]:
# Cap the memory TensorFlow may claim on every visible GPU, then report the
# current XLA state. Iterating over an empty list is a no-op, so no explicit
# "are there any GPUs" check is required.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        memory_cap = tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4500)  # Limit GPU memory usage
        tf.config.experimental.set_virtual_device_configuration(gpu, [memory_cap])
        print("Memory limit set for GPU\n")
except RuntimeError as e:
    # Raised by TensorFlow when the device configuration is rejected;
    # the message is printed and the notebook continues.
    print(e)

# Before training
print("\nXLA Status Check:")
xla_jit_state = tf.config.optimizer.get_jit()
print(f"XLA JIT enabled: {xla_jit_state}")
xla_devices = tf.config.list_logical_devices('XLA_GPU')
print(f"XLA devices: {xla_devices}")

In [None]:
# Global training configuration: numeric policy, XLA, and shared constants.
mixed_precision.set_global_policy('mixed_float16')
tf.config.optimizer.set_jit(True)  # Enable XLA
# NOTE(review): this flag is set after TensorFlow has been imported; confirm
# that it is still picked up at this point.
os.environ['TF_XLA_FLAGS'] = '--tf_xla_enable_xla_devices'

AUTOTUNE = tf.data.AUTOTUNE
data_dir = 'faces_data'

# Each split needs its OWN cache file. Previously the training cache pointed at
# 'ds_cache/te_cache', the same file as the test cache, so the test evaluation
# would silently read cached training samples.
# Delete any stale files under ds_cache/ left over from earlier runs.
os.makedirs('ds_cache', exist_ok=True)
training_cache = 'ds_cache/tr_cache'
validation_cache = 'ds_cache/val_cache'
test_cache = 'ds_cache/te_cache'
input_shape=(128,128,3)

In [None]:
# Random image transformations used for the training split only.
# The layers run in the order listed, so the order must not be changed.
augmentation_layers = [
    RandomRotation(0.3),
    RandomZoom(0.2),
    RandomFlip("horizontal_and_vertical"),
    RandomBrightness(0.3),
    RandomContrast(0.3),
    RandomTranslation(0.2, 0.2),
]
data_augmentation = Sequential(augmentation_layers)

In [None]:
def prepare_dataset(ds, batch_size, is_training=True, cache_path=None, shuffle_buffer=1000):
    """Build the input pipeline for one split.

    Pipeline order: cache -> shuffle -> augment -> normalize -> batch -> prefetch.

    The cache is placed BEFORE the random augmentation. Caching after it (as the
    previous version did) stores the first epoch's augmented images and replays
    exactly the same images every epoch, which defeats the augmentation.

    Args:
        ds: unbatched dataset yielding (image, label) pairs with pixel values
            in the 0-255 range (the augmentation layers are applied before the
            division by 255).
        batch_size: number of samples per batch.
        is_training: when True the data is shuffled and augmented.
        cache_path: file prefix for an on-disk cache; None or '' caches in memory.
        shuffle_buffer: size of the shuffle buffer used for the training split.

    Returns:
        A batched, prefetched dataset of (float32 image scaled to [0, 1], label).

    Note:
        The cache now holds raw (un-augmented, un-normalized) images. Cache files
        written by an older version of this function must be deleted, otherwise
        their contents would be normalized a second time.
    """
    # tf.data expects a string filename; the empty string means "cache in memory".
    ds = ds.cache(cache_path or "")

    if is_training:
        # Shuffle and augment after the cache so every epoch sees a new order
        # and freshly sampled random transformations.
        ds = ds.shuffle(shuffle_buffer)
        ds = ds.map(
            lambda x, y: (data_augmentation(x, training=True), y),
            num_parallel_calls=AUTOTUNE
        )

    # Normalize images
    ds = ds.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
        num_parallel_calls=AUTOTUNE
    )

    ds = ds.batch(batch_size).prefetch(AUTOTUNE)

    return ds

In [None]:
def get_class_distribution(dataset, num_classes=5):
    """Count how many samples of each class a dataset contains.

    Works for both unbatched datasets (each label is a one-hot vector of shape
    (num_classes,)) and batched datasets (labels of shape (batch, num_classes)).
    The previous version summed over axis 0 unconditionally, which collapses an
    unbatched one-hot vector into a scalar and increments every class, and it
    added a float result in place to an int32 array, which NumPy rejects.

    Args:
        dataset: iterable of (features, labels) pairs; labels are one-hot
            encoded tensors or array-likes.
        num_classes: width of the one-hot encoding (defaults to 5).

    Returns:
        np.ndarray of shape (num_classes,) and dtype int32 with per-class counts.

    Raises:
        ValueError: if a label's last dimension does not equal num_classes.
    """
    class_counts = np.zeros(num_classes, dtype=np.int32)

    for _, labels in dataset:
        # Tensors expose .numpy(); plain arrays/lists are used as they are.
        if hasattr(labels, "numpy"):
            labels = labels.numpy()
        labels = np.asarray(labels)

        if labels.ndim == 0 or labels.shape[-1] != num_classes:
            raise ValueError(
                f"Expected one-hot labels with {num_classes} classes, "
                f"got shape {labels.shape}"
            )

        # Bring (num_classes,) and (batch, num_classes) to a common 2-D layout,
        # then sum over the sample axis only.
        per_class = labels.reshape(-1, num_classes).sum(axis=0)
        class_counts += np.rint(per_class).astype(np.int32)

    return class_counts

In [None]:
def plot_training_history(train_acc,val_acc,train_loss,val_loss):
    """Show training and validation curves in two side-by-side panels.

    The left panel plots the accuracy series, the right panel the loss series.

    Args:
        train_acc: per-epoch training accuracy values.
        val_acc: per-epoch validation accuracy values.
        train_loss: per-epoch training loss values.
        val_loss: per-epoch validation loss values.
    """
    # (subplot position, metric name, training series, validation series)
    panels = (
        (1, 'Accuracy', train_acc, val_acc),
        (2, 'Loss', train_loss, val_loss),
    )

    plt.figure(figsize=(12, 6))

    for position, metric, train_series, val_series in panels:
        plt.subplot(1, 2, position)
        plt.plot(train_series, label=f'Training {metric}')
        plt.plot(val_series, label=f'Validation {metric}')
        plt.title(f'Model {metric}')
        plt.xlabel('Epoch')
        plt.ylabel(metric)
        plt.legend()

    plt.show()

In [None]:
def create_model(shape=None, num_classes=5, regularization_value=0.001):
    """Build the (uncompiled) CNN classifier.

    Architecture: two Conv2D -> MaxPooling2D -> Dropout stages, then
    Flatten -> Dense(64) -> Dropout -> softmax output.

    Args:
        shape: input image shape. When omitted, the notebook-level
            `input_shape` is used, so `create_model()` behaves as before; the
            argument also makes the existing `create_model(input_shape)` call
            valid.
        num_classes: number of units in the softmax output layer.
        regularization_value: L2 factor applied to every conv/dense kernel
            except the output layer.

    Returns:
        An uncompiled Sequential model.
    """
    if shape is None:
        shape = input_shape

    model = Sequential([
       InputLayer(shape=shape),
       Conv2D(16,(3,3),
              strides=1,
              activation='relu',
              kernel_regularizer=l2(regularization_value),),
       MaxPooling2D(),
       Dropout(0.20),

       Conv2D(32,(3,3),
              strides=1,
              activation='relu',
              kernel_regularizer=l2(regularization_value)),
       MaxPooling2D(),
       Dropout(0.30),

       Flatten(),
       Dense(64,
              activation='relu',
              kernel_regularizer=l2(regularization_value)),
       Dropout(0.5),
       # The output layer is pinned to float32 while the global policy is
       # 'mixed_float16'.
       Dense(num_classes,
              activation='softmax',
              dtype='float32')
       ])

    return model

In [None]:
# Load every image under data_dir as one unbatched, unshuffled dataset.
loader_options = dict(
    image_size=(128, 128),
    label_mode='categorical',  # Ensure labels are one-hot encoded
    batch_size=None,
    shuffle=False,
)
dataset = tf.keras.utils.image_dataset_from_directory(data_dir, **loader_options)

# Check class names and indices
class_names = dataset.class_names
for i in range(len(class_names)):
    class_name = class_names[i]
    print(f"Class: {class_name}, Index: {i}\n")

In [None]:
# Split the data 80 / 10 / 10 into train / validation / test.
DATASET_SIZE = int(tf.data.experimental.cardinality(dataset).numpy())
if DATASET_SIZE <= 0:
    raise ValueError(f"Cannot split a dataset of unknown or zero size ({DATASET_SIZE})")

# Define the sizes of your splits
train_size = int(0.8 * DATASET_SIZE)
val_size = int(0.1 * DATASET_SIZE)
test_size = DATASET_SIZE - train_size - val_size

# The dataset was loaded with shuffle=False, i.e. in a fixed, class-grouped
# order. Slicing it directly with take/skip would put only the last
# class(es) into the validation and test splits. Shuffle ONCE with a fixed seed
# and reshuffle_each_iteration=False so that every pass over the data sees the
# same permutation; this keeps the three splits disjoint and reproducible.
# The buffer covers the full dataset, so all images are held in memory while
# the permutation is produced.
SPLIT_SEED = 42
shuffled_dataset = dataset.shuffle(
    DATASET_SIZE,
    seed=SPLIT_SEED,
    reshuffle_each_iteration=False,
)

# Split the dataset
train_dataset = shuffled_dataset.take(train_size)  # first 80% of the samples
remaining = shuffled_dataset.skip(train_size)      # the remaining 20%
val_dataset = remaining.take(val_size)             # next 10% of the full dataset
test_dataset = remaining.skip(val_size)            # everything left (about 10%)

In [None]:
# Per-class sample counts of the training split
train_dist = get_class_distribution(train_dataset)

# Balanced class weights: total / (number_of_classes * class_count).
# A class without any training sample gets weight 0.
total_samples = sum(train_dist)
n_classes = len(train_dist)
class_weights = {}
for class_index, class_count in enumerate(train_dist):
    if class_count > 0:
        class_weights[class_index] = total_samples / (n_classes * class_count)
    else:
        class_weights[class_index] = 0

In [None]:
# Callbacks shared by the hyper-parameter search and the final training run.
callbacks = [
    # Stop once val_loss has not improved by at least 0.001 for 20 epochs.
    # restore_best_weights=True rolls the model back to its best epoch, so a
    # subsequent model.evaluate() scores the best weights rather than weights
    # from up to 20 epochs past the optimum.
    EarlyStopping(monitor="val_loss", mode="min", patience=20, min_delta=0.001,
                  restore_best_weights=True),
    # Multiply the learning rate by 0.2 after 10 epochs without val_loss improvement.
    ReduceLROnPlateau(monitor="val_loss", mode="min", factor=0.2, patience=10)
]

In [None]:
input_shape = (128,128,3)
batch_sizes = [8,16,32,64]
learning_rates = [0.0001,0.0005]
Epochs = 100
results = []

for batch_size in batch_sizes:
    # Configure datasets with current batch size
    train_ds = prepare_dataset(train_dataset, batch_size,cache_path=training_cache)
    val_ds = prepare_dataset(val_dataset, batch_size, is_training=False,cache_path=validation_cache)
    
    for lr in learning_rates:
        tf.keras.backend.clear_session()
        optimizer = Adam(learning_rate=lr)
        loss_scaling_optimizer = mixed_precision.LossScaleOptimizer(optimizer)
        print(f"Training On ... \t Batch : {batch_size} , LR : {lr}")
        model = create_model()
        model.compile(
            optimizer=loss_scaling_optimizer,
            loss=losses.categorical_crossentropy,
            metrics=['accuracy'])
        
        history = model.fit(train_ds,
                            epochs=Epochs,
                            validation_data=val_ds,
                            class_weight=class_weights,  # Add class weights here
                            callbacks=callbacks,
                            verbose=0)
        
        # Evaluate on validation set
        val_loss, val_accuracy = model.evaluate(val_ds, verbose=0)
        
        results.append({
            'batch_size': batch_size,
            'learning_rate': lr,
            'val_accuracy': val_accuracy,
            'val_loss' : val_loss,
            'accuracy': history['accuracy'][-1],
            'loss': history['loss'][-1],
        })

In [None]:
display(HTML("<h1 style='color: red;'>Execution complete! 🎉</h1>"))
display(HTML("<h1 style='color: green;'>Execution complete! 🎉</h1>"))
display(HTML("<h1 style='color: blue;'>Execution complete! 🎉</h1>"))
display(HTML("<h1 style='color: yellow;'>Execution complete! 🎉</h1>"))

In [None]:
best_result = max(results, key=lambda x: x['val_accuracy'])  

print("\nBest parameters:")
print(f"Batch Size: {best_result['batch_size']}")
print(f"Learning Rate: {best_result['learning_rate']}")
print(f"Validation Accuracy: {best_result['val_accuracy']:.4f}") 
print(f"Validation Loss: {best_result['val_loss']:.4f}")  
print(f"Accuracy: {best_result['accuracy']:.4f}")  
print(f"Loss: {best_result['loss']:.4f}") 

# Plot of the training accuracy and loss
plot_training_history(
    best_result['accuracy'],
    best_result['val_accuracy'],
    best_result['loss'],
    best_result['val_loss'],
)

In [None]:
# Train final model with best parameters
tf.keras.backend.clear_session()
optimizer = Adam(learning_rate=best_result['learning_rate'])
loss_scaling_optimizer = mixed_precision.LossScaleOptimizer(optimizer)

# create_model reads the notebook-level `input_shape` itself. It is called
# without arguments here, exactly as in the grid search; passing a positional
# argument to the zero-argument definition raises a TypeError.
final_model = create_model()
final_model.compile(optimizer=loss_scaling_optimizer,
                    loss=losses.categorical_crossentropy,
                    metrics=['accuracy'])

In [None]:
callbacks = callbacks + [ModelCheckpoint("best_model.keras", monitor="val_loss", mode="min", save_best_only=True)]

In [None]:
final_train_batches = prepare_dataset(train_dataset, best_result['batch_size'],cache_path=training_cache)
final_val_batches = prepare_dataset(val_dataset, best_result['batch_size'], is_training=False,cache_path=validation_cache)

final_history = final_model.fit(final_train_batches,
                                epochs=150,  # You might want to increase epochs for final training
                                validation_data=final_val_batches,
                                callbacks=callbacks)


In [None]:
# Pull the per-epoch curves of the final run out of the History object and plot them.
final_metrics = final_history.history
train_acc, val_acc = final_metrics['accuracy'], final_metrics['val_accuracy']
train_loss, val_loss = final_metrics['loss'], final_metrics['val_loss']

plot_training_history(train_acc, val_acc, train_loss, val_loss)

In [None]:
# Load the best model saved by ModelCheckpoint
# ("best_model.keras" is the file the checkpoint callback writes whenever
# val_loss reaches a new minimum during the final training run.)
best_model = load_model("best_model.keras")

In [None]:
# Score the checkpointed model on the held-out test split
# (no shuffling or augmentation for evaluation data).
final_test_batches = prepare_dataset(
    test_dataset,
    best_result['batch_size'],
    is_training=False,
    cache_path=test_cache,
)
test_metrics = best_model.evaluate(final_test_batches)
test_loss, test_accuracy = test_metrics
print(f"\nFinal Test Accuracy: {test_accuracy:.4f},Final Test Loss: {test_loss:.4f}")

Final Test Accuracy: 0.9250,Final Test Loss: 0.1644


In [None]:
# Persist the model that was actually evaluated on the test set: `best_model`
# holds the checkpointed weights with the lowest validation loss, whereas
# `final_model` holds whatever weights the last training epoch left behind.
model_dir = 'models'
os.makedirs(model_dir, exist_ok=True)  # the target directory may not exist yet
best_model.save(os.path.join(model_dir, 'emotions_5_class_model_28-10.keras'))