# Project setup

## Install and import required libraries

In [None]:
# update pip -> pip install --upgrade pip
# python 3.10 -> conda create --name viziotf python=3.10
# tensorflow 2.10.0 -> pip install tensorflow==2.10
# CUDA 11.2 -> https://developer.nvidia.com/cuda-11.2.0-download-archive?target_os=Windows&target_arch=x86_64&target_version=10&target_type=exelocal
# CuDNN 8.1 -> https://developer.nvidia.com/rdp/cudnn-archive
# Protobuf -> pip install protobuf==3.20.1

# check package
!pip list

In [None]:
import os
import glob
import pandas as pd
import xml.etree.ElementTree as ET
import random
import shutil
from PIL import Image
import albumentations as A
import wget
import numpy as np
import cv2
import tensorflow as tf
import keras

## Setup environment

In [None]:
# Name of the custom model; used below as the sub-folder of workspace/models
# that holds its checkpoints, exports and results.
CUSTOM_MODEL_NAME = 'vizio12' 
# Pre-trained model: the name doubles as the base name of the downloaded
# '<name>.tar.gz' archive, and the URL is where that archive is fetched from.
PRETRAINED_MODEL_NAME = 'ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8'
PRETRAINED_MODEL_URL = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz'
# File name of the TFRecord generation script (looked up in SCRIPTS_PATH).
TF_RECORD_SCRIPT_NAME = 'generate_tfrecord.py'
# File name of the label map (written to / read from RESOURCES_PATH).
LABEL_MAP_NAME = 'label_map.pbtxt'

In [None]:
# Every folder the project uses, relative to the notebook's working directory.
# All of them live under 'FYP'; the last four are specific to CUSTOM_MODEL_NAME.
paths = {
    'WORKSPACE_PATH': os.path.join('FYP', 'workspace'),
    'SCRIPTS_PATH': os.path.join('FYP','scripts'),
    'APIMODEL_PATH': os.path.join('FYP','models'),
    'PROTOC_PATH':os.path.join('FYP','protoc'),
    'RESOURCES_PATH': os.path.join('FYP', 'workspace','resources'),
    'IMAGES_PATH': os.path.join('FYP', 'workspace','images'),
    'MODEL_PATH': os.path.join('FYP', 'workspace','models'),
    'PRETRAINED_MODEL_PATH': os.path.join('FYP', 'workspace','pre-trained-models'),
    # Per-model folders (checkpoints, exported model, TFLite export, results).
    'CHECKPOINT_PATH': os.path.join('FYP', 'workspace','models',CUSTOM_MODEL_NAME), 
    'OUTPUT_PATH': os.path.join('FYP', 'workspace','models',CUSTOM_MODEL_NAME, 'export'), 
    'TFLITE_PATH':os.path.join('FYP', 'workspace','models',CUSTOM_MODEL_NAME, 'tfliteexport'), 
    'RESULT_PATH': os.path.join('FYP', 'workspace','models',CUSTOM_MODEL_NAME, 'result'), 
     }

In [None]:
# Individual files the pipeline reads or writes, built on top of `paths`:
# the model's pipeline config, the TFRecord generation script and the label map.
files = dict(
    PIPELINE_CONFIG=os.path.join(paths['CHECKPOINT_PATH'], 'pipeline.config'),
    TF_RECORD_SCRIPT=os.path.join(paths['SCRIPTS_PATH'], TF_RECORD_SCRIPT_NAME),
    LABELMAP=os.path.join(paths['RESOURCES_PATH'], LABEL_MAP_NAME),
)

In [None]:
# Create every project folder that does not exist yet.
# os.makedirs also creates missing parent folders, copes with spaces in the
# path and behaves the same on Windows, Linux and macOS, which a shell
# `mkdir` call does not guarantee.
for path in paths.values():
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
        print('New folder and files created.')

# Data preprocessing

## Random sampling

In [None]:
def random_sampling(IMAGE_PATH, OUTPUT_PATH, SAMPLE_SIZE, seed=None):
    """Copy a random subset of the ``.jpg`` images in a folder to another folder.

    Parameters
    ----------
    IMAGE_PATH : str
        Folder holding the source images. Only files whose name ends in
        ``.jpg`` (case-insensitive) are considered.
    OUTPUT_PATH : str
        Destination folder. It is created (including parents) if missing.
    SAMPLE_SIZE : int
        Number of images to copy. If the folder holds fewer images, all of
        them are copied.
    seed : int, optional
        When given, the sample is drawn from a private random generator
        seeded with this value, so the same files are picked on every run.
        When omitted, the global ``random`` state is used.

    Returns
    -------
    None
    """
    # os.listdir returns names in arbitrary order; sorting first is what
    # makes a seeded run reproducible.
    image_files = sorted(
        file for file in os.listdir(IMAGE_PATH) if file.lower().endswith('.jpg')
    )

    # Randomly shuffle the image files
    rng = random if seed is None else random.Random(seed)
    rng.shuffle(image_files)

    # Get the desired number of images for sampling
    image_files_sampled = image_files[:SAMPLE_SIZE]

    # shutil.copy does not create the destination folder itself.
    os.makedirs(OUTPUT_PATH, exist_ok=True)

    # Copy the sampled images to the destination directory
    for image_file in image_files_sampled:
        shutil.copy(os.path.join(IMAGE_PATH, image_file), os.path.join(OUTPUT_PATH, image_file))

In [None]:
# Draw the same number of images from each of the nine class folders
# (three locations x three viewing directions).
SAMPLE_SIZE = 1560

for location in ('entrance', 'escalator', 'stair'):
    for view in ('front', 'left', 'right'):
        IMAGE_PATH = f'D:/# fyp_dataset/2 organized/{location}_{view}'
        OUTPUT_PATH = f'D:/# fyp_dataset/final-sampling/{location}_{view}'
        random_sampling(IMAGE_PATH, OUTPUT_PATH, SAMPLE_SIZE)

## Image resizing

