In [None]:
import pandas as pd
import torch
import numpy as np
from matplotlib import pyplot as plt
from shapely.geometry import LineString
from shapely.ops import polygonize, unary_union
from shapely.geometry import Point

from sklearn.metrics import precision_score, recall_score, f1_score

import os
import sys
from pathlib import Path
sys.path.append(str(Path(os.getcwd()).parent))
import json

from price_net.schema import PriceScene
from price_net.utils import plot_price_scene, plot_bboxes

Load dataset directory

In [2]:
# Keep prompting until the answer names an existing directory.
while True:
    # Strip stray whitespace and reject a blank answer explicitly:
    # Path("") normalises to "." and would silently select the working directory.
    raw_dataset_dir = input("Input the dataset directory: ").strip()
    if raw_dataset_dir:
        # expanduser() lets answers such as "~/data/my_dataset" work.
        dataset_dir = Path(raw_dataset_dir).expanduser()
        # is_dir() is already False for paths that do not exist.
        if dataset_dir.is_dir():
            break
    print("Invalid dataset directory. Please try again.")

Select the directory containing the Hough line `.npy` files

In [3]:
# Keep prompting until the answer names an existing directory.
while True:
    # Strip stray whitespace and reject a blank answer explicitly:
    # Path("") normalises to "." and would silently select the working directory.
    raw_hl_dir = input("Input the directory where the hough line numpy files are located: ").strip()
    if raw_hl_dir:
        # expanduser() lets answers such as "~/data/hough_lines" work.
        hl_dir = Path(raw_hl_dir).expanduser()
        # is_dir() is already False for paths that do not exist.
        if hl_dir.is_dir():
            break
    print("Invalid directory. Please try again.")

Load the price scenes from `raw_price_scenes.json`

In [None]:
# The JSON document is iterated entry by entry; each entry is expanded into the
# PriceScene constructor as keyword arguments.
# The encoding is pinned to UTF-8 so parsing does not depend on the platform's
# default text encoding.
with open(dataset_dir / "raw_price_scenes.json", "r", encoding="utf-8") as f:
    scenes = [PriceScene(**scene) for scene in json.load(f)]

In [None]:
def get_ground_truth_edges_from_scene(scene: PriceScene):
    """
    Collect the annotated product-price associations of a PriceScene.

    Within each entry of ``scene.price_groups``, every id in
    ``product_bbox_ids`` is paired with every id in ``price_bbox_ids``.

    Returns a set of (product_id, price_id) tuples.
    """
    return {
        (product_id, price_id)
        for group in scene.price_groups
        for price_id in group.price_bbox_ids
        for product_id in group.product_bbox_ids
    }

# Visualization

