In [None]:
# Mount Google Drive under /content/drive; the Mask R-CNN folder, the dataset
# and the logs directory used below all live beneath that mount point.
from google.colab import drive
drive.mount("/content/drive")

# Requirements.txt

Install the packages with versions below:
- keras==2.2.5
- tensorflow-gpu==1.15.0
- h5py==2.10.0

Import the packages below:
- os
- sys
- json
- datetime
- numpy as np
- skimage.draw
- cv2
- random
- matplotlib.pyplot as plt

Change your directory into the Mask RCNN folder and import the MRCNN packages:
- from mrcnn import utils 
- from mrcnn import visualize
- from mrcnn.visualize import display_instances
- from mrcnn.visualize import display_images
- from mrcnn.model import log
- import mrcnn.model as modellib
- from mrcnn.config import Config

Things to modify accordingly:
* Point the Mask RCNN folder to the right folder in your Google Drive
* Point `COCO_WEIGHTS_PATH` to the COCO pre-trained weights file (`mask_rcnn_coco.h5`) inside the Mask RCNN folder
* Set the logs folder (`DEFAULT_LOGS_DIR`) where the training logs and model checkpoints are saved

In [None]:
!pip install keras==2.2.5
!pip install tensorflow-gpu==1.15.0
!pip install h5py==2.10.0

In [None]:
# Folder that contains the `mrcnn` package; the notebook changes into it and
# adds it to sys.path before importing. Edit to match your own Drive layout.
MASK_RCNN_DIR = '/content/drive/My Drive/Colab/Mask-RCNN/Mask_RCNN'

In [None]:
import datetime
import json
import os
import random
import sys

import cv2
import matplotlib.pyplot as plt
import numpy as np
import skimage.color
import skimage.draw
import skimage.io
import tensorflow as tf
from tensorflow.python.client import device_lib
from google.colab.patches import cv2_imshow

# Work from inside the Mask R-CNN folder and make its `mrcnn` package importable.
os.chdir(MASK_RCNN_DIR)
sys.path.append(MASK_RCNN_DIR)

from mrcnn import utils
from mrcnn import visualize
from mrcnn.visualize import display_instances
from mrcnn.visualize import display_images
from mrcnn.model import log
import mrcnn.model as modellib

# Root directory of the project: the current working directory, i.e. the
# folder entered by the os.chdir() call above.
ROOT_DIR = os.path.abspath(".")

# Import Mask RCNN
sys.path.append(ROOT_DIR)  # To find local version of the library
from mrcnn.config import Config
# NOTE(review): `modellib` and `utils` were already imported above; this line
# only re-binds the same names.
from mrcnn import model as modellib, utils

# Path to the COCO weights file, expected directly inside ROOT_DIR
COCO_WEIGHTS_PATH = os.path.join(ROOT_DIR, "mask_rcnn_coco.h5")

# Directory for logs and model checkpoints; it is passed as `model_dir`
# when the models are built further down.
DEFAULT_LOGS_DIR = "/content/drive/My Drive/logs"

# Custom Configuration

If you have a custom dataset to train on, change:
- NAME: Name of the custom config
- BACKBONE: Use 'resnet50' for faster training time
- DETECTION_MAX_INSTANCES: Maximum number of detections kept per image; reduce it if your images contain only a few objects
- IMAGE_MIN_DIM/IMAGE_MAX_DIM: Change this according to your image size, lower resolutions speed up training time
- IMAGES_PER_GPU: Reduce if you use a less powerful GPU
- BATCH_SIZE: Not set directly; it equals GPU_COUNT * IMAGES_PER_GPU (1 here, to speed up training), so adjust those two values accordingly
- STEPS_PER_EPOCH: Set it to TOTAL_TRAINING_SAMPLES / BATCH_SIZE (general rule)
- DETECTION_MIN_CONFIDENCE: Skip detections with < 90% confidence
- LEARNING_RATE: Change the learning rate according to your needs

In [None]:
############################################################
#  Configurations
############################################################

class CustomConfig(Config):
    """Training configuration for the two-class drinks dataset.

    Subclasses the base ``Config`` and overrides only the settings listed
    here; everything else keeps the value defined by the base class.
    """
    # Recognizable name for this configuration
    NAME = "drinks"

    # Background + Mountain_Dew + Pepsi
    NUM_CLASSES = 1 + 2

    # Backbone network
    BACKBONE = "resnet50"

    # Images per GPU; lower this on a smaller GPU
    IMAGES_PER_GPU = 1

    # Image size settings (640 x 640 here)
    IMAGE_MIN_DIM = 640
    IMAGE_MAX_DIM = 640

    # Number of training steps in every epoch
    STEPS_PER_EPOCH = 50

    # Cap on the number of detected instances
    DETECTION_MAX_INSTANCES = 20

    # Detections with a confidence below 90% are skipped
    DETECTION_MIN_CONFIDENCE = 0.9


