In [None]:
!python -m venv tfenv

c:\Python27\python.exe: No module named venv


In [3]:
!pip install -q tensorflow

In [7]:
%pip install -q scipy opencv-python scikit-learn


Note: you may need to restart the kernel to use updated packages.


In [None]:
!pip install -q tensorflow tensorflow-addons tensorflow-hub tensorflow-datasets
!pip install  gdown matplotlib seaborn 
!pip install keras-facenet



In [11]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers, losses, metrics, Model
# import tensorflow_hub as hub
from keras_facenet import FaceNet # type: ignore
import pandas as pd
import numpy as np
import os
from pathlib import Path
import zipfile
import gdown
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
# from google.colab import drive, files
import warnings
warnings.filterwarnings('ignore')

In [12]:
# drive.mount('/content/drive')
# Root folder of the FairFace dataset. Set the FAIRFACE_DIR environment variable
# to point at another location; the fallback is the original local path.
DATASET_PATH = os.environ.get("FAIRFACE_DIR", "C:/Users/aa24afl/Downloads/FairFace")
# DATASET_PATH = "C:/Users/DELL/Downloads/datasets/FairFace"

In [None]:

# Report whether the dataset root itself is present.
print("Path exists." if os.path.exists(DATASET_PATH) else "Path does not exist.")

# Entries that must sit directly under the dataset root.
expected_files = ["train_labels.csv", "val_labels.csv", "train/", "val/"]

print(f"Loading dataset from: {DATASET_PATH}")

# Collect every required entry that is absent.
missing_files = [
    entry for entry in expected_files
    if not os.path.exists(os.path.join(DATASET_PATH, entry))
]

# Fail early, showing the layout the rest of the notebook relies on.
if missing_files:
    layout = "\n".join([
        "Expected structure:",
        f"{DATASET_PATH}/",
        "├── train_labels.csv",
        "├── val_labels.csv",
        "├── train/ [contains images]",
        "└── val/   [contains images]",
    ])
    raise FileNotFoundError(f"Dataset incomplete. Missing: {missing_files}\n{layout}")


# Paths used by the data-loading cells.
TRAIN_CSV_PATH = os.path.join(DATASET_PATH, "train_labels.csv")
VAL_CSV_PATH = os.path.join(DATASET_PATH, "val_labels.csv")
TRAIN_IMG_DIR = os.path.join(DATASET_PATH, "train")
VAL_IMG_DIR = os.path.join(DATASET_PATH, "val")

# Count directory entries once, then report.
train_file_count = len(os.listdir(TRAIN_IMG_DIR))
val_file_count = len(os.listdir(VAL_IMG_DIR))

print("\nDataset structure verified:")
print(f"Training CSV:   {TRAIN_CSV_PATH}")
print(f"Validation CSV: {VAL_CSV_PATH}")
print(f"Training images: {TRAIN_IMG_DIR} ({train_file_count} files)")
print(f"Validation images: {VAL_IMG_DIR} ({val_file_count} files)")

In [14]:
class FairFaceDataProcessor:
    """Prepare FairFace labels and images for multi-task training.

    Reads a label CSV with `file`, `age`, `gender` and `race` columns, drops
    rows whose age bucket is not recognised, optionally subsamples, encodes
    the three label columns as integers and serves the result as a batched
    `tf.data.Dataset` of `(image, {age_output, gender_output, race_output})`.
    """

    def __init__(self, csv_path, img_dir, img_size=160, sample_size=None,
                 is_validation=False, encoders=None):
        """Load, filter, sample and encode the label table.

        Args:
            csv_path: Path of the label CSV.
            img_dir: Directory that each row's `file` value is joined onto.
            img_size: Side length (pixels) images are resized to.
            sample_size: If set and smaller than the table, keep a random
                sample of this many rows (fixed seed 42).
            is_validation: Only affects the wording/verbosity of the printed
                messages.
            encoders: Optional mapping with keys 'age', 'gender' and 'race'
                holding already-fitted label encoders (for example
                `train_processor.get_encoders()`). When given they are reused
                instead of fitting new ones, so integer codes match the
                training split. When omitted, encoders are fitted on this
                table (the original behaviour).
        """
        self.img_size = img_size
        self.img_dir = img_dir
        self.is_validation = is_validation

        # Load CSV data
        self.df = pd.read_csv(csv_path)

        # Filter dataset to keep only valid age groups
        valid_age_groups = [
            '0-2', '3-9', '10-19', '20-29', '30-39',
            '40-49', '50-59', '60-69', 'more than 70'
        ]
        original_len = len(self.df)
        self.df = self.df[self.df['age'].isin(valid_age_groups)].reset_index(drop=True)
        print(f"Removed {original_len - len(self.df)} rows with invalid age groups")

        # Sample data if specified
        if sample_size and sample_size < len(self.df):
            self.df = self.df.sample(n=sample_size, random_state=42).reset_index(drop=True)
            print(f"Sampled {'validation' if is_validation else 'training'}: {len(self.df)} samples")

        if encoders is not None:
            # Reuse encoders fitted elsewhere (typically on the training split).
            self.age_encoder = encoders['age']
            self.gender_encoder = encoders['gender']
            self.race_encoder = encoders['race']
        else:
            # Fit fresh encoders on this table.
            self.age_encoder = LabelEncoder()
            self.gender_encoder = LabelEncoder()
            self.race_encoder = LabelEncoder()

            self.age_encoder.fit(self.df['age'])
            self.gender_encoder.fit(self.df['gender'])
            self.race_encoder.fit(self.df['race'])

        # Encode labels
        self.df['age_encoded'] = self.age_encoder.transform(self.df['age'])
        # Gender is a binary target (1 == 'Male') for the single-unit sigmoid head.
        # NOTE(review): presumably equal to gender_encoder's codes when the only
        # values are 'Female'/'Male' - confirm against the CSV.
        self.df['gender_encoded'] = (self.df['gender'] == 'Male').astype(int)
        self.df['race_encoded'] = self.race_encoder.transform(self.df['race'])

        self.num_classes = {
            'age': len(self.age_encoder.classes_),
            'gender': len(self.gender_encoder.classes_),
            'race': len(self.race_encoder.classes_)
        }

        print(f"Classes - Age: {self.num_classes['age']}, Gender: {self.num_classes['gender']}, Race: {self.num_classes['race']}")
        if not is_validation:
            print(f"Age groups: {list(self.age_encoder.classes_)}")
            print(f"Gender groups: {list(self.gender_encoder.classes_)}")
            print(f"Race groups: {list(self.race_encoder.classes_)}")

    def get_encoders(self):
        """Return the label encoders in the mapping form accepted by `encoders=`."""
        return {
            'age': self.age_encoder,
            'gender': self.gender_encoder,
            'race': self.race_encoder,
        }

    def filter_missing_files(self):
        """Drop rows whose image file is not on disk; returns self for chaining."""
        print(f"Original dataset size: {len(self.df)}")

        # One existence check per row, collected straight into a boolean mask
        # (avoids building a Series per row as iterrows() does).
        on_disk = np.fromiter(
            (os.path.exists(os.path.join(self.img_dir, name)) for name in self.df['file']),
            dtype=bool,
            count=len(self.df),
        )
        self.df = self.df.loc[on_disk].reset_index(drop=True)

        print(f"Filtered dataset size: {len(self.df)}")

        return self

    def load_and_preprocess_image(self, image_path, augment=False):
        """Read one image and return a standardized float32 tensor.

        Args:
            image_path: Path of the image file.
            augment: When True, apply random flip / brightness / contrast /
                saturation / hue jitter before standardization.

        Returns:
            A `(img_size, img_size, 3)` float32 tensor. If the file is missing
            or cannot be processed, a warning is printed and an all-zero image
            is returned instead of raising (best-effort loading).
        """
        try:
            # Check if file exists
            if not tf.io.gfile.exists(image_path):
                print(f"Warning: File not found: {image_path}")
                return tf.zeros([self.img_size, self.img_size, 3], dtype=tf.float32)

            image = tf.io.read_file(image_path)
            # expand_animations=False keeps the result 3-D even for GIF input,
            # which would otherwise decode to a 4-D (frames, H, W, C) tensor.
            image = tf.image.decode_image(image, channels=3, expand_animations=False)
            image = tf.image.resize(image, [self.img_size, self.img_size])
            image = tf.cast(image, tf.float32) / 255.0

            if augment:
                # Data augmentation (deltas are relative to the [0, 1] range)
                image = tf.image.random_flip_left_right(image)
                image = tf.image.random_brightness(image, 0.1)
                image = tf.image.random_contrast(image, 0.9, 1.1)
                image = tf.image.random_saturation(image, 0.9, 1.1)
                image = tf.image.random_hue(image, 0.05)

            # Per-image standardization: each image is shifted/scaled by its own
            # mean and standard deviation (no dataset-wide statistics involved).
            image = tf.image.per_image_standardization(image)
            return image
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            # Fall back to a black image so the batch shape stays valid
            return tf.zeros([self.img_size, self.img_size, 3], dtype=tf.float32)

    def create_dataset(self, batch_size=32, augment=False, shuffle=True):
        """Build a batched, prefetched `tf.data.Dataset` over the label table.

        Args:
            batch_size: Number of samples per batch.
            augment: Forwarded to `load_and_preprocess_image`.
            shuffle: When True, row order is shuffled each time the generator
                starts and a 1000-element shuffle buffer is added on top.

        Returns:
            Dataset yielding `(images, labels)` where `labels` is a dict with
            int32 entries 'age_output', 'gender_output' and 'race_output'.
        """
        def generator():
            indices = np.arange(len(self.df))
            if shuffle:
                np.random.shuffle(indices)

            for idx in indices:
                row = self.df.iloc[idx]

                img_path = os.path.join(self.img_dir, row['file'])

                image = self.load_and_preprocess_image(img_path, augment)

                yield (
                    image,
                    {
                        'age_output': tf.cast(row['age_encoded'], dtype=tf.int32),
                        'gender_output': tf.cast(row['gender_encoded'], dtype=tf.int32),
                        'race_output': tf.cast(row['race_encoded'], dtype=tf.int32)
                    }
                )

        # Create dataset
        dataset = tf.data.Dataset.from_generator(
            generator,
            output_signature=(
                tf.TensorSpec(shape=(self.img_size, self.img_size, 3), dtype=tf.float32),
                {
                    'age_output': tf.TensorSpec(shape=(), dtype=tf.int32),
                    'gender_output': tf.TensorSpec(shape=(), dtype=tf.int32),
                    'race_output': tf.TensorSpec(shape=(), dtype=tf.int32)
                }
            )
        )

        if shuffle:
            dataset = dataset.shuffle(buffer_size=1000)

        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)

        return dataset

