# Modified Waste identification with instance segmentation in TensorFlow

In [1]:
!pip install -q tf_keras

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from six.moves.urllib.request import urlopen
from six import BytesIO
from PIL import Image
import tensorflow as tf
import numpy as np
import sys
import matplotlib.pyplot as plt
import matplotlib
import logging
import pandas as pd

logging.disable(logging.WARNING)

%matplotlib inline

In [4]:
# Clone the tensorflow models repository.
!git clone --depth 1 https://github.com/tensorflow/models 2>/dev/null

In [5]:
# Download the script to pull instance segmentation model weights from the TF Model Garden repo.
url = (
    "https://raw.githubusercontent.com/"
    "tensorflow/models/master/"
    "official/projects/waste_identification_ml/"
    "model_inference/download_and_unzip_models.py"
)

!wget -q {url}

In [6]:
sys.path.append('models/research/')
from object_detection.utils import visualization_utils as viz_utils

In [7]:
sys.path.append('models/official/projects/waste_identification_ml/model_inference/')
import preprocessing
import postprocessing
import color_and_property_extractor
import labels

## Utilities

In [8]:
def load_image_into_numpy_array(path):
  """Load an image from a URL or file path into a batched uint8 RGB array.

  Paths starting with 'http' are fetched with `urlopen`; any other path is
  read through `tf.io.gfile.GFile`. The decoded image is converted to RGB
  before it is turned into an array, so greyscale, palette and RGBA inputs
  also yield exactly three channels instead of failing on the reshape.

  Args:
    path: URL or file path of the image.

  Returns:
    uint8 numpy array with shape (1, h, w, 3)
  """
  if path.startswith('http'):
    # Release the HTTP connection as soon as the payload has been read.
    with urlopen(path) as response:
      image_data = response.read()
  else:
    # Context manager guarantees the file handle is closed.
    with tf.io.gfile.GFile(path, 'rb') as image_file:
      image_data = image_file.read()

  image = Image.open(BytesIO(image_data))

  # `np.array` on the PIL image yields (h, w, 3) directly and returns a
  # writable copy; this replaces the slow per-pixel `getdata()` round trip.
  image_np = np.array(image.convert('RGB'), dtype=np.uint8)

  # Prepend the batch dimension.
  return image_np[np.newaxis, ...]


def load_model(model_handle):
    """Load a SavedModel and hand back its default serving signature.

    Args:
      model_handle: A path to a TensorFlow SavedModel.

    Returns:
      The callable stored under the 'serving_default' signature key.
    """
    for message in ('loading model...', model_handle):
        print(message)
    saved_model = tf.saved_model.load(model_handle)
    print('model loaded!')
    return saved_model.signatures['serving_default']


def perform_detection(model, image):
  """Run one detection model on an image and convert its outputs.

  Args:
    model: A callable that takes the image and returns a mapping whose
      values expose a `.numpy()` method.
    image: The input passed unchanged to `model`.

  Returns:
    A dict with the same keys as the model output, where every value has
    been replaced by the result of its `.numpy()` call.
  """
  raw_outputs = model(image)
  detections = {}
  for name, tensor in raw_outputs.items():
    detections[name] = tensor.numpy()
  return detections

In [9]:
# 'material_model' output is both material and its sub type e.g. Plastics_PET.
# 'material_form_model' outputs the form of an object e.g. can, bottle, etc.

# Download locations of the zipped weights, keyed by model family. Each entry
# holds one URL for the material model and one for the material-form model.
MODEL_WEIGHTS_DICT = {
  'MODELS_WEIGHTS_RESNET_V1' : {
      'material_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_resnet_v1.zip'
      ),
      'material_form_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_form_resnet_v1.zip'
      ),
  },
  'MODELS_WEIGHTS_RESNET_V2': {
      'material_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_resnet_v2.zip'
      ),
      'material_form_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_form_resnet_v2.zip'
      ),
  },
  'MODELS_WEIGHTS_MOBILENET_V2': {
      'material_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_mobilenet_v2.zip'
      ),
      'material_form_url': (
          'https://storage.googleapis.com/tf_model_garden/vision/'
          'waste_identification_ml/two_model_strategy/material_form_mobilenet_v2.zip'
      ),
  }
}

# Relative SavedModel paths for each model family, keyed by model role.
# NOTE(review): presumably these directories are created by
# download_and_unzip_models.py in the working directory — confirm.
MODELS_RESNET_V1 = {
'material_model' : 'material/material_resnet_v1/saved_model/',
'material_form_model' : 'material_form/material_form_resnet_v1/saved_model/',
}

