In [None]:
import numpy as np
import pandas as pd
import time
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon
from skimage.draw import polygon
from PIL import Image, ImageDraw
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split

import sklearn.metrics as metrics
from sklearn.metrics import roc_auc_score
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")

from DSCLChecker.dscl import *
from DSCLChecker.monitor import *

In [None]:
def visualize_regions(image, region_info):
    H, W = image.shape[:2]
    label_map = np.zeros((H, W), dtype=int)
    for region_id, info in region_info.items():
        print(f"Region ID: {region_id}, Area: {len(info['pixels'])}")
        if len(info['pixels']) == 1:
            print(info['pixels'])
        for (r, c) in info['pixels']:
            label_map[r, c] = region_id

    plt.figure(figsize=(6, 6))
    plt.imshow(image, cmap='gray', interpolation='nearest')
    plt.imshow(label_map, cmap='tab10', alpha=0.5, interpolation='nearest')
    plt.axis('off')
    plt.title("Region Overlay")
    plt.show()

In [None]:
inference_images_path ="CheXmask-Database/HybridGNet/inference_outputs_0"

val_gt_file = pd.read_csv('~/CheXpert-v1.0-small/chexmask-segmentation-masks/OriginalResolution/CheXpert_cleaned_valid.csv')
val_gt_file.head(5)

In [None]:
def landmarks_to_mask(landmarks, height, width):
    landmarks = np.array(landmarks)
    r = landmarks[:, 1]
    c = landmarks[:, 0]

    rr, cc = polygon(r, c, shape=(height, width))
    mask = np.zeros((height, width), dtype=np.uint8)
    mask[rr, cc] = 1
    return mask

def parse_and_rescale_landmarks(row, original_size, new_size):
    landmarks_str = row["Landmarks"].replace('[ ', '[').replace('\n ', ',').replace('  ', ',').replace(' ', ',')
    landmarks = np.array(eval(landmarks_str)).reshape(-1, 2)
    
    right_lung = landmarks[:44]
    left_lung = landmarks[44:94]
    heart = landmarks[94:]
    orig_h, orig_w = original_size
    new_h, new_w = new_size
    scale_y = new_h / orig_h
    scale_x = new_w / orig_w

    def rescale(region):
        return np.round(region * [scale_x, scale_y]).astype(int)

    return {
        "right_lung": rescale(right_lung),
        "left_lung": rescale(left_lung),
        "heart": rescale(heart)
    }
    
def merge_masks_to_multiclass(gt_mask_rl, gt_mask_ll, gt_mask_h):
    height, width = gt_mask_rl.shape
    multi_mask = np.zeros((height, width), dtype=np.uint8)

    multi_mask[gt_mask_rl == 1] = 1  # Right lung
    multi_mask[gt_mask_ll == 1] = 2  # Left lung
    multi_mask[gt_mask_h == 1] = 3   # Heart

    return multi_mask

In [None]:
mask_root = "CheXmask-Database/HybridGNet/inference_outputs_0/masks"
conf_root = "CheXmask-Database/HybridGNet/inference_outputs_0/confidence"
image_root = "CheXmask-Database/HybridGNet/inference_outputs_0/images"

row = val_gt_file.iloc[3]
rel_path = row['Path'].replace('.jpg', '.png')
file_stem = os.path.splitext(os.path.basename(rel_path))[0]

mask_path = os.path.join(mask_root, os.path.dirname(rel_path), f"{file_stem}_seg.npy")
conf_path = os.path.join(conf_root, os.path.dirname(rel_path), f"{file_stem}_conf.npy")
image_path = os.path.join(image_root, rel_path)

seg_mask = np.load(mask_path)
conf_mask = np.load(conf_path)
image = Image.open(image_path).convert("L")

original_size = (row["Height"], row["Width"])
new_size = (320, 320)
rescaled = parse_and_rescale_landmarks(row, original_size, new_size)
rightLungLandmarks = rescaled["right_lung"]
leftLungLandmarks = rescaled["left_lung"]
heartLandmarks = rescaled["heart"]

