In [None]:
# Robust Emotion Recognition with MobileNetV2

import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import gc
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import warnings
warnings.filterwarnings('ignore')

# Clear any existing models from memory
# (resets Keras global state and forces a garbage-collection pass so a
# re-run of this cell starts from a clean slate).
tf.keras.backend.clear_session()
gc.collect()

# Environment report: TensorFlow version and the GPUs TensorFlow can see.
print("TensorFlow Version:", tf.__version__)
print("GPU Available:", tf.config.list_physical_devices('GPU'))

# Step 1: Define Paths and Parameters
# All paths live under the same Google Drive folder.
train_dir = "/content/drive/MyDrive/FER2013/train"
# NOTE: validation data is read from the dataset's `test` folder.
val_dir = "/content/drive/MyDrive/FER2013/test"
# Final model file written by save_final_model().
model_save_path = '/content/drive/MyDrive/FER2013/emotion_mobileNet.h5'
# Best-so-far model written by the ModelCheckpoint callback during training.
checkpoint_path = '/content/drive/MyDrive/FER2013/best_model_checkpoint.h5'

# Images are resized to IMG_SIZE x IMG_SIZE for both the generators and the
# model input; the same value is reused at prediction time.
IMG_SIZE = 128
BATCH_SIZE = 16  # Reduced batch size for stability
EPOCHS = 15  # Upper bound on epochs passed to model.fit()

# Step 2: Check if directories exist
def check_directories(train_path=None, val_path=None):
    """Check that the training and validation dataset folders are usable.

    Prints the class sub-folders found under each root and warns when the
    two sets differ, since the data generators derive their class lists
    from these sub-folders.

    Args:
        train_path: Training root folder. Defaults to the module-level
            ``train_dir`` when omitted.
        val_path: Validation root folder. Defaults to the module-level
            ``val_dir`` when omitted.

    Returns:
        True when both paths are existing directories, False otherwise.
        A class mismatch only produces a warning; it does not change the
        return value.
    """
    train_path = train_dir if train_path is None else train_path
    val_path = val_dir if val_path is None else val_path

    # isdir (not exists): a plain file at the path must be rejected here
    # instead of crashing later inside os.listdir().
    if not os.path.isdir(train_path):
        print(f"ERROR: Training directory not found: {train_path}")
        return False
    if not os.path.isdir(val_path):
        print(f"ERROR: Validation directory not found: {val_path}")
        return False

    def class_folders(root):
        # Only sub-directories are classes; stray files are ignored.
        return [
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        ]

    train_classes = class_folders(train_path)
    val_classes = class_folders(val_path)
    print(f"Training classes found: {train_classes}")
    print(f"Validation classes found: {val_classes}")

    missing_in_val = sorted(set(train_classes) - set(val_classes))
    missing_in_train = sorted(set(val_classes) - set(train_classes))
    if missing_in_val:
        print(f"WARNING: Classes missing from validation directory: {missing_in_val}")
    if missing_in_train:
        print(f"WARNING: Classes missing from training directory: {missing_in_train}")
    return True

if not check_directories():
    print("Please check your directory paths and try again.")
    # exit() is a helper injected by site/IPython and is not guaranteed to
    # stop execution at this point; raising SystemExit aborts reliably
    # and reports a non-zero status for the failure.
    raise SystemExit(1)

# Step 3: Configure GPU memory growth (prevents GPU memory issues)
# Uses the same non-experimental device query as the environment report
# at the top of the file.
try:
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU memory growth configured.")
except (RuntimeError, ValueError) as e:
    # TensorFlow reports a RuntimeError when the devices were already
    # initialized and a ValueError for an invalid device; neither should
    # abort the run, so report and continue with default settings.
    print(f"GPU configuration error: {e}")

