In [None]:
# ================================
# Step 1: Import Libraries & Configure GPU
# ================================

# Core libraries for data handling and visualization
import os
import shutil
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# TensorFlow and Keras for building and training the model
import tensorflow as tf
from tensorflow.keras import layers, models, applications, optimizers, losses

# Scikit-learn for evaluation metrics
from sklearn.metrics import classification_report, confusion_matrix

# -------------------------
# Mixed precision
# -------------------------
# Mixed precision speeds up training on GPUs with Tensor cores and reduces memory usage.
# The policy is global, so it applies to every Keras layer created after this line.
tf.keras.mixed_precision.set_global_policy('mixed_float16')
print("✅ Mixed precision training enabled.")

# -------------------------
# GPU memory growth
# -------------------------
# Turn on memory growth for every visible GPU. With no GPU the loop body never
# runs and nothing is printed.
gpus = tf.config.list_physical_devices('GPU')
try:
    for device in gpus:
        tf.config.experimental.set_memory_growth(device, True)
    if gpus:
        print(f"✅ GPU memory growth enabled for {len(gpus)} GPU(s).")
except RuntimeError as e:
    # Report the problem but keep the notebook running.
    print(f"⚠️ GPU configuration error: {e}")

# -------------------------
# Reproducibility
# -------------------------
# Seed NumPy, TensorFlow and Python's `random` (in that order) with the same value.
SEED = 42
for seed_everything in (np.random.seed, tf.random.set_seed, random.seed):
    seed_everything(SEED)
print(f"✅ Random seed set to {SEED} for reproducibility.")

print("All libraries imported and environment configured successfully.")


In [None]:
# ---
# ## Step 2: Define Configuration and Paths
# ========================================

# --- Broad age groups ---
# Each key is a class-folder name; each value is the inclusive (min_age, max_age)
# range that folder collects. The '01_', '02_' ... prefixes make alphabetical
# order coincide with age order.
AGE_GROUPS = {
    '01_Child':           (1, 12),
    '02_Teenager':        (13, 19),
    '03_YoungAdult':      (20, 39),
    '04_MiddleAgedAdult': (40, 59),
    '05_Senior':          (60, 120),  # High upper limit to include all older ages
}
NUM_CLASSES = len(AGE_GROUPS)

# --- Read-only source datasets ---
SOURCE_TRAIN_DIR = '/kaggle/input/age-prediction-dataset/content/organized_dataset'
SOURCE_VALID_DIR = '/kaggle/input/age-prediction-test-datasets/test/'

# --- Writable locations for the regrouped copies ---
WORKING_DIR = '/kaggle/working/'
GROUPED_TRAIN_DIR, GROUPED_VALID_DIR = (
    os.path.join(WORKING_DIR, 'grouped_train/'),
    os.path.join(WORKING_DIR, 'grouped_valid/'),
)

# --- Model and training constants ---
IMG_SIZE = (224, 224)   # (height, width) requested from the image loader
BATCH_SIZE = 64
# Upper bounds per phase; the training cells attach EarlyStopping, which may end a phase sooner.
INITIAL_EPOCHS = FINE_TUNE_EPOCHS = 25

print(f"Project configured for {NUM_CLASSES} age groups.")


In [1]:
# ---
# ## Step 3: Reorganize Original Datasets into Broad Categories
# =============================================================

