### Importing Libraries and Defining Paths

In [None]:
from warnings import filterwarnings
# Targeted filter: hide the DeprecationWarning whose message starts with
# "`np.bool` is a deprecated alias".
filterwarnings(action='ignore', category=DeprecationWarning, message='`np.bool` is a deprecated alias')
# NOTE(review): this blanket filter suppresses every warning, which makes the
# targeted filter above redundant and may hide useful diagnostics.
filterwarnings('ignore')

In [None]:
import os
import sys
import random
import math
import re
import time
import numpy as np
import cv2
import matplotlib
import fiona 
from shapely.geometry import shape, box
import rasterio
from PIL import Image, ImageDraw
import tensorflow as tf
import imgaug.augmenters as iaa
import matplotlib.pyplot as plt
%matplotlib inline 
from patchify import patchify, unpatchify
from shapely.geometry import Polygon, MultiPolygon
import geopandas as gpd
import copy

In [None]:
# Root directory of the project: the parent of the current working directory,
# resolved to an absolute path.
ROOT_DIR = os.path.abspath("../")

# Import Mask RCNN
# Both the project root and its "mrcnn" sub-directory are appended to the module
# search path so the `mrcnn` imports in the next cell can be resolved.
sys.path.append(ROOT_DIR)  # To find local version of the library
sys.path.append(os.path.join(ROOT_DIR, "mrcnn"))

In [None]:
from mrcnn.config import Config
from mrcnn import utils
import mrcnn.model as modellib
from mrcnn import visualize
from mrcnn.model import log

In [None]:
# Directory to save logs and trained model
MODEL_DIR = os.path.join(ROOT_DIR, "logs")

# Directory holding the image tiles and annotation shapefiles
DATASET_DIR = os.path.join(ROOT_DIR, "Dataset")

# Local path to trained weights file (relative to the current working directory)
COCO_MODEL_PATH = "../models/mask_rcnn_coco.h5"
# Download COCO trained weights from Releases if needed
if not os.path.exists(COCO_MODEL_PATH):
    # The download helper is handed a file path; make sure its parent directory
    # exists first so a fresh checkout without "../models" does not fail.
    os.makedirs(os.path.dirname(COCO_MODEL_PATH), exist_ok=True)
    utils.download_trained_weights(COCO_MODEL_PATH)

In [None]:
# List the GPU devices TensorFlow can see (experimental API spelling).
# NOTE(review): `gpus` is not read by any cell visible here.
gpus = tf.config.experimental.list_physical_devices('GPU')

In [None]:
# Check if GPU is available: as the last expression of the cell, the returned
# device list is displayed by the notebook (an empty list means no GPU).
tf.config.list_physical_devices('GPU')

### Dataset Preparation and Loading

