In [None]:
from warnings import filterwarnings
# Hide the DeprecationWarning whose message starts with "`np.bool` is a deprecated alias".
filterwarnings(action='ignore', category=DeprecationWarning, message='`np.bool` is a deprecated alias')
# NOTE(review): this blanket filter ignores every warning of every category,
# which makes the targeted filter above redundant and can hide real problems.
filterwarnings('ignore')

In [None]:
import os
import sys
import random
import math
import re
import time
import numpy as np
import cv2
import matplotlib
import fiona 
from shapely.geometry import shape, box
import rasterio
from PIL import Image, ImageDraw
import tensorflow as tf
import matplotlib.pyplot as plt
%matplotlib inline 

In [None]:
# Project root: the parent of the directory the notebook is run from.
ROOT_DIR = os.path.abspath("../")

# Make the local Mask R-CNN package and the COCO sample module importable.
sys.path.extend([
    ROOT_DIR,
    os.path.join(ROOT_DIR, "mrcnn"),
    "../../Mask-RCNN-TF2.7.0-keras2.7.0/samples/coco/",
])

In [None]:
from mrcnn.config import Config
from mrcnn import utils
import mrcnn.model as modellib
from mrcnn import visualize
from mrcnn.model import log

In [None]:
# Directory to save logs and trained model
MODEL_DIR = os.path.join(ROOT_DIR, "logs")

# Directory holding the image tiles and their annotations
DATASET_DIR = os.path.join(ROOT_DIR, "Dataset")

# Local path to trained weights file
COCO_MODEL_PATH = "../models/mask_rcnn_coco.h5"
# Download COCO trained weights from Releases if needed
if not os.path.exists(COCO_MODEL_PATH):
    # Create the target directory first so the download does not depend on
    # "../models" already existing.
    os.makedirs(os.path.dirname(COCO_MODEL_PATH), exist_ok=True)
    utils.download_trained_weights(COCO_MODEL_PATH)

In [None]:
# Check if GPU is available: the cell displays the list of physical GPU
# devices TensorFlow can see (an empty list means none were found).
tf.config.list_physical_devices('GPU')

### Dataset Preparation

