Skip to content

Default to service mode returning the results for the ingestion job#2133

Merged
jdye64 merged 3 commits into
NVIDIA:26.05from
jdye64:ingest-return-results
May 27, 2026
Merged

Default to service mode returning the results for the ingestion job#2133
jdye64 merged 3 commits into
NVIDIA:26.05from
jdye64:ingest-return-results

Conversation

@jdye64
Copy link
Copy Markdown
Collaborator

@jdye64 jdye64 commented May 27, 2026

Description

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.
  • If adjusting docker-compose.yaml environment variables have you ensured those are mimicked in the Helm values.yaml file.

@jdye64 jdye64 requested review from a team as code owners May 27, 2026 16:44
@jdye64 jdye64 requested review from jioffe502 and removed request for a team May 27, 2026 16:44
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot Bot commented May 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 27, 2026

Greptile Summary

This PR introduces return_results=True as the default for ServiceIngestor.ingest(), populating result.dataframe with extracted rows after ingestion completes. It also redesigns the split-topology (gateway+worker) result path: instead of embedding large row payloads in the job-completion callback, workers cache rows locally in a new worker_result_store and expose them through a new internal GET endpoint.

  • New return_results flag: added to IngestExecuteParams (default True), plumbed through _resolve_execute_flags, and surfaced on ServiceIngestResult.dataframe as a concatenated DataFrame in upload order.
  • Split-topology row transport: _fire_gateway_callback no longer carries result_data; rows are cached in the new worker_result_store module and fetched lazily by the gateway status endpoint.
  • Serialization consolidation: per-document row sanitization is extracted into the new nemo_retriever.ingest_results module, replacing the old approach that dropped large binary columns entirely in favour of per-cell placeholder strings.

Confidence Score: 4/5

Safe to merge for the single-pod topology; the split gateway+worker path has a potential accumulation issue in the worker result store that should be addressed before heavy production use.

The worker result store has no TTL or eviction mechanism. When _fire_gateway_callback fails (it silently swallows exceptions), the gateway never marks the document completed and never calls the worker's GET endpoint to consume the stored rows. Each such failure leaves a permanent entry in _store. In deployments where gateway callbacks fail intermittently — gateway restarts, transient network errors — the worker pod's memory grows without bound over time.

nemo_retriever/src/nemo_retriever/service/services/worker_result_store.py and pipeline_pool.py — the interaction between silent callback failure and unbounded result storage.

Important Files Changed

Filename Overview
nemo_retriever/src/nemo_retriever/service/services/pipeline_pool.py Splits result data out of the callback payload — stores rows locally before firing gateway notification, but has no recovery if the callback fails silently, leaving _store entries orphaned.
nemo_retriever/src/nemo_retriever/service/services/worker_result_store.py New in-memory row cache for split-topology workers; no TTL or eviction policy, creating a memory-leak risk if gateway callbacks fail silently.
nemo_retriever/src/nemo_retriever/ingest_results.py New shared serialization helpers; retains all columns (vs the old approach that dropped large binary columns), which is the intended behavioral change.
nemo_retriever/src/nemo_retriever/service/routers/ingest.py Adds new internal GET endpoint for workers to expose cached rows; upgrades _status_response to async and adds gateway-mode fallback to fetch from worker pods.
nemo_retriever/src/nemo_retriever/service_ingestor.py Adds return_results=True default, refactors fetch/persist into _materialize_completed_document (single HTTP round-trip for both paths), and populates result.dataframe at the end of ingest().
nemo_retriever/src/nemo_retriever/params/models.py Adds return_results: bool = True to IngestExecuteParams; safe additive change.
nemo_retriever/tests/test_service_worker_callback.py New tests covering split-topology callback omitting result_data and the worker document-result endpoint; good coverage of the happy path.
nemo_retriever/tests/test_ingest_results.py Tests for ingest_results helpers: column preservation, round-trip consistency, and document ordering; well-structured.

Sequence Diagram

