# Training EfficientNet with custom dataset
In this notebook we will train EfficientNet with Keras. The training will be done in two steps: Transfer Learning and then Fine-Tuning. \
This notebook is a fork of the following Google Colab notebook: https://colab.research.google.com/drive/1vzEDAX-3ol7gcZ7qmKuwn8zUld524sUZ \
which was used in this tutorial: https://blog.roboflow.com/how-to-train-efficientnet/

# Transfer Learning

## Import Dependencies

In [None]:
import keras
keras.__version__
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras import optimizers
import os
import glob
import shutil
import sys
import numpy as np
from skimage.io import imread
import matplotlib.pyplot as plt
from IPython.display import Image
from sklearn.utils import class_weight
%matplotlib inline

In [None]:
%cd /content
!mkdir efficientnet_keras_transfer_learning
%cd efficientnet_keras_transfer_learning/

## Configuring Parameters

In [None]:
# True  -> surplus 'not_ok' images are deleted from the train/valid folders.
# False -> the folders are left alone and class weights are passed to fit().
balance_classes_by_deleting = False

# Settings for the transfer-learning stage.
config = dict(
    name="efnetb4_tL",          # used as file name when the model is saved
    early_stopping=True,        # attach an EarlyStopping callback
    batch_size=32,
    dropout_rate=0.5,           # dropout after global pooling (skipped when 0)
    dense_layer=108,            # units of the hidden Dense layer (skipped when 0)
    dropout_rate_2=0.3,         # dropout after the hidden Dense layer (skipped when 0)
    learning_rate=1e-4,
    trainable_layers=0,         # number of trailing base layers left trainable
    epochs=70,
    model="EfficientNetB4",     # any other value selects EfficientNetB0
)

## Import Custom Dataset From Roboflow

In [None]:
!curl -L "https://app.roboflow.com/ds/xE9NhUz80L?key=tQaR9FYMbc" > roboflow.zip; unzip -q roboflow.zip; rm roboflow.zip
! rm README.*

In [None]:
# Absolute folders of the three dataset splits inside the working directory.
train_dir, valid_dir, test_dir = (
    f'/content/efficientnet_keras_transfer_learning/{split}/'
    for split in ('train', 'valid', 'test')
)

import os
import random

def balance_classes(split_path, seed=None):
    """Delete random surplus 'not_ok' images so that class is no larger than 'ok'.

    Only the 'not_ok' folder is ever pruned. If it already holds no more
    files than 'ok', nothing is deleted and a message is printed instead.

    Args:
        split_path: Directory containing the 'ok' and 'not_ok' sub-folders.
        seed: Optional seed for a private random generator, so the same files
            are removed on every run. ``None`` (default) keeps drawing from
            the global ``random`` state, as before.
    """
    ok_dir = os.path.join(split_path, 'ok')
    not_ok_dir = os.path.join(split_path, 'not_ok')

    def _files_in(folder):
        # Sorted so a fixed seed selects the same files regardless of the
        # order in which os.listdir returns the directory entries.
        candidates = (os.path.join(folder, entry) for entry in os.listdir(folder))
        return sorted(path for path in candidates if os.path.isfile(path))

    ok_images = _files_in(ok_dir)
    not_ok_images = _files_in(not_ok_dir)

    target_count = len(ok_images)
    surplus = len(not_ok_images) - target_count

    if surplus > 0:
        rng = random if seed is None else random.Random(seed)
        to_delete = rng.sample(not_ok_images, surplus)
        for img in to_delete:
            os.remove(img)
        print(f"Deleted {len(to_delete)} 'not_ok' images from {split_path}")
    else:
        # Plain ASCII "<=": the previous message contained a mis-encoded symbol.
        print(f"'not_ok' already balanced in {split_path} ({len(not_ok_images)} <= {target_count})")


# Optionally prune the training and validation splits; the call for the
# test split is commented out.
if balance_classes_by_deleting:
    for split_dir in (train_dir, valid_dir):
        balance_classes(split_dir)
    #balance_classes(test_dir)

In [None]:
import os
from tabulate import tabulate

