In [None]:
from typing import Tuple
import tensorflow as tf

# from tensorflow. import Dataset
import numpy as np
import pathlib


class ImageProcessor:
    """Class-level helpers that turn image file paths into batched (image, label) data.

    All configuration lives in class attributes, which
    ``ImageDataSet.get_train_val_test`` overwrites before building its splits.
    The methods are classmethods so they can be passed straight to
    ``tf.data.Dataset.map``.
    """

    # Target size passed to tf.image.resize as [IMG_WIDTH, IMG_HEIGHT].
    # NOTE(review): tf.image.resize documents its size argument as (height, width);
    # the model builders in this notebook use the same (width, height) ordering for
    # their input shape, so the two agree, but the names are swapped for non-square
    # sizes - confirm before using width != height.
    IMG_WIDTH, IMG_HEIGHT = 224, 224
    BATCH_SIZE = 32
    # Array of class names; must be assigned before get_label is called.
    CLASS_NAMES = None
    AUTOTUNE = tf.data.experimental.AUTOTUNE
    # False (default): decoded pixels stay in the 0-255 range as float32, which is
    # what keras.applications.*.preprocess_input expects.
    # True: pixels are rescaled to [0, 1] via tf.image.convert_image_dtype.
    SCALE_TO_UNIT_RANGE = False

    @classmethod
    def get_label(cls, file_path: tf.Tensor):
        """Return a one-hot float32 label derived from the file's parent directory.

        Args:
            file_path: string tensor holding a "/"-separated file path whose
                second-to-last component is the class name.

        Returns:
            float32 tensor with one entry per element of ``CLASS_NAMES``; 1.0 where
            the parent directory name equals the class name, else 0.0.

        Raises:
            AssertionError: if ``CLASS_NAMES`` has not been set.
        """
        assert cls.CLASS_NAMES is not None
        parts = tf.strings.split(file_path, "/")
        return tf.cast(parts[-2] == cls.CLASS_NAMES, tf.float32)

    @classmethod
    def decode_img(cls, img):
        """Decode JPEG bytes into a resized 3-channel float32 image.

        Args:
            img: scalar string tensor with the encoded image bytes.

        Returns:
            float32 tensor resized to ``[IMG_WIDTH, IMG_HEIGHT]`` with 3 channels.
            Values are in the 0-255 range unless ``SCALE_TO_UNIT_RANGE`` is True.
        """
        img = tf.image.decode_jpeg(img, channels=3)
        if cls.SCALE_TO_UNIT_RANGE:
            img = tf.image.convert_image_dtype(img, tf.float32)
        else:
            # Keep the 0-255 range: the models apply preprocess_input themselves,
            # and feeding it [0, 1] data would squash every image to a near-constant.
            img = tf.cast(img, tf.float32)
        return tf.image.resize(img, [cls.IMG_WIDTH, cls.IMG_HEIGHT])

    @classmethod
    def process_path(cls, file_path: tf.Tensor):
        """Map a file path to an ``(image, label)`` pair.

        Args:
            file_path: string tensor with the path of one image file.

        Returns:
            Tuple ``(img, label)`` as produced by ``decode_img`` and ``get_label``.
        """
        label = cls.get_label(file_path)
        img = tf.io.read_file(file_path)
        img = cls.decode_img(img)
        return img, label

    @classmethod
    def prepare_for_training(cls, ds: tf.data.Dataset, shuffle_buffer_size=1000):
        """Shuffle, batch and prefetch a dataset.

        Args:
            ds: dataset of individual examples.
            shuffle_buffer_size: buffer size handed to ``Dataset.shuffle``.

        Returns:
            The dataset shuffled, batched by ``BATCH_SIZE`` and prefetched with
            ``AUTOTUNE``.
        """
        ds = ds.shuffle(buffer_size=shuffle_buffer_size)
        ds = ds.batch(cls.BATCH_SIZE)
        ds = ds.prefetch(buffer_size=cls.AUTOTUNE)
        return ds


