Add on-demand preprocessing and indexing workflow to /data and /index endpoints — Closes #30 #32

Open
conradbzura wants to merge 37 commits into master from 30-add-preprocessing-indexing-workflow

Conversation

@conradbzura
Collaborator

Summary

Introduce an on-demand preprocessing and indexing workflow that runs when a /data or /index request for a Gosling-consumable artifact misses cache. Outputs are written to a pluggable, byte-range-aware cache, and a Mongo-backed mutex shares a single workflow per source file between both endpoints. Processors handle SAM/BAM (sort + index) and VCF/GFF/GTF/BED/BroadPeak/NarrowPeak/bigBed (decompress → sort → bgzip → tabix, with GTF and bigBed routed through format-conversion pre-steps); CSV/TSV/bigWig fall through the existing direct-streaming path unchanged. Workflows dispatch to a Wool WorkerPool started in the FastAPI lifespan, and a new /jobs/{id} endpoint exposes job status. Cache keys are content-addressed by upstream md5 (100% populated across DCCs), so upstream byte changes with a refreshed md5 invalidate cached artifacts automatically.

Closes #30

Proposed changes

Workflow subsystem (src/cfdb/workflows/)

  • keys.py — pure workflow_key and cache_key derivation. Workflow keys follow {dcc}/{local_id}/{md5}/v{pipeline_version}; cache keys layer on artifact_kind and processor_version so bumping either value naturally invalidates only the affected entries.
  • models.py — JobStatus, ArtifactKind, and the JobRecord Pydantic model. ACTIVE_STATUSES stays in sync with the Mongo partial-unique-index filter predicate.
  • cache.py — CacheBackend ABC and LocalFsCache. Writes via os.replace for atomicity, streams reads in 64 KiB chunks with inclusive-range support, and rejects path-traversal in cache keys.
  • lock.py — atomic claim_workflow plus mark_running, record_stage_complete, and release_workflow. Uses insert_one with DuplicateKeyError fallback to attach to an in-flight job and a stale-threshold reclamation path for crashed workers.
  • executor.py — JobExecutor ABC plus WoolExecutor. ensure_workflow claims or attaches to the mutex and, on a fresh claim, dispatches the processor as a fire-and-forget asyncio.create_task. Job duration is bounded with asyncio.wait_for at the call site (Wool's @routine timeout is dispatch-only). A retry loop tolerates the brief window where WorkerPool discovery hasn't yet surfaced a worker.
  • fetcher.py — download_source resolves HTTPS and DRS access_url values and streams to the per-job workdir, keeping processors focused on tool invocations.
  • processors/ — Processor ABC with needs_processing / artifact_kinds_produced / run, plus ProcessorRegistry.lookup_for that resolves a file document to the first matching processor. Concrete processors:
    • PassthroughProcessor declares CSV/TSV/bigWig as no-workflow formats.
    • BamIndexProcessor handles SAM→BAM conversion (when needed), samtools sort, and samtools index.
    • TabixIntervalProcessor branches per format: gunzip (when plain gzip is detected magic-wise), gffread -E for GTF→GFF3, bigBedToBed for bigBed, then sort with format-appropriate keys, bgzip, and tabix with the matching preset.

Both processors consult the cache per artifact kind before running the corresponding stage, giving partial-commit recovery for free: a stage-2 failure leaves the stage-1 artifact in cache and the next retry skips the expensive stage.
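The key derivation described under keys.py can be sketched as a pair of pure functions. This is a minimal sketch: the exact signatures, validation rules, and case normalization are assumptions, not the repository's actual keys.py.

```python
def workflow_key(dcc: str, local_id: str, md5: str, pipeline_version: int) -> str:
    """Derive the per-source workflow key: {dcc}/{local_id}/{md5}/v{version}."""
    if not local_id or not md5:
        raise ValueError("local_id and md5 must be non-empty")
    if pipeline_version < 0:
        raise ValueError("pipeline_version must be non-negative")
    # Lower-casing dcc and md5 is an assumption, mirroring the stated
    # stability across case variants.
    return f"{dcc.lower()}/{local_id}/{md5.lower()}/v{pipeline_version}"


def cache_key(dcc: str, local_id: str, md5: str, pipeline_version: int,
              artifact_kind: str, processor_version: int) -> str:
    """Layer artifact kind and processor version onto the workflow key, so
    bumping either value invalidates only the affected cache entries."""
    base = workflow_key(dcc, local_id, md5, pipeline_version)
    return f"{base}/{artifact_kind}/p{processor_version}"
```

Because both functions are pure, a refreshed upstream md5 or a version bump yields a new key rather than mutating an old entry, which is what makes the content-addressed invalidation automatic.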

HTTP surface (src/cfdb/api/)

  • routers/data.py — after the existing metadata lookup and HuBMAP access-level guard, the router consults the processor registry. Passthrough formats fall through unchanged; processed artifacts that are cache-hit stream from cache with byte-range support; cache miss dispatches a workflow and returns 202 Accepted with Location: /jobs/{job_id} and Retry-After: 5.
  • routers/index.py — preserves the 4DN extra.extra_files / extra.fourdn.extra_files sidecar fast path (the 218 BED→beddb and 4 BED→tbi sidecars continue to serve from upstream). Files without a sidecar fall through the same workflow path as /data with ArtifactKind.INDEX. Formats with no index (CSV/TSV) return 404.
  • routers/jobs.py — new GET /jobs/{job_id} endpoint exposing status, stages_done, artifacts, progress, and error.
  • api/__init__.py — new module-level cache, executor, and processor_registry attributes plus SYNC_DATA_DIR / WORKFLOW_WORKER_COUNT env vars.
  • api/main.py — lifespan starts a wool.WorkerPool(size=WORKFLOW_WORKER_COUNT), a LocalFsCache under SYNC_DATA_DIR/cache, and a registry with the passthrough, BAM, and tabix processors. When SYNC_DATA_DIR is unset the subsystem stays disabled and routers fall back to direct streaming — keeps older deployments and minimal test setups working unchanged.
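The cache-hit/miss branch both routers share can be sketched with plain data. The names here are hypothetical and the real routers return FastAPI responses; this only illustrates the 200-versus-202 contract described above.

```python
def resolve_data_request(cache: dict, key: str, ensure_workflow):
    """Cache hit: serve the artifact. Cache miss: dispatch a workflow and
    answer 202 Accepted with a poll location and retry hint (sketch)."""
    if key in cache:
        return 200, {"body": cache[key]}
    job_id = ensure_workflow(key)  # claims or attaches to the per-source mutex
    return 202, {"Location": f"/jobs/{job_id}", "Retry-After": "5"}
```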

Test infrastructure (tests/conftest.py)

Extend FakeCollection with insert_one (including partial-unique-index enforcement via create_index), find_one_and_update (with upsert), $addToSet, $setOnInsert, nested-key $set, and $lt/$lte/$gt/$gte comparison operators. Existing update_one behavior is preserved via a refactored _apply_update helper.
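A minimal version of the comparison-operator matching FakeCollection gains might look like the following. This is a sketch, not the conftest code; the operator table and dispatch are assumptions.

```python
_OPS = {
    "$lt": lambda a, b: a < b,
    "$lte": lambda a, b: a <= b,
    "$gt": lambda a, b: a > b,
    "$gte": lambda a, b: a >= b,
}

def matches(doc: dict, query: dict) -> bool:
    """Match a document against equality and $lt/$lte/$gt/$gte conditions."""
    for field, cond in query.items():
        value = doc.get(field)
        if isinstance(cond, dict):
            # Every operator clause must hold; unknown operators fail the match.
            if not all(op in _OPS and _OPS[op](value, operand)
                       for op, operand in cond.items()):
                return False
        elif value != cond:
            return False
    return True
```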

Build and ops

  • scripts/create-indexes.js — partial unique index on jobs.workflow_key filtered to status ∈ {pending, running} (backs the per-source mutex), unique index on jobs.job_id, and compound status + updated_at index for stale-lock reclamation scans.
  • Dockerfile.api — installs samtools, tabix, bcftools, and gffread via apt plus UCSC bigBedToBed as a release binary so workers inherit all preprocessing tools on PATH.
  • pyproject.toml — registers the integration pytest marker.
  • README.md — documents the workflow dispatch behavior, the per-format processor table, the /jobs/{id} endpoint, and the SYNC_DATA_DIR / WORKFLOW_WORKER_COUNT environment variables.
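Expressed as pymongo-style index specs, the three jobs indexes described above would look roughly like this. The field names follow the PR text; the exact spec shape in scripts/create-indexes.js is an assumption.

```python
# Index specs for the jobs collection (sketch mirroring scripts/create-indexes.js).
JOBS_INDEXES = [
    # Partial unique index backing the per-source mutex: uniqueness applies
    # only while a job is active, so completed records never block new claims.
    {"keys": [("workflow_key", 1)], "unique": True,
     "partialFilterExpression": {"status": {"$in": ["pending", "running"]}}},
    # Unique job_id for /jobs/{id} lookups.
    {"keys": [("job_id", 1)], "unique": True},
    # Compound index scanned by the stale-lock reclamation path.
    {"keys": [("status", 1), ("updated_at", 1)]},
]
```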

Test cases

| # | Test Suite | Given | When | Then | Coverage Target |
|---|---|---|---|---|---|
| 1 | TestWorkflowKey | Valid dcc/local_id/md5/pipeline_version inputs | workflow_key is called | Returns a normalized slash-joined key and is stable across dcc/md5 case variants | workflow_key normalization |
| 2 | TestWorkflowKey | Empty md5 or local_id or negative version | workflow_key is called | Raises ValueError with the offending field named | Input validation |
| 3 | TestWorkflowKey | Any valid combination (Hypothesis) | workflow_key is called twice | Returns identical keys both times | Determinism |
| 4 | TestCacheKey | Valid inputs plus a different ArtifactKind or processor_version | cache_key is called | Returns distinct keys per artifact kind and per version | Per-artifact scoping |
| 5 | TestJobStatus / TestJobRecord | A JobRecord at various statuses | is_active / to_mongo are invoked | Returns True only for PENDING/RUNNING; serializes enums to strings and preserves datetimes | JobRecord invariants |
| 6 | TestProcessor | A concrete subclass of Processor | needs_processing is invoked for supported, unsupported, and missing file_format | Returns True/False without raising | ABC default behavior |
| 7 | TestPassthroughProcessor | A CSV/TSV/bigWig file_meta | needs_processing is invoked; run is invoked | Returns False; run raises RuntimeError | Misrouted dispatch safety |
| 8 | TestProcessorRegistry | A registry with multiple processors | lookup_for is called for various formats | Returns the first matching processor or None | Registry resolution |
| 9 | TestLocalFsCache | An empty or seeded cache | head / put / get with and without byte_range / delete are awaited | Miss yields None; put returns a CacheEntry with the expected size; range reads are inclusive; traversal keys raise ValueError | LocalFsCache semantics |
| 10 | TestClaimWorkflow | An empty jobs collection with the partial unique index | Concurrent claims target the same workflow_key | First inserts a fresh PENDING record; second attaches to it; after release_workflow COMPLETED a new fresh claim succeeds | Mutex atomicity |
| 11 | TestClaimWorkflow | An active record whose updated_at exceeds the stale threshold | claim_workflow is awaited | Stale record is flipped to FAILED and a new fresh claim succeeds | Stale reclamation |
| 12 | TestMarkRunning / TestRecordStageComplete / TestReleaseWorkflow | A claimed job | mark_running, record_stage_complete, release_workflow are awaited | Status, stages_done, and artifact_cache_keys are persisted; release to an active status raises ValueError | Lifecycle transitions |
| 13 | TestWoolExecutorEnsureWorkflow | A registry with a stub processor (unit), runtime cap, or failing processor | ensure_workflow is awaited and the background task completes | Produces artifacts on success; persists error on failure; honors asyncio.wait_for timeout cap; attaches second caller to the same job | Executor orchestration |
| 14 | TestWoolExecutorPickleBoundary (integration) | A real wool.WorkerPool(size=1) and a cloudpickle-registered stub processor | ensure_workflow is awaited inside the pool context | Routine dispatches to the worker, returns the stub's artifacts, and leaves the job COMPLETED | Cloudpickle cross-process boundary |
| 15 | TestBamIndexProcessorRun | A BAM or SAM file_meta with mocked download_source and _run | run is awaited | Writes both data and index artifacts; SAM adds a samtools view -bS step first; skips sort when data is already cached | BAM pipeline sequencing |
| 16 | TestBamIndexProcessorIntegration | A minimal valid BAM on disk and a stubbed downloader | run is awaited with real samtools | Cache contains a non-empty sorted BAM and BAI | End-to-end samtools |
| 17 | TestTabixIntervalProcessorRun | VCF/GTF/bigBed file_meta with mocked tools | run is awaited | Command sequences match the expected prefixes: gunzip → sort → bgzip → tabix; gunzip → gffread → sort → bgzip → tabix; bigBedToBed → sort → bgzip → tabix; skips stage 1 when data cached | Tabix pipeline branching |
| 18 | TestTabixIntervalProcessorIntegration | A real BED file and a stubbed downloader | run is awaited with real htslib tooling | Cache contains a non-empty bgzipped BED and TBI | End-to-end htslib |
| 19 | TestGetJobStatus | A JobRecord at RUNNING, FAILED, or absent | get_job_status is called | Returns the persisted fields or raises HTTPException(404); surfaces the error string on FAILED | /jobs/{id} contract |
| 20 | TestStreamIndexFileFourdnSidecar | A 4DN BED with extra.extra_files carrying an href | stream_index_file is called | Streams the sidecar via the DCC's api_base without touching the workflow subsystem | Sidecar preservation |
| 21 | TestStreamIndexFileWorkflowPaths | A CSV (no sidecar, no index-producing processor); a BAM with missing cache | stream_index_file is called | CSV returns 404; BAM returns 202 JSONResponse with Location: /jobs/<id> | Workflow dispatch from /index |
| 22 | TestStreamFileWorkflowPath | A BAM with missing cache; a CSV with the workflow wired | stream_file is called | BAM returns 202 with Location; CSV falls through to direct streaming (DRS raises in the mock) and does not 202 | Workflow dispatch from /data |

Add the Mongo operations the new workflow subsystem exercises:
insert_one with partial-unique-index enforcement, find_one_and_update
with upsert, $addToSet and $setOnInsert, nested $set, and $lt/$lte/$gt/$gte
comparison operators in the query matcher.

Existing tests that use update_one are unchanged — the refactored
_apply_update helper preserves legacy semantics.
conradbzura added 24 commits May 3, 2026 21:34
Introduce the foundational types for the preprocessing/indexing
subsystem that serves Gosling Designer's client-side fetchers:

  - JobRecord, JobStatus, and ArtifactKind model the persistent
    workflow state that lives in the Mongo jobs collection.
  - cache_key and workflow_key are pure functions that derive
    content-addressed keys from upstream md5 plus pipeline/processor
    versions. A byte change upstream with a refreshed md5 naturally
    invalidates cached artifacts, and a version bump forces fresh
    processing without any manual purge.
  - Processor is the ABC every preprocessing pipeline subclasses. A
    ProcessorRegistry resolves a file document to the first matching
    processor by format name. PassthroughProcessor covers formats
    Gosling can consume directly (CSV, TSV, bigWig).

No cache backend, executor, or HTTP surface is wired up yet — those
land in follow-on commits that depend on these types.
CacheBackend is a byte-range-aware pluggable store for workflow
artifacts. LocalFsCache writes keys as relative paths under a
configured root, puts via os.replace for atomicity, and streams reads
in 64 KiB chunks with inclusive-range support — Gosling's BAM, tabix,
and bbi fetchers all issue Range requests that must pass through to
the cache transparently. Key validation rejects path traversal so
malformed input cannot escape the cache root.
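The inclusive-range chunked read the cache exposes can be sketched over an in-memory buffer. This is a sketch only; the real backend reads from files under the configured cache root, and the helper name is hypothetical.

```python
import io

CHUNK = 64 * 1024  # 64 KiB read granularity, per the commit message

def read_range(buf: bytes, start: int, end: int):
    """Stream the inclusive byte range [start, end] in 64 KiB chunks,
    matching HTTP Range semantics where the upper bound is included."""
    if start < 0 or end >= len(buf) or start > end:
        raise ValueError("byte range out of bounds")
    with io.BytesIO(buf) as f:
        f.seek(start)
        remaining = end - start + 1  # +1 because the range is inclusive
        while remaining:
            chunk = f.read(min(CHUNK, remaining))
            remaining -= len(chunk)
            yield chunk
```

The inclusive upper bound is the detail that matters for Gosling's fetchers: a `Range: bytes=1-3` request must yield exactly three bytes.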

The workflow mutex uses a Mongo partial unique index on workflow_key
filtered to active statuses, mirroring the atomic-upsert idiom already
used by services.locks for the sync and cutover locks. claim_workflow
either inserts a fresh PENDING record or — on DuplicateKeyError —
attaches to the existing active job so concurrent /data and /index
requests converge on one workflow per source file. A stale-threshold
reclamation path covers the case where a worker crashes without
releasing the mutex.
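The claim-or-attach idiom can be sketched with an in-memory stand-in for the jobs collection. Everything here is a simplified sketch: the real code uses pymongo's DuplicateKeyError and the partial unique index enforces uniqueness server-side.

```python
class DuplicateKeyError(Exception):
    """Stand-in for pymongo.errors.DuplicateKeyError."""

class Jobs:
    """Tiny in-memory model of the partial unique index on workflow_key."""
    def __init__(self):
        self.docs = {}

    def insert_one(self, doc):
        # Uniqueness applies only while an existing record is active.
        existing = self.docs.get(doc["workflow_key"])
        if existing and existing["status"] in ("pending", "running"):
            raise DuplicateKeyError(doc["workflow_key"])
        self.docs[doc["workflow_key"]] = doc

def claim_workflow(jobs, key, job_id):
    """Insert a fresh PENDING record, or attach to the in-flight job on conflict."""
    try:
        jobs.insert_one({"workflow_key": key, "job_id": job_id, "status": "pending"})
        return job_id, True          # fresh claim
    except DuplicateKeyError:
        return jobs.docs[key]["job_id"], False  # attached to existing job
```

Concurrent /data and /index requests therefore converge: whichever caller loses the insert race attaches to the winner's job_id.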
JobExecutor is the abstract interface bridging /data and /index to
the processing pipeline. WoolExecutor is the concrete backend:
ensure_workflow claims or attaches to the per-source mutex, and on a
fresh claim fires a background task that dispatches the processor
through a Wool WorkerPool. Long-running jobs are bounded with
asyncio.wait_for at the call site (Wool's own @routine timeout is
dispatch-only). A short retry loop tolerates the window between
WorkerPool startup and discovery catching up.
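Bounding the job at the call site, since Wool's own @routine timeout covers dispatch only, can be sketched as follows (a sketch, not the real WoolExecutor internals):

```python
import asyncio

async def run_bounded(processor, timeout: float):
    """Cap total job duration with asyncio.wait_for; on timeout the job is
    recorded as failed rather than left running indefinitely (sketch)."""
    try:
        return await asyncio.wait_for(processor(), timeout=timeout)
    except asyncio.TimeoutError:
        return "failed: timeout"
```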

Two concrete processors land alongside:

  - BamIndexProcessor: SAM->BAM conversion (when needed) plus
    samtools sort and samtools index. Emits the sorted BAM and its
    BAI.
  - TabixIntervalProcessor: covers VCF, GFF, BED, BroadPeak,
    NarrowPeak with decompress + sort + bgzip + tabix; GTF adds a
    gffread conversion to GFF3 first; bigBed is converted via
    bigBedToBed before the BED pipeline.

Both processors consult the cache per artifact kind before running
the corresponding stage, giving partial-commit recovery for free: a
stage-2 failure leaves the stage-1 artifact in cache, and the next
retry skips the expensive stage.

fetcher.download_source centralizes DRS/HTTPS source retrieval so
processors can focus on the tool-invocation pipeline. An integration
marker is registered for tests that shell out to real samtools or
htslib, gated off by default.
Both /data and /index now converge on ensure_workflow when a file's
processor requires preprocessing and the requested artifact is not
yet in cache. The router returns 202 with Location: /jobs/{job_id}
and Retry-After so clients can poll. On cache hit the artifact
streams directly with byte-range support. Passthrough formats (CSV,
TSV, bigWig) fall through to the existing direct-streaming path
unchanged.

/index preserves the pre-existing 4DN sidecar fast path — files
whose extra.extra_files or extra.fourdn.extra_files carries a tbi or
beddb entry continue to serve from upstream without entering the
workflow. Files with no sidecar and no processor emitting an index
artifact return 404.

/jobs/{job_id} is a new endpoint exposing the persisted JobRecord:
status, stages_done, artifact cache keys, optional progress, and
error on failure.

The FastAPI lifespan brings up a Wool WorkerPool, a LocalFsCache
under SYNC_DATA_DIR, and the processor registry (passthrough plus
BAM and tabix). When SYNC_DATA_DIR is unset the subsystem stays
disabled and routers fall back to direct streaming — keeps minimal
test deployments and older configurations working unchanged.
The api container now installs samtools, tabix, bcftools, gffread,
and UCSC bigBedToBed so the Wool workers spawned by the API process
can shell out to the preprocessing tools without any separate image.
apt-shipped packages cover everything but bigBedToBed, which is
pulled as a UCSC release binary.

create-indexes.js adds three indexes to the new jobs collection: a
partial unique index on workflow_key (filtered to active statuses)
that backs the per-source mutex, a unique index on job_id for /jobs
lookups, and a compound status+updated_at index that the stale-lock
reclamation path scans.
HEAD probes on /data and /index no longer trigger preprocessing. The
previous behavior bound HEAD and GET to the same handler with no
method check, so monitoring probes or prefetch tools claiming to be
side-effect-free were actually dispatching samtools and tabix jobs
via ensure_workflow. Cache-miss now returns 404 on HEAD; clients must
issue an explicit GET to trigger processing.

Alongside the HEAD fix, a ?raw query parameter lets clients opt out of
the preprocessing pipeline entirely. With raw=false (the default) the
router returns the visualization-ready artifact (cache hit or a 202
with a Location header for GET). With raw=true the router serves the
upstream bytes directly for /data, and restricts /index to upstream
sidecars only (404 when none exists). Passthrough formats (CSV, TSV,
bigWig) behave the same regardless of raw since there is no processed
output distinct from the upstream file.

The default is raw=false so Gosling and CVH clients get preprocessed
artifacts without needing to know about the flag.
…eaders

The VCF sort path ran GNU sort over the whole file, which scrambled
the ## header block into the data block and produced artifacts tabix
could not index. Fix is a two-pass pipeline for VCF: grep '^#' emits
the header block, then grep -v '^#' pipes the data block through
sort, and the brace group joins both streams into bgzip. Headers end
up at the top of the output as tabix requires.
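The two-pass VCF pipeline can be sketched as a command builder. The helper name is hypothetical and the sort keys shown (`-k1,1 -k2,2n` for chromosome then position) are an assumption; the pipefail wrapper, memory cap, and LC_ALL=C prefix mirror the commit text.

```python
import shlex

def vcf_pipeline(src: str, out: str, sort_mem: str = "256M", tmp: str = "/tmp/job") -> str:
    """Build the header/body split pipeline: grep '^#' emits the header block,
    grep -v '^#' pipes the data block through a memory-capped sort, and the
    brace group joins both streams into bgzip (sketch)."""
    q = shlex.quote
    return (
        f"{{ grep '^#' {q(src)}; "
        f"grep -v '^#' {q(src)} | LC_ALL=C sort -k1,1 -k2,2n -S {sort_mem} -T {q(tmp)}; }} "
        f"| bgzip > {q(out)}"
    )
```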

Neither GNU sort nor samtools sort had a memory cap, so on Lambda
instances sized 512 MB - 2 GB either could OOM the worker on large
inputs. Add CFDB_SORT_MEMORY_CAP and CFDB_SAMTOOLS_MEMORY_CAP env
vars (both default 256M) and plumb them into sort -S / samtools
sort -m with -T pointed at the per-job workdir so spillage lands on
disk instead of in memory. LC_ALL=C on sort makes collation
byte-wise and independent of the worker's locale.

The format-specific pipelines are now single streaming shell pipes
rather than separate decompress / sort / bgzip invocations writing
intermediate files. Two intermediates remain where the tool can't
stream input: VCF (two passes for header/body split) and GTF (gffread
reads files). Other formats flow zcat -f or bigBedToBed straight into
sort and bgzip without materializing intermediate text.

GFF3 was silently bypassed because _TABIX_PRESET had only GFF, not
GFF3, while the ontology mapper emits GFF3 for .gff3 files. Add the
GFF3 key pointing at the gff preset so .gff3 inputs flow through the
same pipeline.

Tests exercise each per-format pipeline shape (grep-split VCF, gffread
GTF, bigBedToBed bigBed, generic zcat-f text), assert the -S/-T/-m
flags and LC_ALL=C prefix are present, and confirm the tabix preset
matches the format. Integration tests add an end-to-end VCF run that
verifies the output header block survives and data lines are
position-sorted.

Fixes R2-B2, R2-B3, and R2 non-blocking F (locale-sensitive sort).
release_workflow and record_stage_complete updated the jobs document
by job_id alone, with no filter on current status. A worker that
stalled past the one-hour stale threshold would get its record
flipped to FAILED by a subsequent claim_workflow call, but when the
stalled worker eventually woke and called its own release, the
update would overwrite the successor-owned status and leak stage
completions onto a record the new claimant now holds.

Both writes now filter on status in {pending, running}. A late call
from a reclaimed worker matches zero docs and becomes a no-op; a
warning is logged so stomping attempts remain visible in operations.
mark_running was already correctly fenced on status=pending and is
unchanged.
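The status fence on release can be sketched against a single document. This is a sketch of the filter semantics only; the real code expresses the fence as a Mongo update filter so the match-and-write is atomic server-side.

```python
ACTIVE_STATUSES = ("pending", "running")

def release_workflow(doc: dict, job_id: str, terminal_status: str) -> bool:
    """Release only a record the caller still owns. The write is fenced on an
    active status, so a late release from a reclaimed worker matches nothing
    and becomes a no-op instead of stomping the successor's record (sketch)."""
    if doc.get("job_id") == job_id and doc.get("status") in ACTIVE_STATUSES:
        doc["status"] = terminal_status
        return True   # matched and updated
    return False      # fenced out: the record has moved on
```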
The /data and /index routers reached across package boundaries to
import workflows.executor._extract_identity via in-function imports,
and the test suite referenced the private name directly — both
symptoms of a helper that was always public in practice despite its
leading underscore. Drop the underscore, move the import to module
top in both routers, and update the test file accordingly.

Resolves the test-guide violation (tests MUST NOT reference private
symbols) and the leaky-abstraction smell in the router imports in
one go. The function's behavior, signature, and docstring are
unchanged.
The stale-reclaim test used lock._utcnow() to build a backdated
timestamp. Per the test guide, tests MUST NOT reference private
functions. The equivalent datetime.now(timezone.utc) expression is
already available from stdlib; use it directly.
The three original tests in test_index.py dropped the <method_name>
prefix required by the test guide's naming pattern
test_<method_name>_should_<outcome>[_when_<condition>]. Prefix them
all with test_stream_index_file_should_... so the target method is
clear from the test name alone and pytest output reads consistently
with the rest of the suite.
download_source is a public function but was only ever exercised
indirectly via mocks in the BAM and tabix processor tests — the
missing-access_url, direct-HTTPS, and DRS-resolution code paths were
all effectively untested.

New test module covers: ValueError on missing access_url, HTTPS
passthrough with assertion that fetch_drs_object is not called,
drs:// URIs resolving through fetch_drs_object + get_https_download_url
before streaming, and parent-directory creation for nested dest paths.
artifact_kind_str was a one-line wrapper exposing ArtifactKind.value
with zero callers anywhere in the codebase. Drop the function and
its now-unused ArtifactKind import. Callers that need the string
form can call .value directly on the enum member.
Both routers had a near-byte-identical _stream_cache_entry function
handling Range parsing, HEAD-vs-GET branching, and the Content-Range
header — so any future change to range behavior needed to land in
lockstep across both files. Move the helper into a new module,
routers._cache_stream, and import it from both callers.

The helper now accepts a media_type kwarg (defaulting to
application/octet-stream) so callers can override if they want to
signal a specific type for cached artifacts. No behavior change.
identity_dcc, shell_quote, run_argv, run_shell, and copy_from_cache
were byte-identical duplicates between the BAM and tabix processor
modules. Move them into workflows.processors._tools and re-export in
each processor under their existing local alias so test mocks that
patch the module-level names keep working without modification.

No behavior change. The processor modules now focus on pipeline shape
and let the shared subprocess / cache-staging machinery live in one
place.
Four related shutdown and mid-run hygiene issues are resolved
together in the executor's _run_workflow loop and the FastAPI
lifespan:

1. record_stage_complete failures used to leak the mutex — the
   release_workflow call was unreachable when any stage commit
   raised. The loop is now wrapped in try/finally so release always
   fires with the appropriate terminal status.

2. asyncio.CancelledError bypassed except Exception, leaving
   cancelled jobs stuck in RUNNING until the 1h stale threshold. A
   dedicated except branch now marks the job FAILED with a cancel
   reason before re-raising so cancellation still propagates.

3. The per-job workdir under SYNC_DATA_DIR/jobs/{id}/ was never
   cleaned up, slowly filling the Lambda /tmp partition. shutil.rmtree
   in the finally block (via asyncio.to_thread) removes it on every
   exit path.

4. The FastAPI lifespan used to close the WorkerPool while fire-and-
   forget tasks were still in flight, so those tasks would dispatch
   into a closed pool and try to release against a closed Mongo
   client. The lifespan now drains pending workflow tasks with a
   10-second cap before exiting the pool context.

Tests add coverage for workdir cleanup on success and for the
release-still-fires path when record_stage_complete raises.
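The shutdown drain in item 4 can be sketched as a bounded wait over the pending task set (a sketch; the real lifespan also logs the count and then exits the pool context):

```python
import asyncio

async def drain(tasks, timeout: float) -> int:
    """Wait up to `timeout` seconds for in-flight workflow tasks before the
    pool closes; tasks still running afterwards are left for the stale-lock
    reclamation path, same semantics the commit describes (sketch)."""
    pending = [t for t in tasks if not t.done()]
    if pending:
        await asyncio.wait(pending, timeout=timeout)
    return len(pending)  # count pending at drain entry, for logging
```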
The processors reached for identity_dcc from _tools and separately
pulled local_id and md5 from file_meta dict access. extract_identity
(made public in an earlier commit) already returns the full
(dcc, local_id, md5) triple over the same preference chain. Drop
identity_dcc from _tools, call extract_identity in BamIndexProcessor
and TabixIntervalProcessor, and destructure the triple. One helper,
one call site, one contract.
The "# --- Workflow subsystem ---" divider is a section banner; the
style guide forbids them because they duplicate what member naming
and ordering should carry. The substantive comment immediately below
(explaining the SYNC_DATA_DIR=unset fallback) stays.
…MING_CASE

The style guide reserves SCREAMING_CASE for values whose binding AND
referent never change. The previous list comprehension was SCREAMING-
cased but its referent was a mutable list, so callers could in
principle mutate it. Switching to a tuple makes the value immutable
and the name legitimate. The $in Mongo operand accepts either type,
so the downstream call sites are unchanged.
The style guide mandates public method ordering static, class, instance
within a visibility tier. Processor's public block had instance
(needs_processing) then classmethod (artifact_kinds_produced) then
abstractmethod instance (run). Move artifact_kinds_produced above
needs_processing so the public block reads class then instance then
instance as required.
The bare "from cfdb.workflows.processors import _tools" plus the
"_ = _tools" discard existed only to keep a module handle reachable
for an imagined patch target. Nothing in either processor actually
touches the module object — the re-exported aliases from the
"from _tools import ..." block cover every use — so the suppression
line was a workaround for a warning caused by dead code. Remove both.
The cancelled-bool scaffolding in _run_workflow had no observable
effect: the only place it was read was a trailing "if cancelled: pass"
in the finally block whose comment noted the raise was handled by the
already-executed "raise" inside each except asyncio.CancelledError
branch. Remove the flag, both assignments, and the terminal if-block.
Cancellation propagation is unchanged — the "raise" inside each
except branch still escapes through the finally after the terminal
release write.
bigBedToBed ships as a UCSC release binary in the api image; its
sibling bedToBigBed was missing. The integration test fixture builder
uses bedToBigBed to materialize a deterministic sample.bb from a tiny
sorted BED — without the tool the bigBed integration test skips. Add
it alongside bigBedToBed so CI exercises the full bigBed pipeline
end-to-end.
Closes the coverage gap left by the two existing integration tiers:
TestWoolExecutorPickleBoundary exercises Wool dispatch with a stub
processor that does no I/O, and the per-processor integration tests
run real samtools/htslib but call processor.run directly in the
test's event loop. Neither exercises the full claim to dispatch to
cache path from inside a Wool worker subprocess.

The new tests/integration package covers that gap:

  - fixtures/make_samples.py builds deterministic ~1 MB of sample
    data per session (SAM, BAM, VCF.gz, BED.gz, GFF3.gz, GTF.gz,
    narrowPeak.gz, broadPeak.gz, optional bigBed). The builders
    shell out to samtools / gzip / bedToBigBed; seeds are fixed so
    two runs produce byte-identical files and content-addressed
    cache keys stay stable.
  - conftest.py runs an http.server background thread serving the
    sample directory, so the Wool worker process downloads over a real
    HTTP connection the same way production downloads over HTTPS.
    Mocking fetcher.download_source in the test process is insufficient —
    the worker re-imports its own copy and sees the real function.
  - test_processor_e2e.py parametrizes per format and asserts cache
    population, samtools quickcheck on the cached BAM, tabix query
    on staged bgz/tbi pairs, GFF3-attribute-syntax on converted
    GTF, and VCF header-block preservation end-to-end.
  - test_router_e2e.py drives the stream_file / stream_index_file /
    get_job_status handlers against a live executor: 202-then-200
    workflow, Range requests on cached artifacts, HEAD-no-dispatch,
    /jobs/{id} reporting, and the cached-index serve path.
  - test_concurrency.py asserts the Mongo partial-unique-index
    mutex dedupes two asyncio.gather'd /data requests onto a single
    workflow and converges /data + /index onto one workflow too.

Runtime is ~60 s under the integration marker with function-scoped
pools for cleanest isolation. Tests that require gffread or
bedToBigBed skip gracefully when the tool is absent.
conradbzura added 11 commits May 3, 2026 21:34
The tabix processor assembles multi-stage shell pipelines like
"zcat -f | sort | bgzip > out.bgz" and invokes them through
run_shell. asyncio.create_subprocess_shell runs via /bin/sh, which
on Debian (the Dockerfile.api base) is dash — dash does not support
pipefail. A non-terminal stage failure (truncated gzip, gffread
syntax error, bigBedToBed read error, grep out-of-memory) exits
non-zero but the pipeline's overall returncode is the last stage's.
bgzip happily consumes whatever partial bytes arrived and exits 0,
run_shell returns success, and cache.put commits a truncated
artifact under its content-addressed key. Every subsequent request
then serves the corrupt pair until the upstream md5 changes.

Invoke the shell via bash -o pipefail -c so any stage's non-zero
exit surfaces at the pipeline boundary. bash is already in the
Dockerfile.api image. The BAM processor's samtools view | samtools
sort pipeline gains the same guarantee.

New tests assert that run_shell traps both final-stage and
non-terminal-stage failures, so a regression would fail loudly
rather than leak through the cache.
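The pipefail invocation can be sketched as follows; the function name mirrors the run_shell described above, but the body is a minimal sketch rather than the repository's helper.

```python
import asyncio

async def run_shell(cmd: str) -> int:
    """Run a pipeline under bash with pipefail so any stage's non-zero exit
    surfaces at the pipeline boundary. Debian's /bin/sh is dash, which has
    no pipefail, hence the explicit bash invocation (sketch)."""
    proc = await asyncio.create_subprocess_exec(
        "bash", "-o", "pipefail", "-c", cmd,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    await proc.wait()
    return proc.returncode
```

Under plain `sh -c`, `false | cat` exits 0 because only the last stage's status is reported; under pipefail the non-terminal failure propagates, which is exactly the truncated-artifact leak the commit closes.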
bam.py, tabix.py, and jobs.py each defined a module-level
logger = logging.getLogger(__name__) with zero callers in the
respective module. Drop each binding and the logging import. When
structured logging lands on the preprocessing path, re-add targeted
log statements rather than a speculative handle.
The FastAPI lifespan was reaching into WoolExecutor._pending_tasks,
a leading-underscore attribute explicitly marked private, to
implement the shutdown drain. That tightly couples the API entry
point to an executor implementation detail and leaks the private
marker's meaning.

Expose drain(timeout) as a public method on JobExecutor (abstract)
and WoolExecutor (concrete). The lifespan now calls
executor.drain(timeout=SHUTDOWN_DRAIN_TIMEOUT_SECONDS), which
returns the number of tasks that were pending at drain entry for
logging purposes. Tasks still running after the timeout are left
for stale-reclamation, same semantics as before. The asyncio import
drops out of main.py.
Processor._format_name was a leading-underscore staticmethod with
three external callers — ProcessorRegistry.lookup_for, and the
per-module _source_suffix helpers in bam.py and tabix.py — which
violates the underscore's meaning as a class-private marker.

Promote the helper to a module-level free function format_name in
processors/_tools.py (alongside the other shared subprocess /
cache-staging helpers) and update all four call sites. The function
is a pure dict reader with no self or cls state, so it belongs at
module scope rather than hanging off the ABC.
Two methods in TestExtractIdentity were typed without the underscore
between test and extract — testextract_identity_should_... Pytest
still collected them because "test" is a matching prefix, but the
test-naming convention test_<method>_should_<outcome>[_when_<cond>]
requires <method> to mirror the callable under test. Fix the name
so the target function (extract_identity) is readable from the
test identifier alone.
The project test guide mandates that integration tests live in a
dedicated tests/integration/ package. Three @pytest.mark.integration
classes remained under tests/test_workflows/ — a layout violation
now corrected:

  - TestWoolExecutorPickleBoundary from test_executor.py →
    tests/integration/test_executor_boundary.py, carrying its stub
    processor and the cloudpickle-by-value registration.
  - TestBamIndexProcessorIntegration from test_processors_bam.py →
    tests/integration/test_direct_processors.py as
    TestBamIndexProcessorDirectCall.
  - TestTabixIntervalProcessorIntegration (BED + VCF) from
    test_processors_tabix.py → the same new file as
    TestTabixIntervalProcessorDirectCall.

The direct-call classes are kept rather than deleted even though
tests/integration/test_processor_e2e.py covers the same formats
end-to-end through Wool: the direct-call variant skips the Wool
boundary and serves as a diagnostic layer — when an e2e test fails
you can still run the direct variant to isolate whether the
regression is in the processor pipeline itself or in the Wool /
cache / executor scaffolding around it.

test_executor.py drops its cloudpickle import, the by-value
registration, and the stub processor dependency now that they only
serve the moved class.
Both concurrency tests were named after the scenario (concurrent
data requests, concurrent data and index) rather than the callable
under test, missing the test_<method>_should_<outcome>[_when_<cond>]
segment. Rename to test_stream_file_should_... and
test_stream_file_and_stream_index_file_should_... so pytest output
identifies which handler each test drives.
stream_cache_entry is a new public helper in routers/_cache_stream.py
shared by /data and /index. It had no direct unit coverage — every
test reached it only through a router. Add tests asserting the five
response shapes it can produce: 200 GET full body, 200 HEAD no body,
206 with a Range slice, 416 for an out-of-bounds Range, and 400 for
a malformed Range header. Each assertion hits the helper directly
so future regressions surface here first without depending on router
integration.
The style guide forbids function-local / method-local / conditional
imports unless breaking a circular import with an explanatory
comment. New and touched test files accumulated ~25 such imports
inside test bodies — a holdover from the implementation round when
processors and registries were being wired up incrementally.

Hoist every test-body import to the module's import block across
test_data.py, test_index.py, test_workflows/test_executor.py,
test_workflows/test_processors_bam.py,
test_workflows/test_processors_tabix.py,
integration/test_processor_e2e.py, and integration/test_router_e2e.py.
Also replace "from pathlib import Path as _P" aliases with the
top-level Path import that now exists in each file.
Bundled low-risk improvements from the review's non-blocking list:

- lock.py: swap Tuple from typing for the lowercase builtin tuple
  to match the rest of the new workflow code (from __future__
  import annotations is already present).

- tests/integration/conftest.py: the samples fixture was calling
  generate_all a second time after sample_data_root had already
  built every file, paying samtools/gzip/bedToBigBed costs twice.
  Restructure so a new session-scoped _session_samples fixture
  runs generate_all exactly once, and sample_data_root and samples
  both read from it.

- tests/integration/conftest.py: drop the fixed 3-second sleep in
  wool_pool. The executor's _dispatch_with_retry already tolerates
  the LocalDiscovery startup window, so the sleep was pure
  overhead on every test. Integration suite runtime drops from
  ~60s to ~10s.

- fetcher.py: offload file writes with asyncio.to_thread so the
  download loop doesn't block the event loop on large artifacts.
  Matters when the executor runs in-process (tests, no WorkerPool);
  harmless when dispatched to a Wool worker with its own loop.
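The offload pattern in the last item, sketched with a hypothetical save loop (the chunk source and path are illustrative; fetcher.py's actual download loop differs):

```python
import asyncio
import os
import tempfile


async def save_chunks(chunks, path: str) -> None:
    """Write an async stream of byte chunks without blocking the event loop."""
    with open(path, "wb") as fh:
        async for chunk in chunks:
            # The blocking file write runs in a worker thread, so other
            # coroutines on the loop keep making progress.
            await asyncio.to_thread(fh.write, chunk)


async def _demo() -> bytes:
    async def chunks():
        yield b"large "
        yield b"artifact"

    path = os.path.join(tempfile.mkdtemp(), "artifact.bin")
    await save_chunks(chunks(), path)
    with open(path, "rb") as fh:
        return fh.read()


data = asyncio.run(_demo())
```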
__fields__ is deprecated in Pydantic V2 and slated for removal in V3.
The replacement model_fields is a class property on BaseModel and is
the documented V2 introspection API.
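The mechanical shape of the swap, on a cut-down stand-in model (JobRecord's real fields differ):

```python
from pydantic import BaseModel


class JobRecord(BaseModel):
    """Cut-down stand-in; the real model carries more fields."""

    workflow_key: str
    status: str


# Deprecated V2 spelling, removed in V3:  JobRecord.__fields__
# Documented V2 introspection API:
names = list(JobRecord.model_fields)
```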
@conradbzura force-pushed the 30-add-preprocessing-indexing-workflow branch from c8585f1 to b92c47a on May 4, 2026 01:55
@conradbzura marked this pull request as ready for review on May 4, 2026 01:58
Address findings from a six-reviewer test audit (3 unit × 3 integration
reviewers, each pass covering coverage, correctness, and quality).
Per-finding fixup mapping was attempted via autosquash, but the targets
were inconsistent with the file states at intermediate commits, so this
is a single consolidated pass.

Key changes:
- B3 (private-symbol references): rename _cache_stream.py → cache_stream.py,
  _tools.py → tools.py; drop _run/_run_shell aliases in bam/tabix
  processors; update test patches to public names.
- B5 (real-Mongo concurrency): add mongomock-motor dev dep; new
  test_mongo_partial_unique.py exercising the production
  partialFilterExpression syntax with 2-way and 10-way concurrent claims
  plus post-release re-claim.
- B6 (partial-commit retry): TestWoolExecutorPartialCommit asserts a
  stage-2 failure leaves stage-1 in cache and the retry skips it.
- B7 (drain coverage): three new TestWoolExecutorDrain tests for idle,
  pending, and post-drain ensure_workflow rejection.
- B8 (routines.py extraction): move StubProcessor / stub_file_meta out of
  test_executor_boundary.py into tests/integration/routines.py; drop
  cloudpickle.register_pickle_by_value workaround.
- NB1: tighten weak/tautological assertions (BAM SO:coordinate, tabix
  query result count, sidecar bytes/URL, raw=true DRS reachability,
  pytest.raises match=, PIPELINE_VERSION isinstance).
- NB2: branch-gap unit tests (extract_identity submission/non-dict-dcc,
  normalize_dcc/md5 direct, mark_running rejection on non-PENDING,
  ArtifactKind coercion).
- NB3.b: parametrized byte-range tests + multi-chunk seam test +
  unsatisfiable 416 case.
- NB4: replace _pending_tasks introspection with public observable.
- NB5: integration polish (assert COMPLETED, drop 3s LocalDiscovery
  sleep, share wool_pool fixture, fix conftest scope docstring).
- NB6: convert string-path mocker.patch calls to mocker.patch.object.
- NB7: hygiene (unused imports, mid-file imports, missing @settings).
- NB8: unify TestBamIndexProcessorAppliesTo + Run; tabix counterpart.

Skipped: B1, B2 (test name + class-wrapping refactors) per user direction.
B4 (uppercase GWT) verified out of scope — pre-existing pre-PR code.
Deferred to follow-ups: NB3.a/c/d (low-value integration coverage),
B8 full retrofit (Scenario dataclass + pairwise + Hypothesis framework).

Test status: 181 unit tests pass, 24 integration tests collect.
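The claim semantics the B5 test exercises can be modeled without Mongo — a pure-Python analogue of a unique index on workflow_key restricted by a partialFilterExpression to active statuses (names mirror this PR's descriptions; the real test runs mongomock-motor against the production index syntax):

```python
ACTIVE_STATUSES = {"PENDING", "RUNNING"}


class DuplicateKeyError(Exception):
    """Analogue of pymongo.errors.DuplicateKeyError."""


class JobsCollection:
    """In-memory model of a partial unique index on workflow_key."""

    def __init__(self) -> None:
        self._docs: list[dict] = []

    def insert_one(self, doc: dict) -> None:
        # The partial filter means uniqueness applies only while a document
        # for the same key is in an active status.
        if doc["status"] in ACTIVE_STATUSES:
            for existing in self._docs:
                if (existing["workflow_key"] == doc["workflow_key"]
                        and existing["status"] in ACTIVE_STATUSES):
                    raise DuplicateKeyError(doc["workflow_key"])
        self._docs.append(doc)

    def release(self, workflow_key: str) -> None:
        for doc in self._docs:
            if doc["workflow_key"] == workflow_key and doc["status"] in ACTIVE_STATUSES:
                doc["status"] = "COMPLETED"


def claim_workflow(jobs: JobsCollection, key: str) -> bool:
    """True if this caller won the claim; False means attach to the in-flight job."""
    try:
        jobs.insert_one({"workflow_key": key, "status": "PENDING"})
        return True
    except DuplicateKeyError:
        return False
```

Once the active document leaves ACTIVE_STATUSES, a fresh claim on the same key succeeds again — the post-release re-claim case in the test.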