In [1]:
#STEP 1: Environment Setup + GPU Verification
import os, time
import tensorflow as tf

print("TensorFlow version:", tf.__version__)

# --- Check for GPU availability ---
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"GPU detected: {gpus[0].name}")
    try:
        # Mixed precision is only requested when a GPU is present.
        tf.keras.mixed_precision.set_global_policy('mixed_float16')
        print("Mixed precision enabled for performance boost.")
    except Exception as e:
        # Best effort: training still works with the default float32 policy.
        print("Mixed precision could not be set:", e)
else:
    print("No GPU detected — running on CPU (training will be slower).")

# TensorFlow refuses to change its thread-pool sizes once the runtime has been
# initialised (RuntimeError), which happens when this cell is executed after
# other TensorFlow work in the same kernel. Report it instead of aborting.
try:
    tf.config.threading.set_inter_op_parallelism_threads(4)
    tf.config.threading.set_intra_op_parallelism_threads(4)
except RuntimeError as e:
    print("Thread pool settings left unchanged:", e)

SEED = 42
# Seeds Python's `random`, NumPy and TensorFlow in one call. Seeding only
# TensorFlow leaves Python/NumPy-driven randomness (e.g. default Keras
# initialiser seeds) different on every run.
tf.keras.utils.set_random_seed(SEED)

print("Environment initialized successfully.")
print("Mixed precision:", tf.keras.mixed_precision.global_policy())

TensorFlow version: 2.19.0
No GPU detected — running on CPU (training will be slower).
Environment initialized successfully.
Mixed precision: <DTypePolicy "float32">


In [2]:
# STEP 2: Dataset Verification & Path Setup 
import os

# Primary dataset location (relative to the notebook's working directory).
TRAIN_DIR = 'Dataset/train'
TEST_DIR  = 'Dataset/test'

# Fall back to the alternative layout when the primary train folder is absent.
if not os.path.exists(TRAIN_DIR):
    ALT_TRAIN = 'input/train'
    ALT_TEST  = 'input/test'
    if os.path.exists(ALT_TRAIN):
        TRAIN_DIR, TEST_DIR = ALT_TRAIN, ALT_TEST

assert os.path.exists(TRAIN_DIR), f"Train directory not found: {TRAIN_DIR}"
assert os.path.exists(TEST_DIR),  f"Test directory not found: {TEST_DIR}"


def _class_dirs(root):
    """Return the sorted names of the visible sub-directories of `root`.

    Only sub-directories can be class folders; stray files (e.g. `.DS_Store`)
    and hidden folders (e.g. `.ipynb_checkpoints`) are not classes and would
    otherwise be reported as such.
    """
    return sorted(
        name for name in os.listdir(root)
        if not name.startswith('.') and os.path.isdir(os.path.join(root, name))
    )


print("Dataset directories verified successfully.")
print(f"Train path : {TRAIN_DIR}")
print(f"Test path  : {TEST_DIR}")
print("Train classes:", _class_dirs(TRAIN_DIR))
print("Test classes :", _class_dirs(TEST_DIR))


AssertionError: Train directory not found: Dataset/train

In [None]:
# STEP 3: Data Loading + Preprocessing 
import os, math, tensorflow as tf

# --- Config ---
IMG_SIZE, BATCH_SIZE, SEED = (48, 48), 64, 42
AUTOTUNE = tf.data.AUTOTUNE

# --- Dataset paths: use Dataset/ when present, otherwise try input/ ---
train_dir, test_dir = 'Dataset/train', 'Dataset/test'

if not os.path.exists(train_dir):
    alt_train, alt_test = 'input/train', 'input/test'
    if os.path.exists(alt_train):
        train_dir, test_dir = alt_train, alt_test

# Both splits must exist before any dataset is built.
for _split, _path in (("Train", train_dir), ("Test", test_dir)):
    assert os.path.exists(_path), f"{_split} path not found: {_path}"

print(" Dataset verified:\n"
      f"   Train: {train_dir}\n"
      f"   Test : {test_dir}")

# --- Create image datasets ---
# Loader options common to both splits: one-hot labels inferred from the
# folder names, single-channel images resized to IMG_SIZE, fixed batch size.
_shared_loader_args = dict(
    labels='inferred',
    label_mode='categorical',
    color_mode='grayscale',
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
)

# Training split: shuffled with a fixed seed.
train_ds = tf.keras.utils.image_dataset_from_directory(
    train_dir, shuffle=True, seed=SEED, **_shared_loader_args
)

