# Imports

In [None]:
# Move to the root
# Every path used below is written relative to the repository root.
import os

cwd = os.getcwd()
already_at_root = os.path.basename(cwd) == "cv-in-farming"
if not already_at_root:
    # Started from a sub-folder: step one level up.
    os.chdir("../")
print(os.getcwd())

## Standard and Third-Party Libraries

In [None]:
import random
import os
from time import time

import cv2
from matplotlib.animation import FFMpegWriter
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import torch
from torch.utils.data import DataLoader
import torchvision.transforms.functional as F

%matplotlib inline
%load_ext autoreload
%autoreload 2

## Custom Modules

In [None]:
from src.dataloader import FurrowDataset
from src.image_processing import convert_grayscale, apply_gaussian_blur, apply_otsu_threshold, apply_canny, apply_template_matching
from src.model import RidgeDetector
from src.solver import load_checkpoint, revert_input_transforms
from utils.helpers import create_template, coord_to_mask, overlay_coord, overlay_mask, show_image

# Load Frames

In [None]:
# Settings shared by the benchmark cells below
normalize = True
input_format = 'darr'

if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f"[Info]: HED will run on: {device}\n")

# Input: Folder to load for benchmarking different methods
folder = "dataset/val/20210309_124809" # Or, "dataset/val/20210309_140259"

# Options handed to FurrowDataset as a single dict
dataset_args = dict(
    data_path=folder,
    crop_down=False,
    normalize=normalize,
    input_format=input_format,
    load_edge=False,
    edge_width=3,
    load_time=False,
    max_frames=5, #1000
)

dataset = FurrowDataset(dataset_args)
print(dataset)

In [None]:
# Create folder structure
# One output folder per benchmarked method. os.makedirs(..., exist_ok=True)
# creates "benchmark" on the way and leaves already existing folders untouched,
# which replaces one try/mkdir/except block per folder.
for method in ("canny", "hed", "hed_original", "otsu", "otsu_canny", "template_matching"):
    os.makedirs(f"benchmark/{method}", exist_ok=True)

# Configure & Run

In [None]:
# Input: Pick a frame
# Index passed to dataset.get_frame_files() / dataset[...] by every
# "Single Frame" cell below.
# NOTE(review): presumably has to stay below the number of loaded frames
# ("max_frames" in dataset_args) — confirm against FurrowDataset.
frame_idx = 2 # 500

## Canny Edge Detector

* Input: RGB
* Apply Grayscale (optional) + Gaussian Blur + Canny Edge Detector
* Output: 1-Channel Binary Mask

In [None]:
# Settings read by canny_edge_detector() below.
canny_config = {
    # Forwarded to the image-processing helpers
    "visualize": True,
    # False: convert_grayscale() is skipped and the input is blurred as-is
    "grayscale": False,
    # Evaluates to (15, 15); forwarded to apply_gaussian_blur() with sigmaX
    "ksize": (15,)*2, 
    "sigmaX": 30,
    # True: threshold1/threshold2 are derived from apply_otsu_threshold()
    # instead of the two fixed values below
    "dynamic_thresh": False,
    # Forwarded to apply_canny()
    "threshold1": 5000, 
    "threshold2": 11000, 
    "apertureSize": 7,
}

def canny_edge_detector(image, config):
    """Run (optional) grayscale conversion, Gaussian blur and Canny on an image.

    Args:
        image: Input image handed to the image-processing helpers.
        config: Dict with the keys "visualize", "grayscale", "ksize", "sigmaX",
            "dynamic_thresh", "threshold1", "threshold2" and "apertureSize".
            The dict is only read, never modified.

    Returns:
        Whatever apply_canny() returns for the blurred image.
    """
    visualize = config["visualize"]

    if config["grayscale"]:
        image = convert_grayscale(image, visualize)

    image = apply_gaussian_blur(image, visualize,
                                ksize=config["ksize"],
                                sigmaX=config["sigmaX"])

    # Work on local copies of the thresholds: writing the per-image Otsu values
    # back into the shared config would leak them into later calls (e.g. after
    # "dynamic_thresh" is switched off again).
    threshold1 = config["threshold1"]
    threshold2 = config["threshold2"]
    if config["dynamic_thresh"]:
        _, threshold2 = apply_otsu_threshold(image, visualize=False)
        threshold1 = threshold2 // 2

    return apply_canny(image, visualize,
                       threshold1=threshold1,
                       threshold2=threshold2,
                       apertureSize=config["apertureSize"])

### Single Frame

In [None]:
# Run the Canny pipeline on the frame selected by `frame_idx` and store the result.
path = "benchmark/canny/single_frame"

# makedirs also creates "benchmark/canny" when the folder-setup cell was
# skipped; os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = dataset.get_frame_files(frame_idx,
                               load_darr=True,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=True,
                               load_time=False)

frame_id = item['frame_id']
print(f"Fetching frame: {frame_idx} <-> {frame_id}")
rgb_img = np.array(item['rgb_img'])

# Detector output is divided by 255 before it is overlaid and saved
# (presumably mapping a 0/255 mask to 0/1 — confirm against apply_canny).
result = canny_edge_detector(rgb_img, canny_config) / 255

overlaid = overlay_mask(rgb_img, result)
show_image(overlaid, cmap='gray', ticks=False)

cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

### All Frames

In [None]:
# Run the Canny pipeline on every frame and store one overlay + mask per frame.
path = f"benchmark/canny/{dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/canny" does not exist yet.
os.makedirs(path, exist_ok=True)

# NOTE(review): canny_config["visualize"] is passed to the helpers for every
# frame; presumably this plots each intermediate image — consider disabling it
# for long runs.
for i in range(len(dataset)):
    item = dataset.get_frame_files(i,
                                   load_darr=False,
                                   load_rgb=True,
                                   load_drgb=False,
                                   load_edge=True,
                                   load_time=False)

    rgb_img = np.array(item['rgb_img'])
    frame_id = item['frame_id']

    result = canny_edge_detector(rgb_img, canny_config) / 255

    overlaid = overlay_mask(rgb_img, result)

    cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
    np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

## Otsu Thresholding

* Input: RGB
* Apply Grayscale + Gaussian Blur + Otsu Thresholding
* Output: 1-Channel Binary Mask

In [None]:
# Settings read by otsu_thresholding() below.
otsu_config = {
    # Forwarded to the image-processing helpers
    "visualize": True,
    # Both forwarded to apply_gaussian_blur()
    "ksize": (3,3),
    "sigmaX": 10,
}
def otsu_thresholding(image, config):
    """Apply grayscale conversion, Gaussian blur and Otsu thresholding in turn.

    Args:
        image: Input image handed to convert_grayscale().
        config: Dict providing "visualize", "ksize" and "sigmaX".

    Returns:
        The first element of apply_otsu_threshold()'s result; the second
        element is discarded.
    """
    show_steps = config["visualize"]
    gray = convert_grayscale(image, show_steps)
    smoothed = apply_gaussian_blur(gray, show_steps,
                                   ksize=config["ksize"],
                                   sigmaX=config["sigmaX"])
    thresholded, _ = apply_otsu_threshold(smoothed, show_steps)
    return thresholded

### Single Frame

In [None]:
# Run Otsu thresholding on the frame selected by `frame_idx` and store the result.
path = "benchmark/otsu/single_frame"

# makedirs also creates "benchmark/otsu" when the folder-setup cell was
# skipped; os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = dataset.get_frame_files(frame_idx,
                               load_darr=True,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=True,
                               load_time=False)

frame_id = item['frame_id']
print(f"Fetching frame: {frame_idx} <-> {frame_id}")
rgb_img = np.array(item['rgb_img'])

# Thresholded image is divided by 255 before it is overlaid and saved
result = otsu_thresholding(rgb_img, otsu_config) / 255

overlaid = overlay_mask(rgb_img, result)
show_image(overlaid, cmap='gray', ticks=False)

cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

### All Frames

In [None]:
# Run Otsu thresholding on every frame and store one overlay + mask per frame.
path = f"benchmark/otsu/{dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/otsu" does not exist yet.
os.makedirs(path, exist_ok=True)

# NOTE(review): otsu_config["visualize"] is passed to the helpers for every
# frame; presumably this plots each intermediate image — consider disabling it
# for long runs.
for i in range(len(dataset)):
    item = dataset.get_frame_files(i,
                                   load_darr=False,
                                   load_rgb=True,
                                   load_drgb=False,
                                   load_edge=True,
                                   load_time=False)

    rgb_img = np.array(item['rgb_img'])
    frame_id = item['frame_id']

    result = otsu_thresholding(rgb_img, otsu_config) / 255

    overlaid = overlay_mask(rgb_img, result)

    cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
    np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

## Otsu + Canny

* Input: RGB
* Apply Grayscale + Gaussian Blur + Otsu Thresholding + Canny Edge Detector
* Output: 1-Channel Binary Mask

In [None]:
# Settings read by otsu_canny_edge_detector() below.
otsu_canny_config = {
    # Forwarded to the image-processing helpers
    "visualize": True,
    # Evaluates to (11, 11); forwarded to apply_gaussian_blur() with sigmaX
    "ksize": (11,)*2,
    "sigmaX": 22,
    # Forwarded to apply_canny()
    "apertureSize": 5,
}
def otsu_canny_edge_detector(image, config):
    """Chain grayscale conversion, Gaussian blur, Otsu thresholding and Canny.

    The second value returned by apply_otsu_threshold() is used as Canny's
    upper threshold and half of it as the lower threshold.

    Args:
        image: Input image handed to convert_grayscale().
        config: Dict providing "visualize", "ksize", "sigmaX" and "apertureSize".

    Returns:
        Whatever apply_canny() returns for the thresholded image.
    """
    show_steps = config["visualize"]
    gray = convert_grayscale(image, show_steps)
    smoothed = apply_gaussian_blur(gray, show_steps,
                                   ksize=config["ksize"],
                                   sigmaX=config["sigmaX"])
    thresholded, upper = apply_otsu_threshold(smoothed, show_steps)
    lower = upper/2
    edges = apply_canny(thresholded, show_steps,
                        threshold1=lower,
                        threshold2=upper,
                        apertureSize=config["apertureSize"])
    return edges

### Single Frame

In [None]:
# Run Otsu + Canny on the frame selected by `frame_idx` and store the result.
path = "benchmark/otsu_canny/single_frame"

