In [0]:
#@title Install Required Packages.

# System dependency: `protoc` (protobuf-compiler) is needed later to compile the
# Object Detection API's .proto files.
!apt update && apt install protobuf-compiler -y
# Python dependencies. Only TensorFlow is pinned (1.15.0); the rest are unpinned.
!pip install lxml cython numpy pillow matplotlib
!pip install pycocotools tensorflow==1.15.0 kaggle

In [0]:
#@title Required Imports

%matplotlib inline
%tensorflow_version 1.x
%load_ext tensorboard
import os, sys, shutil, fnmatch, random, json
import xml.etree.ElementTree as ET
import numpy as np
import tensorflow as tf

In [0]:
#@title Some Useful Functions

def download_file(url, save_as, continue_existing=True):
    if continue_existing:
        !wget -c "{url}" -O "{save_as}"
    else:
        !wget "{url}" -O "{save_as}"

In [0]:
#@title Mount Google Drive

GDRIVE_WORKSPACE = 'Potholes-Detection'  #@param {type: 'string'}

MOUNT_PATH = '/gdrive'
WORKSPACE_PATH = os.path.join(MOUNT_PATH, 'My Drive', GDRIVE_WORKSPACE)

if not os.path.exists(MOUNT_PATH):
  from google.colab import drive
  drive.mount(MOUNT_PATH)
  !mkdir -p "{WORKSPACE_PATH}"
  !ln -s "{WORKSPACE_PATH}" "root"
  os.chdir("root")

In [0]:
#@title Create Directory Tree

new_folders = " ".join(['annotations', 'archives', 'evaluations', 'exported-models', 'pre-trained-models', 'trainings'])
!mkdir -p {new_folders}


### Important! Read This Before Executing the Next Cell

You need a **Kaggle API Key** to download the dataset here.

To use the Kaggle API, sign up for a Kaggle account at https://www.kaggle.com. Then go to the **My Account** page of your user profile (https://www.kaggle.com/YOUR_USERNAME/account) and select **Create API Token**. This will trigger the download of **kaggle.json**, a file containing your API credentials.

Now, in the next cell, replace the following placeholders with the values from your **kaggle.json** file:
- **YOUR_USERNAME** with your username
- **YOUR_API_KEY** with your key

In [0]:
#@title Set Environment Variables and Paths

KAGGLE_USERNAME = 'YOUR_USERNAME' #@param {type: 'string'}
KAGGLE_API_KEY = 'YOUR_API_KEY' #@param {type: 'string'}

# Credentials are exported as environment variables for the `kaggle` command.
# NOTE: avoid saving or sharing the notebook with a real API key filled in.
os.environ['KAGGLE_USERNAME'] = KAGGLE_USERNAME
os.environ['KAGGLE_KEY'] = KAGGLE_API_KEY

# Object Detection API package roots, resolved against the current directory.
new_paths = [os.path.join(os.getcwd(), f'models/research/{d}') for d in ['', 'object_detection', 'slim']]

# Make them importable from this kernel...
for p in new_paths:
    if p not in sys.path:
        sys.path.append(p)

# ...and from child processes started with `!python`. Any PYTHONPATH entries
# that were already set are kept (after the new ones) instead of being
# discarded, and duplicates are dropped so re-running the cell is idempotent.
existing_paths = [entry for entry in os.environ.get('PYTHONPATH', '').split(os.pathsep) if entry]
os.environ['PYTHONPATH'] = os.pathsep.join(
    new_paths + [entry for entry in existing_paths if entry not in new_paths]
)
# Uncomment to hide all GPUs from TensorFlow (forces CPU execution).
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

In [0]:
#@title Download Tensorflow Models Repository

if not os.path.exists('models'):
    download_file('https://github.com/tensorflow/models/archive/master.zip', 'archives/models.zip', continue_existing=False)
    !unzip -qnd archives "archives/models.zip" && mv archives/models-* models

