### <center>Chestnut Bur Detection and Segmentation using MaskRCNN in PyTorch</center>

In [1]:
%matplotlib inline

import pandas as pd
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, Subset, DataLoader
import torchvision
from torchvision import transforms as _transforms, tv_tensors
import torchvision.transforms.v2 as T
from cv2 import fillPoly
import json
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
from pathlib import Path
from matplotlib.patches import Polygon
import shutil

##### <center> Load the image and annotation data </center>

In [None]:
# Load the COCO-format annotation file of the Roboflow export.
# NOTE: machine-specific absolute path; point it at your own copy of the export.
# The `with` block closes the file handle as soon as the JSON has been parsed.
with open("C:/Users/exx/Downloads/Route 9 Orchard 4.v1-test_dataset.coco-segmentation/train/_annotations.coco.json") as annotations_file:
    annos = json.load(annotations_file)

In [None]:
# Print the content stored under every top-level key of the annotation dict.
for section in annos.values():
    print(section)

In [4]:
# Flatten the COCO annotations into a DataFrame with one row per annotation
# (object instance). Columns: tree id, image file name, category name, bbox,
# area, polygon coords (segmentation) and iscrowd.
annos_df = pd.DataFrame(annos["annotations"])

# Resolve ids through explicit lookups instead of list positions: COCO ids are
# not guaranteed to be contiguous or to follow the order of the lists.
image_file_names = {image_info["id"]: image_info["file_name"] for image_info in annos["images"]}
category_names = {cat["id"]: cat["name"] for cat in annos["categories"]}

# Ordered list of category names; the dataset class uses it to map a name to a label index.
categories = [cat["name"] for cat in annos["categories"]]

df = pd.DataFrame()
# The tree id is the part of the exported file name before the first underscore.
df["tree_id"] = annos_df["image_id"].apply(lambda image_id: image_file_names[image_id].split("_")[0])
# The local images are stored as "<tree_id>.png".
df["file_name"] = df["tree_id"].apply(lambda tree_id: tree_id + ".png")
df["category_name"] = annos_df["category_id"].apply(lambda category_id: category_names[category_id])
df["bbox"] = annos_df["bbox"].apply(lambda x: torch.tensor(x))
df["area"] = annos_df["area"].apply(lambda x: torch.tensor(x))
df["segmentation"] = annos_df["segmentation"].apply(lambda x: torch.tensor(x))
df["iscrowd"] = annos_df["iscrowd"]

In [5]:
# NOTE: machine-specific absolute paths; adjust them to your environment.
image_dir = "C:\\Users\\exx\\EasyIDP\\Route9_Orchard4\\Outputs\\Roboflow\\images"

# Only the images that actually have annotations are needed.
image_names = df["file_name"].unique()

filtered_image_dir = "C:\\Users\\exx\\Deep Learning\\Chestnut_Bur_Instance_Segmentation\\filtered_images"
# parents=True so the cell also works when the parent folders do not exist yet.
Path(filtered_image_dir).mkdir(parents=True, exist_ok=True)
for image_name in image_names:
    # Path joins with the platform separator instead of a hard-coded backslash.
    shutil.copy(Path(image_dir) / image_name, filtered_image_dir)

##### <center> Plot sample image and annotation data </center>

In [None]:
def mask_fill(mask, polys, color):
    """Draw every polygon in ``polys`` onto ``mask`` using ``color``.

    Each polygon is passed to ``fillPoly`` in its own call. The function
    returns the very same ``mask`` object it was given, so it relies on
    ``fillPoly`` drawing onto that object in place.
    """
    canvas = mask
    for vertices in polys:
        fillPoly(canvas, [vertices], color)
    return canvas