def reorganize_original_data(source_dir, destination_dir, age_groups, is_train_set=True):
    """
    Copy images from per-age source folders into broad age-group folders.

    Every sub-folder of ``source_dir`` must encode a single age in its name:
    the text after the last underscore for the training layout
    (``is_train_set=True``), or the whole folder name for the validation
    layout (``is_train_set=False``). The folder's files are copied into the
    first group of ``age_groups`` whose inclusive range contains that age.

    Copied files are named ``<source_folder>_<original_name>``. Several source
    folders are merged into one group folder, so keeping the bare filename
    would let images that share a name in different age folders silently
    overwrite each other.

    Args:
        source_dir: Directory containing one sub-folder per age.
        destination_dir: Directory in which one sub-folder per group is created.
        age_groups: Mapping of group name -> (min_age, max_age), both inclusive.
        is_train_set: Selects how the age is parsed from the folder name.

    Returns:
        None. Progress, skipped folders and the number of copied files are printed.
    """
    print(f"Reorganizing data from {source_dir}...")
    for group_name in age_groups:
        os.makedirs(os.path.join(destination_dir, group_name), exist_ok=True)

    if not os.path.exists(source_dir):
        print(f"ERROR: Source directory not found at {source_dir}")
        return

    copied_count = 0
    skipped_folders = []

    for folder_name in sorted(os.listdir(source_dir)):
        source_folder_path = os.path.join(source_dir, folder_name)
        if not os.path.isdir(source_folder_path):
            continue

        # Only the age parsing is guarded, so unrelated errors (e.g. a failed
        # copy) are never mistaken for "folder name is not an age".
        try:
            age = int(folder_name.split('_')[-1]) if is_train_set else int(folder_name)
        except ValueError:
            skipped_folders.append(folder_name)
            continue

        # First group whose inclusive range contains the age, if any.
        target_group = next(
            (name for name, (min_age, max_age) in age_groups.items() if min_age <= age <= max_age),
            None,
        )
        if target_group is None:
            skipped_folders.append(folder_name)
            continue

        destination_group_path = os.path.join(destination_dir, target_group)
        for image_file in sorted(os.listdir(source_folder_path)):
            source_path = os.path.join(source_folder_path, image_file)
            if os.path.isfile(source_path):
                # Prefix with the source folder so names stay unique after merging.
                destination_path = os.path.join(destination_group_path, f"{folder_name}_{image_file}")
                shutil.copy(source_path, destination_path)
                copied_count += 1

    if skipped_folders:
        print(f"Skipped {len(skipped_folders)} folder(s) with no usable age or no matching group: {skipped_folders}")
    print(f"Copied {copied_count} file(s).")
    print(f"Reorganization complete. New dataset is at: {destination_dir}\n")

# Regroup both datasets: training first (age parsed from the text after the last
# underscore of each folder name), then validation (folder name is the age itself).
for _source_dir, _grouped_dir, _is_train in (
    (SOURCE_TRAIN_DIR, GROUPED_TRAIN_DIR, True),
    (SOURCE_VALID_DIR, GROUPED_VALID_DIR, False),
):
    reorganize_original_data(_source_dir, _grouped_dir, AGE_GROUPS, is_train_set=_is_train)


NameError: name 'SOURCE_TRAIN_DIR' is not defined

In [None]:
# ## Step 4: Create High-Performance tf.data Pipelines (Corrected)
# ================================================================

def create_dataset(directory, augment=False):
    """
    Build a batched, prefetched ``tf.data`` pipeline from a class-per-folder image directory.

    Pixel values are deliberately left in the raw [0, 255] range produced by
    ``image_dataset_from_directory``:

    * the model built in this notebook uses ``EfficientNetV2B0`` with its
      default ``include_preprocessing=True``, which per the Keras documentation
      embeds its own rescaling and expects [0, 255] inputs;
    * ``RandAugment`` below is configured with ``value_range=(0, 255)``.

    Rescaling to [0, 1] here would violate both expectations.

    Args:
        directory: Root folder containing one sub-folder per class.
        augment: If True, the dataset is shuffled and random augmentation is
            applied (training use). If False, the dataset keeps a fixed order
            and is cached (validation use).

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, one_hot_labels)`` batches.
    """
    dataset = tf.keras.utils.image_dataset_from_directory(
        directory,
        labels='inferred',
        label_mode='categorical',
        batch_size=BATCH_SIZE,
        image_size=IMG_SIZE,
        shuffle=bool(augment)
    )

    if augment:
        # Built only when needed; every layer operates on [0, 255] pixels.
        data_augmentation = models.Sequential([
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(0.1),
            layers.RandomZoom(0.1),
            layers.RandomContrast(0.1),
            layers.RandAugment(value_range=(0, 255), num_ops=3)
        ], name='data_augmentation')

        # training=True keeps the random layers active inside the tf.data map.
        dataset = dataset.map(
            lambda x, y: (data_augmentation(x, training=True), y),
            num_parallel_calls=tf.data.AUTOTUNE
        )
    else:
        # Without augmentation every epoch sees identical tensors, so cache them.
        dataset = dataset.cache()

    # Overlap data preparation with model execution.
    return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

# Training pipeline: shuffled and augmented. Validation pipeline: fixed order and cached.
train_ds = create_dataset(directory=GROUPED_TRAIN_DIR, augment=True)
valid_ds = create_dataset(directory=GROUPED_VALID_DIR, augment=False)

print("High-performance tf.data pipelines created successfully.")

In [None]:
# ---
# ## Step 5: Build the High-Performance Model
# ============================================