In [None]:
class BuildingDataset(utils.Dataset):
    """Dataset of PNG image tiles annotated with building polygons from shapefiles.

    Every registered image stores the PNG path, the path of the matching
    GeoTIFF tile (used for georeferencing) and the path of the place-level
    shapefile that holds the building polygons.
    """

    def load_dataset(self, dataset_dir, start=1, end=400):
        """Register the image tiles of a slice of the place directories.

        dataset_dir: directory containing "cropped_png_files/" and "annotations/".
        start, end: slice bounds applied to the sorted entries of
            "cropped_png_files/". Entries that are not directories are
            ignored but still count towards the slice.
        """
        # Add classes
        self.add_class("BuildingDataset", 1, "building")

        # define data locations for images and annotations
        images_dir = os.path.join(dataset_dir, "cropped_png_files/")
        annotations_dir = os.path.join(dataset_dir, "annotations/")

        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # start/end slices (and therefore the dataset splits) reproducible.
        for place_dir in sorted(os.listdir(images_dir))[start:end]:
            place_path = os.path.join(images_dir, place_dir)
            if not os.path.isdir(place_path):
                continue

            annotation_file = f"{place_dir}_Builtup_Area.shp"
            annotation_path = os.path.join(annotations_dir, annotation_file)

            # Sorted as well so that image indices are stable between runs.
            for image_dir in sorted(os.listdir(place_path)):
                image_dir_path = os.path.join(place_path, image_dir)
                # The GeoTIFF twin lives in a parallel tree and differs only
                # by its file extension.
                tif_root = os.path.splitext(
                    image_dir_path.replace("cropped_png_files", "cropped_tif_files"))[0]
                tif_dir_path = tif_root + ".tif"
                self.add_image('BuildingDataset', image_id=image_dir, path=image_dir_path,
                               annotation=annotation_path, tif_path=tif_dir_path)

    def get_tif_bounding_box(self, tif_dataset):
        """Return the dataset bounds as a (min_x, min_y, max_x, max_y) tuple."""
        bounds = tif_dataset.bounds
        return (bounds.left, bounds.bottom, bounds.right, bounds.top)

    def get_polygons(self, shp_path):
        """Read the Polygon features of a shapefile.

        Returns: list of (shapely geometry, feature properties) tuples.
        Features without a geometry and features that are not plain Polygons
        (e.g. MultiPolygon) are skipped, because load_mask() relies on the
        polygon's exterior ring.
        """
        polygons = []
        with fiona.open(shp_path, "r") as shapefile:
            for feature in shapefile:
                if feature["geometry"] is None:
                    continue
                geometry = shape(feature["geometry"])
                if geometry.geom_type != "Polygon":
                    continue
                polygons.append((geometry, feature["properties"]))
        return polygons

    def get_filtered_polygons(self, polygons, tif_dataset):
        """Keep the polygons whose bounding box intersects the tile's bounding box.

        polygons: list of (geometry, properties) tuples from get_polygons().
        Returns: list of geometries (the properties are dropped).
        """
        # Build the tile box once instead of once per polygon.
        tile_box = box(*self.get_tif_bounding_box(tif_dataset))

        return [polygon for polygon, _properties in polygons
                if tile_box.intersects(box(*polygon.bounds))]

    def fill_between(self, polygon, height, width):
        """Rasterize a polygon given as a list of (x, y) pixel coordinates.

        Returns: a bool array of shape [height, width].
        """
        img = Image.new('1', (width, height), False)
        ImageDraw.Draw(img).polygon(polygon, outline=True, fill=True)
        return np.array(img)

    def load_mask(self, image_id):
        """Build the instance masks of one image.

        Returns:
            masks: bool array of shape [height, width, instance count],
                one channel per polygon overlapping the tile.
            class_ids: int32 array of ones (every instance is a building).
        """
        info = self.image_info[image_id]

        # The context manager closes the raster file once its metadata is read.
        with rasterio.open(info['tif_path']) as tif_dataset:
            height = tif_dataset.height
            width = tif_dataset.width
            transform = tif_dataset.transform
            # NOTE(review): the shapefile is re-read on every call; consider
            # caching per annotation path if data loading is a bottleneck.
            polygons = self.get_polygons(info['annotation'])
            filtered_polygons = self.get_filtered_polygons(polygons, tif_dataset)

        masks = np.zeros((height, width, len(filtered_polygons)), dtype=bool)

        for idx, polygon in enumerate(filtered_polygons):
            coordinates = []
            # Only the exterior ring is drawn; interior rings (holes) are ignored.
            for x, y in polygon.exterior.coords:
                # Inverse affine transform: map coordinates -> pixel coordinates.
                pixel_x, pixel_y = ~transform * (x, y)
                # NOTE(review): vertices are clamped on the right/bottom edges
                # only, which can alter the polygon outline near the border.
                pixel_x = width - 1 if pixel_x > width else pixel_x
                pixel_y = height - 1 if pixel_y > height else pixel_y
                coordinates.append((pixel_x, pixel_y))

            masks[:, :, idx] = self.fill_between(coordinates, height, width)

        class_ids = np.ones(masks.shape[2], dtype=np.int32)

        # The builtin bool replaces the np.bool alias removed from NumPy.
        return masks, class_ids

In [None]:
# Training split: place directories 0..57 of the image folder listing.
dataset = BuildingDataset()
dataset.load_dataset(DATASET_DIR, start=0, end=58)
dataset.prepare()

In [None]:
# Validation split: place directories 58..64 of the image folder listing.
dataset_val = BuildingDataset()
dataset_val.load_dataset(DATASET_DIR, start=58, end=65)
dataset_val.prepare()