# makedirs also creates "benchmark/otsu_canny" when the folder-setup cell was
# skipped; os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = dataset.get_frame_files(frame_idx,
                               load_darr=True,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=True,
                               load_time=False)

frame_id = item['frame_id']
print(f"Fetching frame: {frame_idx} <-> {frame_id}")
rgb_img = np.array(item['rgb_img'])

# Detector output is divided by 255 before it is overlaid and saved
result = otsu_canny_edge_detector(rgb_img, otsu_canny_config) / 255

overlaid = overlay_mask(rgb_img, result)
show_image(overlaid, cmap='gray', ticks=False)

cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

### All Frames

In [None]:
# Run Otsu + Canny on every frame and store one overlay + mask per frame.
path = f"benchmark/otsu_canny/{dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/otsu_canny" does not exist yet.
os.makedirs(path, exist_ok=True)

# NOTE(review): otsu_canny_config["visualize"] is passed to the helpers for
# every frame; presumably this plots each intermediate image — consider
# disabling it for long runs.
for i in range(len(dataset)):
    item = dataset.get_frame_files(i,
                                   load_darr=False,
                                   load_rgb=True,
                                   load_drgb=False,
                                   load_edge=True,
                                   load_time=False)

    rgb_img = np.array(item['rgb_img'])
    frame_id = item['frame_id']

    result = otsu_canny_edge_detector(rgb_img, otsu_canny_config) / 255

    overlaid = overlay_mask(rgb_img, result)

    cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
    np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), result)

## Template Matching

* Input: Depth Array
* Apply Template Matching + RANSAC + Curve Fitting
* Output: Coordinates for Edge in the Mask

In [None]:
# Template-matching settings, split by consumer:
#   parameters["template"] is unpacked into create_template(),
#   parameters["matching"] is unpacked into apply_template_matching().
parameters = {
    "template": {
        "size": 30,
        "position": 1,
    },
    "matching": {
        "start_depth": 0.92,  # Given in depth-scale
        "contour_width": 25, # Given in y-scale
        "y_step": 5,         # Given in y-scale
        "n_contours": 1000,
        "ransac_thresh": 30, #15
        "score_thresh": None,
        "roi": [None,None,250,None], # min_y:max_y, min_x:max_x
        # NOTE(review): "curve" presumably selects the 2nd degree polynomial fit
        # mentioned where apply_template_matching() is called — confirm.
        "fit_type": "curve",
        "verbose": 0
    }
}

### Single Frame

In [None]:
# Run template matching on the frame selected by `frame_idx` and store the result.
path = "benchmark/template_matching/single_frame"

# makedirs also creates "benchmark/template_matching" when the folder-setup
# cell was skipped; os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = dataset.get_frame_files(frame_idx,
                               load_darr=True,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=True,
                               load_time=False)

frame_id = item['frame_id']
print(f"Fetching frame: {frame_idx} <-> {frame_id}")
rgb_img = np.array(item['rgb_img'])
depth_arr = np.array(item['depth_arr'])

# Create a template to find corners
template = create_template(**parameters["template"])

# Fit a curve (2nd degree polynomial) to inlier detections
edge_pixels, inliers, outliers = apply_template_matching(depth_arr, template, **parameters["matching"])

# Unlike the mask-based methods, the overlay is written without rescaling
overlaid = overlay_coord(rgb_img, edge_pixels, thickness=2)
show_image(overlaid, cmap='gray', ticks=False)

cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), edge_pixels)

### All Frames

In [None]:
# Run template matching on every frame and store one overlay + edge coordinates per frame.
path = f"benchmark/template_matching/{dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/template_matching" does not exist yet.
os.makedirs(path, exist_ok=True)

for i in range(len(dataset)):
    item = dataset.get_frame_files(i,
                                   load_darr=True,
                                   load_rgb=True,
                                   load_drgb=False,
                                   load_edge=False,
                                   load_time=False)

    frame_id = item['frame_id']
    rgb_img = np.array(item['rgb_img'])
    depth_arr = item['depth_arr']

    # Create a template to find corners
    template = create_template(**parameters["template"])

    # Fit a curve (2nd degree polynomial) to inlier detections
    edge_pixels, inliers, outliers = apply_template_matching(depth_arr, template, **parameters["matching"])

    overlaid = overlay_coord(rgb_img, edge_pixels, thickness=2)
    cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid)
    np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), edge_pixels)

## HED (Original)

* Input: RGB (the original HED takes RGB input)
* Forward pass through network
* Output: 6-Channel Edge Score Mask

In [None]:
# Dataset has to be re-initialized because original HED takes RGB input.
# Same options as `dataset_args`, with only the input format overridden.
_dataset_args = dict(dataset_args, input_format='rgb')

_dataset = FurrowDataset(_dataset_args)

In [None]:
# Build a RidgeDetector and initialise it from the original HED checkpoint.
model_args = {
    "pretrained": False,
    "freeze": False,
    # This model is fed the RGB samples of `_dataset`, so it has to be built for
    # that input format rather than for the notebook-wide `input_format`.
    "input_format": _dataset_args['input_format']
}