In [None]:
def resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME):
    """Letterbox every image in a folder onto a black canvas of fixed size.

    Each readable image is scaled (bicubic interpolation) so that it fits
    inside ``TARGET_SIZE`` without changing its aspect ratio, centred on a
    black canvas of exactly ``TARGET_SIZE`` and saved as
    ``<OUTPUT_NAME>_<n>.jpg`` with ``n`` counting from 1.

    Parameters
    ----------
    IMAGE_PATH : str
        Folder containing the source images.
    TARGET_SIZE : tuple
        ``(width, height)`` of the output images in pixels.
    OUTPUT_PATH : str
        Folder the results are written to; created if missing.
    OUTPUT_NAME : str
        Prefix of the output file names.

    Notes
    -----
    Files that OpenCV cannot decode (non-images, sub-folders) are skipped
    with a message and do not consume a number. If ``OUTPUT_PATH`` equals
    ``IMAGE_PATH`` the results land next to the originals, so a second run
    would process the earlier results as well.
    """
    target_width, target_height = TARGET_SIZE[0], TARGET_SIZE[1]

    # Snapshot (and sort) the folder content before anything is written, so
    # that numbering is deterministic and freshly written files are not
    # picked up when input and output folders are the same.
    source_names = sorted(os.listdir(IMAGE_PATH))
    os.makedirs(OUTPUT_PATH, exist_ok=True)

    counter = 1
    for source_name in source_names:
        image_path = os.path.join(IMAGE_PATH, source_name)

        # cv2.imread signals failure by returning None instead of raising.
        image = cv2.imread(image_path)
        if image is None:
            print(f'Skipped {source_name}: not a readable image.')
            continue

        # First try to fill the full height; if the result would be wider
        # than the canvas, fill the full width instead.
        aspect_ratio = image.shape[1] / image.shape[0]
        new_width = int(target_height * aspect_ratio)
        new_height = target_height
        if new_width > target_width:
            new_width = target_width
            new_height = int(target_width / aspect_ratio)
        # Extremely thin images could round down to 0 pixels.
        new_width = max(new_width, 1)
        new_height = max(new_height, 1)

        resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)

        # Create a canvas of the target size and place the resized image in the center
        canvas = np.zeros((target_height, target_width, 3), dtype=np.uint8)
        x_offset = (target_width - new_width) // 2
        y_offset = (target_height - new_height) // 2
        canvas[y_offset:y_offset + new_height, x_offset:x_offset + new_width, :] = resized_image

        # Save the modified image to the specified output path
        final_path = os.path.join(OUTPUT_PATH, f'{OUTPUT_NAME}_{counter}.jpg')
        cv2.imwrite(final_path, canvas)

        counter += 1

In [None]:
# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/entrance_front'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/entrance_front'
# OUTPUT_NAME = 'entrance_front'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/entrance_left'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/entrance_left'
# OUTPUT_NAME = 'entrance_left'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/entrance_right'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/entrance_right'
# OUTPUT_NAME = 'entrance_right'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)


# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/escalator_front'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/escalator_front'
# OUTPUT_NAME = 'escalator_front'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/escalator_left'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/escalator_left'
# OUTPUT_NAME = 'escalator_left'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/escalator_right'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/escalator_right'
# OUTPUT_NAME = 'escalator_right'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)


# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/stair_front'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/stair_front'
# OUTPUT_NAME = 'stair_front'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/stair_left'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/stair_left'
# OUTPUT_NAME = 'stair_left'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# IMAGE_PATH = 'D:/# fyp_dataset/final-sampling/stair_right'
# TARGET_SIZE = (320, 320)
# OUTPUT_PATH = 'D:/# fyp_dataset/final-resized/stair_right'
# OUTPUT_NAME = 'stair_right'
# resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

# Resize the reference photos of each category. Input and output folder are
# the same, so the resized 'reference_<n>.jpg' files are written next to the
# originals.
TARGET_SIZE = (320, 320)
OUTPUT_NAME = 'reference'

for category in ('entrance', 'escalator', 'stair'):
    IMAGE_PATH = 'D:\\# fyp_dataset\\references\\original\\mrt muzium negara\\' + category
    OUTPUT_PATH = IMAGE_PATH
    resize_image(IMAGE_PATH, TARGET_SIZE, OUTPUT_PATH, OUTPUT_NAME)

## Data splitting

In [None]:
def split_dataset(base_path, image_src, annotation_src, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    """Copy image/annotation pairs into train, validate and test folders.

    Images are read from ``<base_path>/<image_src>`` and their Pascal VOC
    annotations (same file stem, ``.xml``) from
    ``<base_path>/<annotation_src>``. The pairs are shuffled with a fixed
    seed and copied to ``<base_path>/<split>/images`` and
    ``<base_path>/<split>/annotations`` for each split.

    Parameters
    ----------
    base_path : str
        Folder containing the source folders and receiving the split folders.
    image_src, annotation_src : str
        Names of the image and annotation folders inside ``base_path``.
    train_ratio, val_ratio : float
        Fractions of the images assigned to the train and validate splits
        (each count is rounded down).
    test_ratio : float
        Accepted for symmetry only: the test split always receives every
        image left over after the train and validate splits are filled.

    Raises
    ------
    FileNotFoundError
        If an image has no matching annotation file. The check runs before
        anything is copied, so a failed call leaves no partial split behind.

    Notes
    -----
    The global ``random`` generator is re-seeded with 42 on every call.
    """
    # Source folders
    image_folder = os.path.join(base_path, image_src)
    annotation_folder = os.path.join(base_path, annotation_src)

    # Destination folders
    folders = {
        split: {
            'images': os.path.join(base_path, split, 'images'),
            'annotations': os.path.join(base_path, split, 'annotations'),
        }
        for split in ('train', 'validate', 'test')
    }

    # Sorted so that the seeded shuffle below yields the same split on every
    # machine (os.listdir order is arbitrary); extension match ignores case.
    imgs_list = sorted(
        filename for filename in os.listdir(image_folder)
        if os.path.splitext(filename)[-1].lower() == '.jpg'
    )
    if not imgs_list:
        print(f'No .jpg images found in {image_folder}; nothing to split.')
        return

    annotations_list = [os.path.splitext(img)[0] + '.xml' for img in imgs_list]

    # Fail early instead of half-way through the copy loop.
    missing = [
        annotation for annotation in annotations_list
        if not os.path.exists(os.path.join(annotation_folder, annotation))
    ]
    if missing:
        raise FileNotFoundError(
            f'{len(missing)} annotation file(s) missing in {annotation_folder}: '
            + ', '.join(missing[:10])
        )

    random.seed(42)
    combined = list(zip(imgs_list, annotations_list))
    random.shuffle(combined)

    train_end = int(len(combined) * train_ratio)
    validate_end = train_end + int(len(combined) * val_ratio)

    # Create destination sub-folders if they don't exist
    for destinations in folders.values():
        for destination in destinations.values():
            os.makedirs(destination, exist_ok=True)

    # Copy image and annotation files to destination sub-folders
    for i, (img, annotation) in enumerate(combined):
        if i < train_end:
            split = 'train'
        elif i < validate_end:
            split = 'validate'
        else:
            split = 'test'

        shutil.copy(os.path.join(image_folder, img), os.path.join(folders[split]['images'], img))
        shutil.copy(os.path.join(annotation_folder, annotation), os.path.join(folders[split]['annotations'], annotation))

In [None]:
# Split each of the nine class folders; annotations for a class live in the
# sibling folder '<class>_annotations'.
base_path = 'D:/# fyp_dataset/balanced'

for location in ('entrance', 'escalator', 'stair'):
    for view in ('front', 'left', 'right'):
        class_name = f'{location}_{view}'
        split_dataset(base_path, class_name, f'{class_name}_annotations')

## Data augmentation

In [None]:
def load_image(image_path):
    """Read an image from disk with OpenCV.

    Parameters
    ----------
    image_path : str
        Path of the image file.

    Returns
    -------
    The array returned by ``cv2.imread``.

    Raises
    ------
    FileNotFoundError
        If the file is missing or cannot be decoded. ``cv2.imread`` itself
        only returns ``None`` in that case, which would otherwise surface
        later as an unrelated-looking error.
    """
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f'Could not read image: {image_path}')
    return image

