# TensorFlow/Keras rewrite (Full parity) with Albumentations

This notebook is a line-by-line TensorFlow/Keras port of the original PyTorch notebook, preserving dataset splitting, augmentation (Albumentations), multi-stage training with progressive unfreezing, CSV logging, plotting, evaluation, single-image inference, and TFLite export. All imports are consolidated at the top.

In [1]:
# Core & utilities
import os, sys, platform, random, shutil, itertools, math
from pathlib import Path

# Data & numerics
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Progress bar
from tqdm.auto import tqdm

# TensorFlow / Keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Sklearn metrics
from sklearn.metrics import confusion_matrix, classification_report

# Image handling
from PIL import Image
import cv2

# Albumentations
import albumentations as A
import platform
import psutil
print("All imports done")

All imports done


  check_for_updates()


## Environment check

In [2]:
# Environment report: prints the Python / TensorFlow / platform versions, the
# GPUs TensorFlow can see, and system-wide RAM figures obtained from psutil.
print("\n--- SYSTEM INFO ---")
print("Python version:", platform.python_version())
print("TensorFlow version:", tf.__version__)
print("Platform:", platform.platform())

# Physical GPU devices visible to TensorFlow (empty list when none is found)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print("Available GPU(s):", gpus)
else:
    print("No GPU detected (check TensorFlow-metal plugin).")

# Memory info (system-wide); byte counts are divided by 1024**3 before printing
svmem = psutil.virtual_memory()
print("\n--- MEMORY REPORT ---")
print(f"Total RAM:     {svmem.total / (1024**3):.2f} GB")
print(f"Available RAM: {svmem.available / (1024**3):.2f} GB")

# GPU smoke test: no VRAM figures are queried here; the cell only lists the
# logical GPU devices and tries to create one tensor under '/GPU:0'.
if gpus:
    try:
        logical_gpus = tf.config.list_logical_devices('GPU')
        print("\nLogical GPU(s):", logical_gpus)

        # Create a test tensor to force GPU allocation
        with tf.device('/GPU:0'):
            test_tensor = tf.random.uniform([1000, 1000])
        print("✅ Tensor allocated on GPU")

    # Broad catch is deliberate: this is a diagnostic cell, so any failure is
    # reported and the notebook carries on.
    except Exception as e:
        print("⚠️ Could not allocate tensor on GPU:", e)

print("-------------------------\n")


--- SYSTEM INFO ---
Python version: 3.12.2
TensorFlow version: 2.17.0
Platform: macOS-15.3.1-arm64-arm-64bit
No GPU detected (check TensorFlow-metal plugin).

--- MEMORY REPORT ---
Total RAM:     18.00 GB
Available RAM: 6.03 GB
-------------------------



## Configuration

In [5]:
# Paths (adjust to your dataset)
source_dir = Path('/Users/jameskierdoliguez/Desktop/test_dataset/dataset')    # original un-split data (class subfolders)
output_dir = Path('/Users/jameskierdoliguez/Desktop/test_dataset/aug_dataset')             # where train/val/test will be placed (after splitting)
OUTPUTS = Path('/Users/jameskierdoliguez/Documents/VSCODE/thesis/test_outputs')
OUTPUTS.mkdir(parents=True, exist_ok=True)

output_dir_train = output_dir / 'train'
output_dir_val = output_dir / 'val'
output_dir_test = output_dir / 'test'

# Upper-case aliases. Later cells refer to the split folders as
# TRAIN_DIR / VAL_DIR / TEST_DIR (and, in a commented-out call, to
# INPUT_RAW_DIR / DATA_DIR); without these definitions those cells raise
# NameError on a fresh kernel.
INPUT_RAW_DIR = source_dir
DATA_DIR = output_dir
TRAIN_DIR = output_dir_train
VAL_DIR = output_dir_val
TEST_DIR = output_dir_test

# Hyperparameters
IMG_SIZE = (224, 224)   # used as (height, width) by the Albumentations Resize calls
BATCH_SIZE = 32
INIT_EPOCHS = 20
SEED = 42

# History CSV path
HISTORY_CSV = OUTPUTS / 'training_history_test_v1.csv'
print("done 2")

done 2


## Dataset splitting utility (prepare_image_datasets)