def plot_images_with_masks(image_dir: Path, tree_id_list, df: pd.DataFrame):
    """Show each requested tree image with its canopy and bur annotations overlaid.

    Args:
        image_dir: Directory containing the files named in ``df["file_name"]``.
        tree_id_list: Values of ``df["tree_id"]`` to plot, one panel per id.
            An empty list plots nothing.
        df: Annotation table with ``tree_id``, ``file_name``, ``category_name``
            and ``segmentation`` columns (one row per annotation).

    Raises:
        IndexError: If a tree id has no rows in ``df``.
    """
    if len(tree_id_list) == 0:
        return

    def polygons_for(rows, category):
        # First polygon of every annotation of `category`, as (n_points, 2) int32 arrays.
        segmentations = rows[rows["category_name"] == category]["segmentation"].values
        return [np.array(poly[0]).reshape(-1, 2).astype(np.int32) for poly in segmentations]

    # squeeze=False keeps `axs` two-dimensional, so a single tree id works as well.
    fig, axs = plt.subplots(1, len(tree_id_list), figsize=(20, 10), squeeze=False)
    for ax, tree_id in zip(axs[0], tree_id_list):
        # Filter the table once per tree and reuse the result.
        tree_rows = df[df["tree_id"] == tree_id]
        if tree_rows.empty:
            raise IndexError(f"no annotations found for tree id {tree_id!r}")

        image = Image.open(Path(image_dir) / tree_rows["file_name"].values[0])
        mask_shape = (image.height, image.width)

        # Canopy pixels are drawn with value 1.
        layers = [mask_fill(np.zeros(mask_shape, dtype=np.uint8), polygons_for(tree_rows, "canopy"), 1)]
        # One layer per bur, drawn with value 2, so burs stand out after the layers are summed.
        for poly in polygons_for(tree_rows, "chestnut bur"):
            layers.append(mask_fill(np.zeros(mask_shape, dtype=np.uint8), [poly], 2))

        # Sum the layers into a single overlay image.
        overlay = np.stack(layers, axis=-1).sum(axis=-1)

        ax.imshow(image)
        ax.imshow(overlay, alpha=0.5)
        ax.axis("off")
        ax.set_title(f'Tree ID: {tree_id}')

    plt.show()

# Preview the annotations of trees "100" and "11" on top of the source images.
plot_images_with_masks(image_dir=image_dir, tree_id_list=["100", "11"], df=df)

##### <center> Pre-process and transform image and annotation data </center>

##### Adapted from: https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html#an-instance-segmentation-model-for-pennfudan-dataset

