From 47ea8b0ec849e0a54f0de021621749de805d010f Mon Sep 17 00:00:00 2001 From: wydrox <79707825+wydrox@users.noreply.github.com> Date: Thu, 26 Mar 2026 01:12:48 +0100 Subject: [PATCH] =?UTF-8?q?refactor:=20simplify=20app=20=E2=80=94=20dedupl?= =?UTF-8?q?icate=20think-tag=20streaming=20and=20model=20resolution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract shared think-tag stream processor, deduplicate _resolve_model_path across engines into models.py, remove setproctitle dependency. Reduces ~200 lines of duplicated code. 156 tests pass. Addresses 8LI-314. Co-Authored-By: Claude Opus 4.6 (1M context) --- ppmlx/engine.py | 14 +- ppmlx/engine_embed.py | 9 +- ppmlx/engine_vlm.py | 9 +- ppmlx/models.py | 9 + ppmlx/server.py | 401 +++++++++++++++--------------------------- pyproject.toml | 2 - tests/conftest.py | 5 +- 7 files changed, 165 insertions(+), 284 deletions(-) diff --git a/ppmlx/engine.py b/ppmlx/engine.py index b43fc2b..beed82b 100644 --- a/ppmlx/engine.py +++ b/ppmlx/engine.py @@ -61,18 +61,12 @@ def _strip_thinking(text: str) -> tuple[str, str | None]: def _resolve_model_path(repo_id: str) -> str: - """ - Resolve a repo_id to a local path if available, otherwise return the repo_id - for direct HuggingFace loading. - """ + """Resolve a repo_id to a local path if available, otherwise return the repo_id.""" try: - from ppmlx.models import get_model_path - local = get_model_path(repo_id) - if local: - return str(local) + from ppmlx.models import resolve_model_path + return resolve_model_path(repo_id) except ImportError: - pass - return repo_id + return repo_id class TextEngine: diff --git a/ppmlx/engine_embed.py b/ppmlx/engine_embed.py index 3607825..859a90d 100644 --- a/ppmlx/engine_embed.py +++ b/ppmlx/engine_embed.py @@ -6,13 +6,10 @@ def _resolve_model_path(repo_id: str) -> str: try: - from ppmlx.models import get_model_path - local = get_model_path(repo_id) - if local: - return str(local) + from ppmlx.models import resolve_model_path + return resolve_model_path(repo_id) except ImportError: - pass - return repo_id + return repo_id def _l2_normalize(vec: list[float]) -> list[float]: diff --git a/ppmlx/engine_vlm.py b/ppmlx/engine_vlm.py index f9687f3..e9f4988 100644 --- a/ppmlx/engine_vlm.py +++ b/ppmlx/engine_vlm.py @@ -9,13 +9,10 @@ def _resolve_model_path(repo_id: str) -> str: """Resolve alias to local path if available.""" try: - from ppmlx.models import get_model_path - local = get_model_path(repo_id) - if local: - return str(local) + from ppmlx.models import resolve_model_path + return resolve_model_path(repo_id) except ImportError: - pass - return repo_id + return repo_id class VisionEngine: diff --git a/ppmlx/models.py b/ppmlx/models.py index 64003ed..b14ab99 100644 --- a/ppmlx/models.py +++ b/ppmlx/models.py @@ -395,6 +395,15 @@ def _bg_download() -> None: return local_path +def resolve_model_path(repo_id: str) -> str: + """Resolve a repo_id to a local path if available, otherwise return the + repo_id for direct HuggingFace loading.""" + local = get_model_path(repo_id) + if local: + return str(local) + return repo_id + + def get_model_path(alias_or_repo: str) -> Path | None: """Return local path if model exists, else None.""" try: diff --git a/ppmlx/server.py b/ppmlx/server.py index e03a879..987738a 100644 --- a/ppmlx/server.py +++ b/ppmlx/server.py @@ -961,6 +961,95 @@ def _producer(): yield item +# ── Shared think-tag stream parser ────────────────────────────────── + +async def _parse_think_tags(raw_stream): + """Parse ``...`` tags from a raw token stream. + + Yields tuples describing what was parsed: + + * ``("thinking", chunk)`` — a chunk of reasoning text + * ``("thinking_done", full_reasoning)`` — thinking block closed + * ``("text", chunk)`` — a chunk of answer text + * ``("flush_text", full_text)`` — final flush of any buffered text + + Assumes the model may start *inside* a thinking block (Qwen3 injects + ```` into the generation prompt). + """ + in_thinking = True # assume starting inside think block + buf = "" + reasoning_text = "" + full_text = "" + + async for chunk in raw_stream: + buf += chunk + while buf: + if not in_thinking: + think_pos = buf.find("") + close_pos = buf.find("") + if think_pos == 0: + in_thinking = True + buf = buf[len(""):] + continue + elif close_pos == 0: + # Closing tag without matching open (template-injected) + buf = buf[len(""):] + if reasoning_text: + yield ("thinking_done", reasoning_text) + continue + else: + # Check for partial tag at end of buffer + partial = any( + buf.endswith(t[:i]) + for t in ("", "") + for i in range(1, len(t)) + ) + if partial: + break + text_chunk = buf[:think_pos] if think_pos > 0 else buf + buf = buf[len(text_chunk):] + if text_chunk: + full_text += text_chunk + yield ("text", text_chunk) + if not buf: + break + else: + close_pos = buf.find("") + if close_pos >= 0: + think_chunk = buf[:close_pos] + buf = buf[close_pos + len(""):] + in_thinking = False + if think_chunk: + reasoning_text += think_chunk + yield ("thinking", think_chunk) + yield ("thinking_done", reasoning_text) + continue + else: + partial_len = 0 + for i in range(1, len("")): + if buf.endswith(""[:i]): + partial_len = i + break + safe = buf[:len(buf) - partial_len] if partial_len else buf + buf = buf[len(safe):] if partial_len else "" + if safe: + reasoning_text += safe + yield ("thinking", safe) + break + + # Flush remaining buffer + if buf: + if in_thinking: + reasoning_text += buf + yield ("thinking", buf) + yield ("thinking_done", reasoning_text) + else: + full_text += buf + yield ("text", buf) + + yield ("flush_text", full_text) + + def _stream_responses( resp_id, msg_id, created, model_name, repo_id, messages, engine_type, temperature, top_p, max_tokens, request, @@ -1024,12 +1113,7 @@ async def event_generator(): strip_thinking=False, # Handle thinking in server ) - # Qwen3 template injects into the prompt, so model - # output starts inside a thinking block. Start in thinking - # mode and emit a reasoning output item immediately. - in_thinking = True - msg_item_emitted = False - buf = "" + # Start with reasoning item (Qwen3 starts inside think block) reasoning_idx = 0 msg_output_idx = 1 rs_id = "rs_" + uuid.uuid4().hex[:12] @@ -1038,100 +1122,27 @@ async def event_generator(): "item": {"id": rs_id, "type": "reasoning", "summary": []}, }, seq) - async for chunk in _async_iter_sync_gen(gen): - buf += chunk - while buf: - if not in_thinking: - think_pos = buf.find("") - close_pos = buf.find("") - if think_pos == 0: - in_thinking = True - buf = buf[len(""):] - if reasoning_idx is None: - reasoning_idx = 0 - msg_output_idx = 1 - rs_id = "rs_" + uuid.uuid4().hex[:12] - yield _sse("response.output_item.added", { - "output_index": reasoning_idx, - "item": {"id": rs_id, "type": "reasoning", "summary": []}, - }, seq) - continue - elif close_pos == 0: - buf = buf[len(""):] - if reasoning_idx is not None: - yield _sse("response.output_item.done", { - "output_index": reasoning_idx, - "item": {"id": rs_id, "type": "reasoning", - "summary": [{"type": "summary_text", "text": reasoning_text}]}, - }, seq) - continue - else: - # Check for partial tag - partial = any(buf.endswith(t[:i]) - for t in ("", "") - for i in range(1, len(t))) - if partial: - break - # Plain text — buffer it (don't stream yet). - # We'll emit clean text after parsing tool calls. - text_chunk = buf[:think_pos] if think_pos > 0 else buf - buf = buf[len(text_chunk):] - if text_chunk: - full_text += text_chunk - if not buf: - break - else: - close_pos = buf.find("") - if close_pos >= 0: - think_chunk = buf[:close_pos] - buf = buf[close_pos + len(""):] - in_thinking = False - if think_chunk: - reasoning_text += think_chunk - yield _sse("response.reasoning_summary_text.delta", { - "output_index": reasoning_idx, - "delta": think_chunk, - }, seq) - yield _sse("response.reasoning_summary_text.done", { - "output_index": reasoning_idx, - "text": reasoning_text, - }, seq) - yield _sse("response.output_item.done", { - "output_index": reasoning_idx, - "item": {"id": rs_id, "type": "reasoning", - "summary": [{"type": "summary_text", "text": reasoning_text}]}, - }, seq) - continue - else: - # Partial check - partial_len = 0 - for i in range(1, len("")): - if buf.endswith(""[:i]): - partial_len = i - break - safe = buf[:len(buf) - partial_len] if partial_len else buf - buf = buf[len(safe):] if partial_len else "" - if safe: - reasoning_text += safe - if reasoning_idx is None: - # Template-injected thinking - reasoning_idx = 0 - msg_output_idx = 1 - rs_id = "rs_" + uuid.uuid4().hex[:12] - in_thinking = True - yield _sse("response.output_item.added", { - "output_index": reasoning_idx, - "item": {"id": rs_id, "type": "reasoning", "summary": []}, - }, seq) - yield _sse("response.reasoning_summary_text.delta", { - "output_index": reasoning_idx, - "delta": safe, - }, seq) - break - - # Flush remaining buf as text (buffered, not streamed) - if buf and not in_thinking: - full_text += buf + raw_stream = _async_iter_sync_gen(gen) + async for kind, data in _parse_think_tags(raw_stream): + if kind == "thinking": + yield _sse("response.reasoning_summary_text.delta", { + "output_index": reasoning_idx, + "delta": data, + }, seq) + elif kind == "thinking_done": + reasoning_text = data + yield _sse("response.reasoning_summary_text.done", { + "output_index": reasoning_idx, + "text": reasoning_text, + }, seq) + yield _sse("response.output_item.done", { + "output_index": reasoning_idx, + "item": {"id": rs_id, "type": "reasoning", + "summary": [{"type": "summary_text", "text": reasoning_text}]}, + }, seq) + elif kind == "flush_text": + full_text = data + elif engine_type == "vision": from ppmlx.engine_vlm import get_vision_engine engine = get_vision_engine() @@ -1470,16 +1481,8 @@ async def event_generator(): full_text = "" content_idx = 0 - - # State machine: track whether we're inside or in text. - # We use strip_thinking=False so we get raw tokens including - # ..., then emit them as thinking_delta / text_delta. - # Start in thinking mode — Qwen3 template injects into - # the prompt so the model starts generating inside a thinking block. - in_thinking = True - thinking_started = True + thinking_started = False text_started = False - buf = "" try: from ppmlx.engine import get_engine @@ -1489,164 +1492,41 @@ async def event_generator(): temperature=0.7 if temperature is None else temperature, max_tokens=max_tokens, tools=oai_tools, - strip_thinking=False, # We handle thinking/text separation here + strip_thinking=False, ) - # Emit thinking block start immediately + # Emit thinking block start immediately (Qwen3 starts inside think) yield _anthropic_sse({ "type": "content_block_start", "index": content_idx, "content_block": {"type": "thinking", "thinking": ""}, }) + thinking_started = True - async for chunk in _async_iter_sync_gen(gen): - buf += chunk - - # Detect transitions between thinking and text - while buf: - if not in_thinking: - # Look for to start thinking block - think_pos = buf.find("") - close_pos = buf.find("") - - if think_pos == 0: - # Start thinking block - in_thinking = True - buf = buf[len(""):] - if not thinking_started: - thinking_started = True - yield _anthropic_sse({ - "type": "content_block_start", - "index": content_idx, - "content_block": {"type": "thinking", "thinking": ""}, - }) - continue - elif close_pos == 0: - # Closing tag without opening — template injected - # into prompt, so we started inside thinking - buf = buf[len(""):] - if thinking_started: - yield _anthropic_sse({ - "type": "content_block_stop", - "index": content_idx, - }) - content_idx += 1 - thinking_started = False - continue - elif think_pos > 0: - # Text before - text_chunk = buf[:think_pos] - buf = buf[think_pos:] - if text_chunk.strip(): - if not text_started: - text_started = True - yield _anthropic_sse({ - "type": "content_block_start", - "index": content_idx, - "content_block": {"type": "text", "text": ""}, - }) - full_text += text_chunk - yield _anthropic_sse({ - "type": "content_block_delta", - "index": content_idx, - "delta": {"type": "text_delta", "text": text_chunk}, - }) - continue - else: - # No tag found — might be partial tag at end of buf - # Check for partial "", ""): - for i in range(1, len(tag)): - if buf.endswith(tag[:i]): - partial = True - break - if partial: - break - if partial: - break # Wait for more data - - # Plain text, no partial tags - text_chunk = buf - buf = "" - if text_chunk: - if not text_started: - text_started = True - yield _anthropic_sse({ - "type": "content_block_start", - "index": content_idx, - "content_block": {"type": "text", "text": ""}, - }) - full_text += text_chunk - yield _anthropic_sse({ - "type": "content_block_delta", - "index": content_idx, - "delta": {"type": "text_delta", "text": text_chunk}, - }) - break - else: - # Inside thinking block — look for - close_pos = buf.find("") - if close_pos >= 0: - think_chunk = buf[:close_pos] - buf = buf[close_pos + len(""):] - in_thinking = False - if think_chunk: - yield _anthropic_sse({ - "type": "content_block_delta", - "index": content_idx, - "delta": {"type": "thinking_delta", "thinking": think_chunk}, - }) - yield _anthropic_sse({ - "type": "content_block_stop", - "index": content_idx, - }) - content_idx += 1 - thinking_started = False - continue - else: - # Check for partial "")): - if buf.endswith(""[:i]): - partial = True - break - if partial: - # Emit everything except the partial tag - safe = buf[:len(buf) - i] - buf = buf[len(buf) - i:] - else: - safe = buf - buf = "" - if safe: - if not thinking_started: - # Template injected , we start mid-think - thinking_started = True - in_thinking = True - yield _anthropic_sse({ - "type": "content_block_start", - "index": content_idx, - "content_block": {"type": "thinking", "thinking": ""}, - }) - yield _anthropic_sse({ - "type": "content_block_delta", - "index": content_idx, - "delta": {"type": "thinking_delta", "thinking": safe}, - }) - break - - # Flush remaining buffer - if buf: - if in_thinking or thinking_started: - if buf.strip(): + raw_stream = _async_iter_sync_gen(gen) + async for kind, chunk in _parse_think_tags(raw_stream): + if kind == "thinking": + if not thinking_started: + thinking_started = True yield _anthropic_sse({ - "type": "content_block_delta", + "type": "content_block_start", "index": content_idx, - "delta": {"type": "thinking_delta", "thinking": buf}, + "content_block": {"type": "thinking", "thinking": ""}, }) - yield _anthropic_sse({"type": "content_block_stop", "index": content_idx}) - content_idx += 1 - else: + yield _anthropic_sse({ + "type": "content_block_delta", + "index": content_idx, + "delta": {"type": "thinking_delta", "thinking": chunk}, + }) + elif kind == "thinking_done": + if thinking_started: + yield _anthropic_sse({ + "type": "content_block_stop", + "index": content_idx, + }) + content_idx += 1 + thinking_started = False + elif kind == "text": if not text_started: text_started = True yield _anthropic_sse({ @@ -1654,13 +1534,16 @@ async def event_generator(): "index": content_idx, "content_block": {"type": "text", "text": ""}, }) - full_text += buf yield _anthropic_sse({ "type": "content_block_delta", "index": content_idx, - "delta": {"type": "text_delta", "text": buf}, + "delta": {"type": "text_delta", "text": chunk}, }) - elif thinking_started: + elif kind == "flush_text": + full_text = chunk + + # Close thinking block if still open + if thinking_started: yield _anthropic_sse({"type": "content_block_stop", "index": content_idx}) content_idx += 1 diff --git a/pyproject.toml b/pyproject.toml index 64c265a..66624bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,9 +32,7 @@ dependencies = [ "sse-starlette>=2.0", "httpx>=0.27", "questionary>=2.0", - "prompt-toolkit>=3.0", "tomli-w>=1.0", - "setproctitle>=1.3", ] [project.optional-dependencies] diff --git a/tests/conftest.py b/tests/conftest.py index fe07260..39a8bbc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,7 +29,10 @@ def _stub(name: str) -> types.ModuleType: import ppmlx.db # noqa: E402 import ppmlx.models # noqa: E402 import ppmlx.memory # noqa: E402 -import ppmlx.modelfile # noqa: E402 +try: + import ppmlx.modelfile # noqa: E402 +except ImportError: + pass # module may have been removed import ppmlx.quantize # noqa: E402 import ppmlx.engine # noqa: E402 import ppmlx.engine_embed # noqa: E402