In [6]:
def prepare_image_datasets(source_dir, output_dir, split_ratios=(0.7, 0.2, 0.1), seed=SEED):
    """Copy a class-per-subfolder image dataset into train/test/val folders.

    Every immediate sub-directory of ``source_dir`` is treated as one class.
    Its image files (jpg, jpeg, png, bmp; extension matched case-insensitively)
    are shuffled and copied to ``output_dir/<split>/<class>/``.

    Args:
        source_dir: Directory holding one sub-folder per class (str or Path).
        output_dir: Destination root; split/class folders are created as needed
            (str or Path).
        split_ratios: Three fractions that must sum to 1 (tolerance 1e-3).
            As implemented, element 0 sizes the train split, element 1 sizes
            the *test* split, and the validation split receives the remaining
            files; element 2 only takes part in the sum check.
            NOTE(review): if the intended order is (train, val, test), the
            test/val roles here are swapped -- confirm with the author.
        seed: Seed passed to ``random.seed`` before shuffling.

    Raises:
        ValueError: If ``source_dir`` does not exist or the ratios do not sum
            to 1.
    """
    source_dir = Path(source_dir)
    output_dir = Path(output_dir)

    if not source_dir.exists():
        raise ValueError(f"Source dir {source_dir} not found")
    if not math.isclose(sum(split_ratios), 1.0, abs_tol=1e-3):
        raise ValueError("Split ratios must sum to 1.")

    image_suffixes = {".jpg", ".jpeg", ".png", ".bmp"}

    # Sorted so the seeded shuffle below does not depend on the order in which
    # the filesystem happens to list directories.
    classes = sorted(p.name for p in source_dir.iterdir() if p.is_dir())
    random.seed(seed)

    for split in ("train", "val", "test"):
        for cls in classes:
            (output_dir / split / cls).mkdir(parents=True, exist_ok=True)

    for cls in classes:
        # Sort before shuffling: with a fixed seed the same files then always
        # land in the same split, so re-running cannot leak a file into a
        # second split.
        files = sorted(
            p for p in (source_dir / cls).iterdir()
            if p.is_file() and p.suffix.lower() in image_suffixes
        )
        random.shuffle(files)

        n_total = len(files)
        n_train = int(split_ratios[0] * n_total)
        n_test = int(split_ratios[1] * n_total)

        splits = {
            "train": files[:n_train],
            "test": files[n_train:n_train + n_test],
            "val": files[n_train + n_test:],   # remainder after train and test
        }

        for split, items in splits.items():
            for f in tqdm(items, desc=f"Copying {cls} → {split}"):
                shutil.copy2(f, output_dir / split / cls)

    print("Dataset split completed.")
print("done")

done


## Data loading (tf.data)

In [5]:
def load_datasets_from_dirs(train_dir=output_dir_train, val_dir=output_dir_val, test_dir=output_dir_test, img_size=IMG_SIZE, batch_size=BATCH_SIZE):
    """Load the train/val/test image folders as batched datasets.

    The defaults point at the split folders defined in the configuration cell
    (``output_dir_train`` / ``output_dir_val`` / ``output_dir_test``); the
    previous defaults named variables that are never assigned in this notebook.

    Args:
        train_dir: Folder with one sub-folder per class for training.
        val_dir: Same layout, validation split.
        test_dir: Same layout, test split.
        img_size: Target size passed as ``image_size``.
        batch_size: Batch size passed to the loader.

    Returns:
        Tuple ``(train_ds, val_ds, test_ds)``. Only the training dataset is
        shuffled (seeded with ``SEED``); val/test keep directory order.
    """
    def _load(directory, **extra):
        # str() keeps the call independent of how the loader handles Path objects.
        return keras.preprocessing.image_dataset_from_directory(
            str(directory), image_size=img_size, batch_size=batch_size, **extra
        )

    train_ds = _load(train_dir, shuffle=True, seed=SEED)
    val_ds = _load(val_dir, shuffle=False)
    test_ds = _load(test_dir, shuffle=False)
    return train_ds, val_ds, test_ds
print("done 2")

done 2


## Albumentations pipelines