!cd models/research && protoc object_detection/protos/*.proto --python_out=.

In [0]:
#@title Split Dataset into Train and Test.

DATASET_PATH = '/dataset' #@param {type:'string'}
TEST_RATIO = 0.2 #@param {type:"slider", min:0.01, max:0.99, step:0.01}
FORCE_RESPLIT = False #@param {type:"boolean"}

IMAGES_PATH = os.path.join(DATASET_PATH, 'annotated-images')

if not os.path.exists(DATASET_PATH):
    !kaggle datasets download -p archives chitholian/annotated-potholes-dataset
    !unzip -qd "{DATASET_PATH}" "archives/annotated-potholes-dataset.zip"

if FORCE_RESPLIT or not os.path.exists(os.path.join(DATASET_PATH, 'splits.json')):
    all_xmls = fnmatch.filter(os.listdir(IMAGES_PATH), '*.xml')
    total_size = len(all_xmls)
    max_test_ratio = (total_size - 1)/total_size  # Training set must have at least 1 member.
    assert 0 <= TEST_RATIO <= max_test_ratio, f'TEST_RATIO must be in range [0, {max_test_ratio}]'
    test_set = set()
    train_set = set()
    while len(test_set) < round(total_size * TEST_RATIO):
        test_set.add(all_xmls[random.randint(0, total_size - 1)])
    for xml in all_xmls:
        if xml not in test_set:
            train_set.add(xml)
    with open(os.path.join(DATASET_PATH, 'splits.json'), 'w') as f:
        json.dump({'train': list(train_set), 'test': list(test_set)}, f)
        print(f'Splits written to "{f.name}"')
else:
    with open(os.path.join(DATASET_PATH, 'splits.json'), 'r') as f:
        splits = json.load(f)
        train_set = splits['train']
        test_set = splits['test']

# print('-----Training Set', train_set)
# print('-----Test Set', test_set)

In [0]:
#@title Create LabelMap and TFRECORD Files

LABELS = ['pothole']  #@param {type:"raw"}
FORCE_RECREATE = False  #@param {type:"boolean"}

# Write the label map: one `item` entry per label, with ids counting up from 1
# in the order the labels appear in LABELS.
with open('annotations/labels.pbtxt', 'w') as label_map:
    for label_id, label_name in enumerate(LABELS, start=1):
        label_map.write(f'''\
item {{
    id: {label_id}
    name: "{label_name}"
}}
''')

def class_text_to_int(class_name):
    """Return the 1-based id of `class_name` within LABELS.

    Args:
        class_name: Label text to look up.

    Returns:
        The position of `class_name` in LABELS plus one, or None when the
        name is not listed in LABELS.
    """
    try:
        return LABELS.index(class_name) + 1
    except ValueError:
        # Only "not in the list" maps to None; any other failure (for example
        # LABELS not being defined) is allowed to surface instead of being hidden.
        return None

def _int64_feature(values):
    """Wrap a list of ints in a tf.train.Feature."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=values))


def _bytes_feature(values):
    """Wrap a list of byte strings in a tf.train.Feature."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=values))


def _float_feature(values):
    """Wrap a list of floats in a tf.train.Feature."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=values))


def _build_tf_example(xml_file):
    """Build the tf.train.Example for one XML annotation file and the image it names."""
    root = ET.parse(os.path.join(IMAGES_PATH, xml_file)).getroot()

    width = int(root.find('size/width').text)
    height = int(root.find('size/height').text)
    image_name = root.find('filename').text
    filename = image_name.encode('utf8')
    # The format is the file extension taken as-is from the annotation.
    image_format = image_name.split('.')[-1].encode('utf8')

    with tf.gfile.GFile(os.path.join(IMAGES_PATH, image_name), 'rb') as fid:
        encoded_img = fid.read()

    class_names = []
    class_ids = []
    xmins = []
    ymins = []
    xmaxs = []
    ymaxs = []
    truncated = []
    difficulties = []

    for obj in root.iter('object'):
        class_name = obj.find('name').text
        class_id = class_text_to_int(class_name)
        if class_id is None:
            raise ValueError(
                f'Unknown class "{class_name}" in "{xml_file}"; expected one of {LABELS}')
        class_names.append(class_name.encode('utf8'))
        class_ids.append(class_id)
        # Box corners are stored as fractions of the image size.
        xmins.append(int(obj.find('bndbox/xmin').text) / width)
        ymins.append(int(obj.find('bndbox/ymin').text) / height)
        xmaxs.append(int(obj.find('bndbox/xmax').text) / width)
        ymaxs.append(int(obj.find('bndbox/ymax').text) / height)
        # A missing or empty flag is treated as 0 instead of crashing.
        truncated.append(int(obj.findtext('truncated') or 0))
        difficulties.append(int(obj.findtext('difficult') or 0))

    return tf.train.Example(features=tf.train.Features(feature={
        'image/height': _int64_feature([height]),
        'image/width': _int64_feature([width]),
        'image/filename': _bytes_feature([filename]),
        'image/source_id': _bytes_feature([filename]),
        'image/encoded': _bytes_feature([encoded_img]),
        'image/format': _bytes_feature([image_format]),
        'image/object/bbox/xmin': _float_feature(xmins),
        'image/object/bbox/xmax': _float_feature(xmaxs),
        'image/object/bbox/ymin': _float_feature(ymins),
        'image/object/bbox/ymax': _float_feature(ymaxs),
        'image/object/class/text': _bytes_feature(class_names),
        'image/object/class/label': _int64_feature(class_ids),
        'image/object/truncated': _int64_feature(truncated),
        'image/object/difficult': _int64_feature(difficulties),
    }))


def create_tfrecord(xml_files, output_file):
    """Serialize annotated images into a single TFRecord file.

    Args:
        xml_files: Iterable of XML annotation file names, relative to
            IMAGES_PATH. Each file must provide `size/width`, `size/height`,
            `filename` and zero or more `object` elements with a `name` and a
            `bndbox`.
        output_file: Path of the TFRecord file to write.

    Raises:
        ValueError: If an annotation uses a class name that is not in LABELS.
    """
    try:
        # The context manager closes the writer even when an example fails.
        with tf.python_io.TFRecordWriter(output_file) as writer:
            for xml_file in xml_files:
                writer.write(_build_tf_example(xml_file).SerializeToString())
    except Exception:
        # Do not leave a partial file behind: callers that skip creation when
        # the record file already exists would otherwise reuse it as if complete.
        if os.path.exists(output_file):
            os.remove(output_file)
        raise
    print('Created tfrecord file:', output_file)

# Build each record file unless it already exists (or re-creation is forced).
for split_files, record_path in (
        (train_set, 'annotations/training.record'),
        (test_set, 'annotations/evaluation.record')):
    if FORCE_RECREATE or not os.path.exists(record_path):
        create_tfrecord(split_files, record_path)

In [0]:
#@title Prepare a Pre-Trained Model

# Name of the pre-trained model archive on download.tensorflow.org; it is also
# used as the folder name under pre-trained-models/, trainings/, etc.
MODEL_NAME = 'ssd_mobilenet_v2_quantized_300x300_coco_2019_01_03'  #@param {type:"string"}
# When true, the sed rewrites below are applied to pipeline-mod.config on every run.
TUNE_CONFIG = True  #@param {type:"boolean"}

# Values substituted into pipeline-mod.config by the sed commands below.
INITIAL_LR = 0.005  #@param {type:"number"}
DECAY_STEPS = 6000  #@param {type:"integer"}
DECAY_FACTOR = 0.50  #@param {type:"number"}
BATCH_SIZE =     24  #@param {type:"integer"}
QUANTIZATION_DELAY = 30000  #@param {type:"integer"}

# When true, every `batch_norm_trainable: ...` entry is blanked out of the config.
REMOVE_BATCH_NORM_TRAINABLE = True  #@param {type:"boolean"}

# Download and unpack the pre-trained model once.
if not os.path.exists(f'pre-trained-models/{MODEL_NAME}'):
    download_file(f'http://download.tensorflow.org/models/object_detection/{MODEL_NAME}.tar.gz', f'archives/{MODEL_NAME}.tar.gz')
    !tar -xz -C "pre-trained-models" -f "archives/{MODEL_NAME}.tar.gz"

# Start from a copy of the model's own pipeline.config; an existing
# pipeline-mod.config is kept and edited in place.
if not os.path.exists(f'trainings/{MODEL_NAME}/pipeline-mod.config'):
    !mkdir -p "trainings/{MODEL_NAME}"
    !cp "pre-trained-models/{MODEL_NAME}/pipeline.config" "trainings/{MODEL_NAME}/pipeline-mod.config"

## Tune Configurations
# One in-place sed pass that:
#   - points fine_tune_checkpoint, label_map_path and both input_path entries
#     (lines mentioning "train" / "val") at this workspace's files,
#   - sets num_classes and num_examples from LABELS and test_set,
#   - replaces fixed_shape_resizer with keep_aspect_ratio_resizer, renaming
#     width/height to max_dimension/min_dimension and adding pad_to_max_dimension,
#   - enables shuffling, disables per-category metrics and sets the `delay:` value.
if TUNE_CONFIG:
    !sed -i "trainings/{MODEL_NAME}/pipeline-mod.config" \
        -e 's/fine_tune_checkpoint:.*/fine_tune_checkpoint: "pre-trained-models\/{MODEL_NAME}\/model.ckpt"/g' \
        -e 's/label_map_path:.*/label_map_path: "annotations\/labels.pbtxt"/g' \
        -e 's/input_path:.*train.*/input_path: "annotations\/training.record"/g' \
        -e 's/input_path:.*val.*/input_path: "annotations\/evaluation.record"/g' \
        -e 's/num_classes:.*/num_classes: {len(LABELS)}/g' \
        -e 's/num_examples:.*/num_examples: {len(test_set)}/g' \
        -e 's/fixed_shape_resizer/keep_aspect_ratio_resizer/g' \
        -e 's/width:/pad_to_max_dimension: true\n        max_dimension:/g' \
        -e 's/height:/min_dimension:/g' \
        -e 's/shuffle:.*/shuffle: true/g' \
        -e 's/include_metrics_per_category:.*/include_metrics_per_category: false/g' \
        -e 's/delay:.*/delay: {QUANTIZATION_DELAY}/g'

    # Optional overrides: each one is applied only when its parameter passes the
    # check below, so an out-of-range value leaves the config's entry untouched.
    if INITIAL_LR > 0:
        !sed 's/initial_learning_rate:.*/initial_learning_rate: {INITIAL_LR}/g' -i "trainings/{MODEL_NAME}/pipeline-mod.config"
    if DECAY_STEPS >= 0:
        !sed 's/decay_steps:.*/decay_steps: {DECAY_STEPS}/g' -i "trainings/{MODEL_NAME}/pipeline-mod.config"
    if DECAY_FACTOR > 0:
        !sed 's/decay_factor:.*/decay_factor: {DECAY_FACTOR}/g' -i "trainings/{MODEL_NAME}/pipeline-mod.config"
    if REMOVE_BATCH_NORM_TRAINABLE:
        !sed 's/batch_norm_trainable:.*//g' -i "trainings/{MODEL_NAME}/pipeline-mod.config"
    if BATCH_SIZE > 0:
        !sed 's/batch_size:.*/batch_size: {BATCH_SIZE}/g' -i "trainings/{MODEL_NAME}/pipeline-mod.config"


