# 05_model_vit_or_efficientnet.ipynb

**EfficientNet or Vision Transformer (ViT)** for Driver Drowsiness Detection.

- Uses your preprocessed arrays in `data/processed/*.npy`
- Choose model via `MODEL_FAMILY = "efficientnet"` or `"vit"`
- Handles resizing & preprocessing internally
- Trains, evaluates, saves model + report


In [None]:
import os, json, time, numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight

import tensorflow as tf
from tensorflow.keras import layers, models

# Directory layout: processed arrays are read from PROCESSED_DIR; trained
# models and evaluation artefacts are written under MODEL_DIR / RESULTS_DIR.
PROCESSED_DIR = "data/processed"
MODEL_DIR = "models"
RESULTS_DIR = "results"
for _output_dir in (MODEL_DIR, RESULTS_DIR):
    os.makedirs(_output_dir, exist_ok=True)  # re-running the cell keeps existing dirs

# Label vocabulary; the list position is the class index used when the
# label map is written out with the saved model.
CLASS_NAMES = ["Closed_Eyes","Open_Eyes","Yawn","No_Yawn"]
num_classes = len(CLASS_NAMES)

# Architecture switch: 'efficientnet' or 'vit'
MODEL_FAMILY = "efficientnet"  # change to 'vit' if you want Vision Transformer

# Spatial size the model builders resize their inputs to
TARGET_SIZE = (224, 224)

# Seed NumPy and TensorFlow's global generators for repeatable runs
np.random.seed(42)
tf.random.set_seed(42)
print("TensorFlow:", tf.__version__, "| Model family:", MODEL_FAMILY, "| Target:", TARGET_SIZE)


In [None]:
def _load_split(split_name):
    """Read the feature and label arrays of one split from PROCESSED_DIR."""
    features = np.load(f"{PROCESSED_DIR}/X_{split_name}.npy")
    labels = np.load(f"{PROCESSED_DIR}/y_{split_name}.npy")
    return features, labels

X_train, y_train = _load_split("train")
X_val, y_val = _load_split("val")
X_test, y_test = _load_split("test")

# Echo the array shapes so a wrong or stale file is noticed immediately
print("Shapes:")
for _split, _X, _y in (("train", X_train, y_train), ("val", X_val, y_val), ("test", X_test, y_test)):
    # Names are left-padded to 5 chars so the three rows line up
    print(f"  X_{_split:<5}:", _X.shape, f" y_{_split:<5}:", _y.shape)


In [None]:
# "balanced" class weights from scikit-learn, computed on the training labels
# and converted to the {class_index: weight} mapping with plain Python types.
classes = np.arange(num_classes)
_balanced = compute_class_weight("balanced", classes=classes, y=y_train)
class_weights = {int(idx): float(weight) for idx, weight in zip(classes, _balanced)}
class_weights


In [None]:
def build_efficientnet_classifier(num_classes, target_size=(224,224), input_shape=None, input_scale=None):
    """Build an EfficientNetB0 transfer-learning classifier with a frozen backbone.

    Pipeline: Input -> Resizing(target_size) -> [optional Rescaling] ->
    efficientnet.preprocess_input -> EfficientNetB0 (ImageNet weights, frozen,
    called with training=False) -> GlobalAveragePooling2D -> Dropout(0.25) ->
    Dense(num_classes, softmax).

    Args:
        num_classes: Number of output classes of the softmax head.
        target_size: (height, width) the images are resized to before the backbone.
        input_shape: Shape of a single input sample, e.g. (64, 64, 3). When None,
            falls back to the notebook-level ``X_train.shape[1:]`` (the original
            behaviour), so existing calls keep working.
        input_scale: Optional multiplicative factor applied right after resizing
            (e.g. 255.0 for arrays stored in [0, 1]). None inserts no rescaling.

    Returns:
        (model, base): the full Keras model and the EfficientNetB0 backbone, so the
        caller can unfreeze backbone layers for fine-tuning.
    """
    if input_shape is None:
        # Backward-compatible default: infer the sample shape from the loaded data.
        input_shape = X_train.shape[1:]
    height, width = target_size[0], target_size[1]

    base = tf.keras.applications.EfficientNetB0(include_top=False, weights="imagenet", input_shape=(height, width, 3))
    base.trainable = False  # Phase 1: feature extract

    inputs = layers.Input(shape=input_shape)  # e.g., (64,64,3)
    x = layers.Resizing(height, width)(inputs)
    # NOTE(review): Keras documents EfficientNet as expecting raw pixel values in
    # [0, 255] (normalisation is built into the network). The inference helper in
    # this notebook divides images by 255, which suggests the processed arrays are
    # in [0, 1] - confirm, and pass input_scale=255.0 if so.
    if input_scale is not None:
        x = layers.Rescaling(input_scale)(x)
    x = layers.Lambda(tf.keras.applications.efficientnet.preprocess_input)(x)
    x = base(x, training=False)  # keep the backbone in inference mode
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.25)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)
    model = models.Model(inputs, outputs, name="efficientnet_b0_cls")
    return model, base


