## Human Action Recognition — EfficientNetB3

Train an ImageNet-pretrained EfficientNetB3 to classify human actions from images.

**What’s inside**
- Fast `tf.data` pipeline (decode/resize) with optional MixUp; augmentation runs as Keras layers inside the model
- Mixed precision for faster training
- Two-stage training: warm up the head → fine-tune the backbone


In [None]:
import os
import random

# Lower TensorFlow's native log verbosity; set here so it is in place before
# the TensorFlow import further down.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

from google.protobuf.message_factory import MessageFactory

# Compatibility shim: when this protobuf build only offers `GetMessageClass`,
# expose it under the legacy `GetPrototype` name as well (presumably for
# dependencies that still call the old name).
_has_new_api = hasattr(MessageFactory, "GetMessageClass")
_has_legacy_api = hasattr(MessageFactory, "GetPrototype")

if _has_new_api and not _has_legacy_api:

    def _get_prototype_compat(self, descriptor):
        """Forward a legacy `GetPrototype` call to `GetMessageClass`."""
        return self.GetMessageClass(descriptor)

    MessageFactory.GetPrototype = _get_prototype_compat

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import mixed_precision

SEED = 123

# Seed all three RNG sources used by the notebook: Python, NumPy, TensorFlow.
for _seed_rng in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_rng(SEED)

# Global Keras dtype policy for every layer created after this point.
mixed_precision.set_global_policy("mixed_float16")



## Data

Load the labels CSV, verify that each referenced image exists, then create a stratified 80/20 train/validation split.



In [None]:
# Dataset root plus the two artifacts read from it: the image folder and the
# labels CSV.
DATA_DIR = "../input/human-action-recognition-har-dataset/Human Action Recognition"
TRAIN_IMG_DIR, TRAIN_CSV_PATH = (
    os.path.join(DATA_DIR, leaf) for leaf in ("train", "Training_set.csv")
)



In [None]:
def load_dataframe(csv_path: str, img_dir: str) -> pd.DataFrame:
    """Load the labels CSV and resolve every filename to an image path.

    Column names are matched after stripping whitespace and lower-casing;
    only ``filename`` and ``label`` are used.

    Args:
        csv_path: Path to a CSV with at least ``filename`` and ``label`` columns.
        img_dir: Directory that the CSV filenames are relative to.

    Returns:
        DataFrame with columns ``filepath`` and ``label``, one row per CSV row.

    Raises:
        ValueError: If a required column is absent.
        FileNotFoundError: If any listed file is not a regular file under
            ``img_dir``; the message includes a few of the offending paths.
    """

    df = pd.read_csv(csv_path)
    df.columns = [c.strip().lower() for c in df.columns]

    required = {"filename", "label"}
    if not required.issubset(df.columns):
        raise ValueError(f"Expected columns {required}, got: {list(df.columns)}")

    df = df[["filename", "label"]].copy()
    df["filepath"] = df["filename"].map(lambda name: os.path.join(img_dir, name))

    # `isfile` (not `exists`) so a directory with an image-like name is rejected;
    # the explicit bool cast keeps the mask well-typed even for an empty CSV.
    is_file = df["filepath"].map(os.path.isfile).astype(bool)
    missing = df.loc[~is_file, "filepath"]
    if not missing.empty:
        preview = ", ".join(missing.head(5))
        raise FileNotFoundError(
            f"{len(missing)} image files listed in CSV were not found under: {img_dir} "
            f"(e.g. {preview})"
        )

    return df[["filepath", "label"]]


def train_val_split(df: pd.DataFrame, seed: int = SEED, train_size: float = 0.8):
    """Split ``df`` into train/validation frames, stratified on ``label``.

    Args:
        df: Frame containing a ``label`` column used for stratification.
        seed: Random state forwarded to scikit-learn's splitter.
        train_size: Fraction of rows assigned to the training frame.

    Returns:
        ``(train_df, val_df)``, each with a fresh 0..n-1 index.
    """

    split_kwargs = dict(
        train_size=train_size,
        shuffle=True,
        random_state=seed,
        stratify=df["label"],
    )
    train_part, val_part = (
        part.reset_index(drop=True) for part in train_test_split(df, **split_kwargs)
    )
    return train_part, val_part