MODELS_RESNET_V2 = {
'material_model' : 'material/material_resnet_v2/saved_model/',
'material_form_model' : 'material_form/material_form_resnet_v2/saved_model/',
}

MODELS_MOBILENET_V2 = {
'material_model' : 'material/material_mobilenet_v2/saved_model/',
'material_form_model' : 'material_form/material_form_mobilenet_v2/saved_model/',
}

# Label CSV files inside the cloned tensorflow/models checkout, keyed by the
# same model roles as the MODELS_* dictionaries above.
LABELS = {
    'material_model': (
        'models/official/projects/waste_identification_ml/pre_processing/'
        'config/data/two_model_strategy_material.csv'
    ),
    'material_form_model': (
        'models/official/projects/waste_identification_ml/pre_processing/'
        'config/data/two_model_strategy_material_form.csv'
    ),
}

# Path to a sample image stored in the repo.
IMAGES_FOR_TEST = {
    'Image1': (
        'models/official/projects/waste_identification_ml/pre_processing/'
        'config/sample_images/image_2.png'
    )
}

## Import pre-trained models.

In [10]:
selected_model = "MODELS_WEIGHTS_RESNET_V2" #@param ["MODELS_WEIGHTS_RESNET_V1", "MODELS_WEIGHTS_RESNET_V2", "MODELS_WEIGHTS_MOBILENET_V2"]

if selected_model == "MODELS_WEIGHTS_RESNET_V1":
  ALL_MODELS = MODELS_RESNET_V1
elif selected_model == "MODELS_WEIGHTS_RESNET_V2":
  ALL_MODELS = MODELS_RESNET_V2
elif selected_model == "MODELS_WEIGHTS_MOBILENET_V2":
  ALL_MODELS = MODELS_MOBILENET_V2

# Extract URLs based on the selected model
url1 = MODEL_WEIGHTS_DICT[selected_model]['material_url']
url2 = MODEL_WEIGHTS_DICT[selected_model]['material_form_url']

# Download and unzip the selected model weights
!python3 download_and_unzip_models.py $url1 $url2

## Load label map data

In [11]:
# Load the label maps from the CSV files listed in LABELS.
# `category_indices` and `category_index` are both passed to
# postprocessing.find_similar_masks during inference; `category_index` is also
# used when drawing the detections.
# The total number of predicted labels (category_indices) for a combined model = 741.
category_indices, category_index = labels.load_labels(LABELS)

## Load pre-trained weights for both models.

In [12]:
# Loading both models.
# The list follows the insertion order of ALL_MODELS ('material_model' first,
# then 'material_form_model'); inference() relies on this order when it reads
# results[0] and results[1].
detection_fns = [load_model(model_path) for model_path in ALL_MODELS.values()]

loading model...
material/material_resnet_v2/saved_model/
model loaded!
loading model...
material_form/material_form_resnet_v2/saved_model/
model loaded!


## **Inference Utilities**

In [13]:
def load_img(image_path, is_display):
  """Read an image into a batched array and optionally show it.

  Args:
    image_path: Location of the image; converted with `str()` before loading.
    is_display: When truthy, the loaded image is rendered with matplotlib.

  Returns:
    The array produced by `load_image_into_numpy_array`.
  """
  loaded = load_image_into_numpy_array(str(image_path))
  print('image is loaded\n')

  if not is_display:
    return loaded

  plt.figure(figsize=(24,32))
  plt.imshow(loaded[0])
  plt.show()
  return loaded


def pre_processing(image_np, is_display):
  """Resize and normalize a loaded image for the detection models.

  The target size is read from the input signature of the first entry in
  `detection_fns`.

  Args:
    image_np: Batched image array; only element 0 is used.
    is_display: When truthy, the normalized image is rendered with matplotlib.

  Returns:
    A tuple `(normalized_batch, resized_uint8_image, height, width)`.
  """
  input_shape = detection_fns[0].structured_input_signature[1]['inputs'].shape
  height, width = input_shape[1], input_shape[2]
  print(f'input size is {height}x{width}')

  # Area resizing returns floats, so cast back to uint8 before normalizing.
  resized = tf.image.resize(
      image_np[0], (height, width), method=tf.image.ResizeMethod.AREA)
  image_np_cp = tf.cast(resized, tf.uint8)

  # Normalize, then restore the batch dimension dropped by indexing [0].
  normalized_batch = tf.expand_dims(
      preprocessing.normalize_image(image_np_cp), axis=0)

  print('pre processing done\n')

  if is_display:
    plt.figure(figsize=(24,32))
    plt.imshow(normalized_batch[0])
    plt.show()

  return normalized_batch, image_np_cp, height, width


