In [None]:
import os

# TF_CPP_MIN_LOG_LEVEL is read by TensorFlow's native logging when the library
# is loaded, so it must be set *before* `import tensorflow` to have any effect.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import xml.etree.ElementTree as ET

import tensorflow as tf
from tensorflow import keras

# Disable XLA for GPUs
tf.config.optimizer.set_jit(False)
import keras_cv
from keras_cv import bounding_box
from keras_cv import visualization

In [None]:
import cv2
import numpy as np

# Sanity check: read one captured JPEG and report the range of its pixel values.
image_path = 'captured_images/index_index_66760152-113a-4dd1-82fb-e91352e59f19.jpg' # Update this to your image's path
image = cv2.imread(image_path)

# The read result is None when the file could not be loaded, so handle that first.
if image is None:
    print("Image not found. Please check the file path.")
else:
    # Smallest and largest value over every pixel and channel.
    min_val = np.min(image)
    max_val = np.max(image)

    print(f"Minimum pixel value: {min_val}")
    print(f"Maximum pixel value: {max_val}")


In [None]:
# Training configuration.
SPLIT_RATIO = 0.2  # fraction of the samples used for validation
BATCH_SIZE = 8
LEARNING_RATE = 0.008
EPOCH = 300  # number of training epochs
GLOBAL_CLIPNORM = 10.0  # global gradient-norm clip passed to the optimizer

# Gesture class names; the position in this list is the integer class id.
class_ids = ["thumbs_up", "thumbs_down", "fist", "index"]

# Integer id -> class name.
class_mapping = dict(enumerate(class_ids))

# Images and their XML annotations are stored side by side in one folder.
path_images = "captured_images/"
path_annot = "captured_images/"

# Sorted paths of every annotation file found in path_annot.
xml_files = sorted(
    os.path.join(path_annot, entry)
    for entry in os.listdir(path_annot)
    if entry.endswith(".xml")
)

# Sorted paths of every JPEG found in path_images.
jpg_files = sorted(
    os.path.join(path_images, entry)
    for entry in os.listdir(path_images)
    if entry.endswith(".jpg")
)

def parse_annotation(xml_file, images_dir=None, name_to_id=None):
    """Parse one XML annotation file into an image path, boxes and class ids.

    Args:
        xml_file: Path to the annotation file. The root must contain a
            ``filename`` element; every ``object`` element must contain a
            ``name`` and a ``bndbox`` with ``xmin``/``ymin``/``xmax``/``ymax``.
        images_dir: Directory that is joined with the annotated file name.
            Defaults to the module-level ``path_images``.
        name_to_id: Mapping from class name to integer class id. Defaults to
            the inverse of the module-level ``class_mapping``.

    Returns:
        A tuple ``(image_path, boxes, class_ids)`` where ``boxes`` is a list of
        ``[xmin, ymin, xmax, ymax]`` float lists and ``class_ids`` is the
        parallel list of integer ids, both in document order.

    Raises:
        ValueError: If an object carries a class name that has no id.
    """
    if images_dir is None:
        images_dir = path_images
    if name_to_id is None:
        # Invert id -> name once instead of rebuilding key/value lists for
        # every object; setdefault keeps the first id if a name repeats.
        name_to_id = {}
        for idx, name in class_mapping.items():
            name_to_id.setdefault(name, idx)

    root = ET.parse(xml_file).getroot()
    image_path = os.path.join(images_dir, root.find("filename").text)

    boxes = []
    class_ids = []
    for obj in root.iter("object"):
        label = obj.find("name").text
        if label not in name_to_id:
            raise ValueError(
                f"Unknown class {label!r} in {xml_file}; "
                f"expected one of {sorted(name_to_id)}"
            )
        class_ids.append(name_to_id[label])

        bndbox = obj.find("bndbox")
        boxes.append(
            [float(bndbox.find(tag).text) for tag in ("xmin", "ymin", "xmax", "ymax")]
        )

    return image_path, boxes, class_ids


# Parse every annotation file and keep only samples whose labels are usable.
image_paths = []
bbox = []
classes = []
for xml_file in xml_files:
    # A distinct name is used for the ids so the module-level `class_ids`
    # list of class names is not overwritten by the loop.
    image_path, boxes, sample_class_ids = parse_annotation(xml_file)

    # An annotation without any object would make `boxes[0]` raise IndexError.
    if not boxes:
        print(f"Error: no objects annotated in {xml_file}, skipping")
        continue

    # Filter images with faulty labels: every box must satisfy
    # xmin <= xmax and ymin <= ymax, not just the first one.
    if any(box[0] > box[2] or box[1] > box[3] for box in boxes):
        print(f"Error: inverted bounding box in {xml_file}, skipping")
        continue

    image_paths.append(image_path)
    bbox.append(boxes)
    classes.append(sample_class_ids)
