diff --git a/cozeloop/_client.py b/cozeloop/_client.py index 657cb9d..805f48d 100644 --- a/cozeloop/_client.py +++ b/cozeloop/_client.py @@ -282,9 +282,9 @@ def execute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 执行Prompt请求 + Execute Prompt request - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) """ if self._closed: raise ClientClosedError() @@ -310,9 +310,9 @@ async def aexecute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 异步执行Prompt请求 + Asynchronously execute Prompt request - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) """ if self._closed: raise ClientClosedError() @@ -436,9 +436,9 @@ def execute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 执行Prompt请求 + Execute Prompt request - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) """ return get_default_client().execute_prompt( prompt_key, @@ -462,9 +462,9 @@ async def aexecute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 异步执行Prompt请求 + Asynchronously execute Prompt request - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) """ return await get_default_client().aexecute_prompt( prompt_key, diff --git a/cozeloop/entities/prompt.py b/cozeloop/entities/prompt.py index b076ba7..e77dd07 100644 --- a/cozeloop/entities/prompt.py +++ b/cozeloop/entities/prompt.py @@ -127,7 +127,7 @@ class Prompt(BaseModel): class ExecuteParam(BaseModel): - """Execute参数""" + """Execute parameters""" prompt_key: str version: str = "" label: str = "" @@ -136,13 +136,13 @@ class ExecuteParam(BaseModel): class TokenUsage(BaseModel): - """Token使用统计""" + """Token usage statistics""" input_tokens: int = 0 output_tokens: int = 0 class ExecuteResult(BaseModel): - """Execute结果""" + """Execute result""" message: Optional[Message] = None finish_reason: Optional[str] = None usage: Optional[TokenUsage] = None diff --git a/cozeloop/entities/stream.py b/cozeloop/entities/stream.py index 4f7d6a1..3b93feb 100755 --- a/cozeloop/entities/stream.py +++ b/cozeloop/entities/stream.py @@ -8,29 +8,29 @@ class StreamReader(ABC, Generic[T]): - """流式读取器接口""" + """Stream reader interface""" @abstractmethod def __iter__(self) -> Iterator[T]: - """支持同步迭代 - for循环直接读取""" + """Support synchronous iteration - for loop direct reading""" pass @abstractmethod def __next__(self) -> T: - """支持next()函数调用""" + """Support next() function call""" pass @abstractmethod def __aiter__(self) -> AsyncIterator[T]: - """支持异步迭代 - async for循环直接读取""" + """Support asynchronous iteration - async for loop direct reading""" pass @abstractmethod async def __anext__(self) -> T: - """支持async next()调用""" + """Support async next() call""" pass @abstractmethod def close(self): - """关闭流""" + """Close stream""" pass \ No newline at end of file diff --git a/cozeloop/internal/consts/__init__.py b/cozeloop/internal/consts/__init__.py index 0736e1d..2e1ad56 100644 --- a/cozeloop/internal/consts/__init__.py +++ b/cozeloop/internal/consts/__init__.py @@ -15,7 +15,7 @@ DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL = 60 DEFAULT_TIMEOUT = 3 DEFAULT_UPLOAD_TIMEOUT = 30 -DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10分钟,专用于execute_prompt和aexecute_prompt方法 +DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10 minutes, dedicated for execute_prompt and aexecute_prompt methods LOG_ID_HEADER = "x-tt-logid" AUTHORIZE_HEADER = "Authorization" diff --git a/cozeloop/internal/httpclient/client.py b/cozeloop/internal/httpclient/client.py index 4e36d64..9965886 100644 --- a/cozeloop/internal/httpclient/client.py +++ b/cozeloop/internal/httpclient/client.py @@ -130,7 +130,7 @@ def post_stream( json: Union[BaseModel, Dict] = None, timeout: Optional[int] = None, ): - """发起流式POST请求,返回stream_context""" + """Initiate streaming POST request, return stream_context""" url = self._build_url(path) headers = self._set_headers({"Content-Type": "application/json"}) @@ -140,7 +140,7 @@ def post_stream( _timeout = timeout if timeout is not None else self.timeout try: - # 返回stream_context,让StreamReader管理上下文 + # Return stream_context, let StreamReader manage context stream_context = self.http_client.stream( "POST", url, @@ -153,13 +153,54 @@ def post_stream( logger.error(f"Http client stream request failed, path: {path}, err: {e}.") raise consts.NetworkError from e + async def arequest( + self, + path: str, + method: str, + response_model: Type[T], + *, + params: Optional[Dict[str, str]] = None, + form: Optional[Dict[str, str]] = None, + json: Optional[Union[BaseModel, Dict]] = None, + files: Optional[Dict[str, FileType]] = None, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + ) -> T: + url = self._build_url(path) + _headers = self._set_headers(headers) + + _timeout = timeout if timeout is not None else self.timeout + + if isinstance(json, BaseModel): + if pydantic.VERSION.startswith('1'): + json = json.dict(by_alias=True) + else: + json = json.model_dump(by_alias=True) + + try: + response = await self.http_client.arequest( + method, + url, + params=params, + data=form, + json=json, + files=files, + headers=_headers, + timeout=_timeout + ) + except httpx.HTTPError as e: + logger.error(f"Http client request failed, path: {path}, err: {e}.") + raise consts.NetworkError from e + + return parse_response(url, response, response_model) + async def apost_stream( self, path: str, json: Union[BaseModel, Dict] = None, timeout: Optional[int] = None, ): - """发起异步流式POST请求,返回stream_context""" + """Initiate asynchronous streaming POST request, return stream_context""" url = self._build_url(path) headers = self._set_headers({"Content-Type": "application/json"}) @@ -169,8 +210,8 @@ async def apost_stream( _timeout = timeout if timeout is not None else self.timeout try: - # 返回stream_context,让StreamReader管理上下文 - stream_context = self.http_client.stream( + # Return stream_context, let StreamReader manage context + stream_context = self.http_client.astream( "POST", url, json=json, diff --git a/cozeloop/internal/httpclient/http_client.py b/cozeloop/internal/httpclient/http_client.py index fcb25b0..def7494 100644 --- a/cozeloop/internal/httpclient/http_client.py +++ b/cozeloop/internal/httpclient/http_client.py @@ -2,10 +2,12 @@ # SPDX-License-Identifier: MIT import logging -from typing import Dict, Type, TypeVar +import typing +from typing import Dict, Type, TypeVar, Any, Generator import httpx import pydantic +from httpx import URL, Response from pydantic import ValidationError from cozeloop.internal import consts @@ -16,9 +18,24 @@ T = TypeVar('T', bound=BaseResponse) -class HTTPClient(httpx.Client): +class HTTPClient: def __init__(self): - super().__init__() + self.sync_client = httpx.Client() + self.async_client = httpx.AsyncClient() + + def request(self, method: str, url: URL | str, **kwargs: Any) -> Response: + return self.sync_client.request(method, url, **kwargs) + + def stream(self, method: str, url: URL | str, **kwargs: Any): + """Return synchronous stream context manager""" + return self.sync_client.stream(method, url, **kwargs) + + async def arequest(self, method: str, url: URL | str, **kwargs: Any) -> Response: + return await self.async_client.request(method, url, **kwargs) + + def astream(self, method: str, url: URL | str, **kwargs: Any): + """Return asynchronous stream context manager""" + return self.async_client.stream(method, url, **kwargs) def _check_oauth_error(body: Dict, http_code: int, log_id: str) -> None: @@ -63,4 +80,4 @@ def parse_response(url: str, response: httpx.Response, response_model: Type[T]) logger.error(f"Failed to parse response. Path: {url}, http code: {http_code}, log id: {log_id}, error: {e}.") raise consts.InternalError from e logger.debug(f"Call remote service success. Path: {url}, response: {res}, log id: {log_id}") - return res + return res \ No newline at end of file diff --git a/cozeloop/internal/prompt/converter.py b/cozeloop/internal/prompt/converter.py index 45d95b2..2d30746 100644 --- a/cozeloop/internal/prompt/converter.py +++ b/cozeloop/internal/prompt/converter.py @@ -50,7 +50,7 @@ def _convert_role(openapi_role: OpenAPIRole) -> EntityRole: - """转换角色类型""" + """Convert role type""" role_mapping = { OpenAPIRole.SYSTEM: EntityRole.SYSTEM, OpenAPIRole.USER: EntityRole.USER, @@ -62,7 +62,7 @@ def _convert_role(openapi_role: OpenAPIRole) -> EntityRole: def _convert_content_type(openapi_type: OpenAPIContentType) -> EntityContentType: - """转换内容类型""" + """Convert content type""" content_type_mapping = { OpenAPIContentType.TEXT: EntityContentType.TEXT, OpenAPIContentType.IMAGE_URL: EntityContentType.IMAGE_URL, @@ -73,7 +73,7 @@ def _convert_content_type(openapi_type: OpenAPIContentType) -> EntityContentType def _convert_content_part(openapi_part: OpenAPIContentPart) -> EntityContentPart: - """转换内容部分,确保text、image_url、base64_data字段都被转换""" + """Convert content part, ensure text, image_url, base64_data fields are all converted""" return EntityContentPart( type=_convert_content_type(openapi_part.type), text=openapi_part.text, @@ -83,7 +83,7 @@ def _convert_content_part(openapi_part: OpenAPIContentPart) -> EntityContentPart def _convert_function_call(func_call: Optional[OpenAPIFunctionCall]) -> Optional[EntityFunctionCall]: - """转换函数调用,确保name、arguments字段都被转换""" + """Convert function call, ensure name, arguments fields are all converted""" if func_call is None: return None return EntityFunctionCall( @@ -93,7 +93,7 @@ def _convert_function_call(func_call: Optional[OpenAPIFunctionCall]) -> Optional def _convert_tool_call(tool_call: OpenAPIToolCall) -> EntityToolCall: - """转换工具调用,确保index、id、type、function_call字段都被转换""" + """Convert tool call, ensure index, id, type, function_call fields are all converted""" return EntityToolCall( index=tool_call.index, id=tool_call.id, @@ -103,7 +103,7 @@ def _convert_tool_call(tool_call: OpenAPIToolCall) -> EntityToolCall: def _convert_message(msg: OpenAPIMessage) -> EntityMessage: - """转换消息,确保role、content、reasoning_content、tool_call_id、tool_calls字段都被转换""" + """Convert message, ensure role, content, reasoning_content, tool_call_id, tool_calls fields are all converted""" return EntityMessage( role=_convert_role(msg.role), reasoning_content=msg.reasoning_content, @@ -115,7 +115,7 @@ def _convert_message(msg: OpenAPIMessage) -> EntityMessage: def _convert_variable_type(openapi_type: OpenAPIVariableType) -> EntityVariableType: - """转换变量类型""" + """Convert variable type""" type_mapping = { OpenAPIVariableType.STRING: EntityVariableType.STRING, OpenAPIVariableType.PLACEHOLDER: EntityVariableType.PLACEHOLDER, @@ -134,7 +134,7 @@ def _convert_variable_type(openapi_type: OpenAPIVariableType) -> EntityVariableT def _convert_variable_def(var_def: OpenAPIVariableDef) -> EntityVariableDef: - """转换变量定义""" + """Convert variable definition""" return EntityVariableDef( key=var_def.key, desc=var_def.desc, @@ -143,7 +143,7 @@ def _convert_variable_def(var_def: OpenAPIVariableDef) -> EntityVariableDef: def _convert_function(func: OpenAPIFunction) -> EntityFunction: - """转换函数定义""" + """Convert function definition""" return EntityFunction( name=func.name, description=func.description, @@ -152,7 +152,7 @@ def _convert_function(func: OpenAPIFunction) -> EntityFunction: def _convert_tool_type(openapi_tool_type: OpenAPIToolType) -> EntityToolType: - """转换工具类型""" + """Convert tool type""" type_mapping = { OpenAPIToolType.FUNCTION: EntityToolType.FUNCTION, } @@ -160,7 +160,7 @@ def _convert_tool_type(openapi_tool_type: OpenAPIToolType) -> EntityToolType: def _convert_tool(tool: OpenAPITool) -> EntityTool: - """转换工具定义""" + """Convert tool definition""" return EntityTool( type=_convert_tool_type(tool.type), function=_convert_function(tool.function) if tool.function else None @@ -168,7 +168,7 @@ def _convert_tool(tool: OpenAPITool) -> EntityTool: def _convert_tool_choice_type(openapi_tool_choice_type: OpenAPIChoiceType) -> EntityToolChoiceType: - """转换工具选择类型""" + """Convert tool choice type""" choice_mapping = { OpenAPIChoiceType.AUTO: EntityToolChoiceType.AUTO, OpenAPIChoiceType.NONE: EntityToolChoiceType.NONE @@ -177,14 +177,14 @@ def _convert_tool_choice_type(openapi_tool_choice_type: OpenAPIChoiceType) -> En def _convert_tool_call_config(config: OpenAPIToolCallConfig) -> EntityToolCallConfig: - """转换工具调用配置""" + """Convert tool call configuration""" return EntityToolCallConfig( tool_choice=_convert_tool_choice_type(config.tool_choice) ) def _convert_llm_config(config: OpenAPIModelConfig) -> EntityModelConfig: - """转换LLM配置""" + """Convert LLM configuration""" return EntityModelConfig( temperature=config.temperature, max_tokens=config.max_tokens, @@ -197,7 +197,7 @@ def _convert_llm_config(config: OpenAPIModelConfig) -> EntityModelConfig: def _convert_template_type(openapi_template_type: OpenAPITemplateType) -> EntityTemplateType: - """转换模板类型""" + """Convert template type""" template_mapping = { OpenAPITemplateType.NORMAL: EntityTemplateType.NORMAL, OpenAPITemplateType.JINJA2: EntityTemplateType.JINJA2 @@ -206,7 +206,7 @@ def _convert_template_type(openapi_template_type: OpenAPITemplateType) -> Entity def _convert_prompt_template(template: OpenAPIPromptTemplate) -> EntityPromptTemplate: - """转换提示模板""" + """Convert prompt template""" return EntityPromptTemplate( template_type=_convert_template_type(template.template_type), messages=[_convert_message(msg) for msg in template.messages] if template.messages else None, @@ -216,7 +216,7 @@ def _convert_prompt_template(template: OpenAPIPromptTemplate) -> EntityPromptTem def _convert_token_usage(usage: Optional[OpenAPITokenUsage]) -> Optional[EntityTokenUsage]: - """转换Token使用统计,确保input_tokens、output_tokens字段都被转换""" + """Convert Token usage statistics, ensure input_tokens, output_tokens fields are all converted""" if usage is None: return None return EntityTokenUsage( @@ -226,7 +226,7 @@ def _convert_token_usage(usage: Optional[OpenAPITokenUsage]) -> Optional[EntityT def _convert_prompt(prompt: OpenAPIPrompt) -> EntityPrompt: - """转换OpenAPI Prompt对象到entity Prompt对象""" + """Convert OpenAPI Prompt object to entity Prompt object""" return EntityPrompt( workspace_id=prompt.workspace_id, prompt_key=prompt.prompt_key, @@ -238,38 +238,38 @@ def _convert_prompt(prompt: OpenAPIPrompt) -> EntityPrompt: ) -# 公开的转换函数 +# Public conversion functions def to_content_part(openapi_part: OpenAPIContentPart) -> EntityContentPart: - """公开的内容部分转换函数""" + """Public content part conversion function""" return _convert_content_part(openapi_part) def to_prompt(openapi_prompt: OpenAPIPrompt) -> EntityPrompt: - """公开的提示转换函数""" + """Public prompt conversion function""" return _convert_prompt(openapi_prompt) def to_message(openapi_message: OpenAPIMessage) -> EntityMessage: - """公开的消息转换函数""" + """Public message conversion function""" return _convert_message(openapi_message) def to_token_usage(openapi_usage: Optional[OpenAPITokenUsage]) -> Optional[EntityTokenUsage]: - """公开的Token使用统计转换函数""" + """Public Token usage statistics conversion function""" return _convert_token_usage(openapi_usage) def convert_execute_data_to_result(data) -> 'ExecuteResult': - """将ExecuteData转换为ExecuteResult + """Convert ExecuteData to ExecuteResult - 统一的转换入口,复用现有转换逻辑 - 用于替代 prompt.py 和 reader.py 中的重复实现 + Unified conversion entry point, reusing existing conversion logic + Used to replace duplicate implementations in prompt.py and reader.py Args: - data: ExecuteData对象,包含执行结果数据 + data: ExecuteData object containing execution result data Returns: - ExecuteResult: 转换后的执行结果对象 + ExecuteResult: Converted execution result object """ from cozeloop.entities.prompt import ExecuteResult @@ -281,7 +281,7 @@ def convert_execute_data_to_result(data) -> 'ExecuteResult': def to_openapi_message(message: EntityMessage) -> OpenAPIMessage: - """将EntityMessage转换为OpenAPIMessage""" + """Convert EntityMessage to OpenAPIMessage""" return OpenAPIMessage( role=_to_openapi_role(message.role), reasoning_content=message.reasoning_content, @@ -294,7 +294,7 @@ def to_openapi_message(message: EntityMessage) -> OpenAPIMessage: def _to_openapi_role(role: EntityRole) -> OpenAPIRole: - """将EntityRole转换为OpenAPIRole""" + """Convert EntityRole to OpenAPIRole""" role_mapping = { EntityRole.SYSTEM: OpenAPIRole.SYSTEM, EntityRole.USER: OpenAPIRole.USER, @@ -306,7 +306,7 @@ def _to_openapi_role(role: EntityRole) -> OpenAPIRole: def _to_openapi_content_part(part: EntityContentPart) -> OpenAPIContentPart: - """将EntityContentPart转换为OpenAPIContentPart""" + """Convert EntityContentPart to OpenAPIContentPart""" return OpenAPIContentPart( type=_to_openapi_content_type(part.type), text=part.text, @@ -316,7 +316,7 @@ def _to_openapi_content_part(part: EntityContentPart) -> OpenAPIContentPart: def _to_openapi_content_type(content_type: EntityContentType) -> OpenAPIContentType: - """将EntityContentType转换为OpenAPIContentType""" + """Convert EntityContentType to OpenAPIContentType""" type_mapping = { EntityContentType.TEXT: OpenAPIContentType.TEXT, EntityContentType.IMAGE_URL: OpenAPIContentType.IMAGE_URL, @@ -327,7 +327,7 @@ def _to_openapi_content_type(content_type: EntityContentType) -> OpenAPIContentT def _to_openapi_tool_call(tool_call: EntityToolCall) -> OpenAPIToolCall: - """将EntityToolCall转换为OpenAPIToolCall""" + """Convert EntityToolCall to OpenAPIToolCall""" return OpenAPIToolCall( index=tool_call.index, id=tool_call.id, @@ -337,7 +337,7 @@ def _to_openapi_tool_call(tool_call: EntityToolCall) -> OpenAPIToolCall: def _to_openapi_function_call(func_call: Optional[EntityFunctionCall]) -> Optional[OpenAPIFunctionCall]: - """将EntityFunctionCall转换为OpenAPIFunctionCall""" + """Convert EntityFunctionCall to OpenAPIFunctionCall""" if func_call is None: return None return OpenAPIFunctionCall( @@ -347,16 +347,16 @@ def _to_openapi_function_call(func_call: Optional[EntityFunctionCall]) -> Option def _to_openapi_tool_type(tool_type: EntityToolType) -> OpenAPIToolType: - """将EntityToolType转换为OpenAPIToolType""" + """Convert EntityToolType to OpenAPIToolType""" type_mapping = { EntityToolType.FUNCTION: OpenAPIToolType.FUNCTION, } return type_mapping.get(tool_type, OpenAPIToolType.FUNCTION) -# Span相关转换函数 +# Span-related conversion functions def _to_span_prompt_input(messages: List[EntityMessage], variables: Dict[str, PromptVariable]) -> PromptInput: - """转换到Span的提示输入""" + """Convert to Span prompt input""" return PromptInput( templates=_to_span_messages(messages), arguments=_to_span_arguments(variables), @@ -364,14 +364,14 @@ def _to_span_prompt_input(messages: List[EntityMessage], variables: Dict[str, Pr def _to_span_prompt_output(messages: List[EntityMessage]) -> PromptOutput: - """转换到Span的提示输出""" + """Convert to Span prompt output""" return PromptOutput( prompts=_to_span_messages(messages) ) def _to_span_messages(messages: List[EntityMessage]) -> List[ModelMessage]: - """转换消息列表到Span格式""" + """Convert message list to Span format""" return [ ModelMessage( role=msg.role, @@ -382,23 +382,23 @@ def _to_span_messages(messages: List[EntityMessage]) -> List[ModelMessage]: def _to_span_arguments(arguments: Dict[str, PromptVariable]) -> List[PromptArgument]: - """转换参数字典到Span格式""" + """Convert argument dictionary to Span format""" return [ to_span_argument(key, value) for key, value in arguments.items() ] def to_span_argument(key: str, value: any) -> PromptArgument: - """转换单个参数到Span格式""" + """Convert single argument to Span format""" converted_value = str(value) value_type = PromptArgumentValueType.TEXT - # 判断是否是多模态变量 + # Check if it's a multimodal variable if isinstance(value, list) and all(isinstance(part, EntityContentPart) for part in value): value_type = PromptArgumentValueType.MODEL_MESSAGE_PART converted_value = [_to_span_content_part(part) for part in value] - # 判断是否是placeholder变量 + # Check if it's a placeholder variable if isinstance(value, list) and all(isinstance(part, EntityMessage) for part in value): value_type = PromptArgumentValueType.MODEL_MESSAGE converted_value = _to_span_messages(value) @@ -412,7 +412,7 @@ def to_span_argument(key: str, value: any) -> PromptArgument: def _to_span_content_type(entity_type: EntityContentType) -> ModelMessagePartType: - """转换内容类型到Span格式""" + """Convert content type to Span format""" span_content_type_mapping = { EntityContentType.TEXT: ModelMessagePartType.TEXT, EntityContentType.IMAGE_URL: ModelMessagePartType.IMAGE, @@ -423,7 +423,7 @@ def _to_span_content_type(entity_type: EntityContentType) -> ModelMessagePartTyp def _to_span_content_part(entity_part: EntityContentPart) -> ModelMessagePart: - """转换内容部分到Span格式""" + """Convert content part to Span format""" image_url = None if entity_part.image_url is not None: image_url = ModelImageURL( @@ -434,4 +434,4 @@ def _to_span_content_part(entity_part: EntityContentPart) -> ModelMessagePart: type=_to_span_content_type(entity_part.type), text=entity_part.text, image_url=image_url, - ) + ) \ No newline at end of file diff --git a/cozeloop/internal/prompt/execute_stream_reader.py b/cozeloop/internal/prompt/execute_stream_reader.py index 4227d90..9983e1b 100755 --- a/cozeloop/internal/prompt/execute_stream_reader.py +++ b/cozeloop/internal/prompt/execute_stream_reader.py @@ -19,61 +19,61 @@ class ExecuteStreamReader(BaseStreamReader[ExecuteResult]): """ - Prompt执行结果的StreamReader实现 + StreamReader implementation for Prompt execution results - 继承自BaseStreamReader,实现具体的SSE数据解析逻辑 - 将SSE事件中的数据解析为ExecuteResult对象 - 支持同步和异步迭代器模式,提供完整的流式处理能力 - 直接实现上下文管理器,无需单独的Context类 + Inherits from BaseStreamReader, implements specific SSE data parsing logic + Parses data from SSE events into ExecuteResult objects + Supports synchronous and asynchronous iterator patterns, providing complete streaming processing capabilities + Directly implements context manager, no separate Context class needed """ def __init__(self, stream_context, log_id: str = ""): """ - 初始化ExecuteStreamReader + Initialize ExecuteStreamReader Args: - stream_context: 流上下文管理器 - log_id: 日志ID,用于错误追踪 + stream_context: Stream context manager + log_id: Log ID for error tracking """ self._stream_context = stream_context self._response = None self._context_entered = False self.log_id = log_id self._closed = False - # 不调用super().__init__,因为还没有response对象 + # Don't call super().__init__() because there's no response object yet def _parse_sse_data(self, sse: ServerSentEvent) -> Optional[ExecuteResult]: """ - 解析SSE数据为ExecuteResult对象 + Parse SSE data into ExecuteResult object Args: - sse: ServerSentEvent对象 + sse: ServerSentEvent object Returns: - Optional[ExecuteResult]: 解析后的ExecuteResult对象,如果不需要返回则为None + Optional[ExecuteResult]: Parsed ExecuteResult object, None if no return needed """ - # 跳过空数据 + # Skip empty data if not sse.data or sse.data.strip() == "": return None - # 跳过非data事件 + # Skip non-data events if sse.event and sse.event != "data": logger.debug(f"Skipping non-data event: {sse.event}") return None try: - # 解析JSON数据 + # Parse JSON data data_dict = sse.json() - # 验证数据结构 + # Validate data structure if not isinstance(data_dict, dict): logger.warning(f"Invalid SSE data format, expected dict, got {type(data_dict)}") return None - # 将字典转换为ExecuteData对象 + # Convert dictionary to ExecuteData object execute_data = ExecuteData.model_validate(data_dict) - # 转换为ExecuteResult + # Convert to ExecuteResult result = convert_execute_data_to_result(execute_data) logger.debug(f"Successfully parsed SSE data to ExecuteResult: {result}") @@ -90,20 +90,20 @@ def _parse_sse_data(self, sse: ServerSentEvent) -> Optional[ExecuteResult]: return None def __enter__(self): - """同步上下文管理器入口""" + """Synchronous context manager entry""" if not self._context_entered: - self._response = self._stream_context.__enter__() # 检查HTTP状态码 + self._response = self._stream_context.__enter__() # Check HTTP status code if self._response.status_code != 200: try: - # 先读取完整响应内容 + # Read complete response content first self._response.read() - # 现在可以安全调用json() + # Now can safely call json() error_data = self._response.json() log_id = self._response.headers.get("x-tt-logid", "") error_code = error_data.get('code', 0) error_msg = error_data.get('msg', 'Unknown error') - # 确保关闭stream_context + # Ensure stream_context is closed self._stream_context.__exit__(None, None, None) raise RemoteServiceError(self._response.status_code, error_code, error_msg, log_id) except Exception as e: @@ -113,75 +113,75 @@ def __enter__(self): from cozeloop.internal.consts.error import InternalError raise InternalError(f"Failed to parse error response: {e}") - # 初始化BaseStreamReader的属性 + # Initialize BaseStreamReader attributes super().__init__(self._response, self.log_id) self._context_entered = True return self def __exit__(self, exc_type, exc_val, exc_tb): - """同步上下文管理器出口""" + """Synchronous context manager exit""" self.close() if self._context_entered: return self._stream_context.__exit__(exc_type, exc_val, exc_tb) async def __aenter__(self): - """异步上下文管理器入口""" + """Asynchronous context manager entry""" if not self._context_entered: - self._response = self._stream_context.__enter__() # 检查HTTP状态码(同步版本逻辑) + self._response = await self._stream_context.__aenter__() # Check HTTP status code (async version logic) if self._response.status_code != 200: try: - # 先读取完整响应内容 + # Read complete response content first await self._response.aread() - # 现在可以安全调用json() + # Now can safely call json() error_data = self._response.json() log_id = self._response.headers.get("x-tt-logid", "") error_code = error_data.get('code', 0) error_msg = error_data.get('msg', 'Unknown error') - self._stream_context.__exit__(None, None, None) + await self._stream_context.__aexit__(None, None, None) raise RemoteServiceError(self._response.status_code, error_code, error_msg, log_id) except Exception as e: - self._stream_context.__exit__(None, None, None) + await self._stream_context.__aexit__(None, None, None) if isinstance(e, RemoteServiceError): raise from cozeloop.internal.consts.error import InternalError raise InternalError(f"Failed to parse error response: {e}") - # 初始化BaseStreamReader的属性 + # Initialize BaseStreamReader attributes super().__init__(self._response, self.log_id) self._context_entered = True return self async def __aexit__(self, exc_type, exc_val, exc_tb): - """异步上下文管理器出口""" + """Asynchronous context manager exit""" await self.aclose() if self._context_entered: - return self._stream_context.__exit__(exc_type, exc_val, exc_tb) + return await self._stream_context.__aexit__(exc_type, exc_val, exc_tb) def __iter__(self): - """支持for循环直接读取""" + """Support direct reading with for loop""" if not self._context_entered: self.__enter__() return super().__iter__() def __aiter__(self): - """支持async for循环直接读取""" - # 注意:异步版本需要特殊处理 + """Support direct reading with async for loop""" + # Note: Async version requires special handling return self._aiter_impl() async def _aiter_impl(self): - """异步迭代器实现""" + """Async iterator implementation""" if not self._context_entered: await self.__aenter__() async for item in super().__aiter__(): yield item def close(self) -> None: - """关闭流""" + """Close stream""" self._closed = True - # 如果还没有进入上下文,直接关闭stream_context + # If context hasn't been entered yet, directly close stream_context if not self._context_entered: if hasattr(self._stream_context, '__exit__'): try: @@ -190,11 +190,12 @@ def close(self) -> None: pass return - # 如果已经进入上下文,调用父类的close方法 + + # If context has been entered, call parent class close method if hasattr(self, 'response'): super().close() else: - # 如果response属性不存在,只关闭stream_context + # If response attribute doesn't exist, only close stream_contextt if hasattr(self._stream_context, '__exit__'): try: self._stream_context.__exit__(None, None, None) @@ -202,24 +203,24 @@ def close(self) -> None: pass async def aclose(self) -> None: - """异步关闭流""" + """Asynchronously close stream""" self._closed = True - # 如果还没有进入上下文,直接关闭stream_context + # If context hasn't been entered yet, directly close stream_context if not self._context_entered: - if hasattr(self._stream_context, '__exit__'): + if hasattr(self._stream_context, '__aexit__'): try: - self._stream_context.__exit__(None, None, None) + await self._stream_context.__aexit__(None, None, None) except Exception: pass return - # 如果已经进入上下文,调用父类的aclose方法 + # If context has been entered, call parent class aclose method if hasattr(self, 'response'): await super().aclose() else: - # 如果response属性不存在,只关闭stream_context - if hasattr(self._stream_context, '__exit__'): + # If response attribute doesn't exist, only close stream_context + if hasattr(self._stream_context, '__aexit__'): try: - self._stream_context.__exit__(None, None, None) + await self._stream_context.__aexit__(None, None, None) except Exception: pass \ No newline at end of file diff --git a/cozeloop/internal/prompt/openapi.py b/cozeloop/internal/prompt/openapi.py index 17da6ab..7ff9233 100644 --- a/cozeloop/internal/prompt/openapi.py +++ b/cozeloop/internal/prompt/openapi.py @@ -241,7 +241,7 @@ def _do_mpull_prompt(self, workspace_id: str, queries: List[PromptQuery]) -> Opt return real_resp.data.items def execute(self, request: ExecuteRequest, timeout: Optional[int] = None) -> ExecuteData: - """执行Prompt请求""" + """Execute Prompt request""" response = self.http_client.request( EXECUTE_PROMPT_PATH, "POST", @@ -254,12 +254,12 @@ def execute(self, request: ExecuteRequest, timeout: Optional[int] = None) -> Exe return response.data def execute_streaming(self, request: ExecuteRequest, timeout: Optional[int] = None): - """流式执行Prompt请求""" + """Execute Prompt request in streaming mode""" return self.http_client.post_stream(EXECUTE_STREAMING_PROMPT_PATH, request, timeout=timeout) async def aexecute(self, request: ExecuteRequest, timeout: Optional[int] = None) -> ExecuteData: - """异步执行Prompt请求""" - response = self.http_client.request( + """Asynchronously execute Prompt request""" + response = await self.http_client.arequest( EXECUTE_PROMPT_PATH, "POST", ExecuteResponse, @@ -271,5 +271,5 @@ async def aexecute(self, request: ExecuteRequest, timeout: Optional[int] = None) return response.data async def aexecute_streaming(self, request: ExecuteRequest, timeout: Optional[int] = None): - """异步流式执行Prompt请求""" + """Asynchronously execute Prompt request in streaming mode""" return await self.http_client.apost_stream(EXECUTE_STREAMING_PROMPT_PATH, request, timeout=timeout) \ No newline at end of file diff --git a/cozeloop/internal/prompt/prompt.py b/cozeloop/internal/prompt/prompt.py index 2699ed4..4c16c39 100644 --- a/cozeloop/internal/prompt/prompt.py +++ b/cozeloop/internal/prompt/prompt.py @@ -333,7 +333,7 @@ def _render_text_content( def _render_jinja2_template(self, template_str: str, variable_def_map: Dict[str, VariableDef], variables: Dict[str, Any]) -> str: - """渲染 Jinja2 模板""" + """Render Jinja2 template""" env = SandboxedEnvironment() template = env.from_string(template_str) render_vars = {k: variables[k] for k in variable_def_map.keys() if variables is not None and k in variables} @@ -351,28 +351,29 @@ def execute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 执行Prompt请求 + Execute Prompt request - 使用基于SSE解码器的PromptStreamReader提供更好的流式处理性能和错误处理能力 + Uses SSE decoder-based PromptStreamReader to provide better streaming performance and error handling capabilities Args: - prompt_key: Prompt标识符 - version: Prompt版本,可选 - label: Prompt标签,可选 - variable_vals: 变量值字典,可选 - messages: 消息列表,可选 - stream: 是否使用流式处理 - timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + prompt_key: Prompt identifier + version: Prompt version, optional + label: Prompt label, optional + variable_vals: Variable values dictionary, optional + messages: Message list, optional + stream: Whether to use streaming processing + timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) Returns: Union[ExecuteResult, StreamReader[ExecuteResult]]: - 如果stream=False,返回ExecuteResult - 如果stream=True,返回PromptStreamReader实例(支持上下文管理器) + If stream=False, returns ExecuteResult + If stream=True, returns PromptStreamReader instance (supports context manager) """ - # 设置默认超时时间为600秒(10分钟) + # Set default timeout to 600 seconds (10 minutes) + actual_timeout = timeout if timeout is not None else consts.DEFAULT_PROMPT_EXECUTE_TIMEOUT actual_timeout = timeout if timeout is not None else consts.DEFAULT_PROMPT_EXECUTE_TIMEOUT - # 验证timeout参数 + # Validate timeout parameter self._validate_timeout(actual_timeout) request = self._build_execute_request( @@ -403,28 +404,28 @@ async def aexecute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 异步执行Prompt请求 + Asynchronously execute Prompt request - 使用基于SSE解码器的PromptStreamReader提供更好的流式处理性能和错误处理能力 + Uses SSE decoder-based PromptStreamReader to provide better streaming performance and error handling capabilities Args: - prompt_key: Prompt标识符 - version: Prompt版本,可选 - label: Prompt标签,可选 - variable_vals: 变量值字典,可选 - messages: 消息列表,可选 - stream: 是否使用流式处理 - timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) + prompt_key: Prompt identifier + version: Prompt version, optional + label: Prompt label, optional + variable_vals: Variable values dictionary, optional + messages: Message list, optional + stream: Whether to use streaming processing + timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) Returns: Union[ExecuteResult, StreamReader[ExecuteResult]]: - 如果stream=False,返回ExecuteResult - 如果stream=True,返回PromptStreamReader实例(支持异步上下文管理器) + If stream=False, returns ExecuteResult + If stream=True, returns PromptStreamReader instance (supports async context manager) """ - # 设置默认超时时间为600秒(10分钟) + # Set default timeout to 600 seconds (10 minutes) actual_timeout = timeout if timeout is not None else consts.DEFAULT_PROMPT_EXECUTE_TIMEOUT - # 验证timeout参数 + # Validate timeout parameter self._validate_timeout(actual_timeout) request = self._build_execute_request( @@ -451,15 +452,15 @@ def _build_execute_request( variable_vals: Optional[Dict[str, Any]] = None, messages: Optional[List[Message]] = None ) -> ExecuteRequest: - """构建执行请求""" - # 构建prompt_identifier + """Build execute request""" + # Build prompt_identifier prompt_identifier = PromptQuery( prompt_key=prompt_key, version=version if version else None, label=label if label else None ) - # 构建variable_vals + # Build variable_vals variable_vals_list = None if variable_vals: variable_vals_list = [] @@ -478,10 +479,10 @@ def _build_execute_request( elif all(isinstance(item, ContentPart) for item in value): var_val.multi_part_values = value else: - # 对于其他类型的list,转换为JSON字符串 + # For other types of list, convert to JSON string var_val.value = json.dumps(value) else: - # 对于其他类型,转换为JSON字符串 + # For other types, convert to JSON string var_val.value = json.dumps(value) variable_vals_list.append(var_val) @@ -494,7 +495,7 @@ def _build_execute_request( ) def _validate_timeout(self, timeout: int) -> None: - """验证超时参数""" + """Validate timeout parameter""" if not isinstance(timeout, int): raise ValueError("timeout must be an integer") if timeout <= 0: diff --git a/cozeloop/internal/stream/base_stream_reader.py b/cozeloop/internal/stream/base_stream_reader.py index c71dbf4..f339444 100755 --- a/cozeloop/internal/stream/base_stream_reader.py +++ b/cozeloop/internal/stream/base_stream_reader.py @@ -21,20 +21,20 @@ class BaseStreamReader(StreamReader[T], ABC, Generic[T]): """ - 通用StreamReader基类 + Generic StreamReader base class - 基于Fornax的Stream设计模式,集成SSEDecoder进行SSE数据解码 - 支持同步和异步迭代器模式,实现上下文管理器 - 提供统一的错误处理机制和资源管理 + Based on Fornax's Stream design pattern, integrates SSEDecoder for SSE data decoding + Supports synchronous and asynchronous iterator patterns, implements context manager + Provides unified error handling mechanism and resource management """ def __init__(self, response: httpx.Response, log_id: str = ""): """ - 初始化BaseStreamReader + Initialize BaseStreamReader Args: - response: httpx响应对象 - log_id: 日志ID,用于错误追踪 + response: httpx response object + log_id: Log ID for error tracking """ self.response = response self.log_id = log_id @@ -46,22 +46,22 @@ def __init__(self, response: httpx.Response, log_id: str = ""): @abstractmethod def _parse_sse_data(self, sse: ServerSentEvent) -> Optional[T]: """ - 解析SSE数据为业务对象,子类必须实现 + Parse SSE data into business object, must be implemented by subclasses Args: - sse: ServerSentEvent对象 + sse: ServerSentEvent object Returns: - Optional[T]: 解析后的业务对象,如果不需要返回则为None + Optional[T]: Parsed business object, None if no return needed """ pass def _iter_events(self) -> Iterator[ServerSentEvent]: """ - 迭代SSE事件 + Iterate SSE events Yields: - ServerSentEvent: 解码后的SSE事件 + ServerSentEvent: Decoded SSE events """ try: for sse in self._decoder.iter_bytes(self.response.iter_bytes()): @@ -72,15 +72,14 @@ def _iter_events(self) -> Iterator[ServerSentEvent]: async def _aiter_events(self) -> AsyncIterator[ServerSentEvent]: """ - 异步迭代SSE事件 + Asynchronously iterate SSE events Yields: - ServerSentEvent: 解码后的SSE事件 + ServerSentEvent: Decoded SSE events """ try: - # 由于httpx.stream()返回的是同步流,即使在异步上下文中也需要使用同步迭代 - # 将同步迭代包装成异步生成器 - for sse in self._decoder.iter_bytes(self.response.iter_bytes()): + # Use async iterator + async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()): yield sse except Exception as e: logger.error(f"Error async iterating SSE events: {e}") @@ -88,13 +87,13 @@ async def _aiter_events(self) -> AsyncIterator[ServerSentEvent]: def _handle_sse_error(self, sse: ServerSentEvent) -> None: """ - 处理SSE事件中的错误 + Handle errors in SSE events Args: - sse: ServerSentEvent对象 + sse: ServerSentEvent object Raises: - RemoteServiceError: 当检测到错误事件时 + RemoteServiceError: When error event is detected """ if not sse.data: return @@ -102,15 +101,15 @@ def _handle_sse_error(self, sse: ServerSentEvent) -> None: try: data = sse.json() - # 检查是否包含错误信息 + # Check if contains error information if isinstance(data, dict): - # 检查错误码字段 + # Check error code field if 'code' in data and data['code'] != 0: error_code = data.get('code', 0) error_msg = data.get('msg', 'Unknown error') raise RemoteServiceError(200, error_code, error_msg, self.log_id) - # 检查error字段 + # Check error field if 'error' in data: error_info = data['error'] if isinstance(error_info, dict): @@ -122,20 +121,20 @@ def _handle_sse_error(self, sse: ServerSentEvent) -> None: raise RemoteServiceError(200, error_code, error_msg, self.log_id) except json.JSONDecodeError: - # 如果不是JSON格式,忽略错误检查 + # If not JSON format, ignore error checking pass except RemoteServiceError: - # 重新抛出RemoteServiceError + # Re-raise RemoteServiceError raise except Exception as e: logger.warning(f"Error checking SSE error: {e}") def __stream__(self) -> Iterator[T]: """ - 核心流处理逻辑 + Core stream processing logic Yields: - T: 解析后的业务对象 + T: Parsed business objects """ if self._closed: return @@ -145,10 +144,10 @@ def __stream__(self) -> Iterator[T]: if self._closed: break - # 检查错误 + # Check for errors self._handle_sse_error(sse) - # 解析数据 + # Parse data result = self._parse_sse_data(sse) if result is not None: yield result @@ -163,10 +162,10 @@ def __stream__(self) -> Iterator[T]: async def __astream__(self) -> AsyncIterator[T]: """ - 异步核心流处理逻辑 + Asynchronous core stream processing logic Yields: - T: 解析后的业务对象 + T: Parsed business objects """ if self._closed: return @@ -176,10 +175,10 @@ async def __astream__(self) -> AsyncIterator[T]: if self._closed: break - # 检查错误 + # Check for errors self._handle_sse_error(sse) - # 解析数据 + # Parse data result = self._parse_sse_data(sse) if result is not None: yield result @@ -192,15 +191,15 @@ async def __astream__(self) -> AsyncIterator[T]: finally: self._closed = True - # 同步迭代器接口 + # Synchronous iterator interface def __iter__(self) -> Iterator[T]: - """支持同步迭代 - for循环直接读取""" + """Support synchronous iteration - direct reading with for loop""" if self._sync_iterator is None: self._sync_iterator = self.__stream__() return self._sync_iterator def __next__(self) -> T: - """支持next()函数调用""" + """Support next() function call""" if self._closed: raise StopIteration("Stream is closed") @@ -215,15 +214,15 @@ def __next__(self) -> T: self._closed = True raise StopIteration from e - # 异步迭代器接口 + # Asynchronous iterator interface def __aiter__(self) -> AsyncIterator[T]: - """支持异步迭代 - async for循环直接读取""" + """Support asynchronous iteration - direct reading with async for loop""" if self._async_iterator is None: self._async_iterator = self.__astream__() return self._async_iterator async def __anext__(self) -> T: - """支持async next()调用""" + """Support async next() call""" if self._closed: raise StopAsyncIteration("Stream is closed") @@ -238,37 +237,37 @@ async def __anext__(self) -> T: self._closed = True raise StopAsyncIteration from e - # 上下文管理器接口 + # Context manager interface def __enter__(self) -> BaseStreamReader[T]: - """同步上下文管理器入口""" + """Synchronous context manager entry""" return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - """同步上下文管理器出口""" + """Synchronous context manager exit""" self.close() async def __aenter__(self) -> BaseStreamReader[T]: - """异步上下文管理器入口""" + """Asynchronous context manager entry""" return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - """异步上下文管理器出口""" + """Asynchronous context manager exit""" await self.aclose() - # 资源管理 + # Resource management def close(self) -> None: - """关闭流""" + """Close stream""" self._closed = True if hasattr(self.response, 'close'): self.response.close() async def aclose(self) -> None: - """异步关闭流""" + """Asynchronously close stream""" self._closed = True if hasattr(self.response, 'aclose'): await self.response.aclose() @property def closed(self) -> bool: - """检查流是否已关闭""" + """Check if stream is closed""" return self._closed \ No newline at end of file diff --git a/cozeloop/internal/stream/sse.py b/cozeloop/internal/stream/sse.py index babb74f..c201583 100755 --- a/cozeloop/internal/stream/sse.py +++ b/cozeloop/internal/stream/sse.py @@ -4,15 +4,15 @@ from __future__ import annotations import json -from typing import Any, Iterator, Optional +from typing import Any, Iterator, Optional, AsyncIterator class ServerSentEvent: """ - Server-Sent Event (SSE) 数据结构 + Server-Sent Event (SSE) data structure - 封装SSE事件的各个字段:event, data, id, retry - 提供JSON解析功能 + Encapsulates various fields of SSE events: event, data, id, retry + Provides JSON parsing functionality """ def __init__( @@ -24,13 +24,13 @@ def __init__( retry: int | None = None, ) -> None: """ - 初始化ServerSentEvent + Initialize ServerSentEvent Args: - event: 事件类型 - data: 事件数据 - id: 事件ID - retry: 重试间隔(毫秒) + event: Event type + data: Event data + id: Event ID + retry: Retry interval (milliseconds) """ if data is None: data = "" @@ -42,33 +42,33 @@ def __init__( @property def event(self) -> str | None: - """获取事件类型""" + """Get event type""" return self._event @property def id(self) -> str | None: - """获取事件ID""" + """Get event ID""" return self._id @property def retry(self) -> int | None: - """获取重试间隔""" + """Get retry interval""" return self._retry @property def data(self) -> str: - """获取事件数据""" + """Get event data""" return self._data def json(self) -> Any: """ - 将data字段解析为JSON对象 + Parse data field as JSON object Returns: - 解析后的JSON对象 + Parsed JSON object Raises: - json.JSONDecodeError: 当data不是有效的JSON时 + json.JSONDecodeError: When data is not valid JSON """ return json.loads(self.data) @@ -78,14 +78,14 @@ def __repr__(self) -> str: class SSEDecoder: """ - Server-Sent Event (SSE) 解码器 + Server-Sent Event (SSE) decoder - 负责将字节流解码为ServerSentEvent对象 - 支持SSE协议的完整规范,包括多行数据累积和各种字段处理 + Responsible for decoding byte streams into ServerSentEvent objects + Supports complete SSE protocol specification, including multi-line data accumulation and various field processing """ def __init__(self) -> None: - """初始化SSE解码器""" + """Initialize SSE decoder""" self._event: Optional[str] = None self._data: list[str] = [] self._last_event_id: Optional[str] = None @@ -93,16 +93,16 @@ def __init__(self) -> None: def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: """ - 同步解码字节流为SSE事件 + Synchronously decode byte stream into SSE events Args: - iterator: 字节流迭代器 + iterator: Byte stream iterator Yields: - ServerSentEvent: 解码后的SSE事件 + ServerSentEvent: Decoded SSE events """ for chunk in self._iter_chunks(iterator): - # 先分割再解码,确保splitlines()只使用\r和\n + # Split first then decode, ensure splitlines() only uses \r and \n for raw_line in chunk.splitlines(): line = raw_line.decode("utf-8") sse = self.decode(line) @@ -111,13 +111,13 @@ def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]: def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]: """ - 同步处理字节块,确保完整的SSE消息 + Synchronously process byte chunks, ensuring complete SSE messages Args: - iterator: 字节流迭代器 + iterator: Byte stream iterator Yields: - bytes: 完整的SSE消息块 + bytes: Complete SSE message chunks """ data = b"" for chunk in iterator: @@ -129,18 +129,56 @@ def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]: if data: yield data + async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]: + """ + Asynchronously decode byte stream into SSE events + + Args: + iterator: Asynchronous byte stream iterator + + Yields: + ServerSentEvent: Decoded SSE events + """ + async for chunk in self._aiter_chunks(iterator): + # Split first then decode, ensure splitlines() only uses \r and \n + for raw_line in chunk.splitlines(): + line = raw_line.decode("utf-8") + sse = self.decode(line) + if sse: + yield sse + + async def _aiter_chunks(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[bytes]: + """ + Asynchronously process byte chunks, ensuring complete SSE messages + + Args: + iterator: Asynchronous byte stream iterator + + Yields: + bytes: Complete SSE message chunks + """ + data = b"" + async for chunk in iterator: + for line in chunk.splitlines(keepends=True): + data += line + if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")): + yield data + data = b"" + if data: + yield data + def decode(self, line: str) -> Optional[ServerSentEvent]: """ - 解码单行SSE数据 + Decode single line SSE data Args: - line: SSE数据行 + line: SSE data line Returns: - Optional[ServerSentEvent]: 解码后的SSE事件,如果未完成则返回None + Optional[ServerSentEvent]: Decoded SSE event, None if not complete """ if not line: - # 空行表示事件结束,构造SSE事件 + # Empty line indicates end of event, construct SSE event if not self._event and not self._data and not self._last_event_id and self._retry is None: return None @@ -151,35 +189,36 @@ def decode(self, line: str) -> Optional[ServerSentEvent]: retry=self._retry, ) - # 重置状态,准备下一个事件 + # Reset state, prepare for next event self._event = None self._data = [] self._retry = None return sse - # 解析字段 + # Parse fields fieldname, _, value = line.partition(":") + # Remove leading space from value # 去掉值前面的空格 if value.startswith(" "): value = value[1:] - # 处理各种字段 + # Process various fields if fieldname == "event": self._event = value elif fieldname == "data": self._data.append(value) elif fieldname == "id": - # 根据SSE规范,id字段不能包含null字符 + # According to SSE specification, id field cannot contain null characters if "\0" not in value: self._last_event_id = value elif fieldname == "retry": try: self._retry = int(value) except (TypeError, ValueError): - # 忽略无效的retry值 + # Ignore invalid retry values pass - # 其他字段被忽略 + # Other fields are ignored - return None + return None \ No newline at end of file diff --git a/cozeloop/internal/version.py b/cozeloop/internal/version.py index 82ed6df..5f90db9 100644 --- a/cozeloop/internal/version.py +++ b/cozeloop/internal/version.py @@ -1,4 +1,4 @@ # Copyright (c) 2025 Bytedance Ltd. and/or its affiliates # SPDX-License-Identifier: MIT -VERSION = 'v0.1.10' \ No newline at end of file +VERSION = 'v0.1.11' \ No newline at end of file diff --git a/cozeloop/prompt.py b/cozeloop/prompt.py index 74b3ba7..0b21f19 100644 --- a/cozeloop/prompt.py +++ b/cozeloop/prompt.py @@ -51,16 +51,16 @@ def execute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 执行Prompt请求 + Execute Prompt request - :param prompt_key: prompt的唯一标识 - :param version: prompt版本,可选 - :param label: prompt标签,可选 - :param variable_vals: 变量值字典,可选 - :param messages: 消息列表,可选 - :param stream: 是否流式返回,默认False - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) - :return: stream=False时返回ExecuteResult,stream=True时返回StreamReader[ExecuteResult] + :param prompt_key: Unique identifier of the prompt + :param version: Prompt version, optional + :param label: Prompt label, optional + :param variable_vals: Variable values dictionary, optional + :param messages: Message list, optional + :param stream: Whether to return stream response, default False + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) + :return: Returns ExecuteResult when stream=False, returns StreamReader[ExecuteResult] when stream=True """ @abstractmethod @@ -76,14 +76,14 @@ async def aexecute_prompt( timeout: Optional[int] = None ) -> Union[ExecuteResult, StreamReader[ExecuteResult]]: """ - 异步执行Prompt请求 + Asynchronously execute Prompt request - :param prompt_key: prompt的唯一标识 - :param version: prompt版本,可选 - :param label: prompt标签,可选 - :param variable_vals: 变量值字典,可选 - :param messages: 消息列表,可选 - :param stream: 是否流式返回,默认False - :param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟) - :return: stream=False时返回ExecuteResult,stream=True时返回StreamReader[ExecuteResult] + :param prompt_key: Unique identifier of the prompt + :param version: Prompt version, optional + :param label: Prompt label, optional + :param variable_vals: Variable values dictionary, optional + :param messages: Message list, optional + :param stream: Whether to return stream response, default False + :param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes) + :return: Returns ExecuteResult when stream=False, returns StreamReader[ExecuteResult] when stream=True """ \ No newline at end of file diff --git a/examples/lcel/lcel.py b/examples/lcel/lcel.py index 2b4256f..16a82f4 100644 --- a/examples/lcel/lcel.py +++ b/examples/lcel/lcel.py @@ -31,7 +31,7 @@ def do_lcel_demo(): # execute lcel, and print intermediate results. lcel_sequence = llm_model | StrOutputParser() output = lcel_sequence.invoke( - input='用你所学的技巧,帮我生成几个有意思的问题', + input='Use your learned techniques to help me generate some interesting questions', config=RunnableConfig(callbacks=[trace_callback_handler]) ) print('\n====== model output start ======\n' + output + '\n====== model output finish ======\n') diff --git a/examples/lcel/lcel_stream.py b/examples/lcel/lcel_stream.py index e36cc08..ba78955 100644 --- a/examples/lcel/lcel_stream.py +++ b/examples/lcel/lcel_stream.py @@ -36,7 +36,7 @@ def do_lcel_stream_demo(): lcel_sequence = llm_model | StrOutputParser() chunks = [] for chunk in lcel_sequence.stream( - input='用你所学的技巧,帮我生成几个有意思的问题', + input='Use your learned techniques to help me generate some interesting questions', config=RunnableConfig(callbacks=[trace_callback_handler]) ): chunks.append(chunk) diff --git a/examples/prompt/prompt_hub/prompt_hub_with_label.py b/examples/prompt/prompt_hub/prompt_hub_with_label.py index bfb8df5..d065678 100644 --- a/examples/prompt/prompt_hub/prompt_hub_with_label.py +++ b/examples/prompt/prompt_hub/prompt_hub_with_label.py @@ -13,7 +13,7 @@ def convert_model_input(messages: List[Message]) -> ModelInput: - """将 cozeloop Message 转换为 ModelInput""" + """Convert cozeloop Message to ModelInput""" model_messages = [] for message in messages: model_messages.append(ModelMessage( @@ -27,27 +27,27 @@ def convert_model_input(messages: List[Message]) -> ModelInput: class LLMRunner: - """LLM 运行器,用于模拟 LLM 调用并设置相关的 span 标签""" + """LLM runner for simulating LLM calls and setting related span tags""" def __init__(self, client): self.client = client def llm_call(self, input_data): """ - 模拟 LLM 调用并设置相关的 span 标签 + Simulate LLM call and set related span tags """ span = self.client.start_span("llmCall", "model") try: - # 模拟 LLM 处理过程 + # Simulate LLM processing # output = ChatOpenAI().invoke(input=input_data) - # 模拟响应 + # Simulate response time.sleep(1) output = "I'm a robot. I don't have a specific name. You can give me one." input_token = 232 output_token = 1211 - # 设置 span 标签 + # Set span tags span.set_input(convert_model_input(input_data)) span.set_output(output) span.set_model_provider("openai") @@ -137,4 +137,4 @@ def llm_call(self, input_data): # Warning! In general, this method is not needed to be call, as spans will be automatically reported in batches. # Note that flush will block and wait for the report to complete, and it may cause frequent reporting, # affecting performance. - client.flush() + client.flush() \ No newline at end of file diff --git a/examples/prompt/prompt_hub/prompt_hub_with_multipart.py b/examples/prompt/prompt_hub/prompt_hub_with_multipart.py index 0176b7c..67c658d 100644 --- a/examples/prompt/prompt_hub/prompt_hub_with_multipart.py +++ b/examples/prompt/prompt_hub/prompt_hub_with_multipart.py @@ -143,7 +143,7 @@ def llm_call(self, input_data): "count": 10, # im1 is a multi-part variable, and the value is a list of ContentPart "im1": [ - ContentPart(type=ContentType.TEXT, text="图片示例"), + ContentPart(type=ContentType.TEXT, text="Image example"), ContentPart(type=ContentType.IMAGE_URL, image_url="https://example.com"), ], # Other variables in the prompt template that are not provided with corresponding values will be diff --git a/examples/prompt/ptaas/__init__.py b/examples/prompt/ptaas/__init__.py index 22759db..e13ece3 100755 --- a/examples/prompt/ptaas/__init__.py +++ b/examples/prompt/ptaas/__init__.py @@ -2,11 +2,11 @@ # SPDX-License-Identifier: MIT """ -PTaaS (Prompt Template as a Service) 示例 +PTaaS (Prompt Template as a Service) Examples -本包包含了PTaaS功能的各种使用示例,包括: -- 基础示例:同步非流式、异步非流式、异步流式调用 -- 高级示例:占位符变量、标签使用、Jinja2模板、超时控制、多模态处理 +This package contains various usage examples of PTaaS functionality, including: +- Basic examples: synchronous non-streaming, asynchronous non-streaming, asynchronous streaming calls +- Advanced examples: placeholder variables, label usage, Jinja2 templates, timeout control, multimodal processing """ __all__ = [ diff --git a/examples/prompt/ptaas/ptaas_multimodal.py b/examples/prompt/ptaas/ptaas_multimodal.py index a4703f5..a8dde98 100755 --- a/examples/prompt/ptaas/ptaas_multimodal.py +++ b/examples/prompt/ptaas/ptaas_multimodal.py @@ -50,14 +50,14 @@ def multimodal_example(client: Client) -> None: # Create a Prompt on the platform's Prompt development page (set Prompt Key to 'ptaas_demo'), # add the following messages to the template, submit a version. example1 and example2 are the multi modal variables. # System: You can quickly identify the location where a photo was taken. - # User: 例如:{{example1}} + # User: For example: {{example1}} # Assistant: {{city1}} - # User: 例如:{{example2}} + # User: For example: {{example2}} # Assistant: {{city2}} image_path = "your_image_path" # If image file exists, read and encode - # 如果图片文件存在,读取并编码 + # If the image file exists, read and encode it if os.path.exists(image_path): try: with open(image_path, "rb") as f: diff --git a/examples/trace/annotation/lcel_and_annotation.py b/examples/trace/annotation/lcel_and_annotation.py index 1085af3..44b2bb1 100644 --- a/examples/trace/annotation/lcel_and_annotation.py +++ b/examples/trace/annotation/lcel_and_annotation.py @@ -37,7 +37,7 @@ def do_lcel_and_annotation_demo(): # execute lcel, and print intermediate results. lcel_sequence = llm_model | StrOutputParser() output = lcel_sequence.invoke( - input='用你所学的技巧,帮我生成几个有意思的问题', + input='Use your learned techniques to help me generate some interesting questions', config=RunnableConfig(callbacks=[trace_callback_handler]) ) print('\n====== model output start ======\n' + output + '\n====== model output finish ======\n') diff --git a/examples/trace/large_text.py b/examples/trace/large_text.py index bec1f26..bebdfa5 100644 --- a/examples/trace/large_text.py +++ b/examples/trace/large_text.py @@ -82,7 +82,7 @@ def main(): # assuming call llm, input is large text try: - llm_runner.llm_call("你叫什么名字" + get_large_text()) + llm_runner.llm_call("What's your name?" + get_large_text()) except Exception as e: # set tag key: `_status_code` span.set_status_code(600789111) diff --git a/examples/trace/prompt.py b/examples/trace/prompt.py index c884a1b..480aac8 100644 --- a/examples/trace/prompt.py +++ b/examples/trace/prompt.py @@ -50,7 +50,7 @@ def main(): span.set_status_code(ERR_CODE_INTERNAL) span.set_error(str(err)) - res_prompt = get_prompt_runner.format_prompt(prompt, {"var1": "你会什么技能"}) + res_prompt = get_prompt_runner.format_prompt(prompt, {"var1": "What skills do you have?"}) # 3. finish span span.finish() @@ -124,7 +124,7 @@ def get_prompt(self) -> Optional[Prompt]: PROMPT_KEY: "test_demo", PROMPT_VERSION: "v1.0.1" }), - PROMPT_VERSION: "v1.0.1", # mock版本 + PROMPT_VERSION: "v1.0.1", # mock version OUTPUT: prompt }) diff --git a/examples/trace/transfer_between_services.py b/examples/trace/transfer_between_services.py index e77a110..3d64433 100644 --- a/examples/trace/transfer_between_services.py +++ b/examples/trace/transfer_between_services.py @@ -103,7 +103,7 @@ def invoke_service_b(self, req_header: Dict[str, str]): root_span.set_user_id_baggage("123456") # assuming call llm - err = llm_runner.llm_call("你叫什么名字") + err = llm_runner.llm_call("What's your name?") if err is not None: # set tag key: `_status_code` root_span.set_status_code(ERR_CODE_LLM_CALL) diff --git a/tests/internal/prompt/test_prompt.py b/tests/internal/prompt/test_prompt.py index 3262e76..09457a6 100644 --- a/tests/internal/prompt/test_prompt.py +++ b/tests/internal/prompt/test_prompt.py @@ -636,18 +636,18 @@ def test_format_normal_messages_null_message(prompt_provider): assert result[1].role == Role.USER assert result[1].content == "World" def test_validate_variable_values_type_boolean_valid(prompt_provider): - """测试有效的 boolean 类型变量""" + """Test valid boolean type variable""" var_defs = [VariableDef(key="enabled", desc="Enable feature", type=VariableType.BOOLEAN)] variables = {"enabled": True} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_boolean_invalid(prompt_provider): - """测试无效的 boolean 类型变量""" + """Test invalid boolean type variable""" var_defs = [VariableDef(key="enabled", desc="Enable feature", type=VariableType.BOOLEAN)] - variables = {"enabled": "true"} # 字符串而不是布尔值 + variables = {"enabled": "true"} # String instead of boolean with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -656,18 +656,18 @@ def test_validate_variable_values_type_boolean_invalid(prompt_provider): def test_validate_variable_values_type_integer_valid(prompt_provider): - """测试有效的 integer 类型变量""" + """Test valid integer type variable""" var_defs = [VariableDef(key="count", desc="Item count", type=VariableType.INTEGER)] variables = {"count": 42} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_integer_invalid(prompt_provider): - """测试无效的 integer 类型变量""" + """Test invalid integer type variable""" var_defs = [VariableDef(key="count", desc="Item count", type=VariableType.INTEGER)] - variables = {"count": "42"} # 字符串而不是整数 + variables = {"count": "42"} # String instead of integer with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -676,18 +676,18 @@ def test_validate_variable_values_type_integer_invalid(prompt_provider): def test_validate_variable_values_type_float_valid(prompt_provider): - """测试有效的 float 类型变量""" + """Test valid float type variable""" var_defs = [VariableDef(key="temperature", desc="Temperature value", type=VariableType.FLOAT)] variables = {"temperature": 3.14} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_float_invalid(prompt_provider): - """测试无效的 float 类型变量""" + """Test invalid float type variable""" var_defs = [VariableDef(key="temperature", desc="Temperature value", type=VariableType.FLOAT)] - variables = {"temperature": "3.14"} # 字符串而不是浮点数 + variables = {"temperature": "3.14"} # String instead of float with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -696,18 +696,18 @@ def test_validate_variable_values_type_float_invalid(prompt_provider): def test_validate_variable_values_type_array_string_valid(prompt_provider): - """测试有效的 array 类型变量""" + """Test valid array type variable""" var_defs = [VariableDef(key="tags", desc="Tag list", type=VariableType.ARRAY_STRING)] variables = {"tags": ["tag1", "tag2", "tag3"]} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_array_string_invalid_not_list(prompt_provider): - """测试无效的 array 类型变量 - 不是列表""" + """Test invalid array type variable - not a list""" var_defs = [VariableDef(key="tags", desc="Tag list", type=VariableType.ARRAY_STRING)] - variables = {"tags": "tag1,tag2,tag3"} # 字符串而不是列表 + variables = {"tags": "tag1,tag2,tag3"} # String instead of list with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -716,9 +716,9 @@ def test_validate_variable_values_type_array_string_invalid_not_list(prompt_prov def test_validate_variable_values_type_array_string_invalid_wrong_element_type(prompt_provider): - """测试无效的 array 类型变量 - 元素类型错误""" + """Test invalid array type variable - wrong element type""" var_defs = [VariableDef(key="tags", desc="Tag list", type=VariableType.ARRAY_STRING)] - variables = {"tags": ["tag1", 123, "tag3"]} # 包含非字符串元素 + variables = {"tags": ["tag1", 123, "tag3"]} # Contains non-string elements with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -727,18 +727,18 @@ def test_validate_variable_values_type_array_string_invalid_wrong_element_type(p def test_validate_variable_values_type_array_boolean_valid(prompt_provider): - """测试有效的 array 类型变量""" + """Test valid array type variable""" var_defs = [VariableDef(key="flags", desc="Boolean flags", type=VariableType.ARRAY_BOOLEAN)] variables = {"flags": [True, False, True]} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_array_boolean_invalid(prompt_provider): - """测试无效的 array 类型变量""" + """Test invalid array type variable""" var_defs = [VariableDef(key="flags", desc="Boolean flags", type=VariableType.ARRAY_BOOLEAN)] - variables = {"flags": [True, "false", True]} # 包含字符串而不是布尔值 + variables = {"flags": [True, "false", True]} # Contains string instead of boolean with pytest.raises(ValueError) as excinfo: prompt_provider._validate_variable_values_type(var_defs, variables) @@ -747,16 +747,16 @@ def test_validate_variable_values_type_array_boolean_invalid(prompt_provider): def test_validate_variable_values_type_array_integer_valid(prompt_provider): - """测试有效的 array 类型变量""" + """Test valid array type variable""" var_defs = [VariableDef(key="numbers", desc="Number list", type=VariableType.ARRAY_INTEGER)] variables = {"numbers": [1, 2, 3, 4, 5]} - # 应该不抛出异常 + # Should not raise exception prompt_provider._validate_variable_values_type(var_defs, variables) def test_validate_variable_values_type_array_integer_invalid(prompt_provider): - """测试无效的 array 类型变量""" + """Test invalid array type variable""" var_defs = [VariableDef(key="numbers", desc="Number list", type=VariableType.ARRAY_INTEGER)] variables = {"numbers": [1, "2", 3]} # 包含字符串而不是整数