# Step 4: Load and Preprocess Data with error handling
def create_data_generators():
    """Create the training and validation image generators.

    Training images are rescaled to [0, 1] and lightly augmented;
    validation images are only rescaled. Both generators read RGB images
    resized to IMG_SIZE x IMG_SIZE from class sub-folders and yield
    one-hot ("categorical") labels in batches of BATCH_SIZE.

    The validation generator is pinned to the training generator's class
    list. Without this, a validation folder that lacks one of the class
    sub-folders would get its own, shorter class list, so its label
    indices and one-hot width would no longer match the model output.

    Returns:
        A ``(train_gen, val_gen)`` tuple, or ``(None, None)`` if anything
        goes wrong while building the generators.
    """
    try:
        train_datagen = ImageDataGenerator(
            rescale=1./255,
            rotation_range=15,  # Reduced for stability
            zoom_range=0.1,
            width_shift_range=0.1,
            height_shift_range=0.1,
            shear_range=0.1,
            horizontal_flip=True,
            fill_mode="nearest"
        )

        # No augmentation for validation: only the same rescaling.
        val_datagen = ImageDataGenerator(rescale=1./255)

        train_gen = train_datagen.flow_from_directory(
            train_dir,
            target_size=(IMG_SIZE, IMG_SIZE),
            color_mode='rgb',
            batch_size=BATCH_SIZE,
            class_mode='categorical',
            shuffle=True
        )

        # Class names ordered by their training index, so validation
        # labels use exactly the same index for every class.
        class_names = sorted(train_gen.class_indices, key=train_gen.class_indices.get)

        val_gen = val_datagen.flow_from_directory(
            val_dir,
            target_size=(IMG_SIZE, IMG_SIZE),
            color_mode='rgb',
            classes=class_names,
            batch_size=BATCH_SIZE,
            class_mode='categorical',
            shuffle=False  # keep a stable order for evaluation
        )

        print(f"Training samples: {train_gen.samples}")
        print(f"Validation samples: {val_gen.samples}")
        print(f"Number of classes: {train_gen.num_classes}")
        print(f"Class indices: {train_gen.class_indices}")

        return train_gen, val_gen

    except Exception as e:
        # Broad on purpose: callers rely on the (None, None) sentinel.
        print(f"Error creating data generators: {e}")
        return None, None

train_gen, val_gen = create_data_generators()
if train_gen is None or val_gen is None:
    print("Failed to create data generators. Exiting.")
    # Later code dereferences train_gen, so execution must really stop
    # here; exit() is a site/IPython helper that is not guaranteed to.
    raise SystemExit(1)

# Step 5: Build the Model with error handling
def build_model(num_classes):
    """Assemble and compile the MobileNetV2-based classifier.

    A frozen, ImageNet-initialised MobileNetV2 (without its top) is
    followed by global average pooling, dropout, a 128-unit ReLU layer,
    another dropout and a softmax layer with ``num_classes`` outputs.
    The model is compiled with Adam (learning rate 1e-4) and categorical
    cross-entropy, tracking accuracy.

    Args:
        num_classes: Width of the final softmax layer.

    Returns:
        The compiled model, or None if construction fails.
    """
    try:
        # Start from a clean Keras state before creating any layers.
        tf.keras.backend.clear_session()

        backbone = MobileNetV2(
            weights='imagenet',
            include_top=False,
            input_shape=(IMG_SIZE, IMG_SIZE, 3),
        )
        # Only the new head is trained; the backbone stays frozen.
        backbone.trainable = False

        # Classification head, applied in order on top of the backbone.
        head_layers = (
            GlobalAveragePooling2D(),
            Dropout(0.3),
            Dense(128, activation='relu'),
            Dropout(0.2),
            Dense(num_classes, activation='softmax'),
        )
        tensor = backbone.output
        for layer in head_layers:
            tensor = layer(tensor)

        network = Model(inputs=backbone.input, outputs=tensor)
        network.compile(
            optimizer=Adam(learning_rate=1e-4),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )

        print("Model built successfully!")
        network.summary()
        return network

    except Exception as err:
        print(f"Error building model: {err}")
        return None

model = build_model(train_gen.num_classes)
if model is None:
    print("Failed to build model. Exiting.")
    # Training below needs a model, so stop for real; exit() is a
    # site/IPython helper that is not guaranteed to halt execution.
    raise SystemExit(1)

# Step 6: Define Callbacks for robust training
def create_callbacks():
    """Return the list of Keras callbacks used during training.

    The list contains, in order: a checkpoint that keeps only the full
    model with the best ``val_accuracy``, a scheduler that halves the
    learning rate after 3 epochs without ``val_loss`` improvement (down
    to 1e-7), and early stopping after 5 such epochs that restores the
    best weights.
    """
    # Save best model during training
    best_checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor='val_accuracy',
        save_best_only=True,
        save_weights_only=False,
        verbose=1,
    )

    # Reduce learning rate when loss plateaus
    lr_on_plateau = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        min_lr=1e-7,
        verbose=1,
    )

    # Early stopping to prevent overfitting
    early_stop = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
        verbose=1,
    )

    return [best_checkpoint, lr_on_plateau, early_stop]


# Built once at module level and consumed by train_model().
callbacks = create_callbacks()

