From 439c6dbd2e8800f0e63ee811e5e3bdaaab1e8103 Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Mon, 22 Sep 2025 22:40:38 +0800 Subject: [PATCH 1/5] feat: [Coda] add header injection mechanism for trace context (LogID: 20250922205816010091118021641B5E5) Co-Authored-By: Coda --- cozeloop/_client.py | 21 ++++++++++++++++++++- cozeloop/internal/httpclient/client.py | 17 +++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/cozeloop/_client.py b/cozeloop/_client.py index 657cb9d..b620151 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -74,6 +74,7 @@ def new_client( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ) -> Client: cache_key = _generate_cache_key( # all args are used to generate cache key api_base_url, @@ -93,6 +94,7 @@ def new_client( tag_truncate_conf, api_base_path, trace_queue_conf, + header_injector, ) with _cache_lock: @@ -118,6 +120,7 @@ def new_client( tag_truncate_conf=tag_truncate_conf, api_base_path=api_base_path, trace_queue_conf=trace_queue_conf, + header_injector=header_injector, ) _client_cache[cache_key] = client return client @@ -149,6 +152,7 @@ def __init__( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID) api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL) @@ -181,12 +185,27 @@ def __init__( jwt_oauth_private_key=jwt_oauth_private_key, jwt_oauth_public_key_id=jwt_oauth_public_key_id ) + + # 创建默认header注入函数 + if header_injector is None: + def default_header_injector() -> Dict[str, str]: + try: + span = self.get_span_from_context() + if span and hasattr(span, 'to_header'): + return span.to_header() + except Exception: + # 静默处理异常,不影响正常请求 + pass + return {} + header_injector = default_header_injector + http_client = httpclient.Client( api_base_url=api_base_url, http_client=inner_client, auth=auth, timeout=timeout, - upload_timeout=upload_timeout + upload_timeout=upload_timeout, + header_injector=header_injector ) finish_pro = default_finish_event_processor if trace_finish_event_processor: diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index 4e36d64..e8210cb 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -3,7 +3,7 @@ import logging import os -from typing import Optional, Dict, Union, IO, Type, Tuple, Any +from typing import Optional, Dict, Union, IO, Type, Tuple, Any, Callable import httpx import pydantic @@ -29,18 +29,30 @@ def __init__( auth: Auth, timeout: int = consts.DEFAULT_TIMEOUT, upload_timeout: int = consts.DEFAULT_UPLOAD_TIMEOUT, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): self.api_base_url = api_base_url self.http_client = http_client self.auth = auth self.timeout = timeout self.upload_timeout = upload_timeout + self.header_injector = header_injector def _build_url(self, path: str) -> str: return f"{self.api_base_url}{path}" - def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: res = user_agent_header() + + # 注入header + if self.header_injector: + try: + injected_headers = self.header_injector() + if injected_headers: + res.update(injected_headers) + except Exception as e: + # 静默处理异常,不影响正常请求 + logger.debug(f"Header injection failed: {e}") + if headers: res.update(headers) res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}" @@ -53,6 +65,7 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st res["x-use-ppe"] = "1" return res + return res def request( self, From 61c8300704b3422a5f9799a3dd62726ba11bb83a Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Mon, 22 Sep 2025 22:46:40 +0800 Subject: [PATCH 2/5] feat: [Coda] add fixed trace header injection in HTTP client (LogID: 20250922205816010091118021641B5E5) Co-Authored-By: Coda --- cozeloop/_client.py | 18 ------------------ cozeloop/internal/httpclient/client.py | 19 +++++++++---------- 2 files changed, 9 insertions(+), 28 deletions(-) diff --git a/cozeloop/_client.py b/cozeloop/_client.py index b620151..f6614a7 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -74,7 +74,6 @@ def new_client( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, - header_injector: Optional[Callable[[], Dict[str, str]]] = None, ) -> Client: cache_key = _generate_cache_key( # all args are used to generate cache key api_base_url, @@ -94,7 +93,6 @@ def new_client( tag_truncate_conf, api_base_path, trace_queue_conf, - header_injector, ) with _cache_lock: @@ -120,7 +118,6 @@ def new_client( tag_truncate_conf=tag_truncate_conf, api_base_path=api_base_path, trace_queue_conf=trace_queue_conf, - header_injector=header_injector, ) _client_cache[cache_key] = client return client @@ -152,7 +149,6 @@ def __init__( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, - header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID) api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL) @@ -186,26 +182,12 @@ def __init__( jwt_oauth_public_key_id=jwt_oauth_public_key_id ) - # 创建默认header注入函数 - if header_injector is None: - def default_header_injector() -> Dict[str, str]: - try: - span = self.get_span_from_context() - if span and hasattr(span, 'to_header'): - return span.to_header() - except Exception: - # 静默处理异常,不影响正常请求 - pass - return {} - header_injector = default_header_injector - http_client = httpclient.Client( api_base_url=api_base_url, http_client=inner_client, auth=auth, timeout=timeout, upload_timeout=upload_timeout, - header_injector=header_injector ) finish_pro = default_finish_event_processor if trace_finish_event_processor: diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index e8210cb..b2bdb7f 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -29,29 +29,29 @@ def __init__( auth: Auth, timeout: int = consts.DEFAULT_TIMEOUT, upload_timeout: int = consts.DEFAULT_UPLOAD_TIMEOUT, - header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): self.api_base_url = api_base_url self.http_client = http_client self.auth = auth self.timeout = timeout self.upload_timeout = upload_timeout - self.header_injector = header_injector def _build_url(self, path: str) -> str: return f"{self.api_base_url}{path}" def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: res = user_agent_header() - # 注入header - if self.header_injector: - try: - injected_headers = self.header_injector() + # 固定注入span header + try: + from cozeloop._client import get_span_from_context + span = get_span_from_context() + if span and hasattr(span, 'to_header'): + injected_headers = span.to_header() if injected_headers: res.update(injected_headers) - except Exception as e: - # 静默处理异常,不影响正常请求 - logger.debug(f"Header injection failed: {e}") + except Exception as e: + # 静默处理异常,不影响正常请求 + logger.debug(f"Header injection failed: {e}") if headers: res.update(headers) @@ -65,7 +65,6 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st res["x-use-ppe"] = "1" return res - return res def request( self, From 021daa5face03f65c244b240260c6556557b6477 Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Tue, 23 Sep 2025 10:14:07 +0800 Subject: [PATCH 3/5] refactor: [Coda] move header injection logic to _client.py (LogID: 20250922205816010091118021641B5E5) Co-Authored-By: Coda --- cozeloop/_client.py | 22 ++++++++++++++++++++++ cozeloop/internal/httpclient/client.py | 18 +++++++++--------- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/cozeloop/_client.py b/cozeloop/_client.py index f6614a7..96611e0 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -74,6 +74,7 @@ def new_client( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ) -> Client: cache_key = _generate_cache_key( # all args are used to generate cache key api_base_url, @@ -93,6 +94,7 @@ def new_client( tag_truncate_conf, api_base_path, trace_queue_conf, + header_injector, ) with _cache_lock: @@ -118,6 +120,7 @@ def new_client( tag_truncate_conf=tag_truncate_conf, api_base_path=api_base_path, trace_queue_conf=trace_queue_conf, + header_injector=header_injector, ) _client_cache[cache_key] = client return client @@ -149,6 +152,7 @@ def __init__( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID) api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL) @@ -182,12 +186,17 @@ def __init__( jwt_oauth_public_key_id=jwt_oauth_public_key_id ) + # 创建默认的header注入函数 + if header_injector is None: + header_injector = self._create_default_header_injector() + http_client = httpclient.Client( api_base_url=api_base_url, http_client=inner_client, auth=auth, timeout=timeout, upload_timeout=upload_timeout, + header_injector=header_injector, ) finish_pro = default_finish_event_processor if trace_finish_event_processor: @@ -220,6 +229,19 @@ def combined_processor(event_info: FinishEventInfo): prompt_trace=prompt_trace ) + def _create_default_header_injector(self) -> Callable[[], Dict[str, str]]: + """创建默认的header注入函数,包含span header注入逻辑""" + def default_header_injector() -> Dict[str, str]: + try: + span = self.get_span_from_context() + if span and hasattr(span, 'to_header'): + return span.to_header() + except Exception: + # 静默处理异常,不影响正常请求 + pass + return {} + return default_header_injector + def _get_from_env(self, val: str, env_key: str) -> str: if val: return val diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index b2bdb7f..488545d 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -29,29 +29,29 @@ def __init__( auth: Auth, timeout: int = consts.DEFAULT_TIMEOUT, upload_timeout: int = consts.DEFAULT_UPLOAD_TIMEOUT, + header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): self.api_base_url = api_base_url self.http_client = http_client self.auth = auth self.timeout = timeout self.upload_timeout = upload_timeout + self.header_injector = header_injector def _build_url(self, path: str) -> str: return f"{self.api_base_url}{path}" def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: res = user_agent_header() - # 固定注入span header - try: - from cozeloop._client import get_span_from_context - span = get_span_from_context() - if span and hasattr(span, 'to_header'): - injected_headers = span.to_header() + # 调用传入的header注入函数 + if self.header_injector: + try: + injected_headers = self.header_injector() if injected_headers: res.update(injected_headers) - except Exception as e: - # 静默处理异常,不影响正常请求 - logger.debug(f"Header injection failed: {e}") + except Exception as e: + # 静默处理异常,不影响正常请求 + logger.debug(f"Header injection failed: {e}") if headers: res.update(headers) From 815e7ce3295d247a7c2cf645b513db9a9007d544 Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Tue, 23 Sep 2025 20:57:55 +0800 Subject: [PATCH 4/5] modify header_injector --- cozeloop/_client.py | 14 ++------------ cozeloop/internal/httpclient/client.py | 19 ++++++++----------- examples/prompt/ptaas/ptaas.py | 6 +++++- 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/cozeloop/_client.py b/cozeloop/_client.py index 96611e0..61b2d6e 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -74,7 +74,6 @@ def new_client( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, - header_injector: Optional[Callable[[], Dict[str, str]]] = None, ) -> Client: cache_key = _generate_cache_key( # all args are used to generate cache key api_base_url, @@ -94,7 +93,6 @@ def new_client( tag_truncate_conf, api_base_path, trace_queue_conf, - header_injector, ) with _cache_lock: @@ -120,7 +118,6 @@ def new_client( tag_truncate_conf=tag_truncate_conf, api_base_path=api_base_path, trace_queue_conf=trace_queue_conf, - header_injector=header_injector, ) _client_cache[cache_key] = client return client @@ -152,7 +149,6 @@ def __init__( tag_truncate_conf: Optional[TagTruncateConf] = None, api_base_path: Optional[APIBasePath] = None, trace_queue_conf: Optional[QueueConf] = None, - header_injector: Optional[Callable[[], Dict[str, str]]] = None, ): workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID) api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL) @@ -185,18 +181,14 @@ def __init__( jwt_oauth_private_key=jwt_oauth_private_key, jwt_oauth_public_key_id=jwt_oauth_public_key_id ) - - # 创建默认的header注入函数 - if header_injector is None: - header_injector = self._create_default_header_injector() - + http_client = httpclient.Client( api_base_url=api_base_url, http_client=inner_client, auth=auth, timeout=timeout, upload_timeout=upload_timeout, - header_injector=header_injector, + header_injector=self._create_default_header_injector(), ) finish_pro = default_finish_event_processor if trace_finish_event_processor: @@ -230,14 +222,12 @@ def combined_processor(event_info: FinishEventInfo): ) def _create_default_header_injector(self) -> Callable[[], Dict[str, str]]: - """创建默认的header注入函数,包含span header注入逻辑""" def default_header_injector() -> Dict[str, str]: try: span = self.get_span_from_context() if span and hasattr(span, 'to_header'): return span.to_header() except Exception: - # 静默处理异常,不影响正常请求 pass return {} return default_header_injector diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index 488545d..e671664 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -42,17 +42,6 @@ def _build_url(self, path: str) -> str: return f"{self.api_base_url}{path}" def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: res = user_agent_header() - - # 调用传入的header注入函数 - if self.header_injector: - try: - injected_headers = self.header_injector() - if injected_headers: - res.update(injected_headers) - except Exception as e: - # 静默处理异常,不影响正常请求 - logger.debug(f"Header injection failed: {e}") - if headers: res.update(headers) res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}" @@ -64,6 +53,14 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st if ppe_env: res["x-use-ppe"] = "1" + if self.header_injector: + try: + injected_headers = self.header_injector() + if injected_headers: + res.update(injected_headers) + except Exception as e: + logger.debug(f"Header injection failed: {e}") + return res def request( diff --git a/examples/prompt/ptaas/ptaas.py b/examples/prompt/ptaas/ptaas.py index 8e8b166..1d595d3 100755 --- a/examples/prompt/ptaas/ptaas.py +++ b/examples/prompt/ptaas/ptaas.py @@ -14,6 +14,8 @@ import asyncio import os +from anyio import sleep + from cozeloop import new_client, Client from cozeloop.entities.prompt import Message, Role, ExecuteResult @@ -141,7 +143,8 @@ async def async_stream_example(client: Client) -> None: async def main(): """Main function""" client = setup_client() - + + root_span = client.start_span("root", "custom") try: # Sync non-stream call sync_non_stream_example(client) @@ -157,6 +160,7 @@ async def main(): finally: # Close client + root_span.finish() if hasattr(client, 'close'): client.close() From c8cfbdbd8c029e94ce735770cdd4b00d3bd62c6f Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Tue, 23 Sep 2025 21:05:04 +0800 Subject: [PATCH 5/5] modify changelog --- CHANGLOG.md | 5 +++++ cozeloop/internal/version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGLOG.md b/CHANGLOG.md index d927009..5961ee3 100644 --- a/CHANGLOG.md +++ b/CHANGLOG.md @@ -1,3 +1,8 @@ +## [0.1.16] - 2025-09-24 +### Added +- support custom trace connect ptaas trace +- fix async ptaas httpclient, modify to use AsyncClient + ## [0.1.15] - 2025-09-17 ### Added - modify cachetools version to >=5.5.2,<7.0.0 diff --git a/cozeloop/internal/version.py b/cozeloop/internal/version.py index 82ed6df..2223429 100644 --- a/cozeloop/internal/version.py +++ b/cozeloop/internal/version.py @@ -1,4 +1,4 @@ # Copyright (c) 2025 Bytedance Ltd. and/or its affiliates # SPDX-License-Identifier: MIT -VERSION = 'v0.1.10' \ No newline at end of file +VERSION = 'v0.1.16' \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a3e590c..df48804 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cozeloop" -version = "0.1.15" +version = "0.1.16" description = "coze loop sdk" authors = ["JiangQi715 "] license = "MIT"