In [6]:
# Training pipeline - more aggressive
train_albu = A.Compose([
    A.Resize(height=IMG_SIZE[0], width=IMG_SIZE[1]),
    A.RandomResizedCrop(size=(IMG_SIZE[0], IMG_SIZE[1]), scale=(0.8, 1.0), p=0.8),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.2),
    A.Affine(translate_percent={"x": (-0.07, 0.07), "y": (-0.07, 0.07)},scale=(0.9, 1.1),rotate=(-25, 25),p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.HueSaturationValue(p=0.3),
    A.OneOf([
        A.GaussNoise(var_limit=(10.0, 50.0)),
        A.MotionBlur(blur_limit=3)], p=0.4),
])

# Validation/test pipeline - deterministic resize
val_albu = A.Compose([
    A.Resize(height=IMG_SIZE[0], width=IMG_SIZE[1]),
])
print("done q")

done q


## Apply Albumentations inside tf.data (bridge functions)

In [7]:
AUTOTUNE = tf.data.AUTOTUNE

def _apply_albu_to_batch(image, label, pipeline):
    """Run an Albumentations pipeline on a dataset element, then normalize it.

    Accepts either a batch of images (rank 4, as produced by the batched
    loader in this notebook) or a single image (rank 3). The pipeline is
    applied to every image individually because it operates on one
    ``H x W x C`` array at a time.

    Args:
        image: Image tensor with pixel values in the 0-255 range.
        label: Label tensor; returned unchanged.
        pipeline: Albumentations ``Compose`` whose output has size ``IMG_SIZE``.

    Returns:
        ``(aug_image, label)`` where ``aug_image`` is float32, passed through
        ``mobilenet_v2.preprocess_input``.
    """
    height, width = IMG_SIZE[0], IMG_SIZE[1]

    def _aug_one(img):
        return pipeline(image=img)['image']

    def _aug_fn(arr):
        # tf.numpy_function hands this function a NumPy array (not a tensor),
        # so there is no `.numpy()` to call. The loader yields float32 pixels
        # in 0-255; convert to uint8 before augmenting, which is also the
        # dtype declared in `Tout`.
        arr = np.clip(np.rint(arr), 0, 255).astype(np.uint8)
        if arr.ndim == 3:
            return _aug_one(arr).astype(np.uint8)
        if len(arr) == 0:
            # np.stack rejects an empty list; return an empty batch instead.
            return np.zeros((0, height, width, 3), dtype=np.uint8)
        return np.stack([_aug_one(img) for img in arr]).astype(np.uint8)

    aug_image = tf.numpy_function(_aug_fn, [image], Tout=tf.uint8)

    # numpy_function drops static shape information; restore it, keeping the
    # leading batch dimension unless the input is known to be a single image.
    if image.shape.rank == 3:
        aug_image.set_shape([height, width, 3])
    else:
        aug_image.set_shape([None, height, width, 3])

    aug_image = tf.keras.applications.mobilenet_v2.preprocess_input(
        tf.cast(aug_image, tf.float32)
    )
    return aug_image, label

def apply_albumentations_to_dataset(ds, pipeline, shuffle=False, shuffle_buffer=1000):
    """Map an Albumentations pipeline over a dataset and prefetch the result.

    Args:
        ds: Dataset yielding ``(image, label)`` elements.
        pipeline: Albumentations pipeline forwarded to ``_apply_albu_to_batch``.
        shuffle: When true, shuffle the mapped elements.
        shuffle_buffer: Buffer size handed to ``Dataset.shuffle``. It counts
            dataset *elements*; when ``ds`` is already batched each element is
            a whole batch, so a large value keeps that many batches in memory.
            The default (1000) matches the previously hard-coded value; lower
            it if memory is tight.

    Returns:
        The mapped (optionally shuffled) dataset with prefetching enabled.
    """
    ds = ds.map(lambda x, y: _apply_albu_to_batch(x, y, pipeline),
                num_parallel_calls=AUTOTUNE)
    if shuffle:
        ds = ds.shuffle(shuffle_buffer)
    return ds.prefetch(AUTOTUNE)


## Visualize samples (after augmentation)