def display_final_result(final_result, image_np_cp, min_score_thresh=0.6):
  """Draw the detections on a copy of the image and show the figure.

  The input dictionary is left untouched: masks are cast to uint8 on a local
  copy rather than written back into `final_result`.

  Args:
    final_result: Dict holding 'detection_boxes', 'detection_classes' and
      'detection_scores', plus optionally 'detection_masks_reframed'.
    image_np_cp: Image object exposing `.numpy()`; the drawing happens on a
      copy of that array.
    min_score_thresh: Minimum score a detection needs in order to be drawn.
      Defaults to the previously hard-coded 0.6.
  """
  # No `%matplotlib inline` here: the backend is already selected once in the
  # notebook's import cell, and a line magic inside a function body stops the
  # function from being valid plain Python.
  image_new = image_np_cp.numpy().copy()

  instance_masks = final_result.get('detection_masks_reframed')
  if instance_masks is not None:
    instance_masks = instance_masks.astype(np.uint8)

  viz_utils.visualize_boxes_and_labels_on_image_array(
        image_new,
        final_result['detection_boxes'][0],
        np.asarray(final_result['detection_classes']).astype(int),
        final_result['detection_scores'][0],
        category_index=category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=100,
        min_score_thresh=min_score_thresh,
        agnostic_mode=False,
        instance_masks=instance_masks,
        line_thickness=2)

  plt.figure(figsize=(20,10))
  plt.imshow(image_new)
  plt.show()


def get_detection_info(final_result):
    """Summarize a detection result as count, class names and box centers.

    Args:
      final_result: Dict with 'num_detections', 'detection_classes_names' and
        'detection_boxes'. Each box in `detection_boxes[0]` is unpacked as
        (ymin, xmin, ymax, xmax).

    Returns:
      A tuple `(num_objects, object_types, coordinates)` where `coordinates`
      is a list of `(x_center, y_center)` pairs, one per box.
    """
    def _center(box):
        top, left, bottom, right = box
        return ((left + right) / 2, (top + bottom) / 2)

    count = int(final_result['num_detections'][0])
    names = final_result['detection_classes_names']
    centers = [_center(box) for box in final_result['detection_boxes'][0]]
    return count, names, centers


def inference(path_image, is_display):
  """Run both detection models on one image and merge their results.

  The image is loaded and pre-processed, passed through every model in
  `detection_fns`, and the two outputs are combined with
  `postprocessing.find_similar_masks`. Boxes flagged by
  `postprocessing.filter_bounding_boxes` are then dropped, and per-object
  properties are extracted for the detections that remain.

  Args:
    path_image: Path or URL of the image to analyse.
    is_display: When truthy, intermediate images and the final annotated
      image are shown with matplotlib.

  Returns:
    A tuple `(final_result, features, image_np_cp)`:
      final_result: Merged detection dict, or None when either model reports
        zero detections.
      features: DataFrame of per-object properties; empty when nothing was
        detected or nothing survived filtering.
      image_np_cp: The resized uint8 image returned by `pre_processing`.
  """
  image_np, image_np_cp, height, width = pre_processing(
      load_img(path_image, is_display), is_display)

  print('performing inference')
  results = [perform_detection(model, image_np) for model in detection_fns]
  print('inference done\n')

  SCORE_THRESH = 0.8

  # Defaults for the "nothing detected" paths, so the return statement never
  # touches an unbound local.
  final_result = None
  features = pd.DataFrame()

  no_detections_in_first = results[0]['num_detections'][0]
  no_detections_in_second = results[1]['num_detections'][0]

  if no_detections_in_first != 0 and no_detections_in_second != 0:
    results = [
        postprocessing.reframing_masks(detection, height, width)
        for detection in results
    ]

    max_detection = max(no_detections_in_first, no_detections_in_second)

    # 30% of the resized image's height * width.
    area_threshold = 0.3 * np.prod(image_np_cp.shape[:2])

    final_result = postprocessing.find_similar_masks(
        results[0],
        results[1],
        max_detection,
        SCORE_THRESH,
        category_indices,
        category_index,
        area_threshold
    )

    # Scale the normalized [ymin, xmin, ymax, xmax] boxes to pixel units.
    transformed_boxes = [
        [int(bb[0]*height), int(bb[1]*width),
         int(bb[2]*height), int(bb[3]*width)]
        for bb in final_result['detection_boxes'][0]
    ]

    _, index_to_delete = (
        postprocessing.filter_bounding_boxes(transformed_boxes))

    final_result['num_detections'][0] -= len(index_to_delete)

    # Remove the rejected detections from every per-object field.
    # axis=None matches np.delete's default, which works on the flattened array.
    for key, axis in (
        ('detection_classes', None),
        ('detection_scores', 1),
        ('detection_boxes', 1),
        ('detection_classes_names', None),
        ('detection_masks_reframed', 0),
    ):
      final_result[key] = np.delete(
          final_result[key], index_to_delete, axis=axis)

    if final_result['num_detections'][0]:
      dfs, cropped_masks = (
          color_and_property_extractor.extract_properties_and_object_masks(
              final_result, height, width, image_np_cp))
      features = pd.concat(dfs, ignore_index=True)
      features['image_name'] = 'Image 1' #TODO: figure out what to do here.
      features = features.rename(columns={
          'centroid-0':'y',
          'centroid-1':'x',
          'bbox-0':'bbox_0',
          'bbox-1':'bbox_1',
          'bbox-2':'bbox_2',
          'bbox-3':'bbox_3'
      })

  # Only draw when there is a merged result to visualize.
  if is_display and final_result is not None:
    display_final_result(final_result, image_np_cp)

  return final_result, features, image_np_cp

