# Obstacle detection

In [None]:
!sudo apt -y install libportaudio2
!pip install -q --use-deprecated=legacy-resolver tflite-model-maker
!pip install -q pycocotools
!pip install -q opencv-python-headless==4.1.2.30
!pip uninstall -y tensorflow && pip install -q tensorflow==2.8.0

In [None]:
import numpy as np
import os

from tflite_model_maker.config import QuantizationConfig
from tflite_model_maker.config import ExportFormat
from tflite_model_maker import model_spec
from tflite_model_maker import object_detector

import tensorflow as tf
assert tf.__version__.startswith('2')

tf.get_logger().setLevel('ERROR')
from absl import logging
logging.set_verbosity(logging.ERROR)



**object detection model archiecture.**


Performance of each EfficientDet-Lite models compared to each others.

| Model architecture | Size(MB)* | Latency(ms)** | Average Precision*** |
|--------------------|-----------|---------------|----------------------|
| EfficientDet-Lite0 | 4.4       | 37            | 25.69%               |
| EfficientDet-Lite1 | 5.8       | 49            | 30.55%               |
| EfficientDet-Lite2 | 7.2       | 69            | 33.97%               |
| EfficientDet-Lite3 | 11.4      | 116           | 37.70%               |
| EfficientDet-Lite4 | 19.9      | 260           | 41.96%               |


In [None]:
spec = model_spec.get('efficientdet_lite4')  #selected lite4 version

**Dataset loading**


In [None]:
train_data, validation_data, test_data = object_detector.DataLoader.from_csv('gs://navigation_detection.csv')  #load the dataset csv file

**Training**

* The EfficientDet-Lite0 model uses `epochs = 50` by default
* Set `train_whole_model=True` to fine-tune the whole model instead of just training the head layer to improve accuracy. The trade-off is that it may take longer to train the model.

In [None]:
model = object_detector.create(train_data, model_spec=spec, batch_size=8, train_whole_model=True, validation_data=validation_data)

**Evaluate the model**

In [None]:
model.evaluate(test_data)

**Export as a TensorFlow Lite model**

Export the trained object detection model to the TensorFlow Lite format.The default post-training quantization technique is full integer quantization.

In [None]:
model.export(export_dir='.')

**Evaluation of the TensorFlow Lite model.**


In [None]:
model.evaluate_tflite('model.tflite', test_data)

In [None]:
#@title Load the trained TFLite model and define some visualization functions

import cv2

from PIL import Image

model_path = 'model.tflite'

# Load the labels into a list
classes = ['???'] * model.model_spec.config.num_classes
label_map = model.model_spec.config.label_map
for label_id, label_name in label_map.as_dict().items():
  classes[label_id-1] = label_name

# Define a list of colors for visualization
COLORS = np.random.randint(0, 255, size=(len(classes), 3), dtype=np.uint8)

def preprocess_image(image_path, input_size):
  """Preprocess the input image to feed to the TFLite model"""
  img = tf.io.read_file(image_path)
  img = tf.io.decode_image(img, channels=3)
  img = tf.image.convert_image_dtype(img, tf.uint8)
  original_image = img
  resized_img = tf.image.resize(img, input_size)
  resized_img = resized_img[tf.newaxis, :]
  resized_img = tf.cast(resized_img, dtype=tf.uint8)
  return resized_img, original_image


def detect_objects(interpreter, image, threshold):
  """Returns a list of detection results, each a dictionary of object info."""

  signature_fn = interpreter.get_signature_runner()

  # Feed the input image to the model
  output = signature_fn(images=image)

  # Get all outputs from the model
  count = int(np.squeeze(output['output_0']))
  scores = np.squeeze(output['output_1'])
  classes = np.squeeze(output['output_2'])
  boxes = np.squeeze(output['output_3'])

  results = []
  for i in range(count):
    if scores[i] >= threshold:
      result = {
        'bounding_box': boxes[i],
        'class_id': classes[i],
        'score': scores[i]
      }
      results.append(result)
  return results


