In [None]:
import json
import re

import pandas as pd
import requests

from portal_visualization.builder_factory import get_view_config_builder
from portal_visualization.constants import image_units
from portal_visualization.utils import get_image_scale, get_physical_size_units

## Configuration

Set your authentication token, pipeline value, and environment below.

In [None]:
import warnings

# Authentication token (Globus groups token).
# Easiest way to retrieve it: log in to the chosen environment, open the browser
# dev tools, and copy the value of the `Authorization` header from any request
# to the search API.
TOKEN = ""

# The copied header value normally looks like "Bearer <token>". Strip surrounding
# whitespace and that prefix so the rest of the notebook always sees the bare
# token (get_headers() adds the "Bearer " prefix itself).
TOKEN = TOKEN.strip()
if TOKEN.lower().startswith("bearer "):
    TOKEN = TOKEN[len("bearer "):].strip()

if not TOKEN:
    warnings.warn("Authentication token is required to view QA datasets. Results will only include public datasets. Please set the TOKEN variable to view private datasets.")

# Pipeline value to search for
PIPELINE = "Kaggle-1 Glomerulus Segmentation"

if not PIPELINE:
    raise ValueError("Pipeline value is required. Please set the PIPELINE variable.")

# Environment: "dev", "test", or "prod"
ENV = "prod"

# Service endpoints per environment; the keys double as the list of valid ENV values.
ENDPOINTS = {
    "dev": {
        "search_api": "https://search-api.dev.hubmapconsortium.org/v3/portal/search",
        "assets": "https://assets.dev.hubmapconsortium.org",
    },
    "prod": {
        "search_api": "https://search.api.hubmapconsortium.org/v3/portal/search",
        "assets": "https://assets.hubmapconsortium.org",
    },
    "test": {
        "search_api": "https://search-api.test.hubmapconsortium.org/v3/portal/search",
        "assets": "https://assets.test.hubmapconsortium.org",
    },
}

if ENV not in ENDPOINTS:
    raise ValueError(f"Invalid environment: {ENV}. Must be 'dev', 'test', or 'prod'.")

SEARCH_API = ENDPOINTS[ENV]["search_api"]
ASSETS_URL = ENDPOINTS[ENV]["assets"]

visibility_note = " (including private datasets)" if TOKEN else " (public datasets only)"
print(f"Looking for {PIPELINE} datasets in {ENV} environment.{visibility_note}")

## Helper Functions

In [None]:
def get_headers():
    """Build the HTTP headers used for authenticated requests.

    Returns a dict with a bearer ``Authorization`` header when the
    module-level TOKEN is non-empty, and an empty dict otherwise.
    """
    return {"Authorization": f"Bearer {TOKEN}"} if TOKEN else {}


