# Load cleaned data and preprocessing data here

In [8]:
import pandas as pd

# # Load the CSV file into a DataFrame
# df = pd.read_csv("cleaned_human_face_emotions.csv")

# Load the Parquet file (instead)
df = pd.read_parquet("cleaned_human_face_emotions.parquet")

# Drop a column (for example, the "qa" column)
df = df.drop(columns=["qa"])

# Print the first few rows of the updated DataFrame
print(df.head())

                                               image  emotion
0  {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...      sad
1  {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...    anger
2  {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...  neutral
3  {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...     fear
4  {'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...  content


In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9400 entries, 0 to 9399
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   image    9400 non-null   object
 1   emotion  9400 non-null   object
dtypes: object(2)
memory usage: 147.0+ KB


Now we just have images in the first column with the emotion in the second column.

In [10]:
from sklearn.model_selection import train_test_split

# Separate feature (X) and label (y)
X = df['image']
y = df['emotion']

# Perform a stratified split to keep class distribution consistent
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,      # 80% training, 20% testing
    random_state=42,    # for reproducibility
    stratify=y          # important for classification
)

# Validation split from X_train if needed:
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.25,     # 25% of the training set (which is 20% of the total) -> 15% overall
    random_state=42,
    stratify=y_train
)

print("Training set size:", len(X_train))
print("Test set size:", len(X_test))
print("Validation set size:", len(X_val))

Training set size: 5640
Test set size: 1880
Validation set size: 1880


In [11]:
import io
import numpy as np
from PIL import Image

# Image bytes -> numpy arrays
def decode_images(image_series, target_size=(224, 224)):
    """
    Takes a pandas Series of dictionaries, each containing {'bytes': ...}.
    Decodes them into a list of NumPy arrays (RGB).
    Resizes images to target_size.
    Normalizes pixel values to [0, 1].

    Returns:
      - A NumPy array of shape (num_samples, target_size[0], target_size[1], 3)
    """
    decoded_list = []
    for item in image_series:
        # item should be a dict like {'bytes': b'...'}
        try:
            img_bytes = item['bytes']
            with Image.open(io.BytesIO(img_bytes)) as img:
                # Convert to RGB if needed
                img = img.convert('RGB')
                # Resize
                img = img.resize(target_size)
                # Convert to array
                arr = np.array(img, dtype=np.float32) / 255.0
            decoded_list.append(arr)
        except Exception as e:
            # If there's a bad image, you might want to handle or skip it
            print("Error decoding image:", e)
            # Optionally skip or handle it somehow. For now, let's skip:
            # Continue with the loop
            continue

    return np.stack(decoded_list, axis=0)

print("\nDecoding and resizing images...")

# Decode train set
X_train_array = decode_images(X_train, target_size=(224, 224))
print("X_train_array shape:", X_train_array.shape)

# Decode val set
X_val_array = decode_images(X_val, target_size=(224, 224))
print("X_val_array shape:", X_val_array.shape)

# Decode test set
X_test_array = decode_images(X_test, target_size=(224, 224))
print("X_test_array shape:",  X_test_array.shape)


Decoding and resizing images...
X_train_array shape: (5640, 224, 224, 3)
X_val_array shape: (1880, 224, 224, 3)
X_test_array shape: (1880, 224, 224, 3)


In [12]:
# Encode labels
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_val_encoded   = label_encoder.transform(y_val)
y_test_encoded  = label_encoder.transform(y_test)

print("\nLabel classes found:", label_encoder.classes_)
print("Sample of encoded labels:", y_train_encoded[:10])


Label classes found: ['anger' 'content' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'surprise']
Sample of encoded labels: [6 3 5 6 0 6 7 3 4 0]


# Use transfer learning with pre-trained CNN model
convert the existing pipeline to use a pre-trained network such as ResNet50 in a transfer‐learning setup for multi-class classification using cross-entropy loss. In a transfer-learning approach, it typically replace the top (classification) layers of the pre-trained network with our own custom head and use a loss such as categorical cross-entropy (or sparse categorical cross-entropy if your labels remain as integers). We then train the added head first, and optionally fine-tune the deeper layers later.

Improve accuracy with data augmentation.

In [13]:
pip install --upgrade tensorflow

Note: you may need to restart the kernel to use updated packages.


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
# import warnings
# warnings.filterwarnings("ignore", message="Name tf.RaggedTensorSpec has already been registered")

# Assume X_train_array, X_val_array, X_test_array,
# y_train_encoded, y_val_encoded, y_test_encoded, and label_encoder are already defined.
num_classes = 8

# Define a simple data augmentation pipeline:
data_augmentation = tf.keras.Sequential([
    # Random rotation: factor=0.25 corresponds roughly to ±90° rotation.
    layers.RandomRotation(0.25),
    # Random horizontal flip.
    layers.RandomFlip("horizontal")
])

# Pick a pre-trained model; here we use ResNet50.
base_model = tf.keras.applications.ResNet50(
    input_shape=(224, 224, 3),
    include_top=False,  # Remove the default classification head.
    weights='imagenet'
)
# Freeze the base model to only train the new head initially.
base_model.trainable = False

# Build a new model on top of the base model.
model = models.Sequential([
    tf.keras.Input(shape=(224, 224, 3))
    # Data augmentation layers (only active during training).
    data_augmentation,
    # Pre-processing can be added here if needed.
    base_model,
    # Global average pooling to reduce spatial dimensions.
    layers.GlobalAveragePooling2D(),
    # Optional dropout for regularization.
    layers.Dropout(0.2),
    # Final Dense layer for multi-class classification.
    layers.Dense(num_classes, activation='softmax')
])

# Compile the model.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

# Show the model summary.
model.summary()

# Train the new head.
history = model.fit(
    X_train_array, y_train_encoded,
    validation_data=(X_val_array, y_val_encoded),
    epochs=100,
    batch_size=177
)

# Evaluate on the test set.
test_loss, test_acc = model.evaluate(X_test_array, y_test_encoded)
print(f"\nTest Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")

# Save the trained model (HDF5 format).
model.save("ResNet50_basemodel.h5")
print("Model saved as ResNet50_basemodel.h5")

# Optional: Fine-tuning
# Unfreeze the base model to allow fine-tuning.
base_model.trainable = True
# Freeze all layers except the last two of the base model.
for layer in base_model.layers[:-2]:
    layer.trainable = False

# Re-compile the model with a lower learning rate.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.Adam(1e-5),
    metrics=['accuracy']
)

# Continue training (fine-tuning).
history_fine = model.fit(
    X_train_array, y_train_encoded,
    validation_data=(X_val_array, y_val_encoded),
    epochs=100,
    batch_size=177
)

final_loss, final_acc = model.evaluate(X_test_array, y_test_encoded)
print(f"\nFinal Test Loss after fine-tuning: {final_loss:.4f}")
print(f"Final Test Accuracy after fine-tuning: {final_acc:.4f}")

# Optionally, save the fine-tuned model too.
model.save("ResNet50_basemodel_finetuned.h5")
print("Fine-tuned model saved as ResNet50_basemodel_finetuned.h5")

# Plot loss versus epochs (for initial training).
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title("Loss vs. Epochs (Initial Training)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()

# If you want to also plot the fine-tuning phase's loss,
# you can do something similar:
plt.figure(figsize=(10, 6))
plt.plot(history_fine.history['loss'], label='Fine-tune Training Loss')
plt.plot(history_fine.history['val_loss'], label='Fine-tune Validation Loss')
plt.title("Loss vs. Epochs (Fine-tuning)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()

Epoch 1/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m187s[0m 6s/step - accuracy: 0.1222 - loss: 2.2264 - val_accuracy: 0.1351 - val_loss: 2.0836
Epoch 2/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m157s[0m 5s/step - accuracy: 0.1312 - loss: 2.1395 - val_accuracy: 0.1617 - val_loss: 2.0723
Epoch 3/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m160s[0m 5s/step - accuracy: 0.1341 - loss: 2.1168 - val_accuracy: 0.1388 - val_loss: 2.0670
Epoch 4/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m159s[0m 5s/step - accuracy: 0.1328 - loss: 2.1042 - val_accuracy: 0.1436 - val_loss: 2.0628
Epoch 5/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m158s[0m 5s/step - accuracy: 0.1461 - loss: 2.0873 - val_accuracy: 0.1410 - val_loss: 2.0638
Epoch 6/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m154s[0m 5s/step - accuracy: 0.1559 - loss: 2.0765 - val_accuracy: 0.1527 - val_loss: 2.0693
Epoch 7/100
[1m32/32[0m [