In [None]:
# Load the verified label table, then carve out the validation split.
df = load_dataframe(TRAIN_CSV_PATH, TRAIN_IMG_DIR)
train_df, val_df = train_val_split(df)

# The class vocabulary is taken from the full table, in sorted order.
CLASS_NAMES = sorted(df["label"].unique().tolist())
NUM_CLASSES = len(CLASS_NAMES)

print(f"rows={len(df)} train={len(train_df)} val={len(val_df)} classes={NUM_CLASSES}")

# Leakage guard: no image path may appear in both splits.
_train_paths = set(train_df["filepath"])
assert _train_paths.isdisjoint(val_df["filepath"])



### Quick look (9 samples)

Quick sanity check: sample 9 images from the dataframe and plot them in a 3×3 grid.



In [None]:
def show_samples_grid(frame: pd.DataFrame, n: int = 9, seed: int | None = None):
    """Plot up to 9 random samples from a dataframe with filepath/label columns.

    Images are laid out three per row; unused cells in the last row are hidden.

    Args:
        frame: DataFrame with ``filepath`` and ``label`` columns.
        n: Requested number of samples. Non-positive values mean 9; the value
            is capped at 9 and at the number of rows in ``frame``.
        seed: Sampling seed. When ``None`` a fresh seed is drawn from
            ``secrets``, so repeated calls show different images.

    Returns:
        None. Nothing is plotted when ``frame`` is empty.
    """

    n = int(n)
    n = 9 if n <= 0 else n
    # Never request more rows than exist: `DataFrame.sample` raises otherwise.
    n = min(n, 9, len(frame))
    if n == 0:
        return

    if seed is None:
        import secrets

        seed = secrets.randbits(32)

    sample = frame.sample(n=n, random_state=seed).reset_index(drop=True)

    cols = 3
    rows = (n + cols - 1) // cols
    _, axes = plt.subplots(rows, cols, figsize=(10, 10))
    axes = np.array(axes).reshape(-1)

    # Hide the cells of the last row that will not receive an image.
    for ax in axes[n:]:
        ax.axis("off")

    for ax, row in zip(axes, sample.itertuples(index=False)):
        img = tf.io.read_file(row.filepath)
        img = tf.image.decode_jpeg(img, channels=3)

        ax.imshow(img.numpy())
        ax.set_title(str(row.label))
        ax.axis("off")

    plt.tight_layout()
    plt.show()

show_samples_grid(train_df)



## Input pipeline

Decode and resize images, then build efficient `tf.data` datasets for training/validation. The lightweight augmentation layers are defined here but are applied inside the model rather than in the input pipeline.



In [None]:
# Directory that receives the validation cache and the checkpoint file.
WORKING_DIR = "./"

IMAGE_SIZE = (300, 300)
BATCH_SIZE = 64

# Map parallelism and prefetch depth are both left to tf.data's autotuner.
AUTOTUNE = PREFETCH = tf.data.AUTOTUNE

# Forwarded to `model.compile(steps_per_execution=...)`.
STEPS_PER_EXECUTION = 64

# Label string -> int32 class index in CLASS_NAMES order; no mask token and
# no out-of-vocabulary slot are reserved.
LABEL_LOOKUP = keras.layers.StringLookup(
    num_oov_indices=0,
    mask_token=None,
    vocabulary=CLASS_NAMES,
    dtype=tf.int32,
)

# Random image transforms, bundled into one named Sequential block.
_AUGMENTATION_LAYERS = [
    keras.layers.RandomFlip("horizontal"),
    keras.layers.RandomRotation(0.08),
    keras.layers.RandomZoom(0.15),
    keras.layers.RandomTranslation(0.1, 0.1),
    keras.layers.RandomContrast(0.1),
]
AUGMENTATION = keras.Sequential(_AUGMENTATION_LAYERS, name="augmentation")