def count_images_in_folders(base_dir, splits=('train', 'valid'), classes=('ok', 'not_ok')):
    """Print a table with the number of files per class for each dataset split.

    Args:
        base_dir: Directory that contains one sub-folder per split.
        splits: Names of the split folders to report (one table row each).
            Defaults to the train and validation splits.
        classes: Names of the class folders inside every split (one table
            column each). The table headers are derived from this value.
    """
    data = []
    for split in splits:
        row = [split]
        for class_name in classes:
            class_dir = os.path.join(base_dir, split, class_name)
            # Count regular files only; sub-directories are ignored.
            row.append(sum(
                1 for entry in os.listdir(class_dir)
                if os.path.isfile(os.path.join(class_dir, entry))
            ))
        data.append(row)

    headers = ['Split', *classes]
    print(tabulate(data, headers=headers, tablefmt="github"))

# Report how many files each class holds per split of the downloaded dataset.
base_dir = '/content/efficientnet_keras_transfer_learning/'
count_images_in_folders(base_dir=base_dir)

## Create Model

In [None]:
import os, os.path
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping
import copy

# Pick the backbone and its input resolution: EfficientNetB4 uses 380x380
# images; any other value of config["model"] falls back to EfficientNetB0
# at 224x224.
if config["model"] != 'EfficientNetB4':
    from tensorflow.keras.applications import EfficientNetB0 as Net
    size = 224
else:
    from tensorflow.keras.applications import EfficientNetB4 as Net
    size = 380

# Square RGB input for the network.
height = width = size
input_shape = (height, width, 3)

# Convolutional base with ImageNet weights and without its classification top.
conv_base = Net(include_top=False, weights='imagenet', input_shape=input_shape)

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.efficientnet import preprocess_input

# Both splits only get EfficientNet's preprocess_input applied; no augmentation is configured.
train_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
valid_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

# Options shared by the training and validation directory iterators.
flow_options = dict(
    target_size=(height, width),
    batch_size=config["batch_size"],
    class_mode='binary',
)
train_generator = train_datagen.flow_from_directory(train_dir, **flow_options)
validation_generator = valid_datagen.flow_from_directory(valid_dir, **flow_options)

# Display which integer label was assigned to each class folder.
train_generator.class_indices

In [None]:
# Creating model
# Every file found anywhere below a split directory is counted.
NUM_TRAIN = sum(len(files) for _, _, files in os.walk(train_dir))
NUM_TEST = sum(len(files) for _, _, files in os.walk(valid_dir))  # counted on the validation split

# Number of entries directly inside the training directory (one folder per class).
num_classes = len(os.listdir(train_dir))
print(f'building network for {num_classes} classes')

# Define Layers
# Classification head placed on top of the pretrained base. The optional
# parts are skipped when their config value is 0.
head = [layers.GlobalAveragePooling2D(name="gap")]
if config['dropout_rate'] > 0:
    head.append(layers.Dropout(config['dropout_rate'], name="dropout_out"))
if config['dense_layer'] > 0:
    head.append(layers.Dense(config['dense_layer'], activation='relu', kernel_regularizer=l2(0.01)))
    if config['dropout_rate_2'] > 0:
        head.append(layers.Dropout(config['dropout_rate_2'], name="dropout_out_2"))
# Single sigmoid unit: binary classification.
head.append(layers.Dense(1, activation='sigmoid', name="fc_out"))

model = models.Sequential()
for stacked_layer in [conv_base, *head]:
    model.add(stacked_layer)

# Freezing layers
# len(model.trainable_weights) counts trainable weight tensors, not layers,
# so the messages below say "weights".
print('This is the number of trainable weights '
      'before freezing the conv base:', len(model.trainable_weights))

# Only the last config['trainable_layers'] layers of the base stay trainable;
# with the value 0 the whole base is frozen.
total_layers = len(conv_base.layers)
first_trainable = total_layers - config['trainable_layers']
for i, base_layer in enumerate(conv_base.layers):
    base_layer.trainable = i >= first_trainable

print('This is the number of trainable weights '
      'after freezing the conv base:', len(model.trainable_weights))
model.summary()

# Setup EarlyStopping
# When enabled, training stops after 3 epochs without improvement of the
# validation loss and the best weights seen are restored.
if config['early_stopping']:
    early_stop = EarlyStopping(
        monitor='val_loss',  # or 'val_accuracy'
        patience=3,
        restore_best_weights=True,
    )
    callbacks = [early_stop]
else:
    callbacks = None