In [5]:
def visualize_hough_predictions(scene_idx, scenes, dataset_dir, hl_dir, show_regions=True):
    """
    Visualize the Hough line baseline for one scene and print its metrics.

    Three panels are drawn side by side:
      1. the scene graph from ``plot_price_scene`` with the predicted edges
         overlaid (green solid = present in the ground truth, red dashed = not),
      2. the image with product / price bounding boxes and predicted edges,
      3. the image with the Hough lines (optional) and the bbox centroids.

    Prediction rule: the Hough lines plus the four image borders are
    polygonized into regions; every product whose centroid lies inside a
    region is linked to the nearest price centroid inside that same region.
    Products in a region without any price tag get no prediction.

    Args:
        scene_idx: Index into ``scenes``.
        scenes: Sequence of PriceScene objects.
        dataset_dir: Path of the dataset; the image is read from
            ``dataset_dir / "images" / "<scene_id>.jpg"`` (``.png`` and
            ``.jpeg`` are tried when the ``.jpg`` file does not exist).
        hl_dir: Path of the directory holding ``<scene_id>.npy`` Hough line
            arrays. A missing file is treated as "no lines", which yields no
            regions and therefore no predictions.
        show_regions: When False the Hough lines are not drawn in the third
            panel. Predictions and metrics are unaffected.

    Returns:
        None. The figure is shown and the metrics are printed.
    """
    scene = scenes[scene_idx]

    # Load image
    image_path = dataset_dir / "images" / f"{scene.scene_id}.jpg"
    if not image_path.exists():
        # Try alternative extensions
        for ext in [".png", ".jpeg"]:
            alt_path = dataset_dir / "images" / f"{scene.scene_id}{ext}"
            if alt_path.exists():
                image_path = alt_path
                break

    image = plt.imread(image_path)
    height, width = image.shape[:2]

    # Get Hough lines for region visualization
    hl_path = hl_dir / f"{scene.scene_id}.npy"
    shelf_array = np.load(hl_path) if hl_path.exists() else np.array([])

    # Create figure with side-by-side plots
    _, (graph_ax, image_ax, regions_ax) = plt.subplots(1, 3, figsize=(18, 6))

    # Plot 1: Graph representation with ground truth and predictions
    plot_price_scene(scene, ax=graph_ax)
    graph_ax.set_title(f"Scene {scene.scene_id}: Ground Truth vs Predictions", fontsize=12)
    graph_ax.set_xlabel("Normalized X coordinate")
    graph_ax.set_ylabel("Normalized Y coordinate")

    gt_edges = get_ground_truth_edges_from_scene(scene)

    prod_ids = list(scene.product_bboxes.keys())
    price_ids = list(scene.price_bboxes.keys())

    pred_edges = []
    # Must exist even without Hough lines: the metrics printout below always
    # reports len(regions).
    regions = []

    if len(shelf_array) > 0:
        # Create regions from Hough lines
        # shelf_array format is [y1, x1, y2, x2], so we create LineString with [(x1, y1), (x2, y2)]
        line_strings = [
            LineString([(shelf_array[i, 1], shelf_array[i, 0]),
                        (shelf_array[i, 3], shelf_array[i, 2])])
            for i in range(len(shelf_array))
        ]
        # The image borders close off regions that the Hough lines leave open.
        line_strings.extend([
            LineString([(0, 0), (width - 1, 0)]),
            LineString([(0, 0), (0, height - 1)]),
            LineString([(width - 1, 0), (width - 1, height - 1)]),
            LineString([(0, height - 1), (width - 1, height - 1)]),
        ])

        # unary_union nodes the lines at their intersections so polygonize
        # can build closed faces from them.
        union_result = unary_union(line_strings)
        if hasattr(union_result, 'geoms'):
            regions = list(polygonize(union_result.geoms))
        else:
            regions = list(polygonize([union_result]))

    # torch.stack rejects an empty list, so the centroid tensors are only
    # built when there is something to match.
    if regions and prod_ids and price_ids:
        prod_bboxes = torch.stack([scene.product_bboxes[id_].to_tensor() for id_ in prod_ids])
        price_bboxes = torch.stack([scene.price_bboxes[id_].to_tensor() for id_ in price_ids])

        # Convert to pixel coordinates for image processing.
        # Columns 0 / 1 of to_tensor() are used as normalized center x / y
        # (presumably matching bbox.cx / bbox.cy - TODO confirm against the schema).
        prod_centroids_px = torch.stack([prod_bboxes[:, 0] * width, prod_bboxes[:, 1] * height], dim=1)
        price_centroids_px = torch.stack([price_bboxes[:, 0] * width, price_bboxes[:, 1] * height], dim=1)

        # Build the shapely points once instead of once per region.
        prod_points = [Point(prod_centroids_px[i]) for i in range(len(prod_ids))]
        price_points = [Point(price_centroids_px[j]) for j in range(len(price_ids))]

        # Find predicted edges using regions
        for region in regions:
            region_prices = [j for j, pt in enumerate(price_points) if region.contains(pt)]
            if not region_prices:
                # No price tag in this region: its products get no prediction.
                continue

            price_coords = price_centroids_px[region_prices]
            for prod_idx, pt in enumerate(prod_points):
                if not region.contains(pt):
                    continue
                dists = torch.norm(price_coords - prod_centroids_px[prod_idx], dim=1)
                nearest_price_idx = region_prices[torch.argmin(dists).item()]
                pred_edges.append((prod_ids[prod_idx], price_ids[nearest_price_idx]))

    # Draw prediction lines on graph
    for prod_id, price_id in pred_edges:
        prod_bbox = scene.product_bboxes[prod_id]
        price_bbox = scene.price_bboxes[price_id]

        # A predicted edge is a true positive when it is in the ground truth,
        # otherwise a false positive.
        is_correct = (prod_id, price_id) in gt_edges
        color = 'green' if is_correct else 'red'
        alpha = 0.8 if is_correct else 0.5
        linestyle = '-' if is_correct else '--'

        graph_ax.plot([prod_bbox.cx, price_bbox.cx],
                      [prod_bbox.cy, price_bbox.cy],
                      color=color, alpha=alpha, linestyle=linestyle, linewidth=2)

    # Add legend (Line2D is only needed here, hence the local import)
    from matplotlib.lines import Line2D
    legend_elements = [
        Line2D([0], [0], color='grey', lw=2, label='Ground Truth'),
        Line2D([0], [0], color='green', lw=2, label='True Positive'),
        Line2D([0], [0], color='red', lw=2, linestyle='--', label='False Positive'),
        Line2D([0], [0], color='black', marker='x', linestyle='None', label='Price Tags'),
        Line2D([0], [0], color='blue', marker='o', linestyle='None', label='Products')
    ]
    graph_ax.legend(handles=legend_elements, loc='upper right')

    # Plot 2: Original image with bounding boxes and predictions
    image_ax.imshow(image)
    image_ax.set_title("Image with Predictions", fontsize=12)
    image_ax.axis('off')

    # Plot bounding boxes on image
    plot_bboxes(scene.product_bboxes.values(), ax=image_ax, color='blue', width=width, height=height)
    plot_bboxes(scene.price_bboxes.values(), ax=image_ax, color='red', linestyle='--', width=width, height=height)

    # Draw prediction lines on image (normalized centers scaled to pixels)
    for prod_id, price_id in pred_edges:
        prod_bbox = scene.product_bboxes[prod_id]
        price_bbox = scene.price_bboxes[price_id]

        is_correct = (prod_id, price_id) in gt_edges
        color = 'lime' if is_correct else 'red'
        alpha = 0.8

        image_ax.plot([prod_bbox.cx * width, price_bbox.cx * width],
                      [prod_bbox.cy * height, price_bbox.cy * height],
                      color=color, alpha=alpha, linewidth=2)

    # Plot 3: Hough line regions
    regions_ax.imshow(image, alpha=0.7)
    regions_ax.set_title("Hough Line Regions", fontsize=12)
    regions_ax.axis('off')

    if show_regions and len(shelf_array) > 0:
        # Draw Hough lines
        for i in range(len(shelf_array)):
            y1, x1, y2, x2 = shelf_array[i]
            regions_ax.plot([x1, x2], [y1, y2], 'yellow', linewidth=4, alpha=0.8)

    # Add product and price centroids
    for prod_id in prod_ids:
        bbox = scene.product_bboxes[prod_id]
        regions_ax.scatter(bbox.cx * width, bbox.cy * height, c='blue', s=50, alpha=0.8)

    for price_id in price_ids:
        bbox = scene.price_bboxes[price_id]
        regions_ax.scatter(bbox.cx * width, bbox.cy * height, c='red', s=50, marker='x', alpha=0.8)

    plt.tight_layout()

    # Per-scene metrics over the predicted edges
    tp = sum(1 for edge in pred_edges if edge in gt_edges)
    fp = len(pred_edges) - tp
    fn = len(gt_edges) - tp

    # Undefined ratios (empty denominators) are reported as 0.
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    print(f"Scene {scene.scene_id} Metrics:")
    print(f"  Regions: {len(regions)}")
    print(f"  Ground Truth Edges: {len(gt_edges)}")
    print(f"  Predicted Edges: {len(pred_edges)}")
    print(f"  True Positives: {tp}")
    print(f"  False Positives: {fp}")
    print(f"  False Negatives: {fn}")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1: {f1:.3f}")

    plt.show()

    return

