In [2]:
import glob
import json
from pathlib import Path
from PIL import Image
import re

import torch
from tqdm import tqdm
from torchmetrics.detection.mean_ap import MeanAveragePrecision
import networkx as nx
from sahi import AutoDetectionModel
from ultralytics.utils import metrics

from utils import detection_metrics, sliced_inference, plot_bboxes

In [3]:
def get_device():
    """
    Pick the torch device string to run on.

    Returns:
        'cuda:0' when torch reports CUDA as available, otherwise 'cpu'.
    """
    if torch.cuda.is_available():
        return 'cuda:0'
    return 'cpu'

In [4]:
def convert_yolo_labels(labels, mapping_path='relationformer_classes.json'):
    """
    Convert YOLO class indices to general class ids for mAP calculation.

    Each 0-indexed model class index is shifted by one to get the 1-indexed
    annotation id, which is looked up (as a string key) in the JSON class
    mapping to read its 'general_class_id'.

    Args:
        labels: iterable of integer class indices predicted by the model.
        mapping_path: path of the JSON class mapping file. Defaults to
            'relationformer_classes.json' in the working directory.

    Returns:
        List with the 'general_class_id' of every input label, in input order.

    Raises:
        KeyError: if a shifted label has no entry in the mapping.
    """
    with open(mapping_path, 'r', encoding='utf-8') as f:
        json_mapping = json.load(f)

    # 0-indexed in model, 1-indexed in annotations
    return [json_mapping[str(label + 1)]['general_class_id'] for label in labels]

In [5]:
def convert_relationformer_labels(labels):
    """
    Map text class names to their integer class ids for mAP calculation.

    Ids are 1-based and follow the order of the names listed below.
    A name that is not listed raises KeyError.
    """
    class_names = (
        "general",
        "tank",
        "valve",
        "instrumentation",
        "pump",
        "inlet/outlet",
        "arrow",
    )
    name_to_id = {name: idx for idx, name in enumerate(class_names, start=1)}

    return list(map(name_to_id.__getitem__, labels))

In [6]:
def extract_node_labels(graph, x1=0, y1=0, generic_object_detector=False):
    """
    Extract class labels and bounding boxes for the nodes of a GT graph.

    Nodes whose id starts with 'crossing', 'background' or 'connector' are
    skipped. Every remaining node must carry the attributes 'label', 'xmin',
    'ymin', 'xmax' and 'ymax'.

    Args:
        graph: object exposing ``nodes(data=True)`` that yields
            (node_id, attributes) pairs.
        x1: value subtracted from the x coordinates (xmin, xmax).
        y1: value subtracted from the y coordinates (ymin, ymax).
        generic_object_detector: when True every node gets label 0; otherwise
            labels are mapped with ``convert_relationformer_labels``.

    Returns:
        Tuple ``(node_labels, node_xyxy)``: an int64 tensor of shape (N,) and
        an int64 tensor of shape (N, 4). With no matching nodes the shapes
        are (0,) and (0, 4).
    """
    keys = ['xmin', 'ymin', 'xmax', 'ymax']

    # Regex pattern for excluding connection and backgrounds from nodes
    pattern = r"^(crossing|background|connector)"

    # Filter once so labels and boxes always come from the same nodes
    kept_attrs = [
        attrs
        for node_id, attrs in graph.nodes(data=True)
        if not re.match(pattern, node_id)
    ]

    node_labels_raw = [attrs['label'] for attrs in kept_attrs]
    boxes = [[int(attrs[key]) for key in keys] for attrs in kept_attrs]

    # reshape keeps the (N, 4) layout even when no node survives the filter;
    # without it the empty tensor is 1-D and the column slicing below fails
    node_xyxy = torch.tensor(boxes, dtype=torch.long).reshape(-1, 4)

    if not generic_object_detector:
        node_labels = torch.tensor(
            convert_relationformer_labels(node_labels_raw), dtype=torch.long
        )
    else:
        node_labels = torch.zeros(len(node_labels_raw), dtype=torch.long)

    if x1 != 0 or y1 != 0:
        node_xyxy[:, 0::2] -= x1
        node_xyxy[:, 1::2] -= y1

    return node_labels, node_xyxy

In [7]:
def crop_imgs(imgs, x1, y1, x2, y2):
    """
    Crop title block from PIDs

    Each image is cropped to the box (x1, y1, x2, y2) and saved in the same
    directory with '_cropped' inserted before the file extension. The source
    files are left untouched.

    Args:
        imgs: iterable of image file paths.
        x1, y1, x2, y2: crop box handed to ``Image.crop`` as (x1, y1, x2, y2).
    """
    for img in imgs:
        src = Path(img)

        # Insert '_cropped' before the extension
        new_file_path = src.with_name(f"{src.stem}_cropped{src.suffix}")

        # The context manager closes the source file once the crop is written
        with Image.open(img) as im:
            im.crop((x1, y1, x2, y2)).save(str(new_file_path))