gt_mask_ll = landmarks_to_mask(leftLungLandmarks, height=image.height, width=image.width)
gt_mask_rl = landmarks_to_mask(rightLungLandmarks, height=image.height, width=image.width)
gt_mask_h = landmarks_to_mask(heartLandmarks, height=image.height, width=image.width)
multi_class_mask = merge_masks_to_multiclass(gt_mask_rl, gt_mask_ll, gt_mask_h)

plt.figure(figsize=(20, 5))

plt.subplot(1, 4, 1)
plt.imshow(image, cmap='gray')
plt.imshow(gt_mask_ll, cmap='Reds', alpha=0.2)
plt.imshow(gt_mask_rl, cmap='Blues', alpha=0.2)
plt.imshow(gt_mask_h, cmap='Greens', alpha=0.2)

plt.title("Original Image")
plt.axis("off")

plt.subplot(1, 4, 2)
plt.imshow(multi_class_mask, cmap='Paired', interpolation='nearest')
plt.title("Segmentation GT")
plt.axis("off")

plt.subplot(1, 4, 3)
plt.imshow(seg_mask, cmap='Paired', interpolation='nearest')
plt.title("Segmentation Mask")
plt.axis("off")

plt.subplot(1, 4, 4)
plt.imshow(conf_mask)
plt.title("Probability Mask")
plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
region_map = np.zeros((320, 320), dtype=np.uint8)
for idx, region in enumerate(region_info.values()):
    region_id = region['region_id']
    for (r, c) in region['pixels']:
        region_map[r, c] = region_id

In [None]:
def plot_each_label(seg_mask):
    labels = np.unique(seg_mask)
    labels = labels[labels != 0]
    n = len(labels)
    plt.figure(figsize=(4 * n, 4))
    for i, label in enumerate(labels):
        print(f"Label {label}: {np.sum(seg_mask == label)} pixels")
        plt.subplot(1, n, i + 1)
        mask = (seg_mask == label)
        plt.imshow(mask, cmap='gray')
        plt.title(f"Label {label}")
        plt.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
processed_images = []
for idx, row in val_gt_file.iterrows():
    rel_path = row['Path'].replace('.jpg', '.png')
    file_stem = os.path.splitext(os.path.basename(rel_path))[0]

    mask_path = os.path.join(mask_root, os.path.dirname(rel_path), f"{file_stem}_seg.npy")
    conf_path = os.path.join(conf_root, os.path.dirname(rel_path), f"{file_stem}_conf.npy")
    image_path = os.path.join(image_root, rel_path)

    seg_mask = np.load(mask_path).astype(np.uint8)        
    conf_mask = np.load(conf_path).astype(np.float32)     
    image = np.array(Image.open(image_path).convert("L")).astype(np.float32)  
    image /= 255.0

    region_info = extract_component_bounds(seg_mask)
    region_map = np.zeros((320, 320), dtype=np.uint8)
    for region in region_info.values():
        for (r, c) in region['pixels']:
            region_map[r, c] = region['region_id']

    combined = np.zeros((320, 320, 4), dtype=np.float32)
    combined[:, :, 0] = region_map.astype(np.float32) 
    combined[:, :, 1] = seg_mask.astype(np.float32)
    combined[:, :, 2] = conf_mask
    combined[:, :, 3] = image

    processed_images.append(combined)


In [None]:
label_map = {
    1: "Right lung",
    2: "Left lung",
    3: "Heart"
}

all_quant = []
all_qual = []
violated_indices = []

for idx, combined in enumerate(processed_images):

    label_to_region = {}
    
    region_info = extract_component_bounds(combined[:,:,1])

    for region_id, info in region_info.items():
        label_value = info['value']
        if label_value in label_map:
            label = label_map[label_value]
            label_to_region[label] = region_id
            
    r_tl_rl = region_info[label_to_region['Right lung']]['top_left']
    r_br_rl = region_info[label_to_region['Right lung']]['bottom_right']
    r_tr_rl = region_info[label_to_region['Right lung']]['top_right']
    r_bl_rl = region_info[label_to_region['Right lung']]['bottom_left']

    r_tl_ll = region_info[label_to_region['Left lung']]['top_left']
    r_br_ll = region_info[label_to_region['Left lung']]['bottom_right']
    r_tr_ll = region_info[label_to_region['Left lung']]['top_right']
    r_bl_ll = region_info[label_to_region['Left lung']]['bottom_left']
    logic_strong_dist = "solidtriangle E [20] ({}, {}, {}, {}) (forall ({}, {}, {}, {}) p.class == 2)".format(r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl, r_tl_ll, r_tr_ll, r_bl_ll, r_br_ll)
    parsed = parse(logic_strong_dist)
    parsed_result_val = quantitativescore(parsed, combined)
    parsed = parse(logic_strong_dist)
    parsed_result_bool = qualitativescore(parsed, combined)
    if not parsed_result_bool:
        violated_indices.append(idx)
    
    all_quant.append(parsed_result_val)
    all_qual.append(parsed_result_bool)