# Custom Dataset

If you have a custom dataset:
1. Register each class with `add_class` in `CustomDataset.load_custom`, give it a class_id, and update `NUM_CLASSES` in the CustomConfig
2. Best to use the COCO annotation format as it has the relevant JSON items
3. Remember to build a new list 'num_ids' that stores the class id of every annotated object (specify which class name corresponds to which id), then pass it to `add_image` so it is stored with the image
4. In the function load_mask(), read num_ids back, convert it into a Numpy array, and return it together with the mask

In [None]:
#TRAIN_CLASSES = ['mdew', "pepsi"]

In [None]:
############################################################
#  Dataset
############################################################

class CustomDataset(utils.Dataset):
    """Drinks dataset ("mdew" and "pepsi") built from polygon annotations.

    load_custom() registers the classes and every annotated image of one
    subset; load_mask() later rasterises the stored polygons into one mask
    per annotated object.
    """

    # Source name under which the classes and images are registered.
    _SOURCE_NAME = "drinks"
    # Annotation class label -> numeric class id (ids start at 1).
    _NAME_TO_ID = {"mdew": 1, "pepsi": 2}

    def load_custom(self, dataset_dir, subset):
        """Load one subset of the drinks dataset.

        dataset_dir: Root directory of the dataset.
        subset: Subset to load: "train" or "val". The annotation file
            "coco_annot.json" and the images are read from
            <dataset_dir>/<subset>.

        Raises AssertionError for an unknown subset and KeyError when a
        region carries a class label that is not in _NAME_TO_ID.
        """
        # Register the classes in ascending class-id order.
        for class_name, class_id in sorted(self._NAME_TO_ID.items(),
                                           key=lambda item: item[1]):
            self.add_class(self._SOURCE_NAME, class_id, class_name)

        # Train or validation dataset?
        assert subset in ["train", "val"]
        dataset_dir = os.path.join(dataset_dir, subset)

        # Load annotations. Each entry of the JSON file has the form:
        # { 'filename': '28503151_5b5b7ec140_b.jpg',
        #   'regions': {
        #       '0': {
        #           'region_attributes': {'class': ...},
        #           'shape_attributes': {
        #               'all_points_x': [...],
        #               'all_points_y': [...],
        #               'name': 'polygon'}},
        #       ... more regions ...
        #   },
        #   'size': 100202
        # }
        # We mostly care about the x and y coordinates of each region and
        # about its class label.
        # The `with` block closes the file handle once the JSON is parsed.
        with open(os.path.join(dataset_dir, "coco_annot.json")) as annot_file:
            annotations = list(json.load(annot_file).values())  # keys not needed

        # Images without any annotated region are skipped.
        annotations = [a for a in annotations if a['regions']]

        # Add images
        for a in annotations:
            # 'regions' may be a dict keyed by region index (as in the
            # example above) or a plain list; normalise to a list.
            regions = a['regions']
            if isinstance(regions, dict):
                regions = list(regions.values())

            # Outline of each object instance (see json format above).
            polygons = [region['shape_attributes'] for region in regions]

            # load_mask() needs the image size to convert polygons to masks.
            # It is not taken from the JSON, so the image itself is read.
            image_path = os.path.join(dataset_dir, a['filename'])
            image = skimage.io.imread(image_path)
            height, width = image.shape[:2]

            objects = [region['region_attributes']['class'] for region in regions]
            print("Objects/Classes: ", objects)

            # One class id per region, in the same order as `polygons`.
            num_ids = [self._NAME_TO_ID[label] for label in objects]
            print(num_ids)

            self.add_image(
                self._SOURCE_NAME,
                image_id=a['filename'],  # use file name as a unique image id
                path=image_path,
                width=width, height=height,
                polygons=polygons,
                num_ids=num_ids)

    def load_mask(self, image_id):
        """Generate instance masks for an image.

        Returns:
        masks: A uint8 array of shape [height, width, instance count] holding
            0/1 values, with one mask per instance.
        class_ids: a 1D int32 array of class IDs of the instance masks.
        """
        # If not a drinks dataset image, delegate to parent class.
        info = self.image_info[image_id]
        if info["source"] != self._SOURCE_NAME:
            return super().load_mask(image_id)

        # Convert polygons to a bitmap mask of shape
        # [height, width, instance_count]
        mask = np.zeros([info["height"], info["width"], len(info["polygons"])],
                        dtype=np.uint8)
        for i, p in enumerate(info["polygons"]):
            # Get indexes of pixels inside the polygon and set them to 1.
            # `shape` clips the polygon to the image, so points annotated on
            # or beyond the border neither raise IndexError nor wrap around.
            rr, cc = skimage.draw.polygon(p['all_points_y'], p['all_points_x'],
                                          shape=mask.shape[:2])
            mask[rr, cc, i] = 1

        # Return the mask together with the class id stored for each instance.
        return mask, np.array(info['num_ids'], dtype=np.int32)

    def image_reference(self, image_id):
        """Return the path of the image."""
        info = self.image_info[image_id]
        if info["source"] == self._SOURCE_NAME:
            return info["path"]
        # Images from other sources: hand the parent's answer back to the caller.
        return super().image_reference(image_id)

