In [1]:
import numpy as np
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from img2vec import rgb2flatPCA, pretrain_pca
import math
import os
import pandas as pd
import joblib

# Define the batch size
batch_size = 256

### read the data

In [2]:
train_data = pd.read_csv(os.path.join('..', '..', 'data', 'train.csv'))
val_data = pd.read_csv(os.path.join('..', '..', 'data', 'val.csv'))
test_data = pd.read_csv(os.path.join('..', '..', 'data', 'test.csv'))

### add the path of the images

In [3]:
def construct_img_path(row):
    return os.path.join("..", "..", "data", "faces", row['user_id'],
                        "coarse_tilt_aligned_face." + str(row['face_id']) + "." + row['original_image'])


train_data['img_path'] = train_data.apply(construct_img_path, axis=1)
val_data['img_path'] = val_data.apply(construct_img_path, axis=1)
test_data['img_path'] = test_data.apply(construct_img_path, axis=1)
train_data.head(5)

Unnamed: 0,user_id,face_id,original_image,age,gender,img_path
0,9855553@N08,1581,11658657103_4485e3f5ac_o.jpg,"(60, 100)",m,..\..\data\faces\9855553@N08\coarse_tilt_align...
1,114841417@N06,502,12059583524_606ca96139_o.jpg,"(15, 20)",m,..\..\data\faces\114841417@N06\coarse_tilt_ali...
2,66870968@N06,1227,11326189206_e08bdf6dfd_o.jpg,"(25, 32)",m,..\..\data\faces\66870968@N06\coarse_tilt_alig...
3,8187011@N06,988,11133041085_e2ee5e12cb_o.jpg,"(0, 2)",u,..\..\data\faces\8187011@N06\coarse_tilt_align...
4,114841417@N06,485,12059753735_7141b5443c_o.jpg,"(15, 20)",f,..\..\data\faces\114841417@N06\coarse_tilt_ali...


#### add column for check if the image exists
it will help us to detect if there is any missing image, or if there is any bug in the path construction

In [4]:
train_data['img_exists'] = train_data['img_path'].apply(os.path.exists)
val_data['img_exists'] = val_data['img_path'].apply(os.path.exists)
test_data['img_exists'] = test_data['img_path'].apply(os.path.exists)

train_data.head(5)

Unnamed: 0,user_id,face_id,original_image,age,gender,img_path,img_exists
0,9855553@N08,1581,11658657103_4485e3f5ac_o.jpg,"(60, 100)",m,..\..\data\faces\9855553@N08\coarse_tilt_align...,True
1,114841417@N06,502,12059583524_606ca96139_o.jpg,"(15, 20)",m,..\..\data\faces\114841417@N06\coarse_tilt_ali...,True
2,66870968@N06,1227,11326189206_e08bdf6dfd_o.jpg,"(25, 32)",m,..\..\data\faces\66870968@N06\coarse_tilt_alig...,True
3,8187011@N06,988,11133041085_e2ee5e12cb_o.jpg,"(0, 2)",u,..\..\data\faces\8187011@N06\coarse_tilt_align...,True
4,114841417@N06,485,12059753735_7141b5443c_o.jpg,"(15, 20)",f,..\..\data\faces\114841417@N06\coarse_tilt_ali...,True


In [5]:
# Encode age labels
age_encoder = LabelEncoder()
train_data['age_label'] = age_encoder.fit_transform(train_data['age'])
val_data['age_label'] = age_encoder.transform(val_data['age'])
test_data['age_label'] = age_encoder.transform(test_data['age'])
num_classes = len(age_encoder.classes_)
print("Age classes:", age_encoder.classes_)
train_data.head(5)

Age classes: ['(0, 2)' '(15, 20)' '(25, 32)' '(38, 43)' '(4, 6)' '(48, 53)' '(60, 100)'
 '(8, 23)']


Unnamed: 0,user_id,face_id,original_image,age,gender,img_path,img_exists,age_label
0,9855553@N08,1581,11658657103_4485e3f5ac_o.jpg,"(60, 100)",m,..\..\data\faces\9855553@N08\coarse_tilt_align...,True,6
1,114841417@N06,502,12059583524_606ca96139_o.jpg,"(15, 20)",m,..\..\data\faces\114841417@N06\coarse_tilt_ali...,True,1
2,66870968@N06,1227,11326189206_e08bdf6dfd_o.jpg,"(25, 32)",m,..\..\data\faces\66870968@N06\coarse_tilt_alig...,True,2
3,8187011@N06,988,11133041085_e2ee5e12cb_o.jpg,"(0, 2)",u,..\..\data\faces\8187011@N06\coarse_tilt_align...,True,0
4,114841417@N06,485,12059753735_7141b5443c_o.jpg,"(15, 20)",f,..\..\data\faces\114841417@N06\coarse_tilt_ali...,True,1


