diff --git a/aixplain/factories/__init__.py b/aixplain/factories/__init__.py index 36147c6e..7b876899 100644 --- a/aixplain/factories/__init__.py +++ b/aixplain/factories/__init__.py @@ -20,6 +20,7 @@ limitations under the License. """ from .asset_factory import AssetFactory +from .agent_factory import AgentFactory from .benchmark_factory import BenchmarkFactory from .corpus_factory import CorpusFactory from .data_factory import DataFactory diff --git a/aixplain/factories/agent_factory/__init__.py b/aixplain/factories/agent_factory/__init__.py new file mode 100644 index 00000000..36380a76 --- /dev/null +++ b/aixplain/factories/agent_factory/__init__.py @@ -0,0 +1,166 @@ +__author__ = "lucaspavanelli" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira and Lucas Pavanelli +Date: May 16th 2024 +Description: + Agent Factory Class +""" + +import json +import logging + +from aixplain.enums.supplier import Supplier +from aixplain.modules.agent import Agent, Tool +from aixplain.modules.agent.tool.model_tool import ModelTool +from aixplain.modules.agent.tool.pipeline_tool import PipelineTool +from aixplain.utils import config +from typing import Dict, List, Optional, Text, Union + +from aixplain.factories.agent_factory.utils import build_agent +from aixplain.utils.file_utils import _request_with_retry +from urllib.parse import urljoin + + +class AgentFactory: + @classmethod + def create( + cls, + name: Text, + llm_id: Text, + tools: List[Tool] = [], + description: Text = "", + api_key: Text = config.TEAM_API_KEY, + supplier: Union[Dict, Text, Supplier, int] = "aiXplain", + version: Optional[Text] = None, + ) -> Agent: + """Create a new agent in the platform.""" + try: + agent = None + url = urljoin(config.BACKEND_URL, "sdk/agents") + headers = {"x-api-key": api_key} + + if isinstance(supplier, dict): + supplier = supplier["code"] + elif isinstance(supplier, Supplier): + supplier = supplier.value["code"] + + tool_payload = [] + for tool in tools: + if isinstance(tool, ModelTool): + tool_payload.append( + { + "function": tool.function.value, + "type": "model", + "description": tool.description, + "supplier": tool.supplier.value["code"] if tool.supplier else None, + "version": tool.version if tool.version else None, + } + ) + elif isinstance(tool, PipelineTool): + tool_payload.append( + { + "assetId": tool.pipeline, + "description": tool.description, + "type": "pipeline", + } + ) + else: + raise Exception("Agent Creation Error: Tool type not supported.") + + payload = { + "name": name, + "assets": tool_payload, + "description": description, + "supplier": supplier, + "version": version, + } + if llm_id is not None: + payload["llmId"] = llm_id + + logging.info(f"Start service for POST Create Agent - {url} - {headers} - {json.dumps(payload)}") + r = _request_with_retry("post", url, headers=headers, json=payload) + if 200 <= r.status_code < 300: + response = r.json() + agent = build_agent(payload=response, api_key=api_key) + else: + error = r.json() + error_msg = "Agent Onboarding Error: Please contant the administrators." + if "message" in error: + msg = error["message"] + if error["message"] == "err.name_already_exists": + msg = "Agent name already exists." + elif error["message"] == "err.asset_is_not_available": + msg = "Some the tools are not available." + error_msg = f"Agent Onboarding Error (HTTP {r.status_code}): {msg}" + logging.exception(error_msg) + raise Exception(error_msg) + except Exception as e: + raise Exception(e) + return agent + + @classmethod + def list(cls) -> Dict: + """List all agents available in the platform.""" + url = urljoin(config.BACKEND_URL, "sdk/agents") + headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} + + payload = {} + logging.info(f"Start service for GET List Agents - {url} - {headers} - {json.dumps(payload)}") + try: + r = _request_with_retry("get", url, headers=headers) + resp = r.json() + + if 200 <= r.status_code < 300: + agents, page_total, total = [], 0, 0 + results = resp + page_total = len(results) + total = len(results) + logging.info(f"Response for GET List Agents - Page Total: {page_total} / Total: {total}") + for agent in results: + agents.append(build_agent(agent)) + return {"results": agents, "page_total": page_total, "page_number": 0, "total": total} + else: + error_msg = "Agent Listing Error: Please contant the administrators." + if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Listing Error (HTTP {r.status_code}): {msg}" + logging.exception(error_msg) + raise Exception(error_msg) + except Exception as e: + raise Exception(e) + + @classmethod + def get(cls, agent_id: Text, api_key: Optional[Text] = None) -> Agent: + """Get agent by id.""" + url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent_id}") + if config.AIXPLAIN_API_KEY != "": + headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"} + else: + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"x-api-key": api_key, "Content-Type": "application/json"} + logging.info(f"Start service for GET Agent - {url} - {headers}") + r = _request_with_retry("get", url, headers=headers) + resp = r.json() + if 200 <= r.status_code < 300: + return build_agent(resp) + else: + msg = "Please contant the administrators." + if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Get Error (HTTP {r.status_code}): {msg}" + raise Exception(error_msg) diff --git a/aixplain/factories/agent_factory/utils.py b/aixplain/factories/agent_factory/utils.py new file mode 100644 index 00000000..6363a08e --- /dev/null +++ b/aixplain/factories/agent_factory/utils.py @@ -0,0 +1,48 @@ +__author__ = "thiagocastroferreira" + +import aixplain.utils.config as config +from aixplain.enums import Function, Supplier +from aixplain.enums.asset_status import AssetStatus +from aixplain.modules.agent import Agent, ModelTool, PipelineTool +from typing import Dict, Text +from urllib.parse import urljoin + + +def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent: + """Instantiate a new agent in the platform.""" + tools = payload["assets"] + for i, tool in enumerate(tools): + if tool["type"] == "model": + for supplier in Supplier: + if tool["supplier"] is not None and tool["supplier"].lower() in [ + supplier.value["code"].lower(), + supplier.value["name"].lower(), + ]: + tool["supplier"] = supplier + break + + tool = ModelTool( + function=Function(tool["function"]), + supplier=tool["supplier"], + version=tool["version"], + ) + elif tool["type"] == "pipeline": + tool = PipelineTool(description=tool["description"], pipeline=tool["assetId"]) + else: + raise Exception("Agent Creation Error: Tool type not supported.") + tools[i] = tool + + agent = Agent( + id=payload["id"], + name=payload["name"] if "name" in payload else "", + tools=tools, + description=payload["description"] if "description" in payload else "", + supplier=payload["teamId"] if "teamId" in payload else None, + version=payload["version"] if "version" in payload else None, + cost=payload["cost"] if "cost" in payload else None, + llm_id=payload["llmId"] if "llmId" in payload else "6646261c6eb563165658bbb1", + api_key=api_key, + status=AssetStatus(payload["status"]), + ) + agent.url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent.id}/run") + return agent diff --git a/aixplain/factories/pipeline_factory.py b/aixplain/factories/pipeline_factory.py index cc94ff79..61bcb214 100644 --- a/aixplain/factories/pipeline_factory.py +++ b/aixplain/factories/pipeline_factory.py @@ -45,6 +45,11 @@ class PipelineFactory: aixplain_key = config.AIXPLAIN_API_KEY backend_url = config.BACKEND_URL + @classmethod + def __get_typed_nodes(cls, response: Dict, type: str) -> List[Dict]: + # read "nodes" field from response and return the nodes that are marked by "type": type + return [node for node in response["nodes"] if node["type"].lower() == type.lower()] + @classmethod def __from_response(cls, response: Dict) -> Pipeline: """Converts response Json to 'Pipeline' object @@ -57,7 +62,9 @@ def __from_response(cls, response: Dict) -> Pipeline: """ if "api_key" not in response: response["api_key"] = config.TEAM_API_KEY - return Pipeline(response["id"], response["name"], response["api_key"]) + input = cls.__get_typed_nodes(response, "input") + output = cls.__get_typed_nodes(response, "output") + return Pipeline(response["id"], response["name"], response["api_key"], input=input, output=output) @classmethod def get(cls, pipeline_id: Text, api_key: Optional[Text] = None) -> Pipeline: @@ -225,16 +232,13 @@ def list( return {"results": pipelines, "page_total": page_total, "page_number": page_number, "total": total} @classmethod - def create( - cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft", api_key: Optional[Text] = None - ) -> Pipeline: - """Pipeline Creation + def create(cls, name: Text, pipeline: Union[Text, Dict], api_key: Optional[Text] = None) -> Pipeline: + """Draft Pipeline Creation Args: name (Text): Pipeline Name pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file - status (Text, optional): Status of the pipeline. Currently only draft pipelines can be saved. Defaults to "draft". - api_key (Optional[Text], optional): API Key. Defaults to None. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Currently just the creation of draft pipelines are supported @@ -243,15 +247,17 @@ def create( Pipeline: instance of the new pipeline """ try: - assert status == "draft", "Pipeline Creation Error: Currently just the creation of draft pipelines are supported." if isinstance(pipeline, str) is True: _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Creation Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Creation Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload payload = {"name": name, "status": "draft", "architecture": pipeline} url = urljoin(cls.backend_url, "sdk/pipelines") diff --git a/aixplain/modules/__init__.py b/aixplain/modules/__init__.py index 488c8c2f..c7246dac 100644 --- a/aixplain/modules/__init__.py +++ b/aixplain/modules/__init__.py @@ -33,3 +33,5 @@ from .finetune.status import FinetuneStatus from .benchmark import Benchmark from .benchmark_job import BenchmarkJob +from .agent import Agent +from .agent.tool import Tool diff --git a/aixplain/modules/agent/__init__.py b/aixplain/modules/agent/__init__.py new file mode 100644 index 00000000..2f244d56 --- /dev/null +++ b/aixplain/modules/agent/__init__.py @@ -0,0 +1,195 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: May 16th 2024 +Description: + Agentification Class +""" +import json +import logging +import time +import traceback + +from aixplain.utils.file_utils import _request_with_retry +from aixplain.enums.supplier import Supplier +from aixplain.enums.asset_status import AssetStatus +from aixplain.modules.model import Model +from aixplain.modules.agent.tool import Tool +from aixplain.modules.agent.tool.model_tool import ModelTool +from aixplain.modules.agent.tool.pipeline_tool import PipelineTool +from typing import Dict, List, Text, Optional, Union +from urllib.parse import urljoin + +from aixplain.utils import config + + +class Agent(Model): + """Advanced AI system capable of performing tasks by leveraging specialized software tools and resources from aiXplain marketplace. + + Attributes: + id (Text): ID of the Agent + name (Text): Name of the Agent + tools (List[Tool]): List of tools that the Agent uses. + description (Text, optional): description of the Agent. Defaults to "". + llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). + supplier (Text): Supplier of the Agent. + version (Text): Version of the Agent. + backend_url (str): URL of the backend. + api_key (str): The TEAM API key used for authentication. + cost (Dict, optional): model price. Defaults to None. + """ + + def __init__( + self, + id: Text, + name: Text, + tools: List[Tool] = [], + description: Text = "", + llm_id: Text = "6646261c6eb563165658bbb1", + api_key: Optional[Text] = config.TEAM_API_KEY, + supplier: Union[Dict, Text, Supplier, int] = "aiXplain", + version: Optional[Text] = None, + cost: Optional[Dict] = None, + status: AssetStatus = AssetStatus.ONBOARDING, + **additional_info, + ) -> None: + """Create a FineTune with the necessary information. + + Args: + id (Text): ID of the Agent + name (Text): Name of the Agent + tools (List[Tool]): List of tools that the Agent uses. + description (Text, optional): description of the Agent. Defaults to "". + llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). + supplier (Text): Supplier of the Agent. + version (Text): Version of the Agent. + backend_url (str): URL of the backend. + api_key (str): The TEAM API key used for authentication. + cost (Dict, optional): model price. Defaults to None. + """ + # assert len(tools) > 0, "At least one tool must be provided." + super().__init__(id, name, description, api_key, supplier, version, cost=cost) + self.additional_info = additional_info + self.tools = tools + self.llm_id = llm_id + if isinstance(status, str): + try: + status = AssetStatus(status) + except Exception: + status = AssetStatus.ONBOARDING + self.status = status + + def run( + self, + query: Text, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + timeout: float = 300, + parameters: Dict = {}, + wait_time: float = 0.5, + ) -> Dict: + """Runs an agent call. + + Args: + query (Text): query to be processed by the agent. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + timeout (float, optional): total polling time. Defaults to 300. + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. + + Returns: + Dict: parsed output from model + """ + start = time.time() + try: + response = self.run_async(query=query, session_id=session_id, history=history, name=name, parameters=parameters) + if response["status"] == "FAILED": + end = time.time() + response["elapsed_time"] = end - start + return response + poll_url = response["url"] + end = time.time() + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) + return response + except Exception as e: + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Model Run: Error in running for {name}: {e}") + end = time.time() + return {"status": "FAILED", "error": msg, "elapsed_time": end - start} + + def run_async( + self, + query: Text, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + parameters: Dict = {}, + ) -> Dict: + """Runs asynchronously an agent call. + + Args: + query (Text): query to be processed by the agent. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + + Returns: + dict: polling URL in response + """ + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + from aixplain.factories.file_factory import FileFactory + + payload = {"id": self.id, "query": FileFactory.to_link(query), "sessionId": session_id, "history": history} + payload.update(parameters) + payload = json.dumps(payload) + + r = _request_with_retry("post", self.url, headers=headers, data=payload) + logging.info(f"Model Run Async: Start service for {name} - {self.url} - {payload} - {headers}") + + resp = None + try: + resp = r.json() + logging.info(f"Result of request for {name} - {r.status_code} - {resp}") + + poll_url = resp["data"] + response = {"status": "IN_PROGRESS", "url": poll_url} + except Exception: + response = {"status": "FAILED"} + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Model Run Async: Error in running for {name}: {resp}") + if resp is not None: + response["error"] = msg + return response + + def delete(self) -> None: + """Delete Corpus service""" + try: + url = urljoin(config.BACKEND_URL, f"sdk/agents/{self.id}") + headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} + logging.debug(f"Start service for DELETE Agent - {url} - {headers}") + r = _request_with_retry("delete", url, headers=headers) + if r.status_code != 200: + raise Exception() + except Exception: + message = f"Agent Deletion Error (HTTP {r.status_code}): Make sure the agent exists and you are the owner." + logging.error(message) + raise Exception(f"{message}") diff --git a/aixplain/modules/agent/tool.py b/aixplain/modules/agent/tool.py new file mode 100644 index 00000000..6651afe7 --- /dev/null +++ b/aixplain/modules/agent/tool.py @@ -0,0 +1,59 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: May 16th 2024 +Description: + Agentification Class +""" +from typing import Text, Optional + +from aixplain.enums.function import Function +from aixplain.enums.supplier import Supplier + + +class Tool: + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Attributes: + name (Text): name of the tool + description (Text): descriptiion of the tool + function (Function): task that the tool performs + supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. + """ + + def __init__( + self, + name: Text, + description: Text, + function: Function, + supplier: Optional[Supplier] = None, + **additional_info, + ) -> None: + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Args: + name (Text): name of the tool + description (Text): descriptiion of the tool + function (Function): task that the tool performs + supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. + """ + self.name = name + self.description = description + self.function = function + self.supplier = supplier + self.additional_info = additional_info diff --git a/aixplain/modules/agent/tool/__init__.py b/aixplain/modules/agent/tool/__init__.py new file mode 100644 index 00000000..2a22511a --- /dev/null +++ b/aixplain/modules/agent/tool/__init__.py @@ -0,0 +1,53 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: May 16th 2024 +Description: + Agentification Class +""" +from abc import ABC +from typing import Optional, Text + + +class Tool(ABC): + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Attributes: + name (Text): name of the tool + description (Text): descriptiion of the tool + version (Text): version of the tool + """ + + def __init__( + self, + name: Text, + description: Text, + version: Optional[Text] = None, + **additional_info, + ) -> None: + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Args: + name (Text): name of the tool + description (Text): descriptiion of the tool + version (Text): version of the tool + """ + self.name = name + self.description = description + self.version = version + self.additional_info = additional_info diff --git a/aixplain/modules/agent/tool/model_tool.py b/aixplain/modules/agent/tool/model_tool.py new file mode 100644 index 00000000..69bf28d5 --- /dev/null +++ b/aixplain/modules/agent/tool/model_tool.py @@ -0,0 +1,60 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: May 16th 2024 +Description: + Agentification Class +""" +from typing import Optional + +from aixplain.enums.function import Function +from aixplain.enums.supplier import Supplier +from aixplain.modules.agent.tool import Tool + + +class ModelTool(Tool): + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Attributes: + function (Function): task that the tool performs + supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. + """ + + def __init__( + self, + function: Function, + supplier: Optional[Supplier] = None, + **additional_info, + ) -> None: + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Args: + function (Function): task that the tool performs + supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. + """ + super().__init__("", "", **additional_info) + if isinstance(function, str): + function = Function(function) + self.function = function + + try: + if isinstance(supplier, dict): + supplier = Supplier(supplier) + except Exception: + supplier = None + self.supplier = supplier diff --git a/aixplain/modules/agent/tool/pipeline_tool.py b/aixplain/modules/agent/tool/pipeline_tool.py new file mode 100644 index 00000000..a517b198 --- /dev/null +++ b/aixplain/modules/agent/tool/pipeline_tool.py @@ -0,0 +1,52 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: May 16th 2024 +Description: + Agentification Class +""" +from typing import Text, Union + +from aixplain.modules.agent.tool import Tool +from aixplain.modules.pipeline import Pipeline + + +class PipelineTool(Tool): + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Attributes: + description (Text): descriptiion of the tool + pipeline (Union[Text, Pipeline]): pipeline + """ + + def __init__( + self, + description: Text, + pipeline: Union[Text, Pipeline], + **additional_info, + ) -> None: + """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. + + Args: + description (Text): description of the tool + pipeline (Union[Text, Pipeline]): pipeline + """ + super().__init__("", description, **additional_info) + if isinstance(pipeline, Pipeline): + pipeline = pipeline.id + self.pipeline = pipeline diff --git a/aixplain/modules/asset.py b/aixplain/modules/asset.py index 52b79912..c453415d 100644 --- a/aixplain/modules/asset.py +++ b/aixplain/modules/asset.py @@ -57,7 +57,13 @@ def __init__( elif isinstance(supplier, Dict) is True: self.supplier = Supplier(supplier) else: - self.supplier = supplier + self.supplier = None + for supplier_ in Supplier: + if supplier.lower() in [supplier_.value["code"].lower(), supplier_.value["name"].lower()]: + self.supplier = supplier_ + break + if self.supplier is None: + self.supplier = supplier except Exception: self.supplier = str(supplier) self.version = version diff --git a/aixplain/modules/finetune/__init__.py b/aixplain/modules/finetune/__init__.py index e1b63941..fe2cb15c 100644 --- a/aixplain/modules/finetune/__init__.py +++ b/aixplain/modules/finetune/__init__.py @@ -26,7 +26,6 @@ from urllib.parse import urljoin from aixplain.modules.finetune.cost import FinetuneCost from aixplain.modules.finetune.hyperparameters import Hyperparameters -from aixplain.factories.model_factory import ModelFactory from aixplain.modules.asset import Asset from aixplain.modules.dataset import Dataset from aixplain.modules.model import Model @@ -110,7 +109,7 @@ def start(self) -> Model: """ payload = {} try: - url = urljoin(self.backend_url, f"sdk/finetune") + url = urljoin(self.backend_url, "sdk/finetune") headers = {"Authorization": f"Token {self.api_key}", "Content-Type": "application/json"} payload = { "name": self.name, @@ -134,6 +133,8 @@ def start(self) -> Model: r = _request_with_retry("post", url, headers=headers, json=payload) resp = r.json() logging.info(f"Response for POST Start FineTune - Name: {self.name} / Status {resp}") + from aixplain.factories.model_factory import ModelFactory + return ModelFactory().get(resp["id"]) except Exception: message = "" diff --git a/aixplain/modules/metric.py b/aixplain/modules/metric.py index d591772b..86c08a08 100644 --- a/aixplain/modules/metric.py +++ b/aixplain/modules/metric.py @@ -24,9 +24,6 @@ from typing import Optional, Text, List, Union from aixplain.modules.asset import Asset -from aixplain.utils.file_utils import _request_with_retry -from aixplain.factories.model_factory import ModelFactory - class Metric(Asset): """Represents a metric to be computed on one or more peices of data. It is usually linked to a machine learning task. diff --git a/aixplain/modules/model/__init__.py b/aixplain/modules/model/__init__.py index 285cbe55..4be40225 100644 --- a/aixplain/modules/model/__init__.py +++ b/aixplain/modules/model/__init__.py @@ -24,9 +24,7 @@ import json import logging import traceback -from typing import List -from aixplain.factories.file_factory import FileFactory -from aixplain.enums import Function, Supplier +from aixplain.enums import Supplier, Function from aixplain.modules.asset import Asset from aixplain.utils import config from urllib.parse import urljoin @@ -57,7 +55,7 @@ def __init__( id: Text, name: Text, description: Text = "", - api_key: Optional[Text] = None, + api_key: Text = config.TEAM_API_KEY, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", version: Optional[Text] = None, function: Optional[Function] = None, @@ -163,7 +161,7 @@ def poll(self, poll_url: Text, name: Text = "model_process") -> Dict: resp["status"] = "FAILED" else: resp["status"] = "IN_PROGRESS" - logging.info(f"Single Poll for Model: Status of polling for {name}: {resp}") + logging.debug(f"Single Poll for Model: Status of polling for {name}: {resp}") except Exception as e: resp = {"status": "FAILED"} logging.error(f"Single Poll for Model: Error of polling for {name}: {e}") @@ -218,6 +216,7 @@ def run_async(self, data: Union[Text, Dict], name: Text = "model_process", param dict: polling URL in response """ headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + from aixplain.factories.file_factory import FileFactory data = FileFactory.to_link(data) if isinstance(data, dict): @@ -229,7 +228,7 @@ def run_async(self, data: Union[Text, Dict], name: Text = "model_process", param if isinstance(payload, int) is True or isinstance(payload, float) is True: payload = str(payload) payload = {"data": payload} - except Exception as e: + except Exception: payload = {"data": data} payload.update(parameters) payload = json.dumps(payload) @@ -245,7 +244,7 @@ def run_async(self, data: Union[Text, Dict], name: Text = "model_process", param poll_url = resp["data"] response = {"status": "IN_PROGRESS", "url": poll_url} - except Exception as e: + except Exception: response = {"status": "FAILED"} msg = f"Error in request for {name} - {traceback.format_exc()}" logging.error(f"Model Run Async: Error in running for {name}: {resp}") @@ -267,7 +266,7 @@ def check_finetune_status(self, after_epoch: Optional[int] = None): """ from aixplain.enums.asset_status import AssetStatus from aixplain.modules.finetune.status import FinetuneStatus - + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} resp = None try: @@ -311,7 +310,7 @@ def check_finetune_status(self, after_epoch: Optional[int] = None): logging.info(f"Response for GET Check FineTune status Model - Id {self.id} / Status {status.status.value}.") return status - except Exception as e: + except Exception: message = "" if resp is not None and "statusCode" in resp: status_code = resp["statusCode"] diff --git a/aixplain/modules/model/llm_model.py b/aixplain/modules/model/llm_model.py index 349ea595..14b9c7f4 100644 --- a/aixplain/modules/model/llm_model.py +++ b/aixplain/modules/model/llm_model.py @@ -196,12 +196,12 @@ def run_async( payload = {"data": data} parameters.update( { - "context": context, - "prompt": prompt, - "history": history, - "temperature": temperature, - "max_tokens": max_tokens, - "top_p": top_p, + "context": payload["context"] if "context" in payload else context, + "prompt": payload["prompt"] if "prompt" in payload else prompt, + "history": payload["history"] if "history" in payload else history, + "temperature": payload["temperature"] if "temperature" in payload else temperature, + "max_tokens": payload["max_tokens"] if "max_tokens" in payload else max_tokens, + "top_p": payload["top_p"] if "top_p" in payload else top_p, } ) payload.update(parameters) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 3de49756..ed131018 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -101,12 +101,12 @@ def __polling( time.sleep(wait_time) if wait_time < 60: wait_time *= 1.1 - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: polling for {name} : Continue") if response_body and response_body["status"] == "SUCCESS": try: logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS - {response_body}") - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR - {response_body}") else: logging.error( @@ -130,7 +130,7 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: try: resp = r.json() logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}") - except Exception as e: + except Exception: resp = {"status": "FAILED"} return resp @@ -206,7 +206,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if isinstance(payload, int) is True or isinstance(payload, float) is True: payload = str(payload) payload = {"data": payload} - except Exception as e: + except Exception: payload = {"data": data} else: payload = {} @@ -251,7 +251,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if target_row.id == data[node_label]: data_found = True break - if data_found == True: + if data_found is True: break except Exception: data_asset_found = False @@ -303,17 +303,19 @@ def run_async( poll_url = resp["url"] response = {"status": "IN_PROGRESS", "url": poll_url} - except Exception as e: + except Exception: response = {"status": "FAILED"} if resp is not None: response["error"] = resp return response - def update(self, pipeline: Union[Text, Dict]): + def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False, api_key: Optional[Text] = None): """Update Pipeline Args: pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file + save_as_asset (bool, optional): Save as asset (True) or draft (False). Defaults to False. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Make sure the pipeline to be save is in a JSON file. @@ -323,17 +325,38 @@ def update(self, pipeline: Union[Text, Dict]): _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Update Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Update Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload - payload = {"name": self.name, "status": "draft", "architecture": pipeline} + status = "draft" + if save_as_asset is True: + status = "onboarded" + payload = {"name": self.name, "status": status, "architecture": pipeline} url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} logging.info(f"Start service for PUT Update Pipeline - {url} - {headers} - {json.dumps(payload)}") r = _request_with_retry("put", url, headers=headers, json=payload) response = r.json() logging.info(f"Pipeline {response['id']} Updated.") except Exception as e: raise Exception(e) + + def delete(self) -> None: + """Delete Dataset service""" + try: + url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") + headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + logging.info(f"Start service for DELETE Pipeline - {url} - {headers}") + r = _request_with_retry("delete", url, headers=headers) + if r.status_code != 200: + raise Exception() + except Exception: + message = "Pipeline Deletion Error: Make sure the pipeline exists and you are the owner." + logging.error(message) + raise Exception(f"{message}") diff --git a/pyproject.toml b/pyproject.toml index 112c8f9a..73980717 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ namespaces = true [project] name = "aiXplain" -version = "0.2.12" +version = "0.2.13rc2" description = "aiXplain SDK adds AI functions to software." readme = "README.md" requires-python = ">=3.5, <4" diff --git a/tests/functional/agent/agent_functional_test.py b/tests/functional/agent/agent_functional_test.py new file mode 100644 index 00000000..f58dcb63 --- /dev/null +++ b/tests/functional/agent/agent_functional_test.py @@ -0,0 +1,75 @@ +__author__ = "lucaspavanelli" + +""" +Copyright 2022 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import json +from dotenv import load_dotenv + +load_dotenv() +from aixplain.factories import AgentFactory +from aixplain.modules.agent import ModelTool, PipelineTool +from aixplain.enums.supplier import Supplier + +import pytest + +RUN_FILE = "tests/functional/agent/data/agent_test_end2end.json" + + +def read_data(data_path): + return json.load(open(data_path, "r")) + + +@pytest.fixture(scope="module", params=read_data(RUN_FILE)) +def run_input_map(request): + return request.param + + +def test_end2end(run_input_map): + tools = [] + if "model_tools" in run_input_map: + for tool in run_input_map["model_tools"]: + for supplier in Supplier: + if tool["supplier"] is not None and tool["supplier"].lower() in [ + supplier.value["code"].lower(), + supplier.value["name"].lower(), + ]: + tool["supplier"] = supplier + break + tools.append(ModelTool(function=tool["function"], supplier=tool["supplier"])) + if "pipeline_tools" in run_input_map: + for tool in run_input_map["pipeline_tools"]: + tools.append(PipelineTool(description=tool["description"], pipeline=tool["pipeline_id"])) + print(f"Creating agent with tools: {tools}") + agent = AgentFactory.create(name=run_input_map["agent_name"], llm_id=run_input_map["llm_id"], tools=tools) + print(f"Agent created: {agent.__dict__}") + print("Running agent") + response = agent.run(query=run_input_map["query"]) + print(f"Agent response: {response}") + assert response is not None + assert response["completed"] is True + assert response["status"].lower() == "success" + assert "data" in response + assert response["data"]["session_id"] is not None + assert response["data"]["output"] is not None + print("Deleting agent") + agent.delete() + + +def test_list_agents(): + agents = AgentFactory.list() + assert "results" in agents + agents_result = agents["results"] + assert type(agents_result) is list diff --git a/tests/functional/agent/data/agent_test_end2end.json b/tests/functional/agent/data/agent_test_end2end.json new file mode 100644 index 00000000..147928fe --- /dev/null +++ b/tests/functional/agent/data/agent_test_end2end.json @@ -0,0 +1,14 @@ +[ + { + "agent_name": "[TEST] Translation agent", + "llm_id": "6626a3a8c8f1d089790cf5a2", + "llm_name": "Groq Llama 3 70B", + "query": "Who is the president of Brazil right now? Translate to pt", + "model_tools": [ + { + "function": "translation", + "supplier": "AWS" + } + ] + } +] diff --git a/tests/functional/pipelines/create_test.py b/tests/functional/pipelines/create_test.py index f2c1a9c9..6431bd41 100644 --- a/tests/functional/pipelines/create_test.py +++ b/tests/functional/pipelines/create_test.py @@ -30,6 +30,7 @@ def test_create_pipeline_from_json(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_from_string(): @@ -42,6 +43,7 @@ def test_create_pipeline_from_string(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_update_pipeline(): @@ -52,13 +54,14 @@ def test_update_pipeline(): pipeline_name = str(uuid4()) pipeline = PipelineFactory.create(name=pipeline_name, pipeline=pipeline_dict) - pipeline.update(pipeline=pipeline_json) + pipeline.update(pipeline=pipeline_json, save_as_asset=True) assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_wrong_path(): pipeline_name = str(uuid4()) with pytest.raises(Exception): - pipeline = PipelineFactory.create(name=pipeline_name, pipeline="/") + PipelineFactory.create(name=pipeline_name, pipeline="/") diff --git a/tests/functional/pipelines/run_test.py b/tests/functional/pipelines/run_test.py index e8bc4d9c..25fadaf4 100644 --- a/tests/functional/pipelines/run_test.py +++ b/tests/functional/pipelines/run_test.py @@ -224,7 +224,7 @@ def test_run_router(input_data: str, output_data: str, version: str): @pytest.mark.parametrize( - "input_data,output_data", + "input_data,output_data,version", [ ("I love it.", "PositiveOutput", "2.0"), ("I hate it.", "NegativeOutput", "2.0"), diff --git a/tests/unit/pipeline_test.py b/tests/unit/pipeline_test.py index 68a399aa..e983a298 100644 --- a/tests/unit/pipeline_test.py +++ b/tests/unit/pipeline_test.py @@ -24,7 +24,6 @@ from aixplain.factories import PipelineFactory from aixplain.modules import Pipeline from urllib.parse import urljoin -import pytest def test_create_pipeline(): @@ -34,6 +33,6 @@ def test_create_pipeline(): ref_response = {"id": "12345"} mock.post(url, headers=headers, json=ref_response) ref_pipeline = Pipeline(id="12345", name="Pipeline Test", api_key=config.TEAM_API_KEY) - hyp_pipeline = PipelineFactory.create(pipeline={}, name="Pipeline Test") + hyp_pipeline = PipelineFactory.create(pipeline={"nodes": []}, name="Pipeline Test") assert hyp_pipeline.id == ref_pipeline.id assert hyp_pipeline.name == ref_pipeline.name