-
Notifications
You must be signed in to change notification settings - Fork 569
feat: add instrumentation to embedding functions for various backends #5120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c5eadb3
7593c20
de60831
ec69c3c
dd894b6
05deaa3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,6 +63,48 @@ | |
| AgentExecutor = None | ||
|
|
||
|
|
||
| # Conditional imports for embeddings providers | ||
| try: | ||
| from langchain_openai import OpenAIEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| OpenAIEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_openai import AzureOpenAIEmbeddings | ||
| except ImportError: | ||
| AzureOpenAIEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_google_vertexai import VertexAIEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| VertexAIEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_aws import BedrockEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| BedrockEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_cohere import CohereEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| CohereEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_mistralai import MistralAIEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| MistralAIEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_huggingface import HuggingFaceEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| HuggingFaceEmbeddings = None | ||
|
|
||
| try: | ||
| from langchain_ollama import OllamaEmbeddings # type: ignore[import-not-found] | ||
| except ImportError: | ||
| OllamaEmbeddings = None | ||
|
|
||
|
|
||
| DATA_FIELDS = { | ||
| "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY, | ||
| "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, | ||
|
|
@@ -140,6 +182,16 @@ def setup_once(): | |
| AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke) | ||
| AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream) | ||
|
|
||
| # Patch embeddings providers | ||
| _patch_embeddings_provider(OpenAIEmbeddings) | ||
| _patch_embeddings_provider(AzureOpenAIEmbeddings) | ||
| _patch_embeddings_provider(VertexAIEmbeddings) | ||
| _patch_embeddings_provider(BedrockEmbeddings) | ||
| _patch_embeddings_provider(CohereEmbeddings) | ||
| _patch_embeddings_provider(MistralAIEmbeddings) | ||
| _patch_embeddings_provider(HuggingFaceEmbeddings) | ||
| _patch_embeddings_provider(OllamaEmbeddings) | ||
|
|
||
|
|
||
| class WatchedSpan: | ||
| span = None # type: Span | ||
|
|
@@ -976,3 +1028,105 @@ async def new_iterator_async(): | |
| return result | ||
|
|
||
| return new_stream | ||
|
|
||
|
|
||
| def _patch_embeddings_provider(provider_class): | ||
| # type: (Any) -> None | ||
| """Patch an embeddings provider class with monitoring wrappers.""" | ||
| if provider_class is None: | ||
| return | ||
|
|
||
| if hasattr(provider_class, "embed_documents"): | ||
| provider_class.embed_documents = _wrap_embedding_method( | ||
| provider_class.embed_documents | ||
| ) | ||
| if hasattr(provider_class, "embed_query"): | ||
| provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query) | ||
| if hasattr(provider_class, "aembed_documents"): | ||
| provider_class.aembed_documents = _wrap_async_embedding_method( | ||
| provider_class.aembed_documents | ||
| ) | ||
| if hasattr(provider_class, "aembed_query"): | ||
| provider_class.aembed_query = _wrap_async_embedding_method( | ||
| provider_class.aembed_query | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Missing guard against double-wrapping embedding methodsThe |
||
|
|
||
|
|
||
| def _wrap_embedding_method(f): | ||
| # type: (Callable[..., Any]) -> Callable[..., Any] | ||
| """Wrap sync embedding methods (embed_documents and embed_query).""" | ||
|
|
||
| @wraps(f) | ||
| def new_embedding_method(self, *args, **kwargs): | ||
| # type: (Any, Any, Any) -> Any | ||
| integration = sentry_sdk.get_client().get_integration(LangchainIntegration) | ||
| if integration is None: | ||
| return f(self, *args, **kwargs) | ||
|
|
||
| model_name = getattr(self, "model", None) or getattr(self, "model_name", None) | ||
| with sentry_sdk.start_span( | ||
| op=OP.GEN_AI_EMBEDDINGS, | ||
| name=f"embeddings {model_name}" if model_name else "embeddings", | ||
| origin=LangchainIntegration.origin, | ||
| ) as span: | ||
| span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") | ||
| if model_name: | ||
| span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) | ||
|
|
||
| # Capture input if PII is allowed | ||
| if ( | ||
| should_send_default_pii() | ||
| and integration.include_prompts | ||
| and len(args) > 0 | ||
| ): | ||
| input_data = args[0] | ||
| # Normalize to list format | ||
| texts = input_data if isinstance(input_data, list) else [input_data] | ||
| set_data_normalized( | ||
| span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False | ||
| ) | ||
|
|
||
| result = f(self, *args, **kwargs) | ||
| return result | ||
|
|
||
| return new_embedding_method | ||
|
|
||
|
|
||
| def _wrap_async_embedding_method(f): | ||
| # type: (Callable[..., Any]) -> Callable[..., Any] | ||
| """Wrap async embedding methods (aembed_documents and aembed_query).""" | ||
|
|
||
| @wraps(f) | ||
| async def new_async_embedding_method(self, *args, **kwargs): | ||
| # type: (Any, Any, Any) -> Any | ||
| integration = sentry_sdk.get_client().get_integration(LangchainIntegration) | ||
| if integration is None: | ||
| return await f(self, *args, **kwargs) | ||
|
|
||
| model_name = getattr(self, "model", None) or getattr(self, "model_name", None) | ||
| with sentry_sdk.start_span( | ||
| op=OP.GEN_AI_EMBEDDINGS, | ||
| name=f"embeddings {model_name}" if model_name else "embeddings", | ||
| origin=LangchainIntegration.origin, | ||
| ) as span: | ||
| span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings") | ||
| if model_name: | ||
| span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name) | ||
|
|
||
| # Capture input if PII is allowed | ||
| if ( | ||
| should_send_default_pii() | ||
| and integration.include_prompts | ||
| and len(args) > 0 | ||
| ): | ||
| input_data = args[0] | ||
| # Normalize to list format | ||
| texts = input_data if isinstance(input_data, list) else [input_data] | ||
| set_data_normalized( | ||
| span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False | ||
| ) | ||
|
|
||
| result = await f(self, *args, **kwargs) | ||
| return result | ||
|
|
||
| return new_async_embedding_method | ||
Uh oh!
There was an error while loading. Please reload this page.