In [0]:
#@title Launch TensorBoard

# Open TensorBoard inline, reading logs from this model's training directory.
%tensorboard --logdir "trainings/{MODEL_NAME}"

In [0]:
#@title Start Training

# Run the Object Detection API's model_main.py with the tuned pipeline config;
# outputs go to trainings/{MODEL_NAME}, the directory TensorBoard is pointed at.
!python models/research/object_detection/model_main.py \
    --model_dir "trainings/{MODEL_NAME}" \
    --pipeline_config_path "trainings/{MODEL_NAME}/pipeline-mod.config"


In [0]:
#@title Export Inference Graph

# Destination folder for the exported graph (and, later, the .tflite file).
EXPORT_PATH = f"exported-models/{MODEL_NAME}"

###########!!! Important !!!###########
## Note: Change the checkpoint value according to your needs.
##       Find them in "trainings/{MODEL_NAME}/model.ckpt-{CHECKPOINT}"
CHECKPOINT = 104384  #@param {type:"integer"}

# Export the chosen checkpoint with export_tflite_ssd_graph.py. The config
# override replaces the image resizer with a fixed 300x300 one for the export.
# Braces are doubled ({{ }}) so IPython passes them through as literal { }
# instead of treating them as Python expressions to interpolate.
!python models/research/object_detection/export_tflite_ssd_graph.py \
    --pipeline_config_path "trainings/{MODEL_NAME}/pipeline-mod.config" \
    --trained_checkpoint_prefix "trainings/{MODEL_NAME}/model.ckpt-{CHECKPOINT}" \
    --output_directory "{EXPORT_PATH}" \
    --config_override " \
    model {{ \
      ssd {{ \
          image_resizer {{ \
              fixed_shape_resizer {{ \
                  width: 300 \
                  height: 300 \
              }} \
          }} \
      }} \
    }}"