In [None]:
def decode_and_resize(path: tf.Tensor, label: tf.Tensor):
    """Read one image file, letterbox it to IMAGE_SIZE and encode its label.

    Args:
        path: Scalar string tensor holding the image file path.
        label: Scalar string tensor holding the class name.

    Returns:
        ``(image, label_index)``: the 3-channel image resized with padding to
        ``IMAGE_SIZE`` and cast to float16 (no value rescaling is applied),
        and the label mapped through ``LABEL_LOOKUP``.
    """
    target_h, target_w = IMAGE_SIZE

    raw_bytes = tf.io.read_file(path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3, dct_method="INTEGER_FAST")

    # Pad rather than stretch so the aspect ratio is preserved.
    padded = tf.image.resize_with_pad(
        pixels,
        target_h,
        target_w,
        method="bilinear",
        antialias=False,
    )

    label_index = LABEL_LOOKUP(label)
    return tf.cast(padded, tf.float16), label_index


def to_one_hot(images, labels_int):
    """Convert integer class ids to one-hot rows; images pass through unchanged."""
    one_hot_labels = tf.one_hot(labels_int, depth=NUM_CLASSES)
    return images, one_hot_labels



### MixUp

Optional regularization: after one-hot encoding, each training batch is, with probability `MIXUP_PROB`, replaced by a MixUp blend of its images/labels with a shuffled copy of the same batch.



In [None]:
MIXUP_PROB = 0.20
MIXUP_ALPHA = 0.2


def _sample_beta(batch_size: tf.Tensor, a: float):
    """Draw ``batch_size`` values as X / (X + Y) with X, Y ~ Gamma(a).

    This ratio of two independent Gamma(a) draws is a Beta(a, a) sample.
    """
    shape = [batch_size]
    concentration = tf.cast(a, tf.float32)

    first = tf.random.gamma(shape, concentration)
    second = tf.random.gamma(shape, concentration)

    total = first + second
    return first / total


def mixup_batch(images, labels):
    """Blend every sample in the batch with a randomly chosen partner.

    A separate mixing weight is drawn per sample and applied to both the
    image and its one-hot label.

    Returns:
        ``(mixed_images, mixed_labels)`` with the dtypes of the inputs.
    """
    batch = tf.shape(images)[0]
    weights = _sample_beta(batch, MIXUP_ALPHA)

    # Reshape the per-sample weight so it broadcasts over each tensor's
    # trailing axes, and match each tensor's dtype.
    image_w = tf.cast(tf.reshape(weights, [batch, 1, 1, 1]), images.dtype)
    label_w = tf.cast(tf.reshape(weights, [batch, 1]), labels.dtype)

    # One shared permutation keeps image/label partners aligned.
    partner = tf.random.shuffle(tf.range(batch))

    def _blend(own, weight):
        other = tf.gather(own, partner)
        return own * weight + other * (1.0 - weight)

    return _blend(images, image_w), _blend(labels, label_w)


def maybe_mixup(images, labels):
    """Apply MixUp to the batch with probability MIXUP_PROB, else pass it through."""

    def _mixed():
        return mixup_batch(images, labels)

    def _untouched():
        return images, labels

    coin = tf.random.uniform([])
    return tf.cond(coin < MIXUP_PROB, _mixed, _untouched)



In [None]:
SHUFFLE_BUFFER = 8192

# Use disk cache to avoid large RAM usage.
VAL_CACHE_PATH = os.path.join(WORKING_DIR, "val_cache")


def _cache_path_for(frame: pd.DataFrame) -> str:
    """Return a cache file prefix unique to this frame, IMAGE_SIZE and class list."""
    import hashlib

    digest = hashlib.sha1(repr((tuple(IMAGE_SIZE), list(CLASS_NAMES))).encode("utf-8"))
    for path, label in zip(frame["filepath"], frame["label"]):
        digest.update(f"{path}\x00{label}\n".encode("utf-8"))
    return f"{VAL_CACHE_PATH}_{digest.hexdigest()[:16]}"


