Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/examples/crewai_example/simple_agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ def run(self):
tasks = PoetryTasks()

poetry_agent = agents.create_poet_agent()
poetry_agent_2 = agents.poet_agent_2()
poetry_agent_3 = agents.poet_agent_3()
# poetry_agent_2 = agents.poet_agent_2()
# poetry_agent_3 = agents.poet_agent_3()

create_poem = tasks.create_poem(poetry_agent, self.topic)
create_poem_2 = tasks.create_poem(poetry_agent_2, self.topic)
create_poem_3 = tasks.create_poem(poetry_agent_3, self.topic)
# create_poem_2 = tasks.create_poem(poetry_agent_2, self.topic)
# create_poem_3 = tasks.create_poem(poetry_agent_3, self.topic)

crew = Crew(agents=[poetry_agent], tasks=[create_poem], verbose=True, memory=True)
res = crew.kickoff()
Expand Down
20 changes: 18 additions & 2 deletions src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from wrapt import wrap_function_wrapper as _W
from typing import Collection
from importlib_metadata import version as v
from .patch import patch_crew
from .patch import patch_crew, patch_memory


class CrewAIInstrumentation(BaseInstrumentor):
Expand Down Expand Up @@ -49,7 +49,23 @@ def _instrument(self, **kwargs):
"Task.execute_sync",
patch_crew("Task.execute", version, tracer),
)
except Exception as e:
_W(
"crewai.memory.storage.rag_storage",
"RAGStorage.save",
patch_memory("RAGStorage.save", version, tracer),
)
_W(
"crewai.memory.storage.rag_storage",
"RAGStorage.search",
patch_memory("RAGStorage.search", version, tracer),
)
_W(
"crewai.memory.storage.rag_storage",
"RAGStorage.reset",
patch_memory("RAGStorage.reset", version, tracer),
)
# pylint: disable=broad-except
except Exception:
pass

def _uninstrument(self, **kwargs):
Expand Down
141 changes: 58 additions & 83 deletions src/langtrace_python_sdk/instrumentation/crewai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,82 +11,56 @@
from langtrace.trace_attributes import FrameworkSpanAttributes
from opentelemetry.trace import SpanKind, Span, Tracer
from opentelemetry.trace.status import Status, StatusCode
from langtrace_python_sdk.utils.misc import serialize_args, serialize_kwargs


crew_properties = {
"tasks": "object",
"agents": "object",
"cache": "bool",
"process": "object",
"verbose": "bool",
"memory": "bool",
"embedder": "json",
"full_output": "bool",
"manager_llm": "object",
"manager_agent": "object",
"manager_callbacks": "object",
"function_calling_llm": "object",
"config": "json",
"id": "object",
"max_rpm": "int",
"share_crew": "bool",
"step_callback": "object",
"task_callback": "object",
"prompt_file": "str",
"output_log_file": "bool",
}

task_properties = {
"id": "object",
"used_tools": "int",
"tools_errors": "int",
"delegations": "int",
"i18n": "object",
"thread": "object",
"prompt_context": "object",
"description": "str",
"expected_output": "str",
"config": "object",
"callback": "str",
"agent": "object",
"context": "object",
"async_execution": "bool",
"output_json": "object",
"output_pydantic": "object",
"output_file": "object",
"output": "object",
"tools": "object",
"human_input": "bool",
}

agent_properties = {
"formatting_errors": "int",
"id": "object",
"role": "str",
"goal": "str",
"backstory": "str",
"cache": "bool",
"config": "object",
"max_rpm": "int",
"verbose": "bool",
"allow_delegation": "bool",
"tools": "object",
"max_iter": "int",
"max_execution_time": "object",
"agent_executor": "object",
"tools_handler": "object",
"force_answer_max_iterations": "int",
"crew": "object",
"cache_handler": "object",
"step_callback": "object",
"i18n": "object",
"llm": "object",
"function_calling_llm": "object",
"callbacks": "object",
"system_template": "object",
"prompt_template": "object",
"response_template": "object",
}
def patch_memory(operation_name, version, tracer: Tracer):
def traced_method(wrapped, instance, args, kwargs):
service_provider = SERVICE_PROVIDERS["CREWAI"]
extra_attributes = baggage.get_baggage(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY)
span_attributes = {
"langtrace.sdk.name": "langtrace-python-sdk",
"langtrace.service.name": service_provider,
"langtrace.service.type": "framework",
"langtrace.service.version": version,
"langtrace.version": v(LANGTRACE_SDK_NAME),
**(extra_attributes if extra_attributes is not None else {}),
}