class ImageDataSet:
    """Image-folder dataset (``<root>/<class_name>/<file>``) split into train/val/test.

    The file list is shuffled exactly once, so the three splits built with
    ``take``/``skip`` stay disjoint across epochs.
    """

    def __init__(
        self,
        path: str,
        train_size: float,
        test_size: float,
        val_size: float,
        seed: int = None,
    ) -> None:
        """Index the files under ``path`` and compute the split sizes.

        Args:
            path: root directory containing one sub-directory per class.
            train_size: fraction of ``dataset_size`` used for training.
            test_size: fraction of ``dataset_size`` used to compute ``test_sz``.
            val_size: fraction of ``dataset_size`` used for validation.
            seed: optional seed for the one-off shuffle of the file list.
        """
        self.path = path
        self.data_dir = pathlib.Path(path)
        # Split sizes are derived from the number of *.jpg files.
        # NOTE(review): list_ds below matches every "*/*" entry, so any non-.jpg
        # files end up in the test split (it takes whatever remains) - confirm the
        # dataset only contains .jpg images.
        self.dataset_size = len(list(self.data_dir.glob("*/*.jpg")))

        # list_files shuffles by default and reshuffles on every iteration, which
        # would let train/val/test overlap from one epoch to the next. List in a
        # fixed order instead, then shuffle once with a buffer covering all files.
        listed_count = len(list(self.data_dir.glob("*/*")))
        ordered_files = tf.data.Dataset.list_files(
            str(self.data_dir / "*/*"), shuffle=False
        )
        self.list_ds = ordered_files.shuffle(
            buffer_size=max(listed_count, 1),
            seed=seed,
            reshuffle_each_iteration=False,
        )

        self.train = None
        self.test = None
        self.val = None
        self.train_sz = int(train_size * self.dataset_size)
        self.test_sz = int(test_size * self.dataset_size)
        self.val_sz = int(val_size * self.dataset_size)
        # Only directories are classes; sorting keeps label indices stable between
        # runs regardless of filesystem listing order.
        self.CLASS_NAMES = np.array(
            sorted(item.name for item in self.data_dir.glob("*") if item.is_dir())
        )
        # (batch_size, width, height) the cached splits were built with.
        self._split_params = None

    def get_train_val_test(
        self, batch_size, width, height
    ) -> Tuple[tf.data.Dataset, tf.data.Dataset, tf.data.Dataset]:
        """Build (or return cached) train, validation and test datasets.

        Side effect: overwrites ``ImageProcessor.CLASS_NAMES``, ``IMG_WIDTH``,
        ``IMG_HEIGHT`` and ``BATCH_SIZE``.

        Args:
            batch_size: batch size for all three datasets.
            width: target image width stored on ``ImageProcessor``.
            height: target image height stored on ``ImageProcessor``.

        Returns:
            Tuple ``(train, val, test)`` of batched, prefetched datasets.
        """
        ImageProcessor.CLASS_NAMES = self.CLASS_NAMES
        ImageProcessor.IMG_WIDTH, ImageProcessor.IMG_HEIGHT = width, height
        ImageProcessor.BATCH_SIZE = batch_size

        # Cached splits bake in the batch/image size they were built with, so
        # rebuild them when the caller asks for different settings.
        params = (batch_size, width, height)
        if self._split_params is not None and params != self._split_params:
            self.train = self.val = self.test = None
        self._split_params = params

        if self.train is None:
            self.train = self.list_ds.take(self.train_sz).map(
                ImageProcessor.process_path,
                num_parallel_calls=ImageProcessor.AUTOTUNE,
            )
            self.train = ImageProcessor.prepare_for_training(self.train)

        remaining = self.list_ds.skip(self.train_sz)
        if self.val is None:
            self.val = remaining.take(self.val_sz).map(
                ImageProcessor.process_path,
                num_parallel_calls=ImageProcessor.AUTOTUNE,
            )
            self.val = ImageProcessor.prepare_for_training(self.val)

        if self.test is None:
            # The test split is everything left after train and val.
            self.test = remaining.skip(self.val_sz).map(
                ImageProcessor.process_path,
                num_parallel_calls=ImageProcessor.AUTOTUNE,
            )
            self.test = ImageProcessor.prepare_for_training(self.test)
        return self.train, self.val, self.test

In [None]:
import keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras import callbacks
import wandb
import os
import datetime
from typing import Dict, Any


