# Import Libs

In [4]:
import random
import pandas as pd
import torch
import yolov5.utils
from yolov5.utils.dataloaders import LoadImages
from yolov5.utils.general import non_max_suppression, xyxy2xywh
import torch
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import os

%matplotlib inline

import torch

# Use the GPU when torch reports CUDA as available; otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [2]:
import os
# Record the process working directory; the bare expression on the next line
# makes the notebook display it as the cell output.
current_path = os.getcwd()
current_path

'/Users/khang/Documents/IFN703'

In [3]:
import numpy as np
import random
import cv2
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model

# Helper Functions

## Parameters

In [5]:
# Hyperparameters (change these to train differently tuned models)
history_size = 10  # number of past actions one-hot encoded into the state (see extract_feature)
action_option = 9  # size of the action space; also the width of the replay targets
max_steps = 10  # select_action forces the trigger action (8) when step equals this value
experience_sample_size = 20  # transitions drawn per experience_replay call
max_experience_size = 1000  # not referenced in the code shown here; presumably caps the replay buffer - confirm
gamma = 0.9  # discount applied to the best next Q-value in compute_target
epsilon_change_steps = 10  # not referenced in the code shown here; presumably drives the epsilon schedule - confirm
loss_arr = []  # losses collected by experience_replay; averaged, printed and reset at 100 entries
IOU_STOP = 0.5  # IoU above which compute_end_reward returns the positive terminal reward

## Extract Features 

In [6]:
# Function to extract features from image and history
def extract_feature(image, history, vgg16):
    """Build the state vector: VGG16 activations followed by a one-hot action history.

    Args:
        image: Image array accepted by ``cv2.resize``; it is resized to
            224x224 and reshaped to (1, 224, 224, 3).
        history: Indexable sequence with at least ``history_size`` entries.
            Each entry is an action index, or -1 for an unused slot.
        vgg16: Keras model; the output of ``vgg16.layers[20]`` is used as the
            image feature.

    Returns:
        NumPy array of shape (1, N) holding the flattened image feature
        followed by ``action_option * history_size`` history values.
    """
    # One block of `action_option` positions per history slot. A recorded
    # action sets a single 1 inside its slot's block; -1 leaves it all zero.
    one_hot_history = np.zeros(action_option * history_size)
    for slot in range(history_size):
        past_action = history[slot]
        if past_action != -1:
            one_hot_history[slot * action_option + past_action] = 1

    # Cut the network at layer 20 so a forward pass yields that layer's output.
    truncated_vgg = Model(inputs=vgg16.input, outputs=vgg16.layers[20].output)

    resized = cv2.resize(image, (224, 224))
    batch = [resized.reshape(1, 224, 224, 3)]
    activations = truncated_vgg(batch)[0]
    visual_feature = activations.numpy().flatten()

    state = np.concatenate((visual_feature, one_hot_history))
    return np.array([state])

## Computing Values for Models

In [9]:
# Q-Value computation
def compute_q(feature, deep_q_model):
    """Return the model's prediction for ``feature`` as a flat 1-D array.

    Args:
        feature: Input passed unchanged to ``deep_q_model.predict``.
        deep_q_model: Object exposing ``predict(feature, verbose=0)`` that
            returns a NumPy array.

    Returns:
        A flattened copy of the prediction array.
    """
    q_values = deep_q_model.predict(feature, verbose=0)
    return q_values.flatten()