# Validation split (read from the test folder): kept in file order.
val_ds = tf.keras.utils.image_dataset_from_directory(
    test_dir, shuffle=False, **_shared_loader_args
)

# Captured here because train_ds is rebound to a mapped pipeline further down.
class_names = train_ds.class_names
print("Classes:", class_names)

# --- Data Augmentation (helps prevent overfitting, boosts generalization) ---
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
    tf.keras.layers.RandomTranslation(0.1, 0.1)
])

# --- Normalization Layer ---
normalization = tf.keras.layers.Rescaling(1./255)


def preprocess_val(images, labels):
    """Cast a batch to float32 and scale it by 1/255 (deterministic)."""
    images = tf.cast(images, tf.float32)
    images = normalization(images)
    return images, labels


def augment_train(images, labels):
    """Apply the random augmentation layers to an already-normalized batch."""
    return data_augmentation(images, training=True), labels


def preprocess_train(images, labels):
    """Normalize and augment a batch in one step.

    Kept for ad-hoc, uncached use. The training pipeline below applies the two
    steps separately so that a cache can sit between them.
    """
    images, labels = preprocess_val(images, labels)
    return augment_train(images, labels)


# Order matters: cache() replays exactly the elements produced upstream of it.
# With the random augmentation mapped *before* the cache, the augmented images
# of the first epoch were frozen and reused for every later epoch. Only the
# deterministic normalization is cached now; augmentation runs afterwards and
# therefore differs on every pass.
# NOTE(review): the models built in later cells also route their inputs
# through `data_augmentation`, so training images are augmented twice;
# consider keeping only one of the two.
train_ds = (train_ds
            .map(preprocess_val, num_parallel_calls=AUTOTUNE)
            .cache()
            .shuffle(2048)   # applied after batching, so this reorders batches
            .map(augment_train, num_parallel_calls=AUTOTUNE)
            .prefetch(AUTOTUNE))

val_ds = (val_ds
          .map(preprocess_val, num_parallel_calls=AUTOTUNE)
          .cache()
          .prefetch(AUTOTUNE))

print(f"tf.data pipeline ready | IMG_SIZE: {IMG_SIZE} | BATCH: {BATCH_SIZE}")


In [None]:
# STEP 4: Data Augmentation
import tensorflow as tf
import matplotlib.pyplot as plt

# Random geometric transforms, every layer created with the same fixed seed.
# Rebinds `data_augmentation`, replacing the pipeline defined in Step 3.
_augmentation_steps = [
    tf.keras.layers.RandomFlip("horizontal", seed=42),
    tf.keras.layers.RandomRotation(0.07, seed=42),   # milder than Step 3's 0.1
    tf.keras.layers.RandomZoom(0.07, seed=42),
    tf.keras.layers.RandomTranslation(0.05, 0.05, seed=42),
]
data_augmentation = tf.keras.Sequential(_augmentation_steps,
                                        name="data_augmentation")

print("Data augmentation pipeline initialized successfully.")

# --- Optional visualization ---
VISUALIZE = True  # set False during training to save time
if VISUALIZE:
    import numpy as np
    # NOTE(review): train_ds has already gone through the Step 3 pipeline
    # (rescaling and augmentation), so this preview shows a second round of
    # augmentation on top of it.
    for images, _ in train_ds.take(1):   # take(1) yields exactly one batch
        # training=True makes the random layers active outside of fit().
        aug_images = data_augmentation(images, training=True)
        # Never index past the end of a batch smaller than the 3x4 grid.
        n_show = min(12, int(aug_images.shape[0]))
        plt.figure(figsize=(10, 6))
        for i in range(n_show):
            plt.subplot(3, 4, i + 1)
            plt.imshow(tf.squeeze(aug_images[i]), cmap='gray')
            plt.axis("off")
        plt.suptitle("Augmented Samples", fontsize=14)
        plt.show()


In [None]:
# STEP 5: Enhanced Lightweight CNN
import tensorflow as tf
from tensorflow.keras import layers, models, mixed_precision

# --- GPU & precision ---
# Switch to float16 compute only when at least one GPU is visible.
_gpu_devices = tf.config.list_physical_devices('GPU')
if len(_gpu_devices) > 0:
    mixed_precision.set_global_policy('mixed_float16')
    print("Mixed precision policy:", mixed_precision.global_policy())

