From 405e9c161ca850debbe0a48f8330e81a0b5aadca Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Wed, 3 Dec 2025 15:25:25 +0100
Subject: [PATCH 1/9] server: move msg diffs tracking to HTTP thread

---
 tools/server/server-context.cpp | 37 +++++++--------------------------
 tools/server/server-task.cpp    | 19 +++++++++++++++++
 tools/server/server-task.h      | 36 +++++++++++++++++++++++++++++---
 3 files changed, 60 insertions(+), 32 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index c9245745756..583f60513b2 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -101,8 +101,6 @@ struct server_slot {
     std::string generated_text;
     llama_tokens generated_tokens;
 
-    common_chat_msg chat_msg;
-
     std::vector<completion_token_output> generated_token_probs;
 
     bool has_next_token = true;
@@ -153,9 +151,6 @@ struct server_slot {
 
     llama_token sampled;
 
-    common_chat_format chat_format = COMMON_CHAT_FORMAT_CONTENT_ONLY;
-    std::vector<std::string> generated_tool_call_ids;
-
     // stats
     size_t n_sent_text = 0; // number of sent text character
 
@@ -183,13 +178,10 @@ struct server_slot {
         stop          = STOP_TYPE_NONE;
         stopping_word = "";
         n_sent_text   = 0;
-        chat_format   = COMMON_CHAT_FORMAT_CONTENT_ONLY;
 
         generated_tokens.clear();
         generated_token_probs.clear();
-        chat_msg = {};
         json_schema = json();
-        generated_tool_call_ids.clear();
 
         // clear speculative decoding stats
         n_draft_total = 0;
@@ -302,23 +294,6 @@ struct server_slot {
         return timings;
     }
 
-    const common_chat_msg & update_chat_msg(std::vector<common_chat_msg_diff> & diffs) {
-        GGML_ASSERT(task);
-
-        auto previous_msg = chat_msg;
-        SRV_DBG("Parsing chat message: %s\n", generated_text.c_str());
-        auto new_msg = common_chat_parse(
-            generated_text,
-            /* is_partial= */ stop != STOP_TYPE_EOS,
-            task->params.oaicompat_chat_syntax);
-        if (!new_msg.empty()) {
-            new_msg.set_tool_call_ids(generated_tool_call_ids, gen_tool_call_id);
-            chat_msg = new_msg;
-            diffs = common_chat_msg_diff::compute_diffs(previous_msg, new_msg.empty() ? previous_msg : new_msg);
-        }
-        return chat_msg;
-    }
-
     size_t find_stopping_strings(const std::string & text, const size_t last_token_size, bool is_full_stop) {
         GGML_ASSERT(task);
 
@@ -1284,8 +1259,6 @@ struct server_context_impl {
         } else {
             res->content = tkn.text_to_send;
             res->tokens  = { tkn.tok };
-
-            slot.update_chat_msg(res->oaicompat_msg_diffs);
         }
 
         res->n_decoded = slot.n_decoded;
@@ -1338,7 +1311,6 @@ struct server_context_impl {
         res->res_type          = slot.task->params.res_type;
         res->oaicompat_model   = slot.task->params.oaicompat_model;
         res->oaicompat_cmpl_id = slot.task->params.oaicompat_cmpl_id;
-        res->oaicompat_msg     = slot.update_chat_msg(res->oaicompat_msg_diffs);
 
         // populate res.probs_output
         if (slot.task->params.sampling.n_probs > 0) {
@@ -2593,6 +2565,9 @@ static std::unique_ptr handle_completions_impl(
     auto completion_id = gen_chatcmplid();
     auto & rd = res->rd;
 
+    // tracking generation state and partial tool calls
+    std::vector<task_result_state> states;
+
    try {
        std::vector<server_task> tasks;
 
@@ -2630,6 +2605,7 @@ static std::unique_ptr handle_completions_impl(
            task.params.oaicompat_model = ctx_server.model_name;
 
            tasks.push_back(std::move(task));
+           states.emplace_back(task.params.oaicompat_chat_syntax);
        }
 
        rd.post_tasks(std::move(tasks));
@@ -2652,6 +2628,7 @@ static std::unique_ptr handle_completions_impl(
            json arr = json::array();
            for (auto & res : all_results.results) {
                GGML_ASSERT(dynamic_cast<server_task_result_cmpl_final *>(res.get()) != nullptr);
+               res->update(states[res->get_index()]); // update generation state
                arr.push_back(res->to_json());
            }
            // if single request, return single object instead of array
@@ -2673,6 +2650,7 @@ static std::unique_ptr handle_completions_impl(
                dynamic_cast<server_task_result_cmpl_partial *>(first_result.get()) != nullptr
                || dynamic_cast<server_task_result_cmpl_final *>(first_result.get()) != nullptr
            );
+           first_result->update(states[first_result->get_index()]); // update generation state
        }
 
        // next responses are streamed
@@ -2683,7 +2661,7 @@ static std::unique_ptr handle_completions_impl(
        }
        res->status = 200;
        res->content_type = "text/event-stream";
-       res->next = [res_this = res.get(), res_type, &should_stop](std::string & output) -> bool {
+       res->next = [res_this = res.get(), res_type, &should_stop, states = std::move(states)](std::string & output) mutable -> bool {
            if (should_stop()) {
                SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
                return false; // should_stop condition met
@@ -2737,6 +2715,7 @@ static std::unique_ptr handle_completions_impl(
                dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
                || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
            );
+           result->update(states[result->get_index()]); // update generation state
            if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                output = format_anthropic_sse(res_json);
            } else {
diff --git a/tools/server/server-task.cpp b/tools/server/server-task.cpp
index 8a9477d7321..a1791a03f37 100644
--- a/tools/server/server-task.cpp
+++ b/tools/server/server-task.cpp
@@ -700,6 +700,25 @@ json server_task_result_cmpl_final::to_json_oaicompat_chat() {
     return res;
 }
 
+common_chat_msg task_result_state::update_chat_msg(
+        const std::string & text_added,
+        bool is_partial,
+        std::vector<common_chat_msg_diff> & diffs) {
+    generated_text += text_added;
+    auto msg_prv_copy = chat_msg;
+    SRV_DBG("Parsing chat message: %s\n", generated_text.c_str());
+    auto new_msg = common_chat_parse(
+        generated_text,
+        is_partial,
+        oaicompat_chat_syntax);
+    if (!new_msg.empty()) {
+        new_msg.set_tool_call_ids(generated_tool_call_ids, gen_tool_call_id);
+        chat_msg = new_msg;
+        diffs = common_chat_msg_diff::compute_diffs(msg_prv_copy, new_msg.empty() ? msg_prv_copy : new_msg);
+    }
+    return chat_msg;
+}
+
 json server_task_result_cmpl_final::to_json_oaicompat_chat_stream() {
     std::time_t t = std::time(0);
     std::string finish_reason = "length";
diff --git a/tools/server/server-task.h b/tools/server/server-task.h
index a22d7cab116..5d8d9f8281f 100644
--- a/tools/server/server-task.h
+++ b/tools/server/server-task.h
@@ -161,6 +161,25 @@ struct result_prompt_progress {
     json to_json() const;
 };
 
+// struct for tracking the state of a task (e.g., for streaming)
+struct task_result_state {
+    // tracking diffs for partial tool calls
+    std::vector<common_chat_msg_diff> diffs;
+    common_chat_syntax oaicompat_chat_syntax;
+    common_chat_msg chat_msg;
+    std::string generated_text; // append new chunks of generated text here
+    std::vector<std::string> generated_tool_call_ids;
+
+    task_result_state(const common_chat_syntax & oaicompat_chat_syntax)
+        : oaicompat_chat_syntax(oaicompat_chat_syntax) {}
+
+    // parse partial tool calls and update the internal state
+    common_chat_msg update_chat_msg(
+        const std::string & text_added,
+        bool is_partial,
+        std::vector<common_chat_msg_diff> & diffs);
+};
+
 struct server_task_result {
     int id = -1;
     int id_slot = -1;
@@ -175,6 +194,9 @@ struct server_task_result {
     virtual int get_index() {
         return -1;
     }
+    virtual void update(task_result_state &) {
+        // only used by server_task_result_cmpl_*
+    }
     virtual json to_json() = 0;
     virtual ~server_task_result() = default;
 };
@@ -233,9 +255,9 @@ struct server_task_result_cmpl_final : server_task_result {
     task_response_type res_type = TASK_RESPONSE_TYPE_NONE;
     std::string oaicompat_model;
     std::string oaicompat_cmpl_id;
-    common_chat_msg oaicompat_msg;
+    common_chat_msg oaicompat_msg; // to be populated by update()
 
-    std::vector<common_chat_msg_diff> oaicompat_msg_diffs;
+    std::vector<common_chat_msg_diff> oaicompat_msg_diffs; // to be populated by update()
 
     virtual int get_index() override {
         return index;
@@ -247,6 +269,10 @@ struct server_task_result_cmpl_final : server_task_result {
 
     virtual json to_json() override;
 
+    virtual void update(task_result_state & state) override {
+        oaicompat_msg = state.update_chat_msg(content, false, oaicompat_msg_diffs);
+    }
+
     json to_json_non_oaicompat();
     json to_json_oaicompat();
@@ -280,7 +306,7 @@ struct server_task_result_cmpl_partial : server_task_result {
     task_response_type res_type = TASK_RESPONSE_TYPE_NONE;
     std::string oaicompat_model;
     std::string oaicompat_cmpl_id;
-    std::vector<common_chat_msg_diff> oaicompat_msg_diffs;
+    std::vector<common_chat_msg_diff> oaicompat_msg_diffs; // to be populated by update()
 
     virtual int get_index() override {
         return index;
@@ -292,6 +318,10 @@ struct server_task_result_cmpl_partial : server_task_result {
 
     virtual json to_json() override;
 
+    virtual void update(task_result_state & state) override {
+        state.update_chat_msg(content, true, oaicompat_msg_diffs);
+    }
+
     json to_json_non_oaicompat();
     json to_json_oaicompat();
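Note on the design in PATCH 1/9: the parsing state that used to live in server_slot (and therefore on the inference thread) is now a standalone task_result_state owned by the HTTP handler, one entry per task index, and the streaming lambda takes ownership of the whole vector through the `states = std::move(states)` init-capture (hence the added `mutable`). A self-contained analogue of the accumulate-and-reparse pattern, with toy types standing in for task_result_state and common_chat_parse:

    #include <cstdio>
    #include <string>
    #include <vector>

    // toy analogue of task_result_state: accumulate text chunks, re-parse the
    // full buffer each time, and report what changed (the "diff")
    struct toy_result_state {
        std::string generated_text; // grows as partial results arrive

        // returns the newly added portion, mimicking update_chat_msg(), which
        // re-parses generated_text and diffs structured fields, not raw text
        std::string update(const std::string & text_added) {
            size_t prev_size = generated_text.size();
            generated_text += text_added;
            return generated_text.substr(prev_size);
        }
    };

    int main() {
        // one state per task index, like the 'states' vector in the handler
        std::vector<toy_result_state> states(1);

        for (const char * chunk : { "{\"name\": \"get_w", "eather\", \"argum", "ents\": {}}" }) {
            std::string diff = states[0].update(chunk);
            printf("diff: %s\n", diff.c_str());
        }
        return 0;
    }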
From 60f12c2af5ebc90d5c99f868cbdf418a688fe22c Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Wed, 3 Dec 2025 15:51:56 +0100
Subject: [PATCH 2/9] wip

---
 tools/server/server-context.cpp | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 583f60513b2..0ac43fe312e 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -2654,10 +2654,12 @@ static std::unique_ptr handle_completions_impl(
        }
 
        // next responses are streamed
+       // to be sent immediately
+       json first_result_json = first_result->to_json();
        if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
-           res->data = format_anthropic_sse(first_result->to_json());
+           res->data = format_anthropic_sse(first_result_json);
        } else {
-           res->data = format_oai_sse(first_result->to_json()); // to be sent immediately
+           res->data = format_oai_sse(first_result_json);
        }
        res->status = 200;
        res->content_type = "text/event-stream";
@@ -2698,8 +2700,8 @@ static std::unique_ptr handle_completions_impl(
            }
 
            // send the results
-           json res_json = result->to_json();
            if (result->is_error()) {
+               json res_json = result->to_json();
                if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                    output = format_anthropic_sse({
                        {"event", "error"},
@@ -2716,6 +2718,7 @@ static std::unique_ptr handle_completions_impl(
                || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
            );
            result->update(states[result->get_index()]); // update generation state
+           json res_json = result->to_json();
            if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                output = format_anthropic_sse(res_json);
            } else {
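The second hunk of PATCH 2/9 is more than cleanup: to_json() reads oaicompat_msg_diffs, which only update() fills in, so serialization has to happen after the state update. The required ordering, reduced to a runnable sketch with toy types:

    #include <cassert>
    #include <string>

    struct result {
        std::string diffs;                       // stands in for oaicompat_msg_diffs
        void update()         { diffs = "delta"; }
        std::string to_json() { return diffs; }  // serializes whatever update() produced
    };

    int main() {
        result r;
        // wrong order: std::string j = r.to_json(); r.update();  -> j is empty
        r.update();                  // update first ...
        std::string j = r.to_json(); // ... then serialize, so the chunk carries the delta
        assert(j == "delta");
        return 0;
    }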
From d9faf85c4f00aac1441f86f0cbf0964126e504c5 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Wed, 3 Dec 2025 16:16:06 +0100
Subject: [PATCH 3/9] tool call tests ok

---
 tools/server/server-context.cpp | 134 ++++++++++++++++++--------------
 tools/server/server-task.cpp    |   6 +-
 2 files changed, 79 insertions(+), 61 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 0ac43fe312e..442c59aeb87 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -1290,8 +1290,13 @@ struct server_context_impl {
        res->id_slot = slot.id;
        res->index   = slot.task->index;
 
-       res->content = slot.generated_text;
-       res->tokens  = std::move(slot.generated_tokens);
+       // in stream mode, content and tokens are already in last partial chunk
+       res->content = slot.task->params.stream ? "" : slot.generated_text;
+       if (slot.task->params.stream) {
+           res->tokens = llama_tokens{};
+       } else {
+           res->tokens = std::move(slot.generated_tokens);
+       }
        res->timings = slot.get_timings();
        res->prompt  = slot.task->tokens.detokenize(ctx, true);
        res->response_fields = std::move(slot.task->params.response_fields);
@@ -2603,9 +2608,9 @@ static std::unique_ptr handle_completions_impl(
            task.params.res_type          = res_type;
            task.params.oaicompat_cmpl_id = completion_id;
            task.params.oaicompat_model   = ctx_server.model_name;
+           states.emplace_back(task.params.oaicompat_chat_syntax);
 
            tasks.push_back(std::move(task));
-           states.emplace_back(task.params.oaicompat_chat_syntax);
        }
 
        rd.post_tasks(std::move(tasks));
@@ -2664,70 +2669,83 @@ static std::unique_ptr handle_completions_impl(
        res->status = 200;
        res->content_type = "text/event-stream";
        res->next = [res_this = res.get(), res_type, &should_stop, states = std::move(states)](std::string & output) mutable -> bool {
-           if (should_stop()) {
-               SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
-               return false; // should_stop condition met
-           }
-
-           if (!res_this->data.empty()) {
-               // flush the first chunk
-               output = std::move(res_this->data);
-               res_this->data.clear();
-               return true;
-           }
-
-           server_response_reader & rd = res_this->rd;
-
-           // check if there is more data
-           if (!rd.has_next()) {
-               if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
-                   // Anthropic doesn't send [DONE], message_stop was already sent
-                   output = "";
-               } else if (res_type != TASK_RESPONSE_TYPE_NONE) {
-                   output = "data: [DONE]\n\n";
-               } else {
-                   output = "";
-               }
-               SRV_DBG("%s", "all results received, terminating stream\n");
-               return false; // no more data, terminate
-           }
-
-           // receive subsequent results
-           auto result = rd.next(should_stop);
-           if (result == nullptr) {
-               SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
-               return false; // should_stop condition met
-           }
-
-           // send the results
-           if (result->is_error()) {
-               json res_json = result->to_json();
-               if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
-                   output = format_anthropic_sse({
-                       {"event", "error"},
-                       {"data", res_json},
-                   });
-               } else {
-                   output = format_oai_sse(json {{ "error", res_json }});
-               }
-               SRV_DBG("%s", "error received during streaming, terminating stream\n");
-               return false; // terminate on error
-           } else {
-               GGML_ASSERT(
-                   dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
-                   || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
-               );
-               result->update(states[result->get_index()]); // update generation state
-               json res_json = result->to_json();
-               if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
-                   output = format_anthropic_sse(res_json);
-               } else {
-                   output = format_oai_sse(res_json);
-               }
-           }
-
-           // has next data, continue
-           return true;
+           static auto format_error = [](task_response_type res_type, const json & res_json) {
+               if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
+                   return format_anthropic_sse({
+                       {"event", "error"},
+                       {"data", res_json},
+                   });
+               } else {
+                   return format_oai_sse(json {{ "error", res_json }});
+               }
+           };
+
+           try {
+               if (should_stop()) {
+                   SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
+                   return false; // should_stop condition met
+               }
+
+               if (!res_this->data.empty()) {
+                   // flush the first chunk
+                   output = std::move(res_this->data);
+                   res_this->data.clear();
+                   return true;
+               }
+
+               server_response_reader & rd = res_this->rd;
+
+               // check if there is more data
+               if (!rd.has_next()) {
+                   if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
+                       // Anthropic doesn't send [DONE], message_stop was already sent
+                       output = "";
+                   } else if (res_type != TASK_RESPONSE_TYPE_NONE) {
+                       output = "data: [DONE]\n\n";
+                   } else {
+                       output = "";
+                   }
+                   SRV_DBG("%s", "all results received, terminating stream\n");
+                   return false; // no more data, terminate
+               }
+
+               // receive subsequent results
+               auto result = rd.next(should_stop);
+               if (result == nullptr) {
+                   SRV_DBG("%s", "stopping streaming due to should_stop condition\n");
+                   return false; // should_stop condition met
+               }
+
+               // send the results
+               if (result->is_error()) {
+                   json res_json = result->to_json();
+                   output = format_error(res_type, res_json);
+                   SRV_DBG("%s", "error received during streaming, terminating stream\n");
+                   return false; // terminate on error
+               } else {
+                   GGML_ASSERT(
+                       dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
+                       || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
+                   );
+                   result->update(states[result->get_index()]); // update generation state
+                   json res_json = result->to_json();
+                   if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
+                       output = format_anthropic_sse(res_json);
+                   } else {
+                       output = format_oai_sse(res_json);
+                   }
+               }
+
+               // has next data, continue
+               return true;
+
+           } catch (const std::exception & e) {
+               json error_json = format_error_response(e.what(), ERROR_TYPE_SERVER);
+               output = format_error(res_type, error_json);
+
+               // terminate on exception
+               return false;
+           }
        };
    }
diff --git a/tools/server/server-task.cpp b/tools/server/server-task.cpp
index a1791a03f37..980e4904943 100644
--- a/tools/server/server-task.cpp
+++ b/tools/server/server-task.cpp
@@ -582,8 +582,8 @@ json server_task_result_cmpl_final::to_json() {
 json server_task_result_cmpl_final::to_json_non_oaicompat() {
     json res = json {
         {"index",   index},
-        {"content", stream ? "" : content}, // in stream mode, content is already in last partial chunk
-        {"tokens",  stream ? llama_tokens {} : tokens},
+        {"content", content},
+        {"tokens",  tokens},
         {"id_slot", id_slot},
         {"stop",    true},
         {"model",   oaicompat_model},
@@ -619,7 +619,7 @@ json server_task_result_cmpl_final::to_json_oaicompat() {
     json res = json {
         {"choices", json::array({
             json{
-                {"text", stream ? "" : content}, // in stream mode, content is already in last partial chunk
+                {"text", content},
                 {"index", index},
                 {"logprobs", logprobs},
                 {"finish_reason", finish_reason},
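PATCH 3/9 also wraps the body of the SSE generator in try/catch, so an exception thrown while parsing or serializing one result is turned into a protocol-appropriate error frame rather than an abruptly dropped connection. A rough sketch of the two error envelopes, assuming format_oai_sse and format_anthropic_sse emit conventional `data:`/`event:` lines (their exact output is not shown in this series, so these formatters are hypothetical stand-ins):

    #include <cstdio>
    #include <string>

    // hypothetical stand-ins for format_oai_sse / format_anthropic_sse
    static std::string oai_sse(const std::string & payload) {
        return "data: " + payload + "\n\n";
    }

    static std::string anthropic_sse(const std::string & event, const std::string & payload) {
        return "event: " + event + "\ndata: " + payload + "\n\n";
    }

    int main() {
        std::string err = "{\"message\":\"boom\"}";
        // OAI-style streams wrap errors under an "error" key in a data line;
        // Anthropic-style streams carry a dedicated "error" event
        printf("%s", oai_sse("{\"error\":" + err + "}").c_str());
        printf("%s", anthropic_sse("error", err).c_str());
        return 0;
    }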
From f63e76011d344dd42489f50c50c1cd04ba8be9a1 Mon Sep 17 00:00:00 2001
From: Georgi Gerganov
Date: Thu, 4 Dec 2025 12:42:36 +0200
Subject: [PATCH 4/9] minor : style

---
 tools/server/server-context.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 442c59aeb87..85471199d85 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -1291,10 +1291,11 @@ struct server_context_impl {
        res->index = slot.task->index;
 
        // in stream mode, content and tokens are already in last partial chunk
-       res->content = slot.task->params.stream ? "" : slot.generated_text;
        if (slot.task->params.stream) {
+           res->content = "";
            res->tokens  = llama_tokens{};
        } else {
+           res->content = std::move(slot.generated_text);
            res->tokens  = std::move(slot.generated_tokens);
        }
        res->timings = slot.get_timings();
@@ -2591,6 +2592,7 @@ static std::unique_ptr handle_completions_impl(
            inputs = tokenize_input_prompts(ctx_server.vocab, ctx_server.mctx, prompt, true, true);
        }
        tasks.reserve(inputs.size());
+       states.reserve(inputs.size());
 
        for (size_t i = 0; i < inputs.size(); i++) {
            server_task task = server_task(type);
@@ -2608,9 +2610,9 @@ static std::unique_ptr handle_completions_impl(
            task.params.res_type          = res_type;
            task.params.oaicompat_cmpl_id = completion_id;
            task.params.oaicompat_model   = ctx_server.model_name;
-           states.emplace_back(task.params.oaicompat_chat_syntax);
 
            tasks.push_back(std::move(task));
+           states.push_back(task.params.oaicompat_chat_syntax);
        }
 
        rd.post_tasks(std::move(tasks));
@@ -2639,7 +2641,6 @@ static std::unique_ptr handle_completions_impl(
            }
            // if single request, return single object instead of array
            res->ok(arr.size() == 1 ? arr[0] : arr);
        }
-
    } else {
        // in streaming mode, the first error must be treated as non-stream response
        // this is to match the OAI API behavior
From 537ffa94ddf0c38987149141767f8aa2b16a911a Mon Sep 17 00:00:00 2001
From: Georgi Gerganov
Date: Thu, 4 Dec 2025 13:10:39 +0200
Subject: [PATCH 5/9] cont : fix

---
 tools/server/server-context.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 85471199d85..1ab6592c67f 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -2610,9 +2610,9 @@ static std::unique_ptr handle_completions_impl(
            task.params.res_type          = res_type;
            task.params.oaicompat_cmpl_id = completion_id;
            task.params.oaicompat_model   = ctx_server.model_name;
+           states.push_back(task.params.oaicompat_chat_syntax);
 
            tasks.push_back(std::move(task));
-           states.push_back(task.params.oaicompat_chat_syntax);
        }
 
        rd.post_tasks(std::move(tasks));
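Patches 3/9 through 5/9 all circle the same hazard: task.params must be read before tasks.push_back(std::move(task)), because a moved-from server_task leaves its members in a valid but unspecified state. A minimal reproduction of the bug class, with a plain std::string standing in for params.oaicompat_chat_syntax:

    #include <cassert>
    #include <string>
    #include <utility>
    #include <vector>

    struct task {
        std::string syntax; // stands in for params.oaicompat_chat_syntax
    };

    int main() {
        std::vector<task> tasks;
        std::vector<std::string> states;

        task t;
        t.syntax = "chat-syntax";

        states.push_back(t.syntax);     // OK: read before the move
        tasks.push_back(std::move(t));
        // states.push_back(t.syntax);  // bug: t.syntax is moved-from here,
        //                              // typically an empty string

        assert(states[0] == "chat-syntax");
        return 0;
    }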
From 0b1704a8af52a3a92fa7195f4976030e9c24f039 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Thu, 4 Dec 2025 13:34:17 +0100
Subject: [PATCH 6/9] move states to server_response_reader

---
 tools/server/server-context.cpp | 12 +++++-------
 tools/server/server-queue.cpp   | 10 ++++++++++
 tools/server/server-queue.h     |  9 +++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 1ab6592c67f..b2116316dc2 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -2571,12 +2571,12 @@ static std::unique_ptr handle_completions_impl(
     auto completion_id = gen_chatcmplid();
     auto & rd = res->rd;
 
-    // tracking generation state and partial tool calls
-    std::vector<task_result_state> states;
-
    try {
        std::vector<server_task> tasks;
 
+       // tracking generation state and partial tool calls
+       std::vector<task_result_state> states;
+
        const auto & prompt = data.at("prompt");
        // TODO: this log can become very long, put it behind a flag or think about a more compact format
        //SRV_DBG("Prompt: %s\n", prompt.is_string() ? prompt.get<std::string>().c_str() : prompt.dump(2).c_str());
@@ -2615,6 +2615,7 @@ static std::unique_ptr handle_completions_impl(
            tasks.push_back(std::move(task));
        }
 
+       rd.set_states(std::move(states));
        rd.post_tasks(std::move(tasks));
    } catch (const std::exception & e) {
        res->error(format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST));
@@ -2635,7 +2636,6 @@ static std::unique_ptr handle_completions_impl(
            json arr = json::array();
            for (auto & res : all_results.results) {
                GGML_ASSERT(dynamic_cast<server_task_result_cmpl_final *>(res.get()) != nullptr);
-               res->update(states[res->get_index()]); // update generation state
                arr.push_back(res->to_json());
            }
            // if single request, return single object instead of array
@@ -2656,7 +2656,6 @@ static std::unique_ptr handle_completions_impl(
                dynamic_cast<server_task_result_cmpl_partial *>(first_result.get()) != nullptr
                || dynamic_cast<server_task_result_cmpl_final *>(first_result.get()) != nullptr
            );
-           first_result->update(states[first_result->get_index()]); // update generation state
        }
 
        // next responses are streamed
@@ -2669,7 +2668,7 @@ static std::unique_ptr handle_completions_impl(
        }
        res->status = 200;
        res->content_type = "text/event-stream";
-       res->next = [res_this = res.get(), res_type, &should_stop, states = std::move(states)](std::string & output) mutable -> bool {
+       res->next = [res_this = res.get(), res_type, &should_stop](std::string & output) mutable -> bool {
            static auto format_error = [](task_response_type res_type, const json & res_json) {
                if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                    return format_anthropic_sse({
@@ -2728,7 +2727,6 @@ static std::unique_ptr handle_completions_impl(
                    dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
                    || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
                );
-               result->update(states[result->get_index()]); // update generation state
                json res_json = result->to_json();
                if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                    output = format_anthropic_sse(res_json);
diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp
index 38a4858522e..a63ba90c8b0 100644
--- a/tools/server/server-queue.cpp
+++ b/tools/server/server-queue.cpp
@@ -271,6 +271,10 @@ void server_response::terminate() {
 // server_response_reader
 //
 
+void server_response_reader::set_states(std::vector<task_result_state> && states) {
+    this->states = std::move(states);
+}
+
 void server_response_reader::post_tasks(std::vector<server_task> && tasks) {
     id_tasks = server_task::get_list_id(tasks);
     queue_results.add_waiting_tasks(tasks);
@@ -298,6 +302,12 @@ server_task_result_ptr server_response_reader::next(const std::function
         SRV_DBG("%s", "received error result, stopping further processing\n");
         return result;
     }
+    if (!states.empty()) {
+        // update the generation state if needed
+        auto idx = result->get_index();
+        GGML_ASSERT(idx >= 0 && idx < states.size());
+        result->update(states[idx]);
+    }
     if (result->is_stop()) {
         received_count++;
     }
diff --git a/tools/server/server-queue.h b/tools/server/server-queue.h
index 209d2017c7e..a5c3179d8ca 100644
--- a/tools/server/server-queue.h
+++ b/tools/server/server-queue.h
@@ -7,6 +7,8 @@
 #include
 #include
 
+// struct for managing server tasks
+// in most cases, use server_response_reader to post new tasks and retrieve results
 struct server_queue {
 private:
     int id = 0;
@@ -67,6 +69,8 @@ struct server_queue {
     void cleanup_pending_task(int id_target);
 };
 
+// struct for managing server responses
+// in most cases, use server_response_reader to retrieve results
 struct server_response {
 private:
     bool running = true;
@@ -120,6 +124,10 @@ struct server_response_reader {
     bool cancelled = false;
     int polling_interval_seconds;
 
+    // tracking generation state and partial tool calls
+    // only used by streaming completions
+    std::vector<task_result_state> states;
+
     // should_stop function will be called each polling_interval_seconds
     server_response_reader(std::pair<server_queue &, server_response &> server_queues, int polling_interval_seconds) :
         queue_tasks(server_queues.first), queue_results(server_queues.second), polling_interval_seconds(polling_interval_seconds) {}
@@ -127,6 +135,7 @@ struct server_response_reader {
         stop();
     }
 
+    void set_states(std::vector<task_result_state> && states);
    void post_tasks(std::vector<server_task> && tasks);
 
    bool has_next() const;
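Moving states into server_response_reader centralizes the bookkeeping: every result passing through next() is updated exactly once, whether the handler streams or not, and the HTTP lambda no longer needs to own mutable state (which PATCH 8/9 then reflects by dropping `mutable`). A compact sketch of the pattern, with simplified stand-in types:

    #include <memory>
    #include <vector>

    struct state { int n_seen = 0; };

    struct result {
        int index = 0;
        void update(state & s) { s.n_seen++; } // mirrors server_task_result::update()
    };

    struct reader {
        std::vector<state> states;

        void set_states(std::vector<state> && s) { states = std::move(s); }

        // every result is patched with its per-index state before the caller
        // sees it, like server_response_reader::next() after this patch
        std::unique_ptr<result> next() {
            auto r = std::make_unique<result>();
            if (!states.empty()) {
                r->update(states[r->index]);
            }
            return r;
        }
    };

    int main() {
        reader rd;
        rd.set_states(std::vector<state>(1));
        auto r = rd.next();
        return r == nullptr;
    }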
From a7efdeedee146578db546abd76e30c19210fbb6f Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Thu, 4 Dec 2025 13:39:11 +0100
Subject: [PATCH 7/9] add safe-guard

---
 tools/server/server-task.cpp | 2 ++
 tools/server/server-task.h   | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/tools/server/server-task.cpp b/tools/server/server-task.cpp
index 980e4904943..df066264778 100644
--- a/tools/server/server-task.cpp
+++ b/tools/server/server-task.cpp
@@ -565,6 +565,7 @@ std::vector completion_token_output::str_to_bytes(const std::stri
 // server_task_result_cmpl_final
 //
 json server_task_result_cmpl_final::to_json() {
+    GGML_ASSERT(is_updated && "update() must be called before to_json()");
     switch (res_type) {
         case TASK_RESPONSE_TYPE_NONE:
             return to_json_non_oaicompat();
@@ -975,6 +976,7 @@ json server_task_result_cmpl_final::to_json_anthropic_stream() {
 // server_task_result_cmpl_partial
 //
 json server_task_result_cmpl_partial::to_json() {
+    GGML_ASSERT(is_updated && "update() must be called before to_json()");
     switch (res_type) {
         case TASK_RESPONSE_TYPE_NONE:
             return to_json_non_oaicompat();
diff --git a/tools/server/server-task.h b/tools/server/server-task.h
index 5d8d9f8281f..8e7b9e3e310 100644
--- a/tools/server/server-task.h
+++ b/tools/server/server-task.h
@@ -258,6 +258,7 @@ struct server_task_result_cmpl_final : server_task_result {
     common_chat_msg oaicompat_msg; // to be populated by update()
 
     std::vector<common_chat_msg_diff> oaicompat_msg_diffs; // to be populated by update()
+    bool is_updated = false;
 
     virtual int get_index() override {
         return index;
@@ -270,6 +271,7 @@ struct server_task_result_cmpl_final : server_task_result {
     virtual json to_json() override;
 
     virtual void update(task_result_state & state) override {
+        is_updated = true;
         oaicompat_msg = state.update_chat_msg(content, false, oaicompat_msg_diffs);
     }
 
@@ -307,6 +309,7 @@ struct server_task_result_cmpl_partial : server_task_result {
     std::string oaicompat_model;
     std::string oaicompat_cmpl_id;
     std::vector<common_chat_msg_diff> oaicompat_msg_diffs; // to be populated by update()
+    bool is_updated = false;
 
     virtual int get_index() override {
         return index;
@@ -319,6 +322,7 @@ struct server_task_result_cmpl_partial : server_task_result {
     virtual json to_json() override;
 
     virtual void update(task_result_state & state) override {
+        is_updated = true;
         state.update_chat_msg(content, true, oaicompat_msg_diffs);
     }
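The safeguard makes the two-phase protocol (update, then serialize) checkable at runtime: any path reaching to_json() without a prior update() now fails loudly in debug builds instead of silently emitting a response with an empty message and no diffs. The idiom in isolation, with assert() standing in for GGML_ASSERT:

    #include <cassert>

    struct two_phase {
        bool is_updated = false;

        void update()  { is_updated = true; }
        void to_json() { assert(is_updated && "update() must be called before to_json()"); }
    };

    int main() {
        two_phase r;
        r.update();  // omitting this line trips the assert, as GGML_ASSERT does in the patch
        r.to_json();
        return 0;
    }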
From ad7220709a96e408a35f360a4e934d0bd1a2d46d Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Thu, 4 Dec 2025 13:43:43 +0100
Subject: [PATCH 8/9] fix

---
 tools/server/server-context.cpp | 2 +-
 tools/server/server-queue.cpp   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index b2116316dc2..f3f2edc0cc4 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -2668,7 +2668,7 @@ static std::unique_ptr handle_completions_impl(
        }
        res->status = 200;
        res->content_type = "text/event-stream";
-       res->next = [res_this = res.get(), res_type, &should_stop](std::string & output) mutable -> bool {
+       res->next = [res_this = res.get(), res_type, &should_stop](std::string & output) -> bool {
            static auto format_error = [](task_response_type res_type, const json & res_json) {
                if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
                    return format_anthropic_sse({
diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp
index a63ba90c8b0..5091047342b 100644
--- a/tools/server/server-queue.cpp
+++ b/tools/server/server-queue.cpp
@@ -304,7 +304,7 @@ server_task_result_ptr server_response_reader::next(const std::function
     }
     if (!states.empty()) {
         // update the generation state if needed
-        auto idx = result->get_index();
+        size_t idx = result->get_index();
         GGML_ASSERT(idx >= 0 && idx < states.size());
         result->update(states[idx]);
     }
From 1c30c28d6ff5f2194378e94ff9d77e5eb6b602b0 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Thu, 4 Dec 2025 13:54:30 +0100
Subject: [PATCH 9/9] fix 2

---
 tools/server/server-queue.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp
index 5091047342b..10196128db1 100644
--- a/tools/server/server-queue.cpp
+++ b/tools/server/server-queue.cpp
@@ -305,7 +305,7 @@ server_task_result_ptr server_response_reader::next(const std::function
     if (!states.empty()) {
         // update the generation state if needed
         size_t idx = result->get_index();
-        GGML_ASSERT(idx >= 0 && idx < states.size());
+        GGML_ASSERT(idx < states.size());
         result->update(states[idx]);
     }
     if (result->is_stop()) {
         received_count++;
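Patches 8/9 and 9/9 are a classic unsigned-comparison cleanup: get_index() returns an int, so `idx < states.size()` mixes signed and unsigned operands (-Wsign-compare); once idx itself is size_t, a negative index wraps to a huge value, so the single check `idx < states.size()` already rejects it and `idx >= 0` is vacuously true for an unsigned type. Demonstrated in isolation:

    #include <cstddef>
    #include <cstdio>

    int get_index() { return -1; } // hypothetical bad index

    int main() {
        size_t size = 4;
        size_t idx = get_index(); // -1 converts to SIZE_MAX (wraps modulo 2^N)

        // idx >= 0 would be vacuously true for an unsigned type,
        // but idx < size still rejects the wrapped value:
        printf("in range: %s\n", idx < size ? "yes" : "no"); // prints "no"
        return 0;
    }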