In [None]:
def get_ax(rows=1, cols=1, size=8):
    """Create a matplotlib subplot grid and return its axes.

    rows, cols: dimensions of the grid.
    size: edge length given to every grid cell; the figure measures
        (size * cols) by (size * rows).
    Returns the axes object(s) produced by plt.subplots; the figure handle
    is discarded.
    """
    figure_size = (size * cols, size * rows)
    axes = plt.subplots(rows, cols, figsize=figure_size)[1]
    return axes

# Train

In [None]:
def train(model, config, dataset_dir, epochs):
    """Train the network heads on the custom dataset.

    model: object whose ``train`` method is called with the two datasets.
    config: configuration supplying ``LEARNING_RATE``.
    dataset_dir: root directory holding the "train" and "val" subsets.
    epochs: forwarded unchanged as the ``epochs`` argument of model.train.
    """
    def build_subset(subset_name):
        # Load a single subset and prepare it for use.
        subset_data = CustomDataset()
        subset_data.load_custom(dataset_dir=dataset_dir, subset=subset_name)
        subset_data.prepare()
        return subset_data

    training_data = build_subset("train")
    validation_data = build_subset("val")

    # *** Example schedule, adapt as required ***
    # The dataset is very small and training starts from COCO weights, so a
    # short run that only updates the head layers is used.
    print("Training network heads")
    model.train(
        training_data,
        validation_data,
        learning_rate=config.LEARNING_RATE,
        epochs=epochs,
        layers='heads',
    )


def color_splash(image, mask):
    """Apply color splash effect.

    image: RGB image [height, width, 3]
    mask: instance segmentation mask [height, width, instance count]

    Returns a uint8 image in which pixels covered by at least one instance
    keep their original color and all other pixels are grayscale.
    """
    # Make a grayscale copy of the image. The grayscale copy still
    # has 3 RGB channels, though.
    gray = skimage.color.gray2rgb(skimage.color.rgb2gray(image)) * 255

    # No instances at all: the result is the plain grayscale image. The check
    # looks at the instance axis (last one) before the mask is collapsed, and
    # this path returns uint8 like the one below.
    if mask.shape[-1] == 0:
        return gray.astype(np.uint8)

    # We're treating all instances as one, so collapse the mask into one layer
    any_instance = np.sum(mask, -1, keepdims=True) >= 1
    # Copy color pixels from the original color image where the mask is set
    return np.where(any_instance, image, gray).astype(np.uint8)