# RidgeDetector parameter name -> parameter name inside the downloaded checkpoint
weight_map = {
    'stage1.0.weight': 'moduleVggOne.0.weight',
    'stage1.0.bias': 'moduleVggOne.0.bias',
    'stage1.2.weight': 'moduleVggOne.2.weight',
    'stage1.2.bias': 'moduleVggOne.2.bias',
    'sideout1.0.weight': 'moduleScoreOne.weight',
    'sideout1.0.bias': 'moduleScoreOne.bias',
    'stage2.5.weight': 'moduleVggTwo.1.weight',
    'stage2.5.bias': 'moduleVggTwo.1.bias',
    'stage2.7.weight': 'moduleVggTwo.3.weight',
    'stage2.7.bias': 'moduleVggTwo.3.bias',
    'sideout2.0.weight': 'moduleScoreTwo.weight',
    'sideout2.0.bias': 'moduleScoreTwo.bias',
    'stage3.10.weight': 'moduleVggThr.1.weight',
    'stage3.10.bias': 'moduleVggThr.1.bias',
    'stage3.12.weight': 'moduleVggThr.3.weight',
    'stage3.12.bias': 'moduleVggThr.3.bias',
    'stage3.14.weight': 'moduleVggThr.5.weight',
    'stage3.14.bias': 'moduleVggThr.5.bias',
    'sideout3.0.weight': 'moduleScoreThr.weight',
    'sideout3.0.bias': 'moduleScoreThr.bias',
    'stage4.17.weight': 'moduleVggFou.1.weight',
    'stage4.17.bias': 'moduleVggFou.1.bias',
    'stage4.19.weight': 'moduleVggFou.3.weight',
    'stage4.19.bias': 'moduleVggFou.3.bias',
    'stage4.21.weight': 'moduleVggFou.5.weight',
    'stage4.21.bias': 'moduleVggFou.5.bias',
    'sideout4.0.weight': 'moduleScoreFou.weight',
    'sideout4.0.bias': 'moduleScoreFou.bias',
    'stage5.24.weight': 'moduleVggFiv.1.weight',
    'stage5.24.bias': 'moduleVggFiv.1.bias',
    'stage5.26.weight': 'moduleVggFiv.3.weight',
    'stage5.26.bias': 'moduleVggFiv.3.bias',
    'stage5.28.weight': 'moduleVggFiv.5.weight',
    'stage5.28.bias': 'moduleVggFiv.5.bias',
    'sideout5.0.weight': 'moduleScoreFiv.weight',
    'sideout5.0.bias': 'moduleScoreFiv.bias',
    'fuse.weight': 'moduleCombine.0.weight',
    'fuse.bias': 'moduleCombine.0.bias',
}

ckpt_path = "checkpoint/network-bsds500.pytorch" # Downloadable from: http://content.sniklaus.com/github/pytorch-hed/network-bsds500.pytorch
# map_location keeps the load working on CPU-only machines even when the
# tensors were saved from a GPU.
checkpoint = torch.load(ckpt_path, map_location=device)

model = RidgeDetector(model_args)
model.to(device)
optim_choice = None
optim_args = {}

# Re-key the checkpoint tensors to the names RidgeDetector expects. A model
# parameter missing from weight_map raises KeyError here.
state = {name: checkpoint[weight_map[name]] for name in model.state_dict()}

model.load_state_dict(state)

### Single Frame

In [None]:
# Run the original HED on the frame selected by `frame_idx` and store the result.
path = "benchmark/hed_original/single_frame"

# makedirs also creates "benchmark/hed_original" when the folder-setup cell was
# skipped; os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = _dataset.get_frame_files(frame_idx,
                                load_darr=False,
                                load_rgb=True,
                                load_drgb=False,
                                load_edge=False,
                                load_time=False)
rgb_img = np.array(item['rgb_img'])
# Name the outputs after the frame loaded here. Without this the files were
# named after whichever frame an earlier cell happened to process last.
frame_id = item['frame_id']

# Network input for the same frame, with a batch dimension added
item = _dataset[frame_idx]
norm_img = item['input'].unsqueeze(0).to(device)

model.eval()
with torch.no_grad():
    results = model(norm_img)
    results = torch.sigmoid(results)

# Show every output channel of the single sample in the batch
for result in results[0]:
    mask = result.cpu().numpy()
    overlaid = overlay_mask(rgb_img, mask)
    show_image(overlaid, ticks=False)

# Only the channel at index 4 is written to disk
mask = results[0,4].cpu().numpy()
overlaid = overlay_mask(rgb_img, mask)
cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), mask)

### All Frames

In [None]:
# Output folder for the original-HED results of this capture.
path = f"benchmark/hed_original/{_dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/hed_original" does not exist yet.
os.makedirs(path, exist_ok=True)

def detect_per_sample(model, sample, device):
    """Return the channel-4 probability map for the first sample of a batch.

    Args:
        model: Network; switched to eval mode and called on the batch.
        sample: Dict whose 'input' entry is the batch tensor.
        device: Device the input is moved to before the forward pass.

    Returns:
        NumPy array with sigmoid(model output)[0, 4].
    """
    model.eval()

    with torch.no_grad():
        batch = sample['input'].to(device)
        probabilities = torch.sigmoid(model(batch))
        # First sample in the batch, output channel index 4
        return probabilities[0, 4].cpu().numpy()