def make_dataset(frame: pd.DataFrame, training: bool, apply_mixup: bool = True):
    """Build a batched, prefetched ``tf.data`` pipeline from a filepath/label frame.

    Args:
        frame: DataFrame with ``filepath`` and ``label`` columns.
        training: When True, shuffle each epoch, drop the last partial batch
            and (optionally) apply MixUp. When False, cache decoded samples
            on disk instead.
        apply_mixup: Enable ``maybe_mixup`` on training batches; ignored when
            ``training`` is False.

    Returns:
        Dataset yielding ``(images, one_hot_labels)`` batches of ``BATCH_SIZE``.
    """
    ds = tf.data.Dataset.from_tensor_slices((frame["filepath"].values, frame["label"].values))

    if training:
        # Shuffle the cheap (path, label) pairs before any decoding happens.
        ds = ds.shuffle(min(len(frame), SHUFFLE_BUFFER), seed=SEED, reshuffle_each_iteration=True)

    ds = ds.map(decode_and_resize, num_parallel_calls=AUTOTUNE)

    if not training:
        # A file cache is reused whenever it already exists, so key the file on
        # the data it holds: a different frame, image size or class list gets
        # its own cache instead of silently reading stale tensors.
        ds = ds.cache(_cache_path_for(frame))

    # Allow parallel maps to emit elements out of order for throughput
    # (element order becomes slightly non-deterministic).
    options = tf.data.Options()
    options.deterministic = False
    ds = ds.with_options(options)

    ds = ds.batch(BATCH_SIZE, drop_remainder=training)
    ds = ds.map(to_one_hot, num_parallel_calls=AUTOTUNE)

    if training and apply_mixup:
        ds = ds.map(maybe_mixup, num_parallel_calls=AUTOTUNE)

    return ds.prefetch(PREFETCH)


train_ds = make_dataset(train_df, training=True, apply_mixup=True)
val_ds = make_dataset(val_df, training=False)

# Report how many batches each pipeline yields per pass.
_n_train_batches = tf.data.experimental.cardinality(train_ds).numpy()
_n_val_batches = tf.data.experimental.cardinality(val_ds).numpy()
print(f"train_batches={_n_train_batches} val_batches={_n_val_batches}")



## Model

EfficientNetB3 (ImageNet pretrained) with a lightweight classification head for your action labels.



In [None]:
def build_model(image_size, num_classes: int, head_dropout: float = 0.3):
    """Assemble the augmentation -> EfficientNetB3 -> classification-head model.

    Args:
        image_size: ``(height, width)`` of the input images.
        num_classes: Number of output classes.
        head_dropout: Dropout rate applied before the final dense layer.

    Returns:
        ``(model, backbone)``: the full Keras model and the EfficientNetB3
        backbone, which is created frozen.
    """
    input_shape = (*image_size, 3)
    inputs = keras.Input(shape=input_shape)

    backbone = keras.applications.efficientnet.EfficientNetB3(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = False

    # The backbone is always called in inference mode (training=False).
    features = backbone(AUGMENTATION(inputs), training=False)

    head_layers = (
        keras.layers.GlobalAveragePooling2D(),
        keras.layers.BatchNormalization(),
        keras.layers.Dropout(head_dropout),
        # Keep softmax output in float32 for numerical stability under mixed precision
        keras.layers.Dense(num_classes, activation="softmax", dtype="float32"),
    )
    outputs = features
    for layer in head_layers:
        outputs = layer(outputs)

    classifier = keras.Model(inputs, outputs, name="EfficientNetB3_classifier")
    return classifier, backbone


def compile_model(model: keras.Model, lr, weight_decay: float = 1e-4):
    """Compile ``model`` with AdamW, categorical cross-entropy and accuracy.

    Args:
        model: Model to compile in place.
        lr: Learning rate; passed straight to the optimizer, so a float or a
            learning-rate schedule both work.
        weight_decay: AdamW weight-decay coefficient.
    """
    optimizers = keras.optimizers
    # Fall back to the experimental namespace when `AdamW` is not exposed
    # at the top level of `keras.optimizers`.
    if getattr(optimizers, "AdamW", None):
        adamw_cls = optimizers.AdamW
    else:
        adamw_cls = optimizers.experimental.AdamW

    optimizer = adamw_cls(learning_rate=lr, weight_decay=weight_decay)
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
        steps_per_execution=STEPS_PER_EXECUTION,
    )