# Compile Model
# Binary cross-entropy with Adam; accuracy is tracked under the name 'acc'.
adam = optimizers.Adam(learning_rate=config['learning_rate'])
model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['acc'])

# Class weighting
# Used when images are not deleted to balance the classes: 'balanced' gives
# rarer classes a proportionally larger weight.
class_weights = None
if not balance_classes_by_deleting:
    y_train = train_generator.classes
    present_classes = np.unique(y_train)

    weights = class_weight.compute_class_weight(
        class_weight='balanced',
        classes=present_classes,
        y=y_train
    )
    # Key each weight by its actual label. enumerate() would attach the
    # weights to the wrong labels whenever a class index is absent from
    # the training data.
    class_weights = {int(label): float(weight) for label, weight in zip(present_classes, weights)}
    print("Class Weights:", class_weights)

## Training

In [None]:
# Training
# len(generator) is the number of batches needed to cover the whole split,
# including the last partial batch. The previous floor division skipped the
# remainder and evaluated to 0 steps for a split smaller than one batch,
# which leaves early stopping without a usable val_loss.
history = model.fit(
      train_generator,
      steps_per_epoch=len(train_generator),
      epochs=config['epochs'],
      validation_data=validation_generator,
      validation_steps=len(validation_generator),
      verbose=1,
      class_weight=class_weights,
      callbacks=callbacks
)

In [None]:
# Save the complete model (not only its weights) in HDF5 format.
model_path = f"/kaggle/working/{config['name']}.h5"
# The notebook works under /content, so make sure the output folder exists
# before writing to it.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
model.save(model_path)

## Training Results

In [None]:
name = config['name']

# Per-epoch metrics recorded by model.fit.
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs_x = range(len(acc))

# Accuracy curves (axis labels added to match the fine-tuning plots).
plt.plot(epochs_x, acc, 'bo', label='Training acc')
plt.plot(epochs_x, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

# Loss curves on a second figure.
plt.figure()

plt.plot(epochs_x, loss, 'bo', label='Training loss')
plt.plot(epochs_x, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

## Testing model after Transfer Learning

In [None]:
# Setup
from tensorflow.keras.models import load_model
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay, classification_report, accuracy_score, precision_score, recall_score, f1_score                             
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.efficientnet import preprocess_input

# Only EfficientNet's preprocess_input is applied to the test images.
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

# Reload the model that was saved after transfer learning.
model_name = config['name']
model = load_model(f"/kaggle/working/{model_name}.h5")

# Same rule as the model-creation cell: 380 for EfficientNetB4, 224 otherwise.
# The previous check defaulted to 380 for every name except "EfficientNetB0",
# so the two cells disagreed for any other model name.
size = 380 if config['model'] == "EfficientNetB4" else 224
batch_size = 32
width = size
height = size
input_shape = (height, width, 3)

# shuffle=False keeps the prediction order aligned with
# test_generator.classes and test_generator.filenames.
test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(height, width),
        batch_size=batch_size,
        class_mode='binary',
        shuffle=False
)

In [None]:
# Run Inference
preds = model.predict(test_generator)
# Flatten the model output to one probability per test image.
y_prob = preds.ravel()
# Ground-truth labels and class names, both taken from the generator.
y_true = test_generator.classes
class_names = [*test_generator.class_indices]

In [None]:
# ROC Curve + AUC
fpr, tpr, thresholds_roc = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)

# Threshold with the largest gap between true- and false-positive rate.
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds_roc[optimal_idx]
print(f"Optimal threshold according to ROC: {optimal_threshold:.2f}")

# Plot ROC
roc_fig, roc_ax = plt.subplots(figsize=(6, 5))
roc_ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line.
roc_ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
roc_ax.set_xlim([0.0, 1.0])
roc_ax.set_ylim([0.0, 1.05])
roc_ax.set_xlabel('False Positive Rate')
roc_ax.set_ylabel('True Positive Rate')
roc_ax.set_title('ROC Curve')
roc_ax.legend(loc='lower right')
roc_ax.grid()
plt.show()

# Precision-Recall Curve + AUC
precision, recall, thresholds_pr = precision_recall_curve(y_true, y_prob)
# Area under the precision-recall curve.
pr_auc = auc(recall, precision)
print(f"PR AUC: {pr_auc:.2f}")