def search_api_query(query, timeout=60):
    """Execute a search API query and return its hits.

    Args:
        query: Request body, sent as JSON in a POST to SEARCH_API.
        timeout: Seconds passed to ``requests`` as the request timeout, so an
            unresponsive server cannot hang the notebook indefinitely.

    Returns:
        The list found at ``["hits"]["hits"]`` in the JSON response.

    Raises:
        requests.HTTPError: If the response status indicates an error
            (via ``raise_for_status``).
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    response = requests.post(
        SEARCH_API, headers=get_headers(), json=query, timeout=timeout
    )
    response.raise_for_status()
    return response.json()["hits"]["hits"]


def get_entity_by_uuid(uuid):
    """Look up a single entity by UUID through the search API.

    Returns the ``_source`` document of the first hit, or None when the
    ids query matches nothing.
    """
    matches = search_api_query({"query": {"ids": {"values": [uuid]}}})
    return matches[0]["_source"] if matches else None


def find_support_entity(parent_uuid):
    """Return the newest support-image descendant of ``parent_uuid``, if any.

    The search requires both the ``is_support`` and ``is_image`` vitessce
    hints, ``parent_uuid`` among the ancestor ids, and a mapped status of
    QA or Published. Hits are sorted by last-modified timestamp (descending)
    and only the first one is requested.

    Per the original author's note, this mirrors client.py's
    get_descendant_to_lift() method.

    Returns the ``_source`` document of that hit, or None when nothing matches.
    """
    filters = [
        {"term": {"vitessce-hints": "is_support"}},
        {"term": {"vitessce-hints": "is_image"}},
        {"term": {"ancestor_ids": parent_uuid}},
        {"terms": {"mapped_status.keyword": ["QA", "Published"]}},
    ]
    newest_first = [{"last_modified_timestamp": {"order": "desc"}}]

    matches = search_api_query(
        {
            "query": {"bool": {"must": filters}},
            "sort": newest_first,
            "size": 1,
        }
    )
    if not matches:
        return None
    return matches[0]["_source"]


def vis_lift_entity(entity, max_depth=3):
    """Follow the chain of support descendants ("vis-lifting") from ``entity``.

    Per the original author's note, this replicates the portal's vis-lifting
    chain (client.py:get_vitessce_conf_cells_and_lifted_uuid): instead of
    visualizing the entity directly, look for a newer support descendant
    (is_support + is_image) and recurse into it. The last descendant found is
    the entity to visualize, and the entity just before it in the chain is
    its parent.

    Args:
        entity: Entity dict; must contain ``"uuid"``.
        max_depth: Maximum number of lifting steps to follow. At 0 (or below)
            no lookup is performed and the entity is returned unchanged.

    Returns:
        ``(entity_to_visualize, parent_entity, vis_lifted_uuid)``. When no
        lifting happened the result is ``(entity, None, None)`` and the caller
        is expected to resolve the parent itself.
    """
    not_lifted = (entity, None, None)

    # Depth exhausted: a descendant could not be used anyway, so skip the
    # search API round-trip entirely.
    if max_depth <= 0:
        return not_lifted

    descendant = find_support_entity(entity["uuid"])
    if not descendant:
        return not_lifted

    # Ensure files are available at the top level of the descendant; fall back
    # to metadata.files. `or {}` also covers an explicit `"metadata": None`.
    if not descendant.get("files"):
        nested_files = (descendant.get("metadata") or {}).get("files")
        if nested_files:
            descendant["files"] = nested_files

    # A descendant without any files cannot be visualized: no lifting.
    if not descendant.get("files"):
        return not_lifted

    # Recursively check whether the descendant has a support descendant of its own.
    inner_entity, inner_parent, inner_lifted = vis_lift_entity(
        descendant, max_depth=max_depth - 1
    )
    if inner_lifted:
        # A deeper lift happened; its result wins.
        return inner_entity, inner_parent, inner_lifted

    # Otherwise the descendant is visualized and the current entity is its parent.
    return descendant, entity, descendant["uuid"]

## Step 1: Fetch All Datasets with Input Pipeline Value

In [None]:
# Search for every Dataset entity whose pipeline matches PIPELINE exactly,
# requesting only the fields the later steps read.
pipeline_query = {
    "query": {
        "bool": {
            "must": [
                {"term": {field: value}}
                for field, value in (
                    ("entity_type.keyword", "Dataset"),
                    ("pipeline.keyword", PIPELINE),
                )
            ]
        }
    },
    "size": 10000,
    "_source": [
        "uuid",
        "hubmap_id",
        "pipeline",
        "immediate_ancestor_ids",
        "files",
        "vitessce-hints",
        "soft_assaytype",
        "mapped_status",
        "status",
        "metadata",
    ],
}

pipeline_hits = search_api_query(pipeline_query)
datasets = [entry["_source"] for entry in pipeline_hits]
print(f"Found {len(datasets)} datasets with pipeline '{PIPELINE}'")

# One line per dataset: HuBMAP ID, shortened UUID, status, and direct parents.
for ds in datasets:
    parent_ids = ds.get("immediate_ancestor_ids", [])
    hubmap_label = ds.get("hubmap_id", "N/A")
    short_uuid = ds["uuid"][:8]
    status_label = ds.get("mapped_status", ds.get("status", "unknown"))
    print(f"  {hubmap_label} ({short_uuid}...) status={status_label} parents={parent_ids}")

## Step 2: Generate Vitessce Configurations

For each dataset, replicate the portal's vis-lifting chain: check if the entity
has a newer support descendant, and if so, use that descendant for config generation.
This matches how the portal actually generates configurations at runtime.

In [None]:
def _make_result(uuid, hubmap_id, parent_uuid, status, error=None, **extra):
    """Build one result row; every row carries the same keys regardless of outcome."""
    row = {
        "uuid": uuid,
        "hubmap_id": hubmap_id,
        "parent_uuid": parent_uuid,
        "vis_lifted_uuid": None,
        "entity_visualized": None,
        "status": status,
        "error": error,
        "base_image_source": None,
        "conf": None,
        "builder_class": None,
    }
    row.update(extra)
    return row


results = []

for ds in datasets:
    uuid = ds["uuid"]
    hubmap_id = ds.get("hubmap_id", "N/A")
    parent_ids = ds.get("immediate_ancestor_ids", [])
    # Only the first immediate ancestor is used as the parent.
    parent_uuid = parent_ids[0] if parent_ids else None

    print(f"Processing dataset {hubmap_id} ({uuid[:8]}) with parents {parent_ids}")

    # Ensure files are present at the top level (fall back to metadata.files).
    if not ds.get("files") and ds.get("metadata", {}).get("files"):
        ds["files"] = ds["metadata"]["files"]

    if not ds.get("files"):
        results.append(
            _make_result(uuid, hubmap_id, parent_uuid, "error", "No files found in entity")
        )
        continue

    if parent_uuid is None:
        results.append(
            _make_result(uuid, hubmap_id, None, "error", "No parent UUID found")
        )
        continue

    try:
        # Replicate the portal's vis-lifting chain: check whether this entity
        # has a newer support descendant and, if so, build the config for it.
        entity_to_viz, vis_parent, vis_lifted_uuid = vis_lift_entity(ds)

        if vis_lifted_uuid:
            print(
                f"  {hubmap_id}: vis-lifted to {vis_lifted_uuid[:8]}... "
                f"(parent={vis_parent['uuid'][:8]}...)"
            )
        else:
            # No lifting: visualize the dataset itself and resolve its parent
            # from the search API, falling back to a stub holding only the UUID.
            entity_to_viz = ds
            vis_parent = get_entity_by_uuid(parent_uuid) or {"uuid": parent_uuid}

        # vis_parent is a dict on both paths above; per the original note,
        # get_view_config_builder expects the parent as a UUID string.
        Builder = get_view_config_builder(
            entity_to_viz, get_entity_by_uuid, vis_parent["uuid"]
        )
        builder = Builder(
            entity_to_viz,
            TOKEN,
            ASSETS_URL,
            get_entity=get_entity_by_uuid,
            parent=vis_parent,
            find_support_entity=find_support_entity,
        )

        conf_cells = builder.get_conf_cells()

        # Debug attribute read from the builder when present; "unknown" otherwise.
        base_image_source = getattr(builder, "base_image_source", "unknown")
        builder_class = builder.__class__.__name__

        results.append(
            _make_result(
                uuid,
                hubmap_id,
                parent_uuid,
                "success",
                vis_lifted_uuid=vis_lifted_uuid,
                entity_visualized=entity_to_viz["uuid"],
                base_image_source=base_image_source,
                conf=conf_cells.conf,
                builder_class=builder_class,
            )
        )
        print(f"  OK: {hubmap_id} - source={base_image_source}, builder={builder_class}")

    except Exception as e:
        # Deliberately broad: one failing dataset is recorded as an error row
        # and must not abort the run for the remaining datasets.
        results.append(_make_result(uuid, hubmap_id, parent_uuid, "error", str(e)))
        print(f"  FAIL: {hubmap_id} - {e}")

succeeded = sum(1 for r in results if r["status"] == "success")
failed = sum(1 for r in results if r["status"] == "error")
vis_lifted = sum(1 for r in results if r.get("vis_lifted_uuid"))
print(f"\nSucceeded: {succeeded}, Failed: {failed}, Vis-lifted: {vis_lifted}")

## Step 3: Extract OME-TIFF URLs from Configurations

In [None]:
def extract_urls_from_conf(conf):
    """Pull the base-image and segmentation URLs out of a Vitessce config.

    Only the files of the first entry in ``conf["datasets"]`` are inspected.
    A file of type ``image.ome-tiff`` supplies the base image URL; a file of
    type ``obsSegmentations.ome-tiff`` supplies the segmentation URL and, when
    its options list any coordinate transformations, the ``scale`` of the
    first one. If several files share a type, the last one wins.

    Returns a dict with keys ``base_image_url``, ``seg_image_url`` and
    ``coordinate_transformations``; each stays None when not found
    (including when ``conf`` is None or has no datasets).
    """
    base_url = None
    seg_url = None
    scale = None

    first_dataset_files = []
    if conf is not None:
        dataset_entries = conf.get("datasets", [])
        if dataset_entries:
            first_dataset_files = dataset_entries[0].get("files", [])

    for entry in first_dataset_files:
        kind = entry.get("fileType", "")
        if kind == "image.ome-tiff":
            base_url = entry.get("url")
            continue
        if kind != "obsSegmentations.ome-tiff":
            continue

        seg_url = entry.get("url")
        transform_list = entry.get("options", {}).get("coordinateTransformations", [])
        if transform_list:
            scale = transform_list[0].get("scale")

    return {
        "base_image_url": base_url,
        "seg_image_url": seg_url,
        "coordinate_transformations": scale,
    }


# Attach the extracted image URLs to each successful result (rows are updated in place);
# error rows are left untouched.
for r in results:
    if r["status"] != "success":
        continue
    r.update(extract_urls_from_conf(r["conf"]))

## Step 4: Fetch OME-TIFF Metadata

For each image URL in the generated configs, fetch metadata containing the pixel dimensions
and physical pixel sizes. The `metadata.json` sidecar file is tried first; for older datasets
where it does not exist (or where it lacks pixel dimensions), the notebook falls back to reading the OME-XML directly from the TIFF header.

In [None]:
import xml.etree.ElementTree as ET
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse


def metadata_url_from_image_url(image_url):
    """Derive the metadata.json sidecar URL for an OME-TIFF image URL.

    Two textual substitutions are applied:
    ``ometiff-pyramids`` -> ``image_metadata`` and
    ``.ome.tif`` / ``.ome.tiff`` -> ``.metadata.json``.
    Anything else in the URL (such as a query string) is kept as is.

    Returns None when ``image_url`` is None.
    """
    if image_url is None:
        return None
    in_metadata_dir = image_url.replace("ometiff-pyramids", "image_metadata")
    return re.sub(r"\.ome\.tiff?", ".metadata.json", in_metadata_dir)


def fetch_metadata_json(url):
    """Fetch a metadata.json sidecar file from the assets endpoint.

    This is a best-effort lookup: every failure yields None so the caller can
    fall back to another metadata source. Only a 404 is silent (expected for
    older datasets without sidecar files); other HTTP statuses, network
    errors and undecodable bodies are reported on stdout.

    Args:
        url: Sidecar URL, or None.

    Returns:
        The parsed JSON when it is a dict; None otherwise.
    """
    if url is None:
        return None

    # Never echo the query string: it may carry the access token.
    loggable_url = str(url).split("?", 1)[0]

    try:
        resp = requests.get(url, headers=get_headers(), timeout=30)
    except requests.RequestException as exc:
        print(f"    Could not fetch metadata.json from {loggable_url}: {exc}")
        return None

    if resp.status_code == 404:
        # Expected for older datasets - fall through silently.
        return None
    if resp.status_code != 200:
        print(f"    Unexpected HTTP {resp.status_code} for metadata.json at {loggable_url}")
        return None

    try:
        data = resp.json()
    except ValueError as exc:
        # Body was not valid JSON.
        print(f"    Invalid JSON in metadata.json at {loggable_url}: {exc}")
        return None

    return data if isinstance(data, dict) else None


def fetch_ome_metadata_from_tiff(image_url):
    """Read OME metadata directly from a remote OME-TIFF file.

    The file is opened through fsspec and parsed with tifffile; per the
    original author's note the intent is to read only the TIFF IFDs and the
    OME-XML via HTTP range requests rather than downloading the full pyramid.

    A ``token`` query parameter in ``image_url`` is removed from the URL and
    sent as a bearer ``Authorization`` header instead. Failure messages print
    the URL without its query string so the token never reaches the notebook
    output.

    Args:
        image_url: URL of the OME-TIFF, or None.

    Returns:
        A dict that may contain SizeX, SizeY, PhysicalSizeX, PhysicalSizeY,
        PhysicalSizeXUnit/PhysicalSizeYUnit and the alternate spellings
        PhysicalSizeUnitX/PhysicalSizeUnitY; or None when ``image_url`` is
        None, nothing could be extracted, or any error occurred (including
        fsspec/tifffile not being installed).
    """
    if image_url is None:
        return None

    # Safe-to-print form of the URL (query string, and thus any token, removed).
    loggable_url = str(image_url).split("?", 1)[0]

    try:
        # Imported lazily: a missing package is reported like any other
        # failure below instead of breaking the whole notebook.
        import fsspec
        import tifffile

        # Split off the query string; fsspec gets the bare URL.
        parsed = urlparse(image_url)
        params = parse_qs(parsed.query)
        clean_url = urlunparse(parsed._replace(query=""))

        # Pass the token as an Authorization header if present in the query params.
        storage_options = {}
        if "token" in params:
            storage_options["headers"] = {
                "Authorization": f"Bearer {params['token'][0]}"
            }

        with fsspec.open(clean_url, mode="rb", **storage_options) as f:
            with tifffile.TiffFile(f) as tif:
                result = {}

                # Pixel dimensions from the first page: the last two axes of
                # its shape are taken as Y and X.
                if tif.pages:
                    page = tif.pages[0]
                    if len(page.shape) >= 2:
                        result["SizeX"] = page.shape[-1]
                        result["SizeY"] = page.shape[-2]

                # Physical sizes (and, when present, authoritative pixel sizes)
                # from the first Pixels element of the OME-XML.
                ome_xml = tif.ome_metadata
                if ome_xml:
                    root = ET.fromstring(ome_xml)
                    # Reuse the root element's namespace, if any, for the lookup.
                    ns = ""
                    if "}" in root.tag:
                        ns = root.tag.split("}")[0] + "}"
                    pixels = root.find(f".//{ns}Pixels")
                    if pixels is not None:
                        result["SizeX"] = int(pixels.get("SizeX", result.get("SizeX", 0)))
                        result["SizeY"] = int(pixels.get("SizeY", result.get("SizeY", 0)))
                        phys_x = pixels.get("PhysicalSizeX")
                        phys_y = pixels.get("PhysicalSizeY")
                        if phys_x is not None:
                            result["PhysicalSizeX"] = float(phys_x)
                        if phys_y is not None:
                            result["PhysicalSizeY"] = float(phys_y)
                        # Units default to micrometres when the attribute is absent.
                        unit_x = pixels.get("PhysicalSizeXUnit", "µm")
                        unit_y = pixels.get("PhysicalSizeYUnit", "µm")
                        # Stored under both key spellings read elsewhere in this notebook.
                        result["PhysicalSizeXUnit"] = unit_x
                        result["PhysicalSizeYUnit"] = unit_y
                        result["PhysicalSizeUnitX"] = unit_x
                        result["PhysicalSizeUnitY"] = unit_y

                return result if result else None
    except Exception as e:
        # Best-effort by design: report and let the caller use other sources.
        print(f"    Could not read TIFF header from {loggable_url}: {e}")
        return None


def fetch_image_metadata(image_url):
    """Fetch OME-TIFF metadata, preferring metadata.json over the TIFF header.

    The sidecar metadata.json is tried first. If it supplies both SizeX and
    SizeY it is returned as is. Otherwise the TIFF header is read as well;
    when both sources exist they are merged, with metadata.json values taking
    precedence except for SizeX/SizeY that it lacks.

    Returns a ``(source_label, metadata)`` tuple where the label is one of
    "metadata.json", "metadata.json+tiff_header", "tiff_header" or "none"
    (the latter paired with None).
    """
    # Sidecar first.
    json_meta = fetch_metadata_json(metadata_url_from_image_url(image_url))
    json_found = json_meta is not None

    if json_found and json_meta.get("SizeX") and json_meta.get("SizeY"):
        # The sidecar already has the pixel dimensions; no TIFF read needed.
        return "metadata.json", json_meta

    # Sidecar missing or without pixel dimensions: consult the TIFF header.
    tiff_meta = fetch_ome_metadata_from_tiff(image_url)
    tiff_found = tiff_meta is not None

    if json_found and tiff_found:
        # TIFF values form the base, sidecar values override them...
        merged = dict(tiff_meta)
        merged.update(json_meta)
        # ...except pixel dimensions the sidecar does not really provide.
        for axis_key in ("SizeX", "SizeY"):
            if not json_meta.get(axis_key) and tiff_meta.get(axis_key):
                merged[axis_key] = tiff_meta[axis_key]
        return "metadata.json+tiff_header", merged

    if json_found:
        return "metadata.json", json_meta
    if tiff_found:
        return "tiff_header", tiff_meta
    return "none", None


# For every successful result, fetch metadata for the base and segmentation
# images and store it (plus the derived pixel dimensions) on the row in place.
for r in results:
    if r["status"] == "success":
        base_source, base_meta = fetch_image_metadata(r.get("base_image_url"))
        seg_source, seg_meta = fetch_image_metadata(r.get("seg_image_url"))

        r.update(
            {
                "base_metadata": base_meta,
                "seg_metadata": seg_meta,
                "base_meta_source": base_source,
                "seg_meta_source": seg_source,
                # Pixel dimensions stay None when no metadata was found.
                "base_pixel_x": base_meta.get("SizeX") if base_meta else None,
                "base_pixel_y": base_meta.get("SizeY") if base_meta else None,
                "seg_pixel_x": seg_meta.get("SizeX") if seg_meta else None,
                "seg_pixel_y": seg_meta.get("SizeY") if seg_meta else None,
            }
        )

        base_dims = f"{r.get('base_pixel_x')}x{r.get('base_pixel_y')}"
        seg_dims = f"{r.get('seg_pixel_x')}x{r.get('seg_pixel_y')}"
        print(
            f"  {r['hubmap_id']}: "
            f"base={base_source} ({base_dims}), "
            f"seg={seg_source} ({seg_dims})"
        )

In [None]:
def compute_physical_dimensions(metadata, pixel_x, pixel_y):
    """Compute the total physical extent of an image.

    Each extent is the pixel count multiplied by the physical size of one
    pixel along that axis. Values are coerced with ``float`` so numbers that
    arrive as strings (e.g. from JSON) are multiplied numerically instead of
    being repeated as text.

    Args:
        metadata: Dict that may hold PhysicalSizeX / PhysicalSizeY and a unit
            under PhysicalSizeUnitX / PhysicalSizeXUnit (likewise for Y), or None.
        pixel_x: Image width in pixels, or None.
        pixel_y: Image height in pixels, or None.

    Returns:
        ``(total_x, total_y, unit)``. The unit is the X unit when set,
        otherwise the Y unit, otherwise "". ``(None, None, None)`` is returned
        when any input is missing or not numeric.
    """
    unavailable = (None, None, None)

    if metadata is None or pixel_x is None or pixel_y is None:
        return unavailable

    phys_x = metadata.get("PhysicalSizeX")
    phys_y = metadata.get("PhysicalSizeY")
    if phys_x is None or phys_y is None:
        return unavailable

    try:
        total_x = float(pixel_x) * float(phys_x)
        total_y = float(pixel_y) * float(phys_y)
    except (TypeError, ValueError):
        # Non-numeric metadata: report "not available" rather than crash the summary.
        return unavailable

    # Both key spellings occur in this notebook; an empty or None value under
    # the first spelling falls through to the second.
    unit_x = metadata.get("PhysicalSizeUnitX") or metadata.get("PhysicalSizeXUnit") or ""
    unit_y = metadata.get("PhysicalSizeUnitY") or metadata.get("PhysicalSizeYUnit") or ""

    return total_x, total_y, unit_x or unit_y


# Column order of the summary table. Passing it to the DataFrame constructor
# keeps the columns present even when there are no results, so the counts
# below work on an empty table instead of raising KeyError.
SUMMARY_COLUMNS = [
    "HuBMAP ID",
    "UUID",
    "Parent UUID",
    "Vis-Lifted",
    "Status",
    "Error",
    "Base Image Source",
    "Metadata Source",
    "Base Pixel (W x H)",
    "Seg Pixel (W x H)",
    "Base Physical (W x H)",
    "Seg Physical (W x H)",
]


def _format_pixel_size(width, height):
    """Render pixel dimensions as 'W x H', or 'N/A' when the width is unknown."""
    return f"{width} x {height}" if width is not None else "N/A"


def _format_physical_size(total_x, total_y, unit):
    """Render physical dimensions as 'W x H unit' (2 decimals), or 'N/A' when unknown."""
    if total_x is None:
        return "N/A"
    return f"{total_x:.2f} x {total_y:.2f} {unit}"


summary_rows = []
for r in results:
    # Total physical extent = pixel count x physical size per pixel.
    base_total_x, base_total_y, base_unit = compute_physical_dimensions(
        r.get("base_metadata"), r.get("base_pixel_x"), r.get("base_pixel_y")
    )
    seg_total_x, seg_total_y, seg_unit = compute_physical_dimensions(
        r.get("seg_metadata"), r.get("seg_pixel_x"), r.get("seg_pixel_y")
    )

    summary_rows.append(
        {
            "HuBMAP ID": r.get("hubmap_id", ""),
            "UUID": r.get("uuid", ""),
            "Parent UUID": r.get("parent_uuid", ""),
            # Empty string (not None) so the "Vis-lifted" count below can compare against "".
            "Vis-Lifted": r.get("vis_lifted_uuid", "") or "",
            "Status": r.get("status"),
            "Error": r.get("error", ""),
            "Base Image Source": r.get("base_image_source", ""),
            "Metadata Source": f"base={r.get('base_meta_source', 'N/A')}, seg={r.get('seg_meta_source', 'N/A')}",
            "Base Pixel (W x H)": _format_pixel_size(
                r.get("base_pixel_x"), r.get("base_pixel_y")
            ),
            "Seg Pixel (W x H)": _format_pixel_size(
                r.get("seg_pixel_x"), r.get("seg_pixel_y")
            ),
            "Base Physical (W x H)": _format_physical_size(
                base_total_x, base_total_y, base_unit
            ),
            "Seg Physical (W x H)": _format_physical_size(
                seg_total_x, seg_total_y, seg_unit
            ),
        }
    )

df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS)
print(f"Total datasets: {len(df)}")
print(f"  Successful: {len(df[df['Status'] == 'success'])}")
print(f"  Failed: {len(df[df['Status'] == 'error'])}")
print(f"  Vis-lifted: {len(df[df['Vis-Lifted'] != ''])}")
df

In [None]:
# Write the summary table to a CSV file (row index omitted) for analysis outside the notebook.
summary_csv_path = "kaggle1_segmentation_alignment_summary.csv"
df.to_csv(summary_csv_path, index=False)