In [7]:
# Custom dataset loader (PyTorch) for loading images and annotation data
class ChestnutBurSegmentation(Dataset):
    """Custom Dataset for Chestnut Bur Segmentation in UAV Images.

    One sample corresponds to one tree (one unique ``tree_id`` in ``df``).
    The object instances of a sample are the tree's "chestnut bur"
    annotations; when a tree has no bur annotation, its "canopy" annotations
    are used as the instances instead.

    Relies on the module-level ``mask_fill`` helper and ``categories`` list.

    Args:
        image_dir: Directory containing the files named in ``df["file_name"]``.
        df: Annotation table, one row per annotation, with ``tree_id``,
            ``file_name``, ``category_name``, ``bbox`` (COCO x, y, w, h),
            ``area``, ``segmentation`` and ``iscrowd`` columns.
        transform: Optional callable ``transform(tensor, target)`` returning
            ``(tensor, target)``. When given, the image is additionally
            scaled to float and normalised with ImageNet statistics.
    """

    def __init__(self, image_dir, df, transform=None):
        self.image_dir = image_dir
        self.df = df
        self.transform = transform
        self.unique_tree_ids = self.df["tree_id"].unique()

    @staticmethod
    def _polygons(rows):
        """Return the first polygon of every annotation in ``rows`` as (n_points, 2) int32 arrays."""
        return [np.array(poly[0]).reshape(-1, 2).astype(np.int32) for poly in rows["segmentation"].values]

    @staticmethod
    def _xywh_to_xyxy(bbox):
        """Convert one COCO (x, y, w, h) box into a float32 (x1, y1, x2, y2) tensor."""
        x, y, w, h = torch.as_tensor(bbox, dtype=torch.float32).unbind()
        return torch.stack([x, y, x + w, y + h])

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the ``idx``-th tree.

        ``target`` holds ``boxes`` (XYXY), ``labels``, ``image_id``, ``area``,
        ``iscrowd`` and ``masks``; ``masks`` has one binary channel per
        instance, in the same order as ``boxes`` and ``labels``.

        Raises:
            IndexError: If the tree has no "canopy" annotation.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        tree_id = self.unique_tree_ids[idx]

        # Filter the annotation table once and reuse the slices below.
        tree_rows = self.df[self.df["tree_id"] == tree_id]
        canopy_rows = tree_rows[tree_rows["category_name"] == "canopy"]
        bur_rows = tree_rows[tree_rows["category_name"] == "chestnut bur"]
        if canopy_rows.empty:
            raise IndexError(f"tree {tree_id!r} has no canopy annotation")

        image_file = Path(self.image_dir) / tree_rows.iloc[0]["file_name"]

        image = tv_tensors.Image(Image.open(image_file))

        height, width = image.shape[-2:]

        # Union of all canopy polygons; used to blank out everything outside the tree.
        canopy_mask = np.zeros((height, width), dtype=np.uint8)
        canopy_mask = mask_fill(canopy_mask, self._polygons(canopy_rows), 1)

        # Burs are the instances of interest; fall back to the canopy when there are none.
        instance_rows = bur_rows if len(bur_rows) > 0 else canopy_rows

        # One binary (0/1) mask per instance, so masks, boxes and labels line up
        # index by index. No background channel: background is implicit.
        instance_masks = [
            mask_fill(np.zeros((height, width), dtype=np.uint8), [poly], 1)
            for poly in self._polygons(instance_rows)
        ]
        mask_image = np.stack(instance_masks, axis=0) # (n_objects, height, width)

        labels = [categories.index(label) for label in instance_rows["category_name"].values]
        # COCO stores boxes as (x, y, w, h); convert every box to xyxy format
        bboxes = torch.stack([self._xywh_to_xyxy(bbox) for bbox in instance_rows["bbox"].values], dim=0) # (n_objects, 4)
        area = instance_rows["area"].values
        iscrowd = instance_rows["iscrowd"].values

        # fill image background (outside of tree canopy)
        image = image * np.array(canopy_mask).astype(np.uint8)

        target = {}
        target["boxes"] = tv_tensors.BoundingBoxes(bboxes, format=tv_tensors.BoundingBoxFormat.XYXY, canvas_size=(height, width))
        target["labels"] = torch.as_tensor(labels, dtype = torch.int64) # (n_objects)
        target["image_id"] = int(tree_id)
        target["area"] = torch.stack([torch.as_tensor(a, dtype = torch.float32) for a in area], dim=0) # (n_objects)
        target["iscrowd"] = torch.stack([torch.tensor(ic, dtype = torch.int64) for ic in iscrowd], dim=0) # (n_objects)

        if self.transform is not None:
            # normalize image using imagenet stats
            image = T.Compose([T.ToDtype(torch.float32, scale=True),
                               T.ColorJitter(),
                               T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])(image)
            # Image and masks are concatenated channel-wise so that the geometric
            # transforms are applied to both with the same random parameters.
            image_and_mask = torch.cat((image, torch.tensor(mask_image, dtype=torch.float32)), dim=0)
            image_and_mask, target = self.transform(image_and_mask, target)
            # NOTE(review): if the transform drops boxes (e.g. a sanitising step),
            # masks, area and iscrowd are not filtered alongside them - TODO confirm.
            image = image_and_mask[:image.shape[0], :, :] # slice out the transformed image
            mask_image = image_and_mask[image.shape[0]:, :].clone().detach().to(torch.uint8) # slice out the transformed mask
            target["masks"] = mask_image
        else:
            image = image.to(torch.float32)
            target["masks"] = torch.tensor(mask_image, dtype=torch.uint8)

        return image, target

    @staticmethod
    def collate_fn(batch):
        """Regroup a list of ``(image, target)`` pairs into ``(images, targets)`` tuples.

        Declared static so it works both as ``ChestnutBurSegmentation.collate_fn``
        and when accessed through an instance.
        """
        return tuple(zip(*batch))

    def __len__(self):
        """Number of distinct trees in the annotation table."""
        return len(self.unique_tree_ids)

In [8]:
# define transforms for the dataset
def get_transform(train):
    """Assemble the torchvision v2 transform pipeline for the dataset.

    With ``train`` truthy the pipeline is: random zoom-out, resize to
    448x448, random rotation, random horizontal flip, random vertical flip.
    Otherwise it only resizes to 448x448. Both variants finish by clamping
    and then sanitising the bounding boxes.
    """
    resize = T.Resize((448, 448), interpolation=T.InterpolationMode.NEAREST, antialias = True) # no maintain aspect ratio

    if train:
        steps = [
            T.RandomZoomOut(
                fill = {tv_tensors.Image: (255, 20, 147), tv_tensors.Mask: (0,0,0)},
                p = 0.5,
                side_range = (1.0, 1.4),
            ),
            resize,
            T.RandomRotation(degrees = 45, expand = False),
            T.RandomHorizontalFlip(0.5),
            T.RandomVerticalFlip(0.5),
        ]
    else:
        steps = [resize]

    # Box clean-up is shared by the training and evaluation pipelines.
    steps.extend([T.ClampBoundingBoxes(), T.SanitizeBoundingBoxes()])
    return T.Compose(steps)