In [8]:
def show_batch(ds, class_names, n=9):
    """Display up to ``n`` images from the first batch of ``ds`` with labels.

    Images are expected in the [-1, 1] range and are mapped back to [0, 1]
    for display.

    Args:
        ds: Dataset yielding ``(images, labels)`` batches.
        class_names: Sequence mapping an integer label to its class name.
        n: Maximum number of images to show. It is capped at the batch size,
            and the grid grows in rows of three, so values other than 9 work.
    """
    imgs, labels = next(iter(ds))

    # Never ask for more images than the batch holds.
    n = min(n, int(imgs.shape[0]))
    if n <= 0:
        return

    cols = 3
    rows = -(-n // cols)   # ceiling division
    # 10 x 10 inches for the default 3 x 3 grid; height scales with row count.
    plt.figure(figsize=(10, 10 * rows / 3))
    for i in range(n):
        plt.subplot(rows, cols, i + 1)
        img = (imgs[i].numpy() + 1) / 2  # de-normalize from [-1, 1] to [0, 1]
        plt.imshow(np.clip(img, 0, 1))
        plt.title(class_names[int(labels[i])])
        plt.axis("off")
    plt.show()
print("done")

done


## Model builders (MobileNetV2 & EfficientNetV2M) and unfreeze helper

In [9]:
def build_mobilenetv2(num_classes, img_size=IMG_SIZE):
    """Assemble a MobileNetV2 classifier with a frozen ImageNet backbone.

    The head is global average pooling, dropout (0.3) and a softmax dense
    layer with ``num_classes`` units.

    Args:
        num_classes: Number of output classes.
        img_size: Two-element image size; a channel dimension of 3 is appended.

    Returns:
        Tuple ``(model, backbone)`` so the backbone can be unfrozen later.
    """
    input_shape = (*img_size, 3)
    inputs = keras.Input(shape=input_shape)

    backbone = keras.applications.MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights="imagenet",
    )
    backbone.trainable = False

    # The backbone is always called with training=False.
    features = backbone(inputs, training=False)
    pooled = layers.GlobalAveragePooling2D()(features)
    dropped = layers.Dropout(0.3)(pooled)
    outputs = layers.Dense(num_classes, activation="softmax")(dropped)

    classifier = keras.Model(inputs, outputs)
    return classifier, backbone

def unfreeze_last_n_layers(base_model, n):
    """Make only the last ``n`` layers of ``base_model`` trainable.

    Does nothing when ``n`` is not positive. Otherwise the model is marked
    trainable and every layer except the final ``n`` is frozen again; if ``n``
    exceeds the layer count, no layer is frozen.
    """
    if not n > 0:
        return

    base_model.trainable = True
    total = len(base_model.layers)
    first_trainable = max(total - n, 0)
    for index in range(first_trainable):
        base_model.layers[index].trainable = False
    print(f"Unfroze last {n} layers of {total}")
print("done")

done


In [None]:
def build_efficientnetv2m(num_classes, img_size=IMG_SIZE, include_preprocessing=False):
    """Assemble an EfficientNetV2M classifier with a frozen ImageNet backbone.

    The head is global average pooling, dropout (0.4) and a softmax dense
    layer with ``num_classes`` units.

    Args:
        num_classes: Number of output classes.
        img_size: Two-element image size; a channel dimension of 3 is appended.
        include_preprocessing: Forwarded to ``EfficientNetV2M``. The data
            pipeline in this notebook already scales images to [-1, 1] (via
            ``mobilenet_v2.preprocess_input``), so the backbone's built-in
            rescaling is disabled by default; otherwise the inputs would be
            rescaled a second time. Pass ``True`` only when feeding raw
            0-255 images.

    Returns:
        Tuple ``(model, backbone)`` so the backbone can be unfrozen later.
    """
    inputs = keras.Input(shape=(*img_size, 3))
    base = keras.applications.EfficientNetV2M(input_shape=(*img_size, 3),
                                              include_top=False,
                                              weights="imagenet",
                                              include_preprocessing=include_preprocessing)
    base.trainable = False
    # The backbone is always called with training=False.
    x = base(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.4)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs), base

def unfreeze_last_n_layers(base_model, n):
    """Make only the last ``n`` layers of ``base_model`` trainable.

    Does nothing when ``n`` is not positive. Otherwise the model is marked
    trainable and every layer except the final ``n`` is frozen again; if ``n``
    exceeds the layer count, no layer is frozen.
    """
    if not n > 0:
        return

    base_model.trainable = True
    total = len(base_model.layers)
    first_trainable = max(total - n, 0)
    for index in range(first_trainable):
        base_model.layers[index].trainable = False
    print(f"Unfroze last {n} layers of {total}")


