In [1]:
import os
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger
from tensorflow.keras.metrics import Precision, Recall, BinaryAccuracy
from tensorflow.keras.models import load_model

print(tf.__version__, np.__version__)

# Expected output 2.9.0, 1.26.4

2.9.0 1.26.4


# Detect GPU and limit memory usage

In [2]:
USE_GPU = True

In [3]:
gpus = tf.config.experimental.list_physical_devices('GPU')

if gpus != []:
    print(gpus)
    for gpu in gpus: 
        tf.config.experimental.set_memory_growth(gpu, True)
else:
    print("No GPU on this machine")

# Expected output [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
if USE_GPU == False:
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

if tf.test.gpu_device_name():
    print('GPU found and successfully configured')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
GPU found and successfully configured


# Load training and testing data

In [4]:
def load_training_data(train_data_dir, train_data_ratio=0.8, batch_size=8):

    print(f"Loading training data from {train_data_dir}...")

    train_data = tf.keras.utils.image_dataset_from_directory(
        train_data_dir,
        image_size=(224, 224),
        batch_size=batch_size
    )

    train_size = int(len(train_data)*train_data_ratio)
    val_size = int(len(train_data)*(1-train_data_ratio))
    train = train_data.take(train_size)
    val = train_data.skip(train_size).take(val_size)

    return train, val

def load_test_data(test_data_dir, batch_size=8):

    print(f"Loading testing data from {test_data_dir}...")

    test_data = tf.keras.utils.image_dataset_from_directory(
        test_data_dir,
        image_size=(224, 224),
        batch_size=batch_size
    )

    return test_data

# Build model

In [5]:
base_model_name = ['DesnseNet201']
INPUT_SHAPE = (224,224,3)

In [6]:
#all_model = ['DenseNet201', 'EfficientNetB7', 'EfficientNetV2L','InceptionResNetV2', 'InceptionV3', 'MobileNetV3Large', 'NASNetLarge', 'ResNet152', 'ResNet152V2', 'VGG19', 'Xception']

def get_base_model(model_name, input_shape):
    model_dict = {
        'DenseNet201': tf.keras.applications.DenseNet201,
        'EfficientNetB7': tf.keras.applications.EfficientNetB7,
        'EfficientNetV2L': tf.keras.applications.EfficientNetV2L,
        'InceptionResNetV2': tf.keras.applications.InceptionResNetV2,
        'InceptionV3': tf.keras.applications.InceptionV3,
        'MobileNetV3Large': tf.keras.applications.MobileNetV3Large,
        'NASNetLarge': tf.keras.applications.NASNetLarge,
        'ResNet152': tf.keras.applications.ResNet152,
        'ResNet152V2': tf.keras.applications.ResNet152V2,
        'VGG19': tf.keras.applications.VGG19,
        'Xception': tf.keras.applications.Xception,
    }
    
    if model_name not in model_dict:
        raise ValueError(f"Unsupported model: {model_name}")
    
    return model_dict[model_name](input_shape=input_shape, include_top=False, weights='imagenet')

In [7]:
def build_model(base_model, input_shape):

    base_model.trainable = False

    inputs = tf.keras.Input(shape=input_shape)
    x = base_model(inputs, training=False)
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = Dense(512, activation='relu')(x)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = Model(inputs=inputs, outputs=outputs)

    print("Built model")
    pd.set_option('max_colwidth', None)
    layers = [(layer, layer.name, layer.trainable) for layer in model.layers]
    pd.DataFrame(layers, columns=['Layer Type', 'Layer Name', 'Layer Trainable'])

    return model

# Usage example

# input_shape = (224, 224, 3)
# base_model = get_base_model(model_name, input_shape)
# model = build_model(base_model, input_shape)

In [8]:
def get_callbacks(model_name, results_dir, models_dir, lr_suffix="001"):
    log_file = os.path.join(results_dir, f"{model_name}_bo20_lr{lr_suffix}.csv")
    return [
        CSVLogger(log_file),
        ModelCheckpoint(
            filepath=os.path.join(models_dir, f"{model_name}_bo20_lr{lr_suffix}.h5"),
            save_weights_only=False,
            save_best_only=True,
            save_freq='epoch',
            verbose=1
        )
    ]