In [None]:
class BuildingDataset(utils.Dataset):
    """Building-footprint dataset built from PNG tiles, GeoTIFF tiles and shapefiles.

    Layout read below ``dataset_dir``:

        cropped_png_files/<place>/<tile>.png   image registered with the dataset
        cropped_tif_files/<place>/<tile>.tif   same tile, used for georeferencing
        annotations/<place>_Builtup_Area.shp   building polygons of the place
    """

    def load_dataset(self, dataset_dir, start=1, end=400):
        """Register all PNG tiles of the place directories selected by ``[start:end]``.

        dataset_dir: root directory containing ``cropped_png_files/`` and
            ``annotations/``.
        start, end: slice bounds applied to the sorted listing of
            ``cropped_png_files/``; they select which places belong to this
            split.
        """
        # Add classes
        self.add_class("BuildingDataset", 1, "building")

        # define data locations for images and annotations
        images_dir = os.path.join(dataset_dir, "cropped_png_files/")
        annotations_dir = os.path.join(dataset_dir, "annotations/")

        # os.listdir() returns entries in arbitrary order. Sorting makes the
        # slice pick the same places on every run, so the train/val/test index
        # ranges stay reproducible and disjoint.
        for place_dir in sorted(os.listdir(images_dir))[start:end]:
            place_path = os.path.join(images_dir, place_dir)
            if not os.path.isdir(place_path):
                continue

            # One shapefile per place holds the polygons for all of its tiles.
            annotation_path = os.path.join(
                annotations_dir, f"{place_dir}_Builtup_Area.shp")

            for image_name in sorted(os.listdir(place_path)):
                if not image_name.lower().endswith('.png'):
                    continue
                image_path = os.path.join(place_path, image_name)
                # The georeferenced twin of the tile lives in a parallel tree
                # with the same file name but a ".tif" extension.
                tif_path = image_path.replace(
                    "cropped_png_files", "cropped_tif_files")[:-4] + ".tif"
                self.add_image('BuildingDataset', image_id=image_name,
                               path=image_path, annotation=annotation_path,
                               tif_path=tif_path)

    def get_tif_bounding_box(self, tif_dataset):
        """Return the bounds of an open raster as ``(min_x, min_y, max_x, max_y)``."""
        bounds = tif_dataset.bounds
        return (bounds.left, bounds.bottom, bounds.right, bounds.top)

    def get_polygons(self, shp_path):
        """Read a shapefile and return a list of ``(geometry, properties)`` tuples.

        Features whose geometry type is ``MultiPolygon`` are skipped.
        """
        polygons = []
        with fiona.open(shp_path, "r") as shapefile:
            for feature in shapefile:
                geometry = shape(feature["geometry"])
                if geometry.geom_type == "MultiPolygon":
                    continue
                polygons.append((geometry, feature["properties"]))
        return polygons

    def get_filtered_polygons(self, polygons, tif_dataset):
        """Return the geometries whose bounding box intersects the raster's bounds.

        polygons: list of ``(geometry, properties)`` tuples as produced by
            ``get_polygons``.
        tif_dataset: open raster providing ``bounds``.

        The test is done on bounding boxes only, so it is a coarse filter.
        """
        # Build the tile box once instead of once per polygon.
        tif_box = box(*self.get_tif_bounding_box(tif_dataset))
        return [
            polygon
            for polygon, _properties in polygons
            if tif_box.intersects(box(*polygon.bounds))
        ]

    def fill_between(self, polygon, height, width):
        """Rasterise one polygon into a ``height`` x ``width`` boolean array.

        polygon: sequence of ``(x, y)`` pixel coordinates.
        Returns: a bool array
        """
        img = Image.new('1', (width, height), False)
        ImageDraw.Draw(img).polygon(polygon, outline=True, fill=True)
        return np.array(img)

    def load_mask(self, image_id):
        """Build the instance masks of one registered image.

        Returns:
            masks: bool array of shape [height, width, instance count], one
                channel per polygon kept by ``get_filtered_polygons``.
            class_ids: int32 array of ones (every instance is a building).
        """
        info = self.image_info[image_id]

        # Use a context manager so the raster file handle is released after
        # every call instead of leaking one handle per loaded mask.
        with rasterio.open(info['tif_path']) as tif_dataset:
            height = tif_dataset.height
            width = tif_dataset.width
            transform = tif_dataset.transform

            polygons = self.get_polygons(info['annotation'])
            filtered_polygons = self.get_filtered_polygons(polygons, tif_dataset)

        # The inverted affine transform maps map coordinates to pixel
        # coordinates; compute it once.
        to_pixel = ~transform

        masks = np.zeros((height, width, len(filtered_polygons)), dtype=np.uint8)

        for idx, polygon in enumerate(filtered_polygons):
            coordinates = []
            # Only the exterior ring is rasterised; interior rings are not read.
            for x, y in polygon.exterior.coords:
                pixel_x, pixel_y = to_pixel * (x, y)
                # Pull vertices beyond the right/bottom edge back onto the last
                # pixel column/row; other coordinates are passed through.
                pixel_x = width - 1 if pixel_x > width else pixel_x
                pixel_y = height - 1 if pixel_y > height else pixel_y
                coordinates.append((pixel_x, pixel_y))

            masks[:, :, idx] = self.fill_between(coordinates, height, width)

        class_ids = np.ones(masks.shape[2], dtype=np.int32)

        # The builtin `bool` replaces the `np.bool` alias, which was deprecated
        # in NumPy 1.20 and removed in 1.24.
        return masks.astype(bool), class_ids

In [None]:
# Training dataset: place directories selected by the slice [0:58] of the
# image directory listing.
dataset_train = BuildingDataset()
dataset_train.load_dataset(DATASET_DIR, 0, 58)
dataset_train.prepare()

In [None]:
# Validation dataset: place directories selected by the slice [58:65] of the
# image directory listing.
dataset_val = BuildingDataset()
dataset_val.load_dataset(DATASET_DIR, 58, 65)
dataset_val.prepare()

In [None]:
# Test dataset: place directories selected by the slice [65:72] of the
# image directory listing.
dataset_test = BuildingDataset()
dataset_test.load_dataset(DATASET_DIR, 65, 72)
dataset_test.prepare()

In [None]:
# Load and display random samples
# Draw 4 image ids from the training set.
# NOTE(review): np.random.choice samples with replacement by default, so the
# same image may be shown more than once.
image_ids = np.random.choice(dataset_train.image_ids,4)
for image_id in image_ids:
    # Load the image and its instance masks, then plot the image next to its masks.
    image = dataset_train.load_image(image_id)
    mask, class_ids = dataset_train.load_mask(image_id)
    visualize.display_top_masks(image, mask, class_ids, dataset_train.class_names)

### Configurations