## Compile & Callbacks utilities

In [10]:
def compile_model(model, lr=1e-3):
    """Compile ``model`` in place for integer-label classification.

    Uses Adam at learning rate ``lr``, sparse categorical cross-entropy as the
    loss, and accuracy as the only metric.
    """
    optimizer = keras.optimizers.Adam(learning_rate=lr)
    settings = {
        "optimizer": optimizer,
        "loss": "sparse_categorical_crossentropy",
        "metrics": ["accuracy"],
    }
    model.compile(**settings)

def make_callbacks(name_prefix="model"):
    """Build the callback list used for every training run.

    Returns, in order: early stopping on ``val_accuracy`` (patience 8, best
    weights restored), learning-rate halving on ``val_loss`` plateaus
    (patience 4, floor 1e-7), and a best-only checkpoint on ``val_accuracy``
    written to ``OUTPUTS/<name_prefix>_best.keras``.
    """
    checkpoint_path = OUTPUTS/f"{name_prefix}_best.keras"

    early_stop = EarlyStopping(
        monitor="val_accuracy",
        patience=8,
        restore_best_weights=True,
    )
    lr_on_plateau = ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.5,
        patience=4,
        min_lr=1e-7,
        verbose=1,
    )
    best_checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor="val_accuracy",
        save_best_only=True,
        verbose=1,
    )
    return [early_stop, lr_on_plateau, best_checkpoint]
print("done 1")

done 1


## History CSV logging

In [11]:
def append_history(history, csv_path=HISTORY_CSV):
    """Append one training run's per-epoch metrics to a cumulative CSV.

    Epoch numbers continue from the largest ``epoch`` value already stored in
    the file, or start at 1 when the file is missing, empty, or has no usable
    ``epoch`` column.

    Args:
        history: Object exposing a ``history`` mapping of metric name to a
            per-epoch list (as returned by ``model.fit``).
        csv_path: Destination CSV (str or Path).
    """
    csv_path = Path(csv_path)
    df = pd.DataFrame(history.history)

    old = None
    if csv_path.exists():
        try:
            old = pd.read_csv(csv_path)
        except pd.errors.EmptyDataError:
            # Zero-byte file: treat it as if there were no previous history.
            old = None
        if old is not None and old.empty:
            old = None

    start = 1
    if old is not None and 'epoch' in old.columns:
        last_epoch = old['epoch'].max()
        # max() is NaN when the column holds no valid numbers.
        if pd.notna(last_epoch):
            start = int(last_epoch) + 1

    df['epoch'] = np.arange(start, start + len(df))
    if old is not None:
        df = pd.concat([old, df], ignore_index=True)

    df.to_csv(csv_path, index=False)
    print(f"Appended history to {csv_path}")
print("done")

done


## Incremental training (multi-session unfreezing)

In [11]:
def incremental_training(model, base_model, train_ds, val_ds, sessions, prefix="stage"):
    """Train ``model`` through a sequence of sessions.

    Each session is a dict with ``"epochs"`` and ``"lr"`` and, optionally,
    ``"unfreeze_layer"`` (number of trailing backbone layers to unfreeze).
    For every session the model is re-compiled at the session's learning
    rate, fitted, and its history appended to the history CSV.

    Returns:
        List with one ``history`` dict per session, in order.
    """
    collected = []
    session_no = 0
    for session in sessions:
        session_no += 1
        print(f"--- Session {session_no}: {session} ---")

        if "unfreeze_layer" in session:
            unfreeze_last_n_layers(base_model, session["unfreeze_layer"])

        # Re-compile on every session so the new learning rate (and any
        # trainable-flag change made above) is picked up.
        compile_model(model, lr=session["lr"])

        run = model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=session["epochs"],
            callbacks=make_callbacks(f"{prefix}_s{session_no}"),
        )
        append_history(run)
        collected.append(run.history)
    return collected
print("done")

done


## Plot utilities (session & historical)