In [15]:
class FaceNetMultiTask(tf.keras.Model):
    """Multi-task classifier: a FaceNet backbone feeding age, gender and race heads.

    The backbone output goes through one shared Dense(512) + Dropout(0.7) stage
    and is then routed to three task-specific heads. `call` returns a dict keyed
    'age_output', 'gender_output' and 'race_output'.
    """

    def __init__(self, num_age_classes, num_gender_classes, num_race_classes,
                 freeze_backbone=False):
        """
        Args:
            num_age_classes: Width of the softmax age output.
            num_gender_classes: Accepted but not used here; the gender head is
                always a single sigmoid unit.
            num_race_classes: Width of the softmax race output.
            freeze_backbone: When True, the backbone is set non-trainable.
        """
        super(FaceNetMultiTask, self).__init__()

        # Model object exposed by keras_facenet's FaceNet wrapper
        self.backbone = FaceNet().model

        # Freeze backbone
        if freeze_backbone:
            self.backbone.trainable = False

        # (An earlier design with three independent 512 -> 128 heads, each with
        # its own Dropout(0.7), was replaced by the shared stage below.)

        # Shared dense layer
        self.shared_dense = layers.Dense(512, activation='relu', name='shared_dense')
        self.shared_dropout = layers.Dropout(0.7, name='shared_dropout')

        # Task-specific classification heads
        # age branch
        self.age_classifier = tf.keras.Sequential([
            layers.Dense(128, activation='relu', name='age_dense'),
            layers.Dense(num_age_classes, activation='softmax', name='age_output')
        ], name='age_head')

        # gender branch: one sigmoid unit, i.e. a probability rather than a logit
        self.gender_classifier = tf.keras.Sequential([
            layers.Dense(1, activation='sigmoid', name='gender_output')
        ], name='gender_head')

        # race branch
        self.race_classifier = tf.keras.Sequential([
            layers.Dense(num_race_classes, activation='softmax', name='race_output')
        ], name='race_head')


    def call(self, inputs, training=None):
        """Run the backbone, the shared stage and all three heads.

        Returns:
            Dict with 'age_output' and 'race_output' (softmax outputs) and
            'gender_output' (sigmoid output of the single-unit head).
        """
        # Get FaceNet embeddings
        embeddings = self.backbone(inputs, training=training)

        # Shared processing
        shared_features = self.shared_dense(embeddings, training=training)
        shared_features = self.shared_dropout(shared_features, training=training)

        # Task-specific predictions
        age_pred = self.age_classifier(shared_features, training=training)
        gender_pred = self.gender_classifier(shared_features, training=training)
        race_pred = self.race_classifier(shared_features, training=training)

        return {
            'age_output': age_pred,
            'gender_output': gender_pred,
            'race_output': race_pred
        }

def create_and_compile_model(num_age_classes, num_gender_classes, num_race_classes,
                             freeze_backbone=False, learning_rate=0.001):
    """Create, build and compile the multi-task model.

    Args:
        num_age_classes: Number of age classes (softmax width).
        num_gender_classes: Forwarded to the model constructor.
        num_race_classes: Number of race classes (softmax width).
        freeze_backbone: When True the backbone is not trained.
        learning_rate: Constant Adam learning rate. Defaults to the previously
            hard-coded 0.001; any reduction over time comes from callbacks
            passed to `fit`, not from this function.

    Returns:
        The compiled `FaceNetMultiTask` model.
    """
    model = FaceNetMultiTask(
        num_age_classes=num_age_classes,
        num_gender_classes=num_gender_classes,
        num_race_classes=num_race_classes,
        freeze_backbone=freeze_backbone
    )

    # Build with the fixed 160x160 RGB input used throughout the notebook
    model.build((None, 160, 160, 3))

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        # Sparse losses take integer class ids; gender is a 0/1 target for the
        # single sigmoid unit.
        loss={
            'age_output': losses.SparseCategoricalCrossentropy(),
            'gender_output': losses.BinaryCrossentropy(),
            'race_output': losses.SparseCategoricalCrossentropy()
        },
        loss_weights={
            'age_output': 1.0,
            'gender_output': 1.0,
            'race_output': 1.0
        },
        metrics={
            'age_output': ['sparse_categorical_accuracy'],
            'gender_output': ['binary_accuracy'],
            'race_output': ['sparse_categorical_accuracy']
        }
    )

    print("\n Model compiled successfully!")
    print(f"   - Backbone frozen: {freeze_backbone}")
    print(f"   - Learning rate: {learning_rate}")

    return model