# Build the classifier once; `backbone` is kept as a separate handle so the
# training cells can change its `trainable` flags.
model, backbone = build_model(IMAGE_SIZE, NUM_CLASSES)
model.summary()



## Training

Two-stage training:

- Warmup: train only the classification head (backbone frozen)
- Fine-tune: unfreeze the last N backbone layers and continue training



In [None]:
WARMUP_EPOCHS = 2
FINETUNE_EPOCHS = 13
TOTAL_EPOCHS = WARMUP_EPOCHS + FINETUNE_EPOCHS

FINE_TUNE_LAST_N = 160

LR_HEAD = 5e-4
LR_FINE = 5e-5

WEIGHT_DECAY_HEAD = 1e-4
WEIGHT_DECAY_FINE = 5e-5


def _validate_training_config():
    """Fail fast on an inconsistent training configuration.

    Checks the module-level epoch counts, fine-tune depth, learning rates and
    weight decays. Explicit exceptions are used instead of ``assert`` so the
    checks still run under ``python -O`` and report which setting is wrong.

    Raises:
        ValueError: If any setting is out of range or the epoch totals disagree.
    """

    def _require(ok: bool, message: str) -> None:
        if not ok:
            raise ValueError(message)

    _require(
        isinstance(WARMUP_EPOCHS, int) and WARMUP_EPOCHS >= 1,
        f"WARMUP_EPOCHS must be an int >= 1, got {WARMUP_EPOCHS!r}",
    )
    _require(
        isinstance(FINETUNE_EPOCHS, int) and FINETUNE_EPOCHS >= 1,
        f"FINETUNE_EPOCHS must be an int >= 1, got {FINETUNE_EPOCHS!r}",
    )
    _require(
        TOTAL_EPOCHS == WARMUP_EPOCHS + FINETUNE_EPOCHS,
        f"TOTAL_EPOCHS ({TOTAL_EPOCHS!r}) must equal WARMUP_EPOCHS + FINETUNE_EPOCHS",
    )
    _require(
        isinstance(FINE_TUNE_LAST_N, int) and FINE_TUNE_LAST_N > 0,
        f"FINE_TUNE_LAST_N must be an int > 0, got {FINE_TUNE_LAST_N!r}",
    )
    _require(
        0.0 < LR_HEAD and 0.0 < LR_FINE,
        f"LR_HEAD and LR_FINE must be > 0, got {LR_HEAD!r} and {LR_FINE!r}",
    )
    _require(
        0.0 <= WEIGHT_DECAY_HEAD and 0.0 <= WEIGHT_DECAY_FINE,
        "WEIGHT_DECAY_HEAD and WEIGHT_DECAY_FINE must be >= 0, "
        f"got {WEIGHT_DECAY_HEAD!r} and {WEIGHT_DECAY_FINE!r}",
    )


_validate_training_config()

CHECKPOINT_PATH = os.path.join(WORKING_DIR, "best_effnetb3.weights.h5")

# One weights-only checkpoint callback, reused by both training stages; it
# saves only when validation accuracy improves.
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath=CHECKPOINT_PATH,
    monitor="val_accuracy",
    mode="max",
    save_best_only=True,
    save_weights_only=True,
)

callbacks_warmup = [checkpoint_cb]

# Cosine decay spans the whole fine-tuning stage, ending at 10% of LR_FINE.
steps_per_epoch = int(tf.data.experimental.cardinality(train_ds).numpy())
_finetune_steps = steps_per_epoch * FINETUNE_EPOCHS
SCHED_FINE = keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=LR_FINE,
    decay_steps=_finetune_steps,
    alpha=0.1,
)

_early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    mode="min",
    patience=6,
    restore_best_weights=True,
)