def build_model(num_classes):
    """
    Assemble a frozen EfficientNetV2B0 backbone with a small classification head.

    The backbone is loaded with ImageNet weights, has no top, and is marked
    non-trainable. It is also called with ``training=False``. The head is
    global average pooling -> batch normalization -> dropout (0.5) -> softmax.

    NOTE(review): Keras documents that EfficientNetV2 includes its own input
    rescaling by default (``include_preprocessing=True``), so this model
    presumably expects raw [0, 255] pixels — confirm against the data pipeline.

    Args:
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled ``keras.Model`` mapping image batches to class probabilities.
    """
    input_shape = (IMG_SIZE[0], IMG_SIZE[1], 3)

    # Pretrained feature extractor, frozen for the head-only training phase.
    backbone = applications.EfficientNetV2B0(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet'
    )
    backbone.trainable = False

    image_input = layers.Input(shape=input_shape)
    feature_maps = backbone(image_input, training=False)

    pooled = layers.GlobalAveragePooling2D()(feature_maps)
    normalized = layers.BatchNormalization()(pooled)  # BN before Dense can sometimes be more stable
    regularized = layers.Dropout(0.5)(normalized)
    # Output layer in float32 for stability under the mixed-precision policy.
    class_probs = layers.Dense(num_classes, activation='softmax', dtype='float32')(regularized)

    return models.Model(image_input, class_probs)

# One softmax output unit per configured age group.
model = build_model(num_classes=NUM_CLASSES)
print("Model built with EfficientNetV2B0 base.")


In [None]:
# ## Step 6: Train the Model Head (Corrected)
# ============================================
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

# Cosine-decayed learning rate spanning the whole head-training phase:
# len(train_ds) batches per epoch times INITIAL_EPOCHS epochs.
lr_schedule = optimizers.schedules.CosineDecay(
    initial_learning_rate=1e-3,
    decay_steps=len(train_ds) * INITIAL_EPOCHS,
)
optimizer = optimizers.AdamW(weight_decay=1e-4, learning_rate=lr_schedule)

# Labels are one-hot (label_mode='categorical'), hence categorical cross-entropy.
model.compile(
    loss=losses.CategoricalCrossentropy(),
    optimizer=optimizer,
    metrics=['accuracy'],
)

# Keep the weights with the best validation accuracy on disk ...
checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)
# ... and stop once validation loss has not improved for 10 epochs,
# restoring the best weights seen in this run.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)

print("\nStarting initial training of the model head... 🚀")
history = model.fit(
    train_ds,
    validation_data=valid_ds,
    epochs=INITIAL_EPOCHS,
    callbacks=[checkpoint, early_stopping],
)

In [None]:
# ## Step 7: Fine-Tune the Model
# ==============================

# Unfreeze the pretrained backbone so its weights are updated during fine-tuning.
# `build_model` keeps its `base_model` variable local, so that name does not exist
# in the notebook namespace. Recover the backbone from the assembled model instead:
# it is the only layer of `model` that is itself a Keras Model.
nested_models = [layer for layer in model.layers if isinstance(layer, tf.keras.Model)]
if not nested_models:
    raise RuntimeError("Could not locate the pretrained base model inside `model`.")
base_model = nested_models[0]
base_model.trainable = True

# Re-compile the model with a very low learning rate for fine-tuning.
# A new cosine decay schedule is used, sized for exactly FINE_TUNE_EPOCHS epochs.
finetune_lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=1e-5,
    decay_steps=len(train_ds) * FINE_TUNE_EPOCHS
)

# Same AdamW optimizer family as the head phase, with the new, lower learning rate
optimizer_finetune = tf.keras.optimizers.AdamW(
    learning_rate=finetune_lr_schedule,
    weight_decay=1e-4
)

# Changing `trainable` only takes effect after compiling again.
model.compile(
    optimizer=optimizer_finetune,
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy']
)

print("\nModel re-compiled for fine-tuning.")
model.summary()

# Continue epoch numbering from where the head phase actually stopped.
initial_epoch_for_fine_tune = history.epoch[-1] + 1 if history.epoch else 0
# Run exactly FINE_TUNE_EPOCHS more epochs. Using INITIAL_EPOCHS + FINE_TUNE_EPOCHS
# here would, after an early stop in the head phase, train for more epochs than the
# cosine schedule covers, and those extra epochs would run at learning rate 0.
total_epochs = initial_epoch_for_fine_tune + FINE_TUNE_EPOCHS

print("\nStarting fine-tuning... ✨")

history_fine_tune = model.fit(
    train_ds,
    epochs=total_epochs,
    initial_epoch=initial_epoch_for_fine_tune,
    validation_data=valid_ds,
    callbacks=[checkpoint, early_stopping] # Re-use the same callbacks
)

print("\nFine-tuning complete!")

In [None]:
# ## Step 8: Visualize Results and Evaluate
# ==========================================