In [None]:
def save_image(output_path, image):
    """Write an image to disk with OpenCV.

    Parameters
    ----------
    output_path : str
        Destination file path; its parent folder is created if missing.
    image
        Image array to write (passed to ``cv2.imwrite`` unchanged).

    Raises
    ------
    OSError
        If ``cv2.imwrite`` reports failure. It does so by returning
        ``False`` rather than raising, so without this check a failed write
        would go unnoticed.
    """
    parent = os.path.dirname(output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    if not cv2.imwrite(output_path, image):
        raise OSError(f'Could not write image: {output_path}')

In [None]:
def load_annotation(annotation_path):
    """Read the bounding boxes and class labels from a Pascal VOC XML file.

    Parameters
    ----------
    annotation_path : str
        Path of the annotation file.

    Returns
    -------
    tuple
        ``(bboxes, labels)`` where ``bboxes`` is a list of
        ``(xmin, ymin, xmax, ymax)`` integer tuples and ``labels`` the list
        of class names, one entry per ``<object>`` in document order.
    """
    document_root = ET.parse(annotation_path).getroot()
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')

    bboxes, labels = [], []
    for object_node in document_root.findall('object'):
        class_name = object_node.find('name').text
        box_node = object_node.find('bndbox')
        # Coordinates are stored as text; convert each corner to int.
        bboxes.append(tuple(int(box_node.find(tag).text) for tag in corner_tags))
        labels.append(class_name)

    return bboxes, labels

In [None]:
def save_annotation(output_path, bboxes, labels, template_xml, new_image_name):
    """Write a Pascal VOC annotation based on an existing one.

    The template file is parsed, its ``<filename>`` (if present) is set to
    ``new_image_name``, all of its ``<object>`` elements are dropped and one
    new ``<object>`` is appended per ``(bbox, label)`` pair. Everything else
    in the template is kept as it is.

    Parameters
    ----------
    output_path : str
        File the resulting XML is written to.
    bboxes : sequence
        Boxes as ``(xmin, ymin, xmax, ymax)``; each value is truncated to int.
    labels : sequence of str
        Class name for each box, paired with ``bboxes`` by position.
    template_xml : str
        Path of the annotation used as template.
    new_image_name : str
        Image file name stored in the ``<filename>`` element.
    """
    tree = ET.parse(template_xml)
    root = tree.getroot()

    # Point the annotation at the new image.
    filename_node = root.find('filename')
    if filename_node is not None:
        filename_node.text = new_image_name

    # Discard the objects that came with the template.
    for stale_object in root.findall('object'):
        root.remove(stale_object)

    constant_fields = (('pose', 'Unspecified'), ('truncated', '0'), ('difficult', '0'))
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')

    for box, class_name in zip(bboxes, labels):
        object_node = ET.SubElement(root, 'object')
        ET.SubElement(object_node, 'name').text = class_name
        for tag, value in constant_fields:
            ET.SubElement(object_node, tag).text = value

        box_node = ET.SubElement(object_node, 'bndbox')
        for position, tag in enumerate(corner_tags):
            ET.SubElement(box_node, tag).text = str(int(box[position]))

    tree.write(output_path)


In [None]:
def random_sampling_and_augmentation(IMAGE_PATH, ANNOTATION_PATH, OUTPUT_IMAGE_PATH, OUTPUT_ANNOTATION_PATH, SAMPLE_RATIO):
    """Augment a random share of an annotated image folder.

    A fraction ``SAMPLE_RATIO`` of the ``.jpg`` images in ``IMAGE_PATH`` is
    picked at random. For each picked image, three augmented variants are
    produced with the Albumentations pipeline defined below and saved as
    ``aug_<i>_<original name>``, each with a matching Pascal VOC annotation
    holding the transformed boxes.

    Parameters
    ----------
    IMAGE_PATH : str
        Folder with the source images.
    ANNOTATION_PATH : str
        Folder with the annotations (same file stem as the image, ``.xml``).
    OUTPUT_IMAGE_PATH, OUTPUT_ANNOTATION_PATH : str
        Folders receiving the augmented images and annotations; created if
        missing.
    SAMPLE_RATIO : float
        Fraction of the images to augment (the count is rounded down).

    Raises
    ------
    FileNotFoundError
        If a sampled image cannot be read.
    """
    image_height = 320
    image_width = 320
    NUM_AUGMENTATIONS_PER_IMAGE = 3

    augmentation_pipeline = A.Compose([

        # Hue and Saturation
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),

        # Contrast and Brightness
        A.RandomBrightnessContrast(p=0.5),

        # Random zooming with limited reduction
        A.RandomSizedCrop(min_max_height=(int(0.8*image_height), image_height), height=image_height, width=image_width, p=0.5),

        # Affine and Geometric transformations
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.2, rotate_limit=15, p=0.5),

        # Mild Perspective change
        A.Perspective(scale=(0.02, 0.05), p=0.5),

        # Blurs
        A.MotionBlur(p=0.2),
        A.GaussianBlur(p=0.2),

        # Noise
        A.ISONoise(p=0.5)
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

    # Get a list of all image files in the folder
    image_files = [file for file in os.listdir(IMAGE_PATH) if file.lower().endswith('.jpg')]

    # Randomly shuffle the image files
    random.shuffle(image_files)

    # Determine the sample size based on the desired ratio
    sample_size = int(len(image_files) * SAMPLE_RATIO)

    # Get the desired number of images for sampling
    image_files_sampled = image_files[:sample_size]

    # Neither the image writer nor the XML writer creates folders.
    os.makedirs(OUTPUT_IMAGE_PATH, exist_ok=True)
    os.makedirs(OUTPUT_ANNOTATION_PATH, exist_ok=True)

    # For each sampled image, apply augmentation and save both image and annotation
    for image_file in image_files_sampled:

        # Load the image
        image = load_image(os.path.join(IMAGE_PATH, image_file))
        if image is None:
            raise FileNotFoundError(f'Could not read image: {os.path.join(IMAGE_PATH, image_file)}')

        # Load the corresponding annotation. splitext (rather than replacing
        # the literal ".jpg") also handles upper-case extensions, which the
        # case-insensitive filter above lets through.
        annotation_name = os.path.splitext(image_file)[0] + '.xml'
        template_xml = os.path.join(ANNOTATION_PATH, annotation_name)
        bboxes, labels = load_annotation(template_xml)

        for i in range(NUM_AUGMENTATIONS_PER_IMAGE):
            # Apply the augmentation
            transformed = augmentation_pipeline(image=image, bboxes=bboxes, labels=labels)

            # Save the augmented image with a new name
            new_image_name = f"aug_{i}_" + image_file
            save_image(os.path.join(OUTPUT_IMAGE_PATH, new_image_name), transformed["image"])

            # Save the updated annotation with a corresponding name.
            # Use the labels returned by the pipeline: boxes that leave the
            # frame are dropped together with their label, so the original
            # `labels` list may no longer line up with the returned boxes.
            new_annotation_name = os.path.splitext(new_image_name)[0] + '.xml'
            save_annotation(os.path.join(OUTPUT_ANNOTATION_PATH, new_annotation_name),
                            transformed["bboxes"], transformed["labels"], template_xml, new_image_name)


In [None]:
# Augment the training split only: sources are <IMAGES_PATH>/train, results
# go to the separate <IMAGES_PATH>/augmented folders.
IMAGE_PATH = os.path.join(paths['IMAGES_PATH'], 'train', 'images')
ANNOTATION_PATH = os.path.join(paths['IMAGES_PATH'], 'train', 'annotations')
OUTPUT_IMAGE_PATH = os.path.join(paths['IMAGES_PATH'], 'augmented', 'images')
OUTPUT_ANNOTATION_PATH = os.path.join(paths['IMAGES_PATH'], 'augmented', 'annotations')
# Fraction of the training images that get augmented.
SAMPLE_RATIO = 0.7

random_sampling_and_augmentation(IMAGE_PATH, ANNOTATION_PATH, OUTPUT_IMAGE_PATH, OUTPUT_ANNOTATION_PATH, SAMPLE_RATIO)
print('Data augmentation completed.')

## Image dimension checking

In [None]:
def check_image_dimensions(IMAGE_PATH, TARGET_WIDTH, TARGET_HEIGHT):
    """Check that every image in a folder has the expected size.

    Only files ending in ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive)
    are inspected. Every image with a different size is reported on stdout,
    not just the first one.

    Parameters
    ----------
    IMAGE_PATH : str
        Folder to scan.
    TARGET_WIDTH, TARGET_HEIGHT : int
        Expected width and height in pixels.

    Returns
    -------
    bool
        ``True`` if all inspected images match, ``False`` otherwise.
    """
    all_match = True

    for image_file in os.listdir(IMAGE_PATH):
        if not image_file.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue

        with Image.open(os.path.join(IMAGE_PATH, image_file)) as img:
            width, height = img.size

        if width != TARGET_WIDTH or height != TARGET_HEIGHT:
            print(f"Image {image_file} has dimensions {width}x{height} instead of {TARGET_WIDTH}x{TARGET_HEIGHT}")
            all_match = False

    return all_match

In [None]:
# Verify the training images against the 320x320 size used throughout this notebook.
IMAGE_PATH = os.path.join(paths['IMAGES_PATH'], 'train', 'images')
TARGET_WIDTH = 320
TARGET_HEIGHT = 320

if check_image_dimensions(IMAGE_PATH, TARGET_WIDTH, TARGET_HEIGHT):
    print("All images have the desired dimensions!")
else:
    print("Some images do not match the desired dimensions.")

## Checking that every image has an annotation

In [None]:
import os

def check_annotations(IMAGE_PATH, ANNOTATION_PATH):
    """Check that every ``.jpg`` image has an annotation file.

    For each file in ``IMAGE_PATH`` whose name ends in ``.jpg``
    (case-insensitive), an ``.xml`` file with the same stem is expected in
    ``ANNOTATION_PATH``. Images without one are listed on stdout.

    Parameters
    ----------
    IMAGE_PATH : str
        Folder containing the images.
    ANNOTATION_PATH : str
        Folder containing the annotation files.

    Returns
    -------
    bool
        ``True`` if no annotation is missing, ``False`` otherwise.
    """
    def annotation_exists(image_name):
        xml_name = os.path.splitext(image_name)[0] + '.xml'
        return os.path.exists(os.path.join(ANNOTATION_PATH, xml_name))

    unannotated = [
        name for name in os.listdir(IMAGE_PATH)
        if name.lower().endswith('.jpg') and not annotation_exists(name)
    ]

    if not unannotated:
        return True

    print("The following images are missing annotations:")
    for name in unannotated:
        print(name)
    return False

In [None]:
# Check the training split: every image under train/images needs an XML under train/annotations.
IMAGE_PATH = os.path.join(paths['IMAGES_PATH'], 'train', 'images')
ANNOTATION_PATH = os.path.join(paths['IMAGES_PATH'], 'train', 'annotations')
if check_annotations(IMAGE_PATH, ANNOTATION_PATH):
    print("All images have corresponding annotations!")
else:
    print("Some images are missing annotations.")

## Renaming files

In [None]:
# Path to the directory containing the images
directory_path = 'D:/# fyp_dataset/final-labeled/stair_left'

for filename in os.listdir(directory_path):
    if filename.startswith("stair_left"):
        # Extract the number and the extension
        parts = filename.split("_")
        number_part = parts[2].split('.')[0]
        extension = parts[2].split('.')[1]

        # Create the new filename
        new_filename = f"stair_right_{number_part}.{extension}"

        # Rename the file
        os.rename(os.path.join(directory_path, filename), os.path.join(directory_path, new_filename))
        print(f"Renamed {filename} to {new_filename}")

print("Renaming process complete.")

## Convert XML to CSV

In [None]:
def xml_to_csv(base_path, data_type):
    """Collect the Pascal VOC annotations of one split into a CSV file.

    Reads every ``.xml`` file in ``<base_path>/<data_type>/annotations`` and
    writes ``<base_path>/<data_type>.csv`` with one row per annotated object
    and the columns ``filename, width, height, class, xmin, ymin, xmax,
    ymax``. Files are processed in sorted order so the CSV is reproducible.

    Parameters
    ----------
    base_path : str
        Folder containing the split folders; the CSV is written here.
    data_type : str
        One of ``'train'``, ``'test'`` or ``'validate'``.

    Raises
    ------
    AssertionError
        If ``data_type`` is not one of the three accepted values.
    """
    assert data_type in ['train', 'test', 'validate'], "data_type should be either 'train', 'test', or 'validate'"

    # Navigating to the annotations folder inside the data_type directory
    path = os.path.join(base_path, data_type, 'annotations')
    # Escape the folder part so characters such as '[' in it are not
    # interpreted as glob wildcards.
    pattern = os.path.join(glob.escape(path), '*.xml')

    xml_list = []
    for xml_file in sorted(glob.glob(pattern)):
        root = ET.parse(xml_file).getroot()
        for member in root.findall('object'):
            # Look elements up by tag, not by position, so the result does
            # not depend on the order in which a tool wrote them.
            size = root.find('size')
            bndbox = member.find('bndbox')
            value = (root.find('filename').text,
                     int(size.find('width').text),
                     int(size.find('height').text),
                     member.find('name').text,
                     int(bndbox.find('xmin').text),
                     int(bndbox.find('ymin').text),
                     int(bndbox.find('xmax').text),
                     int(bndbox.find('ymax').text)
                     )
            xml_list.append(value)

    column_name = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']
    xml_df = pd.DataFrame(xml_list, columns=column_name)
    csv_path = os.path.join(base_path, f'{data_type}.csv')
    xml_df.to_csv(csv_path, index=False)

    print(f'Successfully converted xml in {data_type}/annotations folder to {data_type}.csv in the base path.')

In [None]:
# Write train.csv, test.csv and validate.csv into the images folder.
# NOTE(review): the CSV-checking and TFRecord cells further down read these
# files from paths['RESOURCES_PATH'], not from paths['IMAGES_PATH'] — confirm
# whether the CSVs are meant to be moved there by hand or whether base_path
# should change.
base_path = paths['IMAGES_PATH']
#base_path = "D:\\# fyp_dataset\\balanced"
for dtype in ['train', 'test', 'validate']:
    xml_to_csv(base_path, dtype)

## CSV checking

In [None]:
def display_value_counts(file_path, column_name):
    """Print how often each distinct value occurs in one column of a CSV file.

    The report starts with the file's base name, lists ``<count> <value>``
    in the order given by ``Series.value_counts`` and ends with a separator
    line followed by a blank line.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to read.
    column_name : str
        Column whose values are counted.
    """
    counts = pd.read_csv(file_path)[column_name].value_counts()

    report_lines = [f"File: {os.path.basename(file_path)}"]
    report_lines.extend(f"{count} {category}" for category, count in counts.items())
    report_lines.append("-----\n")

    print("\n".join(report_lines))

In [None]:
# Per-split CSV files, looked up in the resources folder.
csv_files = [
    os.path.join(paths['RESOURCES_PATH'], csv_name)
    for csv_name in ('train.csv', 'validate.csv', 'test.csv')
]

# Report the number of annotated objects per class for each split.
column_name = 'class'
for csv_file in csv_files:
    display_value_counts(csv_file, column_name)

# Model development

## Set up the Object Detection API

In [None]:
# setup protobuf
# Append the 'bin' folder under PROTOC_PATH to PATH so the `protoc` call below
# can be resolved. Note that every re-run of this cell appends it again.
os.environ['PATH'] += os.pathsep + os.path.abspath(os.path.join(paths['PROTOC_PATH'], 'bin'))   
# Compile the .proto files, copy the TF2 setup.py into place, then build and
# install the package.
# NOTE(review): `copy` and the backslash paths are Windows shell syntax; this
# line would need `cp` and forward slashes on Linux/macOS.
!cd FYP/models/research && protoc object_detection/protos/*.proto --python_out=. && copy object_detection\\packages\\tf2\\setup.py setup.py && python setup.py build && python setup.py install
# Install the slim package from the same checkout in editable mode.
!cd FYP/models/research/slim && pip install -e . 

In [None]:
# Test script shipped inside the Object Detection API checkout under APIMODEL_PATH.
VERIFICATION_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'builders', 'model_builder_tf2_test.py')
# Verify Installation
!python {VERIFICATION_SCRIPT}

In [None]:
import object_detection

In [None]:
# Download and unpack the pre-trained model, skipping whatever is already
# there so the cell can be re-run cheaply. Pure-Python calls replace the
# Windows-only `move` shell command.
archive_path = os.path.join(paths['PRETRAINED_MODEL_PATH'], PRETRAINED_MODEL_NAME + '.tar.gz')
# Assumes the archive unpacks into a folder named after the model — if it
# does not, the only effect is that the archive is unpacked again on re-run.
extracted_path = os.path.join(paths['PRETRAINED_MODEL_PATH'], PRETRAINED_MODEL_NAME)

if not os.path.isdir(extracted_path):
    if not os.path.exists(archive_path):
        # Download straight to its final location.
        wget.download(PRETRAINED_MODEL_URL, out=archive_path)
    shutil.unpack_archive(archive_path, paths['PRETRAINED_MODEL_PATH'])

## Create labelmap

In [None]:
# The nine detection classes and their numeric ids.
labels = [
    {'name': 'entrance_front', 'id': 1},
    {'name': 'entrance_left', 'id': 2},
    {'name': 'entrance_right', 'id': 3},
    {'name': 'escalator_front', 'id': 4},
    {'name': 'escalator_left', 'id': 5},
    {'name': 'escalator_right', 'id': 6},
    {'name': 'stair_front', 'id': 7},
    {'name': 'stair_left', 'id': 8},
    {'name': 'stair_right', 'id': 9},
]

# Write one 'item { name / id }' entry per class to the label map file.
with open(files['LABELMAP'], 'w') as f:
    for label in labels:
        entry = [
            'item { \n',
            '\tname:\'{}\'\n'.format(label['name']),
            '\tid:{}\n'.format(label['id']),
            '}\n',
        ]
        f.write(''.join(entry))

## Create TF records

In [None]:
# Run the TFRecord generation script once for the train split and once for the
# validate split; no record is generated for the test split here.
# Inputs: the split's CSV (from RESOURCES_PATH), its image folder and the
# label map. Output: '<split>.record' in RESOURCES_PATH.
# NOTE(review): the paths are interpolated without quotes, so a path
# containing spaces would be split into several shell arguments.
!python {files['TF_RECORD_SCRIPT']} --csv_input={os.path.join(paths['RESOURCES_PATH'], 'train.csv')} --image_dir={os.path.join(paths['IMAGES_PATH'], 'train', 'images')} --labels_path={files['LABELMAP']} --output_path={os.path.join(paths['RESOURCES_PATH'], 'train.record')} 
!python {files['TF_RECORD_SCRIPT']} --csv_input={os.path.join(paths['RESOURCES_PATH'], 'validate.csv')} --image_dir={os.path.join(paths['IMAGES_PATH'], 'validate', 'images')} --labels_path={files['LABELMAP']} --output_path={os.path.join(paths['RESOURCES_PATH'], 'validate.record')} 

## Model training

In [None]:
# Entry-point script of the Object Detection API; used to build both the
# training and the evaluation command below.
TRAINING_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'model_main_tf2.py')

In [None]:
# Build the training command line: the model directory is the custom model's
# checkpoint folder, and training is set to 50000 steps.
command = "python {} --model_dir={} --pipeline_config_path={} --num_train_steps=50000".format(TRAINING_SCRIPT, paths['CHECKPOINT_PATH'],files['PIPELINE_CONFIG'])

In [None]:
# The command is only displayed, not executed, by this notebook —
# presumably so it can be run in a separate terminal.
print(command)

## Model evaluation

In [None]:
# Build the evaluation command line. It reuses the training script and adds
# --checkpoint_dir pointing at the same checkpoint folder.
# Note: this overwrites the training command stored in `command` above.
command = "python {} --model_dir={} --pipeline_config_path={} --checkpoint_dir={}".format(TRAINING_SCRIPT, paths['CHECKPOINT_PATH'],files['PIPELINE_CONFIG'], paths['CHECKPOINT_PATH'])

In [None]:
# As with training, the evaluation command is only displayed here.
print(command)

### Import model and necessary components

In [None]:
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
from object_detection.utils import config_util

In [None]:
# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(files['PIPELINE_CONFIG'])
detection_model = model_builder.build(model_config=configs['model'], is_training=False)

# Restore checkpoint
# NOTE(review): the checkpoint name 'ckpt-51' is hard-coded; it has to exist in
# CHECKPOINT_PATH and must be updated by hand after a different training run.
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(paths['CHECKPOINT_PATH'], 'ckpt-51')).expect_partial()

@tf.function
def detect_fn(image):
    """Run the restored detection model on an input tensor.

    Chains the model's ``preprocess``, ``predict`` and ``postprocess`` steps
    and returns the result of ``postprocess``. Uses the module-level
    ``detection_model`` built above.
    """
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections

### Setup necessary components

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline
# Run tf.function-decorated code (such as detect_fn) eagerly.
# NOTE(review): presumably enabled to ease debugging; it gives up the speed
# of graph execution — confirm it is still wanted.
tf.config.run_functions_eagerly(True)

# Mapping from class id to category info, built from the label map file;
# used when drawing detection labels.
category_index = label_map_util.create_category_index_from_labelmap(files['LABELMAP'])

In [None]:
from object_detection.utils import visualization_utils as viz_utils
import matplotlib.pyplot as plt

def detect_and_visualize_image(image_path, detect_fn, category_index, label_id_offset=1, max_boxes_to_draw=5, min_score_thresh=.7):
    """
    Detect objects in one image, draw the detections, and measure the first box.

    Parameters:
    - image_path: Path to the image file (read with cv2.imread).
    - detect_fn: Callable that takes a float32 batch tensor and returns a
      dictionary containing 'num_detections', 'detection_boxes',
      'detection_classes' and 'detection_scores'.
    - category_index: Category index forwarded to the visualization utility.
    - label_id_offset: Value added to the predicted class ids before drawing.
    - max_boxes_to_draw: Maximum number of boxes to draw.
    - min_score_thresh: Minimum score for a detection to be drawn.

    Returns:
    - width_in_pixels: Pixel width of the first returned detection box, or 0
      when the detector returns no boxes. This box is NOT filtered by
      min_score_thresh, so it may be a box that was not drawn.
    - image_np_with_detections: Copy of the image, in OpenCV's BGR channel
      order, with the detections drawn on it.

    Raises:
    - FileNotFoundError: If the image cannot be read from image_path.
    """
    # cv2.imread reports a missing/unreadable file by returning None, not by raising.
    image_np = cv2.imread(image_path)
    if image_np is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    # OpenCV loads pixels in BGR order. Feed the detector RGB, consistent with
    # the BGR->RGB conversion this notebook applies before TFLite inference.
    # The BGR array is still the one drawn on and returned, so callers that
    # convert the result with cv2.COLOR_BGR2RGB keep working unchanged.
    image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_rgb, 0), dtype=tf.float32)
    detections = detect_fn(input_tensor)

    # Drop the batch dimension and keep only the valid detections as numpy arrays.
    num_detections = int(detections.pop('num_detections'))
    detections = {key: value[0, :num_detections].numpy()
                  for key, value in detections.items()}
    detections['num_detections'] = num_detections
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

    image_np_with_detections = image_np.copy()

    viz_utils.visualize_boxes_and_labels_on_image_array(
                image_np_with_detections,
                detections['detection_boxes'],
                detections['detection_classes']+label_id_offset,
                detections['detection_scores'],
                category_index,
                use_normalized_coordinates=True,
                max_boxes_to_draw=max_boxes_to_draw,
                min_score_thresh=min_score_thresh,
                agnostic_mode=False)

    boxes = detections['detection_boxes']
    if len(boxes) == 0:
        # Nothing detected: there is no box to measure.
        return 0, image_np_with_detections

    # Boxes are normalized to [0, 1]; entries 1 and 3 are the left/right edges.
    image_width = image_np_with_detections.shape[1]
    box = boxes[0]
    xmin = int(box[1] * image_width)
    xmax = int(box[3] * image_width)
    width_in_pixels = xmax - xmin

    return width_in_pixels, image_np_with_detections

In [None]:
# Images to run through the detector. Only the three test-set images are active.
# The commented-out entries are absolute local paths to reference photos of
# three stations (front/left/right views of entrance, escalator and stair);
# uncomment them to include those photos. They are presumably the images from
# which the Width-In-Pixels values recorded later in this notebook were
# measured -- TODO confirm.
image_paths = [
    os.path.join(paths['IMAGES_PATH'], 'test', 'images', 'entrance_front_446.jpg'),
    os.path.join(paths['IMAGES_PATH'], 'test', 'images', 'escalator_left_746.jpg'),
    os.path.join(paths['IMAGES_PATH'], 'test', 'images', 'stair_right_624.jpg'),
#     # MRT Muzium Negara
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/entrance/entrance_front3.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/entrance/entrance_left.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/entrance/entrance_right.jpg',

#     'D:/# fyp_dataset/references/resized/mrt muzium negara/escalator/escalator_front.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/escalator/escalator_left.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/escalator/escalator_right.jpg',
    
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/stair/stair_front.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/stair/stair_left.jpg',
#     'D:/# fyp_dataset/references/resized/mrt muzium negara/stair/stair_right.jpg',
    
#     # LRT KL Sentral
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/entrance/entrance_front3.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/entrance/entrance_left2.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/entrance/entrance_right.jpg',
    
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/escalator/escalator_front.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/escalator/escalator_left.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/escalator/escalator_right.jpg',
    
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/stair/stair_front.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/stair/stair_left.jpg',
#     'D:/# fyp_dataset/references/resized/lrt kl sentral/stair/stair_right.jpg',
    
#     # LRT Putra Heights
#     'D:/# fyp_dataset/references/resized/lrt putra heights/entrance/entrance_front.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/entrance/entrance_left.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/entrance/entrance_right.jpg',
    
#     'D:/# fyp_dataset/references/resized/lrt putra heights/escalator/escalator_front.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/escalator/escalator_left.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/escalator/escalator_right.jpg',    
    
#     'D:/# fyp_dataset/references/resized/lrt putra heights/stair/stair_front2.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/stair/stair_left.jpg',
#     'D:/# fyp_dataset/references/resized/lrt putra heights/stair/stair_right.jpg'
]

# Run the detector on each listed image and display the annotated result,
# using the measured box width as the figure title.
for image_path in image_paths:
    width, detected_image = detect_and_visualize_image(
        image_path,
        detect_fn,
        category_index,
    )
    # The annotated image is converted from BGR to RGB before matplotlib shows it.
    plt.imshow(
        cv2.cvtColor(detected_image, cv2.COLOR_BGR2RGB)
    )
    plt.title(f"Width in pixels: {width}")
    plt.show()

# Save and export

In [None]:
# Path to exporter_main_v2.py under APIMODEL_PATH, used to build the export command.
# The file name carries no trailing whitespace, so the value is a usable path on
# its own; the command template supplies the separator before the first flag.
FREEZE_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'exporter_main_v2.py')

In [None]:
# Export command line: reads checkpoints from CHECKPOINT_PATH and writes the
# exported model to OUTPUT_PATH, with an image_tensor input type.
command = (
    f"python {FREEZE_SCRIPT}"
    " --input_type=image_tensor"
    f" --pipeline_config_path={files['PIPELINE_CONFIG']}"
    f" --trained_checkpoint_dir={paths['CHECKPOINT_PATH']}"
    f" --output_directory={paths['OUTPUT_PATH']}"
)

In [None]:
# Show the assembled export command; it is printed, not executed, here.
print(command)

# TFLite conversion

In [None]:
# Path to export_tflite_graph_tf2.py under APIMODEL_PATH, used to build the
# TFLite graph export command.
# The file name carries no trailing whitespace, so the value is a usable path on
# its own; the command template supplies the separator before the first flag.
TFLITE_SCRIPT = os.path.join(paths['APIMODEL_PATH'], 'research', 'object_detection', 'export_tflite_graph_tf2.py')

In [None]:
# TFLite graph export command line: reads checkpoints from CHECKPOINT_PATH and
# writes its output to TFLITE_PATH.
command = (
    f"python {TFLITE_SCRIPT}"
    f" --pipeline_config_path={files['PIPELINE_CONFIG']}"
    f" --trained_checkpoint_dir={paths['CHECKPOINT_PATH']}"
    f" --output_directory={paths['TFLITE_PATH']}"
)

In [None]:
# Show the assembled TFLite graph export command; it is printed, not executed, here.
print(command)

In [None]:
# Folder passed to tflite_convert as --saved_model_dir.
FROZEN_TFLITE_PATH = os.path.join(paths['TFLITE_PATH'], 'saved_model')
# The converted .tflite file is written into that same folder.
TFLITE_MODEL = os.path.join(FROZEN_TFLITE_PATH, 'vizio.tflite')

In [None]:
# tflite_convert command line, assembled from one flag per list entry and joined
# with single spaces; the two placeholders receive the SavedModel folder and the
# output .tflite path.
command = " ".join([
    "tflite_convert",
    "--saved_model_dir={}",
    "--output_file={}",
    "--input_shapes=1,320,320,3",
    "--input_arrays=normalized_input_image_tensor",
    "--output_arrays='TFLite_Detection_PostProcess','TFLite_Detection_PostProcess:1','TFLite_Detection_PostProcess:2','TFLite_Detection_PostProcess:3'",
    "--inference_type=FLOAT",
    "--allow_custom_ops",
]).format(FROZEN_TFLITE_PATH, TFLITE_MODEL)

In [None]:
# Show the assembled tflite_convert command; it is printed, not executed, here.
print(command)

# TFLite model evaluation

## Random image evaluation

In [None]:
# Smoke test: load the converted model, push one random tensor through it and
# print the first output tensor.
interpreter = tf.lite.Interpreter(model_path=TFLITE_MODEL)
interpreter.allocate_tensors()

# Descriptions of the model's input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Random float32 data with exactly the shape the first input declares.
input_shape = input_details[0]['shape']
input_data = np.random.random_sample(input_shape).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)

interpreter.invoke()

# `get_tensor()` hands back a copy of the tensor data;
# `tensor()` would give a pointer to the tensor instead.
output_data = interpreter.get_tensor(output_details[0]['index'])
print(output_data)

## Input image evaluation

In [None]:
# Load the TFLite interpreter
interpreter = tf.lite.Interpreter(model_path=os.path.join(paths['TFLITE_PATH'], 'saved_model', 'vizio.tflite'))
interpreter.allocate_tensors()

# Get input and output details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Print every output's name and shape so the index-to-meaning mapping assumed
# further down can be checked against the actual model
for detail in output_details:
    print(detail['name'], detail['shape'])
print()

# Load the test image. cv2.imread returns None (it does not raise) when the file
# is missing or unreadable, so fail here with a clear message.
# NOTE(review): this is an absolute path on the author's machine.
test_image_path = 'D:/# fyp_dataset/references/resized/lrt putra heights/escalator/escalator_left.jpg'
im = cv2.imread(test_image_path)
if im is None:
    raise FileNotFoundError(f"Could not read image: {test_image_path}")

# Convert the image from BGR to RGB
im_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)

# Resize the image to match the input shape.
# The input tensor is laid out as [batch, height, width, channels], while
# cv2.resize takes its target size as (width, height).
input_shape = input_details[0]['shape']
target_height, target_width = int(input_shape[1]), int(input_shape[2])
im_rgb = cv2.resize(im_rgb, (target_width, target_height))  # e.g., 320x320

# Normalize the image to [0,1]
# NOTE(review): confirm this matches the range the exported model expects; some
# SSD MobileNet exports expect inputs scaled to [-1, 1] instead.
input_data = np.expand_dims(im_rgb, axis=0).astype(np.float32) / 255.0

# Set the input tensor for the interpreter
interpreter.set_tensor(input_details[0]['index'], input_data)

# Run inference
interpreter.invoke()

# Get the results.
# NOTE(review): the output order (0=scores, 1=boxes, 2=count, 3=classes) is an
# assumption; verify it against the names/shapes printed above.
detection_scores = interpreter.get_tensor(output_details[0]['index'])
detection_locations = interpreter.get_tensor(output_details[1]['index'])
num_boxes = interpreter.get_tensor(output_details[2]['index'])
detection_classes = interpreter.get_tensor(output_details[3]['index'])

# Print the results
print("==detection_scores==")
print(detection_scores)
print("\n==detection_locations==")
print(detection_locations)
print("\n==num_boxes==")
print(num_boxes)
print("\n==detection_classes==")
print(detection_classes)

# Focal length calculation

## Actual width

### MRT Muzium Negara
entrance = 54\
escalator = 59\
stair = 86


### LRT KL Sentral
entrance = 64\
escalator = 54\
stair = 60


### LRT Putra Heights
entrance = 64\
escalator = 57\
stair = 71

### Find average of actual width for each item

In [1]:
# Mean real-world width per object type. Each tuple holds the three station
# measurements listed above, in the order MRT Muzium Negara, LRT KL Sentral,
# LRT Putra Heights.
avg_entrance_width = sum((54, 64, 64)) / 3
avg_escalator_width = sum((59, 54, 57)) / 3
avg_stair_width = sum((86, 60, 71)) / 3

print(f'Average actual width of entrance: {avg_entrance_width}')
print(f'Average actual width of escalator: {avg_escalator_width}')
print(f'Average actual width of stair: {avg_stair_width}')

Average actual width of entrance: 60.666666666666664
Average actual width of escalator: 56.666666666666664
Average actual width of stair: 72.33333333333333


## Width In Pixels (WIP)

In [2]:
# Width-In-Pixels readings, one per station / object type / viewing angle.
# Every row assigns the (front, left, right) values for one object type.

# MRT Muzium Negara
MMM_entrance_front, MMM_entrance_left, MMM_entrance_right = 72, 52, 45
MMM_escalator_front, MMM_escalator_left, MMM_escalator_right = 69, 68, 65
MMM_stair_front, MMM_stair_left, MMM_stair_right = 80, 57, 77

# LRT KL Sentral
LKS_entrance_front, LKS_entrance_left, LKS_entrance_right = 48, 41, 37
LKS_escalator_front, LKS_escalator_left, LKS_escalator_right = 55, 51, 54
LKS_stair_front, LKS_stair_left, LKS_stair_right = 57, 60, 55

# LRT Putra Heights
LPH_entrance_front, LPH_entrance_left, LPH_entrance_right = 46, 32, 37
LPH_escalator_front, LPH_escalator_left, LPH_escalator_right = 53, 43, 31
LPH_stair_front, LPH_stair_left, LPH_stair_right = 64, 53, 57

### Find average of Width In Pixels (WIP) for each class

In [3]:
# Mean Width-In-Pixels per object type and viewing angle, taken across the three
# stations (MMM, LKS, LPH).
avg_wip_entrance_front = sum((MMM_entrance_front, LKS_entrance_front, LPH_entrance_front)) / 3
avg_wip_entrance_left = sum((MMM_entrance_left, LKS_entrance_left, LPH_entrance_left)) / 3
avg_wip_entrance_right = sum((MMM_entrance_right, LKS_entrance_right, LPH_entrance_right)) / 3

avg_wip_escalator_front = sum((MMM_escalator_front, LKS_escalator_front, LPH_escalator_front)) / 3
avg_wip_escalator_left = sum((MMM_escalator_left, LKS_escalator_left, LPH_escalator_left)) / 3
avg_wip_escalator_right = sum((MMM_escalator_right, LKS_escalator_right, LPH_escalator_right)) / 3

avg_wip_stair_front = sum((MMM_stair_front, LKS_stair_front, LPH_stair_front)) / 3
avg_wip_stair_left = sum((MMM_stair_left, LKS_stair_left, LPH_stair_left)) / 3
avg_wip_stair_right = sum((MMM_stair_right, LKS_stair_right, LPH_stair_right)) / 3

print(f'Average WIP of entrance front: {avg_wip_entrance_front}')
print(f'Average WIP of entrance left: {avg_wip_entrance_left}')
print(f'Average WIP of entrance right: {avg_wip_entrance_right}')
print(f'Average WIP of escalator front: {avg_wip_escalator_front}')
print(f'Average WIP of escalator left: {avg_wip_escalator_left}')
print(f'Average WIP of escalator right: {avg_wip_escalator_right}')
print(f'Average WIP of stair front: {avg_wip_stair_front}')
print(f'Average WIP of stair left: {avg_wip_stair_left}')
print(f'Average WIP of stair right: {avg_wip_stair_right}')

Average WIP of entrance front: 55.333333333333336
Average WIP of entrance left: 41.666666666666664
Average WIP of entrance right: 39.666666666666664
Average WIP of escalator front: 59.0
Average WIP of escalator left: 54.0
Average WIP of escalator right: 50.0
Average WIP of stair front: 67.0
Average WIP of stair left: 56.666666666666664
Average WIP of stair right: 63.0


### Find average of side angles (left and right)

In [4]:
# Left and right views are expected to give the same width, so the two are
# pooled into a single "side" average per object type, which may improve accuracy.

avg_wip_entrance_side = sum((avg_wip_entrance_left, avg_wip_entrance_right)) / 2
avg_wip_escalator_side = sum((avg_wip_escalator_left, avg_wip_escalator_right)) / 2
avg_wip_stair_side = sum((avg_wip_stair_left, avg_wip_stair_right)) / 2

print(f'Average WIP of entrance side: {avg_wip_entrance_side}')
print(f'Average WIP of escalator side: {avg_wip_escalator_side}')
print(f'Average WIP of stair side: {avg_wip_stair_side}')

Average WIP of entrance side: 40.666666666666664
Average WIP of escalator side: 52.0
Average WIP of stair side: 59.83333333333333


In [5]:
def focal_length_finder(measured_distance, real_width, width_in_pixels):
    """Return the focal length computed from one reference measurement.

    Computes (width_in_pixels * measured_distance) / real_width.

    Parameters:
    - measured_distance: Distance value used for the reference measurement.
    - real_width: Real-world width of the object; must be non-zero.
    - width_in_pixels: Width of the object in the image, in pixels.

    Returns:
    - The quotient described above.
    """
    return (width_in_pixels * measured_distance) / real_width

In [6]:
# Distance passed as `measured_distance` to every focal_length_finder call below.
# Presumably the camera-to-object distance at which the reference photos were
# taken -- TODO confirm.
ACTUAL_DISTANCE = 80.0 #inches

In [7]:
# One focal length per object type and view (front / pooled side), each derived
# from the shared reference distance, the average real width and the average
# width in pixels.

# entrance
focal_entrance_front = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_entrance_width,
    width_in_pixels=avg_wip_entrance_front,
)
focal_entrance_side = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_entrance_width,
    width_in_pixels=avg_wip_entrance_side,
)

# escalator
focal_escalator_front = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_escalator_width,
    width_in_pixels=avg_wip_escalator_front,
)
focal_escalator_side = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_escalator_width,
    width_in_pixels=avg_wip_escalator_side,
)

# stair
focal_stair_front = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_stair_width,
    width_in_pixels=avg_wip_stair_front,
)
focal_stair_side = focal_length_finder(
    measured_distance=ACTUAL_DISTANCE,
    real_width=avg_stair_width,
    width_in_pixels=avg_wip_stair_side,
)


print(f'Focal entrance front: {focal_entrance_front}')
print(f'Focal entrance side: {focal_entrance_side}')
print(f'Focal escalator front: {focal_escalator_front}')
print(f'Focal escalator side: {focal_escalator_side}')
print(f'Focal stair front: {focal_stair_front}')
print(f'Focal stair side: {focal_stair_side}')

Focal entrance front: 72.96703296703298
Focal entrance side: 53.62637362637362
Focal escalator front: 83.29411764705883
Focal escalator side: 73.41176470588236
Focal stair front: 74.10138248847927
Focal stair side: 66.17511520737327
