From 433f63ea6e59cc097ce1f67706d86284375f7d4b Mon Sep 17 00:00:00 2001 From: "jiangqi.rrt" Date: Thu, 11 Sep 2025 15:22:21 +0800 Subject: [PATCH] trace add openai wrapper --- CHANGLOG.md | 3 + cozeloop/decorator/decorator.py | 31 +- cozeloop/integration/wrapper/__init__.py | 3 + cozeloop/integration/wrapper/_openai.py | 273 ++++++++++++++++++ examples/trace/wrapper_openai/__init__.py | 0 .../wrapper_openai/async_azure_openai_chat.py | 67 +++++ .../async_azure_openai_responses.py | 62 ++++ .../trace/wrapper_openai/async_openai_chat.py | 65 +++++ .../wrapper_openai/sync_azure_openai_chat.py | 61 ++++ .../sync_azure_openai_responses.py | 57 ++++ .../trace/wrapper_openai/sync_openai_chat.py | 58 ++++ pyproject.toml | 2 +- 12 files changed, 677 insertions(+), 5 deletions(-) create mode 100644 cozeloop/integration/wrapper/__init__.py create mode 100644 cozeloop/integration/wrapper/_openai.py create mode 100644 examples/trace/wrapper_openai/__init__.py create mode 100644 examples/trace/wrapper_openai/async_azure_openai_chat.py create mode 100644 examples/trace/wrapper_openai/async_azure_openai_responses.py create mode 100644 examples/trace/wrapper_openai/async_openai_chat.py create mode 100644 examples/trace/wrapper_openai/sync_azure_openai_chat.py create mode 100644 examples/trace/wrapper_openai/sync_azure_openai_responses.py create mode 100644 examples/trace/wrapper_openai/sync_openai_chat.py diff --git a/CHANGLOG.md b/CHANGLOG.md index 562a4ed..6b447a0 100644 --- a/CHANGLOG.md +++ b/CHANGLOG.md @@ -1,3 +1,6 @@ +## [0.1.14] - 2025-09-11 +### Added +- trace support openai wrapper ## [0.1.13] - 2025-09-10 ### Added diff --git a/cozeloop/decorator/decorator.py b/cozeloop/decorator/decorator.py index 32ccd68..7bda41b 100644 --- a/cozeloop/decorator/decorator.py +++ b/cozeloop/decorator/decorator.py @@ -1,5 +1,6 @@ # Copyright (c) 2025 Bytedance Ltd. 
and/or its affiliates # SPDX-License-Identifier: MIT +import time from typing import Optional, Callable, Any, overload, Dict, Generic, Iterator, TypeVar, List, cast, AsyncIterator from functools import wraps @@ -82,7 +83,7 @@ def sync_wrapper(*args: Any, **kwargs: Any): output = res if process_outputs: output = process_outputs(output) - + inject_inner_token(span, output) span.set_output(output) except StopIteration: pass @@ -116,6 +117,7 @@ async def async_wrapper(*args: Any, **kwargs: Any): output = res if process_outputs: output = process_outputs(output) + inject_inner_token(span, output) span.set_output(output) except StopIteration: pass @@ -227,7 +229,7 @@ def sync_stream_wrapper(*args: Any, **kwargs: Any): res = func(*args, **kwargs) output = res if hasattr(output, "__iter__"): - return _CozeLoopTraceStream(output, span, process_iterator_outputs) + return _CozeLoopTraceStream(output, span, process_iterator_outputs, span_type) if process_outputs: output = process_outputs(output) @@ -262,7 +264,7 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any): res = await func(*args, **kwargs) output = res if hasattr(output, "__aiter__"): - return _CozeLoopAsyncTraceStream(output, span, process_iterator_outputs) + return _CozeLoopAsyncTraceStream(output, span, process_iterator_outputs, span_type) if process_outputs: output = process_outputs(output) span.set_output(output) @@ -289,7 +291,6 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any): if not hasattr(res, "__aiter__") and res: return res - if is_async_gen_func(func): return async_gen_wrapper if is_gen_func(func): @@ -317,11 +318,14 @@ def __init__( stream: Iterator[S], span: Span, process_iterator_outputs: Optional[Callable[[Any], Any]] = None, + span_type: str = "", ): self.__stream__ = stream self.__span = span self.__output__: list[S] = [] self.__process_iterator_outputs = process_iterator_outputs + self.__is_set_start_time_first_token: bool = False + self.__span_type = span_type def __next__(self) -> S: try: @@ -360,6 +364,9 @@ def __streamer__( while True: s = next(temp_stream) self.__output__.append(s) + if not self.__is_set_start_time_first_token and self.__span_type == "model": + self.__span.set_start_time_first_resp(time.time_ns() // 1_000) + self.__is_set_start_time_first_token = True yield s except StopIteration as e: return e @@ -367,6 +374,7 @@ def __streamer__( def __end__(self, err: Exception = None): if self.__process_iterator_outputs: self.__output__ = self.__process_iterator_outputs(self.__output__) + inject_inner_token(self.__span, self.__output__) self.__span.set_output(self.__output__) if err: self.__span.set_error(err) @@ -379,17 +387,21 @@ def __init__( stream: AsyncIterator[S], span: Span, process_iterator_outputs: Optional[Callable[[Any], Any]] = None, + span_type: str = "", ): self.__stream__ = stream self.__span = span self.__output__: list[S] = [] self.__process_iterator_outputs = process_iterator_outputs + self.__is_set_start_time_first_token: bool = False + self.__span_type = span_type async def _aend(self, error: Optional[Exception] = None): if error: self.__span.set_error(error) if self.__process_iterator_outputs: self.__output__ = self.__process_iterator_outputs(self.__output__) + inject_inner_token(self.__span, self.__output__) self.__span.set_output(self.__output__) self.__span.finish() @@ -433,6 +445,17 @@ async def __async_streamer__( while True: s = await temp_stream.__anext__() self.__output__.append(s) + if not self.__is_set_start_time_first_token and self.__span_type == "model": + 
self.__span.set_start_time_first_resp(time.time_ns() // 1_000) + self.__is_set_start_time_first_token = True yield s except StopIteration: pass + + +def inject_inner_token(span: Span, src): + if isinstance(src, dict) and src.get("_inner_tokens_dict"): + if input_tokens := src.get("_inner_tokens_dict").get("input_tokens", 0): + span.set_input_tokens(input_tokens) + if output_tokens := src.get("_inner_tokens_dict").get("output_tokens", 0): + span.set_output_tokens(output_tokens) diff --git a/cozeloop/integration/wrapper/__init__.py b/cozeloop/integration/wrapper/__init__.py new file mode 100644 index 0000000..5d2427e --- /dev/null +++ b/cozeloop/integration/wrapper/__init__.py @@ -0,0 +1,3 @@ +from cozeloop.integration.wrapper._openai import openai_wrapper + +__all__ = ["openai_wrapper"] diff --git a/cozeloop/integration/wrapper/_openai.py b/cozeloop/integration/wrapper/_openai.py new file mode 100644 index 0000000..8b0e214 --- /dev/null +++ b/cozeloop/integration/wrapper/_openai.py @@ -0,0 +1,273 @@ +import logging +from collections import defaultdict +from typing import Any, Callable, Optional +from functools import wraps + +from cozeloop import get_span_from_context +from cozeloop.decorator import observe +from cozeloop.decorator.utils import is_async_func +from cozeloop.spec.tracespec import ModelCallOption + +from openai.types.chat import ChatCompletionChunk +from openai.types.chat.chat_completion_chunk import Choice, ChoiceDeltaToolCall +from openai.types.responses import ResponseStreamEvent + + +def openai_wrapper( + client: Any, + *, + chat_name: str = "ChatOpenAI", +) -> Any: + model_provider = "openai" + try: + from openai import AsyncOpenAI, OpenAI, AsyncAzureOpenAI, AzureOpenAI + if not isinstance(client, (AsyncOpenAI, OpenAI, AzureOpenAI, AsyncAzureOpenAI)): + return client + if isinstance(client, (AzureOpenAI, AsyncAzureOpenAI)): + model_provider = "azure" + if chat_name == "ChatOpenAI": + chat_name = "AzureChatOpenAI" + except Exception as e: + logging.warning(f"import OpenAI error: {e}") + return client + + client.chat.completions.create = _get_openai_wrapper( + client.chat.completions.create, + chat_name, + model_provider, + gen_tags_func=_get_openai_model_tags, + process_inputs=_process_chat_completion_input, + process_outputs=_process_chat_completion_output, + process_iterator_outputs=_process_chat_completion_iter_output, + ) + + if hasattr(client, "responses"): + if hasattr(client.responses, "create"): + client.responses.create = _get_openai_wrapper( + client.responses.create, + chat_name, + model_provider, + gen_tags_func=_get_openai_model_tags, + process_inputs=_process_chat_completion_input, + process_outputs=_process_responses_output, + process_iterator_outputs=_process_response_iter_output, + ) + + return client + + +def _get_openai_wrapper( + original_func: Callable, + name: str, + model_provider: str, + gen_tags_func: Optional[Callable] = None, + process_inputs: Optional[Callable] = None, + process_outputs: Optional[Callable] = None, + process_iterator_outputs: Callable = None +) -> Callable: + @wraps(original_func) + def sync_create(*args, **kwargs): + tags = gen_tags_func(model_provider, **kwargs) + decorator = observe( + name=name, + span_type="model", + tags=tags, + process_inputs=process_inputs, + process_outputs=process_outputs, + process_iterator_outputs=process_iterator_outputs if kwargs.get("stream") is True else None, + ) + + return decorator(original_func)(*args, **kwargs) + + @wraps(original_func) + async def async_create(*args, **kwargs): + tags = 
gen_tags_func(model_provider, **kwargs) + decorator = observe( + name=name, + span_type="model", + tags=tags, + process_inputs=process_inputs, + process_outputs=process_outputs, + process_iterator_outputs=process_iterator_outputs if kwargs.get("stream") is True else None, + + ) + return await decorator(original_func)(*args, **kwargs) + + return async_create if is_async_func(original_func) else sync_create + + +def _get_openai_model_tags(provider: str, **kwargs) -> dict[str, Any]: + temperature = kwargs.get("temperature", 0.0) + if not isinstance(temperature, float): + temperature = float(temperature) + stop_sequences = kwargs.get("stop") + if stop_sequences and isinstance(stop_sequences, str): + stop_sequences = [stop_sequences] + opt = ModelCallOption( + temperature=temperature, + max_tokens=kwargs.get("max_tokens") or kwargs.get("max_completion_tokens") or kwargs.get("max_output_tokens"), + stop=stop_sequences, + top_p=kwargs.get("top_p"), + n=kwargs.get("n"), + frequency_penalty=kwargs.get("frequency_penalty"), + presence_penalty=kwargs.get("presence_penalty"), + ) + call_options = '{}' + try: + call_options = opt.model_dump_json() + except Exception as e: + pass + + return { + "model_provider": provider, + "model_name": kwargs.get("model"), + "call_options": call_options, + "stream": kwargs.get("stream", False), + } + + +def _process_chat_completion_input(input: dict): + return input.get('kwargs', '') + + +def _process_chat_completion_output(outputs: Any): + try: + output_dict = outputs.model_dump() + usage = output_dict.pop("usage", None) + output_dict["_inner_tokens_dict"] = ( + _create_inner_tokens_dict(usage) if usage else None + ) + _compatibility_with_frontend(output_dict) + return output_dict + except BaseException as e: + return outputs + + +def _compatibility_with_frontend(output_dict: Any): # for compatibility with frontend render + if output_dict.get("choices"): + for choice in output_dict["choices"]: + if choice.get("index") is None: + choice["index"] = 0 + if choice.get("message") and choice["message"].get("tool_calls") is None: + choice["message"]["tool_calls"] = [] + + +def _create_inner_tokens_dict(token_usage: dict) -> dict: + if not token_usage: + return {} + input_tokens = ( + token_usage.get("prompt_tokens") or token_usage.get("input_tokens") or 0 + ) + output_tokens = ( + token_usage.get("completion_tokens") + or token_usage.get("output_tokens") + or 0 + ) + return { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + } + + +def _process_chat_completion_iter_output(all_chunks: list[ChatCompletionChunk]) -> dict: + choices_by_index: defaultdict[int, list[Choice]] = defaultdict(list) + for chunk in all_chunks: + for choice in chunk.choices: + choices_by_index[choice.index].append(choice) + if all_chunks: + d = all_chunks[-1].model_dump() + d["choices"] = [ + _convert_choices(choices) for choices in choices_by_index.values() + ] + else: + d = {"choices": [{"message": {"role": "assistant", "content": ""}}]} + oai_token_usage = d.pop("usage", None) + d["_inner_tokens_dict"] = ( + _create_inner_tokens_dict(oai_token_usage) if oai_token_usage else None + ) + return d + + +def _convert_choices(choices: list[Choice]) -> dict: + reversed_choices = list(reversed(choices)) + message: dict[str, Any] = { + "role": "assistant", + "content": "", + } + for c in reversed_choices: + if hasattr(c, "delta") and getattr(c.delta, "role", None): + message["role"] = c.delta.role + break + tool_calls: defaultdict[int, list[ChoiceDeltaToolCall]] = defaultdict(list) + for c in 
choices: + if hasattr(c, "delta"): + if getattr(c.delta, "content", None): + message["content"] += c.delta.content + if getattr(c.delta, "reasoning_content", None): + if message.get("reasoning_content") is None: + message["reasoning_content"] = "" + message["reasoning_content"] += c.delta.reasoning_content + if getattr(c.delta, "function_call", None): + if not message.get("function_call"): + message["function_call"] = {"name": "", "arguments": ""} + name_ = getattr(c.delta.function_call, "name", None) + if name_: + message["function_call"]["name"] += name_ + arguments_ = getattr(c.delta.function_call, "arguments", None) + if arguments_: + message["function_call"]["arguments"] += arguments_ + if getattr(c.delta, "tool_calls", None): + tool_calls_list = c.delta.tool_calls + if tool_calls_list is not None: + for tool_call in tool_calls_list: + tool_calls[tool_call.index].append(tool_call) + if tool_calls: + message["tool_calls"] = [None for _ in range(max(tool_calls.keys()) + 1)] + for index, tool_call_chunks in tool_calls.items(): + message["tool_calls"][index] = { + "index": index, + "id": next((c.id for c in tool_call_chunks if c.id), None), + "type": next((c.type for c in tool_call_chunks if c.type), None), + "function": {"name": "", "arguments": ""}, + } + for chunk in tool_call_chunks: + if getattr(chunk, "function", None): + name_ = getattr(chunk.function, "name", None) + if name_: + message["tool_calls"][index]["function"]["name"] += name_ + arguments_ = getattr(chunk.function, "arguments", None) + if arguments_: + message["tool_calls"][index]["function"][ + "arguments" + ] += arguments_ + return { + "index": getattr(choices[0], "index", 0) if choices else 0, + "finish_reason": next( + ( + c.finish_reason + for c in reversed_choices + if getattr(c, "finish_reason", None) + ), + None, + ), + "message": message, + } + + +def _process_response_iter_output(outputs: list[ResponseStreamEvent]) -> dict: + for output in outputs: + if output.type == "response.completed": + return _process_responses_output(output.response) + return {} + + +def _process_responses_output(response: Any) -> dict: + if response: + try: + output = response.model_dump(exclude_none=True, mode="json") + if usage := output.pop("usage", None): + output["_inner_tokens_dict"] = _create_inner_tokens_dict(usage) + return output + except Exception: + return {"output": response} + return {} diff --git a/examples/trace/wrapper_openai/__init__.py b/examples/trace/wrapper_openai/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/trace/wrapper_openai/async_azure_openai_chat.py b/examples/trace/wrapper_openai/async_azure_openai_chat.py new file mode 100644 index 0000000..11ca091 --- /dev/null +++ b/examples/trace/wrapper_openai/async_azure_openai_chat.py @@ -0,0 +1,67 @@ +import asyncio + +from openai import AsyncAzureOpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(AsyncAzureOpenAI( + # azure_endpoint="***", + api_key="***", + azure_deployment="gpt-5-chat-2025-08-07", + api_version="", +)) + + +def retriever(): + results = ["John worked at Beijing"] + return results + + +@observe +async def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + res = await openai_client.chat.completions.create( + messages=[ + {"role": "system", "content": system_message}, + 
{"role": "user", "content": question}, + ], + model="", + ) + print(res) + + # stream + # res = await openai_client.chat.completions.create( + # messages=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="", + # stream=True, + # extra_body={ + # "stream_options": { + # "include_usage": True # bytedance gpt, return usage by this param + # } + # } + # ) + # try: + # async for chunk in res: + # print(chunk) + # except Exception as e: + # print(e) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). + # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + asyncio.run(rag("Where is John worked?")) + client.flush() diff --git a/examples/trace/wrapper_openai/async_azure_openai_responses.py b/examples/trace/wrapper_openai/async_azure_openai_responses.py new file mode 100644 index 0000000..7f28983 --- /dev/null +++ b/examples/trace/wrapper_openai/async_azure_openai_responses.py @@ -0,0 +1,62 @@ +import asyncio + +from openai import AsyncAzureOpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(AsyncAzureOpenAI( + # azure_endpoint="***", + api_key="***", + azure_deployment="gpt-5-chat-2025-08-07", + api_version="", +)) + + +def retriever(): + results = ["John worked at Beijing"] + return results + + +@observe +async def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + res = await openai_client.responses.create( + input=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": question}, + ], + model="gpt-5-chat-2025-08-07", + ) + print(res) + + # stream + # res = await openai_client.responses.create( + # input=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="gpt-5-chat-2025-08-07", + # stream=True, + # ) + # try: + # async for chunk in res: + # print(chunk) + # except Exception as e: + # print(e) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). 
+ # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + asyncio.run(rag("Where is John worked?")) + client.flush() diff --git a/examples/trace/wrapper_openai/async_openai_chat.py b/examples/trace/wrapper_openai/async_openai_chat.py new file mode 100644 index 0000000..3f81016 --- /dev/null +++ b/examples/trace/wrapper_openai/async_openai_chat.py @@ -0,0 +1,65 @@ +import asyncio + +from openai import AsyncOpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(AsyncOpenAI( + base_url="https://ark.cn-beijing.volces.com/api/v3", # use ark model, refer: https://www.volcengine.com/docs/82379/1361424 + api_key="***", +)) + + +def retriever(): + results = ["John worked at Beijing"] + return results + + +@observe +async def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + res = await openai_client.chat.completions.create( + messages=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": question}, + ], + model="doubao-1-5-vision-pro-32k-250115", + ) + print(res) + + # stream + # res = await openai_client.chat.completions.create( + # messages=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="doubao-1-5-vision-pro-32k-250115", + # stream=True, + # extra_body={ + # "stream_options": { + # "include_usage": True # bytedance gpt, return usage by this param + # } + # } + # ) + # try: + # async for chunk in res: + # print(chunk) + # except Exception as e: + # print(e) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). 
+ # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + asyncio.run(rag("Where is John worked?")) + client.flush() diff --git a/examples/trace/wrapper_openai/sync_azure_openai_chat.py b/examples/trace/wrapper_openai/sync_azure_openai_chat.py new file mode 100644 index 0000000..570b70a --- /dev/null +++ b/examples/trace/wrapper_openai/sync_azure_openai_chat.py @@ -0,0 +1,61 @@ +from openai import AzureOpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(AzureOpenAI( + # azure_endpoint="***", + api_key="***", + azure_deployment="gpt-5-chat-2025-08-07", + api_version="", +)) + + +def retriever(): + results = ["John worked at Beijing"] + return results + +@observe +def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + res = openai_client.chat.completions.create( + messages=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": question}, + ], + model="", + ) + print(res) + + # stream + # res = openai_client.chat.completions.create( + # messages=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="", + # stream=True, + # extra_body={ + # "stream_options": { + # "include_usage": True # bytedance gpt, return usage by this param + # } + # } + # ) + # for chunk in res: + # print(chunk) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). + # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + rag("Where is John worked?") + client.flush() diff --git a/examples/trace/wrapper_openai/sync_azure_openai_responses.py b/examples/trace/wrapper_openai/sync_azure_openai_responses.py new file mode 100644 index 0000000..ac726c1 --- /dev/null +++ b/examples/trace/wrapper_openai/sync_azure_openai_responses.py @@ -0,0 +1,57 @@ +from openai import AzureOpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(AzureOpenAI( + # azure_endpoint="***", + api_key="***", + azure_deployment="gpt-5-chat-2025-08-07", + api_version="", +)) + + +def retriever(): + results = ["John worked at Beijing"] + return results + + +@observe +def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + res = openai_client.responses.create( + input=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": question}, + ], + model="gpt-5-chat-2025-08-07", + ) + print(res) + + # stream + # res = openai_client.responses.create( + # input=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="gpt-5-chat-2025-08-07", + # stream=True, + # ) + # for chunk in res: + # print(chunk) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). 
+ # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + rag("Where is John worked?") + client.flush() diff --git a/examples/trace/wrapper_openai/sync_openai_chat.py b/examples/trace/wrapper_openai/sync_openai_chat.py new file mode 100644 index 0000000..b58c196 --- /dev/null +++ b/examples/trace/wrapper_openai/sync_openai_chat.py @@ -0,0 +1,58 @@ +from openai import OpenAI + +from cozeloop import new_client +from cozeloop.decorator import observe +from cozeloop.integration.wrapper import openai_wrapper + +openai_client = openai_wrapper(OpenAI( + base_url="https://ark.cn-beijing.volces.com/api/v3", # use ark model, refer: https://www.volcengine.com/docs/82379/1361424 + api_key="***", +)) + +def retriever(): + results = ["John worked at Beijing"] + return results + +@observe +def rag(question): + docs = retriever() + system_message = """Answer the question using only the provided information below: + + {docs}""".format(docs="\n".join(docs)) + + # not stream + # res = openai_client.chat.completions.create( # chat completion + # messages=[ + # {"role": "system", "content": system_message}, + # {"role": "user", "content": question}, + # ], + # model="doubao-1-5-vision-pro-32k-250115", + # ) + # print(res) + + # stream + res = openai_client.chat.completions.create( # chat completion + messages=[ + {"role": "system", "content": system_message}, + {"role": "user", "content": question}, + ], + model="doubao-1-5-vision-pro-32k-250115", + stream=True, + extra_body={ + "stream_options": { + "include_usage": True # ark model, return token usage by this param + } + } + ) + for chunk in res: + print(chunk) + + +if __name__ == '__main__': + # Set the following environment variables first (Assuming you are using a PAT token.). + # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id" + # os.environ["COZELOOP_API_TOKEN"] = "your token" + + client = new_client() + rag("Where is John worked?") + client.flush() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 760d3c1..ff5a21e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cozeloop" -version = "0.1.13" +version = "0.1.14" description = "coze loop sdk" authors = ["JiangQi715 "] license = "MIT"
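
Usage sketch for the new openai_wrapper (condensed from the example files added in this patch; the API key and model name below are placeholders, not values prescribed by the change):

    import os
    from openai import OpenAI

    from cozeloop import new_client
    from cozeloop.integration.wrapper import openai_wrapper

    # Set credentials first (assuming a PAT token), as in the examples:
    # os.environ["COZELOOP_WORKSPACE_ID"] = "your workspace id"
    # os.environ["COZELOOP_API_TOKEN"] = "your token"
    loop_client = new_client()

    # openai_wrapper returns the client unchanged unless it is an
    # OpenAI/AsyncOpenAI/AzureOpenAI/AsyncAzureOpenAI instance; otherwise
    # chat.completions.create (and responses.create, when present) are patched
    # to emit "model" spans carrying call options and token usage.
    openai_client = openai_wrapper(OpenAI(api_key="***"))  # placeholder key

    res = openai_client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[{"role": "user", "content": "hello"}],
    )
    print(res)

    loop_client.flush()  # flush buffered spans before the process exits

For streaming calls (stream=True), the wrapper also records the time of the first chunk on the span and aggregates the chunks into a single output, as shown in the commented stream variants of the example files.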