In [None]:
# Run-wide settings
BATCH_SIZE = 32
IMG_SIZE = 160
EPOCHS = 20
SAMPLE_SIZE = 30000

# Both processors receive DATASET_PATH (the dataset root) as their image directory.
print("Creating training data processor...")
train_processor = FairFaceDataProcessor(
    csv_path=TRAIN_CSV_PATH,
    img_dir=DATASET_PATH,
    img_size=IMG_SIZE,
    sample_size=SAMPLE_SIZE,
    is_validation=False,
)
train_processor = train_processor.filter_missing_files()

print("\nCreating validation data processor...")
# Validation uses a fifth of the training sample size (or everything if unsampled).
val_sample_size = SAMPLE_SIZE // 5 if SAMPLE_SIZE else None
val_processor = FairFaceDataProcessor(
    csv_path=VAL_CSV_PATH,
    img_dir=DATASET_PATH,
    img_size=IMG_SIZE,
    sample_size=val_sample_size,
    is_validation=True,
)
val_processor = val_processor.filter_missing_files()

# Share the training encoders with the validation processor and re-encode its
# labels so integer codes mean the same thing in both splits.
for label_column in ("age", "gender", "race"):
    shared_encoder = getattr(train_processor, f"{label_column}_encoder")
    setattr(val_processor, f"{label_column}_encoder", shared_encoder)
    val_processor.df[f"{label_column}_encoded"] = shared_encoder.transform(
        val_processor.df[label_column]
    )

print("\nCreating datasets...")
# Training: augmented and shuffled; validation: deterministic order, no augmentation.
train_dataset = train_processor.create_dataset(batch_size=BATCH_SIZE, augment=True, shuffle=True)
val_dataset = val_processor.create_dataset(batch_size=BATCH_SIZE, augment=False, shuffle=False)


In [None]:
print("Building model...")
# Class counts come from the encoders fitted on the training split.
model = create_and_compile_model(
    num_age_classes=train_processor.num_classes['age'],
    num_gender_classes=train_processor.num_classes['gender'],
    num_race_classes=train_processor.num_classes['race'],
    freeze_backbone=True
)

# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line.
model.summary()

In [8]:
# Keep only the best model (lowest validation loss) on disk.
# NOTE(review): this saves a full subclassed model to an .h5 file; whether that
# is supported depends on the installed Keras version - confirm, or consider
# save_weights_only=True.
best_model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath='best_fairface_facenet_model.h5',
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

# Halve the learning rate after 5 epochs without val_loss improvement.
lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-7,
    verbose=1,
)

# Stop after 10 stagnant epochs and roll back to the best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)

# Per-epoch metrics are appended to a CSV file.
csv_logger = tf.keras.callbacks.CSVLogger('training_log.csv')

callbacks = [best_model_checkpoint, lr_on_plateau, early_stopping, csv_logger]



In [None]:
def verify_dataset_structure():
    """Print a summary of the label CSVs and image folders and spot-check files.

    The first ten `file` entries of the training CSV are resolved against
    DATASET_PATH and checked for existence.

    Returns:
        True when every sampled file exists, False otherwise.
    """
    print("=== Dataset Structure Verification ===")

    # Label tables
    train_csv = pd.read_csv(TRAIN_CSV_PATH)
    val_csv = pd.read_csv(VAL_CSV_PATH)

    print(f"Training CSV shape: {train_csv.shape}")
    print(f"Validation CSV shape: {val_csv.shape}")
    print(f"Training CSV columns: {list(train_csv.columns)}")

    # Image folders (the validation folder is allowed to be absent)
    train_images = os.listdir(TRAIN_IMG_DIR)
    val_images = []
    if os.path.exists(VAL_IMG_DIR):
        val_images = os.listdir(VAL_IMG_DIR)

    print(f"Training images found: {len(train_images)}")
    print(f"Validation images found: {len(val_images)}")

    print("\n=== File Matching Check ===")

    # Resolve a handful of CSV entries relative to the dataset root
    missing_files = []
    for file_name in train_csv['file'].head(10).tolist():
        full_path = os.path.join(DATASET_PATH, file_name)
        if os.path.exists(full_path):
            print(f"Found: {full_path}")
        else:
            missing_files.append(full_path)

    print(f"✗ Missing files: {missing_files}" if missing_files else "✓ All sampled files found!")

    # Peek at the label table
    print("\nFirst few rows of training CSV:")
    print(train_csv.head())

    return not missing_files

# Run verification
dataset_ok = verify_dataset_structure()

# Summarise the outcome in one line.
print(
    "\n✓ Dataset structure looks good! Proceeding with training..."
    if dataset_ok
    else "\n✗ Dataset structure issues found. Please check file paths."
)

In [None]:
print("Starting training...")
history = model.fit(
    train_dataset,
    epochs=EPOCHS,  # use the run-wide constant rather than a second hard-coded 20
    validation_data=val_dataset,
    callbacks=callbacks,
    verbose=1
)

print("Training completed!")


In [None]:
# List the metric names recorded during training (used as keys when plotting).
print("Available history keys:", list(history.history))