def model_v1(num_classes: int, width: int, height: int, trainable: bool = False):
    """Build a ResNet50V2-based classifier with a pooled, batch-normalized head.

    Args:
        num_classes: number of units in the final softmax layer.
        width: first spatial dimension of the input shape.
        height: second spatial dimension of the input shape.
        trainable: value assigned to the backbone's ``trainable`` attribute
            (False freezes its weights).

    Returns:
        An uncompiled ``keras.Model`` mapping images to class probabilities.
    """
    input_shape = (width, height, 3)

    # ImageNet-pretrained backbone without its original classification head.
    backbone = keras.applications.ResNet50V2(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    features = keras.applications.resnet_v2.preprocess_input(image_input)
    # training=False: the backbone is always called in inference mode.
    features = backbone(features, training=False)

    # Classification head: pool -> batch norm -> dropout -> softmax.
    head = layers.GlobalAveragePooling2D()(features)
    head = layers.BatchNormalization()(head)
    head = layers.Dropout(0.2)(head)
    predictions = layers.Dense(num_classes, activation="softmax")(head)

    return keras.Model(image_input, predictions)


def model_v2(num_classes: int, width: int, height: int, trainable: bool = False):
    """Build a ResNet50V2-based classifier with a three-layer dense head.

    Args:
        num_classes: number of units in the final softmax layer.
        width: first spatial dimension of the input shape.
        height: second spatial dimension of the input shape.
        trainable: value assigned to the backbone's ``trainable`` attribute
            (False freezes its weights).

    Returns:
        An uncompiled ``keras.Model`` mapping images to class probabilities.
    """
    input_shape = (width, height, 3)

    # ImageNet-pretrained backbone without its original classification head.
    backbone = keras.applications.ResNet50V2(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    features = keras.applications.resnet_v2.preprocess_input(image_input)
    # training=False: the backbone is always called in inference mode.
    features = backbone(features, training=False)

    # Head: flatten, then Dense(relu) + Dropout(0.2) blocks of shrinking width.
    head = layers.Flatten()(features)
    for hidden_units in (128, 64, 32):
        head = layers.Dense(units=hidden_units, activation="relu")(head)
        head = layers.Dropout(0.2)(head)

    predictions = layers.Dense(num_classes, activation="softmax")(head)
    return keras.Model(image_input, predictions)


def model_v3(num_classes: int, width: int, height: int, trainable: bool = False):
    """Build a VGG19-based classifier with a six-layer dense head.

    Args:
        num_classes: number of units in the final softmax layer.
        width: first spatial dimension of the input shape.
        height: second spatial dimension of the input shape.
        trainable: value assigned to the backbone's ``trainable`` attribute
            (False freezes its weights).

    Returns:
        An uncompiled ``keras.Model`` mapping images to class probabilities.
    """
    input_shape = (width, height, 3)

    # ImageNet-pretrained backbone without its original classification head.
    backbone = keras.applications.VGG19(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    # VGG19-specific input preprocessing.
    features = keras.applications.vgg19.preprocess_input(image_input)
    # training=False: the backbone is always called in inference mode.
    features = backbone(features, training=False)

    # Head: flatten, then Dense(relu) + Dropout(0.2) blocks, two per width.
    head = layers.Flatten()(features)
    for hidden_units in (128, 128, 64, 64, 32, 32):
        head = layers.Dense(units=hidden_units, activation="relu")(head)
        head = layers.Dropout(0.2)(head)

    predictions = layers.Dense(num_classes, activation="softmax")(head)
    return keras.Model(image_input, predictions)


def model_V4(num_classes: int, width: int, height: int, trainable: bool = False):
    """Build a VGG19-based classifier with a pooled head and one hidden layer.

    Args:
        num_classes: number of units in the final softmax layer.
        width: first spatial dimension of the input shape.
        height: second spatial dimension of the input shape.
        trainable: value assigned to the backbone's ``trainable`` attribute
            (False freezes its weights).

    Returns:
        An uncompiled ``keras.Model`` mapping images to class probabilities.
    """
    input_shape = (width, height, 3)

    # ImageNet-pretrained backbone without its original classification head.
    backbone = keras.applications.VGG19(
        weights="imagenet",
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    # VGG19-specific input preprocessing.
    features = keras.applications.vgg19.preprocess_input(image_input)
    # training=False: the backbone is always called in inference mode.
    features = backbone(features, training=False)

    # Pooled features: pool -> batch norm -> dropout.
    head = layers.GlobalAveragePooling2D()(features)
    head = layers.BatchNormalization()(head)
    head = layers.Dropout(0.2)(head)

    # Hidden block: linear Dense(32) -> batch norm -> relu -> dropout.
    head = layers.Dense(32)(head)
    head = layers.BatchNormalization()(head)
    head = layers.Activation("relu")(head)
    head = layers.Dropout(0.2)(head)

    predictions = layers.Dense(num_classes, activation="softmax")(head)
    return keras.Model(image_input, predictions)


def run(
    model: tf.keras.Model,
    epochs: int,
    train_ds: tf.data.Dataset,
    val_ds: tf.data.Dataset,
    config_logs: Dict[str, Any],
    checkpoint_filename: str,
    save_dir: str,
    compile_config: Dict[str, Any],
    wandb_config: Dict,
    hub_token: str = None,
    hub_model_id: str = None,
):
    """Compile and fit ``model`` while tracking the run in W&B and TensorBoard.

    Args:
        model: model to compile and train.
        epochs: maximum number of epochs (early stopping may end training sooner).
        train_ds: training dataset passed to ``model.fit``.
        val_ds: validation dataset passed to ``model.fit``.
        config_logs: extra keyword arguments forwarded to ``wandb.init``.
        checkpoint_filename: name of the sub-directory created under ``save_dir``.
        save_dir: parent directory for the checkpoint directory.
        compile_config: keyword arguments forwarded to ``model.compile``.
        wandb_config: run configuration logged to W&B.
        hub_token: currently unused (reserved for the disabled Hub push callback).
        hub_model_id: currently unused (reserved for the disabled Hub push callback).

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    # Named wandb_run so the local does not shadow this function's own name.
    wandb_run = wandb.init(
        sync_tensorboard=True, reinit=True, **config_logs, config=wandb_config
    )
    try:
        model.compile(**compile_config)
        earlystopping = callbacks.EarlyStopping(
            monitor="val_loss", mode="min", patience=5, restore_best_weights=True
        )

        # Directory reserved for model checkpoints. exist_ok avoids the
        # check-then-create race of a separate os.path.exists test.
        checkpoint_dir = os.path.join(save_dir, checkpoint_filename)
        os.makedirs(checkpoint_dir, exist_ok=True)

        # NOTE: checkpointing and Hub upload are currently disabled. The intended
        # callbacks were callbacks.ModelCheckpoint(monitor="val_loss",
        # save_best_only=True, save_weights_only=False) writing "model.keras" under
        # checkpoint_dir, and PushToHubCallback(output_dir=checkpoint_dir,
        # save_strategy="epoch", hub_model_id=hub_model_id, hub_token=hub_token).

        # TensorBoard tracking callback, one timestamped log directory per run.
        log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        tensorboard_callback = tf.keras.callbacks.TensorBoard(
            log_dir=log_dir, histogram_freq=1
        )

        history = model.fit(
            train_ds,
            epochs=epochs,
            validation_data=val_ds,
            verbose=1,
            callbacks=[earlystopping, tensorboard_callback],
        )
    except BaseException:
        # Close the W&B run even when training fails or is interrupted, and mark
        # it as failed instead of leaving it dangling.
        wandb_run.finish(exit_code=1)
        raise
    wandb_run.finish()
    return history

In [None]:
import matplotlib.pyplot as plt
import uuid


def plot_history(history):
    """Plot training vs. validation loss, save the figure as a PNG, and show it.

    Args:
        history: object whose ``history`` mapping holds the per-epoch ``"loss"``
            and ``"val_loss"`` series (as returned by ``model.fit``).

    The figure is written to the working directory as ``history-<uuid1>.png``.
    """
    # Draw the training curve first so the legend order below matches.
    for series_key in ("loss", "val_loss"):
        plt.plot(history.history[series_key])

    plt.title("model loss")
    plt.ylabel("loss")
    plt.xlabel("epoch")
    plt.legend(["train", "val"], loc="upper left")

    output_name = f"history-{uuid.uuid1()}.png"
    plt.savefig(output_name, bbox_inches="tight")
    plt.show()

In [None]:
# Dataset location and split fractions, forwarded as keyword arguments to
# ImageDataSet.
dataset_args = dict(
    path="/kaggle/input/amazon-products-images-v2",
    train_size=0.8,
    val_size=0.1,
    test_size=0.1,
)

# Index the image folder and compute the split sizes.
dataset = ImageDataSet(**dataset_args)

In [None]:
# Batch size and image size shared by the input pipeline and the model below.
train_split_args = dict(batch_size=16, width=224, height=224)

# Build the three batched datasets.
train, val, test = dataset.get_train_val_test(**train_split_args)

In [None]:
# Display the three datasets returned by get_train_val_test as the cell output.
train, val, test

In [None]:
import uuid

# --- Hyperparameters and model ---
epochs = 50
learning_rate = 0.001
model = model_V4(
    len(ImageProcessor.CLASS_NAMES),
    train_split_args["width"],
    train_split_args["height"],
)
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
loss = keras.losses.CategoricalCrossentropy()
metrics = [
    tf.keras.metrics.CategoricalCrossentropy(name="categorical_crossentropy"),
    tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
    tf.keras.metrics.Precision(name="precision1", top_k=1),
    tf.keras.metrics.Precision(name="precision3", top_k=3),
    tf.keras.metrics.Recall(name="recall1", top_k=1),
    tf.keras.metrics.Recall(name="recall3", top_k=3),
    tf.keras.metrics.F1Score(average="macro", name="f1_score"),
]

# --- Experiment tracking and output locations ---
project = "Slash"
id_ = f"V2-{uuid.uuid4()}"
# Filesystem-safe timestamp (no spaces or colons), in the same format run() uses
# for its TensorBoard log directory.
save_dir, checkpoint_filename = (
    "./model",
    "model-" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
)
config_logs = {"project": project, "id": id_}

compile_config = {"optimizer": optimizer, "loss": loss, "metrics": metrics}
wandb_config = {
    "learning_rate": learning_rate,
    "epochs": epochs,
    "batch_size": ImageProcessor.BATCH_SIZE,
    "architecture": "ModelV4",
    "dataset": "V2",
}
from kaggle_secrets import UserSecretsClient

# Hugging Face token read from Kaggle secrets.
# NOTE(review): hub_token is not passed to run() below, and run() does not use it
# while its Hub callback is commented out - confirm whether this lookup is needed.
user_secrets = UserSecretsClient()
hub_token = user_secrets.get_secret("HF")
# hub_model_id = f"{config['architecture']}-{config['dataset']}-{config['epochs']}"

# --- Train ---
history1 = run(
    model,
    epochs,
    train,
    val,
    config_logs,
    checkpoint_filename,
    save_dir,
    compile_config,
    wandb_config,
)

In [None]:
# Plot and save the loss curves of the training run above.
plot_history(history1)

In [None]:
# Reload a previously saved Keras model from disk.
# NOTE(review): this absolute path embeds a timestamp from an earlier session, and
# the ModelCheckpoint callback in run() is commented out, so nothing visible in this
# notebook writes this file - confirm it exists before running on a fresh kernel.
loaded_model = tf.keras.models.load_model(
    "/kaggle/working/model/2024-03-15 09:43:28.386710/2024-03-15 09:43:28.386710model.keras"
)

In [None]:
# Print the layer-by-layer summary of the reloaded model.
loaded_model.summary()

In [None]:
# Show the reloaded model object as the cell output.
loaded_model

In [None]:
def save_model(path: str, model: "tf.keras.models.Model") -> None:
    """Save a model as an architecture JSON file plus a separate weights file.

    Writes ``<path>/model.json`` (from ``model.to_json()``) and
    ``<path>/model.weights.h5`` (via ``model.save_weights``), creating ``path``
    and any missing parents first. Existing files are overwritten.

    Args:
        path: directory that receives both files.
        model: model exposing ``to_json()`` and ``save_weights(filepath)``.
    """
    # exist_ok=True: the directory may already exist without a model.json in it,
    # in which case a plain os.makedirs(path) would raise FileExistsError.
    os.makedirs(path, exist_ok=True)

    # Serialize the architecture to JSON.
    model_json = model.to_json()
    with open(os.path.join(path, "model.json"), "w") as json_file:
        json_file.write(model_json)

    # Serialize the weights to HDF5.
    model.save_weights(os.path.join(path, "model.weights.h5"))
    print("Saved model to disk")


# Export the reloaded model (architecture JSON + weights) to the Kaggle working dir.
save_model("/kaggle/working/saved/models/v1", loaded_model)