# Unshuffled single-sample batches, so position `i` in the loader is also the
# index passed to get_frame_files().
loader = DataLoader(_dataset, batch_size=1, shuffle=False, num_workers=0)
print(f"Using device: {device}")

t1 = time()

model.to(device)
td = 0
for i, sample in enumerate(loader):
    # Raw RGB frame and id belonging to the sample the loader just served
    item = _dataset.get_frame_files(i,
                                    load_darr=False,
                                    load_rgb=True,
                                    load_drgb=False,
                                    load_edge=False,
                                    load_time=False)
    frame_id = item['frame_id']
    rgb_img = np.array(item['rgb_img'])

    # Only the detection call counts towards `td`
    td1 = time()
    mask = detect_per_sample(model, sample, device)
    td2 = time()
    td = td + (td2 - td1)

    # Store result
    overlay_file = os.path.join(path, f"{frame_id}_overlay.png")
    mask_file = os.path.join(path, f"{frame_id}_edge_pts.npy")
    overlaid = overlay_mask(rgb_img, mask)
    cv2.imwrite(overlay_file, overlaid*255)
    np.save(mask_file, mask)

t2 = time()
n_frames = len(_dataset)
print("Total duration:", t2-t1)
print("Avg duration:", (t2-t1)/n_frames)
print("Total detection duration:", td)
print("Avg detection duration:", td/n_frames)

## HED (Fine-tuned)

* Input: Depth Array or RGB or RGB + Depth Array
* Forward pass through network
* Output: 6-Channel Edge Score Mask

In [None]:
# Checkpoint of the fine-tuned model. The commented alternatives are the other
# available variants.
# NOTE(review): presumably the variant has to match the `input_format` of
# `dataset` ('darr' here) — confirm.
ckpt_path = "checkpoint/darr/18_ckpt.pth"
# ckpt_path = "checkpoint/rgb/18_ckpt.pth"
# ckpt_path = "checkpoint/rgb-darr/18_ckpt.pth"

# load_checkpoint() is unpacked into eight values; only the first (epoch) and
# the fourth (model) are kept. This rebinds `model`, replacing the original-HED
# model built earlier in the notebook.
last_epoch, _, _, model, _, _, _, _ = load_checkpoint(ckpt_path)

print(f"Model from epoch-{last_epoch} is loaded")

### Single Frame

In [None]:
# Run the fine-tuned HED on the frame selected by `frame_idx` and store the result.
path = "benchmark/hed/single_frame"

# makedirs also creates "benchmark/hed" when the folder-setup cell was skipped;
# os.mkdir would raise FileNotFoundError in that case.
os.makedirs(path, exist_ok=True)

item = dataset.get_frame_files(frame_idx,
                               load_darr=False,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=False,
                               load_time=False)
rgb_img = np.array(item['rgb_img'])
# Name the outputs after the frame loaded here. Without this the files were
# named after whichever frame an earlier cell happened to process last.
frame_id = item['frame_id']

# Network input for the same frame, with a batch dimension added
item = dataset[frame_idx]
norm_img = item['input'].unsqueeze(0).to(device)

# The input is moved to `device`, so make sure the freshly loaded model is
# there as well (a no-op when it already is).
model.to(device)
model.eval()
with torch.no_grad():
    results = model(norm_img)
    results = torch.sigmoid(results)

# Show every output channel of the single sample in the batch. The mask is
# converted to NumPy, as in every other overlay_mask() call in this notebook.
for result in results[0]:
    mask = result.cpu().numpy()
    overlaid = overlay_mask(rgb_img, mask)
    show_image(overlaid, ticks=False)

# Only the last channel is written to disk
mask = results[0,-1].cpu().numpy()
overlaid = overlay_mask(rgb_img, mask)
cv2.imwrite(os.path.join(path, f"{frame_id}_overlay.png"), overlaid*255)
np.save(os.path.join(path, f"{frame_id}_edge_pts.npy"), mask)

### All Frames

In [None]:
# Output folder for the fine-tuned HED results of this capture.
path = f"benchmark/hed/{dataset.folder_id}"

# makedirs also creates missing parent folders; os.mkdir would raise
# FileNotFoundError when "benchmark/hed" does not exist yet.
os.makedirs(path, exist_ok=True)

def detect_per_sample(model, sample, device):
    """Return the channel-5 probability map for the first sample of a batch.

    Args:
        model: Network; switched to eval mode and called on the batch.
        sample: Dict whose 'input' entry is the batch tensor.
        device: Device the input is moved to before the forward pass.

    Returns:
        NumPy array with sigmoid(model output)[0, 5].
    """
    model.eval()

    with torch.no_grad():
        batch = sample['input'].to(device)
        probabilities = torch.sigmoid(model(batch))
        # First sample in the batch, output channel index 5
        return probabilities[0, 5].cpu().numpy()

# Unshuffled single-sample batches, so position `i` in the loader is also the
# index passed to get_frame_files().
loader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0)
print(f"Using device: {device}")

t1 = time()