# Step 7: Train the Model with comprehensive error handling
def train_model():
    """Train the module-level ``model`` on the module-level generators.

    Runs ``model.fit`` for up to EPOCHS epochs with the configured
    callbacks. If training raises, the current model is written to an
    emergency file on a best-effort basis.

    Returns:
        The Keras ``History`` object on success, or None when training
        failed.
    """
    try:
        print("Starting training...")

        # Calculate steps per epoch.
        # Floor division drops the final partial batch; max(1, ...) keeps
        # a dataset smaller than one batch from producing zero steps.
        train_steps = max(1, train_gen.samples // BATCH_SIZE)
        val_steps = max(1, val_gen.samples // BATCH_SIZE)

        print(f"Train steps per epoch: {train_steps}")
        print(f"Validation steps per epoch: {val_steps}")

        history = model.fit(
            train_gen,
            steps_per_epoch=train_steps,
            epochs=EPOCHS,
            validation_data=val_gen,
            validation_steps=val_steps,
            callbacks=callbacks,
            verbose=1
        )

        print("Training completed successfully!")
        return history

    except Exception as e:
        print(f"Training error: {e}")
        print("Attempting to save current model state...")
        try:
            model.save('/content/drive/MyDrive/FER2013/emergency_save.h5')
            print("Emergency model saved!")
        except Exception as save_error:
            # Not a bare except: KeyboardInterrupt/SystemExit must still
            # propagate, and the reason for the failed save is reported.
            print(f"Could not save emergency model: {save_error}")
        return None


# Execute training
history = train_model()

# Step 8: Plot Training History (only if training succeeded)
def plot_history(history):
    """Plot and save the accuracy and loss curves of a training run.

    Draws train/validation accuracy and train/validation loss side by
    side, writes the figure to ``training_plots.png`` in the dataset
    folder and then displays it.

    Args:
        history: The object returned by ``model.fit`` (its ``history``
            dict must contain ``accuracy``, ``val_accuracy``, ``loss``
            and ``val_loss``), or None when training failed.

    Returns:
        None. Plotting errors are reported, not raised.
    """
    if history is None:
        print("No training history to plot.")
        return

    try:
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'], label='Train Accuracy', linewidth=2)
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy', linewidth=2)
        plt.title('Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'], label='Train Loss', linewidth=2)
        plt.plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()

        # Save plots BEFORE showing them: on notebook backends show()
        # releases the current figure, so saving afterwards writes a
        # blank image.
        plt.savefig('/content/drive/MyDrive/FER2013/training_plots.png', dpi=300, bbox_inches='tight')
        print("Training plots saved!")

        plt.show()

    except Exception as e:
        print(f"Error plotting history: {e}")

# Plot the run above; plot_history() itself handles a None history
# (training failure) by printing a message and returning.
plot_history(history)

# Step 9: Save the Final Model
def save_final_model():
    """Persist the final model as an HDF5 file and as a SavedModel.

    The best checkpoint written during training is preferred when its
    file exists; otherwise the in-memory ``model`` is saved. Errors are
    reported, not raised.
    """
    try:
        # Load the best model from checkpoint if it exists
        if os.path.exists(checkpoint_path):
            print("Loading best model from checkpoint...")
            model_to_save = load_model(checkpoint_path)
        else:
            model_to_save = model

        model_to_save.save(model_save_path)
        print(f"Final model saved to: {model_save_path}")

        # Also save in SavedModel format for better compatibility
        savedmodel_path = '/content/drive/MyDrive/FER2013/emotion_model_savedmodel'
        try:
            model_to_save.save(savedmodel_path, save_format='tf')
        except ValueError:
            # Keras 3 rejects `save_format` for a path without a .keras/.h5
            # extension; there the SavedModel artifact is produced with
            # Model.export() instead.
            model_to_save.export(savedmodel_path)
        print(f"SavedModel format saved to: {savedmodel_path}")

    except Exception as e:
        print(f"Error saving model: {e}")


save_final_model()

# Step 10: Load Model and Setup Prediction Function
def load_trained_model():
    """Return the model to use for prediction.

    Loads the file at ``model_save_path`` when it exists; otherwise falls
    back to the in-memory ``model``. Returns None if loading raises.
    """
    try:
        # Guard clause: nothing on disk, keep using the trained instance.
        if not os.path.exists(model_save_path):
            print("Saved model not found, using current model.")
            return model

        restored = load_model(model_save_path)
        print("Model loaded successfully!")
        return restored
    except Exception as err:
        print(f"Error loading model: {err}")
        return None


# Load the trained model
final_model = load_trained_model()

# Step 11: Setup Face Detection and Prediction
try:
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    # The constructor does not raise when the XML file cannot be read; it
    # yields an empty classifier, so that case is reported explicitly.
    if face_cascade.empty():
        print("Error setting up face detection: Haar cascade file could not be loaded.")
    # Label list ordered by class index, so emotions[i] names output unit i.
    emotions = sorted(train_gen.class_indices, key=train_gen.class_indices.get)
    print(f"Emotion classes: {emotions}")
except Exception as e:
    print(f"Error setting up face detection: {e}")

def predict_emotion(img_path, model=final_model):
    """Detect faces in an image, classify each face's emotion and show it.

    Every detected face is cropped, resized to IMG_SIZE x IMG_SIZE,
    scaled to [0, 1] and passed to ``model``. The predicted label and
    confidence are printed and drawn on the image, which is then
    displayed with matplotlib.

    Args:
        img_path: Path of the image file to analyse.
        model: Model used for prediction. The default is bound to the
            value of ``final_model`` at the time this function is defined.

    Returns:
        None. Problems (missing file, unreadable image, no faces, any
        exception) are reported via printed messages.
    """
    try:
        if model is None:
            print("No model available for prediction.")
            return

        if not os.path.exists(img_path):
            print(f"Image file not found: {img_path}")
            return

        # Load and validate image (OpenCV returns None on read failure)
        img = cv2.imread(img_path)
        if img is None:
            print(f"Failed to load image: {img_path}")
            return

        print(f"Original image shape: {img.shape}")

        # Convert to grayscale for face detection
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Detect faces
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )

        # len() rather than truthiness: the result may be a NumPy array,
        # whose truth value is ambiguous.
        if len(faces) == 0:
            print("No faces detected in the image.")
            plt.figure(figsize=(10, 6))
            plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            plt.axis('off')
            plt.title("No faces detected")
            plt.show()
            return

        print(f"Found {len(faces)} face(s)")

        # Process each detected face
        for i, (x, y, w, h) in enumerate(faces):
            # Extract face region
            face = img[y:y+h, x:x+w]

            # Preprocess face for prediction.
            # cv2.imread yields BGR, but the training generator was
            # configured with color_mode='rgb', so the channel order is
            # converted to match what the model saw during training.
            face_rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
            face_resized = cv2.resize(face_rgb, (IMG_SIZE, IMG_SIZE))
            face_normalized = face_resized.astype('float32') / 255.0
            face_batch = np.expand_dims(face_normalized, axis=0)

            # Make prediction
            pred = model.predict(face_batch, verbose=0)
            emotion_idx = np.argmax(pred)
            emotion_label = emotions[emotion_idx]
            confidence = pred[0][emotion_idx] * 100

            print(f"Face {i+1}: {emotion_label} ({confidence:.1f}% confidence)")

            # Draw rectangle and label
            cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
            label_text = f"{emotion_label} ({confidence:.1f}%)"
            # Clamp the baseline so the label stays inside the image when
            # the face touches the top edge.
            label_y = max(y - 10, 15)
            cv2.putText(img, label_text, (x, label_y),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Display result
        plt.figure(figsize=(12, 8))
        plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.title("Emotion Detection Results")
        plt.show()

    except Exception as e:
        print(f"Error in emotion prediction: {e}")
        import traceback
        traceback.print_exc()

# Step 12: Test the Prediction Function
# train_model() returns None on failure, so the banner only claims success
# when a history object actually came back.
print("\n" + "="*50)
if history is not None:
    print("TRAINING COMPLETE!")
else:
    print("TRAINING DID NOT COMPLETE - see the errors above.")
print("="*50)
print("To test emotion prediction, use:")
print('predict_emotion("/path/to/your/image.jpg")')
predict_emotion('/content/drive/MyDrive/FER2013/test/angry/PrivateTest_10131363.jpg')

# Memory cleanup
gc.collect()
tf.keras.backend.clear_session()

if history is not None:
    print("\nModel training and setup completed successfully!")
else:
    print("\nSetup finished, but training did not complete successfully.")
# Report the saved file only if it is really there.
if os.path.exists(model_save_path):
    print(f"Model saved at: {model_save_path}")
else:
    print(f"No saved model found at: {model_save_path}")
if final_model is not None:
    print("You can now use the predict_emotion() function to test images.")




TensorFlow Version: 2.19.0
GPU Available: []
Training classes found: ['angry', 'fear', 'neutral', 'disgust', 'happy', 'surprise', 'sad']
Validation classes found: ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad']
Found 28889 images belonging to 7 classes.
Found 5303 images belonging to 6 classes.
Training samples: 28889
Validation samples: 5303
Number of classes: 7
Class indices: {'angry': 0, 'disgust': 1, 'fear': 2, 'happy': 3, 'neutral': 4, 'sad': 5, 'surprise': 6}
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_128_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Model built successfully!


Starting training...
Train steps per epoch: 1805
Validation steps per epoch: 331
Epoch 1/15
   8/1805 ━━━━━━━━━━━━━━━━━━━━ 2:50:59 6s/step - accuracy: 0.1290 - loss: 2.8529