inputs = {}
if len(args) > 0:
inputs["args"] = serialize_args(*args)
if len(kwargs) > 0:
inputs["kwargs"] = serialize_kwargs(**kwargs)
span_attributes["crewai.memory.storage.rag_storage.inputs"] = json.dumps(inputs)

attributes = FrameworkSpanAttributes(**span_attributes)

with tracer.start_as_current_span(
get_span_name(operation_name), kind=SpanKind.CLIENT
) as span:

try:
set_span_attributes(span, attributes)
result = wrapped(*args, **kwargs)
if result is not None and len(result) > 0:
set_span_attribute(span, "crewai.memory.storage.rag_storage.outputs", str(result))
if result:
span.set_status(Status(StatusCode.OK))
span.end()
return result

except Exception as err:
# Record the exception in the span
span.record_exception(err)

# Set the span status to indicate an error
span.set_status(Status(StatusCode.ERROR, str(err)))

# Reraise the exception to ensure it's not swallowed
raise

return traced_method


def patch_crew(operation_name, version, tracer: Tracer):
Expand Down Expand Up @@ -161,25 +135,26 @@ def run(self):
instance_name = self.instance.__class__.__name__
if instance_name == "Crew":
self.set_crew_attributes()
set_span_attribute(self.span, "crewai.crew.config", json.dumps(self.crew))
for key, value in self.crew.items():
key = f"crewai.crew.{key}"
set_span_attribute(self.span, key, value)

elif instance_name == "Agent":
agent = self.set_agent_attributes()
# for key, value in agent.items():
# set_span_attribute(self.span, key, value)
set_span_attribute(self.span, "crewai.agent.config", json.dumps(agent))
for key, value in agent.items():
key = f"crewai.agent.{key}"
set_span_attribute(self.span, key, value)

elif instance_name == "Task":
task = self.set_task_attributes()
# uncomment if you want to spread attributes for the UI instead of dumping the whole object
# for key, value in task.items():
# set_span_attribute(self.span, key, value)
set_span_attribute(self.span, "crewai.task.config", json.dumps(task))
for key, value in task.items():
key = f"crewai.task.{key}"
set_span_attribute(self.span, key, value)

def set_crew_attributes(self):
for key, value in self.instance.__dict__.items():
if key == "tasks":
self._parse_tasks(value)

elif key == "agents":
self._parse_agents(value)
else:
Expand Down
5 changes: 5 additions & 0 deletions src/langtrace_python_sdk/instrumentation/embedchain/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def traced_method(wrapped, instance, args, kwargs):
**(extra_attributes if extra_attributes is not None else {}),
}

if hasattr(instance, 'config') and isinstance(instance.config, object):
config_dict = instance.config.__dict__
if isinstance(config_dict, dict):
span_attributes["embedchain.config"] = json.dumps(config_dict)

if len(args) > 0:
span_attributes["embedchain.inputs"] = json.dumps(args)

Expand Down
111 changes: 62 additions & 49 deletions src/langtrace_python_sdk/instrumentation/langchain/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
"""

import importlib.metadata
import inspect
import logging
from typing import Collection

Expand All @@ -28,46 +27,46 @@
logging.basicConfig(level=logging.FATAL)


def patch_module_classes(
module_name, tracer, version, task, trace_output=True, trace_input=True
):
"""
Generic function to patch all public methods of all classes in a given module.

