From 029319483eb8a23bb3f8a47100acdfb68361eec3 Mon Sep 17 00:00:00 2001 From: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> Date: Wed, 29 Apr 2026 20:27:00 -0700 Subject: [PATCH 1/3] [https://nvbugs/6104831][test] Add reproducers for broken-promise on disagg cancellation. Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> --- .../others/test_kv_cache_transceiver.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/tests/unittest/others/test_kv_cache_transceiver.py b/tests/unittest/others/test_kv_cache_transceiver.py index cf9d544a9ce0..68d7abff50a0 100644 --- a/tests/unittest/others/test_kv_cache_transceiver.py +++ b/tests/unittest/others/test_kv_cache_transceiver.py @@ -229,6 +229,89 @@ def test_cancel_request_in_transmission(attention_type): assert gen_request.state == LlmRequestState.DISAGG_TRANS_ERROR +@pytest.mark.timeout(120) +@pytest.mark.parametrize("attention_type", + [AttentionTypeCpp.DEFAULT, AttentionTypeCpp.MLA], + ids=["mha", "mla"]) +def test_cancel_request_in_transmission_does_not_break_sender_future( + attention_type, capfd): + tensorrt_llm.logger.set_level("info") + mapping = Mapping(world_size=1, rank=0) + dist = Distributed.get(mapping) + ctx_kv_cache_dtype, gen_kv_cache_dtype = DataType.HALF, DataType.HALF + kv_cache_manager_ctx = create_kv_cache_manager(mapping, ctx_kv_cache_dtype) + kv_cache_manager_gen = create_kv_cache_manager(mapping, gen_kv_cache_dtype) + + cache_transceiver_config = CacheTransceiverConfig(backend="DEFAULT", + max_tokens_in_buffer=512) + + kv_cache_transceiver_ctx = create_kv_cache_transceiver( + mapping, dist, kv_cache_manager_ctx, attention_type, + cache_transceiver_config) + + kv_cache_transceiver_gen = create_kv_cache_transceiver( + mapping, dist, kv_cache_manager_gen, attention_type, + cache_transceiver_config) + + fill_kv_cache_buffer(kv_cache_manager_ctx) + + sampling_params = SamplingParams() + ctx_request = LlmRequest( + request_id=0, + 
max_new_tokens=1, + input_tokens=list(range(256)), + sampling_config=tensorrt_llm.bindings.SamplingConfig( + sampling_params._get_sampling_config()), + is_streaming=False, + llm_request_type=LlmRequestType.LLMREQUEST_TYPE_CONTEXT_ONLY) + + kv_cache_manager_ctx.impl.add_sequence_batch( + [(ctx_request.py_request_id, ctx_request.prompt_len, 1)], [ctx_request]) + kv_cache_transceiver_ctx.respond_and_send_async(ctx_request) + + time.sleep(2) + is_cancelled = kv_cache_transceiver_ctx.cancel_request(ctx_request) + assert is_cancelled + + gen_request = LlmRequest( + request_id=0, + max_new_tokens=1, + input_tokens=list(range(256)), + sampling_config=tensorrt_llm.bindings.SamplingConfig( + sampling_params._get_sampling_config()), + is_streaming=False, + llm_request_type=LlmRequestType.LLMREQUEST_TYPE_GENERATION_ONLY, + context_phase_params=ctx_request.context_phase_params) + + kv_cache_manager_gen.impl.add_sequence_batch( + [(gen_request.py_request_id, gen_request.prompt_len, 1)], [gen_request]) + kv_cache_transceiver_gen.request_and_receive_async(gen_request) + + completed_ids, error_ids = [], [] + deadline = time.time() + 10 + while time.time() < deadline and not error_ids: + completed_ids, error_ids = kv_cache_transceiver_ctx.check_context_transfer_status( + 1) + if error_ids: + break + time.sleep(0.1) + + assert ctx_request.py_request_id not in completed_ids + assert ctx_request.py_request_id in error_ids + + deadline = time.time() + 10 + while time.time( + ) < deadline and gen_request.state != LlmRequestState.DISAGG_TRANS_ERROR: + kv_cache_transceiver_gen.check_gen_transfer_status(1) + time.sleep(0.1) + + assert gen_request.state == LlmRequestState.DISAGG_TRANS_ERROR + + captured = capfd.readouterr() + merged = captured.out + captured.err + assert "Broken promise" not in merged + + def create_hybrid_cache_manager(mapping, dtype, mamba_conv_dtype=torch.float16, From af58cf14eb58f81e2e6e3c17b27c48b6c434a142 Mon Sep 17 00:00:00 2001 From: Chien-Chun Hung 
<2679986+chienchunhung@users.noreply.github.com> Date: Wed, 29 Apr 2026 20:33:13 -0700 Subject: [PATCH 2/3] [https://nvbugs/6104831][fix] Fulfill sender future on disagg cancellation Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> --- cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp index 3ecceb9f3f2c..9864d29e06e4 100644 --- a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp +++ b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp @@ -586,6 +586,10 @@ class CacheSender::Impl // not be removed from mCancelledRequests. This should be handled by timeout. auto it = mReadyResponses.find(mCurrentRequest.value()); TLLM_CHECK(it != mReadyResponses.end()); + auto cancelledException + = TLLM_REQUEST_EXCEPTION(reqId, tensorrt_llm::common::RequestErrorCode::kNETWORK_ERROR, + "Context KV cache transfer cancelled after ready-signal for request %zu", reqId); + it->second.mPromise.set_exception(std::make_exception_ptr(cancelledException)); { std::scoped_lock lkResp(mSenderMutex); mReadyResponses.erase(it); From ff898323d6fe5afdff9696a01076daf4ab0528ab Mon Sep 17 00:00:00 2001 From: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> Date: Thu, 30 Apr 2026 02:07:18 -0700 Subject: [PATCH 3/3] [https://nvbugs/6104831][fix] Free recv buffer index on cancelled-after-ready disagg generation request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix signature #6 of NVBug 6104831 — a recv-buffer index leak that becomes a permanent global wedge under combined cancel+retry+long-prompt disagg load. The fix is two-layer because the leak has two distinct exit paths. 
The signature #1 fix on the sender side correctly sends is_ready=false for cancelled-after-ready requests; on the receiver side that becomes bool isReady = false from receiveReadySignal() in CacheReceiver::Impl::requestSync(). The pre-fix early return in requestSync sets kDISAGG_TRANS_ERROR and returns without calling receiveSync(), so unformat() never runs and the recv buffer index reserved at the top of CacheReceiver::Impl::sendRequestInfo() is leaked. mRecvBufferCount defaults to 1 for the NIXL agent backend, so a single leaked recv buffer index is enough to wedge every subsequent assignBufferIndexForRecv() call forever inside the unbounded cv.wait in BaseTransBufferManager::assignBufferIndex. Layer A — sendRequestInfo() exception safety. Track every (BaseTransBufferManager*, std::optional<size_t>) pair returned by assignBufferIndexForRecv() in a local vector. Wrap the rest of the function body in try {...} catch (...) { freeAssignedRecvBuffers(); throw; } so any exception between assignment and the eventual freeBufferIndexForRecv() call inside unformat() releases the indices. On the success path the local tracking vector is explicitly cleared because ownership has been handed off to the AgentConnection's mCacheBufferIds, which unformat() will free. Layer B — requestSync() !isReady cleanup. Mirror what unformat() does on the success path. In the !isReady early-return branch, iterate the session's connections, look up each pre-assigned recv buffer ID via agentConnection->getPreAssignedBufferId(...), and free it via mgr->freeBufferIndexForRecv(id). The new test test_cancelled_after_ready_does_not_leak_recv_buffer_index uses the NIXL backend (the only backend that goes through assignBufferIndexForRecv), drives one full ctx/gen handshake to completion, exercises the cancelled-after-ready path once, and then issues a follow-up generation request on a worker thread with a 10s probe timeout.
Pre-fix the worker thread stays alive past the timeout because assignBufferIndexForRecv() blocks; post-fix the follow-up request completes normally. This PR is chained on top of #13640 (sig #1 fix) because the !isReady early-return path is only reachable once the sender-side cancellation correctly sends is_ready=false. Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com> Made-with: Cursor --- .../batch_manager/dataTransceiver.cpp | 225 ++++++++++++------ .../others/test_kv_cache_transceiver.py | 176 ++++++++++++++ 2 files changed, 327 insertions(+), 74 deletions(-) diff --git a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp index 9864d29e06e4..5139d5f55087 100644 --- a/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp +++ b/cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -851,103 +851,149 @@ class CacheReceiver::Impl auto* agentConnectionManager = dynamic_cast(mManager); std::vector> cacheBufferIds; + // Track every recv-buffer slot we reserve here so the RAII guard + // below can release them again if anything later in this function + // throws. Without this, an exception between assignBufferIndexForRecv() + // and the eventual freeBufferIndexForRecv() call inside unformat() / + // receiveSync() leaks the slot, and because mRecvBufferCount defaults + // to 1 the next assignBufferIndexForRecv() then blocks forever in the + // unbounded cv.wait inside BaseTransBufferManager::assignBufferIndex + // (signature #6 of NVBug 6104831). 
+ std::vector>> assignedRecvBuffers; if (agentConnectionManager) { for (auto& cacheTransBufferManager : agentConnectionManager->getCacheTransBufferManagers()) { - cacheBufferIds.push_back(cacheTransBufferManager->assignBufferIndexForRecv()); + auto reservedId = cacheTransBufferManager->assignBufferIndexForRecv(); + cacheBufferIds.push_back(reservedId); + assignedRecvBuffers.emplace_back(cacheTransBufferManager, reservedId); } TLLM_CHECK(!cacheBufferIds.empty()); } + auto freeAssignedRecvBuffers = [&assignedRecvBuffers, &llmRequest]() noexcept + { + for (auto& [mgr, id] : assignedRecvBuffers) + { + if (mgr != nullptr && id.has_value()) + { + try + { + mgr->freeBufferIndexForRecv(id); + } + catch (std::exception const& freeExc) + { + TLLM_LOG_ERROR("Failed to free recv buffer index for request %zu during cleanup: %s", + llmRequest.mRequestId, freeExc.what()); + } + } + } + assignedRecvBuffers.clear(); + }; - auto allCounterparts - = mCacheTransferLayer.computeCounterparts(mSelfState.getCommState().value().getSelfIdx(), contextState); + try + { + auto allCounterparts + = mCacheTransferLayer.computeCounterparts(mSelfState.getCommState().value().getSelfIdx(), contextState); - auto kvCounterParts = mCacheTransferLayer.getKvFormatter()->getCounterparts( - mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx(), destCacheState); + auto kvCounterParts = mCacheTransferLayer.getKvFormatter()->getCounterparts( + mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx(), destCacheState); - bool hasRnn = mCacheTransferLayer.getCacheState().hasRnnConfig() && destCacheState.hasRnnConfig(); + bool hasRnn = mCacheTransferLayer.getCacheState().hasRnnConfig() && destCacheState.hasRnnConfig(); - std::vector rnnCounterParts; - if (hasRnn) - { - rnnCounterParts = executor::kv_cache::targetIRanksForRnn( - destCacheState, mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx()) - .mIRanks; - } + std::vector 
rnnCounterParts; + if (hasRnn) + { + rnnCounterParts = executor::kv_cache::targetIRanksForRnn( + destCacheState, mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx()) + .mIRanks; + } - auto connections = mManager->getConnections(commState); - std::vector allConnections; - for (auto index : allCounterparts) - { - auto const* connection = connections.at(index); - allConnections.emplace_back(connection); - } + auto connections = mManager->getConnections(commState); + std::vector allConnections; + for (auto index : allCounterparts) + { + auto const* connection = connections.at(index); + allConnections.emplace_back(connection); + } - for (size_t ci = 0; ci < allCounterparts.size(); ci++) - { - auto rank = allCounterparts[ci]; - auto const* connection = connections.at(rank); + for (size_t ci = 0; ci < allCounterparts.size(); ci++) + { + auto rank = allCounterparts[ci]; + auto const* connection = connections.at(rank); - bool isKvCounterpart - = std::find(kvCounterParts.begin(), kvCounterParts.end(), rank) != kvCounterParts.end(); - bool isRnnCounterpart - = hasRnn && std::find(rnnCounterParts.begin(), rnnCounterParts.end(), rank) != rnnCounterParts.end(); + bool isKvCounterpart + = std::find(kvCounterParts.begin(), kvCounterParts.end(), rank) != kvCounterParts.end(); + bool isRnnCounterpart = hasRnn + && std::find(rnnCounterParts.begin(), rnnCounterParts.end(), rank) != rnnCounterParts.end(); - if (agentConnectionManager) - { - auto idsForRank = cacheBufferIds; - auto const& managers = agentConnectionManager->getCacheTransBufferManagers(); - for (size_t i = 0; i < idsForRank.size(); i++) + if (agentConnectionManager) { - auto kind = managers[i]->getBufferKind(); - bool include = (kind != BufferKind::kRNN) ? 
isKvCounterpart : isRnnCounterpart; - if (!include) + auto idsForRank = cacheBufferIds; + auto const& managers = agentConnectionManager->getCacheTransBufferManagers(); + for (size_t i = 0; i < idsForRank.size(); i++) { - idsForRank[i] = std::nullopt; + auto kind = managers[i]->getBufferKind(); + bool include = (kind != BufferKind::kRNN) ? isKvCounterpart : isRnnCounterpart; + if (!include) + { + idsForRank[i] = std::nullopt; + } } - } - int validConnectionIdx = 0; - if (isKvCounterpart) - { - auto kvCpIdx - = std::find(kvCounterParts.begin(), kvCounterParts.end(), rank) - kvCounterParts.begin(); - auto [pickUpIdx, localRankIdx] = mCacheTransferLayer.getKvFormatter()->pickRecvConnections( - allCounterparts.size(), mSelfState.getCacheState().value(), - mSelfState.getCommState().value().getSelfIdx(), destCacheState, allCounterparts); - validConnectionIdx - = std::find(localRankIdx.begin(), localRankIdx.end(), kvCpIdx) - localRankIdx.begin(); + int validConnectionIdx = 0; + if (isKvCounterpart) + { + auto kvCpIdx + = std::find(kvCounterParts.begin(), kvCounterParts.end(), rank) - kvCounterParts.begin(); + auto [pickUpIdx, localRankIdx] = mCacheTransferLayer.getKvFormatter()->pickRecvConnections( + allCounterparts.size(), mSelfState.getCacheState().value(), + mSelfState.getCommState().value().getSelfIdx(), destCacheState, allCounterparts); + validConnectionIdx + = std::find(localRankIdx.begin(), localRankIdx.end(), kvCpIdx) - localRankIdx.begin(); + } + else if (isRnnCounterpart) + { + auto rnnTargetInfo = executor::kv_cache::targetIRanksForRnn(destCacheState, + mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx()); + auto rnnCpIdx + = std::find(rnnCounterParts.begin(), rnnCounterParts.end(), rank) - rnnCounterParts.begin(); + auto [pickUpIdx, localRankIdx] + = cache_formatter_utils::pickRecvConnections(rnnCounterParts.size(), + mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx(), + destCacheState, 
rnnCounterParts, rnnTargetInfo); + validConnectionIdx + = std::find(localRankIdx.begin(), localRankIdx.end(), rnnCpIdx) - localRankIdx.begin(); + } + + auto* agentConnection = dynamic_cast(connection); + TLLM_CHECK(agentConnection != nullptr); + + const_cast(agentConnection) + ->sendRequestAndBufferInfo(requestInfo, idsForRank, validConnectionIdx); } - else if (isRnnCounterpart) + else { - auto rnnTargetInfo = executor::kv_cache::targetIRanksForRnn(destCacheState, - mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx()); - auto rnnCpIdx - = std::find(rnnCounterParts.begin(), rnnCounterParts.end(), rank) - rnnCounterParts.begin(); - auto [pickUpIdx, localRankIdx] = cache_formatter_utils::pickRecvConnections(rnnCounterParts.size(), - mCacheTransferLayer.getCacheState(), mSelfState.getCommState().value().getSelfIdx(), - destCacheState, rnnCounterParts, rnnTargetInfo); - validConnectionIdx - = std::find(localRankIdx.begin(), localRankIdx.end(), rnnCpIdx) - localRankIdx.begin(); + sendRequestInfo(connection, requestInfo); } - - auto* agentConnection = dynamic_cast(connection); - TLLM_CHECK(agentConnection != nullptr); - - const_cast(agentConnection) - ->sendRequestAndBufferInfo(requestInfo, idsForRank, validConnectionIdx); - } - else - { - sendRequestInfo(connection, requestInfo); } + auto const& resource = getReceiveCacheResource(llmRequest); + // Buffer indices are now owned by the agent connections + // (mCacheBufferIds) and will be freed by unformat() during + // receiveSync() on the success path, or by the !isReady early + // return in requestSync() on the cancelled-after-ready path. + // Hand ownership off and clear the local reservation list so + // the catch below does not double-free on the success path. 
+ assignedRecvBuffers.clear(); + return TransferSession(std::move(allConnections), DataContext{tagFromRequestId(requestId), mTerminate}, + std::move(allCounterparts), mSelfState, contextState, resource->mBufferManager, + requestInfo.getIndexFromEnd(), requestInfo.getLastBlockKey(), &llmRequest, + !common::getEnvKVCacheTimeOutputPath().empty()); + } + catch (...) + { + freeAssignedRecvBuffers(); + throw; } - auto const& resource = getReceiveCacheResource(llmRequest); - return TransferSession(std::move(allConnections), DataContext{tagFromRequestId(requestId), mTerminate}, - std::move(allCounterparts), mSelfState, contextState, resource->mBufferManager, - requestInfo.getIndexFromEnd(), requestInfo.getLastBlockKey(), &llmRequest, - !common::getEnvKVCacheTimeOutputPath().empty()); } std::unique_ptr const& getReceiveCacheResource(LlmRequest const& llmRequest) @@ -1066,6 +1112,37 @@ class CacheReceiver::Impl // Reuse the error state for the cancelled request. llmRequest.setState(LlmRequestState::kDISAGG_TRANS_ERROR); llmRequest.setKvCacheTransferEnd(std::chrono::steady_clock::now()); + // Mirror what unformat() does on the success path: explicitly free + // any pre-assigned recv buffer indices so a cancelled-after-ready + // request does not leak the recv buffer slot. Without this, the + // next assignBufferIndexForRecv() call blocks forever in the + // unbounded cv.wait inside BaseTransBufferManager::assignBufferIndex + // (signature #6 of NVBug 6104831). The Layer A guard in + // sendRequestInfo() already covers exception paths; this branch + // covers the structured early return that fires whenever the + // sender-side cancellation path (signature #1 fix) sends + // is_ready=false. 
+ auto* agentConnectionManager = dynamic_cast(mManager); + if (agentConnectionManager != nullptr) + { + for (auto const* connection : session.getConnections()) + { + auto const* agentConnection = dynamic_cast(connection); + if (agentConnection == nullptr) + { + continue; + } + for (auto& mgr : agentConnectionManager->getCacheTransBufferManagers()) + { + auto cacheBufferId + = agentConnection->getPreAssignedBufferId(static_cast(mgr->getBufferKind())); + if (cacheBufferId.has_value()) + { + mgr->freeBufferIndexForRecv(cacheBufferId); + } + } + } + } return; } receiveSync(session); diff --git a/tests/unittest/others/test_kv_cache_transceiver.py b/tests/unittest/others/test_kv_cache_transceiver.py index 68d7abff50a0..87eb2acba781 100644 --- a/tests/unittest/others/test_kv_cache_transceiver.py +++ b/tests/unittest/others/test_kv_cache_transceiver.py @@ -1,3 +1,4 @@ +import threading import time import uuid @@ -312,6 +313,181 @@ def test_cancel_request_in_transmission_does_not_break_sender_future( assert "Broken promise" not in merged +@pytest.mark.timeout(180) +@pytest.mark.parametrize("attention_type", + [AttentionTypeCpp.DEFAULT, AttentionTypeCpp.MLA], + ids=["mha", "mla"]) +def test_cancelled_after_ready_does_not_leak_recv_buffer_index(attention_type): + """Reproduce the recv-buffer leak via the !isReady early return. + + Signature #6 of NVBug 6104831. The signature #1 fix on the sender + side correctly sends ``is_ready=false`` for cancelled-after-ready + requests; on the receiver side that becomes + ``bool isReady = false`` from ``receiveReadySignal()`` in + ``CacheReceiver::Impl::requestSync()``. The pre-fix early return + sets ``kDISAGG_TRANS_ERROR`` and returns *without* calling + ``receiveSync()``, so ``unformat()`` never runs and the recv buffer + index reserved at the top of ``sendRequestInfo()`` is leaked. 
+ Because ``mRecvBufferCount`` defaults to ``1`` for the NIXL agent + backend, a single leaked recv buffer index is enough to wedge every + subsequent ``assignBufferIndexForRecv()`` call forever inside the + unbounded ``cv.wait`` in ``BaseTransBufferManager::assignBufferIndex``. + + The test drives one full ctx/gen handshake to completion to capture + a real opaque comm/cache state, then exercises the + cancelled-after-ready path once (sender ``respond_and_send_async`` + + ``cancel_request`` + receiver ``request_and_receive_async``) so + the receiver side hits the ``!isReady`` early return. It then + issues a follow-up generation request whose context counterpart is + a fresh, healthy ``respond_and_send_async`` and asserts the + follow-up request completes within a reasonable timeout. Pre-fix + the follow-up request hangs in + ``assignBufferIndexForRecv()``; post-fix it completes promptly + because the recv buffer index has been freed by the new explicit + free in the ``!isReady`` early return path. + """ + tensorrt_llm.logger.set_level("info") + mapping = Mapping(world_size=1, rank=0) + dist = Distributed.get(mapping) + kv_cache_manager_ctx = create_kv_cache_manager(mapping, DataType.HALF) + kv_cache_manager_gen = create_kv_cache_manager(mapping, DataType.HALF) + + # The bug only manifests with the NIXL agent backend, which is the only + # backend that goes through assignBufferIndexForRecv() / + # AgentConnection::sendRequestAndBufferInfo(). The DEFAULT-resolves-to-UCX + # path uses connection->send() directly and does not reserve recv buffer + # slots, so it cannot exercise this leak. 
+ cache_transceiver_config = CacheTransceiverConfig(backend="NIXL", + max_tokens_in_buffer=512) + + kv_cache_transceiver_ctx = create_kv_cache_transceiver( + mapping, dist, kv_cache_manager_ctx, attention_type, + cache_transceiver_config) + kv_cache_transceiver_gen = create_kv_cache_transceiver( + mapping, dist, kv_cache_manager_gen, attention_type, + cache_transceiver_config) + + fill_kv_cache_buffer(kv_cache_manager_ctx) + sampling_params = SamplingParams() + + def make_request(request_id, llm_request_type, context_phase_params=None): + kwargs = dict( + request_id=request_id, + max_new_tokens=1, + input_tokens=list(range(256)), + sampling_config=tensorrt_llm.bindings.SamplingConfig( + sampling_params._get_sampling_config()), + is_streaming=False, + llm_request_type=llm_request_type, + ) + if context_phase_params is not None: + kwargs["context_phase_params"] = context_phase_params + return LlmRequest(**kwargs) + + def add_sequence(kv_cache_manager, request): + kv_cache_manager.impl.add_sequence(request.py_request_id, + request.prompt_len, 1, request) + + template_ctx_request = make_request( + 200, LlmRequestType.LLMREQUEST_TYPE_CONTEXT_ONLY) + add_sequence(kv_cache_manager_ctx, template_ctx_request) + kv_cache_transceiver_ctx.respond_and_send_async(template_ctx_request) + + template_gen_request = make_request( + 200, LlmRequestType.LLMREQUEST_TYPE_GENERATION_ONLY, + template_ctx_request.context_phase_params) + add_sequence(kv_cache_manager_gen, template_gen_request) + kv_cache_transceiver_gen.request_and_receive_async(template_gen_request) + kv_cache_transceiver_ctx.check_context_transfer_status(1) + kv_cache_transceiver_gen.check_gen_transfer_status(1) + + opaque_state = template_ctx_request.context_phase_params.opaque_state + assert opaque_state is not None + kv_cache_manager_ctx.free_resources(template_ctx_request) + kv_cache_manager_gen.free_resources(template_gen_request) + + # Trigger a cancelled-after-ready transfer. 
The signature #1 fix is a + # prerequisite (it is the code path that sends is_ready=false); this PR + # is chained on top of #13640 to ensure that fix is present. + cancel_ctx_request = make_request( + 201, LlmRequestType.LLMREQUEST_TYPE_CONTEXT_ONLY) + add_sequence(kv_cache_manager_ctx, cancel_ctx_request) + kv_cache_transceiver_ctx.respond_and_send_async(cancel_ctx_request) + time.sleep(2) + is_cancelled = kv_cache_transceiver_ctx.cancel_request(cancel_ctx_request) + assert is_cancelled + + cancel_gen_request = make_request( + 201, LlmRequestType.LLMREQUEST_TYPE_GENERATION_ONLY, + cancel_ctx_request.context_phase_params) + add_sequence(kv_cache_manager_gen, cancel_gen_request) + kv_cache_transceiver_gen.request_and_receive_async(cancel_gen_request) + + deadline = time.time() + 10 + while time.time() < deadline and (cancel_gen_request.state + != LlmRequestState.DISAGG_TRANS_ERROR): + kv_cache_transceiver_gen.check_gen_transfer_status(1) + time.sleep(0.1) + assert cancel_gen_request.state == LlmRequestState.DISAGG_TRANS_ERROR + deadline = time.time() + 10 + completed_ids, _ = [], [] + while time.time( + ) < deadline and cancel_ctx_request.py_request_id not in completed_ids: + completed_ids, _ = kv_cache_transceiver_ctx.check_context_transfer_status( + 1) + time.sleep(0.1) + kv_cache_manager_ctx.free_resources(cancel_ctx_request) + kv_cache_manager_gen.free_resources(cancel_gen_request) + + # Issue a follow-up generation request with a real, healthy context + # counterpart and assert it completes within a reasonable timeout. + # Pre-fix the receiver worker thread blocks forever inside + # assignBufferIndexForRecv() because the only recv buffer slot was leaked + # by the previous !isReady early return. Post-fix the slot is freed and + # the follow-up completes normally. 
+ followup_ctx_request = make_request( + 202, LlmRequestType.LLMREQUEST_TYPE_CONTEXT_ONLY) + add_sequence(kv_cache_manager_ctx, followup_ctx_request) + kv_cache_transceiver_ctx.respond_and_send_async(followup_ctx_request) + + followup_gen_request = make_request( + 202, LlmRequestType.LLMREQUEST_TYPE_GENERATION_ONLY, + followup_ctx_request.context_phase_params) + add_sequence(kv_cache_manager_gen, followup_gen_request) + + # Run request_and_receive_async on a worker thread so a pre-fix wedge + # surfaces as the worker thread staying alive past the probe timeout + # rather than as the test process itself hanging. + request_done = threading.Event() + + def issue_followup(): + try: + kv_cache_transceiver_gen.request_and_receive_async( + followup_gen_request) + finally: + request_done.set() + + issuer = threading.Thread(target=issue_followup, daemon=True) + issuer.start() + issuer.join(timeout=10) + wedged_in_assign_buffer = issuer.is_alive() + + deadline = time.time() + 30 + while time.time() < deadline and ( + followup_gen_request.state + != LlmRequestState.DISAGG_GENERATION_TRANS_COMPLETE): + kv_cache_transceiver_ctx.check_context_transfer_status(1) + kv_cache_transceiver_gen.check_gen_transfer_status(1) + time.sleep(0.1) + + assert not wedged_in_assign_buffer, ( + "signature #6 reproduced: assignBufferIndexForRecv() blocked the " + "follow-up generation request because the previous " + "cancelled-after-ready transfer leaked its recv buffer slot") + assert (followup_gen_request.state == + LlmRequestState.DISAGG_GENERATION_TRANS_COMPLETE) + + def create_hybrid_cache_manager(mapping, dtype, mamba_conv_dtype=torch.float16,