In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import os
import keras
from sklearn.model_selection import train_test_split
from keras.models import Model
from keras import layers
from keras.layers import Layer, BatchNormalization, Activation, InputLayer
from keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout
from keras.layers import RandomContrast, RandomFlip, RandomRotation, RandomZoom
from keras.layers import Concatenate, GlobalAveragePooling2D, GlobalMaxPooling2D
from keras.layers import Resizing, Rescaling
from keras.losses import CategoricalCrossentropy
from keras.optimizers import Adam, AdamW, SGD
from keras.metrics import CategoricalAccuracy, TopKCategoricalAccuracy
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.regularizers import L2, L1
from sklearn.metrics import confusion_matrix, classification_report

Dataset Loading

In [None]:
train_dir = "dataset/Emotions Dataset/Emotions Dataset/train/"
test_dir = "dataset/Emotions Dataset/Emotions Dataset/test/"
# check exist
if os.path.exists(train_dir):
    print("Training directory found.")
else:
    print("Training directory not found.")
    raise FileNotFoundError("Training directory not found.")
if os.path.exists(test_dir):
    print("Testing directory found.")
else:
    print("Testing directory not found.")
    raise FileNotFoundError("Testing directory not found.")

In [None]:
# # check images size
# df_image_sizes = []
# for class_name in os.listdir(train_dir):
#     class_path = os.path.join(train_dir, class_name)
#     if os.path.isdir(class_path):
#         image_count = len(os.listdir(class_path))
#         for i in range(image_count):
#             sample_image_path = os.path.join(class_path, os.listdir(class_path)[i])
#             sample_image = keras.preprocessing.image.load_img(sample_image_path)
#             df_image_sizes.append(
#                 {
#                     "class": class_name,
#                     "width": sample_image.size[0],
#                     "height": sample_image.size[1],
#                 }
#             )
# df_image_sizes = pd.DataFrame(df_image_sizes)
# df_image_sizes.value_counts()

In [None]:
CLASS_NAMES = ["angry", "happy", "sad"]
CONFIGURATION = {
    "IMAGE_SIZE": 224,
    "BATCH_SIZE": 64,
    "N_FILTERS": 16,
    "KERNEL_SIZE": 3,
    "N_STRIDES": 4,
    "REGULATION_RATE": 1e-3,
    "DROPOUT_RATE": 0.4,
    "POOL_SIZE": 2,
    "N_CLASSES": len(CLASS_NAMES),
    "N_DENSE_1": 2048,
    "N_DENSE_2": 128,
    "EPOCHS": 100,
    "INITIAL_LEARNING_RATE": 1e-3,
    "DECAY_STEPS": 500,
    "DECAY_RATE": 0.95,
    "STAIRCASE": True,
    "VALIDATION_SPLIT": 0.01,
    "PATIENCE": 10,
    "SEED": 99,
    "DO_COMPILE": False,
    "USING_MODEL_PATH": "models/best_lenet_model_acc87.keras",
    "DO_DRAW_FEATURE_MAP": False,
    "DO_DRAW_GRAD_CAM": True,
}

In [None]:
train_dataset = keras.preprocessing.image_dataset_from_directory(
    train_dir,
    labels="inferred",
    label_mode="categorical",
    class_names=CLASS_NAMES,
    color_mode="rgb",
    batch_size=CONFIGURATION["BATCH_SIZE"],
    image_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
    shuffle=True,
    seed=CONFIGURATION["SEED"],
    validation_split=CONFIGURATION["VALIDATION_SPLIT"],
    subset="training",
)

In [None]:
val_dataset = keras.preprocessing.image_dataset_from_directory(
    test_dir,
    labels="inferred",
    label_mode="categorical",
    class_names=CLASS_NAMES,
    color_mode="rgb",
    batch_size=CONFIGURATION["BATCH_SIZE"],
    image_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
    shuffle=True,
    seed=CONFIGURATION["SEED"],
)

In [None]:
# 각 클래스의 샘플 수 확인
for class_name in CLASS_NAMES:
    class_path = os.path.join(train_dir, class_name)
    count = len(os.listdir(class_path))
    print(f"{class_name}: {count} images")

In [None]:
# 클래스 불균형 해결
from sklearn.utils.class_weight import compute_class_weight

class_weights = compute_class_weight(
    "balanced",
    classes=np.array([0, 1, 2]),
    y=np.array([0] * 1525 + [1] * 3019 + [2] * 2255),
)
class_weight_dict = {i: class_weights[i] for i in range(3)}

# 학습 시 적용

In [None]:
# for i in train_dataset.take(1):  # type: ignore
#     images, labels = i
#     print(images.shape)
#     print(labels.numpy())

Dataset Visualization

In [None]:
x, y = next(iter(train_dataset))
print("Image batch shape: ", x.shape)
print("Label batch shape: ", y.shape)