In [None]:
# Test split: place directories 67..71 of the image folder listing.
# NOTE(review): entries 65 and 66 belong to no split — confirm this gap is intended.
dataset_test = BuildingDataset()
dataset_test.load_dataset(DATASET_DIR, start=67, end=72)
dataset_test.prepare()

In [None]:
# Summarise the training split: image count, class count and class names.
print(f"Image Count: {len(dataset.image_ids)}")
print(f"Class Count: {dataset.num_classes}")
for i, info in enumerate(dataset.class_info):
    print(f"{i:3}. {info['name']:50}")

In [None]:
# Load and display random samples
# Draw 4 image ids from the training split; np.random.choice samples with
# replacement by default, so the same image can appear more than once.
image_ids = np.random.choice(dataset.image_ids, 4)
print(image_ids)
for image_id in image_ids:
    # Load the image and its instance masks, then show them.
    image = dataset.load_image(image_id)
    mask, class_ids = dataset.load_mask(image_id)
    visualize.display_top_masks(image, mask, class_ids, dataset.class_names)

In [None]:
import coco
class TrainingConfig(coco.CocoConfig):
    """Configuration used by the data-inspection cells that follow.

    NOTE(review): a second class named TrainingConfig (derived from Config) is
    defined in the Training section and shadows this one once that cell runs.
    """
    NAME = "BuildingDetection"
    BACKBONE = "resnet101"
    NUM_CLASSES = 2 #bulding and background
    # Square 1024x1024 network input.
    IMAGE_MAX_DIM = 1024
    IMAGE_MIN_DIM = 1024
    # MINI_MASK_SHAPE = (128, 128)
    # USE_MINI_MASK = True
    # Four strides paired with four anchor scales.
    # NOTE(review): presumably the library default has five pyramid levels —
    # confirm this 4-level setup matches the backbone before training with it.
    BACKBONE_STRIDES = [8, 16, 32, 64]
    RPN_ANCHOR_SCALES = (64, 128, 256, 512)

# Instance used by the inspection cells below.
config = TrainingConfig()

In [None]:
# Load random image and mask.
image_id = np.random.choice(dataset.image_ids, 1)[0]
# image_id = 13166  # uncomment to pin a specific image while debugging
image = dataset.load_image(image_id)
mask, class_ids = dataset.load_mask(image_id)
original_shape = image.shape
# Resize the image to the configured network input size and apply the same
# scale and padding to the masks so they stay aligned.
image, window, scale, padding, _ = utils.resize_image(
    image, 
    min_dim=config.IMAGE_MIN_DIM, 
    max_dim=config.IMAGE_MAX_DIM,
    mode=config.IMAGE_RESIZE_MODE)
mask = utils.resize_mask(mask, scale, padding)
# Compute Bounding box
bbox = utils.extract_bboxes(mask)

# Display image and additional stats
print("image_id: ", image_id, dataset.image_reference(image_id))
print("Original shape: ", original_shape)
log("image", image)
log("mask", mask)
log("class_ids", class_ids)
log("bbox", bbox)
# Display image and instances
visualize.display_instances(image, bbox, mask, class_ids, dataset.class_names)

In [None]:
import imgaug.augmenters as iaa
from imgaug.augmentables.bbs import BoundingBox, BoundingBoxesOnImage

# Augmentation pipeline, applied in order:
#   - horizontal and vertical flips, each with 50% probability;
#   - three independent 50%-probability rotations drawn from the ranges
#     (0, 90), (90, 180) and (180, 270) degrees;
#   - a 50%-probability Gaussian blur with sigma drawn from (0, 0.5).
image_augmenter = iaa.Sequential(
    [iaa.Fliplr(0.5), iaa.Flipud(0.5)]
    + [iaa.Sometimes(0.5, iaa.Affine(rotate=(low, low + 90))) for low in (0, 90, 180)]
    + [iaa.Sometimes(0.5, iaa.GaussianBlur(sigma=(0, 0.5)))]
)


