Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGLOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## [0.1.14] - 2025-09-11
### Added
- trace support openai wrapper

## [0.1.13] - 2025-09-10
### Added
Expand Down
31 changes: 27 additions & 4 deletions cozeloop/decorator/decorator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import time
from typing import Optional, Callable, Any, overload, Dict, Generic, Iterator, TypeVar, List, cast, AsyncIterator
from functools import wraps

Expand Down Expand Up @@ -82,7 +83,7 @@ def sync_wrapper(*args: Any, **kwargs: Any):
output = res
if process_outputs:
output = process_outputs(output)

inject_inner_token(span, output)
span.set_output(output)
except StopIteration:
pass
Expand Down Expand Up @@ -116,6 +117,7 @@ async def async_wrapper(*args: Any, **kwargs: Any):
output = res
if process_outputs:
output = process_outputs(output)
inject_inner_token(span, output)
span.set_output(output)
except StopIteration:
pass
Expand Down Expand Up @@ -227,7 +229,7 @@ def sync_stream_wrapper(*args: Any, **kwargs: Any):
res = func(*args, **kwargs)
output = res
if hasattr(output, "__iter__"):
return _CozeLoopTraceStream(output, span, process_iterator_outputs)
return _CozeLoopTraceStream(output, span, process_iterator_outputs, span_type)
if process_outputs:
output = process_outputs(output)

Expand Down Expand Up @@ -262,7 +264,7 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any):
res = await func(*args, **kwargs)
output = res
if hasattr(output, "__aiter__"):
return _CozeLoopAsyncTraceStream(output, span, process_iterator_outputs)
return _CozeLoopAsyncTraceStream(output, span, process_iterator_outputs, span_type)
if process_outputs:
output = process_outputs(output)
span.set_output(output)
Expand All @@ -289,7 +291,6 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any):
if not hasattr(res, "__aiter__") and res:
return res


if is_async_gen_func(func):
return async_gen_wrapper
if is_gen_func(func):
Expand Down Expand Up @@ -317,11 +318,14 @@ def __init__(
stream: Iterator[S],
span: Span,
process_iterator_outputs: Optional[Callable[[Any], Any]] = None,
span_type: str = "",
):
self.__stream__ = stream
self.__span = span
self.__output__: list[S] = []
self.__process_iterator_outputs = process_iterator_outputs
self.__is_set_start_time_first_token: bool = False
self.__span_type = span_type

def __next__(self) -> S:
try:
Expand Down Expand Up @@ -360,13 +364,17 @@ def __streamer__(
while True:
s = next(temp_stream)
self.__output__.append(s)
if not self.__is_set_start_time_first_token and self.__span_type == "model":
self.__span.set_start_time_first_resp(time.time_ns() // 1_000)
self.__is_set_start_time_first_token = True
yield s
except StopIteration as e:
return e

def __end__(self, err: Exception = None):
if self.__process_iterator_outputs:
self.__output__ = self.__process_iterator_outputs(self.__output__)
inject_inner_token(self.__span, self.__output__)
self.__span.set_output(self.__output__)
if err:
self.__span.set_error(err)
Expand All @@ -379,17 +387,21 @@ def __init__(
stream: AsyncIterator[S],
span: Span,
process_iterator_outputs: Optional[Callable[[Any], Any]] = None,
span_type: str = "",
):
self.__stream__ = stream
self.__span = span
self.__output__: list[S] = []
self.__process_iterator_outputs = process_iterator_outputs
self.__is_set_start_time_first_token: bool = False
self.__span_type = span_type

async def _aend(self, error: Optional[Exception] = None):
if error:
self.__span.set_error(error)
if self.__process_iterator_outputs:
self.__output__ = self.__process_iterator_outputs(self.__output__)
inject_inner_token(self.__span, self.__output__)
self.__span.set_output(self.__output__)
self.__span.finish()

Expand Down Expand Up @@ -433,6 +445,17 @@ async def __async_streamer__(
while True:
s = await temp_stream.__anext__()
self.__output__.append(s)
if not self.__is_set_start_time_first_token and self.__span_type == "model":
self.__span.set_start_time_first_resp(time.time_ns() // 1_000)
self.__is_set_start_time_first_token = True
yield s
except StopIteration:
pass


def inject_inner_token(span: Span, src):
if isinstance(src, dict) and src.get("_inner_tokens_dict"):
if input_tokens := src.get("_inner_tokens_dict").get("input_tokens", 0):
span.set_input_tokens(input_tokens)
if output_tokens := src.get("_inner_tokens_dict").get("output_tokens", 0):
span.set_output_tokens(output_tokens)
3 changes: 3 additions & 0 deletions cozeloop/integration/wrapper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from cozeloop.integration.wrapper._openai import openai_wrapper

__all__ = ["openai_wrapper"]
Loading