# --- Build model ---
def _conv_block(x, filters, pool=True, dropout=None):
    """Conv 3x3 (same, he_normal) -> BatchNorm -> LeakyReLU(0.1), followed by
    an optional 2x2 max-pool and an optional dropout."""
    x = layers.Conv2D(filters, 3, padding='same', kernel_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)
    if pool:
        x = layers.MaxPooling2D(2)(x)
    if dropout:
        x = layers.Dropout(dropout)(x)
    return x


def build_light_cnn(input_shape=(48,48,1), n_classes=7, augmentation=None):
    """Build the four-stage convolutional classifier (uncompiled).

    Args:
        input_shape: shape of one input image, without the batch axis.
        n_classes: width of the softmax output layer.
        augmentation: layer/model applied to the inputs before the first
            convolution. `None` (default) uses the notebook-level
            `data_augmentation` object, which is what this function always
            did; pass `False` to build the network without in-model
            augmentation (e.g. when the tf.data pipeline already augments).

    Returns:
        A Keras functional model named "LightCNN_Enhanced".
    """
    if augmentation is None:
        # Explicit fallback to the global, so the dependency is visible here.
        augmentation = data_augmentation

    inputs = layers.Input(shape=input_shape, name="input_image")
    x = inputs if augmentation is False else augmentation(inputs)

    x = _conv_block(x, 32, dropout=0.25)    # Block 1
    x = _conv_block(x, 64)                  # Block 2
    x = _conv_block(x, 128, dropout=0.35)   # Block 3
    x = _conv_block(x, 256, pool=False)     # Block 4 (extra compact feature extractor)
    x = layers.GlobalAveragePooling2D()(x)

    # Dense head
    x = layers.Dense(128, kernel_initializer='he_normal')(x)
    x = layers.LeakyReLU(0.1)(x)
    x = layers.Dropout(0.4)(x)

    # The output layer is pinned to float32 regardless of the global policy.
    outputs = layers.Dense(n_classes, activation='softmax', dtype='float32')(x)
    return models.Model(inputs, outputs, name="LightCNN_Enhanced")

# --- Compile ---
# Derive the input shape and the number of outputs from the Step 3 data
# configuration instead of relying on the function's hard-coded defaults, so
# the network always matches the dataset that was actually loaded.
model = build_light_cnn(
    input_shape=(*IMG_SIZE, 1),      # grayscale images -> one channel
    n_classes=len(class_names),
)
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss='categorical_crossentropy',   # labels are one-hot (label_mode='categorical')
    metrics=['accuracy']
)

model.summary()
print(f"✅ Model parameters: {model.count_params():,}")


In [None]:
# STEP 6: Callbacks 
import os, datetime, tensorflow as tf

# --- Paths ---
CHECKPOINT_DIR = "working/checkpoints"
BEST_MODEL_PATH = os.path.join(CHECKPOINT_DIR, "best_model.keras")

# One log folder per run, named after the wall-clock start time.
timestamp = f"{datetime.datetime.now():%Y%m%d-%H%M%S}"
LOG_DIR = os.path.join(CHECKPOINT_DIR, f"logs_{timestamp}")

# Create both folders up front (no error if they already exist).
for _folder in (CHECKPOINT_DIR, LOG_DIR):
    os.makedirs(_folder, exist_ok=True)

# --- Define Callbacks ---
# Keeps only the model with the highest validation accuracy on disk.
_checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath=BEST_MODEL_PATH,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)

# Stops after 6 epochs without a val_accuracy improvement and rolls the
# weights back to the best epoch.
_early_stop_cb = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=6,
    mode='max',
    restore_best_weights=True,
    verbose=1,
)

# Halves the learning rate after 3 epochs without a val_loss improvement.
_reduce_lr_cb = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1,
)

# Per-epoch metrics as CSV; the file is overwritten at the start of each fit.
_csv_logger_cb = tf.keras.callbacks.CSVLogger(
    os.path.join(CHECKPOINT_DIR, "training_log.csv"),
    append=False,
)

# TensorBoard event files, with weight histograms every epoch.
_tensorboard_cb = tf.keras.callbacks.TensorBoard(
    log_dir=LOG_DIR,
    histogram_freq=1,
)

callbacks = [
    _checkpoint_cb,
    _early_stop_cb,
    _reduce_lr_cb,
    _csv_logger_cb,
    _tensorboard_cb,
]