In [8]:
def extract_number(filename):
    """
    Return the first run of digits in ``filename`` as an int.

    Names without any digit return ``float('inf')`` so they sort after all
    numbered files.
    """
    digits = re.search(r'\d+', filename)
    if not digits:
        # Use inf to push non-numbered files to the end
        return float('inf')
    return int(digits.group())

In [9]:
def get_origin_offset(dataset):
    """
    Return tuple with X1_crop, Y1_crop

    'Dataset PID' maps to (400, 300); every other value maps to (0, 0).
    """
    if dataset == 'Dataset PID':
        return (400, 300)

    return (0, 0)
        

In [10]:
def process_detections(results, generic_object_detector=False):
    """
    Turn a prediction result into tensors for mAP calculation.

    Args:
        results: object with an ``object_prediction_list``; each item provides
            ``bbox.to_xyxy()``, ``score.value`` and ``category.id``.
        generic_object_detector: when True the raw category ids are used as
            labels; otherwise they are mapped with ``convert_yolo_labels``.

    Returns:
        Tuple ``(pred_bboxes, scores, predicted_class)``: boxes of shape
        (N, 4), scores of shape (N,) and int64 labels of shape (N,). With no
        predictions the shapes are (0, 4), (0,) and (0,).
    """
    predictions = results.object_prediction_list

    # reshape keeps the (N, 4) layout when there are no predictions at all
    pred_bboxes = torch.tensor([x.bbox.to_xyxy() for x in predictions]).reshape(-1, 4)
    scores = torch.tensor([x.score.value for x in predictions])

    predicted_class_raw = [x.category.id for x in predictions]

    if not generic_object_detector:
        class_ids = convert_yolo_labels(predicted_class_raw)
    else:
        class_ids = predicted_class_raw

    # Explicit dtype: an empty list would otherwise become a float tensor
    predicted_class = torch.tensor(class_ids, dtype=torch.long)

    return pred_bboxes, scores, predicted_class

### Crop Full-Sized P&IDs to Remove the Title Block

In [None]:
# Crop box corners handed to crop_imgs as (x1, y1, x2, y2)
X1_CROP, Y1_CROP = 400, 300
X2_CROP, Y2_CROP = 5668, 4311

In [None]:
# Resolved here too, so this cell runs on a fresh kernel without relying on
# the dataset-loading cell further down having been executed first
root = Path().resolve().parents[1]

pid_dir = root / 'Data' / 'PID2Graph' / 'Complete' / 'Dataset PID'

# '*.png' also matches the '*_cropped.png' files written by an earlier run;
# skip them so re-running the cell does not crop them a second time
imgs = [
    img for img in glob.glob(str(pid_dir / '*.png'))
    if not Path(img).stem.endswith('_cropped')
]

crop_imgs(imgs, X1_CROP, Y1_CROP, X2_CROP, Y2_CROP)

### Load Test Images & Ground Truth Graphs

In [11]:
root = Path().resolve().parents[1]

# 3 Datasets: Train, Test Real, Test Synthetic
datasets = ['Dataset PID', 'PID2Graph OPEN100', 'PID2Graph Synthetic']

# Store paths for each dataset PID image and labels,
# both ordered by the first number found in the file stem
dataset_paths = {}

for dataset in datasets:

    if dataset == 'Dataset PID':
        file_pattern = '*_cropped.png'
    elif dataset == 'PID2Graph OPEN100':
        file_pattern = '*.png'
    else:
        file_pattern = '*.jpg'

    dataset_dir = root / 'Data' / 'PID2Graph' / 'Complete' / dataset

    imgs = sorted(
        glob.glob(str(dataset_dir / file_pattern)),
        key=lambda x: extract_number(Path(x).stem),
    )
    graphs = sorted(
        glob.glob(str(dataset_dir / '*.graphml')),
        key=lambda x: extract_number(Path(x).stem),
    )

    dataset_paths[dataset] = {'imgs': imgs, 'labels': graphs}

### Load Pre-Trained Model

In [14]:
# Sliced-inference and detection settings used by the evaluation cells below
TILE_SIZE = 640
TILE_OVERLAP_RATIO = 0.2
CONFIDENCE_THRESHOLD = 0.25
IOU_THRESHOLD = 0.5

model_folder = 'train4'
model_path = f'runs/detect/{model_folder}/weights/best.pt'

# Load Model using above params
model = AutoDetectionModel.from_pretrained(
    model_type='yolov8',
    model_path=model_path,
    confidence_threshold=CONFIDENCE_THRESHOLD,
    # Call get_device: the device string is wanted, not the function object
    device=get_device()
)

