feat: Retire openapi3, use openapi-service-client instead #7514

Closed
wants to merge 9 commits
222 changes: 49 additions & 173 deletions haystack/components/connectors/openapi_service.py
@@ -3,8 +3,6 @@
# SPDX-License-Identifier: Apache-2.0

import json
- from collections import defaultdict
- from copy import copy
from typing import Any, Dict, List, Optional, Union

from haystack import component, logging
@@ -13,25 +11,22 @@

logger = logging.getLogger(__name__)

with LazyImport("Run 'pip install openapi3'") as openapi_imports:
from openapi3 import OpenAPI
with LazyImport("Run 'pip install openapi-service-client'") as openapi_imports:
from openapi_service_client import ClientConfigurationBuilder, OpenAPIServiceClient
from openapi_service_client.providers import AnthropicLLMProvider, CohereLLMProvider, OpenAILLMProvider
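For readers unfamiliar with the LazyImport helper used here: the import error is swallowed inside the with block and only re-raised, together with the install hint, when check() is called (the connector does this in __init__ below). A minimal sketch, assuming haystack.lazy_imports.LazyImport behaves as it does in this file:

from haystack.lazy_imports import LazyImport

with LazyImport("Run 'pip install openapi-service-client'") as openapi_imports:
    from openapi_service_client import OpenAPIServiceClient  # skipped silently if the package is missing


def make_client(config):
    openapi_imports.check()  # raises ImportError with the pip hint only when the client is actually needed
    return OpenAPIServiceClient(config)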


@component
class OpenAPIServiceConnector:
"""
A component which connects the Haystack framework to OpenAPI services.

- The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services, enabling it to call
- operations as defined in the OpenAPI specification of the service.
+ The `OpenAPIServiceConnector` component connects the Haystack framework to OpenAPI services.

It integrates with `ChatMessage` dataclass, where the payload in messages is used to determine the method to be
- called and the parameters to be passed. The message payload should be an OpenAI JSON formatted function calling
- string consisting of the method name and the parameters to be passed to the method. The method name and parameters
- are then used to invoke the method on the OpenAPI service. The response from the service is returned as a
- `ChatMessage`.
+ called and the parameters to be passed. The response from the service is returned as a `ChatMessage`.

+ Function calling payloads from OpenAI, Anthropic, and Cohere LLMs are supported.

- Before using this component, users usually resolve service endpoint parameters with a help of
+ Before using this component, users usually resolve the function definitions for function calling with the help of the
`OpenAPIServiceToFunctions` component.

The example below demonstrates how to use the `OpenAPIServiceConnector` to invoke a method on a https://serper.dev/
@@ -69,18 +64,31 @@ class OpenAPIServiceConnector:

"""

- def __init__(self):
+ def __init__(self, provider_map: Optional[Dict[str, Any]] = None, default_provider: Optional[str] = None):
"""
Initializes the OpenAPIServiceConnector instance

+ :param provider_map: A dictionary mapping provider names to their respective LLMProvider instances. The default
+ providers are OpenAILLMProvider, AnthropicLLMProvider, and CohereLLMProvider.
"""
openapi_imports.check()
+ self.provider_map = provider_map or {
+ "openai": OpenAILLMProvider(),
+ "anthropic": AnthropicLLMProvider(),
+ "cohere": CohereLLMProvider(),
+ }
+ default_provider = default_provider or "openai"
+ if default_provider not in self.provider_map:
+ raise ValueError(f"Default provider {default_provider} not found in provider map.")
+ self.default_provider = default_provider
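A minimal sketch of constructing the connector with the new arguments; it assumes only the constructor shown above and the provider classes imported at the top of this file:

from haystack.components.connectors import OpenAPIServiceConnector
from openapi_service_client.providers import AnthropicLLMProvider, OpenAILLMProvider

# Defaults: OpenAI, Anthropic, and Cohere providers, with "openai" as the default.
connector = OpenAPIServiceConnector()

# Or restrict the provider map and make Anthropic the default instead.
connector = OpenAPIServiceConnector(
    provider_map={"openai": OpenAILLMProvider(), "anthropic": AnthropicLLMProvider()},
    default_provider="anthropic",
)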

@component.output_types(service_response=Dict[str, Any])
def run(
self,
messages: List[ChatMessage],
service_openapi_spec: Dict[str, Any],
service_credentials: Optional[Union[dict, str]] = None,
+ llm_provider: Optional[str] = None,
) -> Dict[str, List[ChatMessage]]:
"""
Processes a list of chat messages to invoke a method on an OpenAPI service.
@@ -91,10 +99,11 @@ def run(
:param messages: A list of `ChatMessage` objects containing the messages to be processed. The last message
should contain the function invocation payload in OpenAI function calling format. See the example in the class
docstring for the expected format.
- :param service_openapi_spec: The OpenAPI JSON specification object of the service to be invoked. All the refs
- should already be resolved.
+ :param service_openapi_spec: The OpenAPI JSON specification object of the service to be invoked.
:param service_credentials: The credentials to be used for authentication with the service.
Currently, only the http and apiKey OpenAPI security schemes are supported.
+ :param llm_provider: The name of the LLM provider that generated the function calling payload.
+ Default is "openai".

:return: A dictionary with the following keys:
- `service_response`: a list of `ChatMessage` objects, each containing the response from the service. The
@@ -108,163 +117,30 @@
last_message = messages[-1]
if not last_message.is_from(ChatRole.ASSISTANT):
raise ValueError(f"{last_message} is not from the assistant.")

- function_invocation_payloads = self._parse_message(last_message)

- # instantiate the OpenAPI service for the given specification
- openapi_service = OpenAPI(service_openapi_spec)
- self._authenticate_service(openapi_service, service_credentials)

- response_messages = []
- for method_invocation_descriptor in function_invocation_payloads:
- service_response = self._invoke_method(openapi_service, method_invocation_descriptor)
- # openapi3 parses the JSON service response into a model object, which is not our focus at the moment.
- # Instead, we require direct access to the raw JSON data of the response, rather than the model objects
- # provided by the openapi3 library. This approach helps us avoid issues related to (de)serialization.
- # By accessing the raw JSON response through `service_response._raw_data`, we can serialize this data
- # into a string. Finally, we use this string to create a ChatMessage object.
- response_messages.append(ChatMessage.from_user(json.dumps(service_response._raw_data)))

- return {"service_response": response_messages}

- def _parse_message(self, message: ChatMessage) -> List[Dict[str, Any]]:
- """
- Parses the message to extract the method invocation descriptor.

- :param message: ChatMessage containing the tools calls
- :return: A list of function invocation payloads
- :raises ValueError: If the content is not valid JSON or lacks required fields.
- """
- function_payloads = []
+ if not last_message.content:
+ raise ValueError("Function calling message content is empty.")

+ default_provider = self.provider_map.get(self.default_provider, None)
+ llm_provider = self.provider_map.get(llm_provider or "openai", None) or default_provider
+ logger.debug(f"Using LLM provider: {llm_provider.__class__.__name__}")

+ builder = ClientConfigurationBuilder()
+ config_openapi = (
+ builder.with_openapi_spec(service_openapi_spec)
+ .with_credentials(service_credentials)
+ .with_provider(llm_provider)
+ .build()
+ )
+ logger.debug(f"Invoking service {config_openapi.get_openapi_spec().get_name()} with {last_message.content}")
+ openapi_service = OpenAPIServiceClient(config_openapi)
try:
- tool_calls = json.loads(message.content)
- except json.JSONDecodeError:
- raise ValueError("Invalid JSON content, expected OpenAI tools message.", message.content)

- for tool_call in tool_calls:
- # this should never happen, but just in case do a sanity check
- if "type" not in tool_call:
- raise ValueError("Message payload doesn't seem to be a tool invocation descriptor", message.content)

- # In OpenAPIServiceConnector we know how to handle functions tools only
- if tool_call["type"] == "function":
- function_call = tool_call["function"]
- function_payloads.append(
- {"arguments": json.loads(function_call["arguments"]), "name": function_call["name"]}
- )
- return function_payloads
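Concretely, the message content this parser expected (the format the class docstring refers to) is a JSON-encoded list of OpenAI-style tool calls. A self-contained sketch of the format and of what gets extracted; the "search" operation and the id value are illustrative:

import json

content = json.dumps(
    [
        {
            "id": "call_0",  # extra keys are ignored; only "type" and "function" matter here
            "type": "function",
            "function": {"name": "search", "arguments": json.dumps({"q": "haystack ai"})},
        }
    ]
)

tool_calls = json.loads(content)
function_payloads = [
    {"arguments": json.loads(tc["function"]["arguments"]), "name": tc["function"]["name"]}
    for tc in tool_calls
    if tc.get("type") == "function"
]
print(function_payloads)  # [{'arguments': {'q': 'haystack ai'}, 'name': 'search'}]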

- def _authenticate_service(self, openapi_service: OpenAPI, credentials: Optional[Union[dict, str]] = None):
- """
- Authentication with an OpenAPI service.

- Authenticates with the OpenAPI service if required, supporting both single (str) and multiple
- authentication methods (dict).

- OpenAPI spec v3 supports the following security schemes:
- http – for Basic, Bearer and other HTTP authentications schemes
- apiKey – for API keys and cookie authentication
- oauth2 – for OAuth 2
- openIdConnect – for OpenID Connect Discovery

- Currently, only the http and apiKey schemes are supported. Multiple security schemes can be defined in the
- OpenAPI spec, and the credentials should be provided as a dictionary with keys matching the security scheme
- names. If only one security scheme is defined, the credentials can be provided as a simple string.

- :param openapi_service: The OpenAPI service instance.
- :param credentials: Credentials for authentication, which can be either a string (e.g. token) or a dictionary
- with keys matching the authentication method names.
- :raises ValueError: If authentication fails, is not found, or if appropriate credentials are missing.
- """
- if openapi_service.raw_element.get("components", {}).get("securitySchemes"):
- service_name = openapi_service.info.title
- if not credentials:
- raise ValueError(f"Service {service_name} requires authentication but no credentials were provided.")

- # a dictionary of security schemes defined in the OpenAPI spec
- # each key is the name of the security scheme, and the value is the scheme definition
- security_schemes = openapi_service.components.securitySchemes.raw_element
- supported_schemes = ["http", "apiKey"] # todo: add support for oauth2 and openIdConnect

- authenticated = False
- for scheme_name, scheme in security_schemes.items():
- if scheme["type"] in supported_schemes:
- auth_credentials = None
- if isinstance(credentials, str):
- auth_credentials = credentials
- elif isinstance(credentials, dict) and scheme_name in credentials:
- auth_credentials = credentials[scheme_name]
- if auth_credentials:
- openapi_service.authenticate(scheme_name, auth_credentials)
- authenticated = True
- break

- raise ValueError(
- f"Service {service_name} requires {scheme_name} security scheme but no "
- f"credentials were provided for it. Check the service configuration and credentials."
- )
- if not authenticated:
- raise ValueError(
- f"Service {service_name} requires authentication but no credentials were provided "
- f"for it. Check the service configuration and credentials."
- )
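To make the accepted credential shapes concrete: with the http and apiKey scheme types handled above, credentials may be a single string (one scheme defined) or a dict keyed by scheme name (several schemes defined). The scheme names below are illustrative:

# Excerpt of a spec's components.securitySchemes section:
security_schemes = {
    "bearerAuth": {"type": "http", "scheme": "bearer"},
    "apiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-API-KEY"},
}

# Only one scheme defined -> a bare token string is enough:
credentials = "my-bearer-token"

# Several schemes defined -> pick them by name:
credentials = {"apiKeyAuth": "my-api-key"}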

- def _invoke_method(self, openapi_service: OpenAPI, method_invocation_descriptor: Dict[str, Any]) -> Any:
- """
- Invokes the specified method on the OpenAPI service.

- The method name and arguments are passed in the method_invocation_descriptor.

- :param openapi_service: The OpenAPI service instance.
- :param method_invocation_descriptor: The method name and arguments to be passed to the method. The payload
- should contain the method name (key: "name") and the arguments (key: "arguments"). The name is a string, and
- the arguments are a dictionary of key-value pairs.
- :return: A service JSON response.
- :raises RuntimeError: If the method is not found or invocation fails.
- """
- name = method_invocation_descriptor.get("name")
- invocation_arguments = copy(method_invocation_descriptor.get("arguments", {}))
- if not name or not invocation_arguments:
- raise ValueError(
- f"Invalid function calling descriptor: {method_invocation_descriptor} . It should contain "
- f"a method name and arguments."
+ payload = (
+ json.loads(last_message.content) if isinstance(last_message.content, str) else last_message.content
+ )
+ service_response = openapi_service.invoke(payload)
+ except Exception as e:
+ logger.error(f"Error invoking OpenAPI endpoint. Error: {e}")
+ service_response = {"error": str(e)}
+ response_messages = [ChatMessage.from_user(json.dumps(service_response))]

- # openapi3 specific method to call the operation, do we have it?
- method_to_call = getattr(openapi_service, f"call_{name}", None)
- if not callable(method_to_call):
- raise RuntimeError(f"Operation {name} not found in OpenAPI specification {openapi_service.info.title}")

- # get the operation reference from the method_to_call
- operation = method_to_call.operation.__self__
- operation_dict = operation.raw_element

- # Pack URL/query parameters under "parameters" key
- method_call_params: Dict[str, Dict[str, Any]] = defaultdict(dict)
- parameters = operation_dict.get("parameters", [])
- request_body = operation_dict.get("requestBody", {})

- for param in parameters:
- param_name = param["name"]
- param_value = invocation_arguments.get(param_name)
- if param_value:
- method_call_params["parameters"][param_name] = param_value
- else:
- if param.get("required", False):
- raise ValueError(f"Missing parameter: '{param_name}' required for the '{name}' operation.")

- # Pack request body parameters under "data" key
- if request_body:
- schema = request_body.get("content", {}).get("application/json", {}).get("schema", {})
- required_params = schema.get("required", [])
- for param_name in schema.get("properties", {}):
- param_value = invocation_arguments.get(param_name)
- if param_value:
- method_call_params["data"][param_name] = param_value
- else:
- if param_name in required_params:
- raise ValueError(
- f"Missing requestBody parameter: '{param_name}' required for the '{name}' operation."
- )
- # call the underlying service REST API with the parameters
- return method_to_call(**method_call_params)
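The packing logic removed here splits the LLM-supplied arguments into URL/query "parameters" and JSON requestBody "data". A standalone, simplified re-creation (the pack_arguments helper is hypothetical and omits the required-field checks):

from collections import defaultdict
from typing import Any, Dict


def pack_arguments(operation: Dict[str, Any], arguments: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    # Query/path parameters go under "parameters"; request-body properties go under "data".
    packed: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for param in operation.get("parameters", []):
        if param["name"] in arguments:
            packed["parameters"][param["name"]] = arguments[param["name"]]
    schema = operation.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema", {})
    for prop in schema.get("properties", {}):
        if prop in arguments:
            packed["data"][prop] = arguments[prop]
    return packed


operation = {
    "parameters": [{"name": "locale", "in": "query"}],
    "requestBody": {"content": {"application/json": {"schema": {"properties": {"q": {"type": "string"}}}}}},
}
print(dict(pack_arguments(operation, {"locale": "en", "q": "haystack"})))
# {'parameters': {'locale': 'en'}, 'data': {'q': 'haystack'}}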
return {"service_response": response_messages}