diff --git a/aixplain/factories/__init__.py b/aixplain/factories/__init__.py index 70361e77..d540f374 100644 --- a/aixplain/factories/__init__.py +++ b/aixplain/factories/__init__.py @@ -21,6 +21,7 @@ """ from .asset_factory import AssetFactory from .agent_factory import AgentFactory +from .team_agent_factory import TeamAgentFactory from .benchmark_factory import BenchmarkFactory from .corpus_factory import CorpusFactory from .data_factory import DataFactory diff --git a/aixplain/factories/agent_factory/__init__.py b/aixplain/factories/agent_factory/__init__.py index 134b3560..2a16e191 100644 --- a/aixplain/factories/agent_factory/__init__.py +++ b/aixplain/factories/agent_factory/__init__.py @@ -34,7 +34,7 @@ from aixplain.utils import config from typing import Dict, List, Optional, Text, Union -from aixplain.factories.agent_factory.utils import build_agent, validate_llm +from aixplain.factories.agent_factory.utils import build_agent, validate_llm, validate_name from aixplain.utils.file_utils import _request_with_retry from urllib.parse import urljoin @@ -44,36 +44,31 @@ class AgentFactory: def create( cls, name: Text, - llm_id: Text, + description: Text, + llm_id: Text = "669a63646eb56306647e1091", tools: List[Tool] = [], - description: Text = "", api_key: Text = config.TEAM_API_KEY, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", version: Optional[Text] = None, - use_mentalist_and_inspector: bool = False, ) -> Agent: """Create a new agent in the platform. Args: name (Text): name of the agent - llm_id (Text): aiXplain ID of the large language model to be used as agent. + description (Text): description of the agent role. + llm_id (Text, optional): aiXplain ID of the large language model to be used as agent. Defaults to "669a63646eb56306647e1091" (GPT-4o mini). tools (List[Tool], optional): list of tool for the agent. Defaults to []. - description (Text, optional): description of the agent role. Defaults to "". api_key (Text, optional): team/user API key. 
Defaults to config.TEAM_API_KEY. supplier (Union[Dict, Text, Supplier, int], optional): owner of the agent. Defaults to "aiXplain". version (Optional[Text], optional): version of the agent. Defaults to None. - use_mentalist_and_inspector (bool, optional): flag to enable mentalist and inspector agents (which only works when a supervisor is enabled). Defaults to False. Returns: Agent: created Agent """ + validate_name(name) # validate LLM ID validate_llm(llm_id) - orchestrator_llm_id, mentalist_and_inspector_llm_id = llm_id, None - if use_mentalist_and_inspector is True: - mentalist_and_inspector_llm_id = llm_id - try: agent = None url = urljoin(config.BACKEND_URL, "sdk/agents") @@ -117,8 +112,6 @@ def create( "supplier": supplier, "version": version, "llmId": llm_id, - "supervisorId": orchestrator_llm_id, - "plannerId": mentalist_and_inspector_llm_id, } logging.info(f"Start service for POST Create Agent - {url} - {headers} - {json.dumps(payload)}") @@ -128,13 +121,13 @@ def create( agent = build_agent(payload=response, api_key=api_key) else: error = r.json() - error_msg = "Agent Onboarding Error: Please contant the administrators." + error_msg = "Agent Onboarding Error: Please contact the administrators." if "message" in error: msg = error["message"] if error["message"] == "err.name_already_exists": msg = "Agent name already exists." elif error["message"] == "err.asset_is_not_available": - msg = "Some the tools are not available." + msg = "Some tools are not available." error_msg = f"Agent Onboarding Error (HTTP {r.status_code}): {msg}" logging.exception(error_msg) raise Exception(error_msg) @@ -190,7 +183,7 @@ def list(cls) -> Dict: agents.append(build_agent(agent)) return {"results": agents, "page_total": page_total, "page_number": 0, "total": total} else: - error_msg = "Agent Listing Error: Please contant the administrators." + error_msg = "Agent Listing Error: Please contact the administrators." 
if "message" in resp: msg = resp["message"] error_msg = f"Agent Listing Error (HTTP {r.status_code}): {msg}" @@ -214,7 +207,7 @@ def get(cls, agent_id: Text, api_key: Optional[Text] = None) -> Agent: if 200 <= r.status_code < 300: return build_agent(resp) else: - msg = "Please contant the administrators." + msg = "Please contact the administrators." if "message" in resp: msg = resp["message"] error_msg = f"Agent Get Error (HTTP {r.status_code}): {msg}" diff --git a/aixplain/factories/agent_factory/utils.py b/aixplain/factories/agent_factory/utils.py index 6aed75ae..d86982ef 100644 --- a/aixplain/factories/agent_factory/utils.py +++ b/aixplain/factories/agent_factory/utils.py @@ -7,6 +7,8 @@ from typing import Dict, Text from urllib.parse import urljoin +GPT_4o_ID = "6646261c6eb563165658bbb1" + def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent: """Instantiate a new agent in the platform.""" @@ -41,7 +43,7 @@ def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent: supplier=payload["teamId"] if "teamId" in payload else None, version=payload["version"] if "version" in payload else None, cost=payload["cost"] if "cost" in payload else None, - llm_id=payload["llmId"] if "llmId" in payload else "6646261c6eb563165658bbb1", + llm_id=payload["llmId"] if "llmId" in payload else GPT_4o_ID, api_key=api_key, status=AssetStatus(payload["status"]), ) @@ -57,3 +59,11 @@ def validate_llm(model_id: Text) -> None: assert llm.function == Function.TEXT_GENERATION, "Large Language Model must be a text generation model." except Exception: raise Exception(f"Large Language Model with ID '{model_id}' not found.") + + +def validate_name(name: Text) -> None: + import re + + assert ( + re.match("^[a-zA-Z0-9 ]*$", name) is not None + ), "Agent Creation Error: Agent name must not contain special characters." 
diff --git a/aixplain/factories/pipeline_factory/__init__.py b/aixplain/factories/pipeline_factory/__init__.py index 051c63fb..cb4336fe 100644 --- a/aixplain/factories/pipeline_factory/__init__.py +++ b/aixplain/factories/pipeline_factory/__init__.py @@ -290,7 +290,7 @@ def create( pipeline = json.load(f) for i, node in enumerate(pipeline["nodes"]): - if "functionType" in node and node["functionType"] == "AI": + if "functionType" in node: pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload payload = { diff --git a/aixplain/factories/pipeline_factory/utils.py b/aixplain/factories/pipeline_factory/utils.py index 465e5e7f..9584863f 100644 --- a/aixplain/factories/pipeline_factory/utils.py +++ b/aixplain/factories/pipeline_factory/utils.py @@ -7,7 +7,8 @@ from aixplain.modules.pipeline.designer import ( Input, Output, - AssetNode, + BareAsset, + BareMetric, Decision, Router, Route, @@ -36,14 +37,16 @@ def build_from_response(response: Dict, load_architecture: bool = False) -> Pipe try: # instantiating nodes for node_json in response["nodes"]: - print(node_json) if node_json["type"].lower() == "input": node = Input( data=node_json["data"] if "data" in node_json else None, data_types=[DataType(dt) for dt in node_json["dataType"]], ) elif node_json["type"].lower() == "asset": - node = AssetNode(asset_id=node_json["assetId"]) + if node_json["functionType"] == "metric": + node = BareMetric(asset_id=node_json["assetId"]) + else: + node = BareAsset(asset_id=node_json["assetId"]) elif node_json["type"].lower() == "segmentor": raise NotImplementedError() elif node_json["type"].lower() == "reconstructor": @@ -53,7 +56,7 @@ def build_from_response(response: Dict, load_architecture: bool = False) -> Pipe elif node_json["type"].lower() == "router": node = Router(routes=[Route(**route) for route in node_json["routes"]]) elif node_json["type"].lower() == "script": - node = Script(fileId=node_json["fileId"]) + node = 
Script(fileId=node_json["fileId"], fileMetadata=node_json["fileMetadata"]) elif node_json["type"].lower() == "output": node = Output() diff --git a/aixplain/factories/script_factory.py b/aixplain/factories/script_factory.py index 35789561..14835752 100644 --- a/aixplain/factories/script_factory.py +++ b/aixplain/factories/script_factory.py @@ -8,15 +8,12 @@ class ScriptFactory: - @classmethod def upload_script(cls, script_path: str) -> Tuple[str, str]: try: url = f"{config.BACKEND_URL}/sdk/pipelines/script" headers = {"Authorization": f"Token {config.TEAM_API_KEY}"} - r = requests.post( - url, headers=headers, files={"file": open(script_path, "rb")} - ) + r = requests.post(url, headers=headers, files={"file": open(script_path, "rb")}) if 200 <= r.status_code < 300: response = r.json() else: @@ -26,6 +23,6 @@ def upload_script(cls, script_path: str) -> Tuple[str, str]: # get metadata info fname = os.path.splitext(os.path.basename(script_path))[0] - file_size_kb = int(os.path.getsize(script_path) / 1024) - metadata = json.dumps({"name": fname, "size": file_size_kb}) + file_size = int(os.path.getsize(script_path)) + metadata = json.dumps({"name": fname, "size": file_size}) return response["fileId"], metadata diff --git a/aixplain/factories/team_agent_factory/__init__.py b/aixplain/factories/team_agent_factory/__init__.py new file mode 100644 index 00000000..72d47c03 --- /dev/null +++ b/aixplain/factories/team_agent_factory/__init__.py @@ -0,0 +1,166 @@ +__author__ = "lucaspavanelli" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. + +Author: Thiago Castro Ferreira and Lucas Pavanelli +Date: August 15th 2024 +Description: + TeamAgent Factory Class +""" + +import json +import logging + +from aixplain.enums.supplier import Supplier +from aixplain.factories.agent_factory import AgentFactory +from aixplain.factories.agent_factory.utils import validate_llm, validate_name +from aixplain.modules.agent import Agent +from aixplain.modules.team_agent import TeamAgent +from aixplain.utils import config +from aixplain.factories.team_agent_factory.utils import build_team_agent +from aixplain.utils.file_utils import _request_with_retry +from typing import Dict, List, Optional, Text, Union +from urllib.parse import urljoin + + +class TeamAgentFactory: + @classmethod + def create( + cls, + name: Text, + agents: List[Union[Text, Agent]], + llm_id: Text = "669a63646eb56306647e1091", + description: Text = "", + api_key: Text = config.TEAM_API_KEY, + supplier: Union[Dict, Text, Supplier, int] = "aiXplain", + version: Optional[Text] = None, + use_mentalist_and_inspector: bool = True, + ) -> TeamAgent: + """Create a new team agent in the platform.""" + validate_name(name) + # validate LLM ID + validate_llm(llm_id) + assert len(agents) > 0, "TeamAgent Onboarding Error: At least one agent must be provided." 
+ for agent in agents: + if isinstance(agent, Text) is True: + try: + agent = AgentFactory.get(agent) + except Exception: + raise Exception(f"TeamAgent Onboarding Error: Agent {agent} does not exist.") + else: + assert isinstance(agent, Agent), "TeamAgent Onboarding Error: Agents must be instances of Agent class" + + mentalist_and_inspector_llm_id = None + if use_mentalist_and_inspector is True: + mentalist_and_inspector_llm_id = llm_id + try: + team_agent = None + url = urljoin(config.BACKEND_URL, "sdk/agent-communities") + headers = {"x-api-key": api_key} + + if isinstance(supplier, dict): + supplier = supplier["code"] + elif isinstance(supplier, Supplier): + supplier = supplier.value["code"] + + agent_list = [] + for idx, agent in enumerate(agents): + agent_list.append({"assetId": agent.id, "number": idx, "type": "AGENT", "label": "AGENT"}) + + payload = { + "name": name, + "agents": agent_list, + "links": [], + "description": description, + "llmId": llm_id, + "supervisorId": llm_id, + "plannerId": mentalist_and_inspector_llm_id, + "supplier": supplier, + "version": version, + } + + logging.info(f"Start service for POST Create TeamAgent - {url} - {headers} - {json.dumps(payload)}") + r = _request_with_retry("post", url, headers=headers, json=payload) + if 200 <= r.status_code < 300: + response = r.json() + team_agent = build_team_agent(payload=response, api_key=api_key) + else: + error = r.json() + error_msg = "TeamAgent Onboarding Error: Please contact the administrators." + if "message" in error: + msg = error["message"] + if error["message"] == "err.name_already_exists": + msg = "TeamAgent name already exists." + elif error["message"] == "err.asset_is_not_available": + msg = "Some tools are not available." 
+ error_msg = f"TeamAgent Onboarding Error (HTTP {r.status_code}): {msg}" + logging.exception(error_msg) + raise Exception(error_msg) + except Exception as e: + raise Exception(e) + return team_agent + + @classmethod + def list(cls) -> Dict: + """List all team agents available in the platform.""" + url = urljoin(config.BACKEND_URL, "sdk/agent-communities") + headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} + + payload = {} + logging.info(f"Start service for GET List Agents - {url} - {headers} - {json.dumps(payload)}") + try: + r = _request_with_retry("get", url, headers=headers) + resp = r.json() + + if 200 <= r.status_code < 300: + agents, page_total, total = [], 0, 0 + results = resp + page_total = len(results) + total = len(results) + logging.info(f"Response for GET List Agents - Page Total: {page_total} / Total: {total}") + for agent in results: + agents.append(build_team_agent(agent)) + return {"results": agents, "page_total": page_total, "page_number": 0, "total": total} + else: + error_msg = "Agent Listing Error: Please contact the administrators." 
+ if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Listing Error (HTTP {r.status_code}): {msg}" + logging.exception(error_msg) + raise Exception(error_msg) + except Exception as e: + raise Exception(e) + + @classmethod + def get(cls, agent_id: Text, api_key: Optional[Text] = None) -> Agent: + """Get agent by id.""" + url = urljoin(config.BACKEND_URL, f"sdk/agent-communities/{agent_id}") + if config.AIXPLAIN_API_KEY != "": + headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"} + else: + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"x-api-key": api_key, "Content-Type": "application/json"} + logging.info(f"Start service for GET Agent - {url} - {headers}") + r = _request_with_retry("get", url, headers=headers) + resp = r.json() + if 200 <= r.status_code < 300: + return build_team_agent(resp) + else: + msg = "Please contact the administrators." + if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Get Error (HTTP {r.status_code}): {msg}" + raise Exception(error_msg) diff --git a/aixplain/factories/team_agent_factory/utils.py b/aixplain/factories/team_agent_factory/utils.py new file mode 100644 index 00000000..42fa5f6c --- /dev/null +++ b/aixplain/factories/team_agent_factory/utils.py @@ -0,0 +1,34 @@ +__author__ = "lucaspavanelli" + +import aixplain.utils.config as config +from aixplain.enums.asset_status import AssetStatus +from aixplain.modules.team_agent import TeamAgent +from aixplain.factories.agent_factory import AgentFactory +from typing import Dict, Text +from urllib.parse import urljoin + +GPT_4o_ID = "6646261c6eb563165658bbb1" + + +def build_team_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> TeamAgent: + """Instantiate a new team agent in the platform.""" + agents = payload["agents"] + for i, agent in enumerate(agents): + agent = AgentFactory.get(agent["assetId"]) + agents[i] = agent + + team_agent = TeamAgent( + id=payload["id"], + 
name=payload["name"] if "name" in payload else "", + agents=agents, + description=payload["description"] if "description" in payload else "", + supplier=payload["teamId"] if "teamId" in payload else None, + version=payload["version"] if "version" in payload else None, + cost=payload["cost"] if "cost" in payload else None, + llm_id=payload["llmId"] if "llmId" in payload else GPT_4o_ID, + use_mentalist_and_inspector=True if "plannerId" in payload and payload["plannerId"] is not None else False, + api_key=api_key, + status=AssetStatus(payload["status"]), + ) + team_agent.url = urljoin(config.BACKEND_URL, f"sdk/agent-communities/{team_agent.id}/run") + return team_agent diff --git a/aixplain/modules/__init__.py b/aixplain/modules/__init__.py index c7246dac..bad0c225 100644 --- a/aixplain/modules/__init__.py +++ b/aixplain/modules/__init__.py @@ -35,3 +35,4 @@ from .benchmark_job import BenchmarkJob from .agent import Agent from .agent.tool import Tool +from .team_agent import TeamAgent diff --git a/aixplain/modules/agent/__init__.py b/aixplain/modules/agent/__init__.py index c0604f6a..546ea4d8 100644 --- a/aixplain/modules/agent/__init__.py +++ b/aixplain/modules/agent/__init__.py @@ -47,7 +47,7 @@ class Agent(Model): name (Text): Name of the Agent tools (List[Tool]): List of tools that the Agent uses. description (Text, optional): description of the Agent. Defaults to "". - llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). + llm_id (Text): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). supplier (Text): Supplier of the Agent. version (Text): Version of the Agent. backend_url (str): URL of the backend. 
@@ -59,8 +59,8 @@ def __init__( self, id: Text, name: Text, + description: Text, tools: List[Tool] = [], - description: Text = "", llm_id: Text = "6646261c6eb563165658bbb1", api_key: Optional[Text] = config.TEAM_API_KEY, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", @@ -69,13 +69,13 @@ def __init__( status: AssetStatus = AssetStatus.ONBOARDING, **additional_info, ) -> None: - """Create a FineTune with the necessary information. + """Create an Agent with the necessary information. Args: id (Text): ID of the Agent name (Text): Name of the Agent + description (Text): description of the Agent. tools (List[Tool]): List of tools that the Agent uses. - description (Text, optional): description of the Agent. Defaults to "". llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). supplier (Text): Supplier of the Agent. version (Text): Version of the Agent. @@ -83,7 +83,6 @@ def __init__( api_key (str): The TEAM API key used for authentication. cost (Dict, optional): model price. Defaults to None. """ - # assert len(tools) > 0, "At least one tool must be provided." 
super().__init__(id, name, description, api_key, supplier, version, cost=cost) self.additional_info = additional_info self.tools = tools @@ -144,7 +143,7 @@ def run( return response except Exception as e: msg = f"Error in request for {name} - {traceback.format_exc()}" - logging.error(f"Model Run: Error in running for {name}: {e}") + logging.error(f"Agent Run: Error in running for {name}: {e}") end = time.time() return {"status": "FAILED", "error": msg, "elapsed_time": end - start} @@ -210,7 +209,7 @@ def run_async( payload = json.dumps(payload) r = _request_with_retry("post", self.url, headers=headers, data=payload) - logging.info(f"Model Run Async: Start service for {name} - {self.url} - {payload} - {headers}") + logging.info(f"Agent Run Async: Start service for {name} - {self.url} - {payload} - {headers}") resp = None try: @@ -222,13 +221,13 @@ def run_async( except Exception: response = {"status": "FAILED"} msg = f"Error in request for {name} - {traceback.format_exc()}" - logging.error(f"Model Run Async: Error in running for {name}: {resp}") + logging.error(f"Agent Run Async: Error in running for {name}: {resp}") if resp is not None: response["error"] = msg return response def delete(self) -> None: - """Delete Corpus service""" + """Delete Agent service""" try: url = urljoin(config.BACKEND_URL, f"sdk/agents/{self.id}") headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} diff --git a/aixplain/modules/agent/tool/__init__.py b/aixplain/modules/agent/tool/__init__.py index 2a22511a..9c7a7a09 100644 --- a/aixplain/modules/agent/tool/__init__.py +++ b/aixplain/modules/agent/tool/__init__.py @@ -29,7 +29,7 @@ class Tool(ABC): Attributes: name (Text): name of the tool - description (Text): descriptiion of the tool + description (Text): description of the tool version (Text): version of the tool """ diff --git a/aixplain/modules/agent/tool/model_tool.py b/aixplain/modules/agent/tool/model_tool.py index c88f1ee0..3a84c45b 100644 --- 
a/aixplain/modules/agent/tool/model_tool.py +++ b/aixplain/modules/agent/tool/model_tool.py @@ -20,7 +20,7 @@ Description: Agentification Class """ -from typing import Optional, Union, Text +from typing import Optional, Union, Text, Dict from aixplain.enums.function import Function from aixplain.enums.supplier import Supplier @@ -32,23 +32,24 @@ class ModelTool(Tool): """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Attributes: - function (Function): task that the tool performs - supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. + function (Optional[Union[Function, Text]]): task that the tool performs. + supplier (Optional[Union[Dict, Supplier]]): Preferred supplier to perform the task. + model (Optional[Union[Text, Model]]): Model function. """ def __init__( self, - function: Optional[Function] = None, - supplier: Optional[Supplier] = None, + function: Optional[Union[Function, Text]] = None, + supplier: Optional[Union[Dict, Supplier]] = None, model: Optional[Union[Text, Model]] = None, **additional_info, ) -> None: """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Args: - function (Optional[Function], optional): task that the tool performs. Defaults to None. - supplier (Optional[Supplier], optional): Preferred supplier to perform the task. Defaults to None.. Defaults to None. - model (Optional[Union[Text, Model]], optional): Model function. Defaults to None. + function (Optional[Union[Function, Text]]): task that the tool performs. Defaults to None. + supplier (Optional[Union[Dict, Supplier]]): Preferred supplier to perform the task. Defaults to None. + model (Optional[Union[Text, Model]]): Model function. Defaults to None. 
""" assert ( function is not None or model is not None diff --git a/aixplain/modules/agent/tool/pipeline_tool.py b/aixplain/modules/agent/tool/pipeline_tool.py index 5ad2915a..fa8394ea 100644 --- a/aixplain/modules/agent/tool/pipeline_tool.py +++ b/aixplain/modules/agent/tool/pipeline_tool.py @@ -30,7 +30,7 @@ class PipelineTool(Tool): """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Attributes: - description (Text): descriptiion of the tool + description (Text): description of the tool pipeline (Union[Text, Pipeline]): pipeline """ diff --git a/aixplain/modules/pipeline/asset.py b/aixplain/modules/pipeline/asset.py index 860a08a5..0e9ed56e 100644 --- a/aixplain/modules/pipeline/asset.py +++ b/aixplain/modules/pipeline/asset.py @@ -262,10 +262,11 @@ def __prepare_payload( dasset = DatasetFactory.get(str(data_asset[node_label])) asset_payload["dataAsset"]["dataset_id"] = dasset.id - if ( - len([dfield for dfield in dasset.source_data if dasset.source_data[dfield].id == data[node_label]]) - > 0 - ): + source_data_list = [ + dfield for dfield in dasset.source_data if dasset.source_data[dfield].id == data[node_label] + ] + + if len(source_data_list) > 0: data_found = True else: for target in dasset.target_data: @@ -341,9 +342,11 @@ def run_async( error = "Validation-related error: Please ensure all required fields are provided and correctly formatted." else: status_code = str(r.status_code) - error = f"Status {status_code}: Unspecified error: An unspecified error occurred while processing your request." + error = ( + f"Status {status_code}: Unspecified error: An unspecified error occurred while processing your request." 
+ ) response = {"status": "FAILED", "error_message": error} - logging.error(f"Error in request for {name} - {r.status_code}: {error}") + logging.error(f"Error in request for {name} - {r.status_code}: {error}") except Exception: response = {"status": "FAILED"} if resp is not None: @@ -355,6 +358,7 @@ def update( pipeline: Union[Text, Dict], save_as_asset: bool = False, api_key: Optional[Text] = None, + name: Optional[Text] = None, ): """Update Pipeline @@ -376,12 +380,14 @@ def update( pipeline = json.load(f) for i, node in enumerate(pipeline["nodes"]): - if "functionType" in node and node["functionType"] == "AI": + if "functionType" in node: pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload status = "draft" if save_as_asset is True: status = "onboarded" + if name: + self.name = name payload = { "name": self.name, "status": status, @@ -431,7 +437,7 @@ def save(self, save_as_asset: bool = False, api_key: Optional[Text] = None): pipeline = self.to_dict() for i, node in enumerate(pipeline["nodes"]): - if "functionType" in node and node["functionType"] == "AI": + if "functionType" in node: pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload status = "draft" diff --git a/aixplain/modules/pipeline/designer/__init__.py b/aixplain/modules/pipeline/designer/__init__.py index 0bb56542..81571f21 100644 --- a/aixplain/modules/pipeline/designer/__init__.py +++ b/aixplain/modules/pipeline/designer/__init__.py @@ -8,6 +8,9 @@ Router, BaseReconstructor, BaseSegmentor, + BaseMetric, + BareAsset, + BareMetric ) from .pipeline import DesignerPipeline from .base import ( @@ -36,6 +39,7 @@ __all__ = [ "DesignerPipeline", "AssetNode", + "BareAsset", "Decision", "Script", "Input", @@ -63,4 +67,6 @@ "ParamProxy", "TI", "TO", + "BaseMetric", + "BareMetric" ] diff --git a/aixplain/modules/pipeline/designer/base.py b/aixplain/modules/pipeline/designer/base.py index 8bea73d6..76e6196d 100644 
--- a/aixplain/modules/pipeline/designer/base.py +++ b/aixplain/modules/pipeline/designer/base.py @@ -188,8 +188,8 @@ def validate(self): if from_param.data_type and to_param.data_type: if from_param.data_type != to_param.data_type: raise ValueError( - f"Data type mismatch between {from_param.data_type} and {to_param.data_type}" - ) # noqa + f"Data type mismatch between {from_param.data_type} and {to_param.data_type}" # noqa + ) def attach_to(self, pipeline: "DesignerPipeline"): """ @@ -344,6 +344,9 @@ def __init__( if pipeline: self.attach_to(pipeline) + def build_label(self): + return f"{self.type.value}(ID={self.number})" + def attach_to(self, pipeline: "DesignerPipeline"): """ Attach the node to the pipeline. @@ -359,7 +362,7 @@ def attach_to(self, pipeline: "DesignerPipeline"): if self.number is None: self.number = len(pipeline.nodes) if self.label is None: - self.label = f"{self.type.value}(ID={self.number})" + self.label = self.build_label() assert not pipeline.get_node(self.number), "Node number already exists" pipeline.nodes.append(self) diff --git a/aixplain/modules/pipeline/designer/enums.py b/aixplain/modules/pipeline/designer/enums.py index 4c044dba..fe4cbfed 100644 --- a/aixplain/modules/pipeline/designer/enums.py +++ b/aixplain/modules/pipeline/designer/enums.py @@ -22,8 +22,6 @@ class NodeType(str, Enum): INPUT = "INPUT" OUTPUT = "OUTPUT" SCRIPT = "SCRIPT" - SEGMENTOR = "SEGMENT" - RECONSTRUCTOR = "RECONSTRUCT" ROUTER = "ROUTER" DECISION = "DECISION" @@ -33,9 +31,11 @@ class AssetType(str, Enum): class FunctionType(str, Enum): - AI = "AI" - SEGMENTOR = "SEGMENTOR" - RECONSTRUCTOR = "RECONSTRUCTOR" + AI = "ai" + SEGMENTOR = "segmentor" + RECONSTRUCTOR = "reconstructor" + UTILITY = "utility" + METRIC = "metric" class ParamType: diff --git a/aixplain/modules/pipeline/designer/nodes.py b/aixplain/modules/pipeline/designer/nodes.py index 22152239..a6879e04 100644 --- a/aixplain/modules/pipeline/designer/nodes.py +++ 
b/aixplain/modules/pipeline/designer/nodes.py @@ -55,8 +55,9 @@ def __init__( supplier: str = None, version: str = None, pipeline: "DesignerPipeline" = None, + **kwargs, ): - super().__init__(pipeline=pipeline) + super().__init__(pipeline=pipeline, **kwargs) self.asset_id = asset_id self.supplier = supplier self.version = version @@ -84,9 +85,7 @@ def populate_asset(self): if self.function: if self.asset.function.value != self.function: - raise ValueError( - f"Function {self.function} is not supported by asset {self.asset_id}" - ) # noqa + raise ValueError(f"Function {self.function} is not supported by asset {self.asset_id}") # noqa else: self.function = self.asset.function.value self._auto_populate_params() @@ -129,6 +128,18 @@ def serialize(self) -> dict: return obj +class BareAssetInputs(Inputs): + pass + + +class BareAssetOutputs(Outputs): + pass + + +class BareAsset(AssetNode[BareAssetInputs, BareAssetOutputs]): + pass + + class InputInputs(Inputs): pass @@ -163,10 +174,11 @@ def __init__( data: Optional[str] = None, data_types: Optional[List[DataType]] = None, pipeline: "DesignerPipeline" = None, + **kwargs, ): from aixplain.factories.file_factory import FileFactory - super().__init__(pipeline=pipeline) + super().__init__(pipeline=pipeline, **kwargs) self.data_types = data_types or [] self.data = data @@ -205,12 +217,8 @@ class Output(Node[OutputInputs, OutputOutputs]): inputs_class: Type[TI] = OutputInputs outputs_class: Type[TO] = OutputOutputs - def __init__( - self, - data_types: Optional[List[DataType]] = None, - pipeline: "DesignerPipeline" = None, - ): - super().__init__(pipeline=pipeline) + def __init__(self, data_types: Optional[List[DataType]] = None, pipeline: "DesignerPipeline" = None, **kwargs): + super().__init__(pipeline=pipeline, **kwargs) self.data_types = data_types or [] def serialize(self) -> dict: @@ -237,21 +245,25 @@ def __init__( pipeline: "DesignerPipeline" = None, script_path: Optional[str] = None, fileId: Optional[str] = None, + 
fileMetadata: Optional[str] = None, + **kwargs, ): from aixplain.factories.script_factory import ScriptFactory - super().__init__(pipeline=pipeline) + super().__init__(pipeline=pipeline, **kwargs) assert script_path or fileId, "script_path or fileId is required" if not fileId: - self.fileId = ScriptFactory.upload_script(script_path) + self.fileId, self.fileMetadata = ScriptFactory.upload_script(script_path) else: self.fileId = fileId + self.fileMetadata = fileMetadata def serialize(self) -> dict: obj = super().serialize() obj["fileId"] = self.fileId + obj["fileMetadata"] = self.fileMetadata return obj @@ -266,13 +278,7 @@ class Route(Serializable): operation: Operation type: RouteType - def __init__( - self, - value: DataType, - path: List[Union[Node, int]], - operation: Operation, - type: RouteType, - ): + def __init__(self, value: DataType, path: List[Union[Node, int]], operation: Operation, type: RouteType, **kwargs): """ Post init method to convert the nodes to node numbers if they are nodes. 
@@ -286,10 +292,7 @@ def __init__( raise ValueError("Path is not valid, should be a list of nodes") # convert nodes to node numbers if they are nodes - self.path = [ - node.number if isinstance(node, Node) else node - for node in self.path - ] + self.path = [node.number if isinstance(node, Node) else node for node in self.path] def serialize(self) -> dict: return { @@ -327,10 +330,8 @@ class Router(Node[RouterInputs, RouterOutputs], LinkableMixin): inputs_class: Type[TI] = RouterInputs outputs_class: Type[TO] = RouterOutputs - def __init__( - self, routes: List[Route], pipeline: "DesignerPipeline" = None - ): - super().__init__(pipeline=pipeline) + def __init__(self, routes: List[Route], pipeline: "DesignerPipeline" = None, **kwargs): + super().__init__(pipeline=pipeline, **kwargs) self.routes = routes def serialize(self) -> dict: @@ -368,10 +369,8 @@ class Decision(Node[DecisionInputs, DecisionOutputs], LinkableMixin): inputs_class: Type[TI] = DecisionInputs outputs_class: Type[TO] = DecisionOutputs - def __init__( - self, routes: List[Route], pipeline: "DesignerPipeline" = None - ): - super().__init__(pipeline=pipeline) + def __init__(self, routes: List[Route], pipeline: "DesignerPipeline" = None, **kwargs): + super().__init__(pipeline=pipeline, **kwargs) self.routes = routes def link( @@ -396,7 +395,7 @@ class BaseSegmentor(AssetNode[TI, TO]): into smaller fragments for much easier and efficient processing. """ - type: NodeType = NodeType.SEGMENTOR + type: NodeType = NodeType.ASSET functionType: FunctionType = FunctionType.SEGMENTOR @@ -418,7 +417,7 @@ class BareSegmentor(BaseSegmentor[SegmentorInputs, SegmentorOutputs]): into smaller fragments for much easier and efficient processing. 
""" - type: NodeType = NodeType.SEGMENTOR + type: NodeType = NodeType.ASSET functionType: FunctionType = FunctionType.SEGMENTOR inputs_class: Type[TI] = SegmentorInputs outputs_class: Type[TO] = SegmentorOutputs @@ -430,7 +429,7 @@ class BaseReconstructor(AssetNode[TI, TO]): output of the segmented lines of execution. """ - type: NodeType = NodeType.RECONSTRUCTOR + type: NodeType = NodeType.ASSET functionType: FunctionType = FunctionType.RECONSTRUCTOR @@ -450,15 +449,46 @@ def __init__(self, node: Node): self.data = self.create_param("data") -class BareReconstructor( - BaseReconstructor[ReconstructorInputs, ReconstructorOutputs] -): +class BareReconstructor(BaseReconstructor[ReconstructorInputs, ReconstructorOutputs]): """ Reconstructor node class, this node will be used to reconstruct the output of the segmented lines of execution. """ - type: NodeType = NodeType.RECONSTRUCTOR + type: NodeType = NodeType.ASSET functionType: FunctionType = FunctionType.RECONSTRUCTOR inputs_class: Type[TI] = ReconstructorInputs outputs_class: Type[TO] = ReconstructorOutputs + + +class BaseMetric(AssetNode[TI, TO]): + functionType: FunctionType = FunctionType.METRIC + + def build_label(self): + return f"METRIC({self.number})" + + +class MetricInputs(Inputs): + + hypotheses: InputParam = None + references: InputParam = None + sources: InputParam = None + + def __init__(self, node: Node): + super().__init__(node) + self.hypotheses = self.create_param("hypotheses") + self.references = self.create_param("references") + self.sources = self.create_param("sources") + + +class MetricOutputs(Outputs): + + data: OutputParam = None + + def __init__(self, node: Node): + super().__init__(node) + self.data = self.create_param("data") + + +class BareMetric(BaseMetric[MetricInputs, MetricOutputs]): + pass diff --git a/aixplain/modules/pipeline/designer/pipeline.py b/aixplain/modules/pipeline/designer/pipeline.py index 5304d202..b2ebd19b 100644 --- a/aixplain/modules/pipeline/designer/pipeline.py +++ 
b/aixplain/modules/pipeline/designer/pipeline.py @@ -13,6 +13,7 @@ Route, BareReconstructor, BareSegmentor, + BareMetric ) from .enums import NodeType, RouteType, Operation @@ -326,3 +327,14 @@ def bare_segmentor(self, *args, **kwargs) -> BareSegmentor: :return: the node """ return BareSegmentor(*args, pipeline=self, **kwargs) + + def metric(self, *args, **kwargs) -> BareMetric: + """ + Shortcut to create an metric node for the current pipeline. + All params will be passed as keyword arguments to the node + constructor. + + :param kwargs: keyword arguments + :return: the node + """ + return BareMetric(*args, pipeline=self, **kwargs) diff --git a/aixplain/modules/pipeline/generate.py b/aixplain/modules/pipeline/generate.py index c71e8ae6..a64917c1 100644 --- a/aixplain/modules/pipeline/generate.py +++ b/aixplain/modules/pipeline/generate.py @@ -31,6 +31,7 @@ AssetNode, BaseReconstructor, BaseSegmentor, + BaseMetric ) from .default import DefaultPipeline from aixplain.modules import asset @@ -160,6 +161,8 @@ def populate_specs(functions: list): base_class = "BaseSegmentor" elif is_reconstructor: base_class = "BaseReconstructor" + elif "metric" in function_name.split("_"): # noqa: Advise a better distinguisher please + base_class = "BaseMetric" spec = { "id": function["id"], diff --git a/aixplain/modules/pipeline/pipeline.py b/aixplain/modules/pipeline/pipeline.py index 36bc643d..e5675e4b 100644 --- a/aixplain/modules/pipeline/pipeline.py +++ b/aixplain/modules/pipeline/pipeline.py @@ -14,6 +14,7 @@ AssetNode, BaseReconstructor, BaseSegmentor, + BaseMetric ) from .default import DefaultPipeline from aixplain.modules import asset @@ -907,7 +908,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class ReferencelessAudioGenerationMetric(AssetNode[ReferencelessAudioGenerationMetricInputs, ReferencelessAudioGenerationMetricOutputs]): +class 
ReferencelessAudioGenerationMetric(BaseMetric[ReferencelessAudioGenerationMetricInputs, ReferencelessAudioGenerationMetricOutputs]): """ The Referenceless Audio Generation Metric is a tool designed to evaluate the quality of generated audio content without the need for a reference or original @@ -1080,7 +1081,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class AudioGenerationMetric(AssetNode[AudioGenerationMetricInputs, AudioGenerationMetricOutputs]): +class AudioGenerationMetric(BaseMetric[AudioGenerationMetricInputs, AudioGenerationMetricOutputs]): """ The Audio Generation Metric is a quantitative measure used to evaluate the quality, accuracy, and overall performance of audio generated by artificial @@ -1471,7 +1472,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class MetricAggregation(AssetNode[MetricAggregationInputs, MetricAggregationOutputs]): +class MetricAggregation(BaseMetric[MetricAggregationInputs, MetricAggregationOutputs]): """ Metric Aggregation is a function that computes and summarizes numerical data by applying statistical operations, such as averaging, summing, or finding the @@ -1790,7 +1791,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class ReferencelessTextGenerationMetric(AssetNode[ReferencelessTextGenerationMetricInputs, ReferencelessTextGenerationMetricOutputs]): +class ReferencelessTextGenerationMetric(BaseMetric[ReferencelessTextGenerationMetricInputs, ReferencelessTextGenerationMetricOutputs]): """ The Referenceless Text Generation Metric is a method for evaluating the quality of generated text without requiring a reference text for comparison, often @@ -1830,7 +1831,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class TextGenerationMetricDefault(AssetNode[TextGenerationMetricDefaultInputs, 
TextGenerationMetricDefaultOutputs]): +class TextGenerationMetricDefault(BaseMetric[TextGenerationMetricDefaultInputs, TextGenerationMetricDefaultOutputs]): """ The "Text Generation Metric Default" function provides a standard set of evaluation metrics for assessing the quality and performance of text generation @@ -2130,7 +2131,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class TextGenerationMetric(AssetNode[TextGenerationMetricInputs, TextGenerationMetricOutputs]): +class TextGenerationMetric(BaseMetric[TextGenerationMetricInputs, TextGenerationMetricOutputs]): """ A Text Generation Metric is a quantitative measure used to evaluate the quality and effectiveness of text produced by natural language processing models, often @@ -2981,7 +2982,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.TEXT) -class ReferencelessTextGenerationMetricDefault(AssetNode[ReferencelessTextGenerationMetricDefaultInputs, ReferencelessTextGenerationMetricDefaultOutputs]): +class ReferencelessTextGenerationMetricDefault(BaseMetric[ReferencelessTextGenerationMetricDefaultInputs, ReferencelessTextGenerationMetricDefaultOutputs]): """ The Referenceless Text Generation Metric Default is a function designed to evaluate the quality of generated text without relying on reference texts for @@ -3665,7 +3666,7 @@ def __init__(self, node=None): self.data = self.create_param(code="data", data_type=DataType.NUMBER) -class ClassificationMetric(AssetNode[ClassificationMetricInputs, ClassificationMetricOutputs]): +class ClassificationMetric(BaseMetric[ClassificationMetricInputs, ClassificationMetricOutputs]): """ A Classification Metric is a quantitative measure used to evaluate the quality and effectiveness of classification models. 
diff --git a/aixplain/modules/team_agent/__init__.py b/aixplain/modules/team_agent/__init__.py new file mode 100644 index 00000000..420fc23a --- /dev/null +++ b/aixplain/modules/team_agent/__init__.py @@ -0,0 +1,246 @@ +__author__ = "aiXplain" + +""" +Copyright 2024 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Author: Lucas Pavanelli and Thiago Castro Ferreira +Date: August 15th 2024 +Description: + Team Agent Class +""" + +import json +import logging +import time +import traceback + +from aixplain.utils.file_utils import _request_with_retry +from aixplain.enums.supplier import Supplier +from aixplain.enums.asset_status import AssetStatus +from aixplain.enums.storage_type import StorageType +from aixplain.modules.model import Model +from aixplain.modules.agent import Agent +from typing import Dict, List, Text, Optional, Union +from urllib.parse import urljoin + +from aixplain.utils import config + + +class TeamAgent(Model): + """Advanced AI system capable of using multiple agents to perform a variety of tasks. + + Attributes: + id (Text): ID of the Team Agent + name (Text): Name of the Team Agent + agents (List[Agent]): List of Agents that the Team Agent uses. + description (Text, optional): description of the Team Agent. Defaults to "". + llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). + supplier (Text): Supplier of the Team Agent. + version (Text): Version of the Team Agent. 
+        backend_url (str): URL of the backend.
+        api_key (str): The TEAM API key used for authentication.
+        cost (Dict, optional): model price. Defaults to None.
+        use_mentalist_and_inspector (bool): Use Mentalist and Inspector tools. Defaults to True.
+    """
+
+    def __init__(
+        self,
+        id: Text,
+        name: Text,
+        agents: List[Agent] = [],
+        description: Text = "",
+        llm_id: Text = "6646261c6eb563165658bbb1",
+        api_key: Optional[Text] = config.TEAM_API_KEY,
+        supplier: Union[Dict, Text, Supplier, int] = "aiXplain",
+        version: Optional[Text] = None,
+        cost: Optional[Dict] = None,
+        use_mentalist_and_inspector: bool = True,
+        status: AssetStatus = AssetStatus.ONBOARDING,
+        **additional_info,
+    ) -> None:
+        """Create a Team Agent with the necessary information.
+
+        Args:
+            id (Text): ID of the Team Agent
+            name (Text): Name of the Team Agent
+            agents (List[Agent]): List of agents that the Team Agent uses.
+            description (Text, optional): description of the Team Agent. Defaults to "".
+            llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1).
+            supplier (Text): Supplier of the Team Agent.
+            version (Text): Version of the Team Agent.
+            backend_url (str): URL of the backend.
+            api_key (str): The TEAM API key used for authentication.
+            cost (Dict, optional): model price. Defaults to None.
+            use_mentalist_and_inspector (bool): Use Mentalist and Inspector tools. Defaults to True.
+ """ + super().__init__(id, name, description, api_key, supplier, version, cost=cost) + self.additional_info = additional_info + self.agents = agents + self.llm_id = llm_id + self.use_mentalist_and_inspector = use_mentalist_and_inspector + if isinstance(status, str): + try: + status = AssetStatus(status) + except Exception: + status = AssetStatus.ONBOARDING + self.status = status + + def run( + self, + data: Optional[Union[Dict, Text]] = None, + query: Optional[Text] = None, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + timeout: float = 300, + parameters: Dict = {}, + wait_time: float = 0.5, + content: Optional[Union[Dict[Text, Text], List[Text]]] = None, + ) -> Dict: + """Runs a team agent call. + + Args: + data (Optional[Union[Dict, Text]], optional): data to be processed by the team agent. Defaults to None. + query (Optional[Text], optional): query to be processed by the team agent. Defaults to None. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + timeout (float, optional): total polling time. Defaults to 300. + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. + content (Union[Dict[Text, Text], List[Text]], optional): Content inputs to be processed according to the query. Defaults to None. 
+ + Returns: + Dict: parsed output from model + """ + start = time.time() + try: + response = self.run_async( + data=data, + query=query, + session_id=session_id, + history=history, + name=name, + parameters=parameters, + content=content, + ) + if response["status"] == "FAILED": + end = time.time() + response["elapsed_time"] = end - start + return response + poll_url = response["url"] + end = time.time() + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) + return response + except Exception as e: + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Team Agent Run: Error in running for {name}: {e}") + end = time.time() + return {"status": "FAILED", "error": msg, "elapsed_time": end - start} + + def run_async( + self, + data: Optional[Union[Dict, Text]] = None, + query: Optional[Text] = None, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + parameters: Dict = {}, + content: Optional[Union[Dict[Text, Text], List[Text]]] = None, + ) -> Dict: + """Runs asynchronously a Team Agent call. + + Args: + data (Optional[Union[Dict, Text]], optional): data to be processed by the Team Agent. Defaults to None. + query (Optional[Text], optional): query to be processed by the Team Agent. Defaults to None. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + content (Union[Dict[Text, Text], List[Text]], optional): Content inputs to be processed according to the query. Defaults to None. 
+ + Returns: + dict: polling URL in response + """ + from aixplain.factories.file_factory import FileFactory + + assert data is not None or query is not None, "Either 'data' or 'query' must be provided." + if data is not None: + if isinstance(data, dict): + assert "query" in data and data["query"] is not None, "When providing a dictionary, 'query' must be provided." + query = data.get("query") + if session_id is None: + session_id = data.get("session_id") + if history is None: + history = data.get("history") + if content is None: + content = data.get("content") + else: + query = data + + # process content inputs + if content is not None: + assert FileFactory.check_storage_type(query) == StorageType.TEXT, "When providing 'content', query must be text." + + if isinstance(content, list): + assert len(content) <= 3, "The maximum number of content inputs is 3." + for input_link in content: + input_link = FileFactory.to_link(input_link) + query += f"\n{input_link}" + elif isinstance(content, dict): + for key, value in content.items(): + assert "{{" + key + "}}" in query, f"Key '{key}' not found in query." 
+                    value = FileFactory.to_link(value)
+                    query = query.replace("{{" + key + "}}", f"'{value}'")
+
+        headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
+
+        payload = {"id": self.id, "query": FileFactory.to_link(query), "sessionId": session_id, "history": history}
+        payload.update(parameters)
+        payload = json.dumps(payload)
+
+        r = _request_with_retry("post", self.url, headers=headers, data=payload)
+        logging.info(f"Team Agent Run Async: Start service for {name} - {self.url} - {payload} - {headers}")
+
+        resp = None
+        try:
+            resp = r.json()
+            logging.info(f"Result of request for {name} - {r.status_code} - {resp}")
+
+            poll_url = resp["data"]
+            response = {"status": "IN_PROGRESS", "url": poll_url}
+        except Exception:
+            response = {"status": "FAILED"}
+            msg = f"Error in request for {name} - {traceback.format_exc()}"
+            logging.error(f"Team Agent Run Async: Error in running for {name}: {resp}")
+            if resp is not None:
+                response["error"] = msg
+        return response
+
+    def delete(self) -> None:
+        """Delete Team Agent service"""
+        try:
+            url = urljoin(config.BACKEND_URL, f"sdk/agent-communities/{self.id}")
+            headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"}
+            logging.debug(f"Start service for DELETE Team Agent - {url} - {headers}")
+            r = _request_with_retry("delete", url, headers=headers)
+            if r.status_code != 200:
+                raise Exception()
+        except Exception:
+            message = (
+                f"Team Agent Deletion Error (HTTP {r.status_code}): Make sure the Team Agent exists and you are the owner."
+ ) + logging.error(message) + raise Exception(f"{message}") diff --git a/aixplain/processes/data_onboarding/onboard_functions.py b/aixplain/processes/data_onboarding/onboard_functions.py index 35a64e12..01a3fe9b 100644 --- a/aixplain/processes/data_onboarding/onboard_functions.py +++ b/aixplain/processes/data_onboarding/onboard_functions.py @@ -325,9 +325,9 @@ def create_data_asset(payload: Dict, data_asset_type: Text = "corpus", api_key: response = r.json() msg = response["message"] error_msg = f"Data Asset Onboarding Error: {msg}" - except Exception as e: + except Exception: error_msg = ( - f"Data Asset Onboarding Error: Failure on creating the {data_asset_type}. Please contant the administrators." + f"Data Asset Onboarding Error: Failure on creating the {data_asset_type}. Please contact the administrators." ) return {"success": False, "error": error_msg} @@ -352,7 +352,7 @@ def is_data(data_id: Text) -> bool: if "id" in resp: return True return False - except: + except Exception: return False @@ -379,13 +379,13 @@ def split_data(paths: List, split_rate: List[float], split_labels: List[Text]) - if column_name is not None: break - except Exception as e: + except Exception: message = f'Data Asset Onboarding Error: Local file "{path}" not found.' logging.exception(message) raise Exception(message) if column_name is None: - message = f"Data Asset Onboarding Error: All split names are used." + message = "Data Asset Onboarding Error: All split names are used." 
raise Exception(message) for path in paths: diff --git a/tests/functional/agent/agent_functional_test.py b/tests/functional/agent/agent_functional_test.py index 0acdb5be..0d433899 100644 --- a/tests/functional/agent/agent_functional_test.py +++ b/tests/functional/agent/agent_functional_test.py @@ -20,6 +20,7 @@ load_dotenv() from aixplain.factories import AgentFactory +from aixplain.enums.function import Function from aixplain.enums.supplier import Supplier import pytest @@ -55,7 +56,9 @@ def test_end2end(run_input_map): for tool in run_input_map["pipeline_tools"]: tools.append(AgentFactory.create_pipeline_tool(pipeline=tool["pipeline_id"], description=tool["description"])) print(f"Creating agent with tools: {tools}") - agent = AgentFactory.create(name=run_input_map["agent_name"], llm_id=run_input_map["llm_id"], tools=tools) + agent = AgentFactory.create( + name=run_input_map["agent_name"], description=run_input_map["agent_name"], llm_id=run_input_map["llm_id"], tools=tools + ) print(f"Agent created: {agent.__dict__}") print("Running agent") response = agent.run(data=run_input_map["query"]) @@ -79,5 +82,10 @@ def test_list_agents(): def test_fail_non_existent_llm(): with pytest.raises(Exception) as exc_info: - AgentFactory.create(name="Test Agent", llm_id="non_existent_llm", tools=[]) + AgentFactory.create( + name="Test Agent", + description="Test description", + llm_id="non_existent_llm", + tools=[AgentFactory.create_model_tool(function=Function.TRANSLATION)], + ) assert str(exc_info.value) == "Large Language Model with ID 'non_existent_llm' not found." 
diff --git a/tests/functional/agent/data/agent_test_end2end.json b/tests/functional/agent/data/agent_test_end2end.json index 94bfc94b..595d22a9 100644 --- a/tests/functional/agent/data/agent_test_end2end.json +++ b/tests/functional/agent/data/agent_test_end2end.json @@ -1,6 +1,6 @@ [ { - "agent_name": "[TEST] Translation agent", + "agent_name": "TEST Translation agent", "llm_id": "6626a3a8c8f1d089790cf5a2", "llm_name": "Groq Llama 3 70B", "query": "Who is the president of Brazil right now? Translate to pt", diff --git a/tests/functional/model/run_model_test.py b/tests/functional/model/run_model_test.py index 79979357..47f351bb 100644 --- a/tests/functional/model/run_model_test.py +++ b/tests/functional/model/run_model_test.py @@ -1,20 +1,31 @@ __author__ = "thiagocastroferreira" -import pytest from aixplain.enums import Function from aixplain.factories import ModelFactory from aixplain.modules import LLM +from datetime import datetime, timedelta, timezone + + +def pytest_generate_tests(metafunc): + if "llm_model" in metafunc.fixturenames: + four_weeks_ago = datetime.now(timezone.utc) - timedelta(weeks=4) + models = ModelFactory.list(function=Function.TEXT_GENERATION)["results"] + + predefined_models = ["Groq Llama 3 70B", "Chat GPT 3.5", "GPT-4o", "GPT 4 (32k)"] + recent_models = [model for model in models if model.created_at and model.created_at >= four_weeks_ago] + combined_models = recent_models + [ + ModelFactory.list(query=model, function=Function.TEXT_GENERATION)["results"][0] for model in predefined_models + ] + metafunc.parametrize("llm_model", combined_models) -@pytest.mark.parametrize("llm_model", ["Groq Llama 3 70B", "Chat GPT 3.5", "GPT-4o", "GPT 4 (32k)"]) def test_llm_run(llm_model): """Testing LLMs with history context""" - model = ModelFactory.list(query=llm_model, function=Function.TEXT_GENERATION)["results"][0] - assert isinstance(model, LLM) + assert isinstance(llm_model, LLM) - response = model.run( + response = llm_model.run( data="What is my 
name?", history=[{"role": "user", "content": "Hello! My name is Thiago."}, {"role": "assistant", "content": "Hello!"}], ) diff --git a/tests/functional/pipelines/create_test.py b/tests/functional/pipelines/create_test.py index 6431bd41..6cf3d718 100644 --- a/tests/functional/pipelines/create_test.py +++ b/tests/functional/pipelines/create_test.py @@ -54,7 +54,8 @@ def test_update_pipeline(): pipeline_name = str(uuid4()) pipeline = PipelineFactory.create(name=pipeline_name, pipeline=pipeline_dict) - pipeline.update(pipeline=pipeline_json, save_as_asset=True) + pipeline.update(pipeline=pipeline_json, save_as_asset=True, name="NEW NAME") + assert pipeline.name == "NEW NAME" assert isinstance(pipeline, Pipeline) assert pipeline.id != "" pipeline.delete() diff --git a/tests/functional/pipelines/designer_test.py b/tests/functional/pipelines/designer_test.py index 62f42f7e..d8caaf35 100644 --- a/tests/functional/pipelines/designer_test.py +++ b/tests/functional/pipelines/designer_test.py @@ -1,7 +1,7 @@ import pytest from aixplain.enums import DataType -from aixplain.factories import PipelineFactory +from aixplain.factories import PipelineFactory, DatasetFactory from aixplain.modules.pipeline.designer import ( Link, Operation, @@ -110,9 +110,7 @@ def test_routing_pipeline(pipeline): translation = pipeline.asset(TRANSLATION_ASSET) speech_recognition = pipeline.asset(SPEECH_RECOGNITION_ASSET) - input.route( - translation.inputs.text, speech_recognition.inputs.source_audio - ) + input.route(translation.inputs.text, speech_recognition.inputs.source_audio) translation.use_output("data") speech_recognition.use_output("data") @@ -135,17 +133,11 @@ def test_scripting_pipeline(pipeline): input = pipeline.input() - segmentor = pipeline.speaker_diarization_audio( - asset_id=SPEAKER_DIARIZATION_AUDIO_ASSET - ) + segmentor = pipeline.speaker_diarization_audio(asset_id=SPEAKER_DIARIZATION_AUDIO_ASSET) - speech_recognition = pipeline.speech_recognition( - 
asset_id=SPEECH_RECOGNITION_ASSET - ) + speech_recognition = pipeline.speech_recognition(asset_id=SPEECH_RECOGNITION_ASSET) - script = pipeline.script( - script_path="tests/functional/pipelines/data/script.py" - ) + script = pipeline.script(script_path="tests/functional/pipelines/data/script.py") script.inputs.create_param(code="transcripts", data_type=DataType.TEXT) script.inputs.create_param(code="speakers", data_type=DataType.LABEL) script.outputs.create_param(code="data", data_type=DataType.TEXT) @@ -177,9 +169,7 @@ def test_decision_pipeline(pipeline): input = pipeline.input() - sentiment_analysis = pipeline.sentiment_analysis( - asset_id=SENTIMENT_ANALYSIS_ASSET - ) + sentiment_analysis = pipeline.sentiment_analysis(asset_id=SENTIMENT_ANALYSIS_ASSET) positive_output = pipeline.output() negative_output = pipeline.output() @@ -220,19 +210,15 @@ def test_decision_pipeline(pipeline): def test_reconstructing_pipeline(pipeline): input = pipeline.input() - segmentor = pipeline.speaker_diarization_audio( - asset_id="62fab6ecb39cca09ca5bc365" - ) + segmentor = pipeline.speaker_diarization_audio(asset_id="62fab6ecb39cca09ca5bc365") - speech_recognition = pipeline.speech_recognition( - asset_id="60ddefab8d38c51c5885ee38" - ) + speech_recognition = pipeline.speech_recognition(asset_id="60ddefab8d38c51c5885ee38") - reconstructor = pipeline.bare_reconstructor() + reconstructor = pipeline.text_reconstruction(asset_id="636cf7ab0f8ddf0db97929e4") input.outputs.input.link(segmentor.inputs.audio) segmentor.outputs.audio.link(speech_recognition.inputs.source_audio) - speech_recognition.outputs.data.link(reconstructor.inputs.data) + speech_recognition.outputs.data.link(reconstructor.inputs.text) reconstructor.use_output("data") @@ -246,3 +232,43 @@ def test_reconstructing_pipeline(pipeline): assert len(output["data"]) > 0 assert output["data"][0].get("segments") is not None assert len(output["data"][0]["segments"]) > 0 + + +def test_metric_pipeline(pipeline): + + dataset = 
DatasetFactory.list(query="for_functional_tests")["results"][0] + data_asset_id = dataset.id + reference_id = dataset.target_data["pt"][0].id + + # Instantiate input nodes + text_input_node = pipeline.input(label="TextInput") + reference_input_node = pipeline.input(label="ReferenceInput") + + # Instantiate the metric node + translation_metric_node = pipeline.text_generation_metric(asset_id="639874ab506c987b1ae1acc6") + + # Instantiate output node + score_output_node = pipeline.output() + + # Link the nodes + text_input_node.link(translation_metric_node, from_param="input", to_param="hypotheses") + + reference_input_node.link(translation_metric_node, from_param="input", to_param="references") + + translation_metric_node.link(score_output_node, from_param="data", to_param="output") + + translation_metric_node.inputs.score_identifier = "bleu" + + # Save and run the pipeline + pipeline.save() + + output = pipeline.run( + data={"TextInput": reference_id, "ReferenceInput": reference_id}, + data_asset={"TextInput": data_asset_id, "ReferenceInput": data_asset_id}, + ) + + assert output["status"] == "SUCCESS" + assert output.get("data") is not None + assert len(output["data"]) > 0 + assert output["data"][0].get("segments") is not None + assert len(output["data"][0]["segments"]) > 0 diff --git a/tests/functional/team_agent/data/team_agent_test_end2end.json b/tests/functional/team_agent/data/team_agent_test_end2end.json new file mode 100644 index 00000000..ed6437d2 --- /dev/null +++ b/tests/functional/team_agent/data/team_agent_test_end2end.json @@ -0,0 +1,32 @@ +[ + { + "team_agent_name": "TEST Multi agent", + "llm_id": "6626a3a8c8f1d089790cf5a2", + "llm_name": "Groq Llama 3 70B", + "query": "Who is the president of Brazil right now? 
Translate to pt and synthesize in audio", + "agents": [ + { + "agent_name": "TEST Translation agent", + "llm_id": "6626a3a8c8f1d089790cf5a2", + "llm_name": "Groq Llama 3 70B", + "model_tools": [ + { + "function": "translation", + "supplier": "AWS" + } + ] + }, + { + "agent_name": "TEST Speech Synthesis agent", + "llm_id": "6626a3a8c8f1d089790cf5a2", + "llm_name": "Groq Llama 3 70B", + "model_tools": [ + { + "function": "speech-synthesis", + "supplier": "Google" + } + ] + } + ] + } +] diff --git a/tests/functional/team_agent/team_agent_functional_test.py b/tests/functional/team_agent/team_agent_functional_test.py new file mode 100644 index 00000000..46adfcbc --- /dev/null +++ b/tests/functional/team_agent/team_agent_functional_test.py @@ -0,0 +1,94 @@ +__author__ = "lucaspavanelli" + +""" +Copyright 2022 The aiXplain SDK authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import json +from dotenv import load_dotenv + +load_dotenv() +from aixplain.factories import AgentFactory, TeamAgentFactory +from aixplain.enums.function import Function +from aixplain.enums.supplier import Supplier + +import pytest + +RUN_FILE = "tests/functional/team_agent/data/team_agent_test_end2end.json" + + +def read_data(data_path): + return json.load(open(data_path, "r")) + + +@pytest.fixture(scope="module", params=read_data(RUN_FILE)) +def run_input_map(request): + return request.param + + +def test_end2end(run_input_map): + for agent in AgentFactory.list()["results"]: + agent.delete() + + agents = [] + for agent in run_input_map["agents"]: + tools = [] + if "model_tools" in agent: + for tool in agent["model_tools"]: + for supplier in Supplier: + if tool["supplier"] is not None and tool["supplier"].lower() in [ + supplier.value["code"].lower(), + supplier.value["name"].lower(), + ]: + tool["supplier"] = supplier + break + tools.append(AgentFactory.create_model_tool(**tool)) + if "pipeline_tools" in agent: + for tool in agent["pipeline_tools"]: + tools.append(AgentFactory.create_pipeline_tool(pipeline=tool["pipeline_id"], description=tool["description"])) + print(f"Creating agent with tools: {tools}") + agent = AgentFactory.create( + name=agent["agent_name"], description=agent["agent_name"], llm_id=agent["llm_id"], tools=tools + ) + agents.append(agent) + + team_agent = TeamAgentFactory.create( + name=run_input_map["team_agent_name"], + agents=agents, + description=run_input_map["team_agent_name"], + llm_id=run_input_map["llm_id"], + use_mentalist_and_inspector=True, + ) + print("Running team agent") + response = team_agent.run(data=run_input_map["query"]) + print(f"Team Agent response: {response}") + assert response is not None + assert response["completed"] is True + assert response["status"].lower() == "success" + assert "data" in response + assert response["data"]["session_id"] is not None + assert response["data"]["output"] is not None + 
print("Deleting team agent") + team_agent.delete() + + +def test_fail_non_existent_llm(): + with pytest.raises(Exception) as exc_info: + AgentFactory.create( + name="Test Agent", + description="", + llm_id="non_existent_llm", + tools=[AgentFactory.create_model_tool(function=Function.TRANSLATION)], + ) + assert str(exc_info.value) == "Large Language Model with ID 'non_existent_llm' not found." diff --git a/tests/unit/agent_test.py b/tests/unit/agent_test.py index 8a619011..1be0682e 100644 --- a/tests/unit/agent_test.py +++ b/tests/unit/agent_test.py @@ -8,21 +8,21 @@ def test_fail_no_data_query(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with pytest.raises(Exception) as exc_info: agent.run_async() assert str(exc_info.value) == "Either 'data' or 'query' must be provided." def test_fail_query_must_be_provided(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with pytest.raises(Exception) as exc_info: agent.run_async(data={}) assert str(exc_info.value) == "When providing a dictionary, 'query' must be provided." 
def test_fail_query_as_text_when_content_not_empty(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with pytest.raises(Exception) as exc_info: agent.run_async( data={"query": "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav"}, @@ -32,7 +32,7 @@ def test_fail_query_as_text_when_content_not_empty(): def test_fail_content_exceed_maximum(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with pytest.raises(Exception) as exc_info: agent.run_async( data={"query": "Transcribe the audios:"}, @@ -47,14 +47,14 @@ def test_fail_content_exceed_maximum(): def test_fail_key_not_found(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with pytest.raises(Exception) as exc_info: agent.run_async(data={"query": "Translate the text: {{input1}}"}, content={"input2": "Hello, how are you?"}) assert str(exc_info.value) == "Key 'input2' not found in query." def test_sucess_query_content(): - agent = Agent("123", "Test Agent") + agent = Agent("123", "Test Agent", "Sample Description") with requests_mock.Mocker() as mock: url = agent.url headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} @@ -69,7 +69,10 @@ def test_sucess_query_content(): def test_invalid_pipelinetool(): with pytest.raises(Exception) as exc_info: AgentFactory.create( - name="Test", tools=[PipelineTool(pipeline="309851793", description="Test")], llm_id="6646261c6eb563165658bbb1" + name="Test", + description="Test Description", + tools=[PipelineTool(pipeline="309851793", description="Test")], + llm_id="6646261c6eb563165658bbb1", ) assert str(exc_info.value) == "Pipeline Tool Unavailable. Make sure Pipeline '309851793' exists or you have access to it." @@ -80,6 +83,12 @@ def test_invalid_modeltool(): assert str(exc_info.value) == "Model Tool Unavailable. Make sure Model '309851793' exists or you have access to it." 
+def test_invalid_agent_name(): + with pytest.raises(Exception) as exc_info: + AgentFactory.create(name="[Test]", description="", tools=[], llm_id="6646261c6eb563165658bbb1") + assert str(exc_info.value) == "Agent Creation Error: Agent name must not contain special characters." + + def test_create_agent(): from aixplain.enums import Supplier diff --git a/tests/unit/model_test.py b/tests/unit/model_test.py index ff44d821..c52bb950 100644 --- a/tests/unit/model_test.py +++ b/tests/unit/model_test.py @@ -71,7 +71,6 @@ def test_failed_poll(): (501, "Status 501: Unspecified error: An unspecified error occurred while processing your request."), ], ) - def test_run_async_errors(status_code, error_message): base_url = config.MODELS_RUN_URL model_id = "model-id" @@ -83,4 +82,3 @@ def test_run_async_errors(status_code, error_message): response = test_model.run_async(data="input_data") assert response["status"] == "FAILED" assert response["error_message"] == error_message - diff --git a/tests/unit/team_agent_test.py b/tests/unit/team_agent_test.py new file mode 100644 index 00000000..fd738c04 --- /dev/null +++ b/tests/unit/team_agent_test.py @@ -0,0 +1,73 @@ +import pytest +import requests_mock +from aixplain.modules import TeamAgent +from aixplain.factories import TeamAgentFactory +from aixplain.utils import config + + +def test_fail_no_data_query(): + team_agent = TeamAgent("123", "Test Team Agent") + with pytest.raises(Exception) as exc_info: + team_agent.run_async() + assert str(exc_info.value) == "Either 'data' or 'query' must be provided." + + +def test_fail_query_must_be_provided(): + team_agent = TeamAgent("123", "Test Team Agent") + with pytest.raises(Exception) as exc_info: + team_agent.run_async(data={}) + assert str(exc_info.value) == "When providing a dictionary, 'query' must be provided." 
+ + +def test_fail_query_as_text_when_content_not_empty(): + team_agent = TeamAgent("123", "Test Team Agent") + with pytest.raises(Exception) as exc_info: + team_agent.run_async( + data={"query": "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav"}, + content=["https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav"], + ) + assert str(exc_info.value) == "When providing 'content', query must be text." + + +def test_fail_content_exceed_maximum(): + team_agent = TeamAgent("123", "Test Team Agent") + with pytest.raises(Exception) as exc_info: + team_agent.run_async( + data={"query": "Transcribe the audios:"}, + content=[ + "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", + "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", + "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", + "https://aixplain-platform-assets.s3.amazonaws.com/samples/en/CPAC1x2.wav", + ], + ) + assert str(exc_info.value) == "The maximum number of content inputs is 3." + + +def test_fail_key_not_found(): + team_agent = TeamAgent("123", "Test Team Agent") + with pytest.raises(Exception) as exc_info: + team_agent.run_async(data={"query": "Translate the text: {{input1}}"}, content={"input2": "Hello, how are you?"}) + assert str(exc_info.value) == "Key 'input2' not found in query." 
+
+
+def test_success_query_content():
+    team_agent = TeamAgent("123", "Test Team Agent")
+    with requests_mock.Mocker() as mock:
+        url = team_agent.url
+        headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"}
+        ref_response = {"data": "Hello, how are you?", "status": "IN_PROGRESS"}
+        mock.post(url, headers=headers, json=ref_response)
+
+        response = team_agent.run_async(
+            data={"query": "Translate the text: {{input1}}"}, content={"input1": "Hello, how are you?"}
+        )
+        assert response["status"] == ref_response["status"]
+        assert response["url"] == ref_response["data"]
+
+
+def test_fail_number_agents():
+    with pytest.raises(Exception) as exc_info:
+        TeamAgentFactory.create(name="Test Team Agent", agents=[])
+
+    assert str(exc_info.value) == "TeamAgent Onboarding Error: At least one agent must be provided."