# Plot PR
pr_fig, pr_ax = plt.subplots(figsize=(6, 5))
pr_ax.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
pr_ax.set_xlabel('Recall')
pr_ax.set_ylabel('Precision')
pr_ax.set_title('Precision-Recall Curve')
pr_ax.legend(loc='lower left')
pr_ax.grid()
plt.show()

# Hard labels at the fixed 0.5 cut-off.
y_pred = (y_prob > 0.5).reshape(-1).astype(int)

print("Class Mapping:", test_generator.class_indices)

# Confusion Matrix
cm = confusion_matrix(y_true, y_pred)
axis_labels = list(test_generator.class_indices.keys())
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=axis_labels)
disp.plot(cmap='Blues')
disp.ax_.set_title("Confusion Matrix")
plt.show()

# Summary scores, printed in the same order as before.
for caption, scorer in (
    ("Accuracy:", accuracy_score),
    ("Precision:", precision_score),
    ("Recall:", recall_score),
    ("F1-score:", f1_score),
):
    print(caption, scorer(y_true, y_pred))
print("\nComplete Report:\n", classification_report(y_true, y_pred, target_names=class_names))

In [None]:
import math
from tensorflow.keras.preprocessing import image

# Reload the transfer-learning model under the name it was saved with.
# A hard-coded name from an earlier experiment was used here before; this
# notebook never writes that file, so the cell failed on a fresh run.
model_name = config['name']
model = load_model(f"/kaggle/working/{model_name}.h5")
# Input resolution follows the configured backbone instead of a substring of
# the file name (380 for EfficientNetB4, 224 otherwise).
size = 380 if config['model'] == 'EfficientNetB4' else 224
batch_size = 32
width = size
height = size
input_shape = (height, width, 3)

# shuffle=False keeps predictions aligned with classes and filenames.
test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(height, width),
        batch_size=batch_size,
        class_mode='binary',
        shuffle=False
)

# Get predictions
preds = model.predict(test_generator)
y_true = test_generator.classes
class_names = list(test_generator.class_indices.keys())
y_prob = preds.ravel()

# ROC statistics; the resulting optimal_threshold is not applied below.
fpr, tpr, thresholds = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)

optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]

# Hard labels at the fixed 0.5 cut-off.
y_pred = (y_prob > 0.5).astype(int).reshape(-1)

# Get all filenames and paths
filenames = test_generator.filenames
img_paths = [os.path.join(test_dir, fname) for fname in filenames]

# Find all misclassified images
misclassified = [(path, true, pred) for path, true, pred in zip(img_paths, y_true, y_pred)
                 if true != pred]

num_misclassified = len(misclassified)

if num_misclassified == 0:
    print("No misclassified images found!")
else:
    # Grid of thumbnails, four per row.
    cols = 4
    rows = math.ceil(num_misclassified / cols)

    plt.figure(figsize=(4 * cols, 4 * rows))
    for i, (img_path, true_label, pred_label) in enumerate(misclassified):
        # 224x224 is only the display size of the thumbnail.
        img = image.load_img(img_path, target_size=(224, 224))
        plt.subplot(rows, cols, i + 1)
        plt.imshow(img)
        plt.axis('off')
        plt.title(f"True: {class_names[true_label]}\nPred: {class_names[pred_label]}")
    plt.tight_layout()
    plt.show()

# Fine-Tuning



In [None]:
# Settings for the fine-tuning stage (plain variables, separate from `config`).

# Class balancing and stopping behaviour.
balance_classes_by_deleting, early_stopping = False, True
wait_for_epochs = 12  # Min epochs before early stopping starts

# Optimisation.
batch_size, epochs = 32, 50
learning_rate = 5e-6

# Layers of the base whose name prefix (text before the first "_") is listed
# here are unfrozen; all others are frozen.
trainable_layers = {"top", "block7b"}

# Square input resolution.
size = height = width = 380

In [None]:
from tensorflow.keras.models import load_model

# Fine-tuning starts from the model file written at the end of transfer learning.
transfer_learning_checkpoint = "/kaggle/working/efnetb4_tL.h5"
model = load_model(transfer_learning_checkpoint)

In [None]:
# Print the layer overview of the reloaded model.
model.summary()

In [None]:
# We display all EfficientNet layers to choose the ones to unfreeze.
# The EfficientNet base was added first when the model was built, so it is
# the first layer of the loaded Sequential model.
conv_base = model.layers[0]
conv_base.summary()

