From 23fc76a65b636cf34ac56bc8465870ae9953e52a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:07:15 +0200 Subject: [PATCH 1/9] feat: add filter_tiles preprocessing step Drops tiles with no annotation coverage and no tissue coverage by joining tiling output with tissue stats. Carries through annotation and tissue coverage columns so downstream stages can consume the filtered set without re-joining. Co-Authored-By: Claude Sonnet 4.6 --- configs/data/dataset.yaml | 1 + configs/preprocessing/filter_tiles.yaml | 13 +++ preprocessing/filter_tiles.py | 106 ++++++++++++++++++++++++ 3 files changed, 120 insertions(+) create mode 100644 configs/preprocessing/filter_tiles.yaml create mode 100644 preprocessing/filter_tiles.py diff --git a/configs/data/dataset.yaml b/configs/data/dataset.yaml index 57d87c5..0303fc2 100644 --- a/configs/data/dataset.yaml +++ b/configs/data/dataset.yaml @@ -14,6 +14,7 @@ dataset: test_split_filename: "split_mapping/test_split.csv" tiling_run_id: "fdf7550a2004474f8c7a05dc0cf1fd86" tissue_masks_run_id: "52bc0924f8624b259819c480c7cf213f" + tissue_stats_run_id: "16ae2d003d88471b924e5f332415232a" exclusions: bad_slides: diff --git a/configs/preprocessing/filter_tiles.yaml b/configs/preprocessing/filter_tiles.yaml new file mode 100644 index 0000000..55350c5 --- /dev/null +++ b/configs/preprocessing/filter_tiles.yaml @@ -0,0 +1,13 @@ +# @package _global_ + +tissue_stats_run_id: ${dataset.mlflow_artifacts.tissue_stats_run_id} +tissue_stats_artifact_path: tissue_stats +tissue_coverage_column: tile_tissue_coverage + +mlflow_artifact_path: filter_tiles + +metadata: + run_name: "Filter tiles ${dataset.name}" + description: "Drop tiles with no annotation coverage and no tile tissue coverage." + hyperparams: + tissue_coverage_column: ${tissue_coverage_column} diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py new file mode 100644 index 0000000..c53cb0d --- /dev/null +++ b/preprocessing/filter_tiles.py @@ -0,0 +1,106 @@ +import tempfile +from pathlib import Path + +import hydra +import mlflow +import mlflow.artifacts +import pyarrow.dataset as pads +import pyarrow.parquet as pq +from omegaconf import DictConfig +from rationai.mlkit import autolog, with_cli_args +from rationai.mlkit.lightning.loggers import MLFlowLogger + + +def filter_split( + tiling_run_id: str, + tissue_stats_run_id: str, + tissue_stats_artifact_path: str, + tissue_column: str, + split_name: str, + output_path: Path, +) -> dict[str, int]: + """Drop tiles with no annotation coverage and no tissue coverage. + + Uses PyArrow predicate pushdown so the full tiles parquet is never loaded into + memory — only rows passing the annotation filter are materialised. The tissue + coverage table is then joined in-memory to drop tiles outside tissue and to + carry through the per-tile coverage values into the output. + """ + tiles_local = mlflow.artifacts.download_artifacts( + run_id=tiling_run_id, artifact_path=f"{split_name}_split/tiles.parquet" + ) + tiles_ds = pads.dataset(tiles_local, format="parquet") + original_count = tiles_ds.count_rows() + + ann_cols = [f.name for f in tiles_ds.schema if f.name.startswith("tile_coverage_")] + if not ann_cols: + raise RuntimeError( + "No tile_coverage_* columns found in tiles parquet. " + "Check that tiling used a class mapping with annotations." + ) + ann_filter = pads.field(ann_cols[0]) > 0 + for c in ann_cols[1:]: + ann_filter = ann_filter | (pads.field(c) > 0) + + tiles_table = tiles_ds.to_table(filter=ann_filter) + ann_count = len(tiles_table) + print( + f"[{split_name}] annotation filter: " + f"{original_count} → {ann_count} ({ann_count / original_count:.1%} kept)" + ) + if ann_count == 0: + raise RuntimeError( + f"All {original_count} tiles dropped by annotation filter for {split_name}. " + "Check the tiling run's class mapping and annotation sources." + ) + + tissue_local = mlflow.artifacts.download_artifacts( + run_id=tissue_stats_run_id, + artifact_path=f"{tissue_stats_artifact_path}/{split_name}_tiles.parquet", + ) + tissue_table = pads.dataset(tissue_local, format="parquet").to_table( + filter=pads.field(tissue_column) > 0, + ) + + filtered = tiles_table.join( + tissue_table, keys=["slide_id", "x", "y"], join_type="inner" + ) + final_count = len(filtered) + print( + f"[{split_name}] tissue filter: " + f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept)" + ) + + pq.write_table(filtered, str(output_path)) + return { + "original_count": original_count, + "after_annotation": ann_count, + "after_tissue": final_count, + } + + +@with_cli_args(["+preprocessing=filter_tiles"]) +@hydra.main(config_path="../configs", config_name="preprocessing", version_base=None) +@autolog +def main(config: DictConfig, logger: MLFlowLogger) -> None: + tiling_run_id = config.dataset.mlflow_artifacts.tiling_run_id + + with tempfile.TemporaryDirectory() as tmp_dir: + for split_name in ("train", "test"): + output_path = Path(tmp_dir) / f"{split_name}_tiles.parquet" + stats = filter_split( + tiling_run_id=tiling_run_id, + tissue_stats_run_id=config.tissue_stats_run_id, + tissue_stats_artifact_path=config.tissue_stats_artifact_path, + tissue_column=config.tissue_coverage_column, + split_name=split_name, + output_path=output_path, + ) + for key, value in stats.items(): + mlflow.log_metric(f"{split_name}_{key}", value) + + mlflow.log_artifacts(tmp_dir, config.mlflow_artifact_path) + + +if __name__ == "__main__": + main() From 1f86829dc2c019a40527b47461142241df1e7fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:08:33 +0200 Subject: [PATCH 2/9] chore: add submission script and experiment config for filter_tiles Co-Authored-By: Claude Sonnet 4.6 --- .../experiment/preprocessing/filter_tiles.yaml | 12 ++++++++++++ scripts/submit_filter_tiles.py | 17 +++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 configs/experiment/preprocessing/filter_tiles.yaml create mode 100644 scripts/submit_filter_tiles.py diff --git a/configs/experiment/preprocessing/filter_tiles.yaml b/configs/experiment/preprocessing/filter_tiles.yaml new file mode 100644 index 0000000..6521401 --- /dev/null +++ b/configs/experiment/preprocessing/filter_tiles.yaml @@ -0,0 +1,12 @@ +# @package _global_ + +defaults: + - /data: dataset + - _self_ + +metadata: + run_name: Filter tiles ${dataset.name} + description: "Drop tiles with no annotation coverage and no tile tissue coverage over tiling run ${dataset.mlflow_artifacts.tiling_run_id} and tissue stats run ${dataset.mlflow_artifacts.tissue_stats_run_id}." + hyperparams: + tiling_run_id: ${dataset.mlflow_artifacts.tiling_run_id} + tissue_stats_run_id: ${dataset.mlflow_artifacts.tissue_stats_run_id} diff --git a/scripts/submit_filter_tiles.py b/scripts/submit_filter_tiles.py new file mode 100644 index 0000000..2834c0e --- /dev/null +++ b/scripts/submit_filter_tiles.py @@ -0,0 +1,17 @@ +from kube_jobs import storage, submit_job + + +submit_job( + job_name="tissue-classification-filter-tiles", + username=..., + cpu=8, + memory="16Gi", + public=False, + script=[ + "git clone https://github.com/RationAI/tissue-classification.git workdir", + "cd workdir", + "uv sync", + "uv run python -m preprocessing.filter_tiles +experiment=preprocessing/filter_tiles", + ], + storage=[storage.secure.PROJECTS], +) From 832a2ca855eef8ffeb7077e891c43606bfba0444 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:10:47 +0200 Subject: [PATCH 3/9] fix: remove experiment name --- scripts/submit_filter_tiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/submit_filter_tiles.py b/scripts/submit_filter_tiles.py index 2834c0e..8ad16cf 100644 --- a/scripts/submit_filter_tiles.py +++ b/scripts/submit_filter_tiles.py @@ -11,7 +11,7 @@ "git clone https://github.com/RationAI/tissue-classification.git workdir", "cd workdir", "uv sync", - "uv run python -m preprocessing.filter_tiles +experiment=preprocessing/filter_tiles", + "uv run python -m preprocessing.filter_tiles +experiment=...", ], storage=[storage.secure.PROJECTS], ) From c6e103122bea2f30381ac1b96b35d77098b60784 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:15:21 +0200 Subject: [PATCH 4/9] fix: project columns and prefilter tissue parquet by surviving slides Adds progress prints between read/join/write and restricts the tissue table to columns and slides that can possibly join, cutting peak memory and join time on the 80M-row input. Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index c53cb0d..80aa952 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -1,4 +1,5 @@ import tempfile +import time from pathlib import Path import hydra @@ -58,20 +59,42 @@ def filter_split( run_id=tissue_stats_run_id, artifact_path=f"{tissue_stats_artifact_path}/{split_name}_tiles.parquet", ) - tissue_table = pads.dataset(tissue_local, format="parquet").to_table( - filter=pads.field(tissue_column) > 0, + tissue_ds = pads.dataset(tissue_local, format="parquet") + tissue_cols = [ + f.name + for f in tissue_ds.schema + if f.name in {"slide_id", "x", "y"} or f.name.endswith("_tissue_coverage") + ] + survivor_slides = tiles_table.column("slide_id").combine_chunks().unique() + t = time.monotonic() + print( + f"[{split_name}] reading tissue stats: columns={tissue_cols}, " + f"{len(survivor_slides)} surviving slides" + ) + tissue_table = tissue_ds.to_table( + columns=tissue_cols, + filter=(pads.field(tissue_column) > 0) + & pads.field("slide_id").isin(survivor_slides), + ) + print( + f"[{split_name}] tissue read: {len(tissue_table)} rows " + f"in {time.monotonic() - t:.1f}s" ) + t = time.monotonic() filtered = tiles_table.join( tissue_table, keys=["slide_id", "x", "y"], join_type="inner" ) final_count = len(filtered) print( - f"[{split_name}] tissue filter: " - f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept)" + f"[{split_name}] tissue join: " + f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept) " + f"in {time.monotonic() - t:.1f}s" ) + t = time.monotonic() pq.write_table(filtered, str(output_path)) + print(f"[{split_name}] wrote {output_path.name} in {time.monotonic() - t:.1f}s") return { "original_count": original_count, "after_annotation": ann_count, From 03cf08513604ff570016c929d98b9bcb8013ec05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:18:45 +0200 Subject: [PATCH 5/9] fix: flush prints and drop isin prefilter on tissue table isin against ~200 string slide ids forced per-row Python checks across the 80M-row tissue parquet, dwarfing the tiny saving. Drop it and rely on column projection plus the tissue>0 predicate. Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index 80aa952..9cec86f 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -47,7 +47,8 @@ def filter_split( ann_count = len(tiles_table) print( f"[{split_name}] annotation filter: " - f"{original_count} → {ann_count} ({ann_count / original_count:.1%} kept)" + f"{original_count} → {ann_count} ({ann_count / original_count:.1%} kept)", + flush=True, ) if ann_count == 0: raise RuntimeError( @@ -65,23 +66,20 @@ def filter_split( for f in tissue_ds.schema if f.name in {"slide_id", "x", "y"} or f.name.endswith("_tissue_coverage") ] - survivor_slides = tiles_table.column("slide_id").combine_chunks().unique() t = time.monotonic() - print( - f"[{split_name}] reading tissue stats: columns={tissue_cols}, " - f"{len(survivor_slides)} surviving slides" - ) + print(f"[{split_name}] reading tissue stats: columns={tissue_cols}", flush=True) tissue_table = tissue_ds.to_table( columns=tissue_cols, - filter=(pads.field(tissue_column) > 0) - & pads.field("slide_id").isin(survivor_slides), + filter=pads.field(tissue_column) > 0, ) print( f"[{split_name}] tissue read: {len(tissue_table)} rows " - f"in {time.monotonic() - t:.1f}s" + f"in {time.monotonic() - t:.1f}s", + flush=True, ) t = time.monotonic() + print(f"[{split_name}] joining…", flush=True) filtered = tiles_table.join( tissue_table, keys=["slide_id", "x", "y"], join_type="inner" ) @@ -89,12 +87,16 @@ def filter_split( print( f"[{split_name}] tissue join: " f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept) " - f"in {time.monotonic() - t:.1f}s" + f"in {time.monotonic() - t:.1f}s", + flush=True, ) t = time.monotonic() pq.write_table(filtered, str(output_path)) - print(f"[{split_name}] wrote {output_path.name} in {time.monotonic() - t:.1f}s") + print( + f"[{split_name}] wrote {output_path.name} in {time.monotonic() - t:.1f}s", + flush=True, + ) return { "original_count": original_count, "after_annotation": ann_count, From f3a914840e85b22c748f8d6a4a49d899d033c01d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 19:22:12 +0200 Subject: [PATCH 6/9] fix: switch tissue join to pandas merge PyArrow's hash join on 45M string-keyed rows hangs in practice. Pandas merge on the same data takes ~30s. Frees intermediate tables to keep peak memory bounded. Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index 9cec86f..ab2eb24 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -5,6 +5,7 @@ import hydra import mlflow import mlflow.artifacts +import pyarrow as pa import pyarrow.dataset as pads import pyarrow.parquet as pq from omegaconf import DictConfig @@ -79,11 +80,15 @@ def filter_split( ) t = time.monotonic() - print(f"[{split_name}] joining…", flush=True) - filtered = tiles_table.join( - tissue_table, keys=["slide_id", "x", "y"], join_type="inner" - ) + print(f"[{split_name}] joining via pandas…", flush=True) + tiles_df = tiles_table.to_pandas() + tissue_df = tissue_table.to_pandas() + del tiles_table, tissue_table + filtered_df = tiles_df.merge(tissue_df, on=["slide_id", "x", "y"], how="inner") + del tiles_df, tissue_df + filtered = pa.Table.from_pandas(filtered_df, preserve_index=False) final_count = len(filtered) + del filtered_df print( f"[{split_name}] tissue join: " f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept) " From 90435e9780ed06d8523320efdb4d1e30c12f28b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 20:20:29 +0200 Subject: [PATCH 7/9] chore: remove debug prints from filter_tiles Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index ab2eb24..f80af18 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -1,5 +1,4 @@ import tempfile -import time from pathlib import Path import hydra @@ -46,11 +45,6 @@ def filter_split( tiles_table = tiles_ds.to_table(filter=ann_filter) ann_count = len(tiles_table) - print( - f"[{split_name}] annotation filter: " - f"{original_count} → {ann_count} ({ann_count / original_count:.1%} kept)", - flush=True, - ) if ann_count == 0: raise RuntimeError( f"All {original_count} tiles dropped by annotation filter for {split_name}. " @@ -67,20 +61,11 @@ def filter_split( for f in tissue_ds.schema if f.name in {"slide_id", "x", "y"} or f.name.endswith("_tissue_coverage") ] - t = time.monotonic() - print(f"[{split_name}] reading tissue stats: columns={tissue_cols}", flush=True) tissue_table = tissue_ds.to_table( columns=tissue_cols, filter=pads.field(tissue_column) > 0, ) - print( - f"[{split_name}] tissue read: {len(tissue_table)} rows " - f"in {time.monotonic() - t:.1f}s", - flush=True, - ) - t = time.monotonic() - print(f"[{split_name}] joining via pandas…", flush=True) tiles_df = tiles_table.to_pandas() tissue_df = tissue_table.to_pandas() del tiles_table, tissue_table @@ -89,19 +74,8 @@ def filter_split( filtered = pa.Table.from_pandas(filtered_df, preserve_index=False) final_count = len(filtered) del filtered_df - print( - f"[{split_name}] tissue join: " - f"{ann_count} → {final_count} ({final_count / ann_count:.1%} kept) " - f"in {time.monotonic() - t:.1f}s", - flush=True, - ) - t = time.monotonic() pq.write_table(filtered, str(output_path)) - print( - f"[{split_name}] wrote {output_path.name} in {time.monotonic() - t:.1f}s", - flush=True, - ) return { "original_count": original_count, "after_annotation": ann_count, From f6e8eac5d3046b65eb254b07c4d42e8466d00907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 20:44:56 +0200 Subject: [PATCH 8/9] refactor: load all tissue stats columns and validate tissue_column exists Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index f80af18..b3dfd18 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -56,15 +56,13 @@ def filter_split( artifact_path=f"{tissue_stats_artifact_path}/{split_name}_tiles.parquet", ) tissue_ds = pads.dataset(tissue_local, format="parquet") - tissue_cols = [ - f.name - for f in tissue_ds.schema - if f.name in {"slide_id", "x", "y"} or f.name.endswith("_tissue_coverage") - ] - tissue_table = tissue_ds.to_table( - columns=tissue_cols, - filter=pads.field(tissue_column) > 0, - ) + tissue_schema_names = {f.name for f in tissue_ds.schema} + if tissue_column not in tissue_schema_names: + raise RuntimeError( + f"tissue_column '{tissue_column}' not found in tissue stats parquet. " + f"Available columns: {sorted(tissue_schema_names)}" + ) + tissue_table = tissue_ds.to_table(filter=pads.field(tissue_column) > 0) tiles_df = tiles_table.to_pandas() tissue_df = tissue_table.to_pandas() From eda39fe4b50e96452c9072ecce99bb4900ab4245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20C=C3=ADfka?= <550433@mail.muni.cz> Date: Tue, 5 May 2026 20:45:26 +0200 Subject: [PATCH 9/9] fix: raise RuntimeError when tissue filter drops all tiles Co-Authored-By: Claude Sonnet 4.6 --- preprocessing/filter_tiles.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/preprocessing/filter_tiles.py b/preprocessing/filter_tiles.py index b3dfd18..692dbaf 100644 --- a/preprocessing/filter_tiles.py +++ b/preprocessing/filter_tiles.py @@ -72,6 +72,11 @@ def filter_split( filtered = pa.Table.from_pandas(filtered_df, preserve_index=False) final_count = len(filtered) del filtered_df + if final_count == 0: + raise RuntimeError( + f"All {ann_count} annotation-passing tiles dropped by tissue filter for {split_name}. " + f"Check that tissue_column '{tissue_column}' is non-zero for at least some tiles." + ) pq.write_table(filtered, str(output_path)) return {