print()

In [None]:

# Number of samples that survived label filtering. Taken before the Python
# lists are replaced by tensors; xml_files may contain entries that were skipped.
num_samples = len(image_paths)

bbox = tf.ragged.constant(bbox)
classes = tf.ragged.constant(classes)
image_paths = tf.ragged.constant(image_paths)

data = tf.data.Dataset.from_tensor_slices((image_paths, classes, bbox))

# Determine the number of validation samples
num_val = int(num_samples * SPLIT_RATIO)

# Split the dataset into train and validation sets.
# reshuffle_each_iteration=False fixes the shuffled order for the lifetime of
# the dataset. With the default (True), take/skip would see a new order on
# every epoch, so validation samples would leak into training and vice versa.
data = data.shuffle(num_samples, reshuffle_each_iteration=False)
val_data = data.take(num_val)
train_data = data.skip(num_val)

def load_image(image_path):
    """Read the file at `image_path` and decode it as a 3-channel JPEG."""
    raw_bytes = tf.io.read_file(image_path)
    return tf.image.decode_jpeg(raw_bytes, channels=3)


def load_dataset(image_path, classes, bbox):
    """Build one sample dict from an image path and its labels.

    Returns a dict with the decoded image cast to float32 under "images" and,
    under "bounding_boxes", a dict holding the class ids cast to float32
    ("classes") and the boxes passed through unchanged ("boxes").
    """
    pixels = tf.cast(load_image(image_path), tf.float32)
    labels = {
        "classes": tf.cast(classes, dtype=tf.float32),
        "boxes": bbox,
    }
    return {"images": pixels, "bounding_boxes": labels}


# Training-time augmentation, applied in this order to whole batches.
# Layers that move pixels are given the box format so boxes are transformed too.
augmentation_layers = [
    keras_cv.layers.RandomFlip(mode="horizontal", bounding_box_format="xyxy"),
    keras_cv.layers.RandomSaturation(0.9),
    keras_cv.layers.RandomShear(
        x_factor=0.2, y_factor=0.2, bounding_box_format="xyxy"
    ),
    # Final step brings every image to the 320x320 network input size.
    keras_cv.layers.JitteredResize(
        target_size=(320, 320), scale_factor=(0.25, 0.5), bounding_box_format="xyxy"
    ),
]
augmenter = keras.Sequential(layers=augmentation_layers)