In [None]:
# Load image 43 with augmentation applied and display the result.
# The configuration *instance* is passed (not the TrainingConfig class),
# consistent with every other load_image_gt call in this notebook.
image, image_meta, class_ids, bbox, mask = modellib.load_image_gt(dataset, config, 43, augmentation=image_augmenter)
visualize.display_instances(image, bbox, mask, class_ids, dataset.class_names)

In [None]:
# Pick a random training image and load its ground truth through the same
# code path the data generator uses.
image_id = np.random.choice(dataset.image_ids, 1)[0]
image, image_meta, class_ids, bbox, mask = modellib.load_image_gt(dataset, config, image_id)

for label, array in (
    ("image", image),
    ("image_meta", image_meta),
    ("class_ids", class_ids),
    ("bbox", bbox),
    ("mask", mask),
):
    log(label, array)

# Show the image followed by at most the first 12 instance masks.
shown_masks = min(mask.shape[-1], 12)
visualize.display_images([image, *(mask[:, :, i] for i in range(shown_masks))])

In [None]:
# Generate the anchors for every pyramid level from the inspection config.
backbone_shapes = modellib.compute_backbone_shapes(config, config.IMAGE_SHAPE)
anchors = utils.generate_pyramid_anchors(
    config.RPN_ANCHOR_SCALES,
    config.RPN_ANCHOR_RATIOS,
    backbone_shapes,
    config.BACKBONE_STRIDES,
    config.RPN_ANCHOR_STRIDE,
)
print(config.BACKBONE_STRIDES)
print(backbone_shapes)

# Summary of the generated anchors.
num_levels = len(backbone_shapes)
anchors_per_cell = len(config.RPN_ANCHOR_RATIOS)
print("Count: ", anchors.shape[0])
print("Scales: ", config.RPN_ANCHOR_SCALES)
print("ratios: ", config.RPN_ANCHOR_RATIOS)
print("Anchors per Cell: ", anchors_per_cell)
print("Levels: ", num_levels)