model.to(device)
td = 0
for i, sample in enumerate(loader):
    # Raw RGB frame and id belonging to the sample the loader just served
    item = dataset.get_frame_files(i,
                                   load_darr=False,
                                   load_rgb=True,
                                   load_drgb=False,
                                   load_edge=False,
                                   load_time=False)
    frame_id = item['frame_id']
    rgb_img = np.array(item['rgb_img'])

    # Only the detection call counts towards `td`
    td1 = time()
    mask = detect_per_sample(model, sample, device)
    td2 = time()
    td = td + (td2 - td1)

    # Store result
    overlay_file = os.path.join(path, f"{frame_id}_overlay.png")
    mask_file = os.path.join(path, f"{frame_id}_edge_pts.npy")
    overlaid = overlay_mask(rgb_img, mask)
    cv2.imwrite(overlay_file, overlaid*255)
    np.save(mask_file, mask)

t2 = time()
n_frames = len(dataset)
print("Total duration:", t2-t1)
print("Avg duration:", (t2-t1)/n_frames)
print("Total detection duration:", td)
print("Avg detection duration:", td/n_frames)

# Samples from November and March Captures

In [None]:
def read_depth(path):
    """Load a depth map from a .npy file and convert it to an 8-bit image.

    Values are scaled so that the maximum maps to 255, rounded, multiplied by 7
    and clipped to [0, 255]; everything above roughly a seventh of the maximum
    therefore saturates at 255.

    Args:
        path: Path of the .npy file holding the depth array.

    Returns:
        np.uint8 array with the same shape as the stored array. An all-zero
        depth map yields an all-zero image.
    """
    depth_arr = np.load(path)
    peak = depth_arr.max()
    if peak == 0:
        # Dividing by a zero maximum would turn every pixel into NaN, and
        # casting NaN to uint8 is undefined.
        return np.zeros(depth_arr.shape, dtype=np.uint8)
    depth_arr = np.rint(255 * (depth_arr / peak))
    depth_arr = np.clip(depth_arr * 7, a_min=0, a_max=255).astype(np.uint8)
    return depth_arr

# Figure with two 2x2 blocks side by side (November | March): RGB frames in the
# top row, the matching depth maps in the bottom row.
fig = plt.figure(figsize=[15,15])

path = "notebooks/Benchmark Content/Samples from November and March"


def _load_samples(root, month, frame_ids):
    """Return [rgb_row, depth_row] for the given capture month.

    Depth maps go through read_depth() and are repeated over three channels.
    The images stay in nested lists, so they do not have to share one shape
    (np.array() over images of different shapes fails).
    """
    rgb_row = [np.array(Image.open(f"{root}/{month}/{frame}_rgb.png")) for frame in frame_ids]
    depth_row = [np.stack([read_depth(f"{root}/{month}/{frame}_depth.npy")]*3, axis=-1) for frame in frame_ids]
    return [rgb_row, depth_row]


def _draw_samples(figure, images, left, right, title):
    """Draw a 2x2 block of images between the figure fractions `left` and `right`."""
    block = gridspec.GridSpec(2, 2)
    block.update(left=left, right=right, hspace=0, wspace=0)
    for i, row in enumerate(images):
        for j, image in enumerate(row):
            ax = figure.add_subplot(block[i, j])
            ax.set_xticks([])
            ax.set_yticks([])
            ax.set_frame_on(False)
            ax.imshow(image)
            ax.set_aspect("auto")
            if i == 0 and j == 0:
                # Block title, positioned in data coordinates of the top-left image
                ax.text(425, -25, title, size=33)
    return block


grid1 = _load_samples(path, "November", (3900, 26600))
grid2 = _load_samples(path, "March", (3000, 5000))

gs1 = _draw_samples(fig, grid1, 0, 1, 'Samples from November')
# The second block is placed to the right of the first one (beyond x = 1)
gs2 = _draw_samples(fig, grid2, 1.02, 2, 'Samples from March')

# Training Sample for HED

In [None]:
# Figure for one training sample: every panel keeps the framed crop window
# sharp and shows a blurred version of the image around it.
normalize = True
input_format = 'rgb'

dataset_args = dict(
    data_path="dataset/train/20201112_125754", # New capture
    crop_down=False,
    normalize=normalize,
    input_format=input_format,
    load_edge=False,
    edge_width=3,
    load_time=False,
)
dataset = FurrowDataset(dataset_args)

frame_idx = 0
item = dataset.get_frame_files(frame_idx,
                               load_darr=True,
                               load_rgb=True,
                               load_drgb=False,
                               load_edge=True,
                               load_time=False)
rgb_img = np.array(item['rgb_img'])
edge_pixels = item['edge_pixels']

# Depth array -> 8-bit image: scale to the maximum, amplify by 7, clip to 255
depth_arr = item['depth_arr']
depth_arr = np.rint(255 * (depth_arr / depth_arr.max()))
depth_arr = np.clip(depth_arr * 7, a_min=0, a_max=255).astype(np.uint8)


def _frame_and_blur(image, color):
    """Blur `image` and paste back the crop window with a rectangle drawn around it."""
    framed = cv2.rectangle(image.copy(), (120, 80), (520, 480), color, thickness=2)
    blurred = apply_gaussian_blur(image, False, ksize=(7,7), sigmaX=20)
    blurred[80:480,120:520] = framed[80:480,120:520]
    return blurred


rgb_input = _frame_and_blur(rgb_img, [255, 0, 0])
# show_image(rgb_input, ticks=False)