### Filter out any rows where the image doesn't exist

In [6]:
train_data_filtered = train_data[train_data['img_exists'] == True]
val_data_filtered = val_data[val_data['img_exists'] == True]
test_data_filtered = test_data[test_data['img_exists'] == True]

### Extract image paths and labels

In [7]:
train_image_paths = train_data_filtered['img_path'].tolist()
train_labels = train_data_filtered['age_label'].values

val_image_paths = val_data_filtered['img_path'].tolist()
val_labels = val_data_filtered['age_label'].values

test_image_paths = test_data_filtered['img_path'].tolist()
test_labels = test_data_filtered['age_label'].values

### Pre-train PCA on a large subset of training data to ensure we can use 256 components

In [8]:
pca = pretrain_pca(train_image_paths, n_components=256)

Pre-training PCA with 11856 images...
Processing image 0/1000...
Processing image 100/1000...
Processing image 200/1000...
Processing image 300/1000...
Processing image 400/1000...
Processing image 500/1000...
Processing image 600/1000...
Processing image 700/1000...
Processing image 800/1000...
Processing image 900/1000...
PCA pre-training complete. Saved to pca_cache_rgb_256_128x128.joblib


### Define a generator function to process images in batches

In [9]:
def image_batch_generator(image_paths, labels, batch_size):
    num_samples = len(image_paths)
    num_batches = math.ceil(num_samples / batch_size)

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, num_samples)

        batch_paths = image_paths[start_idx:end_idx]
        batch_features = rgb2flatPCA(batch_paths) / 255.0
        batch_labels = labels[start_idx:end_idx]

        yield batch_features, batch_labels

### Process a single batch to determine input shape

In [10]:
print("Processing a sample batch to determine input dimensions...")
sample_batch_size = min(256, len(train_image_paths))
sample_paths = train_image_paths[:sample_batch_size]
sample_batch = rgb2flatPCA(sample_paths)
input_shape = sample_batch.shape[1]
print(f"Input shape: {input_shape}")

Processing a sample batch to determine input dimensions...
Input shape: 256


### Define the model

In [11]:
model = Sequential([
    Dense(num_classes, activation='softmax', input_shape=(input_shape,))
])

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


### Train the model using batches

In [12]:
print("Training the model...")
epochs = 100
steps_per_epoch = math.ceil(len(train_image_paths) / batch_size)
validation_steps = math.ceil(len(val_image_paths) / batch_size)

# Custom training loop
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")

    # Training
    train_loss = 0
    train_acc = 0
    batch_count = 0

    for batch_features, batch_labels in image_batch_generator(train_image_paths, train_labels, batch_size):
        # Train on batch
        batch_history = model.train_on_batch(
            batch_features,
            batch_labels,
        )

        batch_loss, batch_acc = batch_history
        train_loss += batch_loss
        train_acc += batch_acc
        batch_count += 1

        print(f"\rBatch {batch_count}/{steps_per_epoch} - loss: {batch_loss:.4f} - accuracy: {batch_acc:.4f}", end="")

    avg_train_loss = train_loss / batch_count
    avg_train_acc = train_acc / batch_count
    train_losses.append(avg_train_loss)
    train_accuracies.append(avg_train_acc)

    # Validation
    val_loss = 0
    val_acc = 0
    batch_count = 0

    for batch_features, batch_labels in image_batch_generator(val_image_paths, val_labels, batch_size):
        # Evaluate on batch
        batch_val_loss, batch_val_acc = model.test_on_batch(
            batch_features,
            batch_labels,
        )

        val_loss += batch_val_loss
        val_acc += batch_val_acc
        batch_count += 1

    avg_val_loss = val_loss / batch_count
    avg_val_acc = val_acc / batch_count
    val_losses.append(avg_val_loss)
    val_accuracies.append(avg_val_acc)

    print(
        f"\nEpoch {epoch + 1}: loss={avg_train_loss:.4f}, accuracy={avg_train_acc:.4f}, val_loss={avg_val_loss:.4f}, val_accuracy={avg_val_acc:.4f}")

    # Optional early stopping
    if epoch > 10 and val_losses[-1] > val_losses[-2] and val_losses[-2] > val_losses[-3]:
        print("Validation loss increased for 2 consecutive epochs. Early stopping.")
        break