In [None]:
def plot_history(history):
    """Plot losses, accuracies and learning rate of a training run, then print final values.

    Args:
        history: Object with a `.history` dict (as returned by `model.fit`)
            holding 'loss', 'val_loss' and, for each of the age/gender/race
            outputs, the per-task loss and accuracy series with their `val_`
            counterparts.

    The learning-rate panel and summary line are filled only when the history
    contains a 'learning_rate' (or 'lr') series; otherwise they are skipped
    instead of raising KeyError.
    """
    hist = history.history

    # (history key prefix, display label, line colour, accuracy metric name)
    tasks = [
        ('age_output', 'Age', 'red', 'sparse_categorical_accuracy'),
        ('gender_output', 'Gender', 'green', 'binary_accuracy'),
        ('race_output', 'Race', 'orange', 'sparse_categorical_accuracy'),
    ]

    # The learning rate is only logged when a callback records it, and the key
    # name differs between Keras versions.
    lr_key = next((key for key in ('learning_rate', 'lr') if key in hist), None)

    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # Total Loss (Training vs Validation)
    ax1.plot(hist['loss'], label='Training Loss', color='blue')
    ax1.plot(hist['val_loss'], label='Validation Loss', color='red')
    ax1.set_title('Total Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True)

    # Individual Task Losses: solid = training, dashed = validation
    for prefix, label, color, _ in tasks:
        ax2.plot(hist[f'{prefix}_loss'], label=f'{label} Train', color=color, linestyle='-')
        ax2.plot(hist[f'val_{prefix}_loss'], label=f'{label} Val', color=color, linestyle='--')
    ax2.set_title('Individual Task Losses')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Loss')
    ax2.legend()
    ax2.grid(True)

    # Task Accuracies
    for prefix, label, color, metric in tasks:
        ax3.plot(hist[f'{prefix}_{metric}'], label=f'{label} Train', color=color, linestyle='-')
        ax3.plot(hist[f'val_{prefix}_{metric}'], label=f'{label} Val', color=color, linestyle='--')
    ax3.set_title('Task Accuracies')
    ax3.set_xlabel('Epoch')
    ax3.set_ylabel('Accuracy')
    ax3.legend()
    ax3.grid(True)

    # Learning Rate
    ax4.set_title('Learning Rate Schedule')
    ax4.set_xlabel('Epoch')
    ax4.set_ylabel('Learning Rate')
    if lr_key is not None:
        ax4.plot(hist[lr_key], label='Learning Rate', color='purple')
        ax4.legend()
        ax4.grid(True)
        ax4.set_yscale('log')
    else:
        ax4.text(0.5, 0.5, 'Learning rate not recorded',
                 ha='center', va='center', transform=ax4.transAxes)

    plt.tight_layout()
    plt.show()

    # Print final metrics
    print("Final Training Metrics:")
    print(f"Total Loss: {hist['loss'][-1]:.4f} | Val Loss: {hist['val_loss'][-1]:.4f}")
    for prefix, label, _, _ in tasks:
        train_loss = hist[f'{prefix}_loss'][-1]
        val_loss = hist[f'val_{prefix}_loss'][-1]
        print(f"{label} Loss: {train_loss:.4f} | Val: {val_loss:.4f}")
    for prefix, label, _, metric in tasks:
        train_acc = hist[f'{prefix}_{metric}'][-1]
        val_acc = hist[f'val_{prefix}_{metric}'][-1]
        print(f"{label} Accuracy: {train_acc:.4f} | Val: {val_acc:.4f}")
    if lr_key is not None:
        print(f"Final Learning Rate: {hist[lr_key][-1]:.2e}")

plot_history(history)

In [None]:
# Load the per-epoch training log.
# NOTE(review): the CSVLogger in this notebook writes 'training_log.csv' and the
# gender head is tracked with 'binary_accuracy'; this cell reads a different
# file and a 'sparse_categorical_accuracy' gender column - presumably the log of
# a separate UTKFace run. Confirm that this is the intended input.
df = pd.read_csv('utkface_finetuning_log.csv')

log_tasks = ('age', 'gender', 'race')
train_acc_col = {name: f'{name}_output_sparse_categorical_accuracy' for name in log_tasks}
val_acc_col = {name: f'val_{name}_output_sparse_categorical_accuracy' for name in log_tasks}

# 2x2 grid: three accuracy panels plus the overall loss
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('UTKFace Model Training Progress', fontsize=16, fontweight='bold')

train_line = dict(linewidth=2, marker='o', markersize=4)
val_line = dict(linewidth=2, marker='s', markersize=4)

# (axes, task, panel title, y-axis limits)
accuracy_panels = [
    (axes[0, 0], 'age', 'Age Prediction Accuracy', [0.4, 0.9]),
    (axes[0, 1], 'gender', 'Gender Prediction Accuracy', [0.8, 1.0]),
    (axes[1, 0], 'race', 'Race Prediction Accuracy', [0.6, 1.0]),
]
for panel, task_name, panel_title, y_limits in accuracy_panels:
    panel.plot(df['epoch'], df[train_acc_col[task_name]], 'b-', label='Training Accuracy', **train_line)
    panel.plot(df['epoch'], df[val_acc_col[task_name]], 'r-', label='Validation Accuracy', **val_line)
    panel.set_title(panel_title, fontsize=14, fontweight='bold')
    panel.set_xlabel('Epoch')
    panel.set_ylabel('Accuracy')
    panel.legend()
    panel.grid(True, alpha=0.3)
    panel.set_ylim(y_limits)

# Overall loss panel (auto-scaled y axis)
loss_panel = axes[1, 1]
loss_panel.plot(df['epoch'], df['loss'], 'b-', label='Training Loss', **train_line)
loss_panel.plot(df['epoch'], df['val_loss'], 'r-', label='Validation Loss', **val_line)
loss_panel.set_title('Overall Model Loss', fontsize=14, fontweight='bold')
loss_panel.set_xlabel('Epoch')
loss_panel.set_ylabel('Loss')
loss_panel.legend()
loss_panel.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Text summary based on the last logged epoch
banner = "=" * 60
print(banner)
print("FINAL MODEL PERFORMANCE SUMMARY")
print(banner)

final_epoch = df.iloc[-1]
print(f"Epoch: {int(final_epoch['epoch'])}")
print(f"Learning Rate: {final_epoch['learning_rate']:.2e}")
print()

print("TRAINING ACCURACIES:")
for task_name in log_tasks:
    accuracy = final_epoch[train_acc_col[task_name]]
    print(f"  {task_name.capitalize() + ':':<8}{accuracy:.4f} ({accuracy*100:.2f}%)")
print()

print("VALIDATION ACCURACIES:")
for task_name in log_tasks:
    accuracy = final_epoch[val_acc_col[task_name]]
    print(f"  {task_name.capitalize() + ':':<8}{accuracy:.4f} ({accuracy*100:.2f}%)")
print()

print("LOSSES:")
print(f"  Training Loss:   {final_epoch['loss']:.4f}")
print(f"  Validation Loss: {final_epoch['val_loss']:.4f}")

# Validation-accuracy change between the first and the last logged epoch
first_epoch = df.iloc[0]
print()
print("IMPROVEMENT FROM EPOCH 0 TO FINAL EPOCH:")
for task_name in log_tasks:
    delta = final_epoch[val_acc_col[task_name]] - first_epoch[val_acc_col[task_name]]
    print(f"  {task_name.capitalize() + ' Accuracy:':<17}{delta*100:.2f}% points")

# Train/validation gap per task, in percentage points
print()
print("OVERFITTING ANALYSIS:")
for task in log_tasks:
    train_acc = final_epoch[train_acc_col[task]]
    val_acc = final_epoch[val_acc_col[task]]
    gap = (train_acc - val_acc) * 100

    status = (
        "⚠️  Potential overfitting" if gap > 5
        else "⚡ Slight overfitting" if gap > 2
        else "✅ Good generalization"
    )

    print(f"  {task.capitalize():6}: Train-Val gap = {gap:.2f}% - {status}")

In [24]:
def predict_sample(model, img_path, train_processor, img_size=(160, 160)):
    """Predict age, gender and race for one image and show it with the result.

    Args:
        model: Model whose `predict` returns a dict with 'age_output',
            'gender_output' and 'race_output'.
        img_path: Path of the image file.
        train_processor: Object providing `age_encoder`, `gender_encoder` and
            `race_encoder` used to turn class ids back into labels.
        img_size: (height, width) the image is resized to.

    Returns:
        Tuple `(age_label, gender_label, race_label)`.
    """
    # Decode once; the [0, 1]-scaled image doubles as the picture shown below.
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
    display_img = tf.cast(tf.image.resize(decoded, img_size), tf.float32) / 255.0

    # Same per-image standardization as in training, plus a batch dimension
    img = tf.expand_dims(tf.image.per_image_standardization(display_img), axis=0)

    # Predict - the model returns a dictionary
    preds = model.predict(img, verbose=0)

    age_pred = int(np.argmax(preds['age_output'][0]))
    race_pred = int(np.argmax(preds['race_output'][0]))

    # The gender head is a single sigmoid unit, so argmax over it is always 0.
    # Threshold the probability instead; a multi-unit gender output (if a model
    # has one) still goes through argmax.
    gender_scores = np.ravel(preds['gender_output'][0])
    if gender_scores.size == 1:
        gender_pred = int(gender_scores[0] > 0.5)
    else:
        gender_pred = int(np.argmax(gender_scores))

    # Decode labels using the training processor encoders
    # NOTE(review): assumes gender_encoder maps 1 to the same class that was
    # used as the positive (1) training target - confirm.
    age_label = train_processor.age_encoder.inverse_transform([age_pred])[0]
    gender_label = train_processor.gender_encoder.inverse_transform([gender_pred])[0]
    race_label = train_processor.race_encoder.inverse_transform([race_pred])[0]

    plt.figure(figsize=(8, 6))
    plt.imshow(display_img)
    plt.title(f"Predictions:\nAge: {age_label}\nGender: {gender_label}\nRace: {race_label}")
    plt.axis('off')
    plt.show()

    return age_label, gender_label, race_label

In [25]:
def test_predictions(model, train_processor, num_samples=3):
    """Compare predictions with the true labels for random rows of `train_processor.df`.

    Args:
        model: Model passed on to `predict_sample`.
        train_processor: Processor providing `df`, `img_dir` and the encoders.
        num_samples: Number of rows to draw without replacement; capped at the
            number of available rows so the draw cannot fail on small tables.
    """
    num_available = len(train_processor.df)
    num_samples = min(num_samples, num_available)

    # Draw distinct row positions
    sample_indices = np.random.choice(num_available, num_samples, replace=False)

    for i, idx in enumerate(sample_indices):
        print(f"\n=== Sample {i+1} ===")
        row = train_processor.df.iloc[idx]
        img_path = os.path.join(train_processor.img_dir, row['file'])

        # True labels
        true_age = row['age']
        true_gender = row['gender']
        true_race = row['race']

        print(f"True labels - Age: {true_age}, Gender: {true_gender}, Race: {true_race}")

        # Predict
        pred_age, pred_gender, pred_race = predict_sample(model, img_path, train_processor)

        # Compare
        print(f"Predicted - Age: {pred_age}, Gender: {pred_gender}, Race: {pred_race}")
        print(f"Correct - Age: {true_age == pred_age}, Gender: {true_gender == pred_gender}, Race: {true_race == pred_race}")

In [None]:
# NOTE(review): `load_model` and `evaluate_model` are not defined or imported in
# the visible part of this notebook - confirm where they come from, otherwise
# this cell fails on a fresh kernel.
# NOTE(review): the file loaded here differs from the checkpoint written during
# training above ('best_fairface_facenet_model.h5'), and the result replaces
# the `model` that was just trained - confirm this is intended.
model = load_model('best_utkface_facenet_finetuned_model.h5')
evaluate_model(model, val_dataset, val_processor)


In [None]:
# Spot-check five random rows of the training table: prints true vs. predicted
# labels and shows each image.
test_predictions(model, train_processor, num_samples=5)

In [None]:
# summary() prints the table itself and returns None; print() only added "None".
model.summary()

In [None]:
%pip install lime


In [None]:
%pip install lime shap tf-explain alibi opencv-python scikit-image slicer lazy_loader numba cloudpickle spacy transformers attrs dill

In [None]:

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
from tqdm import tqdm
import warnings
from skimage.segmentation import quickshift
warnings.filterwarnings('ignore')

# LIME
import lime
from lime import lime_image
from lime.wrappers.scikit_image import SegmentationAlgorithm

# SHAP
import shap

# TF-Explain
import tf_explain
from tf_explain.core.grad_cam import GradCAM
from tf_explain.core.integrated_gradients import IntegratedGradients as TFIntegratedGradients
from tf_explain.core.occlusion_sensitivity import OcclusionSensitivity

# Alibi
# from alibi.explainers import IntegratedGradients as AlibiIG

print("✅ All packages installed successfully")


In [None]:
class TensorFlowModelExplainer:
    """Wrap one output of a multi-task model for explanation tools.

    Selects a single task ('age_output', 'gender_output' or 'race_output'),
    pairs it with the matching label encoder of the data processor and exposes
    class probabilities for batches (`predict_proba`) and for a single image
    file (`get_single_prediction`).
    """

    # Task name -> attribute of the data processor holding its label encoder
    _ENCODER_ATTR = {
        'age_output': 'age_encoder',
        'gender_output': 'gender_encoder',
        'race_output': 'race_encoder',
    }

    def __init__(self, model, data_processor, task='age_output', outputs_are_probabilities=True):
        """
        Args:
            model: Callable model returning either a dict keyed by task name or
                a single tensor.
            data_processor: Object providing the `*_encoder` attributes.
            task: One of 'age_output', 'gender_output', 'race_output'.
            outputs_are_probabilities: True (default) when the model heads
                already end in softmax/sigmoid, as the heads defined in this
                notebook do, so no further activation is applied. Pass False
                for a model that returns raw logits.

        Raises:
            ValueError: If `task` is not one of the supported names.
        """
        if task not in self._ENCODER_ATTR:
            raise ValueError(
                f"Unknown task {task!r}; expected one of {sorted(self._ENCODER_ATTR)}"
            )

        self.model = model
        self.data_processor = data_processor
        self.task = task
        self.outputs_are_probabilities = outputs_are_probabilities
        self.label_encoder = getattr(data_processor, self._ENCODER_ATTR[task])

        print(f"Explainer ready for {task}. Classes: {list(self.label_encoder.classes_)}")

    def preprocess_image(self, image_path):
        """Load an image file as a standardized 160x160x3 float32 tensor.

        On any loading error a message is printed and an all-zero image is
        returned instead of raising.
        """
        try:
            image = tf.io.read_file(image_path)
            # expand_animations=False keeps GIF input 3-D (first frame only)
            image = tf.image.decode_image(image, channels=3, expand_animations=False)
            image = tf.image.resize(image, [160, 160])
            image = tf.cast(image, tf.float32) / 255.0
            image = tf.image.per_image_standardization(image)
            return image
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            return tf.zeros([160, 160, 3], dtype=tf.float32)

    def _task_probabilities(self, images):
        """Return an (N, num_classes) probability tensor for preprocessed images."""
        predictions = self.model(images, training=False)

        # Handle multi-task model output
        task_pred = predictions[self.task] if isinstance(predictions, dict) else predictions

        if self.task == 'gender_output' and task_pred.shape[-1] == 1:
            # Single-unit binary head: expand P(class 1) into [P(class 0), P(class 1)]
            positive = task_pred if self.outputs_are_probabilities else tf.nn.sigmoid(task_pred)
            return tf.concat([1.0 - positive, positive], axis=1)

        if self.outputs_are_probabilities:
            # Already a softmax output; a second softmax would flatten it.
            return task_pred
        return tf.nn.softmax(task_pred, axis=1)

    def predict_proba(self, images):
        """Predict class probabilities for a batch (prediction function for LIME).

        Args:
            images: A `tf.Tensor` that is already preprocessed, or array-like
                raw image(s) (a single HxWx3 image or a batch). Array-like
                input is scaled to [0, 1] when its maximum exceeds 1 and then
                standardized per image, matching the training preprocessing.

        Returns:
            NumPy array of shape (N, num_classes).
        """
        if not isinstance(images, tf.Tensor):
            images = np.asarray(images, dtype=np.float32)
            if images.ndim == 3:
                images = images[np.newaxis, ...]
            images = tf.convert_to_tensor(images, dtype=tf.float32)

            # Normalize if needed
            if tf.reduce_max(images) > 1.0:
                images = images / 255.0

            # Statistics are computed separately for every image in the batch
            images = tf.image.per_image_standardization(images)

        return self._task_probabilities(images).numpy()

    def get_single_prediction(self, image_path):
        """Predict the selected task for one image file.

        Returns:
            Dict with 'predicted_class' (int), 'predicted_label', 'confidence'
            (probability of the predicted class), 'all_probabilities' (one
            entry per class) and 'class_names'.
        """
        image = tf.expand_dims(self.preprocess_image(image_path), 0)
        all_probs = self._task_probabilities(image)[0].numpy()

        pred_class = int(np.argmax(all_probs))
        confidence = all_probs[pred_class]
        label = self.label_encoder.inverse_transform([pred_class])[0]

        return {
            'predicted_class': pred_class,
            'predicted_label': label,
            'confidence': confidence,
            'all_probabilities': all_probs,
            'class_names': list(self.label_encoder.classes_),
        }

print("TensorFlowModelExplainer class defined")

In [None]:
class TensorFlowLIMEExplainer:
    """LIME image explanations for one task of a wrapped model.

    Uses the wrapped ``model_explainer`` for two things:
    ``get_single_prediction`` supplies the headline prediction, and
    ``predict_proba`` is the batched probability function LIME queries on
    perturbed copies of the image.
    """

    def __init__(self, model_explainer):
        self.model_explainer = model_explainer
        self.explainer = lime_image.LimeImageExplainer()

    def explain_and_visualize(self, image_path, num_samples=1000, top_classes=3, num_features=10):
        """Run LIME on one image, plot the result and print the top superpixels.

        Args:
            image_path: Path of the image to explain.
            num_samples: Number of perturbed samples LIME generates.
            top_classes: Number of highest-probability labels LIME explains
                (capped at the number of classes).
            num_features: Number of superpixels highlighted and listed.

        Returns:
            Tuple ``(explanation, pred_info, image_np)``: the LIME explanation
            object, the dict from ``get_single_prediction`` and the 160x160
            RGB array that was explained.
        """

        # Load image for display
        image_pil = Image.open(image_path).convert('RGB')
        image_np = np.array(image_pil.resize((160, 160)))

        # Get prediction info
        pred_info = self.model_explainer.get_single_prediction(image_path)
        print(f"\nPrediction: {pred_info['predicted_label']} (confidence: {pred_info['confidence']:.3f})")

        print("All class probabilities:")
        for i, label in enumerate(pred_info['class_names']):
            print(f"- {label}: {pred_info['all_probabilities'][i]:.3f}")

        # LIME prediction function
        def lime_predict_fn(images):
            """Prediction function for LIME"""
            return self.model_explainer.predict_proba(images)

        # Run LIME explanation
        print(f"\nRunning LIME explanation with {num_samples} samples...")
        explanation = self.explainer.explain_instance(
            image_np,
            lime_predict_fn,
            top_labels=min(top_classes, len(pred_info['class_names'])),
            num_samples=num_samples,
            segmentation_fn=SegmentationAlgorithm(
                'quickshift',
                kernel_size=4,
                max_dist=200,
                ratio=0.2
            )
        )

        # Visualization
        fig, axes = plt.subplots(1, 3, figsize=(18, 6))

        # Original image
        axes[0].imshow(image_np)
        axes[0].set_title("Original Image")
        axes[0].axis('off')

        # LIME overlay for predicted class
        temp, mask = explanation.get_image_and_mask(
            pred_info['predicted_class'],
            positive_only=False,
            num_features=num_features,
            hide_rest=False
        )
        axes[1].imshow(temp)
        axes[1].set_title(f"LIME Explanation\n{pred_info['predicted_label']}")
        axes[1].axis('off')

        # Contribution mask. With positive_only=False LIME codes the mask as
        # -1 (negative), 0 (not selected) and +1 (positive). 'RdYlBu' maps low
        # values to red and high values to blue, which is what the title
        # states; the previous reversed map ('RdYlBu_r') showed the opposite.
        # Pinning vmin/vmax keeps the colours stable when only one sign occurs.
        axes[2].imshow(mask, cmap='RdYlBu', vmin=-1, vmax=1)
        axes[2].set_title("Feature Importance\n(Red=Negative, Blue=Positive)")
        axes[2].axis('off')

        plt.tight_layout()
        plt.show()

        # Show top contributing superpixels, largest absolute weight first
        print(f"\nTop {num_features} superpixel contributions for '{pred_info['predicted_label']}':")
        local_exp = explanation.local_exp[pred_info['predicted_class']]
        ranked = sorted(local_exp, key=lambda x: -abs(x[1]))[:num_features]
        for i, (sp_id, weight) in enumerate(ranked):
            direction = "positive" if weight > 0 else "negative"
            print(f"{i+1}. Superpixel {sp_id}: {weight:.3f} ({direction})")

        return explanation, pred_info, image_np

print("TensorFlowLIMEExplainer class defined")

In [None]:
class TensorFlowSHAPExplainer:
    """SHAP image explanations for one task of a wrapped model.

    The model is explained through a plain probability function, so SHAP's
    model-agnostic partition explainer is used together with an image masker.
    (``shap.DeepExplainer`` only accepts a Keras model or an
    ``(inputs, outputs)`` pair, not a Python callable, and its
    ``shap_values`` has no ``max_evals`` argument.)

    Attributes:
        background: Background images supplied by the caller, or a single
            all-zero 160x160x3 image when none were given.
        model_wrapper: Callable mapping an image batch to class probabilities.
        explainer: The SHAP explainer, or ``None`` if it could not be built.
    """

    def __init__(self, model_explainer, background_images=None):
        """Build the probability wrapper and the SHAP explainer.

        Args:
            model_explainer: Object exposing ``model``, ``task``,
                ``preprocess_image`` and ``get_single_prediction``.
            background_images: Optional array of reference images; their mean
                is used as the fill value for masked-out regions.
                Presumably preprocessed like the explained image -- confirm.
        """
        self.model_explainer = model_explainer
        self.model = model_explainer.model
        self.task = model_explainer.task

        # If no background images provided, create a simple baseline
        if background_images is None:
            print("No background images provided, using zero baseline")
            self.background = np.zeros((1, 160, 160, 3))
        else:
            self.background = background_images

        # Create a wrapper function for SHAP
        def model_wrapper(x):
            # SHAP hands over NumPy arrays (possibly float64); cast so the
            # model always sees float32.
            predictions = self.model(tf.cast(x, tf.float32), training=False)
            if isinstance(predictions, dict):
                task_pred = predictions[self.task]
            else:
                task_pred = predictions

            # Convert to probabilities
            if self.task == 'gender_output':
                probs = tf.nn.sigmoid(task_pred)
                # Convert to 2-class format
                probs = tf.concat([1-probs, probs], axis=1)
            else:
                probs = tf.nn.softmax(task_pred, axis=1)
            return probs

        self.model_wrapper = model_wrapper

        try:
            # Masked regions are filled with the mean background image.
            background = np.asarray(self.background, dtype=np.float32)
            baseline = background.mean(axis=0) if background.ndim == 4 else background
            masker = shap.maskers.Image(baseline)
            self.explainer = shap.Explainer(
                lambda batch: np.asarray(self.model_wrapper(batch)),
                masker,
                algorithm="partition",
            )
            print(f"SHAP explainer initialized for {self.task}")
        except Exception as e:
            # Best effort: the notebook keeps running without SHAP.
            print(f"Failed to initialize SHAP explainer: {e}")
            self.explainer = None

    def explain_and_visualize(self, image_path, max_evals=100):
        """Generate and visualize SHAP values for one image.

        Args:
            image_path: Path of the image to explain.
            max_evals: Budget of model evaluations SHAP may spend on the image.

        Returns:
            Tuple ``(shap_values, pred_info, image)``. ``shap_values`` is the
            array of attributions (image axes first, output classes last),
            ``pred_info`` the dict from ``get_single_prediction`` and
            ``image`` the preprocessed, batched input tensor. Returns
            ``(None, None, None)`` when no explainer is available and
            ``(None, pred_info, image)`` when the SHAP computation fails.
        """

        if self.explainer is None:
            print("SHAP explainer not available")
            return None, None, None

        # Load and preprocess image
        image = self.model_explainer.preprocess_image(image_path)
        image = tf.expand_dims(image, 0)

        pred_info = self.model_explainer.get_single_prediction(image_path)
        print(f"SHAP for: {pred_info['predicted_label']} (confidence: {pred_info['confidence']:.3f})")

        try:
            # Calculate SHAP values
            print(f"Computing SHAP values (max_evals={max_evals})...")
            explanation = self.explainer(image.numpy(), max_evals=max_evals)
            shap_values = np.asarray(explanation.values)

            # Attributions carry one trailing slot per output class; keep the
            # slice belonging to the predicted class.
            per_image = shap_values[0]
            if per_image.ndim == 4:
                shap_for_pred = per_image[..., pred_info['predicted_class']]
            else:
                shap_for_pred = per_image

            # Create visualizations
            fig, axes = plt.subplots(1, 4, figsize=(20, 5))

            # Original image
            display_img = tf.io.read_file(image_path)
            display_img = tf.image.decode_image(display_img, channels=3)
            display_img = tf.image.resize(display_img, [160, 160])
            display_img = tf.cast(display_img, tf.float32) / 255.0

            axes[0].imshow(display_img)
            axes[0].set_title("Original Image")
            axes[0].axis('off')

            # SHAP values for each channel, on a symmetric colour scale
            for i, channel in enumerate(['Red', 'Green', 'Blue']):
                shap_channel = shap_for_pred[:, :, i]
                limit = np.max(np.abs(shap_channel))
                im = axes[i+1].imshow(shap_channel, cmap='RdBu_r', vmin=-limit, vmax=limit)
                axes[i+1].set_title(f'SHAP - {channel} Channel\n{pred_info["predicted_label"]}')
                axes[i+1].axis('off')
                plt.colorbar(im, ax=axes[i+1], fraction=0.046, pad=0.04)

            plt.tight_layout()
            plt.show()

            # Overall importance heatmap
            shap_sum = np.sum(np.abs(shap_for_pred), axis=-1)
            plt.figure(figsize=(8, 6))
            plt.imshow(shap_sum, cmap='Reds')
            plt.title(f"SHAP Overall Importance\n{pred_info['predicted_label']}")
            plt.colorbar()
            plt.axis('off')
            plt.show()

            return shap_values, pred_info, image

        except Exception as e:
            print(f"SHAP calculation failed: {e}")
            import traceback
            traceback.print_exc()
            return None, pred_info, image

print("TensorFlowSHAPExplainer class defined")

In [None]:
def create_background_images(data_processor, num_images=10, seed=None):
    """Sample preprocessed images to serve as a SHAP background set.

    Args:
        data_processor: Object exposing ``df`` (rows with a ``'file'``
            entry), ``img_dir`` and
            ``load_and_preprocess_image(path, augment=False)``.
        num_images: Maximum number of images to sample (without replacement).
        seed: Optional seed for a reproducible sample. When ``None`` the
            global NumPy random state is used, as before.

    Returns:
        Array stacking the images that loaded successfully, or a single
        all-zero ``(1, 160, 160, 3)`` baseline if none could be loaded.
    """

    background_images = []

    # Never request more rows than exist, and never a negative count.
    total_rows = len(data_processor.df)
    sample_size = max(0, min(num_images, total_rows))

    # Get random samples from validation data
    if seed is None:
        sample_indices = np.random.choice(total_rows, sample_size, replace=False)
    else:
        sample_indices = np.random.default_rng(seed).choice(total_rows, sample_size, replace=False)

    print(f"Creating {len(sample_indices)} background images for SHAP...")

    for idx in sample_indices:
        row = data_processor.df.iloc[idx]
        img_path = os.path.join(data_processor.img_dir, row['file'])

        try:
            img = data_processor.load_and_preprocess_image(img_path, augment=False)
            background_images.append(img.numpy())
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole set.
            print(f"Failed to load background image {img_path}: {e}")
            continue

    if not background_images:
        print("No background images could be loaded, using zero baseline")
        return np.zeros((1, 160, 160, 3))

    background_array = np.array(background_images)
    print(f"Created background dataset with shape: {background_array.shape}")

    return background_array

# Build the SHAP background set: up to five images drawn from val_processor.
background_imgs = create_background_images(
    data_processor=val_processor,
    num_images=5,
)

In [None]:
class GradCAMExplainer:
    """Custom GradCAM implementation for multi-task models.

    Uses the last layer of ``model.backbone`` that has a ``filters``
    attribute as the feature map, and the gradient of the predicted class
    score with respect to it as the channel weights.
    """

    def __init__(self, model_explainer):
        self.model_explainer = model_explainer
        self.model = model_explainer.model
        self.task = model_explainer.task

    def find_target_layer(self):
        """Return the last backbone layer with a ``filters`` attribute, or ``None``."""
        # Look for convolutional layers in the backbone, deepest first
        target_layer = None
        for layer in reversed(self.model.backbone.layers):
            if hasattr(layer, 'filters'):  # Conv layer
                target_layer = layer
                break

        if target_layer is None:
            print("Warning: No suitable convolutional layer found for GradCAM")
            return None

        return target_layer

    def explain_and_visualize(self, image_path):
        """Generate and visualize a GradCAM heatmap for one image.

        Args:
            image_path: Path of the image to explain.

        Returns:
            Tuple ``(heatmap_resized, pred_info, image)``: the heatmap
            resized to 160x160 with values in [0, 1], the dict from
            ``get_single_prediction`` and the preprocessed, batched input.
            Returns ``(None, None, None)`` when no target layer is found.

        Raises:
            RuntimeError: If no gradient flows from the class score to the
                target layer.
        """

        target_layer = self.find_target_layer()
        if target_layer is None:
            print("Cannot generate GradCAM: No suitable layer found")
            return None, None, None

        print(f"Using layer '{target_layer.name}' for GradCAM")

        # Load and preprocess image
        image = self.model_explainer.preprocess_image(image_path)
        image = tf.expand_dims(image, 0)

        pred_info = self.model_explainer.get_single_prediction(image_path)
        print(f"GradCAM for: {pred_info['predicted_label']} (confidence: {pred_info['confidence']:.3f})")

        # Create gradient model
        grad_model = tf.keras.models.Model(
            [self.model.backbone.input],
            [target_layer.output, self.model.output]
        )

        with tf.GradientTape() as tape:
            conv_outputs, predictions = grad_model(image)

            # Get task-specific prediction
            if isinstance(predictions, dict):
                task_pred = predictions[self.task]
            else:
                task_pred = predictions

            # Get the score for predicted class
            if self.task == 'gender_output':
                # Single-unit binary head: the unit scores class 1, so its
                # negation is used when class 0 was predicted. Otherwise the
                # map would explain the opposite of the label in the title.
                score = task_pred[0, 0]
                if pred_info['predicted_class'] == 0:
                    score = -score
            else:
                score = task_pred[0, pred_info['predicted_class']]

        # Calculate gradients
        grads = tape.gradient(score, conv_outputs)
        if grads is None:
            raise RuntimeError(
                f"No gradient between '{self.task}' and layer '{target_layer.name}'"
            )
        pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

        # Weight the feature maps
        conv_outputs = conv_outputs[0]
        heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
        heatmap = tf.squeeze(heatmap)

        # ReLU first, then scale by the post-ReLU maximum. divide_no_nan
        # yields an all-zero map (not NaN) when nothing is positive.
        heatmap = tf.maximum(heatmap, 0)
        heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))

        # Resize heatmap to image size
        heatmap_resized = tf.image.resize(
            tf.expand_dims(heatmap, -1),
            [160, 160]
        )
        heatmap_resized = tf.squeeze(heatmap_resized)

        # Visualize
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        # Original image, re-read from disk and scaled to [0, 1] for display
        display_img = tf.io.read_file(image_path)
        display_img = tf.image.decode_image(display_img, channels=3)
        display_img = tf.image.resize(display_img, [160, 160])
        display_img = tf.cast(display_img, tf.float32) / 255.0

        axes[0].imshow(display_img)
        axes[0].set_title("Original Image")
        axes[0].axis('off')

        # Heatmap
        axes[1].imshow(heatmap_resized, cmap='jet')
        axes[1].set_title(f"GradCAM Heatmap\n{pred_info['predicted_label']}")
        axes[1].axis('off')

        # Overlay
        axes[2].imshow(display_img)
        axes[2].imshow(heatmap_resized, cmap='jet', alpha=0.4)
        axes[2].set_title("Overlay")
        axes[2].axis('off')

        plt.tight_layout()
        plt.show()

        return heatmap_resized, pred_info, image