In [12]:
def plot_sessions(*histories):
    """Plot loss and accuracy curves across consecutive training sessions.

    Args:
        *histories: History dicts (metric name -> per-epoch list), each
            containing ``"loss"``, ``"val_loss"`` and ``"accuracy"``. They are
            concatenated in the order given. ``"val_accuracy"`` is plotted as
            well when every history provides it.
    """
    loss, val_loss, acc, val_acc = [], [], [], []
    for h in histories:
        loss.extend(h["loss"])
        val_loss.extend(h["val_loss"])
        acc.extend(h["accuracy"])
        val_acc.extend(h.get("val_accuracy", []))

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(loss, label="loss")
    plt.plot(val_loss, label="val_loss")
    plt.legend()
    plt.title("Loss")

    plt.subplot(1, 2, 2)
    plt.plot(acc, label="accuracy")
    # Only draw validation accuracy when it lines up epoch-for-epoch with the
    # training curve (i.e. no session was missing it).
    if val_acc and len(val_acc) == len(acc):
        plt.plot(val_acc, label="val_accuracy")
    plt.legend()
    plt.title("Accuracy")

    plt.show()

def plot_history_csv(csv_path=HISTORY_CSV):
    """Plot the cumulative training history stored in ``csv_path``.

    Shows one figure with loss / val_loss and, when the file has an
    ``accuracy`` column, a second one with accuracy / val_accuracy.
    Does nothing if the file does not exist.
    """
    if not csv_path.exists():
        return

    df = pd.read_csv(csv_path)

    for column, legend in (("loss", "loss"), ("val_loss", "val_loss")):
        sns.lineplot(df, x="epoch", y=column, label=legend)
    plt.show()

    if "accuracy" not in df:
        return
    for column, legend in (("accuracy", "acc"), ("val_accuracy", "val_acc")):
        sns.lineplot(df, x="epoch", y=column, label=legend)
    plt.show()
print("done")

done


## Build datasets and preview

In [13]:
# Optionally split the raw dataset first (uncomment if needed). The loader
# below fails with NotFoundError when the train/val/test folders do not exist yet.
# prepare_image_datasets(source_dir, output_dir, split_ratios=(0.7,0.2,0.1), seed=SEED)

# Load datasets using the loader's default directories (expects train/val/test present)
train_ds_raw, val_ds_raw, test_ds_raw = load_datasets_from_dirs()

# Apply Albumentations: aggressive to train, deterministic to val/test
train_ds = apply_albumentations_to_dataset(train_ds_raw, train_albu, shuffle=True)
val_ds = apply_albumentations_to_dataset(val_ds_raw, val_albu, shuffle=False)
test_ds = apply_albumentations_to_dataset(test_ds_raw, val_albu, shuffle=False)

# Class names are read from the raw dataset; the mapped datasets are not used for this
class_names = train_ds_raw.class_names
num_classes = len(class_names)
print('Classes:', class_names)

# Visual check of one augmented training batch
show_batch(train_ds, class_names)
print("done")

NotFoundError: Could not find directory /Users/jameskierdoliguez/Desktop/test_dataset/aug_dataset/train

## Build model (MobileNetV2) and initial training

In [None]:
# Initial training stage: the backbone is frozen by build_mobilenetv2, and the
# model is trained for INIT_EPOCHS at lr=1e-3.
model, base = build_mobilenetv2(num_classes)
compile_model(model, lr=1e-3)
hist0 = model.fit(train_ds,
                  validation_data=val_ds,
                  epochs=INIT_EPOCHS,
                  callbacks=make_callbacks("base"))
# Persist this stage's per-epoch metrics to the cumulative history CSV
append_history(hist0)


## Incremental sessions example (edit as needed)

In [None]:
# Progressive-unfreezing schedule: one dict per session, run in order.
# "unfreeze_layer" is the number of trailing backbone layers made trainable;
# 0 leaves the trainable flags untouched (unfreeze_last_n_layers only acts
# when n > 0).
sessions = [
    {"epochs": 10, "lr": 5e-4, "unfreeze_layer": 0},
    {"epochs": 15, "lr": 1e-4, "unfreeze_layer": 10},
    {"epochs": 10, "lr": 5e-5, "unfreeze_layer": 30},
]
histories = incremental_training(model, base, train_ds, val_ds, sessions, prefix="mobilenetv2")
# Plot the initial stage followed by every incremental session
plot_sessions(hist0.history, *histories)


## Evaluation on test set

