# Machine Learning Development [CH2-PS579]


*   Muhammad Aditya Hasta Pratama (M299BSY0188) - ML - Universitas Pendidikan Indonesia / Active
*   Shereva Miranda (...) - ML - Institut Teknologi Bandung / Active
*   Reza Nugraha (...) - ML - Institut Teknologi Bandung / Inactive

<br>

Reference tutorials:

*   [Tensorflow 2 Custom Object Detection Model by Lazy Tech](https://www.youtube.com/watch?v=8ktcGQ-XreQ&t=553s&ab_channel=LazyTech).
*   [Train a Deep Learning Model for Custom Object Detection Using TensorFlow by TechZizou](https://www.youtube.com/watch?v=amURyS6CAaY&t=69s&ab_channel=techzizou)


# PREPARATION

Libraries and installation steps needed to run the architecture.

In [None]:
# Install TensorFlow (pinned to version 2.13.0)

!pip install tensorflow=="2.13.0"

In [None]:
# Download models for object detection

import os
import pathlib

if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
# Install the object detection API

%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

In [None]:
# Check the installation by running the model builder test script
# that ships with the object detection code.

!python /content/models/research/object_detection/builders/model_builder_tf2_test.py

# PREPARE DATASET

Prepare the receipt dataset (downloaded from Kaggle), the TFRecords, and the label map.

In [None]:
# Install the Kaggle API client (-q keeps the pip output quiet)
# NOTE(review): `kaggle-cli` looks like a separate package from `kaggle`, and
# only the `kaggle` command is used later in this notebook — confirm that the
# second install is still needed.

!pip install -q kaggle
!pip install -q kaggle-cli

In [None]:
# Set the environment variables used by the Kaggle API
#
# SECURITY: never hard-code an API key in a notebook. Everybody who can read
# the notebook (or its version history) can use the key. The credentials are
# taken from the environment when they are already set; otherwise they are
# requested interactively. The key is read with getpass so that it is neither
# echoed nor stored in the cell output.
# Any key that was ever committed together with this notebook should be
# considered leaked and be regenerated in the Kaggle account settings.

import os
from getpass import getpass

username = os.environ.get('KAGGLE_USERNAME') or input('Kaggle username: ').strip()
key = os.environ.get('KAGGLE_KEY') or getpass('Kaggle API key: ').strip()

os.environ['KAGGLE_USERNAME'] = username
os.environ['KAGGLE_KEY'] = key

In [None]:
# Download the dataset and unzip it (--unzip), then list the working
# directory to check what was extracted

!kaggle datasets download -d mdhstama23/receipt-invoice-ml-ch2ps579 --unzip
!ls

In [None]:
# Create label_map.pbtxt files

# Input
number_class = 1
class_labels = ['total']

# Code create
label_map_content = ''
for idx, label in enumerate(class_labels, start=1):
    label_map_content += f"item {{\n  id: {idx}\n  name: '{label}'\n}}\n"

labelmap_path = '/content/label_map.pbtxt'  # Change the path as needed
with open(labelmap_path, 'w') as labelmap_file:
    labelmap_file.write(label_map_content)

# Display
!cat '/content/labelmap.pbtxt'

In [None]:
# Source code of the convert_xml_csv.py helper script.
#
# The script reads the *.xml annotation files in the `train` and `test`
# folders of the current working directory and writes one CSV per folder to
# `csv/<folder>_labels.csv`, with one row per annotated object.

convert_xml_csv_content = """
import os
import glob
import pandas as pd
import xml.etree.ElementTree as ET

OUTPUT_DIR = 'csv'
COLUMN_NAMES = ['filename', 'width', 'height', 'class', 'xmin', 'ymin', 'xmax', 'ymax']


def _int_text(element):
    # Coordinates are sometimes written as '12.0'; accept those as well.
    return int(float(element.text))


def xml_to_csv(path):
    # Return a DataFrame with one row per <object> element found in the
    # *.xml files of `path`.
    #
    # All fields are looked up by tag name instead of by position, so the
    # result does not depend on the order (or number) of child elements the
    # annotation tool wrote.
    xml_list = []
    for xml_file in sorted(glob.glob(os.path.join(path, '*.xml'))):
        root = ET.parse(xml_file).getroot()
        for member in root.findall('object'):
            size = root.find('size')
            box = member.find('bndbox')
            xml_list.append((
                root.find('filename').text,
                _int_text(size.find('width')),
                _int_text(size.find('height')),
                member.find('name').text,
                _int_text(box.find('xmin')),
                _int_text(box.find('ymin')),
                _int_text(box.find('xmax')),
                _int_text(box.find('ymax')),
            ))
    return pd.DataFrame(xml_list, columns=COLUMN_NAMES)


def main():
    # DataFrame.to_csv does not create missing directories.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for folder in ['train', 'test']:
        image_path = os.path.join(os.getcwd(), folder)
        xml_df = xml_to_csv(image_path)
        xml_df.to_csv(os.path.join(OUTPUT_DIR, folder + '_labels.csv'), index=False)
    print('Successfully converted xml to csv.')


if __name__ == '__main__':
    main()
"""

# Save the script to disk

# The script is written into the current working directory because the cell
# that runs it calls `python convert_xml_csv.py`, i.e. it resolves the script
# relative to the working directory.
file_path = "convert_xml_csv.py"

# Save the code to the file
with open(file_path, "w") as file:
    file.write(convert_xml_csv_content)

print(f"Code saved to {file_path}")

In [None]:
# Run the XML-to-CSV conversion script from the current working directory

!python convert_xml_csv.py

In [None]:
# Source code of the generate_tfrecord.py helper script.
#
# The script reads a labels CSV (--csv_input), loads the referenced images
# from --image_dir and writes one tf.train.Example per image to the TFRecord
# file given by --output_path.

tfrecords_content = """
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os
import io
import pandas as pd

from tensorflow.python.framework.versions import VERSION
if VERSION >= "2.0.0a0":
    import tensorflow.compat.v1 as tf
else:
    import tensorflow as tf

from PIL import Image
from object_detection.utils import dataset_util
from collections import namedtuple, OrderedDict

flags = tf.app.flags
flags.DEFINE_string('csv_input', '', 'Path to the CSV input')
flags.DEFINE_string('output_path', '', 'Path to output TFRecord')
flags.DEFINE_string('image_dir', '', 'Path to images')
FLAGS = flags.FLAGS


def class_text_to_int(row_label):
    # Map a class name from the CSV to its numeric id in the label map.
    if row_label == 'total':
        return 1
    # Fail with a clear message instead of returning None, which would only
    # surface later as an obscure error while building the int64 feature.
    raise ValueError('Unknown class label: {!r}'.format(row_label))


def split(df, group):
    # Return one (filename, rows-of-that-file) record per distinct value in
    # the `group` column.
    data = namedtuple('data', ['filename', 'object'])
    gb = df.groupby(group)
    return [data(filename, gb.get_group(filename)) for filename in gb.groups]


def create_tf_example(group, path):
    # Build the tf.train.Example for one image and all of its boxes.
    with tf.gfile.GFile(os.path.join(path, '{}'.format(group.filename)), 'rb') as fid:
        encoded_jpg = fid.read()
    encoded_jpg_io = io.BytesIO(encoded_jpg)
    image = Image.open(encoded_jpg_io)
    # The size is taken from the image file itself, not from the CSV columns.
    width, height = image.size

    filename = group.filename.encode('utf8')
    image_format = b'jpg'
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    classes_text = []
    classes = []

    for index, row in group.object.iterrows():
        # Box coordinates are stored relative to the image size.
        xmins.append(row['xmin'] / width)
        xmaxs.append(row['xmax'] / width)
        ymins.append(row['ymin'] / height)
        ymaxs.append(row['ymax'] / height)
        classes_text.append(row['class'].encode('utf8'))
        classes.append(class_text_to_int(row['class']))

    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': dataset_util.int64_feature(height),
        'image/width': dataset_util.int64_feature(width),
        'image/filename': dataset_util.bytes_feature(filename),
        'image/source_id': dataset_util.bytes_feature(filename),
        'image/encoded': dataset_util.bytes_feature(encoded_jpg),
        'image/format': dataset_util.bytes_feature(image_format),
        'image/object/bbox/xmin': dataset_util.float_list_feature(xmins),
        'image/object/bbox/xmax': dataset_util.float_list_feature(xmaxs),
        'image/object/bbox/ymin': dataset_util.float_list_feature(ymins),
        'image/object/bbox/ymax': dataset_util.float_list_feature(ymaxs),
        'image/object/class/text': dataset_util.bytes_list_feature(classes_text),
        'image/object/class/label': dataset_util.int64_list_feature(classes),
    }))
    return tf_example


def main(_):
    # Read the CSV first so that a wrong --csv_input does not leave an empty
    # record file behind.
    examples = pd.read_csv(FLAGS.csv_input)
    grouped = split(examples, 'filename')

    # The context manager closes the writer even when an example fails.
    with tf.python_io.TFRecordWriter(FLAGS.output_path) as writer:
        for group in grouped:
            tf_example = create_tf_example(group, FLAGS.image_dir)
            writer.write(tf_example.SerializeToString())

    output_path = os.path.join(os.getcwd(), FLAGS.output_path)
    print('Successfully created the TFRecords: {}'.format(output_path))


if __name__ == '__main__':
    tf.app.run()

# example commands:
# python generate_tfrecord.py --csv_input=dataset/test_labels.csv --image_dir=dataset/test --output_path=test.record
# python generate_tfrecord.py --csv_input=dataset/train_labels.csv --image_dir=dataset/train --output_path=train.record
"""

# Save the script to disk

# The script is written into the current working directory because the cell
# that runs it calls `python generate_tfrecord.py`, i.e. it resolves the
# script relative to the working directory.
file_path = "generate_tfrecord.py"

# Save the code to the file
with open(file_path, "w") as file:
    file.write(tfrecords_content)

print(f"Code saved to {file_path}")

In [None]:
# Run the tfrecords cod

!python generate_tfrecord.py --csv_input=train_labels.csv --image_dir=train --output_path=train.record
!python generate_tfrecord.py --csv_input=test_labels.csv --image_dir=test --output_path=test.record

# ARCHITECTURE OR MODEL CONFIGURATION

Configure training using a model that is available in the [Tensorflow 2 Detection Model Zoo](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/tf2_detection_zoo.md).

In [None]:
# Settings shared by the config-editing and training cells

# Dataset artefacts
train_record_path, test_record_path = 'train.record', 'test.record'
labelmap_path = 'label_map.pbtxt'

# Model and schedule
num_classes = 1
fine_tune_checkpoint_type = 'detection'
batch_size = 32
num_steps = num_eval_steps = 1000

In [None]:
# Download the pre-trained model archive and unpack it
# Generic form of the two commands:
# !wget <link model>
# !tar -xf <name file model.gz>

# model 1 (active)
!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/efficientdet_d0_coco17_tpu-32.tar.gz
!tar -xf efficientdet_d0_coco17_tpu-32.tar.gz

# model 2 (alternative, disabled)
# !wget http://download.tensorflow.org/models/object_detection/tf2/20200711/faster_rcnn_resnet50_v1_800x1333_coco17_gpu-8.tar.gz
# !tar -xf faster_rcnn_resnet50_v1_800x1333_coco17_gpu-8.tar.gz

In [None]:
# Checkpoint of the downloaded model that fine-tuning starts from

# model 1
pretrained_model = 'efficientdet_d0_coco17_tpu-32'
fine_tune_checkpoint = f'{pretrained_model}/checkpoint/ckpt-0'

# model 2
# fine_tune_checkpoint = 'faster_rcnn_resnet50_v1_800x1333_coco17_gpu-8/checkpoint/ckpt-0'

In [None]:
# Download the base pipeline config and remember its file name
# Generic form:
# !wget <link config>
# base_config_path = <name file config>

# model 1 (active)
!wget https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/configs/tf2/ssd_efficientdet_d0_512x512_coco17_tpu-8.config
base_config_path = 'ssd_efficientdet_d0_512x512_coco17_tpu-8.config'

# model 2 (alternative, disabled)
# !wget https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/configs/tf2/faster_rcnn_resnet50_v1_800x1333_coco17_gpu-8.config
# base_config_path = 'faster_rcnn_resnet50_v1_800x1333_coco17_gpu-8.config'

In [None]:
# Edit the config file
#
# Reads the base pipeline config, overrides the fields listed below with the
# values chosen in the earlier cells and writes the result to
# 'model_config.config'.

import re


def _set_field(pattern, replacement, text):
    # Replace every match of `pattern` in `text` with the literal string
    # `replacement`. A callable is handed to re.sub so that backslashes in the
    # replacement (e.g. in a Windows path) are not processed as escapes.
    return re.sub(pattern, lambda _match: replacement, text)


with open(base_config_path) as f:
    config = f.read()

# Set number of classes.
config = _set_field(r'num_classes: [0-9]+',
                    'num_classes: {}'.format(num_classes), config)

# Set fine_tune_checkpoint path
config = _set_field(r'fine_tune_checkpoint: ".*?"',
                    'fine_tune_checkpoint: "{}"'.format(fine_tune_checkpoint), config)

# Set the fine-tune checkpoint type from the shared setting, whatever value
# the base config ships with.
config = _set_field(r'fine_tune_checkpoint_type: ".*?"',
                    'fine_tune_checkpoint_type: "{}"'.format(fine_tune_checkpoint_type), config)

# Set batch size
config = _set_field(r'batch_size: [0-9]+',
                    'batch_size: {}'.format(batch_size), config)

# Set training steps
config = _set_field(r'num_steps: [0-9]+',
                    'num_steps: {}'.format(num_steps), config)

# Set labelmap path
config = _set_field(r'label_map_path: ".*?"',
                    'label_map_path: "{}"'.format(labelmap_path), config)

# Set train tf-record file path
config = _set_field(r'(input_path: ".*?)(PATH_TO_BE_CONFIGURED/train)(.*?")',
                    'input_path: "{}"'.format(train_record_path), config)

# Set test tf-record file path
config = _set_field(r'(input_path: ".*?)(PATH_TO_BE_CONFIGURED/val)(.*?")',
                    'input_path: "{}"'.format(test_record_path), config)

# The output file is opened only after all substitutions succeeded, so an
# error above cannot leave a truncated config behind.
with open('model_config.config', 'w') as f:
    f.write(config)

In [None]:
# Print the edited config file to check the substituted values

%cat model_config.config

# TRAINING THE MODEL

Train the model based on the architecture configuration above.

In [None]:
# Pipeline config to train with, and the directory handed to the training
# script as its model directory

pipeline_config_path = 'model_config.config'
model_dir = 'training/'

In [None]:
# Start training with the model_main_tf2.py script.
# The {name} placeholders are replaced with the values of the notebook
# variables of the same name.

!python /content/models/research/object_detection/model_main_tf2.py \
    --pipeline_config_path={pipeline_config_path} \
    --model_dir={model_dir} \
    --alsologtostderr \
    --num_train_steps={num_steps} \
    --sample_1_of_n_eval_examples=1 \
    --num_eval_steps={num_eval_steps}

# EXPORT THE MODEL

Export the trained model so that it can be used in the next step, which is OCR.

In [None]:
# Export the inference graph from the checkpoints in {model_dir}
# into the `output_directory` folder

output_directory = 'inference_graph'

!python /content/models/research/object_detection/exporter_main_v2.py \
    --trained_checkpoint_dir {model_dir} \
    --output_directory {output_directory} \
    --pipeline_config_path {pipeline_config_path}

In [None]:
# Download the model

from google.colab import files
!zip -r new_model.zip /content/{output_directory}/saved_model
files.download(f'new_model.zip')

# TESTING IMAGES

This step tests whether the model successfully performs object detection

In [4]:
# Source code of the testing_images.py helper script.
#
# The script loads an exported SavedModel, runs detection on one image or on
# every image of a folder, draws the detections and saves the annotated
# images to the output folder.

testing_content = """
import numpy as np
import argparse
import os
import tensorflow as tf
from PIL import Image
from io import BytesIO
import glob
import matplotlib.pyplot as plt

# Import utility functions for object detection
from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

# patch tf1 into `utils.ops`
utils_ops.tf = tf.compat.v1

# Patch the location of gfile
tf.gfile = tf.io.gfile

# Function to load the pre-trained model
def load_model(model_path):
    model = tf.saved_model.load(model_path)
    return model

# Function to load an image into a (height, width, 3) uint8 numpy array
def load_image_into_numpy_array(path):
    with tf.io.gfile.GFile(path, 'rb') as fid:
        img_data = fid.read()
    # Force three channels: greyscale, palette or RGBA images (common for
    # PNG files) cannot be reshaped to (height, width, 3) otherwise.
    image = Image.open(BytesIO(img_data)).convert('RGB')
    # np.array makes a writable copy; the visualization draws into it.
    return np.array(image, dtype=np.uint8)

# Function to run inference for a single image using the loaded model
def run_inference_for_single_image(model, image):
    # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
    input_tensor = tf.convert_to_tensor(image)
    # The model expects a batch of images, so add an axis with `tf.newaxis`.
    input_tensor = input_tensor[tf.newaxis, ...]

    # Run inference
    output_dict = model(input_tensor)

    # All outputs are batches tensors.
    # Convert to numpy arrays, and take index [0] to remove the batch dimension.
    # We're only interested in the first num_detections.
    num_detections = int(output_dict.pop('num_detections'))
    output_dict = {key: value[0, :num_detections].numpy()
                   for key, value in output_dict.items()}
    output_dict['num_detections'] = num_detections

    # detection_classes should be ints.
    output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)

    # Handle models with masks:
    if 'detection_masks' in output_dict:
        # Reframe the bbox mask to the image size.
        detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
            output_dict['detection_masks'], output_dict['detection_boxes'],
            image.shape[0], image.shape[1])
        detection_masks_reframed = tf.cast(detection_masks_reframed > 0.5, tf.uint8)
        output_dict['detection_masks_reframed'] = detection_masks_reframed.numpy()

    return output_dict

# Function to run inference on a single image or on a folder of images
def run_inference(model, category_index, image_path, output_path):
    if os.path.isdir(image_path):
        image_paths = []
        for file_extension in ('*.png', '*.jpg', '*.jpeg'):
            image_paths.extend(glob.glob(os.path.join(image_path, file_extension)))
    else:
        # A single image file was given.
        image_paths = [image_path]

    for i, i_path in enumerate(image_paths):
        image_np = load_image_into_numpy_array(i_path)
        # Actual detection.
        output_dict = run_inference_for_single_image(model, image_np)
        # Visualization of the results of a detection.
        vis_util.visualize_boxes_and_labels_on_image_array(
            image_np,
            output_dict['detection_boxes'],
            output_dict['detection_classes'],
            output_dict['detection_scores'],
            category_index,
            instance_masks=output_dict.get('detection_masks_reframed', None),
            use_normalized_coordinates=True,
            line_thickness=8)

        # Use a fresh figure per image and close it afterwards so that images
        # do not pile up on the same axes or in memory.
        plt.figure()
        plt.imshow(image_np)

        # save the output to the specified output folder
        plt.savefig(os.path.join(output_path, "detection_output{}.png".format(i)))
        plt.close()

if __name__ == '__main__':
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Detect objects')
    parser.add_argument('-m', '--model', type=str, required=True, help='Model Path')
    parser.add_argument('-l', '--labelmap', type=str, required=True, help='Path to Labelmap')
    parser.add_argument('-i', '--image_path', type=str, required=True, help='Path to image (or folder)')
    # Optional: a default is only meaningful when the argument may be omitted.
    parser.add_argument('-o', '--output_path', type=str, default='./outputs/', help='Path to output folder (default: outputs/)')
    args = parser.parse_args()

    # Create the output folder if it doesn't exist
    os.makedirs(args.output_path, exist_ok=True)

    # Load the pre-trained detection model
    detection_model = load_model(args.model)
    # Create a category index from the label map
    category_index = label_map_util.create_category_index_from_labelmap(args.labelmap, use_display_name=True)
    # Run inference on the images and save the results to the specified output folder
    run_inference(detection_model, category_index, args.image_path, args.output_path)
"""

# Persist the script to disk

# Destination of the generated script
file_path = r"./content/models/research/object_detection/testing_images.py"

# Write the script source out
with open(file_path, mode="w") as script_file:
    script_file.write(testing_content)

print(f"Code saved to {file_path}")

In [None]:
# Command to run the testing model

! python .\content\models\research\object_detection\detect_from_image.py -m .\content\{output_directory}\saved_model -l .\label_map.pbtxt -i .\test_images -o .\outputs