print("GradCAMExplainer class defined")

In [None]:
def explain_prediction_comprehensive(model, data_processor, image_path, task='age_output', background_images=None):
    """Run LIME, GradCAM and SHAP for one image and one task.

    Each explainer runs in its own ``try`` block so that a failure in one
    does not prevent the others from running.

    Args:
        model: Model to explain.
        data_processor: Data processor handed to ``TensorFlowModelExplainer``.
        image_path: Path of the image to explain.
        task: Output head to explain (e.g. ``'age_output'``).
        background_images: Optional background images for SHAP.

    Returns:
        dict with keys ``lime_explanation``, ``gradcam_heatmap``,
        ``shap_values`` and ``prediction_info``. An entry is ``None`` when
        the corresponding step failed. ``prediction_info`` comes from the
        first explainer that produced one.
    """

    print(f"Explaining {task} prediction for {image_path}")
    print("="*60)

    # Create model explainer
    model_explainer = TensorFlowModelExplainer(model, data_processor, task)

    # Explicit defaults replace the previous "'pred_info' in locals()" check.
    lime_explanation = None
    gradcam_heatmap = None
    shap_values = None
    pred_info = None

    try:
        # LIME Explanation
        print("\n1. LIME Explanation:")
        print("-" * 30)
        lime_explainer = TensorFlowLIMEExplainer(model_explainer)
        lime_explanation, pred_info, _ = lime_explainer.explain_and_visualize(
            image_path, num_samples=500, num_features=8
        )

    except Exception as e:
        print(f"LIME explanation failed: {e}")
        lime_explanation = None

    try:
        # GradCAM Explanation
        print("\n2. GradCAM Explanation:")
        print("-" * 30)
        gradcam_explainer = GradCAMExplainer(model_explainer)
        gradcam_heatmap, gradcam_pred, _ = gradcam_explainer.explain_and_visualize(image_path)
        # Keep the prediction info even when LIME failed.
        if pred_info is None:
            pred_info = gradcam_pred

    except Exception as e:
        print(f"GradCAM explanation failed: {e}")
        gradcam_heatmap = None

    try:
        # SHAP Explanation
        print("\n3. SHAP Explanation:")
        print("-" * 30)
        shap_explainer = TensorFlowSHAPExplainer(model_explainer, background_images)
        shap_values, shap_pred, _ = shap_explainer.explain_and_visualize(image_path, max_evals=50)
        if pred_info is None:
            pred_info = shap_pred

    except Exception as e:
        print(f"SHAP explanation failed: {e}")
        shap_values = None

    return {
        'lime_explanation': lime_explanation,
        'gradcam_heatmap': gradcam_heatmap,
        'shap_values': shap_values,
        'prediction_info': pred_info
    }