sequenceDiagram
    participant Client as ServiceIngestor (client)
    participant GW as Gateway Pod
    participant WK as Worker Pod

    Client->>GW: POST /v1/ingest/job (open job)
    Client->>GW: "POST /v1/ingest/document (upload doc, callback_url=gateway)"
    GW->>WK: "enqueue WorkItem(callback_url=...)"

    Note over WK: Pipeline processes document
    WK->>WK: store_result_data(doc_id, rows)
    WK->>GW: "POST /v1/internal/job-callback {id, status=completed, result_rows=N}"

    GW->>GW: tracker.mark_completed(doc_id)
    GW-->>Client: SSE: document_complete event

    Client->>GW: "GET /v1/ingest/status/{doc_id}"
    GW->>GW: consume_result_data(doc_id) → None (gateway tracker has no rows)
    GW->>WK: "GET /v1/internal/document-result/{doc_id}"
    WK->>WK: consume_result_data(doc_id) → rows (removed from _store)
    WK-->>GW: "{result_data: [...rows...]}"
    GW-->>Client: "JobStatusResponse {result_data: [...rows...]}"

    Note over Client: concat_ingest_results → result.dataframe
Loading
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
nemo_retriever/src/nemo_retriever/service/services/pipeline_pool.py:270-279
**Unbounded memory leak when gateway callback fails silently**

`store_result_data` is called before `_fire_gateway_callback`. Because `_fire_gateway_callback` catches all exceptions internally and returns `None` on any network failure, the worker has no way to know the callback was lost. When that happens, the gateway never marks the document as `COMPLETED`, the status endpoint never calls `consume_result_data`, and the rows accumulate in `_store` indefinitely — there is no TTL or max-size guard in `worker_result_store`. In a long-running worker pod with even occasional callback failures (gateway restarts, transient network errors), `_store` will grow without bound.

### Issue 2 of 3
nemo_retriever/src/nemo_retriever/service/services/worker_result_store.py:1-2
Missing `# All rights reserved.` line between the copyright and license identifier lines. All four new files in this PR share this omission.

```suggestion
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
```

### Issue 3 of 3
nemo_retriever/helm/README.md:39-44
The backtick opening the inline code span on the second line is never closed. Most Markdown renderers will treat everything after it as code until the next backtick is found (or fall back to literal rendering), producing garbled output. The block also needs structural fixes to separate the prose from the code examples.

```suggestion
> For behavioral consistency between local HuggingFace deployments and Helm service deployments,
> `return_results` defaults to `True`:
>
> ```python
> results = ingestor.ingest(..., return_results=True)
> ```
>
> This incurs a significant performance and system memory usage cost.
> Unless you explicitly need extraction results returned to the client, use:
>
> ```python
> results = ingestor.ingest(..., return_results=False)
> ```
>
> If you must return results, you may need to increase pod memory specs to support the increased usage.
```

Reviews (2): Last reviewed commit: "Update Helm readme with snippet about re..." | Re-trigger Greptile

Comment on lines +445 to +457
def _materialize_completed_document(
self,
document_id: str,
*,
return_results: bool,
) -> list[dict[str, Any]] | None:
"""Fetch (once) and optionally persist rows for a completed document."""
if not return_results and self._save_to_disk_dir is None:
return None
result_data = self._fetch_document_result_data(document_id)
if self._save_to_disk_dir is not None:
self._write_result_data_to_disk(document_id, result_data)
return result_data if return_results else None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Data loss when both return_results and save_to_disk are active

If _write_result_data_to_disk raises (e.g., disk full, permission error), the exception bubbles out of _materialize_completed_document and is caught by the except Exception in the ingest loop. At that point the status endpoint has already consumed result_data (the service drops it on first read), so the data is gone from both the in-memory rows_by_document map and disk — even though return_results=True was requested. The concrete failure: user enables .save_to_disk(), disk is nearly full, one document's write fails, and that document silently disappears from result.dataframe with no way to recover it.

A safer structure would be to persist the disk write independently and return result_data regardless of whether the write succeeds.

Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/service_ingestor.py
Line: 445-457

Comment:
**Data loss when both `return_results` and `save_to_disk` are active**

If `_write_result_data_to_disk` raises (e.g., disk full, permission error), the exception bubbles out of `_materialize_completed_document` and is caught by the `except Exception` in the `ingest` loop. At that point the status endpoint has already consumed `result_data` (the service drops it on first read), so the data is gone from both the in-memory `rows_by_document` map and disk — even though `return_results=True` was requested. The concrete failure: user enables `.save_to_disk()`, disk is nearly full, one document's write fails, and that document silently disappears from `result.dataframe` with no way to recover it.

