diff --git a/src/examples/crewai_example/simple_agent/main.py b/src/examples/crewai_example/simple_agent/main.py index 8d066839..233b3423 100644 --- a/src/examples/crewai_example/simple_agent/main.py +++ b/src/examples/crewai_example/simple_agent/main.py @@ -18,12 +18,12 @@ def run(self): tasks = PoetryTasks() poetry_agent = agents.create_poet_agent() - poetry_agent_2 = agents.poet_agent_2() - poetry_agent_3 = agents.poet_agent_3() + # poetry_agent_2 = agents.poet_agent_2() + # poetry_agent_3 = agents.poet_agent_3() create_poem = tasks.create_poem(poetry_agent, self.topic) - create_poem_2 = tasks.create_poem(poetry_agent_2, self.topic) - create_poem_3 = tasks.create_poem(poetry_agent_3, self.topic) + # create_poem_2 = tasks.create_poem(poetry_agent_2, self.topic) + # create_poem_3 = tasks.create_poem(poetry_agent_3, self.topic) crew = Crew(agents=[poetry_agent], tasks=[create_poem], verbose=True, memory=True) res = crew.kickoff() diff --git a/src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py b/src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py index 4292dcb0..57a9a269 100644 --- a/src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py @@ -19,7 +19,7 @@ from wrapt import wrap_function_wrapper as _W from typing import Collection from importlib_metadata import version as v -from .patch import patch_crew +from .patch import patch_crew, patch_memory class CrewAIInstrumentation(BaseInstrumentor): @@ -49,7 +49,23 @@ def _instrument(self, **kwargs): "Task.execute_sync", patch_crew("Task.execute", version, tracer), ) - except Exception as e: + _W( + "crewai.memory.storage.rag_storage", + "RAGStorage.save", + patch_memory("RAGStorage.save", version, tracer), + ) + _W( + "crewai.memory.storage.rag_storage", + "RAGStorage.search", + patch_memory("RAGStorage.search", version, tracer), + ) + _W( + "crewai.memory.storage.rag_storage", + "RAGStorage.reset", + patch_memory("RAGStorage.reset", version, tracer), + ) + # pylint: disable=broad-except + except Exception: pass def _uninstrument(self, **kwargs): diff --git a/src/langtrace_python_sdk/instrumentation/crewai/patch.py b/src/langtrace_python_sdk/instrumentation/crewai/patch.py index e67f6ecf..4ab1a393 100644 --- a/src/langtrace_python_sdk/instrumentation/crewai/patch.py +++ b/src/langtrace_python_sdk/instrumentation/crewai/patch.py @@ -11,82 +11,56 @@ from langtrace.trace_attributes import FrameworkSpanAttributes from opentelemetry.trace import SpanKind, Span, Tracer from opentelemetry.trace.status import Status, StatusCode +from langtrace_python_sdk.utils.misc import serialize_args, serialize_kwargs -crew_properties = { - "tasks": "object", - "agents": "object", - "cache": "bool", - "process": "object", - "verbose": "bool", - "memory": "bool", - "embedder": "json", - "full_output": "bool", - "manager_llm": "object", - "manager_agent": "object", - "manager_callbacks": "object", - "function_calling_llm": "object", - "config": "json", - "id": "object", - "max_rpm": "int", - "share_crew": "bool", - "step_callback": "object", - "task_callback": "object", - "prompt_file": "str", - "output_log_file": "bool", -} - -task_properties = { - "id": "object", - "used_tools": "int", - "tools_errors": "int", - "delegations": "int", - "i18n": "object", - "thread": "object", - "prompt_context": "object", - "description": "str", - "expected_output": "str", - "config": "object", - "callback": "str", - "agent": "object", - "context": "object", - "async_execution": "bool", - "output_json": "object", - "output_pydantic": "object", - "output_file": "object", - "output": "object", - "tools": "object", - "human_input": "bool", -} - -agent_properties = { - "formatting_errors": "int", - "id": "object", - "role": "str", - "goal": "str", - "backstory": "str", - "cache": "bool", - "config": "object", - "max_rpm": "int", - "verbose": "bool", - "allow_delegation": "bool", - "tools": "object", - "max_iter": "int", - "max_execution_time": "object", - "agent_executor": "object", - "tools_handler": "object", - "force_answer_max_iterations": "int", - "crew": "object", - "cache_handler": "object", - "step_callback": "object", - "i18n": "object", - "llm": "object", - "function_calling_llm": "object", - "callbacks": "object", - "system_template": "object", - "prompt_template": "object", - "response_template": "object", -} +def patch_memory(operation_name, version, tracer: Tracer): + def traced_method(wrapped, instance, args, kwargs): + service_provider = SERVICE_PROVIDERS["CREWAI"] + extra_attributes = baggage.get_baggage(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY) + span_attributes = { + "langtrace.sdk.name": "langtrace-python-sdk", + "langtrace.service.name": service_provider, + "langtrace.service.type": "framework", + "langtrace.service.version": version, + "langtrace.version": v(LANGTRACE_SDK_NAME), + **(extra_attributes if extra_attributes is not None else {}), + } + + inputs = {} + if len(args) > 0: + inputs["args"] = serialize_args(*args) + if len(kwargs) > 0: + inputs["kwargs"] = serialize_kwargs(**kwargs) + span_attributes["crewai.memory.storage.rag_storage.inputs"] = json.dumps(inputs) + + attributes = FrameworkSpanAttributes(**span_attributes) + + with tracer.start_as_current_span( + get_span_name(operation_name), kind=SpanKind.CLIENT + ) as span: + + try: + set_span_attributes(span, attributes) + result = wrapped(*args, **kwargs) + if result is not None and len(result) > 0: + set_span_attribute(span, "crewai.memory.storage.rag_storage.outputs", str(result)) + if result: + span.set_status(Status(StatusCode.OK)) + span.end() + return result + + except Exception as err: + # Record the exception in the span + span.record_exception(err) + + # Set the span status to indicate an error + span.set_status(Status(StatusCode.ERROR, str(err))) + + # Reraise the exception to ensure it's not swallowed + raise + + return traced_method def patch_crew(operation_name, version, tracer: Tracer): @@ -161,25 +135,26 @@ def run(self): instance_name = self.instance.__class__.__name__ if instance_name == "Crew": self.set_crew_attributes() - set_span_attribute(self.span, "crewai.crew.config", json.dumps(self.crew)) + for key, value in self.crew.items(): + key = f"crewai.crew.{key}" + set_span_attribute(self.span, key, value) elif instance_name == "Agent": agent = self.set_agent_attributes() - # for key, value in agent.items(): - # set_span_attribute(self.span, key, value) - set_span_attribute(self.span, "crewai.agent.config", json.dumps(agent)) + for key, value in agent.items(): + key = f"crewai.agent.{key}" + set_span_attribute(self.span, key, value) + elif instance_name == "Task": task = self.set_task_attributes() - # uncomment if you want to spread attributes for the UI instead of dumping the whole object - # for key, value in task.items(): - # set_span_attribute(self.span, key, value) - set_span_attribute(self.span, "crewai.task.config", json.dumps(task)) + for key, value in task.items(): + key = f"crewai.task.{key}" + set_span_attribute(self.span, key, value) def set_crew_attributes(self): for key, value in self.instance.__dict__.items(): if key == "tasks": self._parse_tasks(value) - elif key == "agents": self._parse_agents(value) else: diff --git a/src/langtrace_python_sdk/instrumentation/embedchain/patch.py b/src/langtrace_python_sdk/instrumentation/embedchain/patch.py index 26eda3cf..eaf18c8f 100644 --- a/src/langtrace_python_sdk/instrumentation/embedchain/patch.py +++ b/src/langtrace_python_sdk/instrumentation/embedchain/patch.py @@ -52,6 +52,11 @@ def traced_method(wrapped, instance, args, kwargs): **(extra_attributes if extra_attributes is not None else {}), } + if hasattr(instance, 'config') and isinstance(instance.config, object): + config_dict = instance.config.__dict__ + if isinstance(config_dict, dict): + span_attributes["embedchain.config"] = json.dumps(config_dict) + if len(args) > 0: span_attributes["embedchain.inputs"] = json.dumps(args) diff --git a/src/langtrace_python_sdk/instrumentation/langchain/instrumentation.py b/src/langtrace_python_sdk/instrumentation/langchain/instrumentation.py index 6f114ab7..4cc4fe8f 100644 --- a/src/langtrace_python_sdk/instrumentation/langchain/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/langchain/instrumentation.py @@ -15,7 +15,6 @@ """ import importlib.metadata -import inspect import logging from typing import Collection @@ -28,46 +27,46 @@ logging.basicConfig(level=logging.FATAL) -def patch_module_classes( - module_name, tracer, version, task, trace_output=True, trace_input=True -): - """ - Generic function to patch all public methods of all classes in a given module. - - Parameters: - - module: The module object containing the classes to patch. - - module_name: The name of the module, used in the prefix for `wrap_function_wrapper`. - - tracer: The tracer object used in `generic_patch`. - - version: The version parameter used in `generic_patch`. - - task: The name used to identify the type of task in `generic_patch`. - - exclude_private: Whether to exclude private methods (those starting with '_'). - - trace_output: Whether to trace the output of the patched methods. - - trace_input: Whether to trace the input of the patched methods. - """ - # import the module - module = importlib.import_module(module_name) - # loop through all public classes in the module - for name, obj in inspect.getmembers( - module, - lambda member: inspect.isclass(member) and member.__module__ == module.__name__, - ): - # loop through all public methods of the class - for method_name, _ in inspect.getmembers(obj, predicate=inspect.isfunction): - # Skip private methods - if method_name.startswith("_"): - continue - try: - method_path = f"{name}.{method_name}" - wrap_function_wrapper( - module_name, - method_path, - generic_patch( - method_path, task, tracer, version, trace_output, trace_input - ), - ) - # pylint: disable=broad-except - except Exception: - pass +# def patch_module_classes( +# module_name, tracer, version, task, trace_output=True, trace_input=True +# ): +# """ +# Generic function to patch all public methods of all classes in a given module. + +# Parameters: +# - module: The module object containing the classes to patch. +# - module_name: The name of the module, used in the prefix for `wrap_function_wrapper`. +# - tracer: The tracer object used in `generic_patch`. +# - version: The version parameter used in `generic_patch`. +# - task: The name used to identify the type of task in `generic_patch`. +# - exclude_private: Whether to exclude private methods (those starting with '_'). +# - trace_output: Whether to trace the output of the patched methods. +# - trace_input: Whether to trace the input of the patched methods. +# """ +# # import the module +# module = importlib.import_module(module_name) +# # loop through all public classes in the module +# for name, obj in inspect.getmembers( +# module, +# lambda member: inspect.isclass(member) and member.__module__ == module.__name__, +# ): +# # loop through all public methods of the class +# for method_name, _ in inspect.getmembers(obj, predicate=inspect.isfunction): +# # Skip private methods +# if method_name.startswith("_"): +# continue +# try: +# method_path = f"{name}.{method_name}" +# wrap_function_wrapper( +# module_name, +# method_path, +# generic_patch( +# method_path, task, tracer, version, trace_output, trace_input +# ), +# ) +# # pylint: disable=broad-except +# except Exception: +# pass class LangchainInstrumentation(BaseInstrumentor): @@ -83,14 +82,28 @@ def _instrument(self, **kwargs): tracer = get_tracer(__name__, "", tracer_provider) version = importlib.metadata.version("langchain") - modules_to_patch = [ - ("langchain.text_splitter", "split_text", True, True), - ] - - for module_name, task, trace_output, trace_input in modules_to_patch: - patch_module_classes( - module_name, tracer, version, task, trace_output, trace_input - ) + wrap_function_wrapper( + "langchain.agents.agent", + "RunnableAgent.plan", + generic_patch( + "RunnableAgent.plan", "plan", tracer, version, True, True + ), + ) + + wrap_function_wrapper( + "langchain.agents.agent", + "RunnableAgent.aplan", + generic_patch( + "RunnableAgent.aplan", "plan", tracer, version, True, True + ), + ) + + # modules_to_patch = [] + + # for module_name, task, trace_output, trace_input in modules_to_patch: + # patch_module_classes( + # module_name, tracer, version, task, trace_output, trace_input + # ) def _uninstrument(self, **kwargs): pass diff --git a/src/langtrace_python_sdk/instrumentation/langchain/patch.py b/src/langtrace_python_sdk/instrumentation/langchain/patch.py index 511ee137..944932d0 100644 --- a/src/langtrace_python_sdk/instrumentation/langchain/patch.py +++ b/src/langtrace_python_sdk/instrumentation/langchain/patch.py @@ -29,6 +29,7 @@ SERVICE_PROVIDERS, ) from importlib_metadata import version as v +from langtrace_python_sdk.utils.misc import serialize_args, serialize_kwargs def generic_patch( @@ -52,8 +53,12 @@ def traced_method(wrapped, instance, args, kwargs): **(extra_attributes if extra_attributes is not None else {}), } + inputs = {} if len(args) > 0 and trace_input: - span_attributes["langchain.inputs"] = to_json_string(args) + inputs["args"] = serialize_args(*args) + if len(kwargs) > 0 and trace_input: + inputs["kwargs"] = serialize_kwargs(**kwargs) + span_attributes["langchain.inputs"] = json.dumps(inputs) attributes = FrameworkSpanAttributes(**span_attributes) diff --git a/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py b/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py index 57d45373..e10a6510 100644 --- a/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/langchain_core/instrumentation.py @@ -114,6 +114,8 @@ def _instrument(self, **kwargs): "format", "format_messages", "format_prompt", + "transform", + "stream", "__or__", "__init__", "__repr__", diff --git a/src/langtrace_python_sdk/utils/misc.py b/src/langtrace_python_sdk/utils/misc.py index b033f3bf..a0d20452 100644 --- a/src/langtrace_python_sdk/utils/misc.py +++ b/src/langtrace_python_sdk/utils/misc.py @@ -28,3 +28,35 @@ def to_iso_format(value): else None ) ) + + +def serialize_kwargs(**kwargs): + # Function to check if a value is serializable + def is_serializable(value): + try: + json.dumps(value) + return True + except (TypeError, ValueError): + return False + + # Filter out non-serializable items + serializable_kwargs = {k: v for k, v in kwargs.items() if is_serializable(v)} + + # Convert to string representation + return json.dumps(serializable_kwargs) + + +def serialize_args(*args): + # Function to check if a value is serializable + def is_serializable(value): + try: + json.dumps(value) + return True + except (TypeError, ValueError): + return False + + # Filter out non-serializable items + serializable_args = [arg for arg in args if is_serializable(arg)] + + # Convert to string representation + return json.dumps(serializable_args) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 55e47090..3a5935a2 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "2.3.0" +__version__ = "2.3.1"