print("Callbacks configured successfully.")
print(f"Best model path : {BEST_MODEL_PATH}")
print(f"TensorBoard log : {LOG_DIR}")


In [None]:
# STEP 7: Model Training — GPU Optimized & Resume-Safe
import time, os, tensorflow as tf
from tensorflow.keras.models import load_model

# ---- CONFIG ----
EPOCHS = 25
BEST_MODEL_PATH = "working/checkpoints/best_model.keras"

# ---- Resume if checkpoint exists ----
initial_epoch = 0
_checkpoint_present = os.path.exists(BEST_MODEL_PATH)
if _checkpoint_present:
    try:
        print("Found previous best model — loading for resume.")
        model = load_model(BEST_MODEL_PATH)
        # Recompile with a fresh Adam optimizer at a lower learning rate.
        _resume_optimizer = tf.keras.optimizers.Adam(learning_rate=5e-4)
        model.compile(optimizer=_resume_optimizer,
                      loss='categorical_crossentropy',
                      metrics=['accuracy'])
    except Exception as err:
        # Best effort: on any failure keep going with whatever `model` is bound.
        print("Failed to load previous checkpoint — starting fresh:", err)

# ---- Optional: Class Weights (to fix imbalance) ----
# `class_weight` is keyed by class *index*, and the indices produced by
# image_dataset_from_directory follow the order of `class_names` (folder names
# in alphanumeric order), not a hand-numbered 0..6 scheme. Hard-coding
# index -> emotion therefore risks putting a weight on the wrong class, so the
# weights are declared per class name and mapped onto the real indices.
# NOTE(review): assumes the class folders are named after these emotions;
# any name not listed falls back to a neutral weight of 1.0 — confirm against
# the printed mapping below.
_CLASS_WEIGHT_BY_NAME = {
    'angry':    1.5,
    'disgust':  1.0,
    'fear':     1.2,
    'happy':    0.9,
    'sad':      1.3,
    'surprise': 1.0,
    'neutral':  1.1,
}

class_weights = {
    index: _CLASS_WEIGHT_BY_NAME.get(name.strip().lower(), 1.0)
    for index, name in enumerate(class_names)
}
print("Class weights:", {class_names[i]: w for i, w in class_weights.items()})

# ---- Timer Callback ----
class EpochTimer(tf.keras.callbacks.Callback):
    """Print per-epoch duration with validation metrics, and the total
    training time at the end of fit()."""

    @staticmethod
    def _fmt(value):
        """Format a metric to 4 decimals, or 'n/a' when it was not logged."""
        return "n/a" if value is None else f"{value:.4f}"

    def on_train_begin(self, logs=None):
        self.start = time.time()
        self._epoch_start = self.start

    def on_epoch_begin(self, epoch, logs=None):
        self._epoch_start = time.time()

    def on_epoch_end(self, epoch, logs=None):
        # `logs` may be None, and the val_* keys are absent when fit() runs
        # without validation data; formatting None with ':.4f' would raise.
        logs = logs or {}
        epoch_seconds = time.time() - self._epoch_start
        print(f"⏱ Epoch {epoch+1} — {epoch_seconds:.1f}s — "
              f"val_acc: {self._fmt(logs.get('val_accuracy'))}, "
              f"val_loss: {self._fmt(logs.get('val_loss'))}")

    def on_train_end(self, logs=None):
        print(f"Total training time: {(time.time()-self.start)/60:.2f} min")

epoch_timer = EpochTimer()

# ---- Train ----
# Shared callbacks from Step 6 plus the timing callback defined above.
_fit_callbacks = list(callbacks) + [epoch_timer]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    class_weight=class_weights,
    callbacks=_fit_callbacks,
    verbose=1,
)

In [None]:
#STEP 8: Resume / Fine-Tune Training — GPU Optimized (Target 70–80%)
import os, time, tensorflow as tf
from tensorflow.keras.models import load_model

# === Paths ===
CHECKPOINT_DIR = "working/checkpoints"
BEST_MODEL_PATH = os.path.join(CHECKPOINT_DIR, "best_model.keras")

# === Fine-Tuning Config ===
INITIAL_EPOCH = 3         # epoch index the warm-up stage starts counting from
TOTAL_EPOCHS  = 25        # extended fine-tuning
BASE_LR       = 1e-4
FINE_TUNE_LR  = 1e-5      # smaller LR for unfreezing stage