In [None]:
class TrainingConfig(Config):
    """Overrides of the Mask R-CNN base ``Config`` used for building detection."""
    NAME = "BuildingDetection"
    GPU_COUNT = 1
    # NOTE(review): the effective batch size is presumably
    # GPU_COUNT * IMAGES_PER_GPU = 4 — confirm against the base Config.
    IMAGES_PER_GPU = 4
    # NOTE(review): hard-coded step counts; presumably derived from the number
    # of training/validation images and the batch size — verify when the
    # dataset split changes.
    STEPS_PER_EPOCH = 2770
    VALIDATION_STEPS = 234
    BACKBONE = "resnet101"
    NUM_CLASSES = 2  # building and background
    # Equal min/max dimension of 1024.
    IMAGE_MAX_DIM = 1024
    IMAGE_MIN_DIM = 1024

    TRAIN_ROIS_PER_IMAGE = 50
    
    # NOTE(review): presumably caps the ground-truth instances used per image;
    # tiles with more than 50 buildings may be subsampled — confirm.
    MAX_GT_INSTANCES = 50
    # Per-loss weights: the mask loss is weighted 10x and the RPN class loss 2x
    # relative to the remaining terms.
    LOSS_WEIGHTS = {
        "rpn_class_loss": 2.,
        "rpn_bbox_loss": 1.,
        "mrcnn_class_loss": 1.,
        "mrcnn_bbox_loss": 1.,
        "mrcnn_mask_loss": 10.
    }

In [None]:
# Instantiate the training configuration; uncomment display() to print it.
train_config = TrainingConfig()
# train_config.display()

In [None]:
# Build the Mask R-CNN model in training mode; MODEL_DIR is passed as the
# model directory.
model_train = modellib.MaskRCNN(mode="training", config=train_config, model_dir=MODEL_DIR)

### Training

In [None]:
# Which weights to start with?
# "imagenet", "coco" or "last"; any other value loads the hard-coded
# checkpoint in the final else-branch.
init_with = "last"  # imagenet, coco, or last

if init_with == "imagenet":
    model_train.load_weights(model_train.get_imagenet_weights(), by_name=True)
elif init_with == "coco":
    # Load weights trained on MS COCO, but skip layers that
    # are different due to the different number of classes
    # See README for instructions to download the COCO weights
    model_train.load_weights(COCO_MODEL_PATH, by_name=True,
                       exclude=["mrcnn_class_logits", "mrcnn_bbox_fc", 
                                "mrcnn_bbox", "mrcnn_mask"])
elif init_with == "last":
    # Load the last model you trained and continue training
    # NOTE(review): presumably requires an existing checkpoint under MODEL_DIR
    # — confirm how find_last() behaves on a fresh logs directory.
    model_train.load_weights(model_train.find_last(), by_name=True)
else:
    # Fallback: resume from a specific, hard-coded checkpoint file.
    model_weight = "../logs/mask_rcnn_buildingdetection_0006.h5"
    model_train.load_weights(model_weight, by_name = True)

In [None]:
# mean_average_precision_callback = modellib.MeanAveragePrecisionCallback(model_train, model_inference, dataset_val, calculate_map_at_every_X_epoch=1, verbose=1)

In [None]:
# Augmentation pipeline applied during training: random flips, right-angle
# rotations (each applied independently with 50% probability) and a mild blur.
image_augmenter = iaa.Sequential([
    iaa.Fliplr(0.5),             # Horizontal flips with 50% probability
    iaa.Flipud(0.5),             # Vertical flips with 50% probability
    iaa.Sometimes(0.5, iaa.Affine(rotate=(90, 90))),    # Rotate by exactly 90 degrees
    iaa.Sometimes(0.5, iaa.Affine(rotate=(180, 180))),  # Rotate by exactly 180 degrees
    # Fixed typo: the range was (2700, 270), which samples an arbitrary angle
    # from a wide interval instead of a fixed 270-degree rotation.
    iaa.Sometimes(0.5, iaa.Affine(rotate=(270, 270))),  # Rotate by exactly 270 degrees
    iaa.Sometimes(0.5, iaa.GaussianBlur(sigma=(0, 0.5))),  # Random Gaussian blur
])

In [None]:
# Fine tuning all layers
# Train on dataset_train and validate on dataset_val, using the learning rate
# from the config and the augmentation pipeline defined above.
# NOTE(review): `epochs` is presumably the absolute epoch number to reach, so
# resuming from a checkpoint continues up to epoch 200 — confirm.
model_train.train(dataset_train, dataset_val, 
            learning_rate=train_config.LEARNING_RATE,
            epochs=200, 
            layers="all", augmentation = image_augmenter)

In [None]:
# Save weights
# Typically not needed because callbacks save after every epoch
# Uncomment to save manually
# model_path = os.path.join(MODEL_DIR, "mask_rcnn_buildingdetection_00008.h5")
# model_train.keras_model.save_weights(model_path)