In [0]:
#@title Create .tflite From .pb

# Convert the exported tflite_graph.pb into a quantized (uint8) model.tflite.
# The input shape 1,300,300,3 matches the 300x300 resizer used for the export,
# and the four TFLite_Detection_PostProcess outputs are what the test cell reads.
!tflite_convert --output_file "{EXPORT_PATH}/model.tflite" \
  --graph_def_file "{EXPORT_PATH}/tflite_graph.pb" \
  --input_arrays "normalized_input_image_tensor" \
  --output_arrays 'TFLite_Detection_PostProcess','TFLite_Detection_PostProcess:1','TFLite_Detection_PostProcess:2','TFLite_Detection_PostProcess:3' \
  --input_shape "1,300,300,3" \
  --allow_custom_ops \
  --inference_type QUANTIZED_UINT8 \
  --std_dev_values 128 \
  --mean_values 128

In [0]:
#@title Run Evaluation (Optional)

# Run model_main.py in evaluation-only mode, once, against the checkpoints in
# trainings/{MODEL_NAME}; results are written to evaluations/{MODEL_NAME}.
!python models/research/object_detection/model_main.py \
    --model_dir "evaluations/{MODEL_NAME}" \
    --eval_only \
    --run_once \
    --checkpoint_dir "trainings/{MODEL_NAME}" \
    --pipeline_config_path "trainings/{MODEL_NAME}/pipeline-mod.config"