def detect_and_color_splash(model, image_path=None, video_path=None, save_dir=None):
    """Run detection on an image or a video and save the color-splash result.

    model: object whose ``detect`` method returns, per input image, a dict
        containing a 'masks' entry.
    image_path: image to process; takes precedence over video_path.
    video_path: video to process frame by frame when no image_path is given.
    save_dir: directory for the output file; defaults to the current
        working directory when omitted.

    The output is named splash_<timestamp>.png for images and
    splash_<timestamp>.avi for videos. Raises AssertionError when neither
    image_path nor video_path is given.
    """
    assert image_path or video_path

    # The default save_dir is None, which cannot be joined into a path.
    if save_dir is None:
        save_dir = "."
    timestamp = "{:%Y%m%dT%H%M%S}".format(datetime.datetime.now())

    # Image or video?
    if image_path:
        # Run model detection and generate the color splash effect
        print("Running on {}".format(image_path))
        # Read image
        image = skimage.io.imread(image_path)
        # Detect objects
        r = model.detect([image], verbose=1)[0]
        # Color splash
        splash = color_splash(image, r['masks'])
        # Save output
        file_name = os.path.join(save_dir, "splash_{}.png".format(timestamp))
        skimage.io.imsave(file_name, splash)
        plt.imshow(splash)
    elif video_path:
        # Video capture
        vcapture = cv2.VideoCapture(video_path)
        width = int(vcapture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vcapture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = vcapture.get(cv2.CAP_PROP_FPS)

        # Define codec and create video writer
        file_name = os.path.join(save_dir, "splash_{}.avi".format(timestamp))
        vwriter = cv2.VideoWriter(file_name,
                                  cv2.VideoWriter_fourcc(*'MJPG'),
                                  fps, (width, height))

        try:
            count = 0
            success = True
            while success:
                print("frame: ", count)
                # Read next image
                success, image = vcapture.read()
                if success:
                    # OpenCV returns images as BGR, convert to RGB
                    image = image[..., ::-1]
                    # Detect objects
                    r = model.detect([image], verbose=0)[0]
                    # Color splash
                    splash = color_splash(image, r['masks'])
                    # RGB -> BGR to save image to video
                    splash = splash[..., ::-1]
                    # Add image to video writer
                    vwriter.write(splash)
                    count += 1
        finally:
            # Release both handles even when detection fails part-way.
            vcapture.release()
            vwriter.release()
    print("Saved to ", file_name)

In [None]:
# Device string handed to tf.device() in the cells that build the models.
DEVICE = "/gpu:0"

In [None]:
# Query the GPU device name and list the local devices. The notebook only
# displays the value of the last expression, so the first result is not shown.
tf.test.gpu_device_name()
device_lib.list_local_devices()

In [None]:
# Locations and starting weights for this training run.
DATASET_DIR = "/content/drive/My Drive/images"
LOGS_DIR = DEFAULT_LOGS_DIR
# One of "coco", "last", "imagenet", or a path to a weights file.
WEIGHTS = "coco"

print("Weights: ", WEIGHTS)
print("Dataset: ", DATASET_DIR)

train_config = CustomConfig()
train_config.display()

# Prepare the model
with tf.device(DEVICE):
    model = modellib.MaskRCNN(mode="training", config=train_config, model_dir=LOGS_DIR)

# Select weights file to load
if WEIGHTS.lower() == "coco":
    weights_path = COCO_WEIGHTS_PATH
    # Download the weights file when it is not present yet
    if not os.path.exists(weights_path):
        utils.download_trained_weights(weights_path)
elif WEIGHTS.lower() == "last":
    # Find last trained weights. Depending on the Mask_RCNN version,
    # find_last() returns either a (log_dir, checkpoint_path) pair or the
    # checkpoint path itself; indexing a plain path with [1] would yield a
    # single character, so only unpack when a pair is returned.
    _last_weights = model.find_last()
    if isinstance(_last_weights, (tuple, list)):
        _last_weights = _last_weights[1]
    weights_path = _last_weights
elif WEIGHTS.lower() == "imagenet":
    # Start from ImageNet trained weights
    weights_path = model.get_imagenet_weights()
else:
    weights_path = WEIGHTS

# Load weights
print("Loading weights ", weights_path)
if WEIGHTS.lower() == "coco":
    # Exclude the last layers because they require a matching
    # number of classes
    model.load_weights(
        weights_path,
        by_name=True,
        exclude=["mrcnn_class_logits", "mrcnn_bbox_fc", "mrcnn_bbox", "mrcnn_mask"])
else:
    model.load_weights(weights_path, by_name=True)



In [None]:
############################################################
#  Start training
############################################################
# Runs train() (defined above) on DATASET_DIR with the training model and
# configuration built in the previous cell, passing epochs=50.
train(model, config=train_config, dataset_dir=DATASET_DIR, epochs=50)

# Validation

Change:
- weights_path
- config
- LOGS_DIR

In [None]:
# Weights file loaded by the inference models below. This path names one
# specific training run; change it to a checkpoint that exists in your logs folder.
weights_path = "/content/drive/My Drive/logs/drinks20211012T0224/mask_rcnn_drinks_0025.h5"

In [None]:
class InferenceConfig(CustomConfig):
    """CustomConfig variant used for running detection.

    Inference handles one image at a time, so the batch size
    (GPU_COUNT * IMAGES_PER_GPU) is fixed at 1.
    """
    IMAGES_PER_GPU = 1
    GPU_COUNT = 1

In [None]:
# Inference configuration; built once and reused for the model below.
config = InferenceConfig()
config.display()

# Load validation dataset
dataset = CustomDataset()
dataset.load_custom(dataset_dir=DATASET_DIR, subset="val")
dataset.prepare()

print("Images: {}\nClasses: {}".format(len(dataset.image_ids), dataset.class_names))

# Create model in inference mode
with tf.device(DEVICE):
    model = modellib.MaskRCNN(mode='inference', model_dir=LOGS_DIR, config=config)

print("Loading weights: ", weights_path)
model.load_weights(weights_path, by_name=True)

# Pick a random validation image together with its ground-truth annotations
image_id = random.choice(dataset.image_ids)
image, image_meta, gt_class_id, gt_bbox, gt_mask = modellib.load_image_gt(
    dataset, config, image_id, use_mini_mask=False)

info = dataset.image_info[image_id]
print("Image ID: {}.{} ({}) {}".format(
    info['source'], info['id'], image_id, dataset.image_reference(image_id)))

# Run object detection
results = model.detect([image], verbose=1)

# Show the predictions
ax = get_ax(1)
r = results[0]
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    dataset.class_names, r['scores'],
    ax=ax, title='Predictions')