# Training pipeline: decode -> shuffle -> ragged batch -> augment.
# drop_remainder=True discards a final batch smaller than BATCH_SIZE.
train_ds = (
    train_data.map(load_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(BATCH_SIZE * 4)
    .ragged_batch(BATCH_SIZE, drop_remainder=True)
    .map(augmenter, num_parallel_calls=tf.data.AUTOTUNE)
)


# Validation images only get resized to the 320x320 network input (no flip,
# saturation or shear).
# NOTE(review): JitteredResize picks a random scale, so validation batches are
# not identical between runs — confirm this is intended.
resizing = keras_cv.layers.JitteredResize(
    target_size=(320, 320),
    scale_factor=(0.25, 0.5),
    bounding_box_format="xyxy",
)

# Validation pipeline: decode -> shuffle -> ragged batch -> resize.
# NOTE(review): drop_remainder=True yields no batches at all when fewer than
# BATCH_SIZE validation samples exist — verify against the dataset size.
val_ds = (
    val_data.map(load_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(BATCH_SIZE * 4)
    .ragged_batch(BATCH_SIZE, drop_remainder=True)
    .map(resizing, num_parallel_calls=tf.data.AUTOTUNE)
)

def visualize_dataset(inputs, value_range, rows, cols, bounding_box_format):
    """Plot the first batch of `inputs` with its ground-truth boxes.

    `inputs` is a dataset of dicts with "images" and "bounding_boxes" entries;
    the remaining arguments are forwarded to plot_bounding_box_gallery.
    """
    first_batch = next(iter(inputs.take(1)))
    visualization.plot_bounding_box_gallery(
        first_batch["images"],
        value_range=value_range,
        rows=rows,
        cols=cols,
        y_true=first_batch["bounding_boxes"],
        scale=5,
        font_scale=0.7,
        bounding_box_format=bounding_box_format,
        class_mapping=class_mapping,
    )


In [None]:
def dict_to_tuple(inputs):
    """Turn a batch dict into an `(images, dense_bounding_boxes)` tuple.

    NOTE(review): max_boxes=1 keeps room for a single box per image —
    confirm that no annotation holds more than one object.
    """
    images = inputs["images"]
    dense_boxes = bounding_box.to_dense(inputs["bounding_boxes"], max_boxes=1)
    return images, dense_boxes


# Convert both pipelines from dict batches to (images, boxes) tuples and
# prefetch so input preparation overlaps with training.
# Running this cell twice fails: the second run would feed tuples to dict_to_tuple.
train_ds = train_ds.map(
    dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)

val_ds = val_ds.map(
    dict_to_tuple, num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)

In [None]:
# RetinaNet detector on a MobileNetV3 backbone loaded from a KerasCV preset.
model = keras_cv.models.RetinaNet(
    # Derived from the label map so the head size cannot drift from the labels.
    num_classes=len(class_mapping),
    bounding_box_format="xyxy",
    backbone=keras_cv.models.MobileNetV3Backbone.from_preset(
        "mobilenet_v3_large_imagenet",
        # Images reach the model as unscaled float32 pixels, so the backbone
        # is asked to do the rescaling itself.
        include_rescaling=True,
    ),
)

In [None]:
# Adam optimizer with a clip on the global gradient norm.
optimizer = tf.keras.optimizers.Adam(
    learning_rate=LEARNING_RATE,
    global_clipnorm=GLOBAL_CLIPNORM,
)
# NOTE(review): clear_session() runs after the model and optimizer have already
# been created — confirm this ordering is intended; it is normally called
# before building a model.
tf.keras.backend.clear_session()
# Focal loss for the classification head, smooth L1 for box regression.
model.compile(
    optimizer=optimizer, classification_loss="focal", box_loss="SmoothL1"
)

In [None]:
# Constants read by lr_schedule. Steps are optimizer steps, not epochs.
initial_learning_rate = LEARNING_RATE
# NOTE(review): with EPOCH = 300 and 15 steps per epoch the step count stays
# below warmup_steps, so only the warmup branch of lr_schedule is ever used —
# confirm these values.
warmup_steps = 10000
total_steps = 50000
alpha = 0.9  # Final learning rate as a fraction of initial_learning_rate
# Fraction of initial_learning_rate that the linear warmup ramps up to.
warmup_target = 0.0266

def lr_schedule(epoch, lr, steps_per_epoch=15):
    """Return the learning rate for `epoch`: linear warmup, then cosine decay.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Current learning rate supplied by the callback (unused).
        steps_per_epoch: Optimizer steps per epoch, used to convert the epoch
            index into a step count. Defaults to the previously hard-coded 15.

    Returns:
        The learning rate as a Python float.

    Reads the module-level `initial_learning_rate`, `warmup_steps`,
    `total_steps`, `alpha` and `warmup_target`.
    """
    # Convert epoch to steps (1-based).
    step = epoch * steps_per_epoch + 1

    if step < warmup_steps:
        # Linear warmup towards initial_learning_rate * warmup_target.
        return initial_learning_rate * warmup_target * step / warmup_steps

    # Cosine decay from initial_learning_rate down to alpha * initial_learning_rate.
    # Progress is clamped at 1 so the rate stays at its floor after total_steps
    # instead of following the cosine back up.
    progress = min((step - warmup_steps) / (total_steps - warmup_steps), 1.0)
    floor_lr = alpha * initial_learning_rate
    cosine_factor = 0.5 * (1 + np.cos(np.pi * progress))
    return float((initial_learning_rate - floor_lr) * cosine_factor + floor_lr)

# Create a LearningRateScheduler callback driven by lr_schedule;
# verbose=1 makes the callback report the learning rate it sets.
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lr_schedule, verbose=1)

In [None]:
# Train for EPOCH epochs and validate after each one; `history` keeps the
# per-epoch metrics.
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCH,
    # `callbacks` is documented as a list of callbacks, so wrap the single one.
    callbacks=[lr_scheduler],
    verbose=1,
)

In [None]:
import matplotlib.pyplot as plt

# Per-epoch loss curves recorded by model.fit
training_loss = history.history['loss']
validation_loss = history.history['val_loss']