In [16]:
# One entry per dataset, each starting out with an empty 'map' list
detection_results = dict(
    (name, {'map': []}) for name in dataset_paths.keys()
)

In [None]:
taskdevice = get_device()
map_metric = MeanAveragePrecision(
    iou_type='bbox',
    class_metrics=True,
    iou_thresholds=[0.5],
    max_detection_thresholds=[100, 150, 200],
)

for dataset in tqdm(dataset_paths.keys(), desc='Dataset'):

    dataset_img_paths = dataset_paths[dataset]['imgs']
    dataset_graphs = dataset_paths[dataset]['labels']

    # Images and graphs are paired purely by position, so unequal counts would
    # make zip silently drop or misalign pairs
    if len(dataset_img_paths) != len(dataset_graphs):
        raise ValueError(
            f"{dataset}: {len(dataset_img_paths)} images but {len(dataset_graphs)} graphs"
        )

    # Local names: assigning to X1_CROP / Y1_CROP here would overwrite the
    # crop constants used by the title-block cropping cell
    x_offset, y_offset = get_origin_offset(dataset)

    for path, label in zip(dataset_img_paths, dataset_graphs):

        results = sliced_inference(
            model,
            img_path=path,
            slice_height=TILE_SIZE,
            slice_width=TILE_SIZE,
            h_ratio=TILE_OVERLAP_RATIO,
            w_ratio=TILE_OVERLAP_RATIO,
        )
        pred_bboxes, scores, predicted_class = process_detections(results)

        graph = nx.read_graphml(label)
        labels, labels_xyxy = extract_node_labels(graph, x_offset, y_offset)

        preds = [{"boxes": pred_bboxes, "scores": scores, "labels": predicted_class}]
        targets = [{"boxes": labels_xyxy, "labels": labels}]
        map_metric.update(preds, targets)

    # Replaces the dataset's placeholder entry with the scalar mAP, then clears
    # the accumulated state before the next dataset
    detection_results[dataset] = map_metric.compute()['map'].item()
    map_metric.reset()


In [None]:
# Show the mAP recorded per dataset by the evaluation loop above
detection_results

## Accounting for Classes

## Class Agnostic

In [None]:
model_folder = 'train8'
model_path = f'runs/detect/{model_folder}/weights/best.pt'

# Load Model using above params
model = AutoDetectionModel.from_pretrained(
    model_type='yolov8',
    model_path=model_path,
    confidence_threshold=CONFIDENCE_THRESHOLD,
    # Call get_device: the device string is wanted, not the function object
    device=get_device()
)

In [None]:
taskdevice = get_device()

for dataset in dataset_paths.keys():

    dataset_img_paths = dataset_paths[dataset]['imgs']
    dataset_graphs = dataset_paths[dataset]['labels']

    # Images and graphs are paired purely by position, so unequal counts would
    # make zip silently drop or misalign pairs
    if len(dataset_img_paths) != len(dataset_graphs):
        raise ValueError(
            f"{dataset}: {len(dataset_img_paths)} images but {len(dataset_graphs)} graphs"
        )

    # Local names: assigning to X1_CROP / Y1_CROP here would overwrite the
    # crop constants used by the title-block cropping cell
    x_offset, y_offset = get_origin_offset(dataset)

    map_metric = MeanAveragePrecision(
        iou_type='bbox',
        class_metrics=True,
        iou_thresholds=[0.5],
        max_detection_thresholds=[100, 150, 200],
    )

    for path, label in zip(dataset_img_paths, dataset_graphs):

        results = sliced_inference(
            model,
            img_path=path,
            slice_height=TILE_SIZE,
            slice_width=TILE_SIZE,
            h_ratio=TILE_OVERLAP_RATIO,
            w_ratio=TILE_OVERLAP_RATIO,
        )
        pred_bboxes, scores, predicted_class = process_detections(results, generic_object_detector=True)

        graph = nx.read_graphml(label)
        labels, labels_xyxy = extract_node_labels(graph, x_offset, y_offset, generic_object_detector=True)

        preds = [{"boxes": pred_bboxes, "scores": scores, "labels": predicted_class}]
        targets = [{"boxes": labels_xyxy, "labels": labels}]
        map_metric.update(preds, targets)

    # Overwrites whatever was stored for this dataset by an earlier run
    detection_results[dataset] = map_metric.compute()['map'].item()
    map_metric.reset()

In [None]:
# Show the mAP recorded per dataset by the evaluation loop above
detection_results

In [None]:
model_folder = 'train8'
model_path = f'runs/detect/{model_folder}/weights/best.pt'

# Note: this rebinds the notebook-wide threshold for every cell run afterwards
CONFIDENCE_THRESHOLD = 0.5

# Load Model using above params
model = AutoDetectionModel.from_pretrained(
    model_type='yolov8',
    model_path=model_path,
    confidence_threshold=CONFIDENCE_THRESHOLD,
    # Call get_device: the device string is wanted, not the function object
    device=get_device()
)