In [7]:
from ipywidgets import widgets, interact


def interactive_scene_viewer(scene_idx=0, show_regions=True):
    """
    Widget callback: render one scene with visualize_hough_predictions.

    The scene list and both directories come from the notebook-level
    variables ``scenes``, ``dataset_dir`` and ``hl_dir``.
    """
    return visualize_hough_predictions(
        scene_idx,
        scenes,
        dataset_dir,
        hl_dir,
        show_regions,
    )


# Controls: a slider spanning every index of `scenes`, and a toggle for the
# Hough line overlay.
scene_slider = widgets.IntSlider(value=0, min=0, max=len(scenes) - 1, step=1, description='Scene Index:')
regions_checkbox = widgets.Checkbox(value=True, description='Show Regions')

# Re-runs the viewer whenever either control changes.
interact(
    interactive_scene_viewer,
    scene_idx=scene_slider,
    show_regions=regions_checkbox,
)

interactive(children=(IntSlider(value=0, description='Scene Index:', max=488), Checkbox(value=True, descriptio…

<function __main__.interactive_scene_viewer(scene_idx=0, show_regions=True)>

# Overall evaluation metrics

In [8]:
def get_houghline_features_and_eval(scenes, dataset_dir_, idx=0):
    """
    Run the Hough line baseline on one scene and label every (product, price) pair.

    The Hough lines plus the four image borders are polygonized into regions.
    Every product whose centroid lies inside a region is linked to the
    nearest price centroid inside that same region; products in a region
    without any price tag get no prediction.

    Args:
        scenes: Sequence of PriceScene objects.
        dataset_dir_: Path of the dataset; the image is read from
            ``dataset_dir_ / "images" / "<scene_id>.jpg"`` (``.png`` and
            ``.jpeg`` are tried when the ``.jpg`` file does not exist).
        idx: Index of the scene to evaluate.

    Returns:
        dict with two boolean arrays of equal length, one entry per
        (product, price) pair of the scene (price-major order):
        ``"y_true"`` marks pairs present in the ground truth and
        ``"y_pred"`` marks pairs predicted by the baseline. Both arrays are
        empty when the scene has no products or no price tags.

    Raises:
        FileNotFoundError: if the image or the Hough line ``.npy`` file is missing.

    Note:
        The Hough line directory is taken from the notebook-level ``hl_dir``.
    """
    scene = scenes[idx]

    # Only the image size is needed, to scale normalized centers to pixels.
    # Use the dataset directory that was passed in, not the notebook global.
    image_dir = dataset_dir_ / "images"
    image_path = image_dir / f"{scene.scene_id}.jpg"
    if not image_path.exists():
        for ext in (".png", ".jpeg"):
            alt_path = image_dir / f"{scene.scene_id}{ext}"
            if alt_path.exists():
                image_path = alt_path
                break
    image = plt.imread(image_path)
    height, width = image.shape[:2]

    # hl_dir is resolved as given first (that is how it was validated when it
    # was entered); the older layout relative to the dataset's parent
    # directory is kept as a fallback.
    hl_path = hl_dir / f"{scene.scene_id}.npy"
    if not hl_path.exists():
        hl_path = dataset_dir_.parent / hl_path
    shelf_array = np.load(hl_path)

    prod_ids: list[str] = list(scene.product_bboxes.keys())
    price_ids: list[str] = list(scene.price_bboxes.keys())

    # Without products or prices there are no pairs to label, and torch.stack
    # below would reject the empty list.
    if not prod_ids or not price_ids:
        return {
            "y_true": np.array([], dtype=bool),
            "y_pred": np.array([], dtype=bool),
        }

    prod_bboxes = torch.stack([scene.product_bboxes[id_].to_tensor() for id_ in prod_ids])
    price_bboxes = torch.stack([scene.price_bboxes[id_].to_tensor() for id_ in price_ids])

    # Columns 0 / 1 of to_tensor() are used as normalized center x / y
    # (presumably matching bbox.cx / bbox.cy - TODO confirm against the schema).
    prod_centroids = torch.stack([prod_bboxes[:, 0] * width, prod_bboxes[:, 1] * height], dim=1)
    price_centroids = torch.stack([price_bboxes[:, 0] * width, price_bboxes[:, 1] * height], dim=1)

    # shelf_array format is [y1, x1, y2, x2], so we create LineString with [(x1, y1), (x2, y2)]
    line_strings = [
        LineString([(shelf_array[i, 1], shelf_array[i, 0]), (shelf_array[i, 3], shelf_array[i, 2])])
        for i in range(len(shelf_array))
    ]
    # Add the image boundaries as lines so open-ended shelves still form closed regions
    line_strings += [
        LineString([(0, 0), (width - 1, 0)]),
        LineString([(0, 0), (0, height - 1)]),
        LineString([(width - 1, 0), (width - 1, height - 1)]),
        LineString([(0, height - 1), (width - 1, height - 1)]),
    ]

    # unary_union nodes the lines at their intersections so polygonize can
    # build closed faces from them.
    union_result = unary_union(line_strings)
    if hasattr(union_result, 'geoms'):
        regions = list(polygonize(union_result.geoms))
    else:
        regions = list(polygonize([union_result]))

    # Build the shapely points once instead of once per region.
    prod_points = [Point(prod_centroids[i]) for i in range(len(prod_ids))]
    price_points = [Point(price_centroids[j]) for j in range(len(price_ids))]

    pred_edges = []
    for region in regions:
        # Find prices in this region
        region_prices = [j for j, pt in enumerate(price_points) if region.contains(pt)]
        # If there are no price tags in the region, its products get no prediction
        if not region_prices:
            continue
        price_coords = price_centroids[region_prices, :2]

        # Connect all products in this region to the nearest price tag
        for prod_idx, pt in enumerate(prod_points):
            if not region.contains(pt):
                continue
            # compute distances from product to all price tags in this region
            dists = torch.norm(price_coords - prod_centroids[prod_idx, :2], dim=1)
            # .item() turns the 0-d tensor into a plain int for list indexing
            nearest_price_idx = region_prices[torch.argmin(dists).item()]
            pred_edges.append((prod_ids[prod_idx], price_ids[nearest_price_idx]))

    # Get ground truth edges using the helper function
    gt_edges = get_ground_truth_edges_from_scene(scene)
    pred_edges_set = set(pred_edges)

    # For evaluation, create binary labels for all possible price-product pairs in this image
    pairs = [(prod_id, price_id) for price_id in price_ids for prod_id in prod_ids]

    return {
        "y_true": np.array([pair in gt_edges for pair in pairs], dtype=bool),
        "y_pred": np.array([pair in pred_edges_set for pair in pairs], dtype=bool),
    }

In [9]:
# Evaluate every scene; each result holds flat labels over that scene's
# (product, price) pairs.
all_results = [
    get_houghline_features_and_eval(scenes, dataset_dir, idx=i)
    for i in range(len(scenes))
]

# Aggregate all predictions and ground truths
all_y_true = np.concatenate([res["y_true"] for res in all_results])
all_y_pred = np.concatenate([res["y_pred"] for res in all_results])

# Compute metrics once over the pooled pairs of all scenes.
# zero_division=0 is used for all three scores so an undefined ratio is
# reported the same way everywhere (recall previously fell back to 1 while
# precision and F1 fell back to 0).
precision = precision_score(all_y_true, all_y_pred, zero_division=0)
recall = recall_score(all_y_true, all_y_pred, zero_division=0)
f1 = f1_score(all_y_true, all_y_pred, zero_division=0)

print(f"Precision: {precision:.3f}, Recall: {recall:.3f}, F1: {f1:.3f}")

Precision: 0.755, Recall: 0.383, F1: 0.509