In [9]:
# Dataset/loader pair with the training augmentations switched on, used to inspect a batch.
sample_ds = ChestnutBurSegmentation(
    image_dir=filtered_image_dir,
    df=df,
    transform=get_transform(train=True),
)
sample_dl = DataLoader(
    dataset=sample_ds,
    batch_size=24,
    shuffle=True,
    collate_fn=ChestnutBurSegmentation.collate_fn,
)

In [10]:
# Draw one batch, then turn the collated tuples into a list of images and a
# list of (shallow-copied) target dicts.
images, targets = next(iter(sample_dl))
images = list(images)
targets = [dict(target) for target in targets]

##### <center> Plot sample transformed images, targets, and masks </center>

In [None]:
%matplotlib inline
tree_ids = [target["image_id"] for target in targets]

# Show tree 11 when it is part of this batch, otherwise the first sample.
sample_idx = tree_ids.index(11) if 11 in tree_ids else 0

# Move channels last, the layout imshow expects.
image = images[sample_idx].permute(1, 2, 0)
mask = targets[sample_idx]["masks"].permute(1, 2, 0)

# Collapse the per-instance mask channels into one 2-D image.
mask_overlay = mask.sum(axis=2)

plt.imshow(image)
plt.title("transformed image")
plt.show()

plt.imshow(mask_overlay, cmap="gray")
plt.title("chestnut bur masks")
plt.show()

# plot mask on image
plt.imshow(image)
plt.imshow(mask_overlay, cmap='gray', alpha=0.7)
plt.title("training sample")
plt.show()

##### <center> Construct MaskRCNN Model </center>

In [12]:
def get_instance_segmentation_model(num_classes):
    """Build a Mask R-CNN (ResNet-50 FPN v2) whose heads predict ``num_classes`` classes.

    The model is created with its default pre-trained weights; the box
    predictor and the mask predictor are then swapped for freshly
    initialised ones sized for ``num_classes``.
    """
    detection = torchvision.models.detection

    # Pre-trained starting point
    model = detection.maskrcnn_resnet50_fpn_v2(
        weights=detection.MaskRCNN_ResNet50_FPN_V2_Weights.DEFAULT,
        weights_backbone=torchvision.models.ResNet50_Weights.DEFAULT,
    )
    heads = model.roi_heads

    # New box classifier, fed with as many features as the one it replaces
    box_in_features = heads.box_predictor.cls_score.in_features
    heads.box_predictor = detection.faster_rcnn.FastRCNNPredictor(box_in_features, num_classes)

    # New mask predictor with a 256-channel hidden layer
    mask_in_channels = heads.mask_predictor.conv5_mask.in_channels
    mask_hidden_channels = 256
    heads.mask_predictor = detection.mask_rcnn.MaskRCNNPredictor(
        mask_in_channels,
        mask_hidden_channels,
        num_classes,
    )

    return model

In [None]:
# Print the layer structure of a model built for 3 classes.
print(get_instance_segmentation_model(num_classes=3))

##### <center> Tune model hyperparameters using the Bayesian optimization (BOHB) search algorithm and the HyperBand scheduler </center>

##### Adapted from: https://docs.ray.io/en/latest/tune/examples/bohb_example.html

In [1]:
# Bayesian Optimization HyperBand (BOHB) with HyperBand scheduler
# Paper: https://proceedings.mlr.press/v80/falkner18a.html

import tempfile
import time
from pathlib import Path

import ray
from ray import train, tune
from ray.tune.schedulers.hb_bohb import HyperBandForBOHB
from ray.tune.search.bohb import TuneBOHB
import ConfigSpace as CS