def run_odt_and_draw_results(image_path, interpreter, threshold=0.5):
  """Run object detection on the input image and draw the detection results"""
  # Load the input shape required by the model
  _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']

  # Load the input image and preprocess it
  preprocessed_image, original_image = preprocess_image(
      image_path,
      (input_height, input_width)
    )

  # Run object detection on the input image
  results = detect_objects(interpreter, preprocessed_image, threshold=threshold)

  # Plot the detection results on the input image
  original_image_np = original_image.numpy().astype(np.uint8)
  for obj in results:
    # Convert the object bounding box from relative coordinates to absolute
    # coordinates based on the original image resolution
    ymin, xmin, ymax, xmax = obj['bounding_box']
    xmin = int(xmin * original_image_np.shape[1])
    xmax = int(xmax * original_image_np.shape[1])
    ymin = int(ymin * original_image_np.shape[0])
    ymax = int(ymax * original_image_np.shape[0])

    # Find the class index of the current object
    class_id = int(obj['class_id'])

    # Draw the bounding box and label on the image
    color = [int(c) for c in COLORS[class_id]]
    cv2.rectangle(original_image_np, (xmin, ymin), (xmax, ymax), color, 2)
    # Make adjustments to make the label visible for all objects
    y = ymin - 15 if ymin - 15 > 15 else ymin + 15
    label = "{}: {:.0f}%".format(classes[class_id], obj['score'] * 100)
    cv2.putText(original_image_np, label, (xmin, y),
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

  # Return the final image
  original_uint8 = original_image_np.astype(np.uint8)
  return original_uint8

## Additionals

### Customize the EfficientDet model hyperparameters

The model and training pipline parameters you can adjust are:

* `model_dir`: The location to save the model checkpoint files. If not set, a temporary directory will be used.
* `steps_per_execution`: Number of steps per training execution.
* `moving_average_decay`: Float. The decay to use for maintaining moving averages of the trained parameters.
* `var_freeze_expr`: The regular expression to map the prefix name of variables to be frozen which means remaining the same during training. More specific, use `re.match(var_freeze_expr, variable_name)` in the codebase to map the variables to be frozen.
* `tflite_max_detections`: integer, 25 by default. The max number of output detections in the TFLite model.
* `debug`: Enable debug mode.

### Tune the training hyperparameters

The `create` function is the driver function that the Model Maker library uses to create models. The `create` function comprises of the following steps:

1. Creates the model for the object detection according to `model_spec`.
2. Trains the model.  The default epochs and the default batch size are set by the `epochs` and `batch_size` variables in the `model_spec` object.
You can also tune the training hyperparameters like `epochs` and `batch_size` that affect the model accuracy. For instance,

*   `epochs`: Integer, 50 by default. More epochs could achieve better accuracy, but may lead to overfitting.
*   `batch_size`: Integer, 64 by default. The number of samples to use in one training step.
*   `train_whole_model`: Boolean, False by default. If true, train the whole model. Otherwise, only train the layers that do not match `var_freeze_expr`.

```python
model = object_detector.create(train_data, model_spec=spec, epochs=10, validation_data=validation_data)
```

### Export to different formats



```python
model.export(export_dir='.')
```




```python
config = QuantizationConfig.for_float16()
```


```python
model.export(export_dir='.', tflite_filename='model_fp16.tflite', quantization_config=config)
```

In [None]:
import os
import numpy as np
import tensorflow as tf

IMAGE_SHAPE = (300, 300)
TRAINING_DATA_DIR = ''

datagen_kwargs = dict(rescale=1./255)
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**datagen_kwargs)
test_generator = test_datagen.flow_from_directory(TRAINING_DATA_DIR, shuffle=True,target_size=IMAGE_SHAPE,batch_size=1)