In [None]:
unique = (
    np.unique(y.numpy())
    if len(y.shape) == 1
    else np.unique(np.argmax(y.numpy(), axis=1))
)
print("unique labels:", unique)

In [None]:
def visualize_training_history(history):
    # plot training & validation accuracy values
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history["acc"], label="Train Accuracy")
    plt.plot(history.history["val_acc"], label="Validation Accuracy")
    plt.title("Model Accuracy")
    plt.ylabel("Accuracy")
    plt.xlabel("Epoch")
    plt.legend(loc="lower right")
    # plot training & validation loss values
    plt.subplot(1, 2, 2)
    plt.plot(history.history["loss"], label="Train Loss")
    plt.plot(history.history["val_loss"], label="Validation Loss")
    plt.title("Model Loss")
    plt.ylabel("Loss")
    plt.xlabel("Epoch")
    plt.legend(loc="upper right")
    plt.show()

In [None]:
plt.figure(figsize=(10, 10))
for images, labels in train_dataset.take(1):  # type: ignore
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(f"{CLASS_NAMES[tf.argmax(labels[i])]}(L:{labels[i].numpy()})")
        plt.axis("off")

Dataset Preparation

In [None]:
training_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)  # type: ignore
validation_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)  # type: ignore

Modeling

In [None]:
resize_rescale_layers = keras.Sequential(
    [
        Resizing(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]),
        Rescaling(1.0 / 255),
    ]
)

In [None]:
# 2. Data Augmentation 추가 (Dataset Preparation 직후)
data_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.25),
        layers.RandomZoom(0.03),
        layers.RandomContrast(0.1),
    ],
    name="data_augmentation",
)

lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=CONFIGURATION["INITIAL_LEARNING_RATE"],
    decay_steps=CONFIGURATION["DECAY_STEPS"],
    decay_rate=CONFIGURATION["DECAY_RATE"],
    staircase=CONFIGURATION["STAIRCASE"],
)


# optimizer = Adam(learning_rate=lr_schedule)
# optimizer = AdamW(learning_rate=lr_schedule, weight_decay=1e-4)
optimizer = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)

loss = CategoricalCrossentropy(from_logits=False, label_smoothing=0.0)
metrics = [
    CategoricalAccuracy(name="acc"),
    CategoricalCrossentropy(name="cce"),
]

early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=CONFIGURATION["PATIENCE"],
    restore_best_weights=True,
)
model_checkpoint = ModelCheckpoint(
    "best_lenet_model.keras",
    monitor="val_loss",
    save_best_only=True,
)
callbacks = [
    early_stopping,
    model_checkpoint,
]

In [None]:
def load_or_build_model():
    if CONFIGURATION["DO_COMPILE"]:
        print("Model path not found. Building a new model.")
        loaded_model = _setup_lenet_model()
    else:
        if os.path.exists(CONFIGURATION["USING_MODEL_PATH"]):
            print("Loading model from:", CONFIGURATION["USING_MODEL_PATH"])
            loaded_model = keras.models.load_model(CONFIGURATION["USING_MODEL_PATH"])
            return loaded_model
        else:
            raise FileNotFoundError("Model path not found.")

    # Build a new model if not loading
    return loaded_model