callbacks_finetune = [
    checkpoint_cb,
    _early_stopping_cb,
    keras.callbacks.TerminateOnNaN(),
]



In [None]:
# --- Stage 1: warmup (train classification head) ---
# Head-only stage: the backbone stays frozen and the head uses its own LR.
backbone.trainable = False
compile_model(model, lr=LR_HEAD, weight_decay=WEIGHT_DECAY_HEAD)

_warmup_fit_args = dict(
    validation_data=val_ds,
    epochs=WARMUP_EPOCHS,
    callbacks=callbacks_warmup,
)
history_warmup = model.fit(train_ds, **_warmup_fit_args)



In [None]:
# --- Stage 2: unfreeze backbone and fine-tune ---

def set_finetune_trainable(last_n: int):
    """Unfreeze last `last_n` layers; keep BatchNorm frozen for stability.

    Operates on the module-level ``backbone``. ``last_n`` is clamped to
    ``[0, len(backbone.layers)]``: zero or a negative value leaves every
    backbone layer frozen, and a value above the layer count unfreezes all
    non-BatchNorm layers.

    Args:
        last_n: Number of trailing backbone layers to make trainable.
    """

    layers = backbone.layers
    n_unfrozen = min(max(last_n, 0), len(layers))
    # Explicit cut index: slicing with `-last_n` would treat 0 as "everything",
    # because `layers[-0:]` is the whole list.
    first_trainable = len(layers) - n_unfrozen

    # The container must be trainable for the per-layer flags to matter.
    backbone.trainable = True

    for index, layer in enumerate(layers):
        is_batchnorm = isinstance(layer, keras.layers.BatchNormalization)
        layer.trainable = index >= first_trainable and not is_batchnorm


set_finetune_trainable(FINE_TUNE_LAST_N)
# Recompile so the changed trainable flags and the cosine schedule are used.
compile_model(model, lr=SCHED_FINE, weight_decay=WEIGHT_DECAY_FINE)

# Epoch numbering continues from where the warmup stage stopped.
_finetune_fit_args = dict(
    validation_data=val_ds,
    initial_epoch=WARMUP_EPOCHS,
    epochs=TOTAL_EPOCHS,
    callbacks=callbacks_finetune,
)
history_finetune = model.fit(train_ds, **_finetune_fit_args)



In [None]:
# Finalize: load best checkpoint and merge history

# Restore the weights saved by the checkpoint callback.
model.load_weights(CHECKPOINT_PATH)

# Concatenate the per-epoch metric lists of both stages into one dict.
history = {}
for _stage in (history_warmup, history_finetune):
    for _metric, _values in _stage.history.items():
        history[_metric] = history.get(_metric, []) + list(_values)

_val_acc_curve = history.get("val_accuracy", [float("nan")])
best_val_acc = float(np.max(_val_acc_curve))
print("best val_accuracy:", best_val_acc)



## Results

Plot learning curves and export the best model weights.



In [None]:
def tr_plot(history_dict):
    """Plot train/validation loss and accuracy curves side by side.

    Args:
        history_dict: Mapping with per-epoch lists under ``loss``,
            ``val_loss``, ``accuracy`` and ``val_accuracy``; a missing key is
            treated as an empty list. The x-axis length follows ``loss``.
    """
    train_loss = history_dict.get("loss", [])
    epochs = range(1, len(train_loss) + 1)

    # (title, training curve, validation curve) for the left and right panels.
    panels = (
        ("Loss", train_loss, history_dict.get("val_loss", [])),
        ("Accuracy", history_dict.get("accuracy", []), history_dict.get("val_accuracy", [])),
    )

    plt.style.use("fivethirtyeight")
    _, axes = plt.subplots(1, 2, figsize=(20, 8))

    for ax, (title, train_curve, val_curve) in zip(axes, panels):
        ax.plot(epochs, train_curve, label="train")
        ax.plot(epochs, val_curve, label="val")
        ax.set_title(title)
        ax.set_xlabel("Epoch")
        ax.legend()

    plt.tight_layout()
    plt.show()


tr_plot(history)