### **Colab Code**

In [14]:
sys.path.append('/content/drive/MyDrive/plugins/')
import gs_manager_colab as gs_manager

In [15]:
gs_manager.initialize.init("DetectedOutput", "Sheet1")

## **Deployment Code**

In [16]:
import os
import time

In [18]:
PATH_bucket = "/content/drive/MyDrive/data_bucket/img_bucket"


def check_folder_changes(folder_path, poll_interval=1,
                         stop_after_first_change=True):
  """Poll a folder and run inference on every newly added file.

  The folder listing is compared against the previous snapshot once per
  `poll_interval`. For each added file, `inference` is run and the result is
  uploaded through `gs_manager`. Removed files are only reported.

  Polling uses a loop, not recursion, so long idle periods cannot exhaust
  Python's recursion limit. The snapshot is carried from one poll to the next,
  so a file added between two polls is not missed.

  Args:
    folder_path: Directory to watch.
    poll_interval: Seconds to sleep between two directory listings.
    stop_after_first_change: When True (default) the function returns after
      the first poll that detected a change; when False it keeps watching.
  """
  keep_checking = True #@param {type:"boolean"}
  known_files = set(os.listdir(folder_path))

  while keep_checking:
    print('checking...')
    time.sleep(poll_interval)

    current_files = set(os.listdir(folder_path))
    added_files = current_files - known_files
    removed_files = known_files - current_files
    # Advance the snapshot before processing so the next poll compares
    # against exactly what was seen here.
    known_files = current_files

    if not (added_files or removed_files):
      continue

    print("Changes detected:")
    # Sorted so that several files added at once are handled in a stable order.
    for added_file in sorted(added_files):
      print("Added file:", added_file)
      PATH_target_pic = os.path.join(folder_path, added_file)

      print('\nstarting resolving')
      final_result, _, _ = inference(PATH_target_pic, False)
      print('resolving done\n')

      # Defensive: skip the upload if no result object was produced.
      if final_result is None:
        print('no detections to upload.\n')
        continue

      print('data uploading')
      sliced, k = gs_manager.main.data_manager(final_result)

      # Strip only a trailing '.jpg'; a plain replace() would also remove
      # the text from the middle of a file name.
      stem, extension = os.path.splitext(added_file)
      record_name = stem if extension == '.jpg' else added_file

      for i in range(k):
        gs_manager.main.gs_edit(
            gs_manager.bucket.worksheet, record_name, str(sliced[i]))

      print('data uploading complete.\n')

    if removed_files:
      print("Removed files:", removed_files)

    if stop_after_first_change:
      return

check_folder_changes(PATH_bucket)

checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
checking...
Changes detected:
Added file: frame_2cz1x5t3.jpg

starting resolving
image is loaded

input size is 512x1024
pre processing done

performing inference
inference done

resolving done

data uploading
current cell time is 20240808190439
datecell, tokenCell, dataCell is correctly oriented
data uploading complete.