In [9]:
def plot_training_history(history, model_name, results_dir, lr_suffix="001"):
    plt.figure()
    plt.plot(history.history['loss'], color='teal', label='loss')
    plt.plot(history.history['val_loss'], color='orange', label='val_loss')
    plt.title(f'{model_name} Loss (LR: 0.{lr_suffix})')
    plt.legend(loc="upper left")
    plt.savefig(os.path.join(results_dir, f"{model_name}_loss_lr{lr_suffix}.png"))
    plt.close()

# Training

In [10]:
def test_model(model_name, test_data, models_dir):
    model = tf.keras.models.load_model(os.path.join(models_dir, f"{model_name}_bo20_lr0001.h5"))
    pre = Precision()
    re = Recall()
    acc = BinaryAccuracy()

    for batch in test_data.as_numpy_iterator():
        X, y = batch
        yhat = model.predict(X)
        pre.update_state(y, yhat)
        re.update_state(y, yhat)
        acc.update_state(y, yhat)

    f1_score = 2 * (pre.result() * re.result()) / (pre.result() + re.result())
    print(f"{model_name} Test Results:")
    print(f"Precision: {pre.result():.4f}")
    print(f"Recall: {re.result():.4f}")
    print(f"Accuracy: {acc.result():.4f}")
    print(f"F1 Score: {f1_score:.4f}")

In [11]:
def train_models(model_names, train_data_dir, test_data_dir, results_base_dir, models_base_dir, epochs=20, batch_size=32):
    
    train, val = load_training_data(train_data_dir, train_data_ratio=0.8, batch_size=batch_size)

    for model_name in model_names:
        print(f"Training {model_name}...")

        # Set up model-specific directories
        results_dir = os.path.join(results_base_dir, model_name)
        models_dir = os.path.join(models_base_dir, model_name)
        os.makedirs(results_dir, exist_ok=True)
        os.makedirs(models_dir, exist_ok=True)

        # Build model
        input_shape = (224, 224, 3)
        base_model = get_base_model(model_name, input_shape)
        model = build_model(base_model, input_shape)

        # Compile model
        model.compile(
            loss=keras.losses.BinaryCrossentropy(from_logits=False),
            optimizer=keras.optimizers.Adam(learning_rate=1e-2),
            metrics=[keras.metrics.BinaryAccuracy()]
        )

        # Print model summary
        model.summary()
        
        # Set up callbacks
        callbacks = get_callbacks(model_name, results_dir, models_dir)

        # Train model
        history = model.fit(
            train,
            validation_data=val,
            epochs=epochs,
            verbose=1,
            callbacks=callbacks
        )

        # Plot training history
        plot_training_history(history, model_name, results_dir)

        # Reduce learning rate and continue training
        model = load_model(os.path.join(models_dir, f"{model_name}_bo{epochs}_lr001.h5"))
        model.compile(
            loss=keras.losses.BinaryCrossentropy(from_logits=False),
            optimizer=keras.optimizers.Adam(learning_rate=1e-3),
            metrics=[keras.metrics.BinaryAccuracy()]
        )

        callbacks = get_callbacks(model_name, results_dir, models_dir, lr_suffix="0001")
        history = model.fit(
            train,
            validation_data=val,
            epochs=epochs,
            verbose=1,
            callbacks=callbacks
        )

        plot_training_history(history, model_name, results_dir, lr_suffix="0001")

        # Test model

        test_data = load_test_data(test_data_dir, batch_size=1)
        test_model(model_name, test_data, models_dir)

# Main

In [12]:
model_to_train = ['EfficientNetB7', 'EfficientNetV2L','InceptionResNetV2', 'InceptionV3', 'MobileNetV3Large', 'ResNet152', 'ResNet152V2', 'VGG19', 'Xception']
# trained = ['DenseNet201', ]
# error : NASNetLarge
train_data_dir = r"D:\Kananat\TF_TMJOA_jpg_x_10px"
test_data_dir = r"D:\Kananat\TF_TMJOA_jpg_x_10px_test"

results_base_dir = r"D:\Kananat\_result\logs"
models_base_dir = r"D:\Kananat\_result\models"

train_models(model_to_train, train_data_dir, test_data_dir, results_base_dir, models_base_dir, epochs=20, batch_size=32)

Loading training data from D:\Kananat\TF_TMJOA_jpg_x_10px...
Found 37492 files belonging to 2 classes.
Training EfficientNetB7...
Built model
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 efficientnetb7 (Functional)  (None, 7, 7, 2560)       64097687  
                                                                 
 global_average_pooling2d (G  (None, 2560)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               (None, 1024)              2622464   
                                                                 
 dense_1 (Dense)             (None, 512)               524800    
                                                   