# Combine training histories from both phases for a complete plot
# Use .get() to safely access keys that might not exist if a training phase was skipped
acc = history.history.get('accuracy', []) + history_fine_tune.history.get('accuracy', [])
val_acc = history.history.get('val_accuracy', []) + history_fine_tune.history.get('val_accuracy', [])
loss = history.history.get('loss', []) + history_fine_tune.history.get('loss', [])
val_loss = history.history.get('val_loss', []) + history_fine_tune.history.get('val_loss', [])

# Position, within the combined curves, of the first fine-tuning epoch
fine_tune_start = len(history.history.get('accuracy', []))

# Check if training actually happened before trying to plot
if acc:
    plt.figure(figsize=(14, 6))

    # (metric name, training curve, validation curve, legend position) per panel
    panels = [
        ('Accuracy', acc, val_acc, 'lower right'),
        ('Loss', loss, val_loss, 'upper right'),
    ]
    for position, (metric, train_curve, valid_curve, legend_loc) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(train_curve, label=f'Training {metric}')
        plt.plot(valid_curve, label=f'Validation {metric}')
        # Mark the phase boundary whenever both phases contributed epochs,
        # regardless of whether the head phase stopped early.
        if 0 < fine_tune_start < len(acc):
            plt.axvline(fine_tune_start, color='gray', linestyle='--', label='Start Fine-Tuning')
        plt.legend(loc=legend_loc)
        plt.title(f'Training and Validation {metric}')
        plt.xlabel('Epoch')

    plt.show()

# --- Final Evaluation ---
# Load the best weights that were saved by ModelCheckpoint
print("\n--- Loading best model and performing final evaluation ---")
model.load_weights('best_model.keras')
final_loss, final_accuracy = model.evaluate(valid_ds, verbose=0)
print(f"Final Validation Loss: {final_loss:.4f}")
print(f"Final Validation Accuracy: {final_accuracy*100:.2f}%")

# --- Generate Classification Report and Confusion Matrix ---
print("\n--- Classification Report ---")
# Ground-truth labels and predictions come from two separate passes over valid_ds.
# They line up because the validation pipeline is created without shuffling.
y_true = np.concatenate([y for x, y in valid_ds], axis=0)
y_true_indices = np.argmax(y_true, axis=1)

# Make predictions on the validation dataset
y_pred_probs = model.predict(valid_ds)
y_pred_indices = np.argmax(y_pred_probs, axis=1)

# Class names: the mapped/cached/prefetched dataset no longer carries the
# `class_names` attribute of the dataset returned by image_dataset_from_directory.
# The class folders are created from the AGE_GROUPS keys and Keras indexes class
# folders in alphanumeric order, so the sorted keys give the index -> name mapping.
class_labels = sorted(AGE_GROUPS)
label_ids = list(range(len(class_labels)))

# Passing `labels` keeps the report aligned with `target_names` even when a
# class is missing from both the ground truth and the predictions.
print(classification_report(y_true_indices, y_pred_indices, labels=label_ids, target_names=class_labels))

# Plot the confusion matrix
print("\n--- Confusion Matrix ---")
cm = confusion_matrix(y_true_indices, y_pred_indices, labels=label_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='g', xticklabels=class_labels, yticklabels=class_labels, cmap='Blues')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

# Loading Pre-trained Models and Creating Inference Pipeline
Let's load the pre-trained VGG-Face feature extractor together with our generalist and specialist models, and combine them into a pipeline for age prediction.

# Three-Step Age Prediction Pipeline

Our age prediction system uses a sophisticated three-step approach:

1. **Feature Extraction** (VGG-Face Model)
   - Uses `vgg_face_weights.h5`
   - Extracts rich facial features from images

2. **Generalist Model**
   - Uses `generalist_model.h5` and `label_encoder.pkl`
   - Predicts broad age group categories

3. **Specialist Model**
   - Uses `specialist_model.h5`
   - Makes precise age predictions within the predicted group

In [None]:
# pickle is used only to restore the saved label encoder
import pickle

print("Loading models...")

# 1. Feature extractor
# NOTE(review): the filename suggests a weights-only file, while load_model()
# reads a complete saved model — confirm this file contains the architecture too.
vgg_model = tf.keras.models.load_model('vgg_face_weights.h5')
print("✅ VGG-Face feature extractor loaded")

# 2. Generalist classifier and the encoder that maps class indices back to labels
generalist_model = tf.keras.models.load_model('generalist_model.h5')
# NOTE(review): unpickling can execute arbitrary code; only load a
# label_encoder.pkl that comes from a trusted source.
with open('label_encoder.pkl', 'rb') as encoder_file:
    label_encoder = pickle.load(encoder_file)