In [None]:
# Unfreeze only the layers whose name prefix (the part before the first "_")
# is listed in trainable_layers; every other layer of the base is frozen.
# NOTE(review): any BatchNormalization layers inside the selected blocks are
# unfrozen too. Keras' EfficientNet fine-tuning guide recommends keeping them
# frozen; confirm this is intended.
for layer in conv_base.layers:
    block_name = layer.name.split('_')[0]
    layer.trainable = block_name in trainable_layers
# len(model.trainable_weights) counts weight tensors, not layers.
print('This is the number of trainable weights '
      'after unfreezing the conv base:', len(model.trainable_weights))

In [None]:
# Setup EarlyStopping with a minimum of epochs
from tensorflow.keras.callbacks import Callback, EarlyStopping

class DelayedEarlyStopping(Callback):
    """Early stopping that is only consulted from a given epoch onwards.

    Wraps a regular ``EarlyStopping`` callback. Train begin/end events are
    always forwarded to it; the end-of-epoch event is forwarded only when the
    epoch index received from Keras is at least ``min_epoch``.

    Args:
        monitor: Metric name handed to the wrapped ``EarlyStopping``.
        min_epoch: First epoch index at which the wrapped callback is consulted.
        patience: Patience handed to the wrapped ``EarlyStopping``.
        restore_best_weights: Forwarded to the wrapped ``EarlyStopping``.
    """

    def __init__(self, monitor='val_loss', min_epoch=20, patience=3, restore_best_weights=True):
        super().__init__()
        self.min_epoch = min_epoch
        self.early_stopping = EarlyStopping(
            monitor=monitor,
            patience=patience,
            restore_best_weights=restore_best_weights,
        )

    def set_model(self, model):
        """Attach the model to this callback and to the wrapped one."""
        super().set_model(model)
        self.early_stopping.set_model(model)

    def on_train_begin(self, logs=None):
        """Forward the train-begin event to the wrapped callback."""
        self.early_stopping.on_train_begin(logs)

    def on_epoch_end(self, epoch, logs=None):
        """Forward the epoch-end event once ``min_epoch`` has been reached."""
        if epoch < self.min_epoch:
            return
        self.early_stopping.on_epoch_end(epoch, logs)

    def on_train_end(self, logs=None):
        """Forward the train-end event to the wrapped callback."""
        self.early_stopping.on_train_end(logs)


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.efficientnet import preprocess_input

# Training and validation folders of the dataset.
train_dir = '/content/efficientnet_keras_transfer_learning/train/'
valid_dir = '/content/efficientnet_keras_transfer_learning/valid/'

# Only EfficientNet's preprocess_input is applied; no augmentation is configured.
train_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
valid_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

# Options shared by the training and validation directory iterators.
flow_options = dict(
    target_size=(height, width),
    batch_size=batch_size,
    class_mode='binary',
)
train_generator = train_datagen.flow_from_directory(train_dir, **flow_options)
validation_generator = valid_datagen.flow_from_directory(valid_dir, **flow_options)

# Every file found anywhere below a split directory is counted.
NUM_TRAIN = sum(len(files) for _, _, files in os.walk(train_dir))
NUM_TEST = sum(len(files) for _, _, files in os.walk(valid_dir))  # counted on the validation split

In [None]:
# Early stopping is wrapped so it is only consulted from epoch index
# wait_for_epochs onwards.
if early_stopping:
    from tensorflow.keras.callbacks import EarlyStopping

    early_stop = DelayedEarlyStopping(
        monitor='val_loss',  # or 'val_accuracy'
        patience=3,
        min_epoch=wait_for_epochs,
        restore_best_weights=True,
    )
    callbacks = [early_stop]
else:
    callbacks = None

# Compile again with the fine-tuning learning rate.
fine_tune_optimizer = optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=fine_tune_optimizer, loss='binary_crossentropy', metrics=['acc'])

# Class weights are used when images are not deleted to balance the classes:
# 'balanced' gives rarer classes a proportionally larger weight.
class_weights = None
if not balance_classes_by_deleting:
    y_train = train_generator.classes
    present_classes = np.unique(y_train)
    # Show only the distinct labels; printing the full label array floods the output.
    print(present_classes)

    weights = class_weight.compute_class_weight(
        class_weight='balanced',
        classes=present_classes,
        y=y_train
    )
    # Key each weight by its actual label. enumerate() would attach the
    # weights to the wrong labels whenever a class index is absent from
    # the training data.
    class_weights = {int(label): float(weight) for label, weight in zip(present_classes, weights)}
    print("Class Weights:", class_weights)