# Compute IoU (Intersection over Union) between the predicted mask and ground truth
def compute_iou(mask, ground_truth):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Args:
        mask: Predicted box as an indexable ``[x1, y1, x2, y2]``.
        ground_truth: Reference box in the same layout.

    Returns:
        Intersection area divided by union area. Returns 0.0 when the union
        area is not positive (for example two zero-area boxes), instead of
        dividing by zero.
    """
    overlap_w = min(mask[2], ground_truth[2]) - max(mask[0], ground_truth[0])
    overlap_h = min(mask[3], ground_truth[3]) - max(mask[1], ground_truth[1])

    # A negative extent on either axis means the boxes do not overlap.
    if overlap_w >= 0 and overlap_h >= 0:
        inter_area = overlap_w * overlap_h
    else:
        inter_area = 0

    mask_area = (mask[2] - mask[0]) * (mask[3] - mask[1])
    ground_truth_area = (ground_truth[2] - ground_truth[0]) * (ground_truth[3] - ground_truth[1])
    union_area = mask_area + ground_truth_area - inter_area

    # Degenerate boxes give an empty union; report "no overlap" rather than
    # raising ZeroDivisionError (or yielding NaN with NumPy scalars).
    if union_area <= 0:
        return 0.0

    return inter_area / union_area

# Compute reward based on the IoU change
def compute_reward(action, ground_truth, current_mask):
    """Return +1 if applying ``action`` strictly increases the IoU, otherwise -1.

    Args:
        action: Action index handed to ``compute_mask``.
        ground_truth: Reference box ``[x1, y1, x2, y2]``.
        current_mask: Box before the action is applied.
    """
    iou_after = compute_iou(compute_mask(action, current_mask), ground_truth)
    iou_before = compute_iou(current_mask, ground_truth)

    if iou_before < iou_after:
        return 1
    return -1

# Compute reward at the end of an episode
def compute_end_reward(current_mask, ground_truth):
    """Return the terminal reward: 3 if the IoU exceeds ``IOU_STOP``, otherwise -3.

    The IoU is also printed on every call.
    """
    final_iou = compute_iou(current_mask, ground_truth)
    print("CURRENT IOU =", final_iou)

    if final_iou > IOU_STOP:
        return 3
    return -3

# Update target values for experience replay
def compute_target(reward, new_feature, model):
    """Return ``reward`` plus ``gamma`` times the largest Q-value for ``new_feature``."""
    best_next_q = np.amax(compute_q(new_feature, model))
    return reward + gamma * best_next_q

## Actions

In [8]:

# Compute new mask based on the action
def compute_mask(action, current_mask):
    """Return the box obtained by applying ``action`` to ``current_mask``.

    Every action moves edges by 10% of the current width and/or height.
    Action indices outside 0-7 leave the box unchanged.

    Args:
        action: Action index (0-7 modify the box).
        current_mask: Indexable ``[x1, y1, x2, y2]``.

    Returns:
        NumPy array ``[x1, y1, x2, y2]``, reordered so that x1 <= x2 and
        y1 <= y2.
    """
    step_fraction = 0.1
    x1, y1 = current_mask[0], current_mask[1]
    x2, y2 = current_mask[2], current_mask[3]
    w = step_fraction * (x2 - x1)
    h = step_fraction * (y2 - y1)

    # Offsets (dx1, dy1, dx2, dy2) added to the corners for each action.
    shifts = {
        0: (w, 0, w, 0),       # translate toward larger x
        1: (-w, 0, -w, 0),     # translate toward smaller x
        2: (0, h, 0, h),       # translate toward larger y
        3: (0, -h, 0, -h),     # translate toward smaller y
        4: (-w, -h, w, h),     # grow on both axes
        5: (w, h, -w, -h),     # shrink on both axes
        6: (0, h, 0, -h),      # shrink along y only
        7: (w, 0, -w, 0),      # shrink along x only
    }
    dx1, dy1, dx2, dy2 = shifts.get(action, (0, 0, 0, 0))

    moved = np.array([x1 + dx1, y1 + dy1, x2 + dx2, y2 + dy2])

    # Keep the corners ordered even if an edge crossed its opposite.
    return np.array([
        min(moved[0], moved[2]),
        min(moved[1], moved[3]),
        max(moved[0], moved[2]),
        max(moved[1], moved[3]),
    ])

# Action selection using epsilon-greedy
def select_action(feature, ground_truth_box, step, q_value, epsilon, current_mask):
    """Pick the next action with a ground-truth-guided epsilon-greedy policy.

    Args:
        feature: Unused by this function; kept for interface compatibility.
        ground_truth_box: Reference box ``[x1, y1, x2, y2]``.
        step: Current step index within the episode.
        q_value: Q-values for the current state; used for the greedy choice.
        epsilon: Exploration probability. A uniform draw greater than
            ``epsilon`` selects the greedy action.
        current_mask: Current box ``[x1, y1, x2, y2]``.

    Returns:
        The chosen action index (8 is the trigger action).
    """
    trigger_action = 8

    # Force the trigger once the step budget is exhausted.
    if step == max_steps:
        return trigger_action

    # Exploit: take the action with the highest predicted Q-value.
    if random.random() > epsilon:
        return np.argmax(q_value)

    # Explore, guided by the ground truth: trigger as soon as the current box
    # would earn the positive terminal reward (but never on the first step).
    end_reward = compute_end_reward(current_mask, ground_truth_box)
    if end_reward > 0 and step != 0:
        return trigger_action

    # Otherwise choose uniformly among the non-trigger actions whose reward
    # is non-negative.
    rewards = np.asarray([
        compute_reward(candidate, ground_truth_box, current_mask)
        for candidate in range(action_option - 1)
    ])
    candidates = np.where(rewards >= 0)[0]

    # No such action: fall back to the whole action space. This uses
    # `action_option` rather than a hard-coded 9 so it stays consistent with
    # the loop above if the action count is changed.
    if len(candidates) == 0:
        candidates = np.arange(action_option)

    return np.random.choice(candidates)


# Execute action and update mask, history, and reward
def execute_action(action, history, ground_truth_box, current_mask):
    """Apply ``action`` and return ``(new_mask, reward, end, history)``.

    Action 8 (trigger) ends the episode: the mask and history are returned
    unchanged and the reward is the terminal reward. Any other action moves
    the mask, earns the step reward, and yields a new history list with the
    oldest entry dropped and ``action`` appended. The caller's list is not
    modified.
    """
    if action == 8:
        terminal_reward = compute_end_reward(current_mask, ground_truth_box)
        return current_mask, terminal_reward, True, history

    moved_mask = compute_mask(action, current_mask)
    step_reward = compute_reward(action, ground_truth_box, current_mask)

    # Slicing copies, so appending below does not touch the caller's list.
    updated_history = history[1:]
    updated_history.append(action)

    return moved_mask, step_reward, False, updated_history

## Crop image

In [10]:
# Crop the image based on the mask
def crop_image(image, new_mask):
    """Crop ``image`` to ``new_mask`` and return the crop resized to 224x224.

    Args:
        image: Array of shape (height, width, channels).
        new_mask: Box ``[x1, y1, x2, y2]``; values are truncated to integers
            and clamped to the image bounds.

    Returns:
        The crop resized to 224x224, or an all-zero (224, 224, 3) array when
        the clamped box has no area.
    """
    height, width, _ = np.shape(image)
    box = np.asarray(new_mask).astype("int")

    # Clamp every coordinate to [0, size]. Clamping only one side would let a
    # negative right/bottom edge act as a wrapping slice index and crop the
    # wrong region for boxes lying entirely left of or above the image.
    x1, x2 = np.clip(box[[0, 2]], 0, width)
    y1, y2 = np.clip(box[[1, 3]], 0, height)

    cropped_image = image[y1:y2, x1:x2]
    crop_height, crop_width = cropped_image.shape[:2]

    if crop_height == 0 or crop_width == 0:
        return np.zeros((224, 224, 3))

    # cv2.resize returns a new array; the result must be kept, otherwise the
    # crop is returned at its original size.
    return cv2.resize(cropped_image, (224, 224))

## Experience Replay

In [11]:
# Experience replay to train the model
def experience_replay(deep_q_model, experience):
    """Train ``deep_q_model`` on one batch sampled from ``experience``.

    Draws ``experience_sample_size`` transitions with replacement. Each
    transition is ``(feature, action, new_feature, reward, end)``. The
    target row for a transition is the model's current Q-values, with the
    entry for the taken action replaced by the reward (terminal transition)
    or by the discounted target from ``compute_target``.

    The batch loss is appended to the global ``loss_arr``; once it holds 100
    entries their mean is printed and the list is reset.
    """
    global loss_arr

    batch = random.choices(experience, k=experience_sample_size)
    targets = np.zeros((experience_sample_size, action_option))
    states = []

    for row, (state, action, next_state, reward, done) in enumerate(batch):
        if done:
            target_value = reward
        else:
            target_value = compute_target(reward, next_state, deep_q_model)

        # Start from the current predictions so only the taken action's
        # entry contributes to the loss.
        targets[row, :] = compute_q(state, deep_q_model)
        targets[row, action] = target_value
        states.append(state)

    batch_loss = deep_q_model.train_on_batch(np.concatenate(states), targets)
    loss_arr.append(batch_loss)

    if len(loss_arr) == 100:
        print("loss %s" % str(sum(loss_arr) / len(loss_arr)))
        loss_arr = []

## Read Images and Annotations

In [12]:
def read_image_index(basepath, test=True):
    """Return the file stems of the ``.jpg`` files in ``<basepath>/images``.

    Args:
        basepath: Dataset root containing an ``images`` folder.
        test: Unused; kept for interface compatibility.

    Returns:
        List of file names with only the final extension removed, in
        ``os.listdir`` order.

    Raises:
        FileNotFoundError: If the ``images`` folder does not exist.
    """
    image_folder_path = os.path.join(basepath, "images")
    # splitext strips just the last extension, so stems that themselves
    # contain dots (e.g. "a.rf.123.jpg") are not truncated at the first dot.
    return [
        os.path.splitext(image_file)[0]
        for image_file in os.listdir(image_folder_path)
        if image_file.endswith(".jpg")
    ]

def read_image(basepath):
    """
    Load every supported image found in ``<basepath>/images`` with OpenCV.

    Files are matched case-insensitively on the suffixes .jpg, .jpeg, .png
    and .bmp. Images that ``cv2.imread`` cannot read are reported with a
    warning and left out of the result.
    """
    img_dir = Path(basepath + '/images')
    supported_formats = ['.jpg', '.jpeg', '.png', '.bmp']

    # Collect the candidate files first so the count can be reported.
    image_paths = []
    for candidate in img_dir.glob('*'):
        if candidate.suffix.lower() in supported_formats:
            image_paths.append(candidate)

    print(f"Found {len(image_paths)} images in {img_dir}")

    image_list = []
    for img_path in image_paths:
        img = cv2.imread(str(img_path))
        if img is None:
            print(f"Warning: {img_path} not found or invalid.")
            continue
        image_list.append(img)

    return image_list

def convert_yolo_label_to_coords(box, img_width, img_height):
    """
    Convert a normalized YOLO box to pixel corner coordinates.

    ``box`` is (x_center, y_center, width, height), each relative to the
    image size. The result is [xmin, ymin, xmax, ymax] in pixels, with each
    value truncated toward zero by ``int``.
    """
    x_center, y_center, width, height = box

    center_x_px = x_center * img_width
    center_y_px = y_center * img_height
    half_width_px = width * img_width / 2
    half_height_px = height * img_height / 2

    return [
        int(center_x_px - half_width_px),
        int(center_y_px - half_height_px),
        int(center_x_px + half_width_px),
        int(center_y_px + half_height_px),
    ]

def load_annotation(basepath, filtered_class):
    """
    Load pixel-space bounding boxes of one class from YOLO TXT labels.

    For every supported image in ``<basepath>/images`` the matching
    ``<basepath>/labels/<stem>.txt`` file is read, and boxes whose class id
    equals ``filtered_class`` are converted to [xmin, ymin, xmax, ymax].

    Args:
        basepath: Dataset root containing ``images`` and ``labels`` folders.
        filtered_class: Integer class id to keep.

    Returns:
        One list of boxes per readable image, in the same glob order as the
        images. The list is empty when the image has no box of that class or
        has no label file.
    """
    img_dir = Path(basepath + '/images')
    lbl_dir = Path(basepath + '/labels')

    supported_formats = ['.jpg', '.jpeg', '.png', '.bmp']
    image_paths = [p for p in img_dir.glob('*') if p.suffix.lower() in supported_formats]

    bounding_box_list = []
    for img_path in image_paths:
        # The image is read only to get its dimensions.
        img = cv2.imread(str(img_path))
        if img is None:
            # read_image leaves unreadable images out of its result, so they
            # are left out here too; otherwise images and boxes, which are
            # paired by position, would go out of step.
            print(f"Warning: {img_path} not found or invalid; skipping its annotations.")
            continue
        img_height, img_width = img.shape[:2]

        label_path = lbl_dir / (img_path.stem + '.txt')
        try:
            with open(label_path, 'r') as f:
                rows = [[float(value) for value in line.split()] for line in f]
        except FileNotFoundError:
            # Treat an image without a label file as having no boxes.
            print(f"Warning: no label file {label_path}; treating image as unlabeled.")
            rows = []

        # A valid row is "class x_center y_center width height".
        class_boxes = [
            convert_yolo_label_to_coords(row[1:], img_width, img_height)
            for row in rows
            if len(row) == 5 and int(row[0]) == filtered_class
        ]
        bounding_box_list.append(class_boxes)

    return bounding_box_list

def _as_saveable_array(items):
    """Convert ``items`` for ``np.save``, using an object array if they are ragged."""
    try:
        return np.asarray(items)
    except ValueError:
        # Recent NumPy versions refuse to build an array from sequences of
        # differing shapes; store each item as one object element instead.
        packed = np.empty(len(items), dtype=object)
        for position, item in enumerate(items):
            packed[position] = item
        return packed


def load_data_fil(dataset_path, test=False, filtered_class=5):
    """
    Load the dataset images and their bounding boxes for one class.

    Args:
        dataset_path: Dataset root containing ``images`` and ``labels``.
        test: When true, also save the images and boxes to
            ``val_images.npy`` and ``val_boxes.npy`` in the working directory.
        filtered_class: Class id forwarded to ``load_annotation``.

    Returns:
        Tuple ``(image_list, bounding_box_list)``.
    """
    # The returned index is not used, but the call is kept: it raises when
    # the images folder is missing.
    read_image_index(dataset_path, test)
    image_list = read_image(dataset_path)
    bounding_box_list = load_annotation(dataset_path, filtered_class)

    # Optionally save to .npy files if needed. Images of different sizes and
    # per-image box lists of different lengths are ragged, hence the helper.
    if test:
        np.save("val_images.npy", _as_saveable_array(image_list))
        np.save("val_boxes.npy", _as_saveable_array(bounding_box_list))

    print(bounding_box_list[:5])
    print("DONE LOADING")

    return image_list, bounding_box_list

# Load Data

In [20]:
# Load data
image_list, bounding_box_list = load_data_fil("valid", filtered_class=5)

# Images and boxes are paired by position, so the lists must line up.
assert len(image_list) == len(bounding_box_list), "Image list and bounding box list must have the same length."

# Positions of the images that have at least one box of the requested class.
valid_indices = [index for index, boxes in enumerate(bounding_box_list) if len(boxes) > 0]
if not valid_indices:
    # Retrying random draws here could never succeed, so fail loudly instead.
    raise ValueError("No image with a non-empty bounding box list was found.")

# Draw 300 samples uniformly from the valid images. Sampling is with
# replacement, so an image can be selected more than once.
selected_indices = random.choices(valid_indices, k=300)
random_image_list = [image_list[index] for index in selected_indices]
random_bounding_box_list = [bounding_box_list[index] for index in selected_indices]

print(f"Selected {len(random_image_list)} images and their corresponding bounding boxes.")

# Check for bounding box lists
for box_lst in random_bounding_box_list:
    if len(box_lst) == 0:
        print("Found an empty bounding box list, which should not happen.")

Found 362 images in valid/images
[[[96, 284, 540, 538]], [[33, 225, 509, 605]], [], [[146, 234, 616, 598]], [[101, 3, 555, 486]]]
DONE LOADING
Selected 300 images and their corresponding bounding boxes.


In [21]:
# Load person data
person_image_list, person_bounding_box_list = load_data_fil("valid", filtered_class=8)

# Images and boxes are paired by position, so the lists must line up.
assert len(person_image_list) == len(person_bounding_box_list), "Image list and bounding box list must have the same length."

# Initialize lists for selected images and bounding boxes
selected_person_image_list = []
selected_person_bounding_box_list = []

# Take the first 100 images, in load order, that have at least one box.
# The cap is checked before appending so that at most 100 are kept.
for person_image, person_boxes in zip(person_image_list, person_bounding_box_list):
    if len(selected_person_image_list) >= 100:
        break
    if len(person_boxes) > 0:
        selected_person_image_list.append(person_image)
        selected_person_bounding_box_list.append(person_boxes)

print(f"Selected {len(selected_person_image_list)} images and their corresponding bounding boxes.")

# Check for bounding box lists
for box_lst in selected_person_bounding_box_list:
    if len(box_lst) == 0:
        print("Found an empty bounding box list, which should not happen.")

Found 362 images in valid/images
[[], [], [[133, 248, 499, 504]], [], []]
DONE LOADING
Selected 38 images and their corresponding bounding boxes.


# Train Model

In [16]:
# Absolute error below which the loss is quadratic and above which it is linear.
HUBER_DELTA = 1.0


def smoothL1(y_true, y_pred):
    """Return the summed smooth-L1 (Huber-style) loss between two tensors.

    Element-wise, for absolute error ``e``: ``0.5 * e**2`` when
    ``e < HUBER_DELTA``, otherwise ``HUBER_DELTA * (e - 0.5 * HUBER_DELTA)``.
    The per-element values are summed over all elements.
    """
    abs_error = K.abs(y_true - y_pred)
    quadratic = 0.5 * abs_error ** 2
    linear = HUBER_DELTA * (abs_error - 0.5 * HUBER_DELTA)
    return K.sum(tf.where(abs_error < HUBER_DELTA, quadratic, linear))


def create_q_model():
    """Build and compile the Q-network.

    The input is a vector of 4096 image features plus
    ``action_option * history_size`` history values. It passes through two
    1024-unit ReLU layers to a linear output with one value per action. The
    model is compiled with the ``smoothL1`` loss and the Adam optimizer.
    """
    state_size = 4096 + action_option * history_size

    model = Sequential()
    model.add(Dense(1024, input_shape=(state_size,), activation='relu'))
    model.add(Dense(1024, activation='relu'))
    # One output per action. This is tied to `action_option` instead of a
    # literal 9 so it always matches the width of the replay targets.
    model.add(Dense(action_option, activation='linear'))
    model.compile(loss=smoothL1, optimizer='adam')
    return model


def create_vgg16():
    """Return a Keras VGG16 with ImageNet weights and the top layers included.

    NOTE(review): ``pooling='max'`` may have no effect while
    ``include_top=True`` - confirm against the Keras documentation.
    """
    return VGG16(weights='imagenet', include_top=True , pooling='max')


# Training configuration: change training_epoch to train for longer or shorter.
training_epoch = 10
# Initial exploration rate handed to train_deep_q. select_action explores
# whenever random.random() is not greater than epsilon.
epsilon = 1
#image_list, bounding_box_list = load_data("",test=False)
# Build the Q-network and the VGG16 model used for feature extraction.
deep_q_model = create_q_model()
vgg16 = create_vgg16()

# NOTE(review): train_deep_q is not defined in the code shown here; its
# argument order and return value should be confirmed against its definition.
trained_model = train_deep_q(training_epoch, epsilon, random_image_list, random_bounding_box_list, deep_q_model, vgg16)

In [18]:
# Person model
# Arguments follow the same order as the car-model training call:
# (epochs, epsilon, images, boxes, Q-network, VGG16).
# NOTE(review): this passes the same deep_q_model object as the car-model
# training call. If train_deep_q trains the model in place, the person model
# starts from the car model's weights and both names may refer to one model.
# Confirm this is intended, or build a fresh network with create_q_model().
trained_model_person = train_deep_q(training_epoch, epsilon, selected_person_image_list, selected_person_bounding_box_list, deep_q_model, vgg16)

# Save Models

In [None]:
# Encode the main hyperparameters in the file name so saved models from
# different runs can be told apart.
name_of_the_model = (
    f"well_train_model_max_step_{max_steps}_gamma_{gamma}"
    f"_epochs_{training_epoch}_trigger_threshold_{IOU_STOP}.h5"
)
trained_model.save(name_of_the_model)

In [None]:
# Same naming scheme as the car model, with a "person_" prefix.
name_of_the_model = (
    f"person_well_train_model_max_step_{max_steps}_gamma_{gamma}"
    f"_epochs_{training_epoch}_trigger_threshold_{IOU_STOP}.h5"
)
trained_model_person.save(name_of_the_model)