true_rate = sum(all_qual) / len(all_qual) if all_qual else 0
avg_quant = sum(all_quant) / len(all_quant) if all_quant else 0

print(f"Qualitative True Rate: {true_rate:.2f} ({sum(all_qual)} out of {len(all_qual)})")
print(f"Average Quantitative Score: {avg_quant:.4f}")


In [None]:
label_map = {1: "Right lung", 2: "Left lung", 3: "Heart"}

all_quant = []
all_qual = []
violated_indices = []
monitor_times = []

for idx, combined in enumerate(processed_images):
    region_info = extract_component_bounds(combined[:, :, 1])

    # --- Per-image directional distance spec ---
    label_to_region = {}
    for region_id, info in region_info.items():
        label_value = info['value']
        if label_value in label_map:
            label = label_map[label_value]
            label_to_region[label] = region_id

    try:
        r_tl_rl = region_info[label_to_region['Right lung']]['top_left']
        r_br_rl = region_info[label_to_region['Right lung']]['bottom_right']
        r_tr_rl = region_info[label_to_region['Right lung']]['top_right']
        r_bl_rl = region_info[label_to_region['Right lung']]['bottom_left']

        r_tl_ll = region_info[label_to_region['Left lung']]['top_left']
        r_br_ll = region_info[label_to_region['Left lung']]['bottom_right']
        r_tr_ll = region_info[label_to_region['Left lung']]['top_right']
        r_bl_ll = region_info[label_to_region['Left lung']]['bottom_left']

        logic_strong_dist = (
            "solidtriangle E [20] ({}, {}, {}, {}) (forall ({}, {}, {}, {}) p.class == 2)"
            .format(r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl,
                    r_tl_ll, r_tr_ll, r_bl_ll, r_br_ll)
        )
        
        logic_test_1 = "forall ({}, {}, {}, {}) p.class == 2".format(r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl)
        test_pixel_logic = "( ON (p(1,2).prob <=  0.7000 & p(1,2).prob >=  0.5)  )"

        parsed = parse(test_pixel_logic)
        start_time = time.time()
        parsed_result_val = quantitativescore(parsed, combined)
        elapsed_time = time.time() - start_time

        monitor_times.append(elapsed_time)

        if not parsed_result_bool:
            violated_indices.append(idx)

    except KeyError:
        print(f"[Warning] Missing lung labels in image {idx}, skipping directional check.")

# --- Summary ---
true_rate = sum(all_qual) / len(all_qual) if all_qual else 0
avg_quant = sum(all_quant) / len(all_quant) if all_quant else 0
avg_time = np.mean(monitor_times)
std_time = np.std(monitor_times)

print(f"Directional Constraint True Rate: {true_rate:.2f} ({sum(all_qual)} out of {len(all_qual)})")
print(f"Average Quantitative Score (directional): {avg_quant:.4f}")
print(f"Avg monitoring time per image: {avg_time:.6f} sec")
print(f"Std of monitoring time: {std_time:.6f} sec")

In [None]:
label_map = {
    1: "Right lung",
    2: "Left lung",
    3: "Heart"
}

all_quant = []
all_qual = []
violated_indices = []
first_satisfying_steps = []