A safer structure would be to persist the disk write independently and return `result_data` regardless of whether the write succeeds.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines 1072 to +1075
except Exception as exc:
logger.warning("save_to_disk: failed to persist %s: %s", doc_id, exc)
result.failures.append((doc_id, f"save_to_disk: {exc}"))
label = "return_results" if return_results else "save_to_disk"
logger.warning("%s: failed to fetch/persist %s: %s", label, doc_id, exc)
result.failures.append((doc_id, f"{label}: {exc}"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 The logger.warning call is missing exc_info=True. Per the no-bare-except rule, a broad Exception catch is only acceptable when the exception is logged with full context including the traceback; without exc_info=True the stack trace is swallowed, making post-mortem diagnosis very difficult.

Suggested change
except Exception as exc:
logger.warning("save_to_disk: failed to persist %s: %s", doc_id, exc)
result.failures.append((doc_id, f"save_to_disk: {exc}"))
label = "return_results" if return_results else "save_to_disk"
logger.warning("%s: failed to fetch/persist %s: %s", label, doc_id, exc)
result.failures.append((doc_id, f"{label}: {exc}"))
except Exception as exc:
label = "return_results" if return_results else "save_to_disk"
logger.warning("%s: failed to fetch/persist %s: %s", label, doc_id, exc, exc_info=True)
result.failures.append((doc_id, f"{label}: {exc}"))
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/service_ingestor.py
Line: 1072-1075

Comment:
The `logger.warning` call is missing `exc_info=True`. Per the `no-bare-except` rule, a broad `Exception` catch is only acceptable when the exception is logged with full context including the traceback; without `exc_info=True` the stack trace is swallowed, making post-mortem diagnosis very difficult.

```suggestion
                        except Exception as exc:
                            label = "return_results" if return_results else "save_to_disk"
                            logger.warning("%s: failed to fetch/persist %s: %s", label, doc_id, exc, exc_info=True)
                            result.failures.append((doc_id, f"{label}: {exc}"))
```

How can I resolve this? If you propose a fix, please make it concise.

self.document_ids: list[str] = []
self.elapsed_s: float = 0.0
self.job_status: str | None = None
self.dataframe: Any = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The dataframe attribute on the public ServiceIngestResult class is typed as Any, losing all type safety. Per the public-api-contract and type-hints-public-api rules, public attributes must have complete annotations. Using Optional[pd.DataFrame] accurately communicates the None/populated distinction callers need to check.

Suggested change
self.dataframe: Any = None
self.dataframe: "Optional[pd.DataFrame]" = None
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/service_ingestor.py
Line: 147

Comment:
The `dataframe` attribute on the public `ServiceIngestResult` class is typed as `Any`, losing all type safety. Per the `public-api-contract` and `type-hints-public-api` rules, public attributes must have complete annotations. Using `Optional[pd.DataFrame]` accurately communicates the `None`/populated distinction callers need to check.

```suggestion
        self.dataframe: "Optional[pd.DataFrame]" = None
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines 270 to 279
if item.callback_url:
from nemo_retriever.service.services.worker_result_store import store_result_data

store_result_data(item.id, result_data)
await _fire_gateway_callback(
item.callback_url,
item.id,
"completed",
result_rows=result_rows,
result_data=result_data,
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unbounded memory leak when gateway callback fails silently

store_result_data is called before _fire_gateway_callback. Because _fire_gateway_callback catches all exceptions internally and returns None on any network failure, the worker has no way to know the callback was lost. When that happens, the gateway never marks the document as COMPLETED, the status endpoint never calls consume_result_data, and the rows accumulate in _store indefinitely — there is no TTL or max-size guard in worker_result_store. In a long-running worker pod with even occasional callback failures (gateway restarts, transient network errors), _store will grow without bound.

Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/service/services/pipeline_pool.py
Line: 270-279

Comment:
**Unbounded memory leak when gateway callback fails silently**

`store_result_data` is called before `_fire_gateway_callback`. Because `_fire_gateway_callback` catches all exceptions internally and returns `None` on any network failure, the worker has no way to know the callback was lost. When that happens, the gateway never marks the document as `COMPLETED`, the status endpoint never calls `consume_result_data`, and the rows accumulate in `_store` indefinitely — there is no TTL or max-size guard in `worker_result_store`. In a long-running worker pod with even occasional callback failures (gateway restarts, transient network errors), `_store` will grow without bound.

How can I resolve this? If you propose a fix, please make it concise.

@jdye64 jdye64 merged commit ac41c62 into NVIDIA:26.05 May 27, 2026
7 of 8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant