
Cherry pick changes from 2.7#4327

Merged
YuanTingHsieh merged 16 commits into NVIDIA:main from
YuanTingHsieh:cherry-pick-changes-from-27
Mar 17, 2026

Conversation

@YuanTingHsieh
Collaborator

Description

Cherry-pick #4225 #4231 #4247 #4250 #4260 #4259 #4272 #4270 #4249 #4275 #4296 #4297 #4309 #4307 #4312 #4314

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

chesterxgchen and others added 16 commits March 17, 2026 13:20
…e enhancements (NVIDIA#4225)

## Summary

This PR addresses several related bugs and improvements in the Client
API subprocess path, swarm learning, and the recipe interface. It covers
FLARE-2743 (bug/5921094 and bug/5921708).

---

## Issues

### 1. LazyDownloadRef dtype error with pass-through streaming
When pass-through tensor streaming is enabled on the CJ, param
converters (e.g. `NumpyToPTParamsConverter`) were running on the CJ
*before* lazy references were materialized. This caused `"Could not
infer dtype of LazyDownloadRef"` warnings and potential data corruption
on large-model jobs using pass-through transfer.

### 2. Subprocess Client API had no converter wiring
The `ExProcessClientAPI` (subprocess path) created a
`FlareAgentWithFLModel` with no converters attached. Format conversion
was happening on the CJ side in `LauncherExecutor.execute()`, which
triggered issue #1.

### 3. Missing `learn_task_check_interval` in `CCWFJob.add_swarm()`
`SwarmClientConfig.learn_task_check_interval` was not forwarded to the
swarm controller, so any customization of that parameter was silently
ignored.

### 4. `extract_participants()` did not support `{"targets": [...]}`
dict form
The deploy map validation only handled list-form and `{"sites": [...]}`
dict-form entries. The `{"targets": [...]}` form (used by some job
generators) raised an unhandled error.

### 5. `SimpleSwarmLearningRecipe` missing parameters
`min_clients`, `launch_external_process`, and `command` could not be set
through the recipe interface, forcing users to construct jobs manually.

### 6. 3rd-party integration docs referenced deprecated
`FlareAgentWithCellPipe`
The class has been superseded by constructing a `CellPipe` and
`FlareAgent` directly, or using the higher-level Client API
(`nvflare.client`).

---

## Issues fixed from review comments

### 7. `task_name` read from absent shareable header in
`FlareAgentWithFLModel.shareable_to_task_data()`
The task name was read via `shareable.get_header(FLContextKey.TASK_NAME,
"")`, but this header is never populated in the subprocess path — the
task name is carried as the pipe `Message.topic` and stored in
`self.current_task.task_name` by `FlareAgent._on_task()`. An absent
header caused `task_name` to fall back to `""`, making
`ParamsConverter.process()` silently skip conversion. Fixed to use
`self.current_task.task_name if self.current_task else ""`, consistent
with `task_result_to_shareable()`.
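The corrected lookup can be sketched as follows; `Task` and the agent class here are simplified stand-ins for the NVFlare originals, not the real signatures:

```python
class Task:
    def __init__(self, task_name: str):
        self.task_name = task_name

class AgentSketch:
    """Illustrative stand-in for FlareAgentWithFLModel (names simplified)."""
    def __init__(self):
        # set by the agent when a task arrives over the pipe (Message.topic)
        self.current_task = None

    def task_name_for_conversion(self) -> str:
        # the fix: read the tracked task, not an absent shareable header
        return self.current_task.task_name if self.current_task else ""
```

With no current task the name falls back to `""`; once a task is tracked, converters see the real task name and no longer silently skip conversion.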

### 8. Double-conversion when `from_nvflare_converter_id` /
`to_nvflare_converter_id` set on launcher executor
`LauncherExecutor.execute()` still applied CJ-side converters if users
explicitly set `from_nvflare_converter_id` / `to_nvflare_converter_id`,
while the subprocess was also applying its own default converters —
resulting in double conversion. Fixed by removing the converter calls
from `LauncherExecutor.execute()` entirely. Since `LauncherExecutor` is
only ever subclassed by `ClientAPILauncherExecutor` (subprocess path),
this has no effect on the in-process path.

---

## Approach

**Converter logic removed from CJ launcher executors; moved to the
subprocess agent side.**

Previously, `PTClientAPILauncherExecutor` and
`TFClientAPILauncherExecutor` set up format converters in their
`initialize()` methods, and `LauncherExecutor.execute()` applied them on
the CJ before dispatching to the subprocess. This was the root cause of
issue #1 — the CJ was touching tensor data (triggering dtype inference)
before `LazyDownloadRef` placeholders were materialized in the
subprocess.

The converter setup blocks have been **completely removed** from both
launcher executors. Converters for the subprocess path now live entirely
inside the subprocess, wired into `FlareAgentWithFLModel` and executing
after full payload materialization.

Key changes:
- `FlareAgentWithFLModel` gains optional `from_nvflare_converter` /
`to_nvflare_converter` params. A lightweight `_ConverterContext` stub
(duck-typing `FLContext.get_prop/set_prop`) allows
`ParamsConverter.process()` to work without a real `FLContext` in the
subprocess.
- `SERVER_EXPECTED_FORMAT` is added to `ConfigKey` and propagated in the
subprocess launch config so the agent knows what format the server
expects.
- A shared factory `converter_utils.create_default_params_converters()`
is introduced, replacing four separate copies of the same
format-conditional logic across PT and TF executors.
- In-process executors (`PTInProcessClientAPIExecutor`,
`TFInProcessClientAPIExecutor`) are **unchanged in behavior** — they
still apply converters on the CJ side, which is correct since there is
no separate subprocess.

---

## Reason for this approach over alternatives

An earlier iteration kept the converter setup in `initialize()` but
added a `ClientAPILauncherExecutor.execute()` override that nulled the
converters out before calling `super().execute()` and restored them
after. This was removed because it left converters alive on the object
but always bypassed — confusing dead code. The current approach is
explicit and honest: launcher executors do not set converters at all,
and the subprocess handles its own conversion autonomously.

---

## Affected files

| Area | Files |
|------|-------|
| Core bug fixes | `ccwf_job.py`, `fed_utils.py` |
| Converter framework | `converter_utils.py` (new), `config.py`,
`flare_agent_with_fl_model.py`, `ex_process/api.py`,
`client_api_launcher_executor.py` (base) |
| PT launcher executor — converter removed |
`pt/client_api_launcher_executor.py` |
| PT in-process executor — refactored to shared factory |
`pt/in_process_client_api_executor.py` |
| TF launcher executor — converter removed + `initialize()` deleted |
`tf/client_api_launcher_executor.py` |
| TF in-process executor — refactored to shared factory |
`tf/in_process_client_api_executor.py` |
| Recipe | `pt/recipes/swarm.py` |
| Docs | `3rd_party_integration.rst`, `3rd_party_trainer.py` |
| Tests | 4 new test files, 4 modified test files, 2 integration YAMLs |

---

## Trade-offs and future work

- **TF/Keras converter path is not directly unit-tested** due to
TF/PyTorch dependency conflicts in a shared environment. The
`create_default_params_converters` Keras branch is structurally
identical to the PT branch (which is fully tested). A dedicated TF-only
test environment could add coverage in a future pass.
- **`_ConverterContext` is a duck-type shim.** If more `ParamsConverter`
use cases arise in subprocess contexts, a formal lightweight interface
(separate from `FLContext`) would be worth extracting.
- **Converter plugin (`from_nvflare_converter_id`)** for the subprocess
path is currently not wired — only the auto-created default converters
are passed to the agent. Explicit user-provided converter IDs are still
only supported for the in-process path. This can be addressed in a
follow-up.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…VIDIA#4231)

Three related issues were discovered in the NVFlare client-side stack:

### Per-round client memory growth

After each federated learning round, the client process RSS grew
monotonically. Model parameters — both the received global model and the
locally trained model — were kept alive long after `flare.send()`
completed. The reference chain kept them live until the next `receive()`
overwrote `self.fl_model`, meaning one full round's worth of tensors was
always double-buffered in memory.

### Premature `delete_msg_root` in `send_to_peer()`

The original `send_to_peer()` had:

```python
finally:
    # the msg_id is also used as msg_root_id
    delete_msg_root(msg.msg_id)
```

This was a race condition. `CellPipe.send_request()` returns as soon as
the peer acknowledges receipt via `make_reply(ReturnCode.OK)` in
`_receive_message` — which happens **before** the peer finishes
downloading model params. The `finally` block fired
`delete_msg_root(msg.msg_id)` and destroyed the active download
transaction while the CJ was still pulling tensors from the subprocess,
causing intermittent download failures.

### Subprocess `logger.info()` output silently dropped

When `launch_external_process=True`, `SubprocessLauncher` spawns
`python3 -u client.py`. This new process starts with Python's
unconfigured logging system — `lastResort` only emits `WARNING+`, so
every `logger.info()` inside the subprocess is silently dropped. All
INFO-level diagnostics (NVFlare internals, user training progress) were
invisible in subprocess mode.

---

### Eager parameter release (commit `39eb631d`)

**Files:** `nvflare/client/in_process/api.py`,
`nvflare/client/ex_process/api.py`, `nvflare/client/model_registry.py`

After `flare.send()`, both the sent model's params and the received
global model's params are dead weight — already serialized and
transmitted. We now explicitly null them immediately when
`clear_cache=True` (the default):

- **`InProcessClientAPI`**: Nulls `model.params`,
`model.optimizer_params`, `self.fl_model.params`,
`self.fl_model.optimizer_params` before clearing `self.fl_model`.
- **`ExProcessClientAPI`**: Calls `model_registry.release_params(model)`
which nulls both the sent and received model's params.

**Why not `gc.collect()`?** CPython's reference counting reclaims
objects as soon as their refcount drops to zero — `gc.collect()` only
helps with reference *cycles*. PyTorch tensors don't form cycles. Once
the params dicts are nulled, tensor memory is freed immediately.
Confirmed experimentally: RSS stays flat without `gc.collect()`.

**Why null params specifically?** Params (the large tensor dicts) are
the dominant memory consumer. Nulling them frees the tensors while
keeping the `FLModel` shell (metadata, metrics, round) alive for any
code that reads those fields after `send()`.
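The release logic can be sketched as below; `FLModelSketch` is a simplified stand-in for NVFlare's `FLModel` (the real class carries more fields), and `release_params` mirrors the described behavior rather than the exact library code:

```python
from dataclasses import dataclass, field

@dataclass
class FLModelSketch:
    """Simplified stand-in for FLModel."""
    params: dict = field(default_factory=dict)
    optimizer_params: dict = field(default_factory=dict)
    metrics: dict = field(default_factory=dict)

def release_params(model) -> None:
    # Null only the large tensor dicts; the metadata shell stays usable.
    if model is not None:
        model.params = None
        model.optimizer_params = None
```

Because CPython frees objects as soon as their refcount hits zero, nulling the dicts releases the tensor memory immediately, no `gc.collect()` needed.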

`PipeHandler.send_to_peer()` — commit `39eb631d`

**File:** `nvflare/fuel/utils/pipe/pipe_handler.py`

Removed the `finally: delete_msg_root(msg.msg_id)` block and its import.
The `msg_id` doubles as the `msg_root_id` used by
`ViaDownloaderDecomposer` to track download transaction lifecycle.
Calling `delete_msg_root` in a `finally` block races against the peer's
download: the ack is immediate but the actual tensor download is
asynchronous. Download transaction cleanup now correctly happens in
`wf_comm_server.py` and `task_controller.py`, which call
`delete_msg_root` only after the task is fully processed server-side.

### Subprocess logging setup (commit `bc6d87847`)

**File:** `nvflare/client/ex_process/api.py` — added
`_configure_subprocess_logging()`

Immediately after loading `client_api_config.json`, `init()` calls
`_configure_subprocess_logging(client_config)`, which:

1. Reads `workspace_dir` from `client_api_config.json` under
`TASK_EXCHANGE.pipe.ARG.workspace_dir`
2. Loads `{workspace_dir}/local/log_config.json` (the site's standard
NVFlare log config)
3. Calls `apply_log_config(dict_config, workspace_dir)` to invoke
`logging.config.dictConfig()`

This wires up `consoleHandler -> sys.stdout` (captured by
`SubprocessLauncher`), `logFileHandler -> log.txt`, and `jsonFileHandler
-> log.json` — exactly as the main NVFlare process configures them. The
entire method is wrapped in `try/except Exception: pass` — a logging
setup failure must never crash the training script.

Called before the `try` block in `init()` so that even errors during
`flare_agent.start()` are properly logged.
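The shape of that setup, reduced to its essentials, might look like this; the config dict and function name are illustrative (the real code resolves the site's `log_config.json` from the workspace), but the `try/except` posture matches the description: a logging failure must never crash training.

```python
import logging.config

def configure_subprocess_logging(dict_config: dict) -> None:
    """Sketch: apply a dictConfig-style logging config, never raising."""
    try:
        logging.config.dictConfig(dict_config)
    except Exception:
        pass  # logging setup failure must not crash the training script

# minimal config in the shape logging.config.dictConfig expects
configure_subprocess_logging({
    "version": 1,
    "handlers": {"console": {"class": "logging.StreamHandler", "level": "INFO"}},
    "root": {"level": "INFO", "handlers": ["console"]},
})
```

After this runs, `logger.info()` calls in the subprocess reach stdout, where `SubprocessLauncher` captures them.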

**New file:** `nvflare/fuel/utils/mem_utils.py`

Zero-cost diagnostic to measure client-side RSS per round:

- Gated by `NVFLARE_CLIENT_MEMORY_PROFILE=1` (off by default). Name is
client-scoped so server/relay processes are unaffected.
- `_ENABLED` evaluated once at import time — single boolean check per
call, zero overhead when disabled.
- Uses `logger.info()`: after Fix 3, works correctly in both modes. RSS
lines land in `log.txt`/`log.json` alongside all other NVFlare
diagnostics.
- `psutil` imported lazily; entire call is `try/except`-wrapped so
missing install is a silent no-op.
- Instrumented at `receive()` and `send()` in both `InProcessClientAPI`
and `ExProcessClientAPI`.
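A sketch of the profiling utility under the stated design (the real module is `nvflare/fuel/utils/mem_utils.py`; this version mirrors its described behavior, not its exact code):

```python
import logging
import os

# evaluated once at import time: one boolean check per call when disabled
_ENABLED = os.environ.get("NVFLARE_CLIENT_MEMORY_PROFILE") == "1"

def log_rss(tag: str) -> None:
    """Log current process RSS in MB, gated and failure-proof."""
    if not _ENABLED:
        return
    try:
        import psutil  # lazy import: a missing install is a silent no-op
        rss_mb = psutil.Process().memory_info().rss / (1024 * 1024)
        logging.getLogger(__name__).info("[RSS] %s: %.1f MB", tag, rss_mb)
    except Exception:
        pass
```

With the env var unset the function returns immediately, so instrumenting `receive()`/`send()` costs effectively nothing in production.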

---

| File | Change |
|------|--------|
| `nvflare/client/api_spec.py` | `_maybe_cleanup_memory()` base
implementation and memory management attributes |
| `nvflare/client/in_process/api.py` | Eager param release in `send()`,
`log_rss()` instrumentation, `configure_memory_management()` |
| `nvflare/client/ex_process/api.py` | Eager param release,
`_configure_subprocess_logging()`, `log_rss()` instrumentation, memory
GC config |
| `nvflare/client/model_registry.py` | Added `release_params()` method |
| `nvflare/fuel/utils/pipe/pipe_handler.py` | Removed racy `finally:
delete_msg_root(msg.msg_id)` and its import |
| `nvflare/fuel/utils/mem_utils.py` | **New file** — `log_rss(tag)` RSS
profiling utility |
| `docs/client_api.rst` | Clarified param lifecycle and `clear_cache`
semantics |

---

Tested with `hello-pt` (CIFAR-10, 2 clients, 2 rounds) using
`NVFLARE_CLIENT_MEMORY_PROFILE=1`:

**In-process mode:**
```
round=0 after_receive: 629.2 MB
round=0 after_send:    270.9 MB   <- params freed immediately
round=1 after_receive: 283.4 MB
round=1 after_send:    448.5 MB   <- flat, no accumulation
```

**Subprocess mode (`launch_external_process=True`):**
```
round=0 after_receive: 623.5 MB
round=0 after_send:    129.4 MB   <- params freed immediately
round=1 after_receive: 136.1 MB
round=1 after_send:    121.5 MB   <- flat, no accumulation
```

RSS stabilizes after round 0 in both modes. Subprocess logging
confirmed: `logger.info()` lines now appear in `site-N/log.txt` (written
directly by the subprocess) and captured via `SubprocessLauncher`.

- [x] In-process RSS profiling with `NVFLARE_CLIENT_MEMORY_PROFILE=1` —
`[RSS]` lines in `site-N/log.txt`
- [x] Subprocess RSS profiling with `launch_external_process=True` —
`[RSS]` lines in both subprocess log and `SubprocessLauncher` capture
- [x] Subprocess logging gap fixed — `logger.info()` calls now visible
(previously silently dropped)
- [x] Memory flat across rounds in both modes (no monotonic growth)
- [x] `flare.send(clear_cache=False)` path untouched — params only
released when `clear_cache=True`
- [x] `delete_msg_root` race removed — download transactions no longer
prematurely destroyed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…HROUGH, download gating, MSG_ROOT_TTL, LazyRef local-aggr, logging, min_clients, diagnostics (NVIDIA#4247)

Comprehensive fix series for large-model federated training with
subprocess clients. Addresses memory accumulation root causes,
correctness crashes, timeout misconfiguration, and operational
diagnostics across 18 numbered fixes.

**Data-flow context** (subprocess-mode per round):
```
Forward path:  FL Server / Aggregator → [scatter] → Trainer CJ → [task delivery] → Subprocess
Reverse path:  Subprocess → [result]  → Trainer CJ → [relay]   → FL Server
```

---

### Fix 1: cache the serialized message across send retries

**Problem**: `CellPipe.send()` re-serialized the payload on every retry,
creating a new `ArrayDownloadable` download transaction per attempt.
With unbounded retries on a 5 GiB model, each failed send accumulated a
fresh ~5 GiB transaction, leading to OOM.

**Approach**: Cache the serialized `CellMessage` on the `Message` object
(e.g. `msg._cached_cell_msg`) on the first send. Retries reuse the same
bytes and the same download transaction — no re-serialization, no
transaction accumulation.

**Files**: `nvflare/fuel/utils/pipe/cell_pipe.py`
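The retry-with-cache pattern can be sketched as follows; `MsgSketch`, `send_with_retry`, and their parameters are illustrative names, not NVFlare's actual API:

```python
class MsgSketch:
    """Illustrative Message; only the cache attribute matters here."""
    def __init__(self, payload):
        self.payload = payload
        self._cached_cell_msg = None  # serialized form, filled on first send

def send_with_retry(msg, serialize, transport_send, max_resends=3):
    # Serialize exactly once; every retry reuses the cached bytes, so no
    # new download transaction is created per attempt.
    if msg._cached_cell_msg is None:
        msg._cached_cell_msg = serialize(msg.payload)
    for _ in range(max_resends + 1):
        if transport_send(msg._cached_cell_msg):
            return True
    return False
```

Because the same bytes (and hence the same download transaction) are reused, failed attempts no longer accumulate multi-GiB state.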

**Problem**: The cached `CellMessage` held references to encoded bytes
and the FOBS download transaction long after the retry loop exited,
delaying garbage collection.

**Approach**: `PipeHandler._send_to_pipe()` calls
`pipe.release_send_cache(msg)` unconditionally in a `try/finally` after
the loop. The transaction is not cancelled on each retry (the peer may
be mid-download); it is released exactly once after the loop terminates.

**Files**: `nvflare/fuel/utils/pipe/pipe_handler.py`

### Fix 3: configurable `submit_result_timeout` for large-model transfer
**Problem**: `FlareAgent.submit_result_timeout` defaulted to 60 s — far
too short for a 5 GiB transfer. There was no way to set it from the job
config.

**Approach**: Add `ConfigKey.SUBMIT_RESULT_TIMEOUT` to `ClientConfig`;
wire through `ClientAPILauncherExecutor` → `prepare_config_for_launch()`
→ `FlareAgentWithFLModel` → `FlareAgent`. Operator sets it per job via
`recipe.add_client_config({"submit_result_timeout": 1800})`.

**Files**: `nvflare/client/config.py`,
`nvflare/app_common/executors/client_api_launcher_executor.py`,
`nvflare/client/ex_process/api.py`

### Release `base_obj` when the download transaction completes
**Problem**: `CacheableObject.transaction_done()` cleared `self.cache`
but not `self.base_obj`, keeping the numpy source array alive after all
receivers had finished downloading from it.

**Approach**: Set `self.base_obj = None` in `transaction_done()` so the
reference is dropped deterministically at transaction completion, not
deferred to GC.

**Files**: `nvflare/fuel/f3/streaming/cacheable.py`
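A minimal sketch of the change, mirroring the described fix rather than the real class in `nvflare/fuel/f3/streaming/cacheable.py`:

```python
class CacheableObjectSketch:
    """Sketch: drop both the cache and the source object at completion."""
    def __init__(self, base_obj):
        self.base_obj = base_obj  # e.g. the numpy source array
        self.cache = {}

    def transaction_done(self):
        self.cache = None
        # the fix: drop the source reference deterministically here,
        # instead of leaving it for eventual garbage collection
        self.base_obj = None
```

The key point is determinism: the large source array becomes unreachable exactly when the last receiver finishes, not at some later GC pass.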

**Problem**: `peer_read_timeout` (CJ waiting for subprocess to read the
task) was 1800 s, but `submit_result_timeout` (subprocess waiting for CJ
to ACK result) defaulted to 60 s — structurally inconsistent. A
large-model send could exhaust `submit_result_timeout` multiple times
before CJ finished downloading.

**Approach**: With Fix 3 making `submit_result_timeout` configurable,
both timeouts are now set consistently at job-config level. Fix 13 adds
a startup warning when they are mismatched.

### Fix 6: return the pipe ACK immediately; decode asynchronously

**Problem**: `CellPipe._receive_message()` returned `ReturnCode.OK` only
after FOBS deserialization completed inline — meaning the subprocess
waited for CJ to finish downloading before receiving the ACK. On slow
networks or large models, the subprocess timed out waiting for ACK while
CJ was still downloading.

**Approach**: Return the pipe-level ACK immediately on receipt, then
decode the payload asynchronously. This decouples the acknowledgment
latency from the download transfer time.

**Files**: `nvflare/fuel/utils/pipe/cell_pipe.py`

**Problem**: `ClientAPILauncherExecutor.initialize()` enabled
PASS_THROUGH regardless of pipe type. For `FilePipe` subprocesses
(third-party integrations), the subprocess cannot resolve cell FQCNs to
pull from the server's `DownloadService` directly — enabling
PASS_THROUGH in that mode silently breaks deserialization.

**Approach**: Check `isinstance(self.pipe, CellPipe)` before enabling
PASS_THROUGH. `FilePipe` mode keeps PASS_THROUGH disabled and CJ
continues to download and re-serialize into the file.

**Files**:
`nvflare/app_common/executors/client_api_launcher_executor.py`
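The gating condition reduces to a type check; the class names below are stand-ins for the real pipe classes:

```python
class CellPipeSketch:
    """Stand-in for CellPipe (cell-backed subprocess pipe)."""

class FilePipeSketch:
    """Stand-in for FilePipe (file-backed third-party integration pipe)."""

def pass_through_allowed(pipe) -> bool:
    # FilePipe subprocesses cannot resolve cell FQCNs to pull from
    # DownloadService, so PASS_THROUGH must stay off for them; only
    # CellPipe-backed subprocesses qualify.
    return isinstance(pipe, CellPipeSketch)
```

With a `FilePipe`, the CJ keeps downloading and re-serializing into the file exactly as before.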

**Problem**: On the result path, CJ decoded the subprocess's result
(downloading ~1 GB of tensors), then re-encoded and re-sent to the
server (another ~1 GB upload). CJ materialized the full model twice with
no benefit — it was a pure relay.

**Approach**: Enable `PASS_THROUGH=True` on the subprocess↔CJ pipe cell.
CJ creates `LazyDownloadRef` objects instead of downloading tensors.
When CJ re-encodes for the server, `LazyDownloadRefDecomposer` re-emits
the subprocess's original `fqcn + ref_id`. The server pulls directly
from the subprocess's `DownloadService` — CJ never materializes the
result tensors. `MSG_ROOT_TTL` is stamped on the outgoing cell message
to keep the subprocess transaction alive for the full download.

**Files**:
`nvflare/app_common/executors/client_api_launcher_executor.py`,
`nvflare/fuel/utils/pipe/cell_pipe.py`

**Problem**: `_MIN_DOWNLOAD_TIMEOUT` was a hardcoded 60 s constant. For
70 B models the inter-chunk gap can exceed 60 s on any realistic
network, causing `_monitor_tx` to kill an active transaction
mid-transfer.

**Approach**: Add `ConfigVarName.MIN_DOWNLOAD_TIMEOUT`; read via
`acu.get_positive_float_var()` in `_create_downloader()`, following the
same pattern as `STREAMING_PER_REQUEST_TIMEOUT`. Operator sets
`np_min_download_timeout` or `tensor_min_download_timeout` in job
config. 60 s remains the fallback default.

**Files**: `nvflare/apis/fl_constant.py`,
`nvflare/fuel/utils/fobs/decomposers/via_downloader.py`

**Problem**: `FlareAgent` accepted `max_resends` but it defaulted to
`None` (unlimited) and was not exposed through the job config system.
Unlimited retries with Fix 1's caching are safe, but still produce an
indefinite wait on persistent failures.

**Approach**: Add `ConfigKey.MAX_RESENDS` to `ClientConfig`; wire
through `ClientAPILauncherExecutor` → `prepare_config_for_launch()` →
`FlareAgentWithFLModel` → `FlareAgent` → `PipeHandler`. Default: 3.

**Files**: `nvflare/client/config.py`,
`nvflare/app_common/executors/client_api_launcher_executor.py`,
`nvflare/client/ex_process/api.py`

**Problem**: Subprocess had `log_rss()` + `cleanup_memory()` via
`APISpec._maybe_cleanup_memory()`, but the client-job (CJ) process had
neither. Per-round RSS growth in CJ was invisible.

**Approach**: Override `check_output_shareable()` in
`ClientAPILauncherExecutor` to call `log_rss()` (gated on
`NVFLARE_CLIENT_MEMORY_PROFILE=1`) and `_maybe_cleanup_cj_memory()`
after each result relay. Reuses the existing `memory_gc_rounds` and
`cuda_empty_cache` config params.

**Files**:
`nvflare/app_common/executors/client_api_launcher_executor.py`

### Fix 12a: resolve all log-config variants; avoid duplicate file writes
**Problem**: `_configure_subprocess_logging()` hardcoded
`local/log_config.json`, missing `.default`/`.conf`/`.yml` variants
common in provisioned deployments. Subprocess file handlers duplicated
every log line with the parent's rotating log files.

**Approach**: Use
`ConfigFactory.load_config(WorkspaceConstants.LOGGING_CONFIG,
search_dirs=[local_dir])` so all variants are resolved. Strip all
non-console handlers before applying the config — subprocess logs flow
exclusively through `SubprocessLauncher`'s stdout pipe to the parent.

**Files**: `nvflare/client/ex_process/api.py`
**Tests**: `tests/unit_test/client/test_logging_and_rss_tags.py` (6
tests)

### Fix 12b: concise, role-attributable RSS tags

**Problem**: RSS markers (`after_receive`, `after_send`, `client_job`)
were verbose and did not identify which process emitted them.
**Approach**: Standardize to `"CA s={site} r={round} recv/send"`
(ClientAgent/subprocess) and `"CJ s={site} t={task} r={round} relay"`
(ClientJob). Short, greppable, and role-attributable.

**Files**: `nvflare/client/ex_process/api.py`,
`nvflare/client/in_process/api.py`,
`nvflare/app_common/executors/client_api_launcher_executor.py`
**Tests**: `tests/unit_test/client/test_logging_and_rss_tags.py` (10
tests)

### Fix 13: startup warning for mismatched timeout config

**Problem**: Timeout mismatches (e.g. `min_download_timeout <
streaming_per_request_timeout`) were silent until a download failed
mid-transfer; the operator had no actionable warning.

**Approach**: `_validate_timeout_config()` called at end of
`initialize()` logs warnings for: `min_download_timeout <
streaming_per_request_timeout`, `submit_result_timeout >
min_download_timeout`, and `max_resends is None`. Uses `log_warning`
(not `raise`) so the job still runs.

**Files**:
`nvflare/app_common/executors/client_api_launcher_executor.py`

### Fix 14: reverse-path PASS_THROUGH via per-call FOBS decode context
**Problem**: PASS_THROUGH was enabled globally on the engine cell's FOBS
context. The engine cell handles all CJ messages — not just task-result
messages from the subprocess, but also Swarm P2P aggregation results
from other trainers. Swarm P2P messages decoded with `PASS_THROUGH=True`
produced `LazyDownloadRef` objects passed to aggregation math
(`weights[k] += v`), crashing with a dtype error.

**Approach**:
- `CellPipe.pass_through_on_send=True` (set in
`ExProcessClientAPI.init()`) stamps `MessageHeaderKey.PASS_THROUGH=True`
on outgoing task-result messages from the subprocess.
- `Adapter.call()` reads the header **per-message** and builds a
per-call FOBS decode context. Only messages explicitly stamped with
`PASS_THROUGH=True` are decoded as `LazyDownloadRef`.
- Swarm P2P messages arrive without the header → `PASS_THROUGH=False` in
decode ctx → tensors decoded normally, no crash.

**Reason**: Per-message header is the only safe granularity; a shared
cell FOBS context cannot distinguish result-relay from P2P aggregation
traffic.

**Files**: `nvflare/fuel/f3/cellnet/cell.py`,
`nvflare/fuel/f3/cellnet/defs.py`,
`nvflare/fuel/utils/pipe/cell_pipe.py`,
`nvflare/client/ex_process/api.py`
**Tests**: `tests/unit_test/fuel/f3/cellnet/test_pass_through_header.py`
(11 tests)
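The per-message decode decision can be sketched like this; the key names (`PASS_THROUGH`, `pass_through`) are illustrative, and the function stands in for the header read inside `Adapter.call()`:

```python
def make_decode_ctx(base_ctx: dict, headers: dict) -> dict:
    """Build a per-call FOBS decode context from the message headers.

    Only messages explicitly stamped PASS_THROUGH=True decode tensors
    as lazy refs; everything else (e.g. Swarm P2P aggregation traffic)
    decodes normally.
    """
    ctx = dict(base_ctx)  # never mutate the shared cell-wide context
    ctx["pass_through"] = bool(headers.get("PASS_THROUGH", False))
    return ctx
```

Copying the base context per call is what makes the granularity safe: two messages arriving on the same cell can decode with different settings.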

### Fix 15: preserve pre-set `MSG_ROOT_TTL` in `broadcast_and_wait()`

**Problem**: `broadcast_and_wait()` unconditionally overwrote any
pre-set `MSG_ROOT_TTL` with the ACK timeout (~seconds). Swarm P2P
scatter messages carried this short TTL instead of the full
`learn_task_timeout` (hours), causing `_monitor_tx` to kill large-model
download transactions mid-transfer.

**Approach**:
- `SwarmClientController._scatter()` stamps
`ReservedHeaderKey.MSG_ROOT_TTL = float(learn_task_timeout)` on task
data **before** `deepcopy` so the TTL reaches the cell router.
- `task_controller.broadcast_and_wait()` uses an explicit `is not None`
guard to preserve a pre-set TTL instead of overwriting it.

**Files**: `nvflare/apis/impl/task_controller.py`,
`nvflare/app_common/ccwf/swarm_client_ctl.py`
**Tests**: `tests/unit_test/app_common/ccwf/test_msg_root_ttl.py` (6
tests)
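The `is not None` guard reduces to a few lines; header key and function name below are illustrative:

```python
def effective_msg_root_ttl(headers: dict, ack_timeout: float) -> float:
    # Preserve a pre-set TTL (stamped upstream, e.g. by the swarm
    # scatter path with learn_task_timeout); only fall back to the
    # short ack timeout when no TTL was set.
    ttl = headers.get("MSG_ROOT_TTL")
    if ttl is not None:
        return float(ttl)
    return float(ack_timeout)
```

The explicit `is not None` check matters: a TTL of `0.0` would be falsy, so a plain truthiness test could still clobber a deliberately set value.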

### Fix 16: gate subprocess exit on the server download
**Problem**: The subprocess exited as soon as `send_to_peer()` received
the CJ ACK (a sub-second event after Fix 6). Exiting tore down the
subprocess's `DownloadService` while the server was still mid-download
from it — corrupting or truncating the transfer.

**Approach**:
- `FlareAgent._do_submit_result()` registers a `threading.Event` as
`FOBSContextKey.DOWNLOAD_COMPLETE_CB` in the subprocess cell's FOBS
context before serialization, then waits up to
`download_complete_timeout` seconds for the event to fire.
- `via_downloader._create_downloader()` wires `DOWNLOAD_COMPLETE_CB` as
`transaction_done_cb` on `ObjectDownloader`; the event is set when the
server's final chunk ACKs.

**Files**: `nvflare/client/flare_agent.py`,
`nvflare/fuel/utils/fobs/decomposers/via_downloader.py`,
`nvflare/client/config.py`,
`nvflare/app_common/executors/client_api_launcher_executor.py`,
`nvflare/client/ex_process/api.py`
**Tests**: `tests/unit_test/client/test_download_complete_gating.py` (12
tests)
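The event-based gate can be sketched as follows; the class and context-key names are illustrative stand-ins for the `FOBSContextKey.DOWNLOAD_COMPLETE_CB` wiring described above:

```python
import threading

class DownloadGateSketch:
    """Sketch of the Fix-16 gating flow (names simplified)."""
    def __init__(self, fobs_ctx: dict):
        self._done = threading.Event()
        # registered before serialization; the downloader fires this
        # callback when the server's final chunk is acknowledged
        fobs_ctx["download_complete_cb"] = self._done.set

    def wait_for_server_download(self, timeout: float) -> bool:
        # True if the download completed within the timeout
        return self._done.wait(timeout)
```

The subprocess only tears down its `DownloadService` after `wait_for_server_download()` returns (or times out), so the server is never cut off mid-transfer.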

### Resolve `LazyDownloadRef` on the local-aggregation path

**Problem**: When the swarm aggregator and a trainer ran on the same CJ
site, `_local_submit()` bypassed the network path. The result tensors
remained as `LazyDownloadRef` objects and reached
`shareable_to_learnable()` unevaluated. The aggregation function inside
`shareable_to_learnable()` expected real numpy arrays and failed with a
runtime error when it encountered the opaque ref objects.

**Approach**: Force-resolve all `LazyDownloadRef` objects in the local
result before passing it to `_local_submit()`. This mirrors the implicit
resolution that occurs via cell deserialization on the remote path.

**Files**: `nvflare/app_common/ccwf/swarm_client_ctl.py`
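The force-resolution step amounts to a walk over the params dict; `LazyRefSketch` and `resolve()` here are illustrative stand-ins for `LazyDownloadRef` and its materialization call:

```python
class LazyRefSketch:
    """Stand-in for LazyDownloadRef; resolve() would pull the real tensor."""
    def __init__(self, value):
        self._value = value

    def resolve(self):
        return self._value

def force_resolve(params: dict) -> dict:
    # Materialize every lazy placeholder before local aggregation math
    # runs, mirroring the implicit resolution on the remote (cell) path.
    return {k: (v.resolve() if isinstance(v, LazyRefSketch) else v)
            for k, v in params.items()}
```

Already-materialized values pass through untouched, so the same code handles mixed payloads.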

### Forward-path scatter PASS_THROUGH (feature-flagged, off by default)
**Problem**: When the aggregator scattered the global model to trainer
CJs, each CJ downloaded the full model (copy #1), deep-copied it for the
local learn thread (copy #2), then streamed it to the subprocess. Peak
CJ RSS ≈ 2× model size during scatter with no algorithmic necessity.

**Approach**: Feature-flag that enables `PASS_THROUGH=True` on the
forward scatter path. With the flag on, the CJ acts as a pure relay — it
never materializes the model tensors, forwarding streaming ref-IDs
directly to the subprocess, which pulls the model directly from the
aggregator's `DownloadService`. Eliminates both copies from CJ memory.

**Reason**: Feature-flagged to allow gradual rollout; correctness on all
Swarm topologies requires validation before enabling by default.

**Files**: `nvflare/app_common/ccwf/swarm_client_ctl.py`,
`nvflare/fuel/f3/cellnet/cell.py`

---

| # | Change | Files |
|---|--------|-------|
| A | `min_clients: int = 0` on `ServerSideController` — workflow
tolerates partial dropout; default 0 = all clients required (backward
compatible) | `nvflare/app_common/ccwf/server_ctl.py` |
| B | Subprocess stdout passed through directly via `print()` — no
re-logging wrapper, no added timestamp or prefix (subprocess lines
already carry their own log formatting) |
`nvflare/app_common/launchers/subprocess_launcher.py` |
| C | Split download-gate timing into two log lines: CJ ACK latency +
server download duration | `nvflare/client/flare_agent.py` |
| D | `conn_manager.py` shutdown race: set `stopped=True` before
executor shutdown; wrap `executor.submit()` in `try/except RuntimeError`
| `nvflare/fuel/f3/sfm/conn_manager.py` |
| E | `_Transaction` tracks `start_time` + `total_bytes`;
`transaction_done()` logs elapsed and MB transferred |
`nvflare/fuel/f3/streaming/download_service.py` |
| F | `logging.captureWarnings(True)` in `apply_log_config()` so
`ResourceWarning` / `DeprecationWarning` reach file handlers |
`nvflare/fuel/utils/log_utils.py` |
| G | `try/finally: sess.close()` in `internal_submit_job()` — fix gRPC
session resource leak | `nvflare/tool/job/job_cli.py` |

---

The following issues were identified during local PR review and resolved
in follow-up commits.

### R1: rollback of failed-client pruning (`server_ctl.py`)
**Original approach**: After the configure phase, failed clients were
removed from `participating_clients`, `result_clients`, and
`client_statuses`, a membership-update broadcast was sent, and
`starting_client` was reselected if the original starter had failed (see
R6).

**Rolled back**: The pruning logic was reverted because removing clients
from `participating_clients` mid-startup introduced round-deadlock
scenarios (Gatherer's expected-response set diverged from the live
client set across rounds) and the `starting_client` reselection added
complexity that masked the root fault. The `progress_timeout` sentinel
bug (R5) was also a direct consequence of the pruning code.

**Current behavior**: If `configured_count >= min_clients`, the workflow
proceeds. Failed clients that did not configure are logged as a warning
— they remain in `participating_clients` and may rejoin in a later
round. If `configured_count < min_clients`, the workflow panics as
before.

**Files**: `nvflare/app_common/ccwf/server_ctl.py`

### Guard lazy imports against `ImportError` (`client_api_launcher_executor.py`)
**Problem**: Both lazy imports inside `_validate_timeout_config` could
raise `ImportError` in constrained environments, crashing `initialize()`
and aborting the job start.

**Fix**: Imports wrapped in `try/except ImportError` with a
`log_warning` fallback.

### R4: gate the `LazyDownloadRef` check behind `forward_pass_through` (`swarm_client_ctl.py`)
**Problem**: The guard in `_end_gather()` called
`_from_shareable(aggr_result)` unconditionally on every aggregation
round, deserializing the full aggregated DXO (with all weight data) just
to inspect types — wasted work in the common non-PASS_THROUGH case.

**Fix**: Guard is now gated behind `if self.forward_pass_through`. Also
removed the redundant local `ReservedHeaderKey` import (already at
module level).

### `_round_count` → `_cj_round_count` (`client_api_launcher_executor.py`)
`finalize()` only called `super().finalize()` and was deleted.
`_round_count` renamed to `_cj_round_count` to avoid a naming collision
with `APISpec._round_count`.

### R5: `progress_timeout` sentinel bug (with `min_clients > 0`)
This issue was a direct consequence of the R1 pruning logic
(`overall_last_progress_time = now` made the sentinel equal to current
time). Resolved by rolling back the pruning in R1 — the
`progress_timeout` sentinel is unaffected by the current warning-only
approach.

This issue existed only when failed clients were pruned from
`participating_clients`. Since pruning was rolled back in R1, the
`starting_client` is always one of the configured participants and no
reselection is needed.

`forward_pass_through` attribute
**Problem**: The R4 fix gated the `LazyDownloadRef` guard in
`_end_gather()` behind `if self.forward_pass_through`, but
`_make_controller()` in `test_swarm_memory_gc.py` used `__new__` and
never set the attribute — 7 tests failed with `AttributeError`.

**Fix**: Added `ctrl.forward_pass_through = False` to the stub.

job init (`swarm_server_ctl.py`)
**Problem**: `SwarmServerController.__init__` defaulted
`min_clients=None` and passed it directly to
`ServerSideController.__init__()`, which performs `if min_clients < 0:`.
In Python 3, `None < 0` raises `TypeError` — crashing job initialization
whenever `SwarmServerController` is instantiated without an explicit
`min_clients` (e.g. from a JSON config that omits the field).

**Fix**: Changed the default to `min_clients: int = 0` to match
`ServerSideController`'s own default.

**Files**: `nvflare/app_common/ccwf/swarm_server_ctl.py`
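A minimal sketch of the defaulting bug and fix — the class bodies below are
simplified stand-ins for the real controllers, not their actual code:

```python
class ServerSideController:
    def __init__(self, min_clients: int = 0):
        if min_clients < 0:  # with min_clients=None, `None < 0` raises TypeError
            raise ValueError("min_clients must be >= 0")
        self.min_clients = min_clients


class SwarmServerController(ServerSideController):
    # Before the fix: default was min_clients=None, passed straight to the
    # parent, where the `None < 0` comparison crashes job initialization.
    # After the fix: the default matches the parent's own default.
    def __init__(self, min_clients: int = 0):
        super().__init__(min_clients=min_clients)
```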

| Fix | Test file | Tests |
|-----|-----------|-------|
| Fix 14 (reverse PASS_THROUGH header) |
`tests/unit_test/fuel/f3/cellnet/test_pass_through_header.py` | 11 |
| Fix 15 (MSG_ROOT_TTL) |
`tests/unit_test/app_common/ccwf/test_msg_root_ttl.py` | 6 |
| Fix 16 (download gating) |
`tests/unit_test/client/test_download_complete_gating.py` | 12 |
| Fix 12a/b (logging + RSS tags) |
`tests/unit_test/client/test_logging_and_rss_tags.py` | 16 |
| Infra B (subprocess log passthrough) |
`tests/unit_test/app_common/launchers/subprocess_launcher_test.py` | 2 |
| Infra A (min_clients fault tolerance) |
`tests/unit_test/app_common/ccwf/test_server_ctl_min_clients.py` | 30 |

All existing unit tests pass; no regressions.

1. Reverse path mirrors forward path: `subprocess -> CJ -> aggregator`.
2. The subprocess sets PASS_THROUGH on its outgoing pipe message, so CJ
receives proxy/lazy refs first.
3. CJ then forwards/submits toward the aggregator in Swarm.
4. In Swarm, aggregator may be remote CJ, same node CJ, or same-process
CJ logic path, but subprocess is always a different process from CJ.
5. Aggregation must operate on resolved arrays (not unresolved lazy
refs).

---

- **Style**: Rename `pt` → `passthrough` variable in `cell.py`
(YuanTingHsieh review)
- **Style**: Remove `_` prefix on `from_shareable` import in
`swarm_client_ctl.py` (nvidianz review)
- **Lint**: Fix all 19 flake8 F401 unused-import warnings across
integration and unit test files
- **Naming**: Rename `test_via_downloader_fix9.py` →
`test_via_downloader_download_timeout.py`

- **YT1**: `LazyDownloadRef` guard in `_end_gather()` — change from
`log_error` + recover to `system_panic` + return (agreed, pending
implementation)
- **YT2**: `MSG_ROOT_TTL > task.timeout` leaves DownloadService
transaction alive after send failure — bounded resource leak, tracked as
follow-up
- **YT**: `min_clients` naming collision (job-scheduler vs
fault-tolerance quorum) — needs design decision before H4/H5 wiring
- **YT**: `progress_timeout` interaction with `min_clients` dropout
tolerance — follow-up item
- **nvidianz**: `_decomposer_prefix()` in PT executor — confirmed used
(base class override for `_validate_timeout_config`), keeping
- **nvidianz**: `transaction_done()` info log verbosity — one log per
completed model download; discussing gating strategy
- **nvidianz**: Consolidate timeout params — review which can be
combined or derived from each other

**31 non-test production files** changed in this PR:

**Core APIs (4)**
- `nvflare/apis/fl_constant.py` — `ConfigVarName.MIN_DOWNLOAD_TIMEOUT`
- `nvflare/apis/impl/task_controller.py` — pre-set `MSG_ROOT_TTL`
preservation
- `nvflare/apis/shareable.py` — `ReservedHeaderKey.PASS_THROUGH`
- `nvflare/fuel/f3/cellnet/defs.py` — `MessageHeaderKey.PASS_THROUGH`

**Swarm / CCWF (5)**
- `nvflare/app_common/ccwf/ccwf_job.py` — `min_clients` wiring
- `nvflare/app_common/ccwf/common.py` — membership update topic
- `nvflare/app_common/ccwf/server_ctl.py` — `min_clients` fault
tolerance (warning-only on partial dropout; pruning rolled back)
- `nvflare/app_common/ccwf/swarm_client_ctl.py` — Fix 15/17/18,
`_resolve_lazy_refs`, forward pass-through
- `nvflare/app_common/ccwf/swarm_server_ctl.py` — `min_clients`
forwarding; default fixed to `int = 0`

**Executors / Launchers (3)**
- `nvflare/app_common/executors/client_api_launcher_executor.py` — Fix
3/7/8/10/12/13/16
- `nvflare/app_common/launchers/subprocess_launcher.py` — direct stdout
passthrough (no re-logging)
- `nvflare/app_opt/pt/client_api_launcher_executor.py` — H2 param
forwarding

**Recipe (1)**
- `nvflare/app_opt/pt/recipes/swarm.py` — recipe params

**Client (5)**
- `nvflare/client/config.py` — `submit_result_timeout`, `max_resends`,
`download_complete_timeout`
- `nvflare/client/constants.py` — config key constants
- `nvflare/client/ex_process/api.py` — Fix 12a/12b/14/16
- `nvflare/client/flare_agent.py` — Fix 16 download gating
- `nvflare/client/in_process/api.py` — Fix 12b RSS tags

**Fuel / Streaming (5)**
- `nvflare/fuel/f3/streaming/cacheable.py` — Fix 4 `base_obj` release
- `nvflare/fuel/f3/streaming/download_service.py` — byte accounting,
transaction logging
- `nvflare/fuel/utils/fobs/__init__.py` —
`FOBSContextKey.DOWNLOAD_COMPLETE_CB`
- `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` — Fix 9/16
- `nvflare/fuel/utils/memory_utils.py` — GC logging

**Fuel / Pipe (3)**
- `nvflare/fuel/utils/pipe/cell_pipe.py` — Fix 1/6/8/14
- `nvflare/fuel/utils/pipe/pipe.py` — `release_send_cache` interface
- `nvflare/fuel/utils/pipe/pipe_handler.py` — Fix 2 cache teardown

**Fuel / Cell & Infra (3)**
- `nvflare/fuel/f3/cellnet/cell.py` — Fix 14 per-message PASS_THROUGH
- `nvflare/fuel/f3/sfm/conn_manager.py` — shutdown race fix
- `nvflare/fuel/utils/log_utils.py` — `captureWarnings`

**Private / Other (2)**
- `nvflare/private/aux_runner.py` — PASS_THROUGH header propagation
- `nvflare/tool/job/job_cli.py` — session leak fix

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…IA#4250)

### Fix SimEnv to be compatible with strict simulator_run API

The simulator_run API originally rejected any call that provided both
named clients and n_clients, since they are redundant — if you've named
your clients, the count is already known.

This PR fixes SimEnv properly instead:

- sim_env.py: Fix SimEnv.deploy() to pass n_clients only when
self.clients is None (i.e., for ALL_SITES expansion). When explicit
client names are given, n_clients is omitted.
- fed_job_test.py: Update unit tests to reflect the restored strict
behavior.

The root cause was that SimEnv always passed both n_clients and clients
to simulator_run. The correct fix is in SimEnv, not in relaxing the API
contract.
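The corrected rule can be sketched as follows — `build_simulator_args` is a
hypothetical helper name, not the actual `SimEnv.deploy()` code:

```python
def build_simulator_args(clients, n_clients):
    """Pass n_clients only when no explicit client names are given."""
    args = {}
    if clients is None:
        args["n_clients"] = n_clients  # ALL_SITES expansion: only the count is known
    else:
        args["clients"] = clients  # names imply the count; n_clients is omitted
    return args
```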

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Fixes NVIDIA#4244.

### Description

## Problem

When using a dict model config with `SimpleSwarmLearningRecipe`, the
exported `config_fed_client.json` contained an empty `args: {}` for the
persistor's model, losing all constructor arguments (e.g.
`model_name_or_path` for HuggingFace models).

Root cause: introduced in NVIDIA#4130, `_instantiate_model_from_dict` eagerly
instantiated the model from the dict and passed the live `nn.Module` to
`PTFileModelPersistor`. The job serializer (`_get_args`) then
introspected the live object and could not recover the original
constructor args (especially for HuggingFace models where args are
buried in internal config, not stored as plain instance attributes).

## Fix

Remove `_instantiate_model_from_dict` and pass the normalized dict
`{"path": "...", "args": {...}}` directly to `PTFileModelPersistor`. The
persistor already supports dict config and resolves it to an `nn.Module`
at runtime via `instantiate_class()` — no eager instantiation needed at
recipe construction time.

Side benefit: large models (e.g. LLaMA-8B) are no longer loaded into
memory just to export a job config.
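A minimal sketch of the pass-through shape — helper name and dict values are
hypothetical; the real persistor resolves the dict via `instantiate_class()`
at runtime:

```python
def build_persistor_config(model):
    """Forward the normalized dict untouched, so the exported config
    keeps the original constructor args (no eager instantiation)."""
    normalized = {"path": model["path"], "args": model.get("args", {})}
    return {"path": "PTFileModelPersistor", "args": {"model": normalized}}
```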

## Changes

- `nvflare/app_opt/pt/recipes/swarm.py`: remove
`_instantiate_model_from_dict`, unused `importlib` import, and the
`model_instance` indirection — pass `model` dict directly to
`PTFileModelPersistor`

## Test


Added
`TestSimpleSwarmLearningRecipeExport.test_export_preserves_dict_model_args_in_client_config`,
which exports the job and asserts the persistor's model args are present in
the JSON config. This test would have failed before this fix.


### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
Fix: Graceful handling when `executorch` is not installed (ET FedBuff)

## Problem

`ETFedBuffRecipe` and `ETTaskProcessor` imported `executorch` at module
level. Any environment without `executorch` installed would get a raw
`ModuleNotFoundError` deep in the import chain — with no indication of
what to install or why.

## Changes

**`nvflare/edge/models/model.py`**
- Replace top-level `from executorch.exir import to_edge` with
`optional_import(...)` — defers the error to call time so importing
`DeviceModel` no longer requires `executorch`

**`nvflare/edge/simulation/et_task_processor.py`**
- Same pattern for `_load_for_executorch_for_training_from_buffer` and
`get_sgd_optimizer` from `executorch.extension.training`

**`nvflare/edge/tools/et_fed_buff_recipe.py`**
- Add explicit guard in `ETFedBuffRecipe.__init__()`: checks
`importlib.util.find_spec("executorch")` and raises
`ImportError("ETFedBuffRecipe requires executorch. See installation
instructions:
https://pytorch.org/executorch/stable/getting-started-setup.html")` —
fires early with actionable guidance instead of a cryptic traceback

**`tests/unit_test/recipe/edge_recipe_test.py`**
- Add `TestETFedBuffRecipeSimBasic` — basic initialization test,
`@pytest.mark.skipif` when `executorch` is absent
- Add
`TestETFedBuffRecipeWithoutExecutorch.test_raises_import_error_without_executorch`
— mocks `find_spec` to return `None` so the negative test always runs
regardless of environment; asserts `ImportError` with `"executorch"` in
the message
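The early-guard pattern can be sketched as below — the real `__init__` takes
many more parameters; only the guard is shown:

```python
import importlib.util


class ETFedBuffRecipe:
    def __init__(self):
        # Fail fast with actionable guidance instead of a deep ModuleNotFoundError.
        if importlib.util.find_spec("executorch") is None:
            raise ImportError(
                "ETFedBuffRecipe requires executorch. See installation instructions: "
                "https://pytorch.org/executorch/stable/getting-started-setup.html"
            )
```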

## Notes

- HE is intentionally not supported in `SimEnv` — `PocEnv(use_he=True)`
is the correct local environment for HE workflows (it has full
provisioning that generates TenSEAL context files)


### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
update example readme

Update the example README and remove links to nonexistent pages.

<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
… validate hang, round_timeout (NVIDIA#4270)

Fixes four bugs in NVFlare 2.7.2 RC12 affecting Swarm Learning with
`launch_external_process=True`
Implementation Reference @GeorgeWang-nv's original PR
NVIDIA#4263
- Bug 1 -- different implementation from PR 4263
- Bug 2 -- accept the implementation approach from PR 4263 with small changes
- Bug 3 -- use a different approach
- Bug 4 -- add timeout to the swarm API, but consolidate the timeouts from 4
to 2 and expose them to the swarm recipes
- Bug 5 -- restore `self._max_resends` (private convention); subprocess
logging fixed (see below)
- Bug 6 -- guard ConnManager against RuntimeError after shutdown; 4
regression tests added

**Additional (review feedback):**
- Rename `SimpleSwarmLearningRecipe` → `SwarmLearningRecipe` everywhere;
backward-compat alias kept
- Fix docs: add missing required `min_clients` arg to all
`SwarmLearningRecipe` examples (TypeError for users copying doc
examples)
- CSE inventory scan: take LAST "best" key, not first — prevents initial
checkpoint named "best_*" from shadowing trained best model
- Add `pipe_type` / `pipe_root_path` to `SwarmLearningRecipe` so users
can select `FilePipe` without dropping to low-level API

(`via_downloader.py`)

**Root cause**: `_create_downloader()` subscribed to msg_root deletion
to clean up download transactions. The msg_root is deleted as soon as
blob envelopes are delivered, but `blob_cb` fires asynchronously —
secondary tensor downloads were still in flight when
`delete_transaction()` removed the ref from `_ref_table`, causing `"no
ref found" FATAL_SYSTEM_ERROR` on large models (≥1B parameters).

**Fix**: Remove `subscribe_to_msg_root` from `_create_downloader()`
entirely. Transaction lifecycle is now managed solely by `_monitor_tx()`
in `download_service.py`, which polls `is_finished()` every 5s and
cleans up only after all chunk downloads complete.
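The poll-based lifecycle can be sketched as follows — `monitor_tx` and the
`tx` interface are simplified assumptions, not the actual
`download_service.py` code:

```python
import threading
import time


def monitor_tx(tx, poll_interval=5.0):
    """Clean up a download transaction only after every chunk finishes."""
    def _loop():
        while not tx.is_finished():  # chunk downloads still in flight
            time.sleep(poll_interval)
        tx.delete_transaction()  # safe now: no ref can still be needed
    t = threading.Thread(target=_loop, daemon=True)
    t.start()
    return t
```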

(`cse_client_ctl.py`)

**Root cause**: `_prepare_local_model()` called
`submit_model_executor.execute()` which, for
`ClientAPILauncherExecutor`, launches a fresh subprocess with no trained
model state. The training subprocess had already exited; the model was
already saved to disk by `PTFileModelPersistor`.

**Fix**: Try the persistor first. Inventory key scan prefers the LAST
key containing `"best"` when `model_name` contains `"best"` — taking the
last "best" key avoids mistaking an initial checkpoint named `"best_*"`
(added first by `PTFileModelPersistor`) for the trained best model.
`isinstance` guard replaces `assert isinstance` (safe under `python
-O`). Falls back to executor for in-process mode compatibility.
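The last-"best"-key scan can be sketched as below — `pick_inventory_key` is
a hypothetical helper name for illustration:

```python
def pick_inventory_key(keys, model_name):
    """Prefer the LAST key containing 'best' when a best model is requested."""
    if "best" in model_name:
        best_keys = [k for k in keys if "best" in k]
        if best_keys:
            # The persistor may register an initial "best_*" checkpoint first;
            # the trained best model is the last matching key, not the first.
            return best_keys[-1]
    return keys[-1] if keys else None
```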

(`flare_agent.py` + `via_downloader.py`)

**Root cause**: `FlareAgent._do_submit_result()` unconditionally waited
1800s for `DOWNLOAD_COMPLETE_CB` after every result send when
`pass_through_on_send=True`. For validate results (metrics only, no
tensors), `_finalize_download_tx()` creates no download transaction and
the callback never fires — subprocess blocks indefinitely.

**Fix**: Thread-local `was_download_initiated()` flag, set by
`_finalize_download_tx()` only when downloadable objects exist. Agent
returns immediately if `False` (validate result). Thread-local is
required because task pipe and metric pipe share the same `CoreCell`
(same `site_name + token + mode` → same FQCN → same `_CellInfo` cache
entry → same `fobs_ctx`); a plain flag would be clobbered by concurrent
metric serialisation from a different thread.
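The thread-local flag can be sketched as follows — `was_download_initiated`
and `clear_download_initiated` match the names in the files-changed table
below; the setter name here is an assumption:

```python
import threading

# One slot per thread: the task pipe and metric pipe share a CoreCell but
# serialise on different threads, so their flags cannot clobber each other.
_tls = threading.local()


def set_download_initiated():
    _tls.download_initiated = True


def was_download_initiated() -> bool:
    return getattr(_tls, "download_initiated", False)


def clear_download_initiated():
    _tls.download_initiated = False
```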

(`swarm.py`)

**Root cause**: `SwarmClientConfig` hardcodes
`learn_task_ack_timeout=10s` and `final_result_ack_timeout=10s`. For
large models (≥2 GB), P2P tensor streaming takes minutes — the ACK times
out before download completes.

**Fix**: Add `round_timeout: float = 3600` to `SwarmLearningRecipe`.
Wires both `learn_task_ack_timeout` and `final_result_ack_timeout`;
`learn_task_timeout` is intentionally left `None` (unbounded) to avoid
capping per-round training time on slow hardware or 70B+ models.

(`client_api_launcher_executor.py`)

**Root cause**: `ClientAPILauncherExecutor.__init__()` stored
`max_resends` as `self._max_resends` but `TaskExchanger` (the parent)
uses `self.max_resends`. `PipeHandler` reads `self.max_resends` and saw
`None` (the parent default) instead of the configured value.

**Fix**: Restore `self._max_resends` as the private attribute throughout
`ClientAPILauncherExecutor` (consistent with private-member convention);
the executor builds its own config dict explicitly so it does not rely
on the inherited attribute.

`ex_process/api.py`)

**Problem**: The subprocess had no logging configuration —
`logger.info()` calls were silently dropped. Wrapping all stdout via
`logger.info()` in the parent caused double-prefixed entries in
`log.txt` for NVFlare-formatted log lines.

**Fix**: `_configure_subprocess_logging()` in `ex_process/api.py` loads
the site's `log_config.json` unchanged, giving the subprocess identical
loggers to the parent (both consoleHandler + file handlers). The
parent's `log_subprocess_output()` now calls `_route_subprocess_line()`
which strips ANSI codes and checks for a `YYYY-MM-DD HH:MM:SS`
timestamp:
- Formatted NVFlare log line → `print()` to terminal only (file handler
already wrote it to `log.txt`)
- Raw `print()` from user training script → `logger.info()` so it
reaches `log.txt`

Regression tests:
`test_log_subprocess_output_formatted_lines_not_double_logged` + 5
`TestRouteSubprocessLine` tests.
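The routing rule can be sketched as below — the real helper is private and
the `emit` parameter is added here only for testability:

```python
import re

_ANSI = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
_TIMESTAMP = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")


def route_subprocess_line(line, logger, emit=print):
    clean = _ANSI.sub("", line).rstrip("\n")
    if _TIMESTAMP.match(clean):
        emit(clean)  # formatted NVFlare line: file handler already wrote it
    else:
        logger.info(clean)  # raw print() from the user script -> log.txt
```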

(`conn_manager.py`)

**Root cause**: `process_frame_task()` and `start_connector()` could
raise unhandled `RuntimeError` when called after the executor was shut
down, causing noisy tracebacks during job teardown.

**Fix**: Guard `conn_mgr_executor.submit()` with try/except
`RuntimeError` (log debug, skip). Add `if self.stopped: return` early
exit in `process_frame_task()`.

**Regression tests**: 4 new tests in
`test_conn_manager_shutdown_race.py`.

**Problem**: `SwarmLearningRecipe` always created a `CellPipe`; users
needing `FilePipe` (restricted networks, third-party integrations) had to
drop to the low-level `CCWFJob` + `SwarmClientConfig` API.

**Change**: Add `pipe_type: str = "cell_pipe"` and `pipe_root_path:
Optional[str] = None`:
- `"cell_pipe"` (default): unchanged — zero-copy PASS_THROUGH
forwarding, ~1 GB RAM overhead
- `"file_pipe"`: `FilePipe` created with `root_path` defaulting to
`{WORKSPACE}/pipe` (resolved at runtime); custom absolute path validated
to exist at recipe construction time
- Warnings for `pipe_root_path` ignored with `cell_pipe`, and
`file_pipe` with `launch_external_process=False`
- `ScriptRunner.__init__` gains `task_pipe` parameter forwarded to
`BaseScriptRunner`

9 new tests in `TestSwarmLearningRecipePipeType`.

| File | Bug | Change |
|------|-----|--------|
| `nvflare/fuel/utils/fobs/decomposers/via_downloader.py` | 1 + 3 |
Remove `subscribe_to_msg_root`; add thread-local `_tls`,
`was_download_initiated()`, `clear_download_initiated()` |
| `nvflare/client/flare_agent.py` | 3 | Check `was_download_initiated()`
after `send_to_peer()`; return immediately for validate results |
| `nvflare/app_common/ccwf/cse_client_ctl.py` | 2 | Persistor-first
`_prepare_local_model()` with last-"best"-key inventory scan and
isinstance guard |
| `nvflare/app_opt/pt/recipes/swarm.py` | 4 + rename + pipe | Add
`round_timeout`, `pipe_type`, `pipe_root_path`; rename to
`SwarmLearningRecipe`; backward-compat alias kept |
| `nvflare/job_config/script_runner.py` | pipe | Add `task_pipe` param
to `ScriptRunner`, forwarded to `BaseScriptRunner` |
| `nvflare/app_common/ccwf/recipes/swarm.py` | rename | Export
`SwarmLearningRecipe` alongside `SimpleSwarmLearningRecipe` |
| `nvflare/app_opt/pt/recipes/__init__.py` | rename | Add
`SwarmLearningRecipe` to lazy imports and `__all__` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | 5 |
Restore `self._max_resends` (private convention) |
| `nvflare/app_common/launchers/subprocess_launcher.py` | logging |
`_route_subprocess_line()`: formatted lines → `print()`, raw lines →
`logger.info()` |
| `nvflare/client/ex_process/api.py` | logging |
`_configure_subprocess_logging()`: apply site log config unchanged (same
handlers as parent) |
| `nvflare/fuel/f3/sfm/conn_manager.py` | 6 | Guard executor submit +
frame processing against post-shutdown RuntimeError |
| `docs/programming_guide/memory_management.rst` | docs | Add missing
`min_clients` to example |
| `docs/user_guide/data_scientist_guide/available_recipes.rst` | docs |
Add missing `min_clients`; rename to `SwarmLearningRecipe` |
| `docs/programming_guide/controllers/client_controlled_workflows.rst` |
docs | Add missing `min_clients` (x2) |
| `docs/programming_guide/timeouts.rst` | docs | Rename to
`SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/README.md` | 4 + rename |
Add `round_timeout` to config table; rename to `SwarmLearningRecipe` |
| `examples/advanced/swarm_learning/swarm_pt/job.py` | rename | Update
import to `SwarmLearningRecipe` |
| `tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py` | 1
| 4 new tests |
| `tests/unit_test/client/test_download_initiated_gating.py` | 3 | 7 new
tests |
| `tests/unit_test/app_common/ccwf/test_cse_persistor_fallback.py` | 2 |
10 new tests (incl. initial-ckpt-shadowing regression) |
| `tests/unit_test/fuel/f3/sfm/test_conn_manager_shutdown_race.py` | 6 |
4 new regression tests |
| `tests/unit_test/app_common/launchers/subprocess_launcher_test.py` |
logging | 6 new tests for `_route_subprocess_line` and double-logging
prevention |
| `tests/unit_test/recipe/swarm_recipe_test.py` | rename + pipe |
Updated to `SwarmLearningRecipe`; 9 new
`TestSwarmLearningRecipePipeType` tests |
|
`tests/unit_test/app_common/executors/client_api_launcher_executor_test.py`
| 5 | Updated: `_max_resends` assertions |
| `tests/unit_test/recipe/component_config_verification_test.py` |
rename | Updated to `SwarmLearningRecipe` |

- [x] 250+ unit tests pass across all affected modules
- [x] Bug 1: `test_via_downloader_msg_root.py` — verifies
`subscribe_to_msg_root` never called, method removed,
`DOWNLOAD_COMPLETE_CB` unaffected
- [x] Bug 2: `test_cse_persistor_fallback.py` — verifies persistor-first
path, last-"best"-key preference, initial-ckpt-shadowing regression,
isinstance guard, all fallback paths
- [x] Bug 3: `test_download_initiated_gating.py` — verifies thread
isolation, validate returns immediately (<1s), train waits,
clear-before-send
- [x] Bug 4: covered by existing recipe and ccwf tests
- [x] Bug 5: `client_api_launcher_executor_test.py` updated
(`_max_resends` assertions)
- [x] Bug 6: `test_conn_manager_shutdown_race.py` — 4 tests:
executor-shutdown no-raise, stop() no-raise, stopped early-return,
Prefix.from_bytes not called
- [x] Logging: `subprocess_launcher_test.py` — double-logging
prevention, ANSI stripping, plain-line capture, partial-timestamp
detection
- [x] Pipe type: `swarm_recipe_test.py` — 9 tests: default cell_pipe,
file_pipe instance, workspace template, custom path, invalid type,
warnings, path validation
- [x] Rename: backward-compat `SimpleSwarmLearningRecipe` import test
passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Make integration tests' install_requirements more robust

<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

---------

Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
## Fix: subprocess cell torn down before server tensor download
(large-model deadlock)

### Problem

`big_model_4g` CI test reliably failed with:

```
RuntimeError: failed to download from site-X_active
```

after 1–3 hours of retry loops (300 s SubmitUpdate timeout × 3 download
retries).

**Root cause 1 — ordering bug in `_finalize_external_execution()`:**

The subprocess client API works by keeping the subprocess's CellNet
connection alive
after it sends its result, so the server can pull large tensors directly
from the
subprocess's `DownloadService` (the "reverse PASS_THROUGH" path). To
prevent the
subprocess from exiting before the download completes,
`_do_submit_result()` blocks
on a `download_done.wait(1800 s)` event that fires only after the server
has
finished downloading all tensors.

However, `_finalize_external_execution()` called `stop_task()`
**synchronously**,
which sends `SIGTERM` to the subprocess immediately after `execute()`
receives the
result — before `ClientRunner` had even sent `SubmitUpdate` to the
server, let alone
before the server had started downloading. This tore down the subprocess
cell,
causing every server download attempt to hit `"no connection to
site-X_active"` /
`"cannot forward req: no path"`.

The subprocess-side wait was therefore unreachable: the process was
killed
externally before it could block.

**Root cause 2 — `launch_once=True` subprocess exits after round 1:**

`_do_submit_result()` called `os._exit(0)` unconditionally at the end of
the
`CellPipe + pass_through_on_send` path. For `launch_once=False` (one
subprocess per
round) this is correct — the process should exit immediately after the
download so
the deferred-stop poller unblocks. But for `launch_once=True` (one
subprocess for
all rounds, e.g. `np-loop-cell-pipe`, `pt_client_api_launch_once`), the
subprocess
was killed after round 1, leaving rounds 2–N unhandled and the job
hanging.

---

### Fix

**1. Deferred `stop_task()` (`launcher_executor.py`)**

Instead of calling `stop_task()` synchronously,
`_finalize_external_execution()`
now starts a background thread (when `_stop_task_wait_timeout > 0`) that
polls
`check_run_status()` until the subprocess exits naturally (i.e. after
`download_done` fires and the subprocess unblocks), then calls
`stop_task()`.
`execute()` returns immediately, so `ClientRunner` can send
`SubmitUpdate` and the
server can connect to the still-alive subprocess cell to download
tensors.

**2. Round-boundary coordination (`_deferred_stop_event`)**

Without additional synchronization, the deferred thread from round N
could fire
*after* round N+1's `launch_task()` call. Since `SubprocessLauncher`
guards
`_start_external_process()` with `if self._process is None`, seeing a
not-yet-cleared
reference to the exited round-N process causes it to skip starting a new
subprocess.
Round N+1 then sees `COMPLETE_SUCCESS` immediately and fails with
`"External process has not called flare.init and run status becomes
success"`.

A `threading.Event` (`_deferred_stop_event`, initially set) coordinates
the two:

- **Cleared** in `_finalize_external_execution()` just before the
deferred thread starts.
- **Set** unconditionally in a `finally` block when the deferred thread
completes.
- `_initialize_external_execution()` **waits** on this event before
calling
  `launch_task()`, with a forced `stop_task()` fallback if it times out.

In practice the wait is 0–1 s (subprocess exits naturally after
download, well
before the server completes global aggregation and dispatches the next
round's task),
so there is no meaningful latency impact.
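The handshake above can be sketched as follows — a simplified stand-in for
the real `LauncherExecutor` logic, with the two executor hooks passed in as
callables:

```python
import threading


class DeferredStop:
    def __init__(self):
        self.event = threading.Event()
        self.event.set()  # initially set: the first launch proceeds immediately

    def finalize(self, wait_for_natural_exit, stop_task):
        self.event.clear()  # block round N+1's launch until we are done
        def _poll():
            try:
                wait_for_natural_exit()  # subprocess exits after download_done fires
                stop_task()
            finally:
                self.event.set()  # always unblock, even on error
        threading.Thread(target=_poll, daemon=True).start()

    def wait_before_launch(self, timeout):
        # False on timeout -> caller forces stop_task() before launching.
        return self.event.wait(timeout)
```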

**3. `ClientAPILauncherExecutor` opt-in
(`client_api_launcher_executor.py`)**

`_stop_task_wait_timeout` is set to `download_complete_timeout` (default
1800 s) in
`ClientAPILauncherExecutor.__init__()`, enabling the deferred path only
for the
subprocess client API where large-tensor downloads are expected. The
base
`LauncherExecutor` defaults to `0.0` (original synchronous behaviour, no
change).

**4. `launch_once`-aware subprocess exit (`flare_agent.py`, `config.py`,
`ex_process/api.py`)**

`prepare_config_for_launch()` now writes `launch_once` (derived from
`launcher.needs_deferred_stop()`) into the subprocess config file. The
subprocess
reads it via `ClientConfig.get_launch_once()` and passes it to
`FlareAgent`.
`_do_submit_result()` branches on `_launch_once`:

| `launch_once` | Behaviour after download gate |
|---|---|
| `False` (one subprocess per round, e.g. `pt-client-api`) |
`os._exit(0)` called directly — deferred-stop poller unblocks
immediately (original behaviour preserved) |
| `True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`) |
`atexit.register(os._exit, 0)` registered once — subprocess continues to
next round; `os._exit` fires only when `main()` finally returns,
bypassing non-daemon CoreCell thread cleanup |

`_resolve_launch_once()` safely fetches the launcher even when
`self.launcher` is
still `None` at `initialize()` time (resolved directly from the engine
component
registry).
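The branch in the table above can be sketched as follows — a simplified
stand-in, not the actual `_do_submit_result()` body:

```python
import atexit
import os


def exit_after_download(launch_once):
    if launch_once:
        # One subprocess for all rounds: keep running; os._exit fires only
        # when main() returns, skipping non-daemon CoreCell thread cleanup.
        atexit.register(os._exit, 0)
    else:
        # One subprocess per round: exit now so the deferred-stop poller
        # unblocks immediately.
        os._exit(0)
```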

**5. Pipe handler identity guard and safe close (`task_exchanger.py`)**

Pipe handler creation is refactored into `_create_pipe_handler()`, which
binds a
per-handler status callback that checks `self.pipe_handler is _h` before
acting.
This prevents a late `PEER_GONE` from round N's (now-stale) handler from
stopping
round N+1's handler. `stop(close_pipe=False)` is used because
`CellPipe.close()`
is irreversible — closing it in the status callback would prevent the
next round
from communicating. An explicit `self.pipe.close()` is added in
`END_RUN` instead.

**6. Defensive logging and heartbeat error handling
(`pipe_handler.py`)**

`_send_to_pipe` now logs `asked_to_stop` and `abort_triggered` status
when a send
is suppressed. Peer-gone detection logs the elapsed time since the last
heartbeat.
Heartbeat sends are wrapped in `try/except` so a broken pipe sets
`asked_to_stop`
and breaks the heartbeat loop cleanly instead of propagating an
unhandled exception.
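The hardened heartbeat loop can be sketched as below — the pipe interface
and function signature are simplified assumptions, not the actual
`pipe_handler.py` code:

```python
def heartbeat_loop(pipe, interval, stop_event, logger):
    while not stop_event.wait(interval):
        try:
            pipe.send_heartbeat()
        except Exception as ex:
            # A broken pipe stops the loop cleanly instead of propagating.
            logger.info(f"heartbeat send failed: {ex}; stopping")
            stop_event.set()
            break
```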

**7. New unit tests (`test_download_complete_gating.py`,
`test_download_initiated_gating.py`)**

Two new test files covering the subprocess download-gating behaviour
(the
`download_done` wait, the validate-path fast return, and `launch_once`).
Both use `FlareAgent.__new__()` to construct minimal agent
stubs. To prevent `os._exit(0)` from killing the pytest-xdist worker:

- An `autouse` `_no_os_exit` fixture patches
`nvflare.client.flare_agent.os._exit`
  to a no-op for every test in the file.
- `_make_agent()` sets `agent._launch_once = False` (the per-round path
that calls
  `os._exit` directly, making the fixture's patch the active guard).

---

### Files changed

| File | Change |
|------|--------|
| `nvflare/app_common/executors/launcher_executor.py` | Deferred
`stop_task()` background thread + `_deferred_stop_event` round-boundary
coordination |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | Set
`_stop_task_wait_timeout = download_complete_timeout`; add
`_resolve_launch_once()`; write `LAUNCH_ONCE` to subprocess config |
| `nvflare/app_common/abstract/launcher.py` | `needs_deferred_stop()`
abstract method + idempotency/thread-safety note on `stop_task()` |
| `nvflare/app_common/launchers/subprocess_launcher.py` | Implement
`needs_deferred_stop()`; add info logging for process start/stop |
| `nvflare/app_common/executors/task_exchanger.py` | Refactor pipe
handler creation into `_create_pipe_handler()` with identity-checking
callback; use `close_pipe=False` to prevent irreversible
`CellPipe.close()` |
| `nvflare/app_common/widgets/metric_relay.py` | Include `msg.data` in
pipe status log message |
| `nvflare/fuel/utils/pipe/pipe_handler.py` | Enhanced logging (send
failures, peer-gone elapsed time); heartbeat send error handling |
| `nvflare/client/config.py` | Add `LAUNCH_ONCE` config key +
`get_launch_once()` |
| `nvflare/client/flare_agent.py` | `launch_once`-aware
`_do_submit_result()`: direct `os._exit` vs `atexit` |
| `nvflare/client/ex_process/api.py` | Pass `launch_once` to
`FlareAgentWithFLModel` |
| `tests/unit_test/client/test_download_complete_gating.py` | **New** —
tests for DOWNLOAD_COMPLETE_CB registration, download-done wait,
timeout, status logging, and cleanup |
| `tests/unit_test/client/test_download_initiated_gating.py` | **New** —
tests for thread-local download-initiation detection (validate-path fast
return, no spurious 1800 s wait) |
|
`tests/unit_test/app_common/executors/client_api_launcher_executor_test.py`
| New tests for deferred stop, `_deferred_stop_event`,
`_stop_task_wait_timeout` |



### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
…ad corruption (NVIDIA#4297)

Data corruption: CIFAR-10 dataset downloaded concurrently by two clients
at runtime

### Problem

The data_path argument is not passed in the trainer/validator job
configs. The integration test suite pre-downloads CIFAR-10 to
/tmp/nvflare/cifar10_data in a setup step, but because the configs omit
that path, both client processes fall through to the default download
logic at runtime. Two processes writing to the same directory
simultaneously causes data corruption.

### Fix

Pass data_path="/tmp/nvflare/cifar10_data" in the trainer and validator
configs so the pre-downloaded dataset is used directly and no runtime
download occurs.
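For context, the underlying hazard (two processes writing the same dataset directory) is usually avoided with a fetch-then-atomic-rename pattern. This is an illustrative sketch only, not NVFlare or torchvision code; the actual fix in this PR simply pins `data_path` so no runtime download happens at all:

```python
# Illustrative pattern: download into a private temp dir, then atomically
# rename it into place. Concurrent callers race on the rename instead of
# interleaving writes, so the destination is either absent or complete.
import os
import shutil
import tempfile

def safe_fetch(dest: str, fetch) -> str:
    if os.path.isdir(dest):
        return dest  # already present (e.g. pre-downloaded in a setup step)
    tmp = tempfile.mkdtemp(dir=os.path.dirname(dest) or ".")
    fetch(tmp)  # write the full dataset into the private temp dir
    try:
        os.rename(tmp, dest)  # atomic on POSIX within one filesystem
    except OSError:
        # another process won the race; its complete copy is already in place
        shutil.rmtree(tmp, ignore_errors=True)
    return dest
```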

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
…DIA#4309)

## Summary

Adds a **receiver-side opt-in** mechanism so that ext-process Client Job
(CJ) cells automatically decode incoming model tensors as
`LazyDownloadRef` placeholders. The subprocess then downloads tensors
**directly from the source** (server in FedAvg, aggregator in Swarm) via
CellNet routing, completely bypassing materialisation inside the CJ.

Previously, the forward-path PASS_THROUGH was controlled by a
sender-side flag (`forward_pass_through` on `SwarmClientController`),
which required the sender to know the receiver's execution mode — unsafe
in mixed deployments and inapplicable to FedAvg. This PR replaces that
with a receiver-decides architecture: the CJ knows it is an ext-process
launcher and opts in locally.

## Problem

In ext-process mode, the CJ receives the full model from the
server/aggregator, materialises all tensors into memory, then
re-serialises and sends them to the subprocess. For large models (e.g.
Llama 8B at ~16 GB), this doubles peak CJ memory and adds unnecessary
serialisation latency.

The existing `forward_pass_through` flag on `SwarmClientController` was:
- **Redundant** for ext-process receivers (receiver-side opt-in handles
it)
- **Dangerous** for in-process receivers (forces LazyDownloadRef on
nodes that can't handle it)
- **Inapplicable** to FedAvg (no SwarmClientController involved)

## Solution

### Core mechanism (3 changes in `cell.py`)

- `Cell.__init__()`: add `self.decode_pass_through = False` flag
- `Adapter.call()`: check `self.cell.decode_pass_through` for incoming
REQUESTs (Swarm AUX path)
- `Cell._send_one_request()`: check `self.decode_pass_through` on reply
decode (FedAvg GET_TASK path)

### Trigger (1 change in `client_api_launcher_executor.py`)

- `ClientAPILauncherExecutor.initialize()`: set
`cell.decode_pass_through = True` when pipe is `CellPipe` (FilePipe has
no CellNet cell for subprocess downloads, so it stays off)

### Flag removal (changes in `swarm_client_ctl.py`)

- Remove `forward_pass_through` parameter, field, and sender-side header
stamping
- Add `_has_lazy_refs()` static method for data-driven detection
- Replace all flag-based guards with `_has_lazy_refs()` checks at 3
sites:
  - `_scatter()` local queue resolution
  - `_end_gather()` defensive panic check
  - `_do_learn()` GLOBAL_MODEL resolution

### Cleanup

- Remove dead `CellPipe` import from `task_exchanger.py`

## Data flow (FedAvg ext-process)

```
Server                CJ cell              Subprocess cell
  │                     │                      │
  │── model reply ──►   │                      │
  │                  decode with               │
  │                  PASS_THROUGH=True         │
  │                  → LazyDownloadRef         │
  │                     │                      │
  │                  CellPipe.send()           │
  │                  FOBS encode:              │
  │                  LazyDownloadRefDecomposer │
  │                  re-emits server datum     │
  │                     │── cell msg ────────► │
  │                     │                   Adapter.call()
  │                     │                   PASS_THROUGH=False
  │                     │                   ViaDownloaderDecomposer
  │                     │                   _download_from_remote_cell()
  │◄── download chunks ─┼──────────────────── │
  │── tensor bytes ────►┼─────────────────►   │
  │                     │                   real tensors in shareable
```

The CJ never materialises the full model. The subprocess downloads
directly from the server.

## Coverage matrix

| Scenario | CJ behavior | Subprocess behavior |
|---|---|---|
| FedAvg + CellPipe (ext-process) | `decode_pass_through=True` →
LazyDownloadRef | Downloads from server via CellNet |
| FedAvg + FilePipe (ext-process) | `decode_pass_through=False` → real
tensors | Receives real tensors via file |
| FedAvg in-process | `decode_pass_through=False` → real tensors | N/A
(same process) |
| Swarm ext-process | `decode_pass_through=True` → LazyDownloadRef |
Downloads from aggregator via CellNet |
| Swarm in-process | `decode_pass_through=False` → real tensors | N/A
(same process) |
| Mixed deployment | Each CJ decides independently | No sender-side
knowledge needed |

## Files changed

| File | Change |
|---|---|
| `nvflare/fuel/f3/cellnet/cell.py` | `decode_pass_through` flag +
checks in `Adapter.call()` and `_send_one_request()` |
| `nvflare/app_common/executors/client_api_launcher_executor.py` | Set
flag when pipe is CellPipe |
| `nvflare/app_common/ccwf/swarm_client_ctl.py` | Remove
`forward_pass_through`, add `_has_lazy_refs()`, update 3 guard sites |
| `nvflare/app_common/executors/task_exchanger.py` | Remove dead
`CellPipe` import |
| `tests/unit_test/app_common/ccwf/test_swarm_forward_memory_path.py` |
Rewritten for data-driven detection tests |
| `tests/unit_test/app_common/ccwf/test_swarm_self_message_deadlock.py`
| Remove `forward_pass_through` reference |
| `tests/unit_test/app_common/ccwf/test_swarm_memory_gc.py` | Remove
`forward_pass_through` reference |
| `tests/unit_test/app_common/ccwf/test_msg_root_ttl.py` | Remove
`forward_pass_through` reference |
| `tests/unit_test/app_common/ccwf/test_lazy_ref_local_aggr.py` | Remove
`forward_pass_through` reference |



## Known limitations

- `_has_lazy_refs()` only recurses into `dict`/`list`/`tuple`. If an
`FLModel` object (via `FLModelDecomposer`) carries `LazyDownloadRef` in
its `.params` dict, the check will miss it. Current Swarm uses DXO
(stored as plain nested dicts), so this is not an issue today.


### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
…IA#4307)

Fix FilePipe heartbeat send timeout too short for heavy aggregation
workloads

### Summary
Increase FilePipe heartbeat send timeout from 5s to 600s in file_pipe.py

### Background
FilePipe implements heartbeats as file-based request-reply: the sender
creates a heartbeat file and waits up to N seconds for the receiver to
delete it. If the receiver doesn't read it within N seconds, the sender
deletes the file itself and the heartbeat is silently dropped.
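The request-reply mechanic described above can be sketched as follows (illustrative only; function name and polling details are assumptions, not the actual `file_pipe.py` code):

```python
# Sketch of a file-based heartbeat: create a file, wait for the receiver to
# delete it within send_timeout, otherwise clean up and drop the heartbeat.
import os
import time

def send_heartbeat(hb_file: str, send_timeout: float, poll: float = 0.05) -> bool:
    with open(hb_file, "w") as f:
        f.write(str(time.time()))
    deadline = time.time() + send_timeout
    while time.time() < deadline:
        if not os.path.exists(hb_file):
            return True  # receiver read and deleted it
        time.sleep(poll)
    # receiver too slow: the sender deletes the file itself
    try:
        os.remove(hb_file)
    except OSError:
        pass
    return False
```

With a 5 s `send_timeout`, a receiver stalled for longer than 5 s never sees the file at all, which is exactly the silent-drop failure this PR addresses.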

The previous default of 5s was too short for nodes running as aggregator
in CCWF Swarm with large models (e.g. 8B parameters). During inter-round
aggregation, P2P model streaming (~15GB per client) causes GIL
contention and filesystem pressure that delays the receiver's
PipeHandler._read thread beyond 5s. Heartbeat files were deleted before
being read, causing _last_heartbeat_received_time to stall and
eventually triggering false PEER_GONE after heartbeat_timeout (30s).

### Why 600s is safe
_monitor_file returns immediately when the receiver deletes the file —
600s is only the ceiling if the receiver never reads it. Normal
operation is unaffected. Genuine peer death is detected independently by
PipeHandler.heartbeat_timeout on the receiver side, which is unrelated
to this send-side timeout.

### Notes
This fix applies to FilePipe only. CellPipe (used in all current
production configs) is not affected — it sends heartbeats via
fire_and_forget with no send-side timeout.

### Future Enhancements
- Make the timeout configurable: expose heartbeat_send_timeout as a
FilePipe.__init__ parameter so users running extremely large models or
slow storage can tune it without touching source code.
- Enforce the invariant FilePipe.heartbeat_send_timeout ≥
PipeHandler.heartbeat_timeout: currently these two values are configured
independently and silently break each other if mismatched. The right fix
is for PipeHandler to pass its heartbeat_timeout down to the pipe at
start() time, making the relationship explicit and impossible to
misconfigure.
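The proposed invariant enforcement could look like this sketch (the `start()` signature here is hypothetical — today PipeHandler does not pass its timeout down):

```python
# Sketch: fail fast at start() if the send-side ceiling undercuts the
# receive-side deadline, instead of silently dropping heartbeats at runtime.
class FilePipeSketch:
    def __init__(self, heartbeat_send_timeout: float = 600.0):
        self.heartbeat_send_timeout = heartbeat_send_timeout

    def start(self, heartbeat_timeout: float) -> None:
        # Hypothetical hook: the handler hands its heartbeat_timeout down.
        if self.heartbeat_send_timeout < heartbeat_timeout:
            raise ValueError(
                f"heartbeat_send_timeout ({self.heartbeat_send_timeout}s) "
                f"must be >= heartbeat_timeout ({heartbeat_timeout}s)"
            )
```

Under the old 5 s send timeout and the 30 s `heartbeat_timeout`, this check would have rejected the configuration at startup rather than producing false PEER_GONE events mid-run.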


### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
…VIDIA#4312)

### Problem
Receiver-side PASS_THROUGH does not reduce CJ memory for FedAvg. The CJ
still downloads the full global model from the server every round
despite the log showing the feature is activated.

Root cause: `ClientAPILauncherExecutor.initialize()` registered
`get_pipe_channel_name()` (`"task"`) into
`cell.decode_pass_through_channels`, but GET_TASK replies arrive on
`CellChannel.SERVER_COMMAND` (`"server_command"`). The check `if channel
in self.decode_pass_through_channels` in `Cell._send_one_request()` never
matched, so replies were always fully decoded instead of being kept as
LazyDownloadRef.

### Fix
Register CellChannel.SERVER_COMMAND instead of the pipe channel name.
This is the CellNet channel that the communicator uses for
`send_request(channel=CellChannel.SERVER_COMMAND, topic=GET_TASK)`, so
the pass-through check now matches correctly.
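The channel gate at the heart of the bug can be reduced to this sketch (simplified stand-in, not the real `cell.py`; the `"server_command"` string follows the PR text):

```python
CELL_CHANNEL_SERVER_COMMAND = "server_command"  # value per the PR description

class CellSketch:
    """Minimal model of the receiver-side channel gate."""

    def __init__(self):
        self.decode_pass_through_channels = set()

    def decode_reply(self, channel: str, payload):
        # GET_TASK replies arrive on "server_command", so that is the channel
        # that must be registered; registering the pipe channel ("task")
        # never matched, which was the bug fixed here.
        if channel in self.decode_pass_through_channels:
            return ("lazy_ref", payload)  # keep as LazyDownloadRef
        return ("decoded", payload)       # full materialising decode

cell = CellSketch()
cell.decode_pass_through_channels.add(CELL_CHANNEL_SERVER_COMMAND)
```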

### Changes
- client_api_launcher_executor.py: replace self.get_pipe_channel_name()
with CellChannel.SERVER_COMMAND
- Added unit tests verifying SERVER_COMMAND is registered on
initialize() and deregistered on finalize()

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
…acement mid-transaction (NVIDIA#4314)

### Problem

BEFORE_TASK_EXECUTION is a global event broadcast to all FLComponents.
In a CCWF Swarm aggregator node, receiving swarm_report_learn_result aux
tasks from other sites while the local subprocess is training fires this
event concurrently. This causes TaskExchanger.handle_event() to stop and
recreate the PipeHandler while execute() is blocked in its polling loop
— the old handler (that the subprocess is writing to) is orphaned, and
the polling loop reads from the new empty handler forever. Silent
deadlock, no error, no PEER_GONE, no timeout.

Affects all pipe types (FilePipe, CellPipe). Independent of the FilePipe
TOCTOU fix in NVIDIA#4296.

### Fix

- Add a `threading.Event` `_executing` as a guard flag:
  `handle_event(BEFORE_TASK_EXECUTION)` skips handler replacement when
  `_executing.is_set()`
- TaskExchanger.execute() uses an ownership pattern so super().execute()
from LauncherExecutor does not prematurely clear the flag
- LauncherExecutor.execute() sets the flag at the very top — before
_initialize_external_execution() — covering the up-to-60 s
_wait_external_setup() window
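The guard and ownership pattern together can be sketched as follows (simplified, not the real `task_exchanger.py`; method bodies are illustrative):

```python
import threading

class TaskExchangerSketch:
    def __init__(self):
        self._executing = threading.Event()
        self.pipe_handler = object()

    def handle_event(self, event_type: str):
        if event_type == "BEFORE_TASK_EXECUTION":
            if self._executing.is_set():
                return  # mid-execution: do NOT replace the live handler
            self.pipe_handler = object()  # safe to recreate between rounds

    def execute(self, run_task):
        # Ownership pattern: only the outermost execute() clears the flag,
        # so a nested super().execute() cannot prematurely drop the guard.
        owner = not self._executing.is_set()
        self._executing.set()
        try:
            return run_task()
        finally:
            if owner:
                self._executing.clear()
```

A concurrent `BEFORE_TASK_EXECUTION` arriving while `execute()` is polling now leaves the handler the subprocess is writing to untouched, instead of orphaning it.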

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Copilot AI review requested due to automatic review settings March 17, 2026 20:37

Copilot AI left a comment


Pull request overview

Cherry-picks a broad set of NVFlare 2.7 fixes/enhancements into this branch, primarily focused on ext-process (subprocess) Client API reliability/performance for large-model transfers, along with recipe/simulator correctness, optional dependency handling (executorch), logging, and documentation/test updates.

Changes:

  • Improves ext-process large-model transfer behavior (PASS_THROUGH forwarding, download lifecycle/TTL, retry caching, timeouts) and reduces client/CJ memory pressure.
  • Hardens pipe/streaming infrastructure (FilePipe TOCTOU, PipeHandler retry cache release, download-service/cacheable lifecycle, ConnManager shutdown races).
  • Updates recipes/examples/docs/tests (SwarmLearningRecipe rename usage, simulator_run strictness tests, executorch optional import UX, new integration/unit validators).

Reviewed changes

Copilot reviewed 130 out of 131 changed files in this pull request and generated 7 comments.

Summary per file (file followed by description):
versioneer.py Hardens git invocations against ambiguous HEAD pathspec by using -- separation and --verify.
setup.py Loads local versioneer.py via importlib to avoid import path issues.
nvflare/_version.py Removes spurious zero-byte working-tree HEAD file before running git commands.
nvflare/tool/job/job_cli.py Ensures secure session is closed after submit; updates submit output.
nvflare/recipe/sim_env.py Adjusts simulator deploy argument passing for strict simulator_run validation.
nvflare/private/fed/utils/fed_utils.py Extends deploy_map participant extraction to support {"targets": [...]} forms.
nvflare/private/aux_runner.py Propagates PASS_THROUGH header from request into cell message for forwarding.
nvflare/job_config/script_runner.py Adds optional task_pipe injection for external process task exchange.
nvflare/job_config/fed_job_config.py Treats None as serializable arg value when exporting component args.
nvflare/fuel/utils/pipe/pipe_handler.py Removes racy msg_root cleanup; adds send-cache release hooks, BrokenPipe handling, and heartbeat improvements.
nvflare/fuel/utils/pipe/pipe.py Adds HEARTBEAT_SEND_TIMEOUT and a release_send_cache() extension point for pipes.
nvflare/fuel/utils/pipe/file_pipe.py Fixes heartbeat send timeout, TOCTOU races in _get_next, and defers root creation to open().
nvflare/fuel/utils/pipe/cell_pipe.py Adds send serialization caching, PASS_THROUGH/TTL stamping, deferred message conversion, and cache release.
nvflare/fuel/utils/memory_utils.py Logs non-zero gc.collect() results at INFO for visibility.
nvflare/fuel/utils/mem_utils.py Adds opt-in RSS logging utility gated by NVFLARE_CLIENT_MEMORY_PROFILE.
nvflare/fuel/utils/log_utils.py Enables logging.captureWarnings(True) so warnings are routed to handlers.
nvflare/fuel/utils/fobs/decomposers/via_downloader.py Makes min download timeout configurable, removes msg_root subscription, adds thread-local download-initiation flag.
nvflare/fuel/utils/fobs/__init__.py Adds DOWNLOAD_COMPLETE_CB key for download-complete gating.
nvflare/fuel/f3/streaming/cacheable.py Splits cache clearing vs source release (clear_cache() vs release()), adds defensive guards.
nvflare/fuel/f3/sfm/conn_manager.py Guards executor submissions/processing paths during/after shutdown to avoid RuntimeError races.
nvflare/fuel/f3/cellnet/defs.py Adds MessageHeaderKey.PASS_THROUGH for per-message receiver-side pass-through decode.
nvflare/fuel/f3/cellnet/cell.py Implements receiver-side PASS_THROUGH opt-in by channel or per-message header on decode paths.
nvflare/edge/tools/et_fed_buff_recipe.py Adds early, actionable ImportError if executorch training components are unavailable.
nvflare/edge/simulation/et_task_processor.py Switches to optional imports for executorch training helpers; improves ImportError handling.
nvflare/edge/models/model.py Makes ExecuTorch export optional via optional_import() to avoid import-time failures.
nvflare/client/model_registry.py Adds release_params() to drop large parameter references after send/serialization.
nvflare/client/in_process/api.py Adds RSS logging and eager param release on send(clear_cache=True) to reduce RSS growth.
nvflare/client/flare_agent_with_fl_model.py Adds subprocess-safe converter wiring via _ConverterContext + from/to ParamsConverters.
nvflare/client/ex_process/api.py Adds subprocess logging configuration, reverse PASS_THROUGH enablement, default converter wiring, RSS logging, and param release via ModelRegistry.
nvflare/client/converter_utils.py Introduces shared default converter factory for numpy↔PT / numpy↔Keras conversions.
nvflare/client/constants.py Adds PEER_READ_TIMEOUT config key documentation for job-level tuning.
nvflare/client/config.py Adds new client config keys (expected format, submit_result_timeout, max_resends, download_complete_timeout, launch_once) + parsing/validation.
nvflare/client/api_spec.py Raises memory cleanup log visibility from DEBUG to INFO.
nvflare/app_opt/tf/in_process_client_api_executor.py Uses shared create_default_params_converters() instead of duplicated TF converter wiring.
nvflare/app_opt/tf/client_api_launcher_executor.py Removes CJ-side TF converter wiring (subprocess now owns conversion).
nvflare/app_opt/pt/recipes/__init__.py Updates recipe exports to SwarmLearningRecipe.
nvflare/app_opt/pt/in_process_client_api_executor.py Uses shared default converter factory for PT in-process executor.
nvflare/app_opt/pt/client_api_launcher_executor.py Exposes/forwards new timeout params and removes CJ-side converter wiring.
nvflare/app_common/widgets/metric_relay.py Lowers stop logging verbosity; improves pipe status log content.
nvflare/app_common/launchers/subprocess_launcher.py Adds stdout routing to avoid double-logging; introduces deferred-stop signaling for launch-once behavior.
nvflare/app_common/executors/task_exchanger.py Adds _executing guard to prevent handler replacement mid-execution; hardens status callback ownership.
nvflare/app_common/ccwf/swarm_server_ctl.py Forwards min_clients into base controller configuration.
nvflare/app_common/ccwf/server_ctl.py Adds min_clients semantics to workflow control-flow and status/progress checks.
nvflare/app_common/ccwf/recipes/swarm.py Updates optional import export to SwarmLearningRecipe.
nvflare/app_common/ccwf/cse_client_ctl.py Persists-first local model loading for ext-process; inventory key selection improvements.
nvflare/app_common/ccwf/common.py Adds membership topic constant and helper.
nvflare/app_common/ccwf/client_ctl.py Reduces end-workflow log verbosity.
nvflare/app_common/ccwf/client_controller_executor.py Reduces end-workflow log verbosity.
nvflare/app_common/ccwf/ccwf_job.py Adds min_clients forwarding and wires learn_task_check_interval.
nvflare/app_common/abstract/launcher.py Adds needs_deferred_stop() API and documents stop_task idempotency/thread-safety contract.
nvflare/apis/shareable.py Adds reserved header __pass_through__ for CJ receiver-side pass-through opt-in.
nvflare/apis/impl/task_controller.py Allows pre-set MSG_ROOT_TTL override instead of always using task.timeout.
nvflare/apis/fl_constant.py Adds ConfigVarName.MIN_DOWNLOAD_TIMEOUT for tuning streaming inactivity timeout.
tests/unit_test/recipe/sim_env_test.py Adds deploy validation test for clients vs n_clients behavior.
tests/unit_test/recipe/edge_recipe_test.py Adds executorch presence/absence tests for ETFedBuffRecipe.
tests/unit_test/recipe/component_config_verification_test.py Updates swarm recipe import/name and required args.
tests/unit_test/private/fed/server/job_meta_validator_test.py Adds deploy_map coverage for targets dict form and invalid-key cases.
tests/unit_test/job_config/fed_job_test.py Adds strict simulator_run validation tests for clients vs n_clients.
tests/unit_test/fuel/utils/pipe/pipe_handler_test.py Adds regression tests for BrokenPipe read handling and heartbeat timeout usage.
tests/unit_test/fuel/utils/memory_utils_test.py Updates gc.collect mocking and adds INFO logging behavior tests.
tests/unit_test/fuel/utils/fobs/test_via_downloader_msg_root.py Validates removal of msg_root subscription and callback removal.
tests/unit_test/fuel/utils/fobs/test_via_downloader_min_timeout.py Locks in new min download timeout default and removal of _on_tx_done.
tests/unit_test/fuel/utils/fobs/test_pass_through.py Adds forward-path regression guards for PASS_THROUGH decode context semantics.
tests/unit_test/fuel/f3/streaming/streaming_test.py Removes shared util constants; uses test-local cell names.
tests/unit_test/fuel/f3/streaming/download_service_test.py Expands transaction_done callback tests and byte accounting guard.
tests/unit_test/fuel/f3/streaming/cacheable_test.py Adds tests for cache/base_obj lifecycle split and idempotency.
tests/unit_test/client/test_model_registry.py Adds unit tests for ModelRegistry.release_params() behavior and safety.
tests/unit_test/client/in_process/api_test.py Adds tests for send() param-release semantics and clear_cache behavior.
tests/unit_test/client/flare_agent_with_fl_model_test.py Tests converter application + fallback to current_task when header absent.
tests/unit_test/app_opt/pt/test_pt_launcher_executor_params.py Ensures PT launcher exposes/forwards new timeout params.
tests/unit_test/app_opt/feature_election/test_conn_manager_shutdown_race.py Adds regression tests for ConnManager shutdown race guards.
tests/unit_test/app_common/launchers/subprocess_launcher_test.py Tests subprocess stdout routing and formatted-line double-logging avoidance.
tests/unit_test/app_common/ccwf/test_swarm_memory_gc.py Adds tests ensuring aggregator GC is logged at INFO when it fires.
tests/unit_test/app_common/ccwf/test_membership_update.py Documents/locks in removal of membership update handler registration.
tests/integration_test/src/validators/log_pattern_validator.py Adds validator for required/forbidden log patterns in client run logs.
tests/integration_test/src/validators/__init__.py Exposes LogPatternValidator in validators package.
tests/integration_test/src/utils.py Fixes subprocess stdin path to return None after communicate().
tests/integration_test/install_requirements.py Simplifies temp-file write path (removes redundant fd-close wrapper).
tests/integration_test/data/jobs/np_large_model_memory_swarm/app/custom/train_script.py Adds large-model subprocess training script for memory/path validation.
tests/integration_test/data/jobs/np_large_model_memory_swarm/app/custom/np_model_persistor.py Adds synthetic NP persistor for streaming threshold test coverage.
tests/integration_test/data/jobs/np_large_model_memory_fedavg/app/custom/train_script.py Adds fedavg variant large-model memory integration train script.
tests/integration_test/data/jobs/np_large_model_memory_fedavg/app/custom/np_model_persistor.py Adds fedavg variant synthetic NP persistor.
tests/integration_test/data/jobs/big_model_4g/app/config/config_fed_server.conf Tunes tensor_min_download_timeout and increases num_rounds for stability coverage.
tests/integration_test/data/jobs/big_model_4g/app/config/config_fed_client.conf Removes overly-short/ext-process timeouts from integration config.
tests/integration_test/data/apps/pt_init_client/app/config/config_fed_client.json Pins CIFAR10 data_path to avoid concurrent download corruption.
examples/swarm_pt/app/config/config_fed_client.conf Adds swarm PT example config tuned for large-model transfer/timeouts.
examples/hello-world/hello-pt/job.py Adds CLI options for external subprocess execution and client memory GC rounds.
examples/hello-world/hello-lightning/client.py Simplifies trainer device selection via accelerator="auto", devices="auto".
examples/advanced/swarm_learning/swarm_pt/requirements.txt Adds requirements for advanced swarm LoRA example.
examples/advanced/swarm_learning/swarm_pt/prepare_data.py Adds dataset sharding utility for advanced swarm LoRA example.
examples/advanced/swarm_learning/swarm_pt/model.py Adds LoRA wrapper to expose only adapter weights for compact persistence/exchange.
examples/advanced/swarm_learning/swarm_pt/job.py Adds advanced swarm LoRA recipe-based job (export or SimEnv run).
examples/advanced/swarm_learning/swarm_pt/download_data.py Adds pre-download utility for dataset/model caching.
docs/user_guide/data_scientist_guide/available_recipes.rst Updates Swarm recipe name/import and adds note about aliasing.
docs/resources/3rd_party_trainer.py Updates 3rd-party integration example to use explicit CellPipe + FlareAgent.
docs/release_notes/flare_272.rst Notes client-side param release semantics alongside server-side memory management.
docs/programming_guide/timeouts.rst Updates swarm recipe import/name in timeouts guide.
docs/programming_guide/tensor_downloader.rst Replaces disk-offload note with client memory lifecycle guidance.
docs/programming_guide/execution_api_type/3rd_party_integration.rst Updates integration guidance away from FlareAgentWithCellPipe; adds Client API pattern.
docs/programming_guide/controllers/client_controlled_workflows.rst Updates swarm recipe import/name and required min_clients in examples.
.gitignore Ignores spurious working-tree /HEAD file.


Comment on lines +48 to +60
```python
def test_sim_env_deploy_with_explicit_clients_does_not_pass_n_clients():
    """SimEnv.deploy() must not pass n_clients when clients are explicit."""
    job = FedJob(name="test_job")
    job._deployed = True

    env = SimEnv(clients=["site-1", "site-2"])
    with patch("nvflare.recipe.sim_env.collect_non_local_scripts", return_value=[]):
        with patch.object(job, "simulator_run") as mock_run:
            env.deploy(job)

    _, kwargs = mock_run.call_args
    assert kwargs.get("n_clients") is None
    assert kwargs.get("clients") == ["site-1", "site-2"]
```
Comment on lines +396 to +399
```python
try:
    job_id = sess.submit_job(temp_job_dir)
    print(f"job: '{job_id}' was submitted")
finally:
```
Comment on lines 719 to 722
```diff
 .. note::
-    ``SimpleSwarmLearningRecipe`` is also available from the original location for backward compatibility:
+    ``SimpleSwarmLearningRecipe`` is a backward-compatible alias for ``SwarmLearningRecipe``.
+    It is also available from the original location:
     ``from nvflare.app_common.ccwf.recipes.swarm import SimpleSwarmLearningRecipe``
```
Comment on lines +3 to +4
```python
np_download_chunk_size = 2097152  # 2 MB
tensor_download_chunk_size = 2097152  # 2 MB
```
Comment on lines +83 to +86
```python
if not os.path.exists(self.root_path):
    os.makedirs(self.root_path)
    self._remove_root = True
```
Comment on lines +333 to +339
```python
# Read min_download_timeout from job config so operators can tune
# it per-job (e.g. np_min_download_timeout: 600 for a 70B model).
# Falls back to the module-level constant (60s) when not set.
min_timeout = acu.get_positive_float_var(
    self._config_var_name(ConfigVarName.MIN_DOWNLOAD_TIMEOUT),
    _MIN_DOWNLOAD_TIMEOUT,
)
```
Comment thread nvflare/recipe/sim_env.py
Comment on lines 107 to 111
```diff
 job.simulator_run(
     workspace=os.path.join(self.workspace_root, job.name),
-    n_clients=self.num_clients,
+    n_clients=self.num_clients if self.clients is None else None,
     clients=self.clients,
     threads=self.num_threads,
```
@YuanTingHsieh YuanTingHsieh deleted the cherry-pick-changes-from-27 branch March 17, 2026 21:11
@YuanTingHsieh YuanTingHsieh restored the cherry-pick-changes-from-27 branch March 17, 2026 21:11
@YuanTingHsieh YuanTingHsieh reopened this Mar 17, 2026
@YuanTingHsieh YuanTingHsieh merged commit e55402e into NVIDIA:main Mar 17, 2026
57 checks passed
@YuanTingHsieh YuanTingHsieh deleted the cherry-pick-changes-from-27 branch March 17, 2026 21:47
@YuanTingHsieh YuanTingHsieh restored the cherry-pick-changes-from-27 branch March 17, 2026 23:37
chesterxgchen added a commit that referenced this pull request Mar 31, 2026
### Description

Port missing 2.7 doc content and tensor streaming improvements to main

Several doc changes from 2.7 PRs (#4270, #4278, #4206) were silently
dropped during batch squash cherry-picks (#4327, #4329) due to merge
conflicts on files touched by multiple batches.

#### Docs restored:

- timeouts.rst: fixed broken SwarmLearningRecipe import; added server
startup/dead-job safety flags section
- timeout_troubleshooting.rst: added sfm_* streaming stall guardrail
section; added HPC/hierarchical deployment guidance (Slurm/Lustre)
- client_controlled_workflows.rst: added round_timeout to Swarm
examples; added client dropout tolerance section
available_recipes.rst: added round_timeout and large-model tuning note
to Swarm section


### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>


4 participants