In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from tensorflow.keras.callbacks import Callback, TensorBoard, EarlyStopping
from tensorflow.keras import regularizers
from keras import layers
import keras_tuner as kt

from io import BytesIO


In [None]:
!git clone https://github.com/GitKentC/dataset.git

## Loading Data

In [None]:
""" DATASET VARIABLES """
DATASET_NAME = 'garbage_classification'
DATASET_DIR = f'../dataset/dupe_cleaned/{DATASET_NAME}'
TRAIN_DIR = f'../dataset/split/{DATASET_NAME}/train'
VALIDATION_DIR = f'../dataset/split/{DATASET_NAME}/validation'
TEST_DIR = f'../dataset/split/{DATASET_NAME}/test'

In [None]:
# """ DATASET VARIABLES """
# # DATASET_NAME = 'garbage_classification'
# DATASET_DIR = '/content/dataset/dupe_cleaned/garbage_classification'
# TRAIN_DIR = '/content/dataset/split/garbage_classification/train'
# VALIDATION_DIR = '/content/dataset/split/garbage_classification/validation'
# TEST_DIR = '/content/dataset/split/garbage_classification/test'

In [36]:
""" Change this to match pre-trained models input; for consistency """
IMAGE_SIZE:tuple[int,int] = (256, 256) 

BATCH_SIZE:int = 64                     # try testing this
LABEL_MODE:str = 'categorical'
COLOR_MODE:str = 'rgb'                 # accepts 'grayscale', 'rgb', and 'rgba' only
VALIDATION_SPLIT:float = 0.2           # try testing this
SEED:int               = 42

def load_datasets_cleaned():
    """ Function to load from cleaned dataset; customizable """
    train_dataset = tf.keras.utils.image_dataset_from_directory(
        DATASET_DIR,
        subset     = 'training',
        image_size = IMAGE_SIZE,
        batch_size = BATCH_SIZE,
        label_mode = LABEL_MODE,
        color_mode = COLOR_MODE,
        validation_split = VALIDATION_SPLIT,
        seed             = SEED,
        pad_to_aspect_ratio=True,
        )

    validation_dataset = tf.keras.utils.image_dataset_from_directory(
        DATASET_DIR,
        subset     = 'training',
        image_size = IMAGE_SIZE,
        batch_size = BATCH_SIZE,
        label_mode = LABEL_MODE,
        color_mode = COLOR_MODE,
        validation_split = VALIDATION_SPLIT,
        seed             = SEED,
        pad_to_aspect_ratio=True,
        )

    return train_dataset, validation_dataset

def load_datasets_split():
    """ Function to load from cleaned, splitted dataset  """
    train_dataset = tf.keras.utils.image_dataset_from_directory(
        TRAIN_DIR,
        image_size = IMAGE_SIZE,
        batch_size = BATCH_SIZE,
        label_mode = LABEL_MODE,
        color_mode = COLOR_MODE,
        pad_to_aspect_ratio=True,
    )

    validation_dataset = tf.keras.utils.image_dataset_from_directory(
        VALIDATION_DIR,
        image_size = IMAGE_SIZE,
        batch_size = BATCH_SIZE,
        label_mode = LABEL_MODE,
        color_mode = COLOR_MODE,
        pad_to_aspect_ratio=True,
    )

    test_dataset = tf.keras.utils.image_dataset_from_directory(
        TEST_DIR,
        image_size = IMAGE_SIZE,
        batch_size = BATCH_SIZE,
        label_mode = LABEL_MODE,
        color_mode = COLOR_MODE,
        pad_to_aspect_ratio=True,
    )

    return train_dataset, validation_dataset, test_dataset

In [37]:
""" Create datasets """
#train_ds, val_ds = load_datasets_cleaned()
train_ds, val_ds, test_ds = load_datasets_split()

Found 10815 files belonging to 12 classes.
Found 3087 files belonging to 12 classes.
Found 1556 files belonging to 12 classes.


In [42]:
print(f"In train ds there is: {len(train_ds)} batches of {BATCH_SIZE} consisting of {len(train_ds)*BATCH_SIZE-1} images!")

In train ds there is: 169 batches of 64 consisting of 10815 images!


In [None]:
""" Preprocess the data  """
def preprocess(image, label):
    """ Read the docs in https://github.com/keras-team/keras/tree/master/keras/src/applications for more info """
    # CenterCrop after resizing the smallest
    image = tf.keras.layers.CenterCrop(224,224)(image)
    image = tf.keras.applications.resnet.preprocess_input(image)
    return image, label

train_ds_prep = train_ds.map(preprocess)
val_ds_prep = val_ds.map(preprocess)
test_ds_prep = test_ds.map(preprocess)

In [None]:
train_ds_prep.element_spec[0]

In [None]:
for images, labels in train_ds.take(1):
    original_image = images[0].numpy().astype("uint8")  
    label = labels[0].numpy()

    preprocessed_image = preprocess(original_image, label)  

    # Display both images side by side
    plt.figure(figsize=(10, 5))

    # Original image
    plt.subplot(1, 2, 1)  # 1 row, 2 columns, 1st subplot
    plt.imshow(original_image)
    plt.title(f"Original Image\nLabel: {label}")
    plt.axis('off')

    # Preprocessed image
    plt.subplot(1, 2, 2)  # 1 row, 2 columns, 2nd subplot
    plt.imshow(preprocessed_image[0])
    plt.title("Preprocessed Image")
    plt.axis('off')

    plt.show()
    break