def _setup_lenet_model():
    model = keras.Sequential(
        [
            InputLayer((CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"], 3)),
            resize_rescale_layers,
            data_augmentation,
            Conv2D(
                filters=CONFIGURATION["N_FILTERS"],
                kernel_size=CONFIGURATION["KERNEL_SIZE"],
                padding="same",
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            BatchNormalization(),
            MaxPooling2D(CONFIGURATION["POOL_SIZE"], CONFIGURATION["N_STRIDES"]),
            Conv2D(
                filters=CONFIGURATION["N_FILTERS"] * 2,
                kernel_size=CONFIGURATION["KERNEL_SIZE"],
                padding="same",
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            BatchNormalization(),
            MaxPooling2D(CONFIGURATION["POOL_SIZE"], CONFIGURATION["N_STRIDES"]),
            Conv2D(
                filters=CONFIGURATION["N_FILTERS"] * 8,
                kernel_size=CONFIGURATION["KERNEL_SIZE"],
                padding="same",
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            BatchNormalization(),
            Dropout(CONFIGURATION["DROPOUT_RATE"]),
            Conv2D(
                filters=CONFIGURATION["N_FILTERS"] * 8,
                kernel_size=CONFIGURATION["KERNEL_SIZE"],
                padding="same",
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            BatchNormalization(),
            Dropout(CONFIGURATION["DROPOUT_RATE"]),
            Conv2D(
                filters=CONFIGURATION["N_FILTERS"] * 8,
                kernel_size=CONFIGURATION["KERNEL_SIZE"],
                padding="same",
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            BatchNormalization(),
            Flatten(),
            Dense(
                CONFIGURATION["N_DENSE_1"],
                activation="relu",
                kernel_regularizer=L2(CONFIGURATION["REGULATION_RATE"]),
            ),
            Dropout(0.2),
            Dense(
                CONFIGURATION["N_DENSE_2"],
                activation="relu",
            ),
            Dense(CONFIGURATION["N_CLASSES"], activation="softmax"),
        ]
    )
    model.compile(
        optimizer=optimizer,  # type: ignore
        loss=loss,
        metrics=metrics,
    )
    return model


lenet_model = load_or_build_model()
lenet_model.summary()
print(lenet_model.output_shape)

Training

In [None]:
# # low loss test
# y_true = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
# y_pred = np.array([[0.9, 0.05, 0.05], [0.1, 0.8, 0.1], [0.2, 0.2, 0.6]])
# loss_value = loss(y_true, y_pred).numpy()
# print(f"Low loss value: {loss_value}")

# # high loss test
# y_true = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
# y_pred = np.array([[0.1, 0.8, 0.1], [0.3, 0.01, 0.69], [0.9, 0.1, 0.0]])
# loss_value = loss(y_true, y_pred).numpy()
# print(f"High loss value: {loss_value}")

In [None]:
# 만약 pass mode면 해당 셀에서 lenet_model 불러오기 빌드 수행

In [None]:
# # 1. 배치의 실제 이미지 범위 확인
# # 2. 시각화하여 이미지가 올바른지 확인
# plt.figure(figsize=(12, 3))
# for images, labels in training_dataset.take(1):
#     print("Image dtype:", images.dtype)
#     print("Image min:", images.numpy().min())
#     print("Image max:", images.numpy().max())
#     print("Label shape:", labels.shape)
#     print("Label sample:", labels.numpy()[:3])
#     print("Predicted class from label:", np.argmax(labels.numpy()[:3], axis=1))
#     for i in range(3):
#         plt.subplot(1, 3, i + 1)
#         img = images[i].numpy()
#         if img.max() > 1:  # 0-255 범위
#             img = img / 255.0
#         plt.imshow(img.astype("float32"))
#         plt.title(CLASS_NAMES[np.argmax(labels[i].numpy())])
#     break
# plt.tight_layout()
# plt.show()

In [None]:
# small_ds = training_dataset.take(2).repeat()  # 배치 2개를 반복해서 외우게 함
# history = lenet_model.fit(
#     small_ds, steps_per_epoch=20, epochs=50, verbose=1, class_weight=class_weight_dict
# )

In [None]:
# config 보고 load면 fit 패스
if not CONFIGURATION["DO_COMPILE"]:
    print("Model loaded. Skipping training as per configuration.")
else:
    print("Starting training...")
    history = lenet_model.fit(
        training_dataset,
        epochs=CONFIGURATION["EPOCHS"],
        validation_data=validation_dataset,
        callbacks=callbacks,
        # class_weight=class_weight_dict,
        verbose=1,
    )
    visualize_training_history(history)

Evaluation

In [None]:
lenet_model.evaluate(validation_dataset)

In [None]:
# show false predictions
plt.figure(figsize=(10, 10))
i = 0
for images, labels in validation_dataset:  # type: ignore
    f_maps = lenet_model.predict(images)
    for j in range(len(images)):
        if tf.argmax(f_maps[j]) != tf.argmax(labels[j]):
            ax = plt.subplot(3, 3, i + 1)
            plt.imshow(images[j].numpy().astype("uint8"))
            plt.title(
                f"Predicted: {CLASS_NAMES[tf.argmax(f_maps[j])]}, Actual: {CLASS_NAMES[tf.argmax(labels[j])]}"
            )
            plt.axis("off")
            i += 1
            if i >= 9:
                break
    if i >= 9:
        break
plt.savefig("false_predictions_acc87.png")
plt.show()
# plt save

In [None]:
# plt save
plt.savefig("false_predictions_acc87.png")

Confusion Matrix

In [None]:
predicted = []
labels = []
for images, label in validation_dataset:  # type: ignore
    preds = lenet_model.predict(images, verbose=0)
    predicted.extend(np.argmax(preds, axis=1).tolist())
    labels.extend(np.argmax(label.numpy(), axis=1).tolist())

cm = confusion_matrix(labels, predicted)
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES,
    cmap="Blues",
)
plt.ylabel("Actual")
plt.xlabel("Predicted")
plt.title("Confusion Matrix")
plt.show()

In [None]:
classification_report_output = classification_report(
    labels, predicted, target_names=CLASS_NAMES
)
print("Classification Report:\n", classification_report_output)

Visualization about model

In [None]:
if CONFIGURATION["DO_DRAW_FEATURE_MAP"]:
    for i in lenet_model.layers:
        print(i.name)
    display(lenet_model.input_shape)
    # dummy 호출
    dummy = tf.zeros((1, 224, 224, 3))
    _ = lenet_model.predict(dummy)

In [None]:
if CONFIGURATION["DO_DRAW_FEATURE_MAP"]:
    # feature model visualization
    feature_layers = [
        layer.output for layer in lenet_model.layers if "conv" in layer.name
    ]
    feature_model = Model(
        inputs=lenet_model.layers[0].input,
        outputs=feature_layers,
    )
    feature_model.summary()

In [None]:
if CONFIGURATION["DO_DRAW_FEATURE_MAP"]:
    test_img = train_dir + r"/happy/3159.jpg"
    img = keras.preprocessing.image.load_img(
        test_img, target_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"])
    )
    img_array = keras.preprocessing.image.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0)  # Create batch axis

    f_maps = feature_model.predict(img_array)

    # origin
    plt.figure(figsize=(4, 4))
    plt.imshow(img_array[0].numpy().astype("uint8"))
    plt.title("Original Image")
    plt.axis("off")
    plt.show()

In [None]:
if CONFIGURATION["DO_DRAW_FEATURE_MAP"]:
    for i in range(len(f_maps)):
        plt.figure(figsize=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"]))
        f_size = f_maps[i].shape[-1]
        n_channels = int(np.sqrt(f_size))
        for j in range(n_channels * n_channels):
            plt.subplot(n_channels, n_channels, j + 1)
            plt.imshow(f_maps[i][0, :, :, j], cmap="gray")
            plt.axis("off")
            plt.suptitle(f"Feature Maps from Conv Layer {i+1}")
        plt.savefig(f"feature_map_conv_layer_{i+1}.png")
        plt.show()

DRAWING Grad CAM

In [None]:
for i in lenet_model.layers:
    print(i.name)

In [None]:
if CONFIGURATION["DO_DRAW_GRAD_CAM"]:
    from keras import Input

    last_conv_layer_name = "conv2d_19"
    last_conv_layer = lenet_model.get_layer(last_conv_layer_name)
    print("Last conv layer output shape:", last_conv_layer.output.shape)
    # Last conv layer output shape: (None, 14, 14, 128)

    last_conv_layer_model = Model(
        inputs=lenet_model.inputs, outputs=last_conv_layer.output
    )
    classifer_layer_names = [
        "flatten_3",
        "dense_9",
        "dropout_11",
        "dense_10",
        "dense_11",
    ]
    classifier_input = Input(shape=last_conv_layer.output.shape[1:])
    x = classifier_input
    for layer_name in classifer_layer_names:
        x = lenet_model.get_layer(layer_name)(x)
    classifier_model = Model(inputs=classifier_input, outputs=x)
    classifier_model.summary()

In [None]:
if CONFIGURATION["DO_DRAW_GRAD_CAM"]:
    test_img = train_dir + r"/happy/3159.jpg"
    img = keras.preprocessing.image.load_img(
        test_img, target_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"])
    )
    img_array = keras.preprocessing.image.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0)  # Create batch axis

    with tf.GradientTape() as tape:
        last_conv_layer_output = last_conv_layer_model(img_array)
        tape.watch(last_conv_layer_output)
        preds = classifier_model(last_conv_layer_output)
        top_pred_index = tf.argmax(preds[0])
        top_class_channel = preds[:, top_pred_index]
    grads = tape.gradient(top_class_channel, last_conv_layer_output)
    display(grads.shape)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    display(pooled_grads.shape)
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    plt.matshow(heatmap.numpy())
    plt.title("Grad-CAM Heatmap")
    plt.axis("off")
    plt.show()

In [None]:
if CONFIGURATION["DO_DRAW_GRAD_CAM"]:
    import cv2

    resized_heatmap = cv2.resize(
        np.array(heatmap), (CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"])
    )
    img = keras.preprocessing.image.load_img(
        test_img, target_size=(CONFIGURATION["IMAGE_SIZE"], CONFIGURATION["IMAGE_SIZE"])
    )
    img = keras.preprocessing.image.img_to_array(img)
    img = img.astype("uint8")
    heatmap = np.uint8(255 * resized_heatmap)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    superimposed_img = heatmap * 0.4 + img
    superimposed_img = np.clip(superimposed_img, 0, 255).astype("uint8")
    plt.figure(figsize=(8, 8))
    plt.subplot(1, 3, 1)
    plt.imshow(img.astype("uint8"))
    plt.title("Original Image")
    plt.axis("off")
    plt.subplot(1, 3, 2)
    plt.imshow(heatmap)
    plt.title("Grad-CAM Heatmap")
    plt.axis("off")
    plt.subplot(1, 3, 3)
    plt.imshow(superimposed_img)
    plt.title("Superimposed Image")
    plt.axis("off")
    plt.savefig("grad_cam_happy_3159.png")
    plt.show()