In [2]:
def evaluate(step, width, height, activation):
    """Toy loss used to exercise the tuner.

    Sleeps for 0.1 s, then returns the sum of a term that shrinks as
    ``width * step`` grows, ``height * 0.1``, and a constant that is 10 for
    the "relu" activation and 1 for anything else.
    """
    time.sleep(0.1)
    if activation == "relu":
        activation_boost = 10
    else:
        activation_boost = 1
    width_term = (0.1 + width * step / 100) ** (-1)
    height_term = height * 0.1
    return width_term + height_term + activation_boost

def objective(config):
    """Trainable for Ray Tune: score every step and report it with a checkpoint.

    If a checkpoint is available, the loop starts from the step number stored
    in its ``data.ckpt`` file; otherwise it starts from 0. Each iteration
    writes the current step to a temporary checkpoint directory and reports
    ``iterations`` and ``mean_loss``.
    """
    first_step = 0
    restored = train.get_checkpoint()
    if restored:
        with restored.as_directory() as restored_dir:
            saved_step = (Path(restored_dir) / "data.ckpt").read_text()
            first_step = int(saved_step)

    for step in range(first_step, config["steps"]):
        score = evaluate(step, config["width"], config["height"], config["activation"])
        metrics = {"iterations": step, "mean_loss": score}
        with tempfile.TemporaryDirectory() as checkpoint_dir:
            # Persist the step so that a restored trial knows where to continue.
            (Path(checkpoint_dir) / "data.ckpt").write_text(str(step))
            train.report(metrics, checkpoint=train.Checkpoint.from_directory(checkpoint_dir))

In [3]:
# Search space: a fixed number of steps plus three tunable parameters.
search_space = dict(
    steps=100,
    width=tune.uniform(0, 20),
    height=tune.uniform(-100, 100),
    activation=tune.choice(["relu", "tanh"]),
)

In [4]:
# BOHB searcher, wrapped so that at most 4 trials run concurrently.
bohb_search = TuneBOHB()
algo = tune.search.ConcurrencyLimiter(bohb_search, max_concurrent=4)

# HyperBand scheduler variant meant to be paired with the BOHB searcher.
scheduler = HyperBandForBOHB(time_attr="training_iteration", max_t=100, reduction_factor=4, stop_last_trials=False)

# Number of configurations to sample from the search space.
num_samples = 100

In [None]:
# What to optimise and how to search for it.
tune_config = tune.TuneConfig(
    metric="mean_loss",
    mode="min",
    search_alg=algo,
    scheduler=scheduler,
    num_samples=num_samples,
)

# Experiment name and stopping criterion.
run_config = train.RunConfig(name="bohb_exp", stop={"training_iteration": 100})

tuner = tune.Tuner(objective, tune_config=tune_config, run_config=run_config, param_space=search_space)
results = tuner.fit()

##### <center> Train the model using tuned hyperparameters </center>

In [None]:
from detection_pytorch import engine

# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

num_classes = 3 # (background = 0, canopy = 1, chestnut bur = 2)

# Two views of the same data: augmented for training, resize-only for validation.
train_ds = ChestnutBurSegmentation(filtered_image_dir, df, get_transform(train = True))
valid_ds = ChestnutBurSegmentation(filtered_image_dir, df, get_transform(train = False))

# Shuffle the sample indices once, then split them 80/20 so that the training
# and validation subsets never share a tree.
indices = torch.randperm(len(train_ds)).tolist()
splits = [int(len(indices) * 0.8), len(indices) - int(len(indices) * 0.8)]

# Subset expects the sample indices themselves, not the size of the split.
train_ds = Subset(train_ds, indices[:splits[0]])
valid_ds = Subset(valid_ds, indices[splits[0]:])

train_dl = DataLoader(train_ds, batch_size = 8, shuffle = True, collate_fn = ChestnutBurSegmentation.collate_fn)
valid_dl = DataLoader(valid_ds, batch_size = 1, shuffle = False, collate_fn = ChestnutBurSegmentation.collate_fn)

# get the model using our helper function
model = get_instance_segmentation_model(num_classes)

# move model to the right device
model.to(device)

# construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler (multiplies the learning rate by 0.1 every 3 epochs)
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)

num_epochs = 2

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    engine.train_one_epoch(model, optimizer, train_dl, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the validation subset
    engine.evaluate(model, valid_dl, device=device)

print("That's it!")