Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cozeloop/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ def execute_prompt(
timeout: Optional[int] = None
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
"""
执行Prompt请求
Execute Prompt request

:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
"""
if self._closed:
raise ClientClosedError()
Expand All @@ -310,9 +310,9 @@ async def aexecute_prompt(
timeout: Optional[int] = None
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
"""
异步执行Prompt请求
Asynchronously execute Prompt request

:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
"""
if self._closed:
raise ClientClosedError()
Expand Down Expand Up @@ -436,9 +436,9 @@ def execute_prompt(
timeout: Optional[int] = None
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
"""
执行Prompt请求
Execute Prompt request

:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
"""
return get_default_client().execute_prompt(
prompt_key,
Expand All @@ -462,9 +462,9 @@ async def aexecute_prompt(
timeout: Optional[int] = None
) -> Union[ExecuteResult, StreamReader[ExecuteResult]]:
"""
异步执行Prompt请求
Asynchronously execute Prompt request

:param timeout: 请求超时时间(秒),可选,默认为600秒(10分钟)
:param timeout: Request timeout (seconds), optional, default is 600 seconds (10 minutes)
"""
return await get_default_client().aexecute_prompt(
prompt_key,
Expand Down
6 changes: 3 additions & 3 deletions cozeloop/entities/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Prompt(BaseModel):


class ExecuteParam(BaseModel):
"""Execute参数"""
"""Execute parameters"""
prompt_key: str
version: str = ""
label: str = ""
Expand All @@ -136,13 +136,13 @@ class ExecuteParam(BaseModel):


class TokenUsage(BaseModel):
"""Token使用统计"""
"""Token usage statistics"""
input_tokens: int = 0
output_tokens: int = 0


class ExecuteResult(BaseModel):
"""Execute结果"""
"""Execute result"""
message: Optional[Message] = None
finish_reason: Optional[str] = None
usage: Optional[TokenUsage] = None
Expand Down
12 changes: 6 additions & 6 deletions cozeloop/entities/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@


class StreamReader(ABC, Generic[T]):
"""流式读取器接口"""
"""Stream reader interface"""

@abstractmethod
def __iter__(self) -> Iterator[T]:
"""支持同步迭代 - for循环直接读取"""
"""Support synchronous iteration - for loop direct reading"""
pass

@abstractmethod
def __next__(self) -> T:
"""支持next()函数调用"""
"""Support next() function call"""
pass

@abstractmethod
def __aiter__(self) -> AsyncIterator[T]:
"""支持异步迭代 - async for循环直接读取"""
"""Support asynchronous iteration - async for loop direct reading"""
pass

@abstractmethod
async def __anext__(self) -> T:
"""支持async next()调用"""
"""Support async next() call"""
pass

@abstractmethod
def close(self):
"""关闭流"""
"""Close stream"""
pass
2 changes: 1 addition & 1 deletion cozeloop/internal/consts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL = 60
DEFAULT_TIMEOUT = 3
DEFAULT_UPLOAD_TIMEOUT = 30
DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10分钟,专用于execute_prompt和aexecute_prompt方法
DEFAULT_PROMPT_EXECUTE_TIMEOUT = 600 # 10 minutes, dedicated for execute_prompt and aexecute_prompt methods

LOG_ID_HEADER = "x-tt-logid"
AUTHORIZE_HEADER = "Authorization"
Expand Down
51 changes: 46 additions & 5 deletions cozeloop/internal/httpclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def post_stream(
json: Union[BaseModel, Dict] = None,
timeout: Optional[int] = None,
):
"""发起流式POST请求,返回stream_context"""
"""Initiate streaming POST request, return stream_context"""
url = self._build_url(path)
headers = self._set_headers({"Content-Type": "application/json"})

Expand All @@ -140,7 +140,7 @@ def post_stream(
_timeout = timeout if timeout is not None else self.timeout

try:
# 返回stream_context,让StreamReader管理上下文
# Return stream_context, let StreamReader manage context
stream_context = self.http_client.stream(
"POST",
url,
Expand All @@ -153,13 +153,54 @@ def post_stream(
logger.error(f"Http client stream request failed, path: {path}, err: {e}.")
raise consts.NetworkError from e

async def arequest(
self,
path: str,
method: str,
response_model: Type[T],
*,
params: Optional[Dict[str, str]] = None,
form: Optional[Dict[str, str]] = None,
json: Optional[Union[BaseModel, Dict]] = None,
files: Optional[Dict[str, FileType]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
) -> T:
url = self._build_url(path)
_headers = self._set_headers(headers)

_timeout = timeout if timeout is not None else self.timeout

if isinstance(json, BaseModel):
if pydantic.VERSION.startswith('1'):
json = json.dict(by_alias=True)
else:
json = json.model_dump(by_alias=True)

try:
response = await self.http_client.arequest(
method,
url,
params=params,
data=form,
json=json,
files=files,
headers=_headers,
timeout=_timeout
)
except httpx.HTTPError as e:
logger.error(f"Http client request failed, path: {path}, err: {e}.")
raise consts.NetworkError from e

return parse_response(url, response, response_model)

async def apost_stream(
self,
path: str,
json: Union[BaseModel, Dict] = None,
timeout: Optional[int] = None,
):
"""发起异步流式POST请求,返回stream_context"""
"""Initiate asynchronous streaming POST request, return stream_context"""
url = self._build_url(path)
headers = self._set_headers({"Content-Type": "application/json"})

Expand All @@ -169,8 +210,8 @@ async def apost_stream(
_timeout = timeout if timeout is not None else self.timeout

try:
# 返回stream_context,让StreamReader管理上下文
stream_context = self.http_client.stream(
# Return stream_context, let StreamReader manage context
stream_context = self.http_client.astream(
"POST",
url,
json=json,
Expand Down
25 changes: 21 additions & 4 deletions cozeloop/internal/httpclient/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# SPDX-License-Identifier: MIT

import logging
from typing import Dict, Type, TypeVar
import typing
from typing import Dict, Type, TypeVar, Any, Generator

import httpx
import pydantic
from httpx import URL, Response
from pydantic import ValidationError

from cozeloop.internal import consts
Expand All @@ -16,9 +18,24 @@
T = TypeVar('T', bound=BaseResponse)


class HTTPClient(httpx.Client):
class HTTPClient:
def __init__(self):
super().__init__()
self.sync_client = httpx.Client()
self.async_client = httpx.AsyncClient()

def request(self, method: str, url: URL | str, **kwargs: Any) -> Response:
return self.sync_client.request(method, url, **kwargs)

def stream(self, method: str, url: URL | str, **kwargs: Any):
"""Return synchronous stream context manager"""
return self.sync_client.stream(method, url, **kwargs)

async def arequest(self, method: str, url: URL | str, **kwargs: Any) -> Response:
return await self.async_client.request(method, url, **kwargs)

def astream(self, method: str, url: URL | str, **kwargs: Any):
"""Return asynchronous stream context manager"""
return self.async_client.stream(method, url, **kwargs)


def _check_oauth_error(body: Dict, http_code: int, log_id: str) -> None:
Expand Down Expand Up @@ -63,4 +80,4 @@ def parse_response(url: str, response: httpx.Response, response_model: Type[T])
logger.error(f"Failed to parse response. Path: {url}, http code: {http_code}, log id: {log_id}, error: {e}.")
raise consts.InternalError from e
logger.debug(f"Call remote service success. Path: {url}, response: {res}, log id: {log_id}")
return res
return res
Loading