In [None]:
taskdevice = get_device()

for dataset in dataset_paths.keys():

    dataset_img_paths = dataset_paths[dataset]['imgs']
    dataset_graphs = dataset_paths[dataset]['labels']

    # Images and graphs are paired purely by position, so unequal counts would
    # make zip silently drop or misalign pairs
    if len(dataset_img_paths) != len(dataset_graphs):
        raise ValueError(
            f"{dataset}: {len(dataset_img_paths)} images but {len(dataset_graphs)} graphs"
        )

    # Local names: assigning to X1_CROP / Y1_CROP here would overwrite the
    # crop constants used by the title-block cropping cell
    x_offset, y_offset = get_origin_offset(dataset)

    map_metric = MeanAveragePrecision(
        iou_type='bbox',
        class_metrics=True,
        iou_thresholds=[0.5],
        max_detection_thresholds=[100, 150, 200],
    )

    for path, label in zip(dataset_img_paths, dataset_graphs):

        results = sliced_inference(
            model,
            img_path=path,
            slice_height=TILE_SIZE,
            slice_width=TILE_SIZE,
            h_ratio=TILE_OVERLAP_RATIO,
            w_ratio=TILE_OVERLAP_RATIO,
        )
        pred_bboxes, scores, predicted_class = process_detections(results, generic_object_detector=True)

        graph = nx.read_graphml(label)
        labels, labels_xyxy = extract_node_labels(graph, x_offset, y_offset, generic_object_detector=True)

        preds = [{"boxes": pred_bboxes, "scores": scores, "labels": predicted_class}]
        targets = [{"boxes": labels_xyxy, "labels": labels}]
        map_metric.update(preds, targets)

    # Overwrites whatever was stored for this dataset by an earlier run
    detection_results[dataset] = map_metric.compute()['map'].item()
    map_metric.reset()

In [None]:
# Show the mAP recorded per dataset by the evaluation loop above
detection_results

In [12]:
model_folder = 'train8'
model_path = f'runs/detect/{model_folder}/weights/best.pt'

# Note: this rebinds the notebook-wide threshold for every cell run afterwards
CONFIDENCE_THRESHOLD = 0.75

# Load Model using above params
model = AutoDetectionModel.from_pretrained(
    model_type='yolov8',
    model_path=model_path,
    confidence_threshold=CONFIDENCE_THRESHOLD,
    # Call get_device: the device string is wanted, not the function object
    device=get_device()
)

In [19]:
taskdevice = get_device()

for dataset in dataset_paths.keys():

    dataset_img_paths = dataset_paths[dataset]['imgs']
    dataset_graphs = dataset_paths[dataset]['labels']

    # Images and graphs are paired purely by position, so unequal counts would
    # make zip silently drop or misalign pairs
    if len(dataset_img_paths) != len(dataset_graphs):
        raise ValueError(
            f"{dataset}: {len(dataset_img_paths)} images but {len(dataset_graphs)} graphs"
        )

    # Local names: assigning to X1_CROP / Y1_CROP here would overwrite the
    # crop constants used by the title-block cropping cell
    x_offset, y_offset = get_origin_offset(dataset)

    map_metric = MeanAveragePrecision(
        iou_type='bbox',
        class_metrics=True,
        iou_thresholds=[0.5],
        max_detection_thresholds=[100, 150, 200],
    )

    # Per-dataset tile size kept in a local so the TILE_SIZE setting that the
    # other evaluation cells read is not changed by running this cell
    tile_size = 320 if dataset == 'PID2Graph OPEN100' else 640

    for path, label in zip(dataset_img_paths, dataset_graphs):

        results = sliced_inference(
            model,
            img_path=path,
            slice_height=tile_size,
            slice_width=tile_size,
            h_ratio=TILE_OVERLAP_RATIO,
            w_ratio=TILE_OVERLAP_RATIO,
        )
        pred_bboxes, scores, predicted_class = process_detections(results, generic_object_detector=True)

        graph = nx.read_graphml(label)
        labels, labels_xyxy = extract_node_labels(graph, x_offset, y_offset, generic_object_detector=True)

        preds = [{"boxes": pred_bboxes, "scores": scores, "labels": predicted_class}]
        targets = [{"boxes": labels_xyxy, "labels": labels}]
        map_metric.update(preds, targets)

    # Overwrites whatever was stored for this dataset by an earlier run
    detection_results[dataset] = map_metric.compute()['map'].item()
    map_metric.reset()

Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 slices.
Performing prediction on 88 

In [20]:
# Show the mAP recorded per dataset by the evaluation loop above
detection_results

{'Dataset PID': 0.029440058395266533,
 'PID2Graph OPEN100': 0.0,
 'PID2Graph Synthetic': 0.009900989942252636}