<a href="https://colab.research.google.com/github/aubreymoore/crb-damage-detector-colab/blob/main/detect_and_annotate_dev.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# detect_and_annotate.ipynb

NOTE: The following documentation is already slightly out of date.
Please visit https://github.com/aubreymoore/crb-damage-detector-colab before running this notebook for the first time.

This Colab Jupyter notebook runs a custom YOLOv8 object detector which scans images to find three object classes: live coconut palms, dead coconut palms and v-shaped cuts symptomatic of damage caused by coconut rhinoceros beetle, *Oryctes rhinoceros*.

IMPORTANT: Shortly after the MAIN PROGRAM section begins executing, a BROWSE button will appear below the active cell so that you can upload a single file of input data from your local machine to Colab.

**Note that Colab will wait and do nothing until you have selected a text file of URLs or a ZIP file of images on your local machine.** [Click here to scroll down to the "Browse" button.](#scrollTo=5zSjfTXvIv2q&line=1&uniqifier=1)

You may choose between 2 options:
* A TEXT file (\*.txt) containing URLs for images to be scanned. One URL per line. (This is the most efficient option.)
* A ZIP file (\*.zip) containing images to be scanned.
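
A URL list can be written by hand or generated. The following is a minimal sketch, assuming the images sit in the companion repository's **data/images** folder and are addressed with the `?raw=true` form used in the usage comments further down. The helper name `build_url_list` is hypothetical and not part of this notebook.

```python
# Hypothetical helper: produces one image URL per line, the layout expected by the TEXT option.
BASE_URL = 'https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images'

def build_url_list(image_names, base_url=BASE_URL):
  """Return one raw-download URL per image name."""
  return [f'{base_url}/{name}?raw=true' for name in image_names]

urls = build_url_list(['IMG_0532.JPG', 'IMG_0695.JPG'])

# Print one URL per line; paste the output into a *.txt file for upload
print('\n'.join(urls))
```

Each printed line is a complete URL, so the resulting file needs no header or separators.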


Test data are available in a companion GitHub repository at https://github.com/aubreymoore/crb-damage-detector-colab. To use the test data, download the repository to your local computer as a [ZIP file](https://github.com/aubreymoore/crb-damage-detector-colab/archive/refs/heads/main.zip) and unzip it. If you have **git** installed, you can clone the repo instead. The TEXT file or ZIP file to upload to Colab is in the repository's **data** folder.

To scan images, select **Runtime | Run all** on the main menu.
Results will be in a temporary OUTPUT folder which you can access using the **File browser** in the left Colab panel.

When image scanning is complete, the OUTPUT folder will be compressed into a single ZIP file and automatically downloaded to your computer.

### TODO

- [x] (finished 2024-10-19) Reduce size of images in the companion GH repo to max dimension of 960px
- [ ] Copy current trained model to companion GH repo
- [ ] Copy this Jupyter notebook to companion GH repo
- [ ] Add confidence values to bounding box labels.
- [ ] Add database to OUTPUT folder
- [ ] Extract GPS coordinates from image files
- [ ] Figure out how to use URLs to access images stored on OneDrive (Sharepoint)

# Load Python packages which are not preinstalled by Colab

In [None]:
%pip install ultralytics -q
%pip install supervision -q
%pip install icecream -q
%pip install ipython-autotime -q
%pip install exif -q
%pip install requests -q

# Import modules

In [None]:
import cv2
import supervision as sv
from ultralytics import YOLO
import glob
import io  # needed by extract_zip_to_memory
import os
import shutil
from icecream import ic
from google.colab import files
import zipfile
import exif
import numpy as np
import requests

# Load cell timer

In [None]:
%load_ext autotime

# Define functions

In [None]:
def extract_img_exif(data):
  """
  Extracts an image and EXIF metadata from a given data buffer.

  See functions url2img and process_zipped_images to see how data buffers
  can be sourced from a URL referencing an image or a ZIP file containing
  images.

  Returns img as a numpy.ndarray and exif_data as a dict.
  """
  # Extract metadata stored in EXIF
  exif_data = {}
  try:
    exif_data = exif.Image(data).get_all()
  except Exception as e:
    ic(e)
    pass

  # Extract image
  bytes_as_np_array = np.frombuffer(data, dtype=np.uint8)
  img = cv2.imdecode(bytes_as_np_array, cv2.IMREAD_UNCHANGED)
  # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

  # Rotate image if necessary
  try:
    if str(exif_data['orientation']) == 'Orientation.RIGHT_TOP':
      img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
  except Exception as e:
    ic(e)
    pass

  return img, exif_data

# # Usage:
# url = 'https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images/IMG_0695.JPG?raw=true'
# img, exif_data = url2img(url)
# img

In [None]:
def url2img(url):
  """
  Loads contents of a file referenced by an URL into memory and extracts an
  image and EXIF metadata

  Returns img as a numpy.ndarray and exif_data as a dict.
  """
  # Download the data file referenced by the URL and save contents as "data"
  try:
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # Treat HTTP errors (e.g. 404) as failures
    data = response.content
  except Exception as e:
    ic(e)
    return None, None

  img, exif_data = extract_img_exif(data)
  return img, exif_data

# Usage:
# # url = 'https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images/IMG_0532.JPG?raw=true'
# url = 'https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images/IMG_0695.JPG?raw=true'
# img, exif_data = url2img(url)
# ic(exif_data['orientation'])
# img

In [None]:
def process_zipped_images(zip_file_path):
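  """
  Runs object detection on every image in a ZIP archive, saves annotated
  copies to the OUTPUT folder and appends detections to csv_sink.

  Relies on the globals model, box_annotator, label_annotator and csv_sink.

  Args:
    zip_file_path: path to the ZIP archive containing images
  """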
  z = zipfile.ZipFile(zip_file_path)
  ic(z.namelist())
  for file_name in z.namelist():
    if file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):  # Check for common image extensions
      with z.open(file_name, 'r') as file:   # Use z.open to directly open the file within the zip archive
        ic(' ')
        ic(file_name)
        try:
          data = file.read() # Read the content of the file as bytes
          img, exif_data = extract_img_exif(data)

          # PROCESS AN IMAGE

          # detect objects
          detections, annotated_image = detect_objects(img, model, box_annotator, label_annotator, csv_sink)

          # Save annotated image
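          fn = os.path.basename(file_name)
          # Insert '_annotated' before the extension only, even if the name contains other dots
          stem, ext = os.path.splitext(fn)
          output_path = f"OUTPUT/{stem}_annotated{ext}"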
          cv2.imwrite(output_path, annotated_image)

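          # Append metadata to csv_sink; timestamp and GPS may be missing from EXIF
          gps_data = get_gps_from_exif(exif_data)
          ic(gps_data)
          timestamp = gps_data['timestamp']
          latitude = gps_data['latitude']
          longitude = gps_data['longitude']
          csv_sink.append(
            detections,
            custom_data={
              'timestamp': timestamp.replace(':', '-', 2) if timestamp else None,
              'latitude': round(latitude, 6) if latitude is not None else None,
              'longitude': round(longitude, 6) if longitude is not None else None,
              'image_h': img.shape[0],
              'image_w': img.shape[1],
              'source': fn}
          )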

        except Exception as e:
          ic(f"Error processing {file_name}: {e}")

# Usage:
# process_zipped_images('images.zip')
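
Both input modes save each annotated image in the OUTPUT folder with `_annotated` inserted before the file extension. The naming step can be sketched on its own as follows. This is an illustration under stated assumptions rather than the notebook's exact code: the helper name `annotated_name` is hypothetical, and it uses `urllib.parse` to drop a query string such as `?raw=true`.

```python
import os
from urllib.parse import urlparse

def annotated_name(source):
  """Derive '<stem>_annotated<ext>' from a URL or a path inside a ZIP archive."""
  # urlparse separates off any query string; plain relative paths pass through unchanged
  path = urlparse(source).path
  stem, ext = os.path.splitext(os.path.basename(path))
  return f'{stem}_annotated{ext}'

print(annotated_name('https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images/IMG_0695.JPG?raw=true'))
print(annotated_name('images/IMG_0532.JPG'))
```

Splitting on the final extension avoids altering dots that appear elsewhere in a file name.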

In [None]:
def get_gps_from_exif(exif_data):
  """
  Gets timestamp and GPS coordinates for an image.

  Args:
    exif_data: dict containing EXIF data

  Returns:
    dict with keys 'timestamp', 'latitude' and 'longitude'; a value is None
    when the corresponding EXIF tag is missing.
  """
  d = exif_data
  # Start with None so images without GPS tags still return a complete dict
  timestamp = latitude = longitude = None
  try:
    timestamp = d.get('datetime_original', None)
    latdms = d.get('gps_latitude', None)
    londms = d.get('gps_longitude', None)
    if latdms and londms:
      # Convert (degrees, minutes, seconds) to signed decimal degrees
      latitude = latdms[0] + latdms[1]/60 + latdms[2]/3600
      if d.get('gps_latitude_ref') == 'S':
        latitude = -latitude
      longitude = londms[0] + londms[1]/60 + londms[2]/3600
      if d.get('gps_longitude_ref') == 'W':
        longitude = -longitude
    return {"timestamp": timestamp, "latitude": latitude, "longitude": longitude}
  except Exception as e:
    ic(e)
    return {"timestamp": None, "latitude": None, "longitude": None}

# # Usage:
# url = 'https://github.com/aubreymoore/crb-damage-detector-colab/blob/main/data/images/IMG_0532.JPG?raw=true'
# img, exif_data = url2img(url)
# gps_data = get_gps_from_exif(exif_data)
# gps_data
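
EXIF stores GPS coordinates as degrees, minutes and seconds plus a hemisphere letter, whereas the CSV output uses signed decimal degrees. A minimal worked sketch of that conversion follows. The helper name `dms_to_decimal` is hypothetical and the coordinates are made up for illustration.

```python
def dms_to_decimal(dms, ref):
  """Convert (degrees, minutes, seconds) plus a hemisphere letter to signed decimal degrees."""
  degrees, minutes, seconds = dms
  decimal = degrees + minutes / 60 + seconds / 3600
  # South latitudes and west longitudes are negative
  if ref in ('S', 'W'):
    decimal = -decimal
  return decimal

# Made-up coordinates, for illustration only
latitude = dms_to_decimal((13.0, 30.0, 0.0), 'N')
longitude = dms_to_decimal((144.0, 45.0, 36.0), 'E')
print(round(latitude, 6), round(longitude, 6))  # 13.5 144.76
```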

In [None]:
def upload_model_weights():
  '''
  Download model weights from the GitHub repo to **weights.pt**, but only if this file does not already exist.
  '''
  !wget -nc https://github.com/aubreymoore/code-for-CRB-damage-ai/raw/refs/heads/main/models/3class/train5/weights/best.pt -O weights.pt

# upload_model_weights()

In [None]:
def load_model_weights():
  '''
  Load the trained YOLO model from weights.pt and return it.
  '''
  return YOLO('weights.pt')

In [None]:
def create_input_folder():
  if not os.path.exists('INPUT'):
    os.makedirs('INPUT')

# create_input_folder()

In [None]:
def create_output_folder():
  if not os.path.exists('OUTPUT'):
    os.makedirs('OUTPUT')

# create_output_folder()

In [None]:
def run_garbage_disposal():
  '''
  Delete any data files left over from the last run.
  '''
  shutil.rmtree('INPUT', ignore_errors=True)
  shutil.rmtree('OUTPUT', ignore_errors=True)
  shutil.rmtree('sample_data', ignore_errors=True)

  try:
    os.remove('weights.pt')
  except OSError:
    pass

# run_garbage_disposal()

In [None]:
def extract_JPG_files(zip_file_path, output_dir):
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Loop through each file in the zip archive
        for file_name in zip_ref.namelist():
            if file_name.lower().endswith(('.jpg', '.jpeg')):  # Case-insensitive check for JPEG extensions
                print(f'Extracting {file_name}...')
                zip_ref.extract(file_name, output_dir)  # Extract the file

# Usage
# extract_JPG_files('path/to/your/archive.zip', 'path/to/extract/directory')


In [None]:
def extract_zip_to_memory(zip_content):
    """
    Extracts files from a ZIP archive stored in memory.

    :param zip_content: Bytes of the ZIP file.
    :return: A dictionary of filename and file-like objects.
    """
    extracted_files = {}
    with zipfile.ZipFile(io.BytesIO(zip_content)) as z:
        for file_info in z.infolist():
            with z.open(file_info) as file:
                extracted_files[file_info.filename] = file.read()  # Read file content
    return extracted_files

# Usage example
# Assuming 'zip_data' contains the bytes of your ZIP file
# zip_data = ... (load your ZIP data here)
# files = extract_zip_to_memory(zip_data)
# for name, content in files.items():
#     print(f"Extracted {name} with size {len(content)} bytes")
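
The in-memory approach above can be tried without any files on disk. The sketch below builds a tiny ZIP archive in memory and reads back only the entries with image extensions. It is an illustration under stated assumptions: the helper name `read_images_from_zip_bytes` is hypothetical and the archive holds placeholder bytes, not real images.

```python
import io
import zipfile

IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif')

def read_images_from_zip_bytes(zip_content):
  """Return {filename: bytes} for image files in a ZIP archive held in memory."""
  images = {}
  with zipfile.ZipFile(io.BytesIO(zip_content)) as z:
    for info in z.infolist():
      # Skip directories and anything that is not an image
      if info.is_dir() or not info.filename.lower().endswith(IMAGE_EXTENSIONS):
        continue
      images[info.filename] = z.read(info)
  return images

# Build a tiny ZIP in memory with placeholder bytes (not real image data)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w') as z:
  z.writestr('images/a.JPG', b'placeholder-1')
  z.writestr('images/notes.txt', b'not an image')
  z.writestr('b.png', b'placeholder-2')

images = read_images_from_zip_bytes(buffer.getvalue())
for name, content in images.items():
  print(f'{name}: {len(content)} bytes')
```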


In [None]:
def upload_zip_or_txt():
  '''
  Upload images in a ZIP (*.zip) or list of URLs (*.txt)
  '''
  input_mode = None
  urls = None
  # image_file_dir = None

  # THE FOLLOWING LINE TRIGGERS APPEARANCE OF THE BROWSE BUTTON
  uploaded = files.upload(target_dir='INPUT')
  filename = list(uploaded.keys())[0]

  if filename.lower().endswith('.txt'):
    input_mode = 'text'
    # Read URLs straight from the uploaded bytes, skipping blank lines
    text = uploaded[filename].decode('utf-8')
    urls = [line.strip() for line in text.splitlines() if line.strip()]
  elif filename.lower().endswith('.zip'):
    input_mode = 'zip'
  else:
    raise ValueError('INPUT file must be *.txt or *.zip.')
  return input_mode, urls

# Usage:
# input_mode, urls = upload_zip_or_txt()

In [None]:
def get_input_file_list():
  return glob.glob('INPUT/**/*', recursive=True)

# get_input_file_list()

In [None]:
def detect_objects(image, model, box_annotator, label_annotator, csv_sink):
  '''
  detect objects in an image
  returns detections and an annotated image
  '''
  results = model(image)[0]
  detections = sv.Detections.from_ultralytics(results)
  # ic(detections)
  annotated_image = box_annotator.annotate(image, detections=detections)
  labels = [f"{model.model.names[class_id]} {confidence:.2f}" for class_id, confidence in zip(detections.class_id, detections.confidence)]
  annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections, labels=labels)
  return detections, annotated_image

# Usage:
# detections, annotated_image = detect_objects(img, model, box_annotator, label_annotator, csv_sink)

# MAIN PROGRAM

In [None]:
# Clear data files from previous run
run_garbage_disposal()

create_input_folder()
create_output_folder()

# Upload images or list of URLs
# THE FOLLOWING LINE TRIGGERS APPEARANCE OF THE BROWSE BUTTON
input_mode, urls = upload_zip_or_txt()

# Download weights from trained model and load them
upload_model_weights()
model = YOLO('weights.pt')


## Please click on the Browse button when it appears above this cell.

In [None]:
# Scan images

box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()
csv_sink = sv.CSVSink('OUTPUT/detections.csv')
csv_sink.open()

if input_mode == 'text':
  for url in urls:
    ic()
    ic(url)
    try:
      img, exif_data = url2img(url)
    except Exception as e:
      ic()
      ic(e)
      continue

    # Continue with next URL in the loop if img cannot be retrieved
    if img is None:
      continue

    detections, annotated_image = detect_objects(img, model, box_annotator, label_annotator, csv_sink)
    gps_data = get_gps_from_exif(exif_data)
    ic(gps_data)
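    # Timestamp and GPS may be missing from EXIF, so guard against None
    timestamp = gps_data['timestamp']
    latitude = gps_data['latitude']
    longitude = gps_data['longitude']
    csv_sink.append(
        detections,
        custom_data={
            'timestamp': timestamp.replace(':', '-', 2) if timestamp else None,
            'latitude': round(latitude, 6) if latitude is not None else None,
            'longitude': round(longitude, 6) if longitude is not None else None,
            'image_h': img.shape[0],
            'image_w': img.shape[1],
            'source': url}
    )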
    # Extract filename from URL
    filename = url.split('/')[-1]
    pos = filename.find('?')
    if pos >= 0:
      filename = filename[:pos]

    # Save annotated image, inserting '_annotated' before the file extension
    stem, ext = os.path.splitext(filename)
    output_path = f'OUTPUT/{stem}_annotated{ext}'
    ic(output_path)
    os.makedirs(os.path.dirname(output_path), exist_ok = True)
    cv2.imwrite(output_path, annotated_image)

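if input_mode == 'zip':
  # Process whichever ZIP file was uploaded rather than assuming it is named images.zip
  for zip_path in glob.glob('INPUT/*.zip'):
    process_zipped_images(zip_path)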

csv_sink.close()

### Download OUTPUT folder as a ZIP file

In [None]:
!zip -r OUTPUT.zip OUTPUT

In [None]:
from google.colab import files
files.download("OUTPUT.zip")

# FINISHED
If everything worked as intended, you should find a file named **OUTPUT.zip** in your Downloads folder. Unzip this file to see results.
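
Once unzipped, **detections.csv** can be summarized with a few lines of Python. The following is a minimal sketch, not part of this notebook. It assumes the CSV contains `class_id` and `confidence` columns written by the supervision CSV sink, alongside the `source` column added by this notebook. The helper name `count_detections` is hypothetical and the sample rows are made up.

```python
import csv
import io
from collections import Counter

# Made-up rows for illustration; a real file comes from OUTPUT/detections.csv
sample_csv = """class_id,confidence,source
0,0.91,IMG_0532.JPG
2,0.55,IMG_0532.JPG
2,0.48,IMG_0695.JPG
"""

def count_detections(csv_file, min_confidence=0.0):
  """Count detections per (source, class_id) at or above a confidence threshold."""
  counts = Counter()
  for row in csv.DictReader(csv_file):
    if float(row['confidence']) >= min_confidence:
      counts[(row['source'], row['class_id'])] += 1
  return counts

counts = count_detections(io.StringIO(sample_csv), min_confidence=0.5)
for (source, class_id), n in sorted(counts.items()):
  print(source, class_id, n)
```

To summarize real results, pass an open file handle for `OUTPUT/detections.csv` in place of the in-memory sample.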

In [None]:
print('FINISHED')