print("comprehensive explanation function defined")

In [None]:
def test_interpretability(model, data_processor, image_path, tasks=['age_output', 'gender_output', 'race_output']):
    """Test interpretability for all tasks"""
    
    for task in tasks:
        print(f"\n{'='*80}")
        print(f"ANALYZING TASK: {task.upper()}")
        print(f"{'='*80}")
        
        try:
            results = explain_prediction_comprehensive(
                model, data_processor, image_path, task=task
            )
            print(f"Task {task} explanation completed successfully")
            
        except Exception as e:
            print(f"Task {task} explanation failed: {e}")
            import traceback
            traceback.print_exc()

print("Test function defined")

In [None]:
# Pick the image to analyze: first look in VAL_IMG_DIR, then fall back to
# the "val" folder under DATASET_PATH.
IMAGE_PATH = os.path.join(VAL_IMG_DIR, "1.jpg")

if not os.path.exists(IMAGE_PATH):
    print(f"Image not found: {IMAGE_PATH}")
    # Try alternative
    IMAGE_PATH = os.path.join(DATASET_PATH, "val", "1.jpg")
    if not os.path.exists(IMAGE_PATH):
        print("Please check your image path")
    else:
        print(f"Found at alternative path: {IMAGE_PATH}")
else:
    print(f"Image found: {IMAGE_PATH}")