# Anchors per level = anchors per cell * number of cells, reduced by the
# square of the anchor stride.
anchors_per_level = []
for level_idx, (feature_rows, feature_cols) in enumerate(backbone_shapes):
    cell_count = feature_rows * feature_cols
    anchors_per_level.append(anchors_per_cell * cell_count // config.RPN_ANCHOR_STRIDE**2)
    print("Anchors in Level {}: {}".format(level_idx, anchors_per_level[level_idx]))

In [None]:
import matplotlib.patches as patches
import matplotlib.lines as lines
from matplotlib.patches import Polygon

In [None]:
## Visualize anchors of one cell at the center of the feature map of a specific level

# Load and draw random image
image_id = np.random.choice(dataset.image_ids, 1)[0]
image, image_meta, _, _, _ = modellib.load_image_gt(dataset, config, image_id)
fig, ax = plt.subplots(1, figsize=(10, 10))
ax.imshow(image)
levels = len(backbone_shapes)
print(backbone_shapes)

# Generate the palette once so each level keeps its own colour; regenerating
# it inside the loop could hand the same colour to different levels.
colors = visualize.random_colors(levels)

for level in range(levels):
    # Slice this level's anchors out of the flat anchor array.
    level_start = sum(anchors_per_level[:level]) # sum of anchors of previous levels
    level_anchors = anchors[level_start:level_start+anchors_per_level[level]]
    print("Level {}. Anchors: {:6}  Feature map Shape: {}".format(level, level_anchors.shape[0], 
                                                                  backbone_shapes[level]))
    # Offset, within the level, of the first anchor of the center cell,
    # taking the anchor stride into account.
    center_cell = backbone_shapes[level] // 2
    center_anchor = anchors_per_cell * (
        (center_cell[0] * backbone_shapes[level][1] / config.RPN_ANCHOR_STRIDE**2)
        + center_cell[1] / config.RPN_ANCHOR_STRIDE)
    level_center = int(center_anchor)

    # Draw anchors. Brightness show the order in the array, dark to bright.
    for i, rect in enumerate(level_anchors[level_center:level_center+anchors_per_cell]):
        y1, x1, y2, x2 = rect
        p = patches.Rectangle((x1, y1), x2-x1, y2-y1, linewidth=2, facecolor='none',
                              edgecolor=(i+1)*np.array(colors[level]) / anchors_per_cell)
        ax.add_patch(p)


In [None]:
# Create data generator
# random_rois is reused by later cells to decide whether ROI targets are inspected.
random_rois = 20
# Generator over the training split using the inspection config, with
# shuffling, random ROIs and detection targets enabled.
g = modellib.DataGenerator(
    dataset, config, shuffle=True, random_rois=random_rois, 
    detection_targets=True)

In [None]:
# Fetch batch 32 from the generator and unpack its inputs and targets.
batch_inputs, batch_targets = g[32]
(normalized_images, image_meta, rpn_match, rpn_bbox, gt_class_ids,
 gt_boxes, gt_masks, rpn_rois, rois) = batch_inputs
mrcnn_class_ids, mrcnn_bbox, mrcnn_mask = batch_targets

In [None]:
# Log every array of the fetched batch, detection targets first.
for label, array in (
    ("rois", rois),
    ("mrcnn_class_ids", mrcnn_class_ids),
    ("mrcnn_bbox", mrcnn_bbox),
    ("mrcnn_mask", mrcnn_mask),
    ("gt_class_ids", gt_class_ids),
    ("gt_boxes", gt_boxes),
    ("gt_masks", gt_masks),
    ("rpn_match", rpn_match),
    ("rpn_bbox", rpn_bbox),
):
    log(label, array)

# Report which image the first batch entry came from.
image_id = modellib.parse_image_meta(image_meta)["image_id"][0]
print("image_id: ", image_id, dataset.image_reference(image_id))

# Drop the trailing axis of mrcnn_class_ids; it is only there to satisfy the
# Keras restriction on target shape. Re-running this cell on its own strips
# the axis a second time and fails.
mrcnn_class_ids = mrcnn_class_ids[:, :, 0]

In [None]:
# Index of the batch entry to inspect.
b = 0

# Undo the input normalization to get a displayable image.
sample_image = modellib.unmold_image(normalized_images[b], config)

# Apply the RPN box deltas (rescaled by their std-dev) to the positive anchors.
match_flags = rpn_match[b]
indices = np.where(match_flags == 1)[0]
scaled_deltas = rpn_bbox[b, :len(indices)] * config.RPN_BBOX_STD_DEV
refined_anchors = utils.apply_box_deltas(anchors[indices], scaled_deltas)
log("anchors", anchors)
log("refined_anchors", refined_anchors)

# Split the anchors by match value: 1 positive, -1 negative, 0 neutral.
positive_anchor_ids = np.where(match_flags == 1)[0]
negative_anchor_ids = np.where(match_flags == -1)[0]
neutral_anchor_ids = np.where(match_flags == 0)[0]
for label, anchor_ids in (
    ("Positive", positive_anchor_ids),
    ("Negative", negative_anchor_ids),
    ("Neutral", neutral_anchor_ids),
):
    print("{} anchors: {}".format(label, len(anchor_ids)))

# ROI breakdown by class (classes without ROIs are not printed).
roi_counts = np.bincount(mrcnn_class_ids[b].flatten())
for c, n in zip(dataset.class_names, roi_counts):
    if not n:
        continue
    print("{:23}: {}".format(c[:20], n))

# Show positive anchors together with their refined boxes.
visualize.draw_boxes(sample_image, boxes=anchors[positive_anchor_ids],
                     refined_boxes=refined_anchors)

In [None]:
# Show negative anchors (those whose rpn_match value is -1).
visualize.draw_boxes(sample_image, boxes=anchors[negative_anchor_ids])

In [None]:
# Show neutral anchors. They don't contribute to training.
# Only 100 randomly drawn neutral anchors are plotted (np.random.choice
# samples with replacement by default).
visualize.draw_boxes(sample_image, boxes=anchors[np.random.choice(neutral_anchor_ids, 100)])

In [None]:
# Inspect the ROI targets of batch entry b; only meaningful when the
# generator was created with random ROIs.
if random_rois:
    # Class aware bboxes
    # For every ROI, pick the bbox deltas stored under that ROI's class id.
    bbox_specific = mrcnn_bbox[b, np.arange(mrcnn_bbox.shape[1]), mrcnn_class_ids[b], :]

    # Refined ROIs
    # Apply the class-specific deltas (rescaled by their std-dev) to the ROIs.
    refined_rois = utils.apply_box_deltas(rois[b].astype(np.float32), bbox_specific[:,:4] * config.BBOX_STD_DEV)

    # Class aware masks
    # For every ROI, pick the mask stored under that ROI's class id.
    mask_specific = mrcnn_mask[b, np.arange(mrcnn_mask.shape[1]), :, :, mrcnn_class_ids[b]]

    visualize.draw_rois(sample_image, rois[b], refined_rois, mask_specific, mrcnn_class_ids[b], dataset.class_names)
    
    # Any repeated ROIs?
    # View each ROI row as one opaque item so np.unique compares whole rows.
    rows = np.ascontiguousarray(rois[b]).view(np.dtype((np.void, rois.dtype.itemsize * rois.shape[-1])))
    _, idx = np.unique(rows, return_index=True)
    print("Unique ROIs: {} out of {}".format(len(idx), rois.shape[1]))

In [None]:
# Measure the share of positive ROIs over the first `limit` batches.
if random_rois:
    limit = 10
    temp_g = modellib.DataGenerator(
        dataset, config, shuffle=True, random_rois=10000, detection_targets=True
    )
    total = 0
    for i in range(limit):
        _, (ids, _, _) = temp_g[i]
        # A positive ROI has a class id above 0 (0 is background).
        positive_rois = np.sum(ids[0] > 0)
        total += positive_rois
        roi_share = positive_rois/ids.shape[1]
        print("{:5} {:5.2f}".format(positive_rois, roi_share))
    print("Average percent: {:.2f}".format(total/(limit*ids.shape[1])))

In [None]:
# Check ratio of positive ROIs in a set of images.
# The generator is indexed by batch number; calling __getitem__() without an
# index and iterating the result with next() raises a TypeError.
if random_rois:
    temp_g = modellib.DataGenerator(
        dataset, config, shuffle=True, random_rois=10000, detection_targets=True)
    # Never ask for more batches than the generator provides.
    limit = min(10, len(temp_g))
    total = 0
    for i in range(limit):
        _, [ids, _, _] = temp_g[i]
        # A positive ROI has a class id above 0 (0 is background).
        positive_rois = np.sum(ids[0] > 0)
        total += positive_rois
        print("{:5} {:5.2f}".format(positive_rois, positive_rois/ids.shape[1]))
    if limit:
        print("Average percent: {:.2f}".format(total/(limit*ids.shape[1])))

In [None]:
# Get Next Image
# The generator is indexed by batch number (it is not an iterator, so next()
# cannot be used); fetch a randomly chosen batch instead.
batch_index = np.random.randint(len(g))
if random_rois:
    [normalized_images, image_meta, rpn_match, rpn_bbox, gt_class_ids, gt_boxes, gt_masks, rpn_rois, rois], \
    [mrcnn_class_ids, mrcnn_bbox, mrcnn_mask] = g[batch_index]
    
    log("rois", rois)
    log("mrcnn_class_ids", mrcnn_class_ids)
    log("mrcnn_bbox", mrcnn_bbox)
    log("mrcnn_mask", mrcnn_mask)

    # Remove the last dim in mrcnn_class_ids. It's only added
    # to satisfy Keras restriction on target shape. Done here because the
    # targets are only refreshed in this branch.
    mrcnn_class_ids = mrcnn_class_ids[:,:,0]
else:
    # NOTE(review): assumes the generator yields exactly these seven inputs
    # when random ROIs are disabled — confirm against DataGenerator.
    [normalized_images, image_meta, rpn_match, rpn_bbox, gt_class_ids, gt_boxes, gt_masks], _ = g[batch_index]
    
log("gt_class_ids", gt_class_ids)
log("gt_boxes", gt_boxes)
log("gt_masks", gt_masks)
log("rpn_match", rpn_match, )
log("rpn_bbox", rpn_bbox)
image_id = modellib.parse_image_meta(image_meta)["image_id"][0]
print("image_id: ", image_id, dataset.image_reference(image_id))

### Training

In [None]:
class TrainingConfig(Config):
    """Configuration used to train the building detector.

    NOTE(review): this reuses the name of the configuration class defined for
    the data-inspection cells and replaces it once this cell runs.
    """
    NAME = "BuildingDetection"
    # Batch size is GPU_COUNT * IMAGES_PER_GPU.
    GPU_COUNT = 1
    IMAGES_PER_GPU = 4
    STEPS_PER_EPOCH = 3200
    VALIDATION_STEPS = 380
    BACKBONE = "resnet101"
    NUM_CLASSES = 2  # building and background
    # Square 1024x1024 network input.
    IMAGE_MAX_DIM = 1024
    IMAGE_MIN_DIM = 1024
    MAX_GT_INSTANCES = 50
    # Previously assigned twice (False, then True); only the last assignment
    # took effect, so the effective value True is kept.
    USE_MINI_MASK = True
    # The mask loss is weighted 10x relative to the other losses.
    LOSS_WEIGHTS = {
        "rpn_class_loss": 1.,
        "rpn_bbox_loss": 1.,
        "mrcnn_class_loss": 1.,
        "mrcnn_bbox_loss": 1.,
        "mrcnn_mask_loss": 10.
    }

In [None]:
# Instantiate the training configuration and print its values.
train_config = TrainingConfig()
train_config.display()

In [None]:
# Build the model in training mode; MODEL_DIR is passed as its model directory.
model_train = modellib.MaskRCNN(mode="training", config=train_config, model_dir=MODEL_DIR)

In [None]:
class InferenceConfig(TrainingConfig):
    """Training configuration overridden to process one image at a time."""
    GPU_COUNT = 1
    IMAGES_PER_GPU = 1

# Instance used by the inference model and the evaluation cells.
inference_config = InferenceConfig()

In [None]:
# Build a second model in inference mode with the single-image configuration.
model_inference = modellib.MaskRCNN(mode="inference", config=inference_config, model_dir=MODEL_DIR)

In [None]:
# Which weights to start with?
init_with = "coco"  # imagenet, coco, or last

if init_with == "imagenet":
    model_train.load_weights(model_train.get_imagenet_weights(), by_name=True)
elif init_with == "coco":
    # Load weights trained on MS COCO, but skip layers that
    # are different due to the different number of classes
    # See README for instructions to download the COCO weights
    model_train.load_weights(COCO_MODEL_PATH, by_name=True,
                       exclude=["mrcnn_class_logits", "mrcnn_bbox_fc", 
                                "mrcnn_bbox", "mrcnn_mask"])
elif init_with == "last":
    # Load the last model you trained and continue training
    model_train.load_weights(model_train.find_last(), by_name=True)
else:
    # Fail loudly instead of silently training from uninitialised weights.
    raise ValueError(
        "init_with must be 'imagenet', 'coco' or 'last', got {!r}".format(init_with))

In [None]:
# Callback built from the training model, the inference model and the
# validation split; configured to calculate mAP at every 5th epoch.
# NOTE(review): the callback only has an effect if it is handed to the
# training call — verify that it is.
mean_average_precision_callback = modellib.MeanAveragePrecisionCallback(model_train, model_inference, dataset_val, calculate_map_at_every_X_epoch=5, verbose=1)

In [None]:
# Fine tune all layers
# Passing layers="all" trains all layers.
# The training split is the variable `dataset` (no `dataset_train` is ever
# defined in this notebook). The mAP callback created above is passed in so
# that it actually runs during training.
# NOTE(review): image_augmenter is defined earlier but not used here; pass
# augmentation=image_augmenter if augmented training is intended.
model_train.train(dataset, dataset_val, 
            learning_rate=train_config.LEARNING_RATE,
            epochs=200, 
            layers="all",
            custom_callbacks=[mean_average_precision_callback])

In [None]:
# Save weights manually to a fixed file name inside MODEL_DIR.
# Typically not needed because callbacks save after every epoch.
# NOTE(review): the file name is hardcoded, so re-running this cell
# overwrites the previous manual save.
model_path = os.path.join(MODEL_DIR, "mask_rcnn_buildingdetection_00008.h5")
model_train.keras_model.save_weights(model_path)

### Inference Part

In [None]:
# Get path to saved weights
# Either set a specific path or find last trained weights
# NOTE(review): this points at one specific run directory and checkpoint;
# update it (or use find_last below) after training a new model.
model_path = os.path.join(MODEL_DIR, "buildingdetection20230727T1322/mask_rcnn_buildingdetection_0005.h5")
# model_path = model_inference.find_last()

# Load trained weights
print("Loading weights from ", model_path)
model_inference.load_weights(model_path, by_name=True)

In [None]:
# Test on a random image
image_id = random.choice(dataset_val.image_ids)
print(image_id)
original_image, image_meta, gt_class_id, gt_bbox, gt_mask =\
    modellib.load_image_gt(dataset_val, inference_config, 
                           image_id)

log("original_image", original_image)
log("image_meta", image_meta)
log("gt_class_id", gt_class_id)
log("gt_bbox", gt_bbox)
log("gt_mask", gt_mask)

# The image comes from the validation split, so its class names are used
# (the previously referenced `dataset_train` is never defined).
visualize.display_instances(original_image, gt_bbox, gt_mask, gt_class_id, 
                            dataset_val.class_names, figsize=(8, 8))

In [None]:
# Run detection on the single image loaded in the previous cell.
results = model_inference.detect([original_image], verbose=1)

# One result per input image; display the predicted boxes, masks, class ids
# and scores of the first (only) one.
r = results[0]
visualize.display_instances(original_image, r['rois'], r['masks'], r['class_ids'], 
                            dataset_val.class_names, r['scores'])

In [None]:
# Compute VOC-Style mAP @ IoU=0.5
# Runs on every image of the test split.
image_ids = dataset_test.image_ids

APs = []
for image_id in image_ids:
    # Load image and ground truth data
    image, image_meta, gt_class_id, gt_bbox, gt_mask =\
        modellib.load_image_gt(dataset_test, inference_config,
                               image_id)
    # Run object detection
    results = model_inference.detect([image], verbose=0)
    r = results[0]
    # Compute AP
    AP, precisions, recalls, overlaps =\
        utils.compute_ap(gt_bbox, gt_class_id, gt_mask,
                         r["rois"], r["class_ids"], r["scores"], r['masks'])
    # Images whose AP is NaN are left out of the average.
    if np.isnan(AP):
        continue
    APs.append(AP)

# Averaging an empty list would print nan together with a runtime warning.
if APs:
    print("mAP: ", np.mean(APs))
else:
    print("mAP: undefined (no image produced a valid AP)")