diff --git a/src/examples/crewai_example/__init__.py b/src/examples/crewai_example/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/examples/crewai_example/simple_agent/__init__.py b/src/examples/crewai_example/simple_agent/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/examples/crewai_example/simple_agent/agents.py b/src/examples/crewai_example/simple_agent/agents.py new file mode 100644 index 00000000..8557f82d --- /dev/null +++ b/src/examples/crewai_example/simple_agent/agents.py @@ -0,0 +1,32 @@ +from crewai import Agent +from langchain_openai import ChatOpenAI +from langchain_anthropic import ChatAnthropic +from langchain_cohere import ChatCohere +from langchain_ollama import ChatOllama + + +class PoetryAgents: + def __init__(self): + self.open_ai = ChatOpenAI( + model_name="gpt-4", temperature=0.7, stream_usage=True + ) + self.anthropic = ChatAnthropic( + model_name="claude-3-5-sonnet-20240620", temperature=0.7 + ) + + self.cohere = ChatCohere(model="command-r", temperature=0.7) + self.ollama = ChatOllama(model="llama3", temperature=0.7) + + def create_poet_agent(self): + return Agent( + role="Expert Poetry Writer", + backstory=""" + I am an Expert in poetry writing and creative expression. + I have been writing poetry for over 10 years and have published several collections. + I have a deep understanding of various poetic forms, styles, and themes. I am here to help you create beautiful and meaningful poetry that resonates with your emotions and experiences. + """, + goal="""Create a poem that captures the essence of a given theme or emotion""", + allow_delegation=False, + verbose=True, + llm=self.ollama, + ) diff --git a/src/examples/crewai_example/simple_agent/main.py b/src/examples/crewai_example/simple_agent/main.py new file mode 100644 index 00000000..61230bef --- /dev/null +++ b/src/examples/crewai_example/simple_agent/main.py @@ -0,0 +1,48 @@ +from crewai import Crew +from textwrap import dedent +from .agents import PoetryAgents +from .tasks import PoetryTasks +from langtrace_python_sdk import langtrace +from dotenv import load_dotenv +import agentops + +load_dotenv() +agentops.init() +langtrace.init(write_spans_to_console=False, batch=False) + + +class PoetryCrew: + def __init__(self, topic) -> None: + self.topic = topic + + def run(self): + agents = PoetryAgents() + tasks = PoetryTasks() + + poetry_agent = agents.create_poet_agent() + + create_poem = tasks.create_poem(poetry_agent, self.topic) + + crew = Crew(agents=[poetry_agent], tasks=[create_poem], verbose=True) + res = crew.kickoff() + return res + + +# This is the main function that you will use to run your custom crew. +if __name__ == "__main__": + print("## Welcome to Poetry Crew") + print("-------------------------------") + topic = input( + dedent( + """ + What topic do you want to write a poem on? + """ + ) + ) + + poetry_crew = PoetryCrew(topic=topic) + result = poetry_crew.run() + print("\n\n########################") + print("## Here is you poem") + print("########################\n") + print(result) diff --git a/src/examples/crewai_example/simple_agent/tasks.py b/src/examples/crewai_example/simple_agent/tasks.py new file mode 100644 index 00000000..b44952c1 --- /dev/null +++ b/src/examples/crewai_example/simple_agent/tasks.py @@ -0,0 +1,21 @@ +from crewai import Task +from textwrap import dedent + + +class PoetryTasks: + def create_poem(self, agent, topic): + return Task( + description=dedent( + f""" + **Task**: Create a Poem on {topic} + **Description**: Write a poem on the given topic that captures the essence of the theme or emotion. + The poem should be creative, expressive, and resonate with the reader's emotions and experiences. + Your poem should be well-structured, engaging, and evoke a sense of beauty and meaning. + + **Parameters**: + - Topic: {topic} + """ + ), + expected_output="A creative and expressive poem that captures the essence of the given topic.", + agent=agent, + ) diff --git a/src/examples/crewai_example/trip_planner/__init__.py b/src/examples/crewai_example/trip_planner/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/examples/crewai_example/trip_planner/agents.py b/src/examples/crewai_example/trip_planner/agents.py new file mode 100644 index 00000000..fa21b382 --- /dev/null +++ b/src/examples/crewai_example/trip_planner/agents.py @@ -0,0 +1,54 @@ +from crewai import Agent +from langchain_openai import ChatOpenAI +from langchain_ollama import ChatOllama +from langchain_cohere import ChatCohere +from langchain_anthropic import ChatAnthropic +from dotenv import load_dotenv + +load_dotenv() + + +class TravelAgents: + def __init__(self): + self.OpenAIGPT35 = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.7) + self.OpenAIGPT4 = ChatOpenAI(model_name="gpt-4", temperature=0.7) + self.Ollama = ChatOllama(model="openhermes") + self.Cohere = ChatCohere(model="command-r") + self.Anthropic = ChatAnthropic(model="claude-3-5-sonnet") + + def expert_travel_agent(self): + return Agent( + role="Expert Travel Agent", + backstory=""" + I am an Expert in travel planning and itinerary creation. + I have been in the travel industry for over 10 years and have helped thousands of clients plan their dream vacations. + I have extensive knowledge of popular travel destinations, local attractions, and travel logistics. I am here to help you create a personalized travel itinerary that suits your preferences and budget. + """, + goal="""Create a 7 day travel itinerary with detailed per-day plans, include budget, packing suggestions, and local/safety tips.""", + # tools=[tool_1, tool_2], + allow_delegation=False, + verbose=True, + llm=self.OpenAIGPT4, + ) + + def city_selection_expert(self): + return Agent( + role="City Selection Expert", + backstory="""Expert at analyzing and selecting the best cities for travel based on data""", + goal="""Select the best cities based on weather, season, prices and traveler preferences""", + # tools=[tool_1, tool_2], + allow_delegation=False, + verbose=True, + llm=self.OpenAIGPT4, + ) + + def local_tour_guide(self): + return Agent( + role="Local Expert at this city", + goal="Provide the BEST insights about the selected city", + backstory="""A knowledgeable local guide with extensive information about the city, it's attractions and customs""", + # tools=[tool_1, tool_2], + allow_delegation=False, + verbose=True, + llm=self.OpenAIGPT4, + ) diff --git a/src/examples/crewai_example/trip_planner/main.py b/src/examples/crewai_example/trip_planner/main.py new file mode 100644 index 00000000..d8b78b1b --- /dev/null +++ b/src/examples/crewai_example/trip_planner/main.py @@ -0,0 +1,96 @@ +from crewai import Crew +from textwrap import dedent +from .agents import TravelAgents +from .tasks import TravelTasks +from langtrace_python_sdk import langtrace +from dotenv import load_dotenv + +load_dotenv() + +langtrace.init() + + +class TripCrew: + def __init__(self, origin, cities, date_range, interests): + self.origin = origin + self.cities = cities + self.date_range = date_range + self.interests = interests + + def run(self): + # Define your custom agents and tasks in agents.py and tasks.py + agents = TravelAgents() + tasks = TravelTasks() + + # Define your custom agents and tasks here + expert_travel_agent = agents.expert_travel_agent() + city_selection_expert = agents.city_selection_expert() + local_tour_guide = agents.local_tour_guide() + + # Custom tasks include agent name and variables as input + plan_itinerary = tasks.plan_itinerary( + expert_travel_agent, self.cities, self.date_range, self.interests + ) + + identify_city = tasks.identify_city( + city_selection_expert, + self.origin, + self.cities, + self.interests, + self.date_range, + ) + + gather_city_info = tasks.gather_city_info( + local_tour_guide, self.cities, self.date_range, self.interests + ) + + # Define your custom crew here + crew = Crew( + agents=[expert_travel_agent, city_selection_expert, local_tour_guide], + tasks=[plan_itinerary, identify_city, gather_city_info], + verbose=True, + ) + + result = crew.kickoff() + return result + + +# This is the main function that you will use to run your custom crew. +if __name__ == "__main__": + print("## Welcome to Trip Planner Crew") + print("-------------------------------") + origin = input( + dedent( + """ + From where will you be traveling from? + """ + ) + ) + cities = input( + dedent( + """ + What are the cities options you are interested in visiting? + """ + ) + ) + date_range = input( + dedent( + """ + What is the date range you are interested in traveling? + """ + ) + ) + interests = input( + dedent( + """ + What are some of your high level interests and hobbies? + """ + ) + ) + + trip_crew = TripCrew(origin, cities, date_range, interests) + result = trip_crew.run() + print("\n\n########################") + print("## Here is you Trip Plan") + print("########################\n") + print(result) diff --git a/src/examples/crewai_example/trip_planner/tasks.py b/src/examples/crewai_example/trip_planner/tasks.py new file mode 100644 index 00000000..fcf7f9f3 --- /dev/null +++ b/src/examples/crewai_example/trip_planner/tasks.py @@ -0,0 +1,120 @@ +from crewai import Task +from textwrap import dedent + +""" +Creating Tasks Cheat Sheet: +- Begin with the end in mind. Identify the specific outcome your tasks are aiming to achieve. +- Break down the outcome into actionable tasks, assigning each task to the appropriate agent. +- Ensure tasks are descriptive, providing clear instructions and expected deliverables. + +Goal: +- Develop a detailed itinerary, including city selection, attractions, and practical travel advice. + +Key Steps for Task Creation: +1. Identify the Desired Outcome: Define what success looks like for your project. + - A detailed 7 day travel itenerary. + +2. Task Breakdown: Divide the goal into smaller, manageable tasks that agents can execute. + - Itenerary Planning: develop a detailed plan for each day of the trip. + - City Selection: Analayze and pick the best cities to visit. + - Local Tour Guide: Find a local expert to provide insights and recommendations. + +3. Assign Tasks to Agents: Match tasks with agents based on their roles and expertise. + +4. Task Description Template: + - Use this template as a guide to define each task in your CrewAI application. + - This template helps ensure that each task is clearly defined, actionable, and aligned with the specific goals of your project. + + Template: + --------- + def [task_name](self, agent, [parameters]): + return Task(description=dedent(f''' + **Task**: [Provide a concise name or summary of the task.] + **Description**: [Detailed description of what the agent is expected to do, including actionable steps and expected outcomes. This should be clear and direct, outlining the specific actions required to complete the task.] + + **Parameters**: + - [Parameter 1]: [Description] + - [Parameter 2]: [Description] + ... [Add more parameters as needed.] + + **Note**: [Optional section for incentives or encouragement for high-quality work. This can include tips, additional context, or motivations to encourage agents to deliver their best work.] + + '''), agent=agent) + +""" + + +class TravelTasks: + def __tip_section(self): + return "If you do your BEST WORK, I'll give you a $10,000 commission!" + + def plan_itinerary(self, agent, city, travel_dates, interests): + return Task( + description=dedent( + f""" + **Task**: Develop a 7-Day Travel Itinerary + **Description**: Expand the city guide into a full 7-day travel itinerary with detailed + per-day plans, including weather forecasts, places to eat, packing suggestions, + and a budget breakdown. You MUST suggest actual places to visit, actual hotels to stay, + and actual restaurants to go to. This itinerary should cover all aspects of the trip, + from arrival to departure, integrating the city guide information with practical travel logistics. + + **Parameters**: + - City: {city} + - Trip Date: {travel_dates} + - Traveler Interests: {interests} + + **Note**: {self.__tip_section()} + """ + ), + expected_output="A detailed 7-day travel itinerary with per-day plans, budget breakdown, and practical travel advice.", + agent=agent, + ) + + def identify_city(self, agent, origin, cities, interests, travel_dates): + return Task( + description=dedent( + f""" + **Task**: Identify the Best City for the Trip + **Description**: Analyze and select the best city for the trip based on specific + criteria such as weather patterns, seasonal events, and travel costs. + This task involves comparing multiple cities, considering factors like current weather + conditions, upcoming cultural or seasonal events, and overall travel expenses. + Your final answer must be a detailed report on the chosen city, + including actual flight costs, weather forecast, and attractions. + + + **Parameters**: + - Origin: {origin} + - Cities: {cities} + - Interests: {interests} + - Travel Date: {travel_dates} + + **Note**: {self.__tip_section()} + """ + ), + agent=agent, + expected_output="A detailed report on the best city for the trip, including flight costs, weather forecast, and attractions.", + ) + + def gather_city_info(self, agent, city, travel_dates, interests): + return Task( + description=dedent( + f""" + **Task**: Gather In-depth City Guide Information + **Description**: Compile an in-depth guide for the selected city, gathering information about + key attractions, local customs, special events, and daily activity recommendations. + This guide should provide a thorough overview of what the city has to offer, including + hidden gems, cultural hotspots, must-visit landmarks, weather forecasts, and high-level costs. + + **Parameters**: + - Cities: {city} + - Interests: {interests} + - Travel Date: {travel_dates} + + **Note**: {self.__tip_section()} + """ + ), + expected_output="An in-depth city guide with detailed information on attractions, local customs, events, and daily activity recommendations.", + agent=agent, + ) diff --git a/src/examples/crewai_example/trip_planner/tools/calculator.py b/src/examples/crewai_example/trip_planner/tools/calculator.py new file mode 100644 index 00000000..c7ccb721 --- /dev/null +++ b/src/examples/crewai_example/trip_planner/tools/calculator.py @@ -0,0 +1,16 @@ +from langchain.tools import tool + + +class CalculatorTools: + + @tool("Make a calculation") + def calculate(self, operation): + """Useful to perform any mathematical calculations, + like sum, minus, multiplication, division, etc. + The input to this tool should be a mathematical + expression, a couple examples are `200*7` or `5000/2*10` + """ + try: + return eval(operation) + except SyntaxError: + return "Error: Invalid syntax in mathematical expression" diff --git a/src/examples/crewai_example/trip_planner/tools/search_tools.py b/src/examples/crewai_example/trip_planner/tools/search_tools.py new file mode 100644 index 00000000..4c5e1366 --- /dev/null +++ b/src/examples/crewai_example/trip_planner/tools/search_tools.py @@ -0,0 +1,73 @@ +import json +import os + +import requests +from langchain.tools import tool + + +class SearchTools: + + @tool("Search the internet") + def search_internet(self, query): + """Useful to search the internet + about a a given topic and return relevant results""" + top_result_to_return = 4 + url = "https://google.serper.dev/search" + payload = json.dumps({"q": query}) + headers = { + "X-API-KEY": os.environ["SERPER_API_KEY"], + "content-type": "application/json", + } + response = requests.request( + "POST", url, headers=headers, data=payload, timeout=2000 + ) + # check if there is an organic key + if "organic" not in response.json(): + return "Sorry, I couldn't find anything about that, there could be an error with you serper api key." + else: + results = response.json()["organic"] + string = [] + for result in results[:top_result_to_return]: + try: + string.append( + "\n".join( + [ + f"Title: {result['title']}", + f"Link: {result['link']}", + f"Snippet: {result['snippet']}", + "\n-----------------", + ] + ) + ) + except KeyError: + next + + return "\n".join(string) + + +# from pydantic import BaseModel, Field +# from langchain.tools import tool + +# # Define a Pydantic model for the tool's input parameters +# class CalculationInput(BaseModel): +# operation: str = Field(..., description="The mathematical operation to perform") +# factor: float = Field(..., description="A factor by which to multiply the result of the operation") + +# # Use the tool decorator with the args_schema parameter pointing to the Pydantic model +# @tool("perform_calculation", args_schema=CalculationInput, return_direct=True) +# def perform_calculation(operation: str, factor: float) -> str: +# """ +# Performs a specified mathematical operation and multiplies the result by a given factor. + +# Parameters: +# - operation (str): A string representing a mathematical operation (e.g., "10 + 5"). +# - factor (float): A factor by which to multiply the result of the operation. + +# Returns: +# - A string representation of the calculation result. +# """ +# # Perform the calculation +# result = eval(operation) * factor + +# # Return the result as a string +# return f"The result of '{operation}' multiplied by {factor} is {result}." diff --git a/src/langtrace_python_sdk/instrumentation/anthropic/patch.py b/src/langtrace_python_sdk/instrumentation/anthropic/patch.py index b992dc9a..3ac5f5a6 100644 --- a/src/langtrace_python_sdk/instrumentation/anthropic/patch.py +++ b/src/langtrace_python_sdk/instrumentation/anthropic/patch.py @@ -17,8 +17,11 @@ import json from langtrace.trace_attributes import Event, LLMSpanAttributes -from langtrace_python_sdk.utils import set_span_attribute, silently_fail +from langtrace_python_sdk.utils import set_span_attribute +from langtrace_python_sdk.utils.silently_fail import silently_fail + from langtrace_python_sdk.utils.llm import ( + StreamWrapper, get_extra_attributes, get_langtrace_attributes, get_llm_request_attributes, @@ -26,7 +29,6 @@ get_span_name, is_streaming, set_event_completion, - set_event_completion_chunk, set_usage_attributes, ) from opentelemetry.trace import SpanKind @@ -83,61 +85,7 @@ def traced_method(wrapped, instance, args, kwargs): span.end() raise - def handle_streaming_response(result, span): - """Process and yield streaming response chunks.""" - result_content = [] - span.add_event(Event.STREAM_START.value) - input_tokens = 0 - output_tokens = 0 - try: - for chunk in result: - if ( - hasattr(chunk, "message") - and chunk.message is not None - and hasattr(chunk.message, "model") - and chunk.message.model is not None - ): - span.set_attribute( - SpanAttributes.LLM_RESPONSE_MODEL, chunk.message.model - ) - content = "" - if hasattr(chunk, "delta") and chunk.delta is not None: - content = chunk.delta.text if hasattr(chunk.delta, "text") else "" - # Assuming content needs to be aggregated before processing - result_content.append(content if len(content) > 0 else "") - - if hasattr(chunk, "message") and hasattr(chunk.message, "usage"): - input_tokens += ( - chunk.message.usage.input_tokens - if hasattr(chunk.message.usage, "input_tokens") - else 0 - ) - output_tokens += ( - chunk.message.usage.output_tokens - if hasattr(chunk.message.usage, "output_tokens") - else 0 - ) - - # Assuming span.add_event is part of a larger logging or event system - # Add event for each chunk of content - if content: - set_event_completion_chunk(span, "".join(content)) - - # Assuming this is part of a generator, yield chunk or aggregated content - yield content - finally: - - # Finalize span after processing all chunks - span.add_event(Event.STREAM_END.value) - set_usage_attributes( - span, {"input_tokens": input_tokens, "output_tokens": output_tokens} - ) - completion = [{"role": "assistant", "content": "".join(result_content)}] - set_event_completion(span, completion) - - span.set_status(StatusCode.OK) - span.end() - + @silently_fail def set_response_attributes(result, span, kwargs): if not is_streaming(kwargs): if hasattr(result, "content") and result.content is not None: @@ -174,7 +122,7 @@ def set_response_attributes(result, span, kwargs): span.end() return result else: - return handle_streaming_response(result, span) + return StreamWrapper(result, span) # return the wrapped method return traced_method diff --git a/src/langtrace_python_sdk/instrumentation/cohere/patch.py b/src/langtrace_python_sdk/instrumentation/cohere/patch.py index 8930f297..7704f138 100644 --- a/src/langtrace_python_sdk/instrumentation/cohere/patch.py +++ b/src/langtrace_python_sdk/instrumentation/cohere/patch.py @@ -407,15 +407,8 @@ def traced_method(wrapped, instance, args, kwargs): try: # Attempt to call the original method result = wrapped(*args, **kwargs) - span.add_event(Event.STREAM_START.value) try: for event in result: - if hasattr(event, "text") and event.text is not None: - content = event.text - else: - content = "" - set_event_completion_chunk(span, "".join(content)) - if ( hasattr(event, "finish_reason") and event.finish_reason == "COMPLETE" @@ -496,15 +489,14 @@ def traced_method(wrapped, instance, args, kwargs): (usage.input_tokens or 0) + (usage.output_tokens or 0), ) - - span.set_attribute( - "search_units", - usage.search_units or 0, - ) + if usage.search_units is not None: + span.set_attribute( + "search_units", + usage.search_units or 0, + ) yield event finally: - span.add_event(Event.STREAM_END.value) span.set_status(StatusCode.OK) span.end() diff --git a/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py b/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py index 3f3e8d3d..57d45373 100644 --- a/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py @@ -66,8 +66,14 @@ def patch_module_classes( if name.startswith("_") or name in exclude_classes: continue # loop through all public methods of the class - for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction): - if method_name in exclude_methods or method.__qualname__.split('.')[0] != name: + for method_name, method in inspect.getmembers( + obj, predicate=inspect.isfunction + ): + if ( + method_name in exclude_methods + or method.__qualname__.split(".")[0] != name + or method_name.startswith("_") + ): continue try: method_path = f"{name}.{method_name}" @@ -108,6 +114,9 @@ def _instrument(self, **kwargs): "format", "format_messages", "format_prompt", + "__or__", + "__init__", + "__repr__", ] exclude_classes = [ "BaseChatPromptTemplate", @@ -125,7 +134,13 @@ def _instrument(self, **kwargs): modules_to_patch = [ ("langchain_core.retrievers", "retriever", generic_patch, True, True), ("langchain_core.prompts.chat", "prompt", generic_patch, True, True), - ("langchain_core.language_models.llms", "generate", generic_patch, True, True), + ( + "langchain_core.language_models.llms", + "generate", + generic_patch, + True, + True, + ), ("langchain_core.runnables.base", "runnable", runnable_patch, True, True), ( "langchain_core.runnables.passthrough", diff --git a/src/langtrace_python_sdk/utils/llm.py b/src/langtrace_python_sdk/utils/llm.py index 42ad53cc..fb9315b4 100644 --- a/src/langtrace_python_sdk/utils/llm.py +++ b/src/langtrace_python_sdk/utils/llm.py @@ -235,8 +235,9 @@ class StreamWrapper: span: Span def __init__( - self, stream, span, prompt_tokens, function_call=False, tool_calls=False + self, stream, span, prompt_tokens=0, function_call=False, tool_calls=False ): + self.stream = stream self.span = span self.prompt_tokens = prompt_tokens @@ -245,16 +246,20 @@ def __init__( self.result_content = [] self.completion_tokens = 0 self._span_started = False + self._response_model = None self.setup() def setup(self): if not self._span_started: - self.span.add_event(Event.STREAM_START.value) self._span_started = True def cleanup(self): if self._span_started: - self.span.add_event(Event.STREAM_END.value) + set_span_attribute( + self.span, + SpanAttributes.LLM_RESPONSE_MODEL, + self._response_model, + ) set_span_attribute( self.span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, @@ -322,21 +327,26 @@ async def __anext__(self): self.cleanup() raise StopAsyncIteration - def process_chunk(self, chunk): + def set_response_model(self, chunk): + if self._response_model: + return + + # OpenAI response model is set on all chunks if hasattr(chunk, "model") and chunk.model is not None: - set_span_attribute( - self.span, - SpanAttributes.LLM_RESPONSE_MODEL, - chunk.model, - ) + self._response_model = chunk.model + + # Anthropic response model is set on the first chunk message + if hasattr(chunk, "message") and chunk.message is not None: + if hasattr(chunk.message, "model") and chunk.message.model is not None: + self._response_model = chunk.message.model + def build_streaming_response(self, chunk): + content = [] + # OpenAI if hasattr(chunk, "choices") and chunk.choices is not None: - content = [] if not self.function_call and not self.tool_calls: for choice in chunk.choices: if choice.delta and choice.delta.content is not None: - token_counts = estimate_tokens(choice.delta.content) - self.completion_tokens += token_counts content = [choice.delta.content] elif self.function_call: for choice in chunk.choices: @@ -345,10 +355,6 @@ def process_chunk(self, chunk): and choice.delta.function_call is not None and choice.delta.function_call.arguments is not None ): - token_counts = estimate_tokens( - choice.delta.function_call.arguments - ) - self.completion_tokens += token_counts content = [choice.delta.function_call.arguments] elif self.tool_calls: for choice in chunk.choices: @@ -361,30 +367,41 @@ def process_chunk(self, chunk): and tool_call.function is not None and tool_call.function.arguments is not None ): - token_counts = estimate_tokens( - tool_call.function.arguments - ) - self.completion_tokens += token_counts content.append(tool_call.function.arguments) - set_event_completion_chunk( - self.span, - "".join(content) if len(content) > 0 and content[0] is not None else "", - ) - if content: - self.result_content.append(content[0]) - if hasattr(chunk, "text"): - token_counts = estimate_tokens(chunk.text) - self.completion_tokens += token_counts + # VertexAI + if hasattr(chunk, "text") and chunk.text is not None: content = [chunk.text] - set_event_completion_chunk( - self.span, - "".join(content) if len(content) > 0 and content[0] is not None else "", - ) - if content: - self.result_content.append(content[0]) + # Anthropic + if hasattr(chunk, "delta") and chunk.delta is not None: + content = [chunk.delta.text] if hasattr(chunk.delta, "text") else [] + + if content: + self.result_content.append(content[0]) + + def set_usage_attributes(self, chunk): + # Anthropic & OpenAI + if hasattr(chunk, "type") and chunk.type == "message_start": + self.prompt_tokens = chunk.message.usage.input_tokens + + if hasattr(chunk, "usage") and chunk.usage is not None: + if hasattr(chunk.usage, "output_tokens"): + self.completion_tokens = chunk.usage.output_tokens + + if hasattr(chunk.usage, "prompt_tokens"): + self.prompt_tokens = chunk.usage.prompt_tokens + + if hasattr(chunk.usage, "completion_tokens"): + self.completion_tokens = chunk.usage.completion_tokens + + # VertexAI if hasattr(chunk, "usage_metadata"): self.completion_tokens = chunk.usage_metadata.candidates_token_count self.prompt_tokens = chunk.usage_metadata.prompt_token_count + + def process_chunk(self, chunk): + self.set_response_model(chunk=chunk) + self.build_streaming_response(chunk=chunk) + self.set_usage_attributes(chunk=chunk) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 97eeff4a..20868fd2 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "2.2.20" +__version__ = "2.2.21" diff --git a/src/tests/anthropic/test_anthropic.py b/src/tests/anthropic/test_anthropic.py index a8e9a9c8..248d592a 100644 --- a/src/tests/anthropic/test_anthropic.py +++ b/src/tests/anthropic/test_anthropic.py @@ -82,8 +82,4 @@ def test_anthropic_streaming(anthropic_client, exporter): assert attributes.get(SpanAttributes.LLM_REQUEST_MODEL) == llm_model_value assert attributes.get(SpanAttributes.LLM_IS_STREAMING) is True - events = streaming_span.events - - assert len(events) - 4 == chunk_count - assert_token_count(attributes) diff --git a/src/tests/cohere/test_cohere_chat.py b/src/tests/cohere/test_cohere_chat.py index f20431af..c1e87a80 100644 --- a/src/tests/cohere/test_cohere_chat.py +++ b/src/tests/cohere/test_cohere_chat.py @@ -112,7 +112,5 @@ def test_cohere_chat_streaming(cohere_client, exporter): events = cohere_span.events assert_prompt_in_events(events) assert_completion_in_events(events) - assert events[-1].name == "stream.end" - assert len(events) - 4 == chunks_count assert_token_count(attributes) diff --git a/src/tests/openai/test_chat_completion.py b/src/tests/openai/test_chat_completion.py index b503cefc..2c42adb6 100644 --- a/src/tests/openai/test_chat_completion.py +++ b/src/tests/openai/test_chat_completion.py @@ -83,9 +83,6 @@ def test_chat_completion_streaming(exporter, openai_client): assert_completion_in_events(streaming_span.events) assert attributes.get(SpanAttributes.LLM_IS_STREAMING) is True - events = streaming_span.events - assert len(events) - 4 == chunk_count # -2 for start and end events - assert_token_count(attributes) @@ -126,7 +123,4 @@ async def test_async_chat_completion_streaming(exporter, async_openai_client): assert_completion_in_events(streaming_span.events) assert attributes.get(SpanAttributes.LLM_IS_STREAMING) is True - events = streaming_span.events - assert len(events) - 4 == chunk_count # -2 for start and end events - assert_token_count(attributes)