depth_input = _frame_and_blur(depth_arr, [255, 0, 0])
# show_image(depth_input, ticks=False, cmap="gray")

edge_mask = coord_to_mask((480,640), edge_pixels, thickness=2)
edge_img = _frame_and_blur(edge_mask, 255)
# show_image(edge_img, ticks=False, cmap="gray")

overlaid = overlay_coord(rgb_img, edge_pixels, thickness=2)
overlay_rgb = _frame_and_blur(overlaid, [255, 0, 0])
# show_image(overlay_rgb, ticks=False)

# Depth overlay is prepared as well, but it is not part of the figure below
overlaid = overlay_coord(depth_arr, edge_pixels, thickness=2)
overlay_darr = _frame_and_blur(overlaid, 255)
# show_image(overlay_darr, ticks=False, cmap="gray")

fig = plt.figure(figsize=[40,8])
gs = gridspec.GridSpec(1, 4, wspace=0, hspace=0)
# Single-channel panels are repeated over three channels to match the RGB ones
imgs = np.array([
    rgb_input, np.stack([depth_input]*3, axis=-1), np.stack([edge_img]*3, axis=-1), overlay_rgb
])

labels = ["Cropped RGB Input", "Cropped Depth Input", "Cropped Edge Mask Target", "Cropped RGB with Mask Overlaid"]

for i, label in enumerate(labels):
    ax = fig.add_subplot(gs[0, i])
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_frame_on(False)
    ax.imshow(imgs[i])
    ax.set_aspect("auto")
    # Panel caption goes above the image
    ax.xaxis.set_label_position('top')
    ax.set_xlabel(label, fontsize=33)

# Qualitative Results

## Template Matching vs HED on Extreme Cases

In [None]:
tm = "notebooks/Benchmark Content/Compare HED and TM/Template Matching"
hed = "notebooks/Benchmark Content/Compare HED and TM/HED (Depth-Only)"

frame_ids = [5906, 8971, 9404]

# One row per frame: depth input | template-matching overlay | HED overlay
grid = []
for i, frame_id in enumerate(frame_ids):
    row_files = (
        f"{hed}/{frame_id}_depth.png",
        f"{tm}/{frame_id}_overlay.png",
        f"{hed}/{frame_id}_overlay.png",
    )
    grid.append([np.array(Image.open(row_file)) for row_file in row_files])

grid = np.array(grid)
nrows, ncols = grid.shape[:2]

fig = plt.figure(figsize=[8,8])

gs = gridspec.GridSpec(nrows, ncols)
gs.update(left=0, right=1, hspace=0, wspace=0)
size = 19
# Captions shown above the first row only, one per column
column_titles = ('Input', 'Template Matching', 'HED')
for i in range(nrows):
    for j in range(ncols):
        ax = fig.add_subplot(gs[i, j])
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(grid[i,j])
        ax.set_aspect("auto")
        if i == 0 and j < len(column_titles):
            ax.xaxis.set_label_position('top')
            ax.set_xlabel(column_titles[j], fontsize=size)

plt.show()

## Compare Best of Each Method on a Frame

In [None]:
# 2x2 figure with one stored result image per method
path = "notebooks/Benchmark Content/Compare Methods"
fig = plt.figure(figsize=[10,10])

labels = [["Otsu+Canny", "Template Matching"], ["HED Original", "HED Finetuned (Ours)"]]

gs = gridspec.GridSpec(2, 2, wspace=0, hspace=0.1)
# Same 2x2 layout as `labels`
imgs = np.array([
    [np.array(Image.open(f"{path}/otsu+canny1.png")), np.array(Image.open(f"{path}/template_matching1.png"))],
    [np.array(Image.open(f"{path}/hed_original1.png")), np.array(Image.open(f"{path}/hed-darr1.png"))],
])

for i, label_row in enumerate(labels):
    for j, label in enumerate(label_row):
        ax = fig.add_subplot(gs[i, j])
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_frame_on(False)
        ax.imshow(imgs[i, j])
        ax.set_aspect("auto")
        # Method name goes above its image
        ax.xaxis.set_label_position('top')
        ax.set_xlabel(label, fontsize=22)

plt.show()

## HED Side Outputs

In [None]:
darr = "notebooks/Benchmark Content/HED-related/Side Outputs/Depth-Only"
rgb = "notebooks/Benchmark Content/HED-related/Side Outputs/RGB-Only"
rgb_darr = "notebooks/Benchmark Content/HED-related/Side Outputs/RGB+Depth"

labels = ["Side Output (x1)", "Side Output (x2)", "Side Output (x4)", "Side Output (x8)", "Side Output (x16)", "Side Output Fusion"]

# Only the depth-only row is filled; the other two rows stay empty.
grid = [[], [], []]
for i in range(1,7):
    depth_only_file = f"{darr}/scale{i}.png"
    grid[0].append(np.array(Image.open(depth_only_file)))
#     grid[1].append(np.array(Image.open(f"{rgb}/scale{i}.png")))
#     grid[2].append(np.array(Image.open(f"{rgb_darr}/scale{i}.png")))

fig = plt.figure(figsize=[24, 3], frameon=False)
# fig.suptitle("HED Side Outputs with Different Inputs", fontsize=30, y=0.94)
gs = gridspec.GridSpec(1, 6, wspace=0, hspace=0.05)