In [None]:
SHUFFLE_BUFFER_SIZE = 1000
PREFETCH_BUFFER_SIZE = tf.data.AUTOTUNE

train_ds_final = (train_ds_prep
                  .cache()
                  .shuffle(SHUFFLE_BUFFER_SIZE)
                  .prefetch(PREFETCH_BUFFER_SIZE)
                  )

val_ds_final = (val_ds_prep
                .cache()
                .prefetch(PREFETCH_BUFFER_SIZE)
                )

test_ds_final = (test_ds_prep
                .cache()
                .prefetch(PREFETCH_BUFFER_SIZE)
                )

In [None]:
def create_pre_trained_model():
    pre_trained_model = tf.keras.applications.ResNet50(
        include_top=False,
        input_shape=(224,224,3),
        weights="imagenet"
    )

    for layer in pre_trained_model.layers:
        layer.trainable = False

    return pre_trained_model

pre_trained_model = create_pre_trained_model()
pre_trained_model.summary()

In [None]:
last_output = pre_trained_model.output

print(last_output.shape)

In [None]:
def create_augmentation_model():
    FILL_MODE = 'nearest'

    augmentation_model = tf.keras.Sequential([
        tf.keras.Input(shape=(224, 224, 3)),
        tf.keras.layers.RandomFlip("horizontal_and_vertical"),
        tf.keras.layers.RandomRotation(0.2, fill_mode = FILL_MODE),
        tf.keras.layers.RandomTranslation(0.2, 0.2, fill_mode = FILL_MODE),
        tf.keras.layers.RandomZoom(0.2, fill_mode = FILL_MODE),
    ])

    return augmentation_model

augmentation_model = create_augmentation_model()

In [None]:
def create_model(hp):
    inputs = tf.keras.Input(shape=(224, 224, 3))
    x = augmentation_model(inputs)

    x = pre_trained_model(x)
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(hp.Int('dense_units', min_value=32, max_value=2048, step=2, sampling='log'), 
                              activation='relu', 
                              kernel_regularizer=regularizers.l2(hp.Float('dense_l2', min_value=1e-5, max_value=1e-2))
                              )(x)
    x = tf.keras.layers.Dropout(hp.Float('dropout_1', min_value=0.2, max_value=0.66))(x)
    
    outputs = tf.keras.layers.Dense(12, activation='softmax')(x)

    model = tf.keras.Model(inputs = inputs, outputs = outputs)

    model.compile(
        optimizer = tf.keras.optimizers.Adam(learning_rate=hp.Float('learning_rate', min_value=1e-5, max_value=3e-4)),
        loss = 'categorical_crossentropy',
        metrics = ['accuracy']
    )

    return model

In [None]:
summary = create_model(kt.HyperParameters())
summary.summary()

In [None]:
tuner = kt.BayesianOptimization(
    hypermodel=create_model,
    objective='val_accuracy',
    max_trials=50,
    num_initial_points=5,
    executions_per_trial=1,
    overwrite=True,
    directory='hypertune',
    project_name='resnet50_hypertune_trial1'
)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Define the log directory
log_dir = 'logs/hyperparameter_tuning'

early_stopping = EarlyStopping(
    monitor='val_loss',       # Monitors the validation loss
    min_delta=0.001,          # Minimum change to qualify as an improvement
    patience=5,               # Number of epochs to wait for improvement
    verbose=1,                # Verbosity mode
    mode='min',               # Stop when the monitored quantity has stopped decreasing
)

tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:

tuner.search(
    train_ds_final,
    validation_data=val_ds_final,
    epochs=50,
    callbacks=[tensorboard_callback, early_stopping]
)

In [None]:
# history = model.fit(train_ds_final,
#                     validation_data=val_ds_final,
#                     epochs=30,
#                     verbose=1,
#                     callbacks = [early_stopping],)

In [None]:
def plot_loss_acc(history):
    '''Plots the training and validation loss and accuracy from a history object'''
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs = range(len(acc))

    fig, ax = plt.subplots(1,2, figsize=(12, 6))
    ax[0].plot(epochs, acc, 'bo', label='Training accuracy')
    ax[0].plot(epochs, val_acc, 'b', label='Validation accuracy')
    ax[0].set_title('Training and validation accuracy')
    ax[0].set_xlabel('epochs')
    ax[0].set_ylabel('accuracy')
    ax[0].legend()

    ax[1].plot(epochs, loss, 'bo', label='Training Loss')
    ax[1].plot(epochs, val_loss, 'b', label='Validation Loss')
    ax[1].set_title('Training and validation loss')
    ax[1].set_xlabel('epochs')
    ax[1].set_ylabel('loss')
    ax[1].legend()

    plt.show()

plot_loss_acc(history)

In [None]:
loss, accuracy = model.evaluate(test_ds_final, verbose=1)
print("Test Loss:", loss)
print("Test Accuracy:", accuracy)

In [None]:
model_name = "resnet50_finetune_trial1.h5"
tf.keras.models.save_model(model, model_name)

In [None]:
loaded_model = tf.keras.models.load_model('resnet50_finetune_trial1.h5')

loaded_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(loaded_model)

# (Optional) Optimize the model for better performance and smaller size
# converter.optimizations = [tf.lite.Optimize.DEFAULT]


tflite_model = converter.convert()

with open('model.tflite', 'wb') as f:
    f.write(tflite_model)

print("Model converted to TensorFlow Lite format and saved as model.tflite")