Training the model...
Epoch 1/100
Batch 47/47 - loss: 5.3925 - accuracy: 0.1342
Epoch 1: loss=6.0006, accuracy=0.1236, val_loss=5.2387, val_accuracy=0.1387
Epoch 2/100
Batch 47/47 - loss: 4.3977 - accuracy: 0.1636
Epoch 2: loss=4.7556, accuracy=0.1519, val_loss=4.3016, val_accuracy=0.1682
Epoch 3/100
Batch 47/47 - loss: 3.7175 - accuracy: 0.1985
Epoch 3: loss=3.9580, accuracy=0.1847, val_loss=3.6562, val_accuracy=0.2025
Epoch 4/100
Batch 47/47 - loss: 3.2645 - accuracy: 0.2307
Epoch 4: loss=3.4248, accuracy=0.2183, val_loss=3.2240, val_accuracy=0.2343
Epoch 5/100
Batch 47/47 - loss: 2.9540 - accuracy: 0.2602
Epoch 5: loss=3.0651, accuracy=0.2492, val_loss=2.9258, val_accuracy=0.2633
Epoch 6/100
Batch 47/47 - loss: 2.7309 - accuracy: 0.2850
Epoch 6: loss=2.8114, accuracy=0.2761, val_loss=2.7102, val_accuracy=0.2874
Epoch 7/100
Batch 47/47 - loss: 2.5635 - accuracy: 0.3054
Epoch 7: loss=2.6244, accuracy=0.2980, val_loss=2.5478, val_accuracy=0.3073
Epoch 8/100
Batch 47/47 - loss: 2.4336 -

MemoryError: Unable to allocate 12.0 MiB for an array with shape (256, 49152) and data type uint8

### Evaluate the model on test data

In [None]:
print("Evaluating the model on test data...")
test_loss = 0
test_acc = 0
batch_count = 0
all_predictions = []
all_labels = []

for batch_features, batch_labels in image_batch_generator(test_image_paths, test_labels, batch_size):
    # Evaluate on batch
    batch_test_loss, batch_test_acc = model.test_on_batch(
        batch_features,
        batch_labels,
    )

    # Get predictions for this batch
    batch_preds = model.predict_on_batch(batch_features)
    batch_pred_classes = np.argmax(batch_preds, axis=1)

    all_predictions.extend(batch_pred_classes)
    all_labels.extend(batch_labels)

    test_loss += batch_test_loss
    test_acc += batch_test_acc
    batch_count += 1

avg_test_loss = test_loss / batch_count
avg_test_acc = test_acc / batch_count
print(f"Test Loss: {avg_test_loss:.4f}")
print(f"Test Accuracy: {avg_test_acc:.4f}")

### Visualize results

In [None]:
# Plot training history
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(train_accuracies, label='Train Accuracy')
plt.plot(val_accuracies, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')

plt.tight_layout()
plt.savefig('training_history.png')
plt.show()

# Confusion matrix
plt.figure(figsize=(10, 8))
cm = confusion_matrix(all_labels, all_predictions)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=age_encoder.classes_,
            yticklabels=age_encoder.classes_)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.savefig('confusion_matrix.png')
plt.show()

### Classification report

In [None]:
print("Classification Report:")
print(classification_report(all_labels, all_predictions, target_names=age_encoder.classes_))

### Save the model and LabelEncoder

In [None]:
model.save('basic_softmax_age_classifier.h5')
print("Model saved successfully.")

# Save the LabelEncoder
joblib.dump(age_encoder, 'age_encoder.pkl')
print("Age encoder saved successfully.")

### exapmle of use

In [None]:
import joblib
import numpy as np
import tensorflow as tf
from img2vec import rgb2flatPCA

# Load the trained model
model = tf.keras.models.load_model('basic_softmax_age_classifier.h5')
print("Model loaded successfully.")

# Load the LabelEncoder
age_encoder = joblib.load('age_encoder.pkl')
print("Age encoder loaded successfully.")


# Function to predict age range for a new image
def predict_age(image_path, model, age_encoder):
    # Extract features using rgb2flatPCA
    features = rgb2flatPCA([image_path])
    # Normalize features
    features = features / 255.0
    # Make prediction
    pred_probs = model.predict(features)[0]
    # Get predicted class
    pred_class = np.argmax(pred_probs)
    # Convert to age range
    pred_age_range = age_encoder.classes_[pred_class]
    confidence = pred_probs[pred_class]

    return pred_age_range, confidence


# Example usage:
image_path = "img.jpg"
pred_age, confidence = predict_age(image_path, model, age_encoder)
print(f"Predicted age range: {pred_age} with confidence {confidence:.2f}")