print("✅ Generalist model and label encoder loaded")

# 3. Specialist regressor
specialist_model = tf.keras.models.load_model('specialist_model.h5')
print("✅ Specialist model loaded")

def preprocess_image(image_path, target_size=(224, 224)):
    """
    Load an image file and convert it into a one-image batch for inference.

    Args:
        image_path: Path of the image file to read.
        target_size: Size the image is resized to while loading.

    Returns:
        A tensor with a leading batch axis of length 1 whose values are the
        pixel array divided by 255.
    """
    image_utils = tf.keras.preprocessing.image

    loaded = image_utils.load_img(image_path, target_size=target_size)
    pixels = image_utils.img_to_array(loaded)

    # Add the batch axis first, then scale by 1/255.
    batch = tf.expand_dims(pixels, 0)
    return batch / 255.0

def extract_features(image_array):
    """
    Run the notebook-level ``vgg_model`` on a preprocessed image batch.

    Args:
        image_array: Batch of images, as produced by ``preprocess_image``.

    Returns:
        Whatever ``vgg_model.predict`` returns for the batch (progress output silenced).
    """
    return vgg_model.predict(image_array, verbose=0)

def predict_age(image_path):
    """
    Predict an age group and a specific age for one image.

    Pipeline:
    1. Preprocess the image and extract features with the VGG-Face model.
    2. Let the generalist model choose an age group (arg-max of its output).
    3. Feed the features plus a one-hot encoding of that group to the
       specialist model and map its output onto the group's age range.

    Args:
        image_path: Path of the image to analyse.

    Returns:
        dict with keys ``'age_group'``, ``'predicted_age'`` (rounded int),
        ``'confidence'`` (generalist score of the chosen group) and
        ``'features'`` (the extracted feature array).
    """
    # Step 1: image -> features
    features = extract_features(preprocess_image(image_path))

    # Step 2: features -> age group
    group_scores = generalist_model.predict(features, verbose=0)[0]
    predicted_range_idx = np.argmax(group_scores)
    predicted_range = label_encoder.inverse_transform([predicted_range_idx])[0]

    # Assumes the label encoder's classes are exactly the AGE_GROUPS keys — TODO confirm.
    min_age, max_age = AGE_GROUPS[predicted_range]

    # Step 3: [features, one-hot group] -> position inside the group's range
    group_one_hot = tf.keras.utils.to_categorical(predicted_range_idx, num_classes=len(AGE_GROUPS))
    specialist_input = [features, np.expand_dims(group_one_hot, 0)]
    range_fraction = specialist_model.predict(specialist_input, verbose=0)[0][0]

    # Presumably the specialist outputs a fraction in [0, 1]; the value is not
    # clipped, so an output outside that range yields an age outside the group.
    predicted_age = round(float(min_age + (max_age - min_age) * range_fraction))

    return {
        'age_group': predicted_range,
        'predicted_age': predicted_age,
        'confidence': float(group_scores[predicted_range_idx]),
        'features': features  # Include features in case needed for further analysis
    }

# Reaching this line means the three model files loaded and the helper functions above were defined
print("✅ Age prediction pipeline created successfully!")

NameError: name 'tf' is not defined

# Testing the Pipeline
Let's test our model pipeline with a sample image to ensure everything works correctly.

In [None]:
# Smoke-test the pipeline on a single image.
# Replace 'sample_image.jpg' with the path to your test image
test_image_path = 'sample_image.jpg'  # Update this path

try:
    result = predict_age(test_image_path)

    # Text summary
    print("\nPrediction Results:")
    print("------------------")
    print(f"1. Age Group: {result['age_group']}")
    print(f"2. Predicted Age: {result['predicted_age']} years")
    print(f"3. Confidence: {result['confidence']*100:.2f}%")

    # Show the original (un-resized) image with the prediction as its title
    caption = (
        f"Age Prediction Results\n"
        f"Predicted Age: {result['predicted_age']} years\n"
        f"Group: {result['age_group']}\n"
        f"Confidence: {result['confidence']*100:.1f}%"
    )
    plt.figure(figsize=(8, 8))
    img = tf.keras.preprocessing.image.load_img(test_image_path)
    plt.imshow(img)
    plt.title(caption)
    plt.axis('off')
    plt.show()

except FileNotFoundError:
    # The placeholder path was not replaced, or the file is missing
    print("Error: Test image file not found. Please update the test_image_path.")
except Exception as e:
    # Any other failure is reported instead of raised, so the cell still finishes
    print(f"Error testing the pipeline: {str(e)}")