In [None]:
# Fine Tuning
# len(generator) is the number of batches needed to cover the whole split,
# including the last partial batch. The previous floor division skipped the
# remainder and evaluated to 0 steps for a split smaller than one batch,
# which leaves early stopping without a usable val_loss.
history = model.fit(
      train_generator,
      steps_per_epoch=len(train_generator),
      epochs=epochs,
      validation_data=validation_generator,
      validation_steps=len(validation_generator),
      verbose=1,
      class_weight=class_weights,
      callbacks=callbacks
)

In [None]:
# Per-epoch metrics recorded during fine-tuning.
acc, val_acc = history.history['acc'], history.history['val_acc']
loss, val_loss = history.history['loss'], history.history['val_loss']

epochs_x = range(len(acc))

# Accuracy curves.
acc_fig, acc_ax = plt.subplots()
acc_ax.plot(epochs_x, acc, 'bo', label='Training acc')
acc_ax.plot(epochs_x, val_acc, 'b', label='Validation acc')
acc_ax.set_title('Training and validation accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

# Loss curves on a second figure.
loss_fig, loss_ax = plt.subplots()
loss_ax.plot(epochs_x, loss, 'bo', label='Training loss')
loss_ax.plot(epochs_x, val_loss, 'b', label='Validation loss')
loss_ax.set_title('Training and validation loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

plt.show()

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc                           
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.efficientnet import preprocess_input

# Test split of the dataset.
test_dir = '/content/efficientnet_keras_transfer_learning/test/'

# Only EfficientNet's preprocess_input is applied to the test images.
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

# shuffle=False keeps the prediction order aligned with test_generator.classes.
test_flow_options = dict(
    target_size=(height, width),
    batch_size=batch_size,
    class_mode='binary',
    shuffle=False,
)
test_generator = test_datagen.flow_from_directory(test_dir, **test_flow_options)

# Predictions of the fine-tuned model currently held in memory.
preds = model.predict(test_generator)

y_prob = preds.ravel()
y_true = test_generator.classes
class_names = [*test_generator.class_indices]

# ROC statistics and the threshold with the largest gap between TPR and FPR.
fpr, tpr, thresholds = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)

optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
print(f"Optimal Threshold according to ROC: {optimal_threshold:.2f}")

# Hard labels at the fixed 0.5 cut-off.
y_pred = (preds > 0.5).reshape(-1).astype(int)

# ROC curve of the fine-tuned model, with a diagonal reference line.
roc_fig, roc_ax = plt.subplots()
roc_ax.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
roc_ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
roc_ax.set_xlim([0.0, 1.0])
roc_ax.set_ylim([0.0, 1.05])
roc_ax.set_xlabel('False Positive Rate')
roc_ax.set_ylabel('True Positive Rate')
roc_ax.set_title('ROC Curve')
roc_ax.legend(loc='lower right')
roc_ax.grid()
plt.show()

print("Class Mapping:", test_generator.class_indices)

# Confusion Matrix
# Tick labels come from the generator's own class mapping, as in the
# transfer-learning evaluation. The previous hard-coded
# ['damaged', 'healthy'] labels did not match the dataset's class folders.
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=list(test_generator.class_indices.keys()))
disp.plot(cmap='Blues')
plt.title("Confusion Matrix")
plt.show()

# Summary scores computed from the 0.5-threshold predictions.
print("Accuracy:", accuracy_score(y_true, y_pred))
print("Precision:", precision_score(y_true, y_pred))
print("Recall:", recall_score(y_true, y_pred))
print("F1-score:", f1_score(y_true, y_pred))
print("\nComplete Report:\n", classification_report(y_true, y_pred, target_names=class_names))


In [None]:
# Save the fine-tuned model in the native Keras format.
fine_tuned_path = '/kaggle/working/efficientNetFT.keras'
# Create the folder that is actually written to. The previous code created
# "./models", which is not where the model is saved.
os.makedirs(os.path.dirname(fine_tuned_path), exist_ok=True)
model.save(fine_tuned_path)