In [None]:
def mlp(x, hidden_units, dropout=0.0):
    """Stack one Dense(GELU) + Dropout pair per entry of ``hidden_units`` on ``x``.

    Args:
        x: Input tensor.
        hidden_units: Iterable of layer widths, applied in order.
        dropout: Rate used for every Dropout layer.

    Returns:
        The output of the last Dropout layer, or ``x`` itself when
        ``hidden_units`` is empty.
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation='gelu')
        drop = layers.Dropout(dropout)
        out = drop(dense(out))
    return out

@tf.keras.utils.register_keras_serializable(package="drowsiness_vit")
class _ClassTokenAndPosition(layers.Layer):
    """Prepend a learnable class token and add learnable positional embeddings.

    Takes patch embeddings of shape (batch, num_patches, dim) and returns
    (batch, num_patches + 1, dim); index 0 along axis 1 is the class token.
    Both parameters are created with ``add_weight`` so Keras tracks, trains and
    saves them (raw ``tf.Variable`` objects used in a functional graph are not).
    """

    def build(self, input_shape):
        num_patches, dim = int(input_shape[1]), int(input_shape[2])
        self.cls_token = self.add_weight(
            name="cls_token", shape=(1, 1, dim), initializer="zeros", trainable=True)
        self.pos_emb = self.add_weight(
            name="pos_emb", shape=(1, num_patches + 1, dim),
            initializer=tf.keras.initializers.RandomNormal(stddev=0.02), trainable=True)
        super().build(input_shape)

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        # Cast so the weights follow the activation dtype of the incoming tensor.
        cls_tokens = tf.repeat(tf.cast(self.cls_token, inputs.dtype), repeats=batch_size, axis=0)
        tokens = tf.concat([cls_tokens, inputs], axis=1)  # (B, N+1, D)
        return tokens + tf.cast(self.pos_emb, inputs.dtype)

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1] + 1, input_shape[2])


def build_vit_classifier(num_classes, image_size=(224,224), patch_size=16, projection_dim=192, transformer_layers=6, num_heads=3, mlp_head_units=(256,128), input_shape=None):
    """Build a small Vision Transformer classifier trained from scratch.

    Pipeline: Input -> Resizing(image_size) -> strided Conv2D patch embedding ->
    class token + positional embedding -> ``transformer_layers`` pre-norm encoder
    blocks (MultiHeadAttention + GELU MLP, each with a residual connection) ->
    LayerNorm -> class-token output -> MLP head -> Dense(num_classes, softmax).

    Args:
        num_classes: Number of output classes.
        image_size: (height, width) the inputs are resized to.
        patch_size: Side length of the square patches (Conv2D kernel and stride).
        projection_dim: Embedding width of every token.
        transformer_layers: Number of encoder blocks.
        num_heads: Attention heads per block.
        mlp_head_units: Widths of the Dense layers in the classification head.
        input_shape: Shape of a single input sample. When None, falls back to the
            notebook-level ``X_train.shape[1:]`` (the original behaviour).

    Returns:
        The uncompiled Keras model.
    """
    if input_shape is None:
        # Backward-compatible default: infer the sample shape from the loaded data.
        input_shape = X_train.shape[1:]

    # Inputs come at original size; we resize to image_size
    inputs = layers.Input(shape=input_shape)  # (H0,W0,3)
    x = layers.Resizing(image_size[0], image_size[1])(inputs)

    # Patchify via Conv2D with stride = patch_size ("valid" padding floors the grid,
    # which matches the integer division below)
    patches = layers.Conv2D(filters=projection_dim, kernel_size=patch_size, strides=patch_size, padding="valid")(x)  # (H/ps, W/ps, dim)
    grid_h, grid_w = image_size[0] // patch_size, image_size[1] // patch_size
    num_patches = grid_h * grid_w
    x = layers.Reshape((num_patches, projection_dim))(patches)  # (B, N, D)

    # Class token + positional embeddings as tracked layer weights
    x = _ClassTokenAndPosition(name="cls_token_and_pos_emb")(x)  # (B, N+1, D)

    # Transformer encoder blocks
    for _ in range(transformer_layers):
        # LayerNorm + MHA
        x1 = layers.LayerNormalization(epsilon=1e-6)(x)
        attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim, dropout=0.0)(x1, x1)
        x2 = layers.Add()([attn, x])
        # LayerNorm + MLP
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        x3 = mlp(x3, hidden_units=[projection_dim*4, projection_dim], dropout=0.1)
        x = layers.Add()([x3, x2])

    # Take CLS token output
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    cls_out = x[:, 0]  # (B, D)

    # Head
    x = mlp(cls_out, mlp_head_units, dropout=0.2)
    outputs = layers.Dense(num_classes, activation="softmax")(x)
    model = models.Model(inputs, outputs, name="vit_classifier")
    return model


In [None]:
# Build the selected architecture. `base` is only populated for EfficientNet
# (it is the backbone unfrozen during fine-tuning); it stays None for ViT so the
# name always exists. An unrecognised MODEL_FAMILY is rejected instead of silently
# falling through to the ViT branch.
base = None
if MODEL_FAMILY == "efficientnet":
    model, base = build_efficientnet_classifier(num_classes, TARGET_SIZE)
elif MODEL_FAMILY == "vit":
    model = build_vit_classifier(num_classes, image_size=TARGET_SIZE)
else:
    raise ValueError(f"Unknown MODEL_FAMILY {MODEL_FAMILY!r}; expected 'efficientnet' or 'vit'.")
model.summary()


In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard

# One identifier per run (timestamp + model family) ties the checkpoint, the
# TensorBoard logs and the later report/model files together.
run_id = f'{time.strftime("%Y%m%d-%H%M%S")}_{MODEL_FAMILY}'
ckpt_path = f"{MODEL_DIR}/{MODEL_FAMILY}_best_{run_id}.keras"
log_dir   = f"{RESULTS_DIR}/logs_{run_id}"

# Labels are integer class ids, hence the sparse cross-entropy loss.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Stop early and checkpoint on validation accuracy; decay the LR on validation loss.
early_stop = EarlyStopping(monitor="val_accuracy", patience=8, restore_best_weights=True, verbose=1)
checkpoint = ModelCheckpoint(ckpt_path, monitor="val_accuracy", save_best_only=True, verbose=1)
lr_decay = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-6, verbose=1)
tensorboard = TensorBoard(log_dir=log_dir)
callbacks = [early_stop, checkpoint, lr_decay, tensorboard]

print("Checkpoint:", ckpt_path)
print("Logs     :", log_dir)


In [None]:
# Phase 1 training (frozen backbone for EfficientNet, full training for ViT).
# Keras requires `class_weight` to be a {class_index: weight} dict; the raw array
# returned by compute_class_weight is rejected, so the dict built in the
# class-weight cell is passed here.
history1 = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=15 if MODEL_FAMILY=="efficientnet" else 25,
    batch_size=64,
    callbacks=callbacks,
    class_weight=class_weights,
    verbose=1
)


In [None]:
# Phase 2 (EfficientNet only): fine-tune the top of the backbone at a low LR.
history2 = None
if MODEL_FAMILY == "efficientnet":
    # The backbone was frozen as a whole when the model was built. A frozen parent
    # exposes no trainable weights regardless of its children's flags, so the
    # parent must be re-enabled first and everything except the last 30 layers
    # frozen again afterwards.
    base.trainable = True
    for layer in base.layers[:-30]:
        layer.trainable = False
    # Keep BatchNormalization layers frozen, as recommended in the Keras
    # fine-tuning guidance for EfficientNet.
    for layer in base.layers[-30:]:
        if isinstance(layer, layers.BatchNormalization):
            layer.trainable = False

    # Changes to `trainable` only take effect after re-compiling.
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-5),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])
    history2 = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=25,
        batch_size=64,
        callbacks=callbacks,
        # Keras requires a {class_index: weight} dict, not the raw weight array.
        class_weight=class_weights,
        verbose=1
    )


In [None]:
def plot_hist(h, label):
    """Plot training/validation accuracy and loss curves of one ``fit`` run.

    Args:
        h: Object with a ``history`` dict holding "accuracy", "val_accuracy",
           "loss" and "val_loss" series, or None (nothing is drawn).
        label: Prefix used in the legend entries.
    """
    if h is None:
        return
    curves = (
        ("accuracy", "val_accuracy", "Accuracy"),
        ("loss", "val_loss", "Loss"),
    )
    # One figure per metric, train and validation curves overlaid
    for train_key, val_key, title in curves:
        plt.figure()
        plt.plot(h.history[train_key], label=f"{label} train")
        plt.plot(h.history[val_key], label=f"{label} val")
        plt.title(title)
        plt.xlabel("epoch")
        plt.legend()
        plt.show()

# Draw the curves of both training phases; plot_hist skips a phase whose history is None
for _history, _tag in ((history1, "phase1"), (history2, "finetune")):
    plot_hist(_history, _tag)


In [None]:
# Held-out evaluation: scalar metrics, per-class report, confusion matrix, text report.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(f"Test accuracy: {test_acc:.4f}")

y_prob = model.predict(X_test, verbose=0)
y_pred = np.argmax(y_prob, axis=1)

# Pin the label set explicitly: without `labels`, scikit-learn infers the classes
# from the data, so a class missing from both y_test and y_pred makes
# classification_report reject `target_names` and shrinks the confusion matrix
# below the num_classes x num_classes grid indexed by the plotting loop.
label_ids = np.arange(num_classes)
report_text = classification_report(
    y_test, y_pred, labels=label_ids, target_names=CLASS_NAMES, zero_division=0
)
print(report_text)

cm = confusion_matrix(y_test, y_pred, labels=label_ids)

# Matplotlib-only confusion matrix
fig, ax = plt.subplots(figsize=(5,4))
im = ax.imshow(cm, cmap="Blues")
ax.set_xticks(range(num_classes)); ax.set_xticklabels(CLASS_NAMES, rotation=45, ha="right")
ax.set_yticks(range(num_classes)); ax.set_yticklabels(CLASS_NAMES)
for i in range(num_classes):
    for j in range(num_classes):
        # Rows are true classes, columns are predictions
        ax.text(j, i, cm[i, j], ha="center", va="center", color="black")
ax.set_xlabel("Predicted"); ax.set_ylabel("True"); ax.set_title("Confusion Matrix")
plt.colorbar(im, ax=ax); plt.tight_layout(); plt.show()

# Persist exactly the report that was printed (computed once, reused here)
report_path = f"{RESULTS_DIR}/{MODEL_FAMILY}_report_{run_id}.txt"
with open(report_path, "w") as f:
    f.write(f"Test accuracy: {test_acc:.4f}\n\n")
    f.write(report_text)
print("Saved report to:", report_path)


In [None]:
# Persist the final model and the index -> class-name map used to decode predictions.
final_path = f"{MODEL_DIR}/{MODEL_FAMILY}_final_{run_id}.keras"
model.save(final_path)

# Derive the labels path from MODEL_DIR so the message below always reports the
# file that was actually written, even if MODEL_DIR is changed.
labels_path = f"{MODEL_DIR}/labels.json"
with open(labels_path, "w") as fp:
    # Note: JSON object keys are strings, so the integer indices are stored as "0", "1", ...
    json.dump({i:c for i,c in enumerate(CLASS_NAMES)}, fp)

print("Saved model to:", final_path)
print("Saved labels to:", labels_path)


In [None]:
import cv2
def predict_image_generic(path, model, target_size=(224,224)):
    """Classify a single image file with a trained model.

    The image is read with OpenCV, converted BGR -> RGB, resized, scaled by 1/255
    and passed through ``model.predict`` as a batch of one.

    The resize target is the model's own fixed input height/width when it has one
    (the builders in this notebook create the Input layer from the training-array
    shape and resize to the backbone size internally). ``target_size`` is used
    only for dimensions the model leaves unspecified.

    Args:
        path: Path of the image file.
        model: Keras model returning class probabilities.
        target_size: Fallback (height, width) when the model has no fixed input size.

    Returns:
        dict with "class" (name from CLASS_NAMES), "confidence" (float) and
        "probs" (list of per-class probabilities).

    Raises:
        FileNotFoundError: if OpenCV cannot read an image from ``path``.
    """
    img = cv2.imread(path)
    if img is None:
        raise FileNotFoundError(path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    height, width = target_size[0], target_size[1]
    model_shape = getattr(model, "input_shape", None)
    if isinstance(model_shape, tuple) and len(model_shape) == 4:
        # (batch, height, width, channels): prefer the dimensions the model declares
        if model_shape[1] is not None:
            height = model_shape[1]
        if model_shape[2] is not None:
            width = model_shape[2]

    # cv2.resize takes (width, height), the reverse of the Keras (height, width) order
    img = cv2.resize(img, (width, height))
    # NOTE(review): assumes the training arrays were scaled to [0, 1] the same way - confirm
    x = (img/255.0).astype("float32")
    x = np.expand_dims(x, axis=0)
    probs = model.predict(x, verbose=0)[0]
    idx = int(np.argmax(probs))
    return {"class": CLASS_NAMES[idx], "confidence": float(probs[idx]), "probs": probs.tolist()}

# Example:
# res = predict_image_generic("data/sample/your_image.jpg", model, target_size=TARGET_SIZE)
# res