In [0]:
#@title Run Test on Images

from PIL import Image, ImageDraw
import random

NUM_TEST_IMAGES =   10 #@param {type: 'integer'}
CHOOSE_RANDOMLY = True  #@param {type: 'boolean'}
MIN_DETECTION_SCORE =  0.5 #@param {type:"slider", min:0.01, max:0.999, step: 0.01}
SHOW_FULL_SIZE = False  #@param {type: 'boolean'}

if not CHOOSE_RANDOMLY:
  random.seed(0)  # A fixed seed turns the random picks below into a repeatable sequence.

# Sorted so that, with the fixed seed, the same images are picked on every run
# (os.listdir returns entries in arbitrary order).
ALL_IMAGES = sorted(fnmatch.filter(os.listdir(IMAGES_PATH), '*.jpg'))
TEST_IMAGES = [os.path.join(IMAGES_PATH, random.choice(ALL_IMAGES)) for _ in range(NUM_TEST_IMAGES)]

# Load TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path=f"{EXPORT_PATH}/model.tflite")
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_shape = input_details[0]['shape']
# Resize to whatever the model expects instead of a hard-coded 300x300.
# Assumes the input is laid out as (batch, height, width, channels), which is
# what the "1,300,300,3" shape used for the conversion implies.
input_height, input_width = int(input_shape[1]), int(input_shape[2])

for image_path in TEST_IMAGES:
  # Force 3 channels so grayscale / RGBA files still fit the model input.
  img_orig = Image.open(image_path).convert('RGB')
  img2 = img_orig.resize((input_width, input_height))

  # Detections are drawn either on the original or on the resized image.
  image = img_orig if SHOW_FULL_SIZE else img2
  w, h = image.size
  draw = ImageDraw.Draw(image)

  img = np.asarray(img2, dtype=np.uint8)

  input_data = np.reshape(img, input_shape)
  interpreter.set_tensor(input_details[0]['index'], input_data)

  interpreter.invoke()

  detections = interpreter.get_tensor(output_details[3]['index'])
  locations = interpreter.get_tensor(output_details[0]['index'])
  scores = interpreter.get_tensor(output_details[2]['index'])

  for d in range(int(detections[0])):
    loc = locations[0][d]
    score = scores[0][d]
    if score >= MIN_DETECTION_SCORE:
      # Entries 1 and 3 of a location are x fractions, 0 and 2 are y fractions.
      # Sorting each pair guarantees (left, top, right, bottom) order, which
      # ImageDraw.rectangle requires; recent Pillow versions raise on reversed corners.
      left, right = sorted((loc[1] * w, loc[3] * w))
      top, bottom = sorted((loc[0] * h, loc[2] * h))

      draw.rectangle([left, top, right, bottom])
      draw.text((left, top), '{:.2f}%'.format(score * 100))
  display(image)