# Log the ground-truth arrays for comparison
log('gt_class_id', gt_class_id)
log('gt_bbox', gt_bbox)
log('gt_mask', gt_mask)

# Inference

Change:
- weights_path
- config
- LOGS_DIR

In [None]:
# Image read by the detection cell that follows; point it at any image on your Drive.
image_path = "/content/drive/My Drive/images/mdew_pepsi/test/pepsi/1.jpg"

In [None]:
# Read the image chosen above and run it through the inference model.
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Draw the detections on a single axes.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    dataset.class_names, r['scores'],
    ax=ax, title='Predictions')

# Detect and Splash

In [None]:
# Color-splash one pepsi test image; the resulting PNG is written into save_dir.
detect_and_color_splash(model, image_path="/content/drive/My Drive/test/pepsi/1.jpg", save_dir="/content/drive/My Drive/")

In [None]:
# Color-splash one m_dew test image; the resulting PNG is written into save_dir.
detect_and_color_splash(model, image_path="/content/drive/My Drive/Computer Vision/test/m_dew/2.jpg", save_dir="/content/drive/My Drive/")

In [None]:
# Build an inference model and restore the trained weights.
config = InferenceConfig()
with tf.device(DEVICE):
    model = modellib.MaskRCNN(mode="inference", model_dir=DEFAULT_LOGS_DIR, config=config)
model.load_weights(weights_path, by_name=True)

# Detect on one validation image.
image_path = "/content/drive/My Drive/val/1406.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Build an inference model and restore the trained weights.
# NOTE(review): same configuration and weights as the previous cell's model.
config = InferenceConfig()
with tf.device(DEVICE):
    model = modellib.MaskRCNN(mode="inference", model_dir=DEFAULT_LOGS_DIR, config=config)
model.load_weights(weights_path, by_name=True)

# Detect on one pepsi test image.
image_path = "/content/drive/My Drive/test/pepsi/4.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Build an inference model and restore the trained weights.
# NOTE(review): same configuration and weights as the previous cell's model.
config = InferenceConfig()
with tf.device(DEVICE):
    model = modellib.MaskRCNN(mode="inference", model_dir=DEFAULT_LOGS_DIR, config=config)
model.load_weights(weights_path, by_name=True)

# Detect on one pepsi test image.
image_path = "/content/drive/My Drive/test/pepsi/10.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Detect on one pepsi test image with the model built in an earlier cell.
image_path = "/content/drive/My Drive/test/pepsi/9.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Detect on one m_dew test image with the model built in an earlier cell.
image_path = "/content/drive/My Drive/test/m_dew/8.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Detect on one m_dew test image with the model built in an earlier cell.
image_path = "/content/drive/My Drive/test/m_dew/4.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Detect on one m_dew test image with the model built in an earlier cell.
image_path = "/content/drive/My Drive/test/m_dew/3.jpg"
image = skimage.io.imread(image_path)
results = model.detect([image], verbose=1)
r = results[0]

# Show the predictions.
ax = get_ax(1)
visualize.display_instances(
    image, r['rois'], r['masks'], r['class_ids'],
    ['BG', 'mdew', 'pepsi'], r['scores'],
    ax=ax, title='Predictions')

In [None]:
# Print the versions of the main libraries in use, as a record of the
# environment this notebook ran in.
print(np.__version__)
import matplotlib
print(matplotlib.__version__)
print(skimage.__version__)
import keras
print(keras.__version__)