for idx, combined in enumerate(processed_images):

    label_to_region = {}
    region_info = extract_component_bounds(combined[:, :, 1])

    for region_id, info in region_info.items():
        label_value = info['value']
        if label_value in label_map:
            label = label_map[label_value]
            label_to_region[label] = region_id

    try:
        r_tl_rl = region_info[label_to_region['Right lung']]['top_left']
        r_br_rl = region_info[label_to_region['Right lung']]['bottom_right']
        r_tr_rl = region_info[label_to_region['Right lung']]['top_right']
        r_bl_rl = region_info[label_to_region['Right lung']]['bottom_left']

        r_tl_ll = region_info[label_to_region['Left lung']]['top_left']
        r_br_ll = region_info[label_to_region['Left lung']]['bottom_right']
        r_tr_ll = region_info[label_to_region['Left lung']]['top_right']
        r_bl_ll = region_info[label_to_region['Left lung']]['bottom_left']
    except KeyError:
        print(f"Missing label in image {idx}, skipping.")
        continue

    found = False
    for step_val in range(5, 101):
        logic_strong_dist = (
            "solidtriangle E [{}] ({}, {}, {}, {}) (forall ({}, {}, {}, {}) p.class == 2)"
            .format(step_val, r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl, r_tl_ll, r_tr_ll, r_bl_ll, r_br_ll)
        )
        parsed = parse(logic_strong_dist)
        if not qualitativescore(parsed, combined):
            first_satisfying_steps.append(step_val)
            all_qual.append(True)
            found = True
            break

    if not found:
        all_qual.append(False)
        first_satisfying_steps.append(np.nan)
        violated_indices.append(idx)

    # Fixed quantitative evaluation at step=100
    logic_fixed = (
        "solidtriangle E [100] ({}, {}, {}, {}) (forall ({}, {}, {}, {}) p.class == 2)"
        .format(r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl, r_tl_ll, r_tr_ll, r_bl_ll, r_br_ll)
    )
    parsed_val = parse(logic_fixed)
    parsed_result_val = quantitativescore(parsed_val, combined)
    all_quant.append(parsed_result_val)

# Summary metrics
true_rate = sum(all_qual) / len(all_qual) if all_qual else 0
avg_quant = sum(all_quant) / len(all_quant) if all_quant else 0
mean_first_step = np.nanmean(first_satisfying_steps)

print(f"Qualitative True Rate: {true_rate:.2f} ({sum(all_qual)} out of {len(all_qual)})")
print(f"Average Quantitative Score: {avg_quant:.4f}")
print(f"Average First Satisfying Step (mean over images): {mean_first_step:.2f}")

In [None]:
label_map = {
    1: "Right lung",
    2: "Left lung",
    3: "Heart"
}

all_quant = []
all_qual = []
violated_indices = []

for idx, combined in enumerate(processed_images):

    label_to_region = {}
    
    region_info = extract_component_bounds(combined[:,:,1])

    for region_id, info in region_info.items():
        label_value = info['value']
        if label_value in label_map:
            label = label_map[label_value]
            label_to_region[label] = region_id
            
    r_tl_rl = region_info[label_to_region['Right lung']]['top_left']
    r_br_rl = region_info[label_to_region['Right lung']]['bottom_right']
    r_tr_rl = region_info[label_to_region['Right lung']]['top_right']
    r_bl_rl = region_info[label_to_region['Right lung']]['bottom_left']

    r_tl_ll = region_info[label_to_region['Heart']]['top_left']
    r_br_ll = region_info[label_to_region['Heart']]['bottom_right']
    r_tr_ll = region_info[label_to_region['Heart']]['top_right']
    r_bl_ll = region_info[label_to_region['Heart']]['bottom_left']
    logic_strong_dist = "hollowtriangle E [1] ({}, {}, {}, {}) (forall ({}, {}, {}, {}) p.class == 3)".format(r_tl_rl, r_tr_rl, r_bl_rl, r_br_rl, r_tl_ll, r_tr_ll, r_bl_ll, r_br_ll)
    parsed = parse(logic_strong_dist)
    parsed_result_val = quantitativescore(parsed, combined)
    parsed = parse(logic_strong_dist)
    parsed_result_bool = qualitativescore(parsed, combined)
    
    if not parsed_result_bool:
        violated_indices.append(idx)
    
    all_quant.append(parsed_result_val)
    all_qual.append(parsed_result_bool)

true_rate = sum(all_qual) / len(all_qual) if all_qual else 0
avg_quant = sum(all_quant) / len(all_quant) if all_quant else 0

print(f"Qualitative True Rate: {true_rate:.2f} ({sum(all_qual)} out of {len(all_qual)})")
print(f"Average Quantitative Score: {avg_quant:.4f}")