# Start with a single task (race classification); SHAP gets the sampled
# background images.
results = explain_prediction_comprehensive(
    model=model,
    data_processor=val_processor,
    image_path=IMAGE_PATH,
    task='race_output',
    background_images=background_imgs,
)

In [None]:
# Explain every output head for the same image.
test_interpretability(
    model=model,
    data_processor=val_processor,
    image_path=IMAGE_PATH,
    tasks=['age_output', 'gender_output', 'race_output'],
)

In [None]:
def test_interpretability_with_shap(model, data_processor, image_path, background_images, tasks=['age_output', 'gender_output', 'race_output']):
    """Test interpretability for all tasks including SHAP"""
    
    for task in tasks:
        print(f"\n{'='*80}")
        print(f"ANALYZING TASK: {task.upper()}")
        print(f"{'='*80}")
        
        try:
            results = explain_prediction_comprehensive(
                model, data_processor, image_path, task=task, background_images=background_images
            )
            print(f"Task {task} explanation completed successfully")
            
        except Exception as e:
            print(f"Task {task} explanation failed: {e}")
            import traceback
            traceback.print_exc()

# Explain every output head for the same image, this time handing the
# sampled background images to SHAP.
test_interpretability_with_shap(
    model=model,
    data_processor=val_processor,
    image_path=IMAGE_PATH,
    background_images=background_imgs,
    tasks=['age_output', 'gender_output', 'race_output'],
)

In [None]:
# Test with multiple images.
# os.listdir returns entries in arbitrary order, so the listing is sorted to
# make the "first three images" the same on every run. Extensions are
# compared in lower case so files such as "1.JPG" are not skipped.
val_images = sorted(
    f for f in os.listdir(VAL_IMG_DIR)
    if f.lower().endswith(('.jpg', '.png', '.jpeg'))
)[:3]

for img_file in val_images:
    img_path = os.path.join(VAL_IMG_DIR, img_file)
    print(f"\n{'='*100}")
    print(f"ANALYZING IMAGE: {img_file}")
    print(f"{'='*100}")

    # Test with age prediction for each image; a failure on one image is
    # reported and the loop moves on to the next.
    try:
        results = explain_prediction_comprehensive(
            model, val_processor, img_path, task='age_output'
        )
    except Exception as e:
        print(f"Failed to analyze {img_file}: {e}")