Parameters:
- module: The module object containing the classes to patch.
- module_name: The name of the module, used in the prefix for `wrap_function_wrapper`.
- tracer: The tracer object used in `generic_patch`.
- version: The version parameter used in `generic_patch`.
- task: The name used to identify the type of task in `generic_patch`.
- exclude_private: Whether to exclude private methods (those starting with '_').
- trace_output: Whether to trace the output of the patched methods.
- trace_input: Whether to trace the input of the patched methods.
"""
# import the module
module = importlib.import_module(module_name)
# loop through all public classes in the module
for name, obj in inspect.getmembers(
module,
lambda member: inspect.isclass(member) and member.__module__ == module.__name__,
):
# loop through all public methods of the class
for method_name, _ in inspect.getmembers(obj, predicate=inspect.isfunction):
# Skip private methods
if method_name.startswith("_"):
continue
try:
method_path = f"{name}.{method_name}"
wrap_function_wrapper(
module_name,
method_path,
generic_patch(
method_path, task, tracer, version, trace_output, trace_input
),
)
# pylint: disable=broad-except
except Exception:
pass
# def patch_module_classes(
# module_name, tracer, version, task, trace_output=True, trace_input=True
# ):
# """
# Generic function to patch all public methods of all classes in a given module.

# Parameters:
# - module: The module object containing the classes to patch.
# - module_name: The name of the module, used in the prefix for `wrap_function_wrapper`.
# - tracer: The tracer object used in `generic_patch`.
# - version: The version parameter used in `generic_patch`.
# - task: The name used to identify the type of task in `generic_patch`.
# - exclude_private: Whether to exclude private methods (those starting with '_').
# - trace_output: Whether to trace the output of the patched methods.
# - trace_input: Whether to trace the input of the patched methods.
# """
# # import the module
# module = importlib.import_module(module_name)
# # loop through all public classes in the module
# for name, obj in inspect.getmembers(
# module,
# lambda member: inspect.isclass(member) and member.__module__ == module.__name__,
# ):
# # loop through all public methods of the class
# for method_name, _ in inspect.getmembers(obj, predicate=inspect.isfunction):
# # Skip private methods
# if method_name.startswith("_"):
# continue
# try:
# method_path = f"{name}.{method_name}"
# wrap_function_wrapper(
# module_name,
# method_path,
# generic_patch(
# method_path, task, tracer, version, trace_output, trace_input
# ),
# )
# # pylint: disable=broad-except
# except Exception:
# pass


class LangchainInstrumentation(BaseInstrumentor):
Expand All @@ -83,14 +82,28 @@ def _instrument(self, **kwargs):
tracer = get_tracer(__name__, "", tracer_provider)
version = importlib.metadata.version("langchain")

modules_to_patch = [
("langchain.text_splitter", "split_text", True, True),
]

for module_name, task, trace_output, trace_input in modules_to_patch:
patch_module_classes(
module_name, tracer, version, task, trace_output, trace_input
)
wrap_function_wrapper(
"langchain.agents.agent",
"RunnableAgent.plan",
generic_patch(
"RunnableAgent.plan", "plan", tracer, version, True, True
),
)

wrap_function_wrapper(
"langchain.agents.agent",
"RunnableAgent.aplan",
generic_patch(
"RunnableAgent.aplan", "plan", tracer, version, True, True
),
)

# modules_to_patch = []

# for module_name, task, trace_output, trace_input in modules_to_patch:
# patch_module_classes(
# module_name, tracer, version, task, trace_output, trace_input
# )

def _uninstrument(self, **kwargs):
pass
7 changes: 6 additions & 1 deletion src/langtrace_python_sdk/instrumentation/langchain/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
SERVICE_PROVIDERS,
)
from importlib_metadata import version as v
from langtrace_python_sdk.utils.misc import serialize_args, serialize_kwargs


def generic_patch(
Expand All @@ -52,8 +53,12 @@ def traced_method(wrapped, instance, args, kwargs):
**(extra_attributes if extra_attributes is not None else {}),
}

inputs = {}
if len(args) > 0 and trace_input:
span_attributes["langchain.inputs"] = to_json_string(args)
inputs["args"] = serialize_args(*args)
if len(kwargs) > 0 and trace_input:
inputs["kwargs"] = serialize_kwargs(**kwargs)
span_attributes["langchain.inputs"] = json.dumps(inputs)

attributes = FrameworkSpanAttributes(**span_attributes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def _instrument(self, **kwargs):
"format",
"format_messages",
"format_prompt",
"transform",
"stream",
"__or__",
"__init__",
"__repr__",
Expand Down
Loading