# A single row is drawn, one column per entry in `labels`
for i in range(1):
    for j, label in enumerate(labels):
        ax = fig.add_subplot(gs[i, j])
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_frame_on(False)
        ax.imshow(grid[i][j])
        ax.set_aspect("auto")
        ax.xaxis.set_label_position('top')
        ax.set_xlabel(label, fontsize=22)
plt.show()

## Effect of Data Augmentation on HED

In [None]:
# 2x2 figure: left column with, right column without mirror augmentation
path1 = "notebooks/Benchmark Content/HED-related/Mirror Augmentation/With Mirror/"
path2 = "notebooks/Benchmark Content/HED-related/Mirror Augmentation/Without Mirror/"
fig = plt.figure(figsize=[15,12], frameon=False)

labels = ["with mirror augmentation", "without mirror augmentation"]

gs = gridspec.GridSpec(2, 2, wspace=0, hspace=0)
# Rows: "front-1" on top, "back-2" below
imgs = np.array([
    [np.array(Image.open(f"{path1}/front-1.png")), np.array(Image.open(f"{path2}/front-1.png"))],
    [np.array(Image.open(f"{path1}/back-2.png")), np.array(Image.open(f"{path2}/back-2.png"))]
])


for i in range(2):
    top_row = (i == 0)
    for j, label in enumerate(labels):
        ax = fig.add_subplot(gs[i, j])
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_frame_on(False)
        ax.imshow(imgs[i, j])
        ax.set_aspect("auto")
        if top_row:
            # Column caption, shown once above the first row
            ax.xaxis.set_label_position('top')
            ax.set_xlabel(label, fontsize=16)

plt.show()

## Generate Video

### Template Matching

In [None]:
# Render the template-matching detections over the RGB frames into an mp4.
plt.rcParams['animation.ffmpeg_path'] = '/usr/bin/ffmpeg'

# Input: Path contains RGB frames and detections
frame_path = "demo/template_matching"
# Input: Output video name
video_name = "demo/Template Matching"

files = os.listdir(frame_path)

# Filter files wrt their extension and order them by the number in front of
# the first underscore of the file name
rgb_im_files = sorted((f for f in files if f.endswith("rgb.png")),
                      key=lambda f: int(f.split("_")[0]))
edge_px_files = sorted((f for f in files if f.endswith("edge_pts.npy")),
                       key=lambda f: int(f.split("_")[0]))

# Positions (in the sorted frame list) that go into the video
cut = range(1164, 2965)

metadata = dict(title='Template Matching Detector')
writer = FFMpegWriter(fps=30, metadata=metadata)

# A single figure is reused for every frame: only the image data is swapped
fig = plt.figure()
plt.xticks([])
plt.yticks([])
imgh = plt.imshow(np.zeros((480, 640), dtype=np.uint8))

with writer.saving(fig, f"{video_name}.mp4", 100):
    # Slicing clamps the cut to the frames that exist, instead of raising
    # IndexError half-way through the video on a shorter capture.
    for rgb_im_file in rgb_im_files[cut.start:cut.stop]:
        # Kept under its own name so the notebook-wide integer `frame_idx`
        # is not overwritten with a string.
        frame_tag = rgb_im_file.split("_")[0]

        rgb_im_path = os.path.join(frame_path, rgb_im_file)
        # NOTE(review): cv2.imread returns BGR while matplotlib displays RGB;
        # confirm the channel order of the stored frames is the intended one.
        rgb_img = cv2.imread(rgb_im_path, cv2.IMREAD_COLOR)

        # The detections are looked up by frame number, not by list position
        edge_px_path = os.path.join(frame_path, f"{frame_tag}_edge_pts.npy")
        edge_pixels = np.load(edge_px_path)
        overlaid = overlay_coord(rgb_img, edge_pixels, thickness=2)

        imgh.set_data(overlaid)

        writer.grab_frame()

### HED

In [None]:
# Render the stored HED overlay frames into an mp4.
plt.rcParams['animation.ffmpeg_path'] = '/usr/bin/ffmpeg'

# Input: Edge overlaid RGB frame path
frame_path = "demo/hed"
# Input: Output video name
video_name = "demo/HED"

files = os.listdir(frame_path)

# Filter files wrt their extension and order them by the number in front of
# the first underscore of the file name
rgb_im_files = sorted((f for f in files if f.endswith(".png")),
                      key=lambda f: int(f.split("_")[0]))

# Every frame goes into the video
cut = rgb_im_files

metadata = dict(title='HED Detector')
writer = FFMpegWriter(fps=30, metadata=metadata)

# A single figure is reused for every frame: only the image data is swapped
fig = plt.figure()
plt.xticks([])
plt.yticks([])
imgh = plt.imshow(np.zeros((480, 640), dtype=np.uint8))

with writer.saving(fig, f"{video_name}.mp4", 100):
    for rgb_im_file in cut:
        rgb_im_path = os.path.join(frame_path, rgb_im_file)
        # NOTE(review): cv2.imread returns BGR while matplotlib displays RGB;
        # confirm the channel order of the stored frames is the intended one.
        rgb_img = cv2.imread(rgb_im_path, cv2.IMREAD_COLOR)

        imgh.set_data(rgb_img)

        writer.grab_frame()