# Learn about dataset labels
dataset_labels = sorted(test_generator.class_indices.items(), key=lambda pair:pair[1])
dataset_labels = np.array([key.title() for key, value in dataset_labels])
print(dataset_labels)


true_= None
for i in range(7577):
  # Get images and labels batch from test dataset generator
  test_image_batch, test_label_batch = next(iter(test_generator))
  if i==0:
    true_ = test_label_batch
  else:
    true_ = np.append(true_, test_label_batch, axis=0)
  true_label_ids = np.argmax(test_label_batch, axis=-1)
  #print("test batch shape:", test_image_batch.shape)
  print('Testing on ',(i+1)*1)

  tflite_interpreter = tf.lite.Interpreter(model_path='model.tflite')

  input_details = tflite_interpreter.get_input_details()
  output_details = tflite_interpreter.get_output_details()

  tflite_interpreter.resize_tensor_input(input_details[0]['index'], (1, 300, 300, 3))
  tflite_interpreter.resize_tensor_input(output_details[0]['index'], (1, 15))
  tflite_interpreter.allocate_tensors()

  input_details = tflite_interpreter.get_input_details()
  output_details = tflite_interpreter.get_output_details()

  tflite_interpreter.set_tensor(input_details[0]['index'], test_image_batch)

  tflite_interpreter.invoke()
  if i==0:
    tflite_model_predictions = tflite_interpreter.get_tensor(output_details[0]['index'])
  else:
    temp = tflite_interpreter.get_tensor(output_details[0]['index'])
    tflite_model_predictions = np.append(tflite_model_predictions, temp, axis=0)
  
  del tflite_interpreter

In [None]:
from sklearn.metrics import confusion_matrix
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

tflite_predicted_ids1 = np.argmax(tflite_model_predictions, axis=-1)
true_label_ids1 = np.argmax(true_, axis=-1)

y_true = dataset_labels[np.argmax(true_, axis=-1)]
y_pred = dataset_labels[np.argmax(tflite_model_predictions, axis=-1)]
data = confusion_matrix(y_true, y_pred)
df_cm = pd.DataFrame(data, columns=np.unique(y_true), index = np.unique(y_true))
df_cm.index.name = 'Predicted'
df_cm.columns.name = 'Actual'
plt.figure(figsize = (17,15))
sn.set(font_scale=1.4)#for label size
sn.heatmap(df_cm, cmap="Blues", annot=True,annot_kws={"size": 12},fmt='g')# font size

ACCURACY FOR EACH CLASS

In [None]:
accu = {}
tflite_predicted_ids1 = np.argmax(tflite_model_predictions, axis=-1)
true_label_ids1 = np.argmax(true_, axis=-1)
for i in range(len(true_label_ids1)):
  if true_label_ids1[i] == tflite_predicted_ids1[i]:
    accu[str(dataset_labels[true_label_ids1[i]])] = 0

for i in range(len(true_label_ids1)):
  if true_label_ids1[i] == tflite_predicted_ids1[i]:
    accu[str(dataset_labels[true_label_ids1[i]])] += 1

pred = list(tflite_predicted_ids1)
true = list(true_label_ids1)
for i in range(15):
  print(dataset_labels[i],'-', round(accu[dataset_labels[i]]/true.count(i)*100,2),'%')

F1 score

In [None]:
import tensorflow_addons as tfa
metric = tfa.metrics.F1Score(num_classes=15, threshold=0.5)

metric.update_state(true_, tflite_model_predictions)

result = metric.result()
result.numpy()

Precision

In [None]:
for i in range(15):
  m = tf.keras.metrics.Precision(class_id=i)
  m.update_state(true_, tflite_model_predictions)
  print(m.result().numpy())

Recall

In [None]:
for i in range(15):
  m = tf.keras.metrics.Recall(class_id=i)
  m.update_state(true_, tflite_model_predictions)
  print(m.result().numpy())