# Lowest value of each curve and the (first) epoch index where it occurs
min_training_loss = min(training_loss)
min_training_epoch = training_loss.index(min_training_loss)
min_validation_loss = min(validation_loss)
min_validation_epoch = validation_loss.index(min_validation_loss)

# Both curves on one log-scaled axis
fig, ax = plt.subplots()
ax.plot(training_loss, label='Training Loss')
ax.plot(validation_loss, label='Validation Loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_yscale('log')
ax.grid(True)

# Mark the minimum of each curve: (label, point, text offset, arrow colour)
minimum_markers = (
    (f'Min Training Loss: {min_training_loss:.4f}',
     (min_training_epoch, min_training_loss), (-10, -10), 'r'),
    (f'Min Validation Loss: {min_validation_loss:.4f}',
     (min_validation_epoch, min_validation_loss), (-10, 10), 'g'),
)
for marker_text, marker_point, text_offset, arrow_colour in minimum_markers:
    ax.annotate(marker_text,
                marker_point,
                textcoords="offset points",
                xytext=text_offset,
                ha='center',
                arrowprops=dict(arrowstyle="->", color=arrow_colour))

ax.legend()
plt.show()

# Loss values of the final epoch
print(training_loss[-1])
print(validation_loss[-1])

In [None]:
import numpy as np
def filter_top_two_predictions(y_pred, top_k=2):
    """Keep only the first `top_k` predictions of every image.

    Args:
        y_pred: Dict with the keys 'boxes' (batch, n, 4), 'confidence'
            (batch, n), 'classes' (batch, n) and 'num_detections' (batch,).
        top_k: Number of leading predictions to keep per image (default 2).

    Returns:
        A new dict with the same keys whose per-image arrays are cut to at most
        `top_k` entries and whose 'num_detections' is capped accordingly.

    Note: the *first* entries are kept, not the highest-scoring ones — this
    assumes predictions are already ordered by descending confidence
    (TODO confirm against the model's prediction decoder).
    """
    # Slicing (rather than indexing [0, 1]) also works when fewer than
    # `top_k` prediction slots are present.
    boxes = np.array(y_pred['boxes'])[:, :top_k, :]
    confidence = np.array(y_pred['confidence'])[:, :top_k]
    classes = np.array(y_pred['classes'])[:, :top_k]

    # Keep the count consistent with the truncated arrays.
    num_detections = np.minimum(np.array(y_pred['num_detections']), boxes.shape[1])

    return {
        'boxes': boxes,
        'confidence': confidence,
        'classes': classes,
        'num_detections': num_detections,
    }

def visualize_detections(model, dataset, bounding_box_format):
    """Plot ground truth and filtered predictions for the first batch of `dataset`.

    The batch is run through `model.predict`, the predictions are reduced with
    filter_top_two_predictions, and both label sets are drawn in a 2x1 gallery.
    """
    first_batch = next(iter(dataset.take(1)))
    images, y_true = first_batch
    predictions = filter_top_two_predictions(model.predict(images))
    visualization.plot_bounding_box_gallery(
        images,
        value_range=(0, 255),
        bounding_box_format=bounding_box_format,
        y_true=y_true,
        y_pred=predictions,
        scale=4,
        rows=2,
        cols=1,
        show=True,
        font_scale=0.7,
        class_mapping=class_mapping,
    )

# Take one training batch and run the model on it.
# NOTE(review): visualize_detections draws its own batch and calls
# model.predict itself, so images / y_true / y_pred are not used by the call
# below — this forward pass looks redundant; confirm before removing.
images, y_true = next(iter(train_ds.take(1)))
y_pred = model.predict(images)
# Plot predictions against ground truth for one batch of the training set.
visualize_detections(model, dataset=train_ds, bounding_box_format="xyxy")

In [None]:
# Evaluate the model on the training dataset.
# train_ds still applies the random augmentation layers, so these numbers are
# measured on augmented images. return_dict=True yields {metric name: value}.
train_evaluation = model.evaluate(train_ds, verbose=1, return_dict=True)
print(f"Training Evaluation: {train_evaluation}")

# Evaluate the model on the validation dataset
val_evaluation = model.evaluate(val_ds, verbose=1, return_dict=True)
print(f"Validation Evaluation: {val_evaluation}")


In [None]:
# Save the trained model to modelMobile.keras (path is relative to the working directory).
model.save("modelMobile.keras")