# === Load Best Model or Build Pretrained Backbone ===
if os.path.exists(BEST_MODEL_PATH):
    # BEST_MODEL_PATH is the same file the Step 6 ModelCheckpoint writes, so
    # this is whichever model was checkpointed last — not necessarily a
    # MobileNetV2. Report what was actually loaded.
    model = load_model(BEST_MODEL_PATH)
    print(f"Found previous checkpoint — loaded model '{model.name}' for fine-tuning.")
else:
    print("No checkpoint found — building new fine-tuning model.")

    base_model = tf.keras.applications.MobileNetV2(
        input_shape=(48, 48, 3),
        include_top=False,
        weights='imagenet',
        pooling='avg'
    )
    base_model.trainable = False  # freeze base initially

    inputs = tf.keras.Input(shape=(48, 48, 1))
    # The tf.data pipeline already rescales pixels by 1/255, so the batches
    # are in [0, 1]. `mobilenet_v2.preprocess_input` expects 0-255 input
    # (x / 127.5 - 1) and would squash such data to roughly -1 everywhere.
    # Map [0, 1] -> [-1, 1] directly instead.
    x = tf.keras.layers.Rescaling(2.0, offset=-1.0)(inputs)
    x = tf.keras.layers.Conv2D(3, (3, 3), padding='same')(x)  # learnable 1 -> 3 channel projection
    x = data_augmentation(x)
    x = base_model(x, training=False)  # backbone runs in inference mode
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.4)(x)
    outputs = tf.keras.layers.Dense(7, activation='softmax', dtype='float32')(x)
    model = tf.keras.Model(inputs, outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=BASE_LR),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

# === Stage 1: Warmup Training (frozen base) ===
# Epoch numbering starts at INITIAL_EPOCH and stops at 10.
_warmup_fit_args = dict(
    validation_data=val_ds,
    initial_epoch=INITIAL_EPOCH,
    epochs=10,
    callbacks=callbacks,
    verbose=1,
)

print("Stage 1: Warming up new classifier layers...")
warmup_history = model.fit(train_ds, **_warmup_fit_args)

# === Stage 2: Fine-tuning top 50% of backbone ===
def _find_backbone(net):
    """Return the first nested non-Sequential Keras model in `net`, or None.

    Sequential sub-models (such as the augmentation pipeline) are skipped.
    """
    for layer in net.layers:
        if isinstance(layer, tf.keras.Model) and not isinstance(layer, tf.keras.Sequential):
            return layer
    return None


_BatchNorm = tf.keras.layers.BatchNormalization
_backbone = _find_backbone(model)
_unfrozen = 0

if _backbone is not None:
    # The backbone is ONE entry in model.layers. Slicing the outer list and
    # setting `trainable = True` on that entry unfreezes every backbone layer,
    # BatchNorm included. Work on the backbone's own layers instead: enable
    # the container, re-freeze the bottom half, keep BatchNorm frozen on top.
    _backbone.trainable = True
    fine_tune_at = len(_backbone.layers) // 2
    for layer in _backbone.layers[:fine_tune_at]:
        layer.trainable = False
    for layer in _backbone.layers[fine_tune_at:]:
        layer.trainable = not isinstance(layer, _BatchNorm)
        _unfrozen += int(layer.trainable)
else:
    # Flat model without a nested backbone: keep the original behaviour.
    fine_tune_at = len(model.layers) // 2
    for layer in model.layers[fine_tune_at:]:
        if not isinstance(layer, _BatchNorm):
            layer.trainable = True
            _unfrozen += 1

print(f"Unfroze {_unfrozen} layers for fine-tuning.")

# Recompile so the changed `trainable` flags take effect, at the lower LR.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=FINE_TUNE_LR),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# === Fine-tuning ===
# Epoch numbering continues at 10 and runs up to TOTAL_EPOCHS.
_finetune_fit_args = dict(
    validation_data=val_ds,
    initial_epoch=10,
    epochs=TOTAL_EPOCHS,
    callbacks=callbacks,
    verbose=1,
)

print(" Stage 2: Fine-tuning with lower LR...")
start_time = time.time()

history_ft = model.fit(train_ds, **_finetune_fit_args)

# Wall-clock duration of the fine-tuning stage, in minutes.
_elapsed_seconds = time.time() - start_time
elapsed = _elapsed_seconds / 60
print(f"Fine-tuning completed in {elapsed:.2f} minutes.")
print("Expected Accuracy: 70–80% range with full GPU training.")