In [None]:
def evaluate_and_save(model, ds, class_names, out_csv=OUTPUTS/"predictions.csv"):
    """Evaluate ``model`` on ``ds``, print/plot the results and save predictions.

    Prints a classification report, shows a confusion-matrix heatmap, and
    writes the true/predicted label indices to ``out_csv``.

    Args:
        model: Model whose ``predict`` returns per-class scores for a batch.
        ds: Dataset yielding ``(images, labels)`` batches with integer labels.
        class_names: Class names, indexed by label id.
        out_csv: Destination CSV with columns ``true`` and ``pred``.
    """
    y_true, y_pred = [], []
    for imgs, labels in ds:
        # verbose=0: avoid printing one progress bar per batch.
        preds = model.predict(imgs, verbose=0)
        y_true.extend(np.asarray(labels).tolist())
        y_pred.extend(np.argmax(preds, axis=1).tolist())

    # Pin the label set explicitly. Without it, a class that never appears in
    # y_true/y_pred is dropped, so `target_names` no longer matches (error) and
    # the confusion-matrix rows/columns no longer line up with the tick labels.
    label_ids = list(range(len(class_names)))

    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, zero_division=0))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    sns.heatmap(cm, annot=True, fmt="d",
                xticklabels=class_names,
                yticklabels=class_names, cmap="Blues")
    plt.show()
    pd.DataFrame({"true": y_true, "pred": y_pred}).to_csv(out_csv, index=False)
    print(f"Predictions saved to {out_csv}")

evaluate_and_save(model, test_ds, class_names)


## Single-image inference

In [None]:
def predict_single_image(model, path, class_names):
    """Classify one image file, print the prediction and display the image.

    The image is resized by the validation pipeline (``val_albu``) and then
    scaled with ``mobilenet_v2.preprocess_input``, mirroring what the
    val/test datasets go through.

    Args:
        model: Trained model whose ``predict`` returns per-class scores.
        path: Path to the image file.
        class_names: Class names, indexed by label id.
    """
    # Context manager so the file handle is released; convert() returns a
    # fully loaded copy that stays valid afterwards.
    with Image.open(path) as handle:
        img = handle.convert("RGB")

    # Resizing is left to val_albu. The previous PIL `resize(IMG_SIZE)` call
    # interpreted IMG_SIZE as (width, height), whereas the rest of the
    # notebook uses it as (height, width).
    aug = val_albu(image=np.array(img))["image"]

    arr = tf.keras.applications.mobilenet_v2.preprocess_input(aug.astype(np.float32))
    arr = np.expand_dims(arr, 0)   # add the batch dimension

    preds = model.predict(arr, verbose=0)
    idx = int(np.argmax(preds[0]))
    print("Pred:", class_names[idx], " (conf:", preds[0][idx], ")")
    plt.imshow(aug); plt.title(class_names[idx]); plt.axis("off"); plt.show()

# Pick the first .jpg (in sorted order) found under the test split folder.
example_img = next(iter(sorted(output_dir_test.rglob("*.jpg"))), None)
if example_img is None:
    raise FileNotFoundError(f"No .jpg files found under {output_dir_test}")
predict_single_image(model, example_img, class_names)


## Save model & TFLite conversion

In [None]:
# Export the trained model as a TensorFlow SavedModel directory, then convert
# that directory to a float16-quantized TFLite file.
save_dir = OUTPUTS / 'saved_model_final'
save_dir.mkdir(parents=True, exist_ok=True)

if hasattr(model, "export"):
    # Keras 3 (bundled with the TF 2.17 used here) rejects `model.save()` on a
    # path without a .keras/.h5 extension; `export()` is the supported way to
    # write a SavedModel for TFLite. The result is inference-only -- keep the
    # *.keras checkpoints if the model must be reloaded for further training.
    model.export(str(save_dir))
else:
    # Older Keras without `export()`: saving to a directory writes a SavedModel.
    model.save(str(save_dir), include_optimizer=False)
print('Saved model to', save_dir)

# TFLite float16 conversion
converter = tf.lite.TFLiteConverter.from_saved_model(str(save_dir))
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
tflite_path = OUTPUTS / 'model_float16.tflite'
tflite_path.write_bytes(tflite_model)
print('Wrote TFLite model to', tflite_path)
