diff --git a/aixplain/factories/agent_factory/utils.py b/aixplain/factories/agent_factory/utils.py index 54f746d8..d6857468 100644 --- a/aixplain/factories/agent_factory/utils.py +++ b/aixplain/factories/agent_factory/utils.py @@ -19,12 +19,13 @@ def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent: if tool["type"] == "model": supplier = "aixplain" for supplier_ in Supplier: - if tool["supplier"] is not None and tool["supplier"].lower() in [ - supplier_.value["code"].lower(), - supplier_.value["name"].lower(), - ]: - supplier = supplier_ - break + if isinstance(tool["supplier"], str): + if tool["supplier"] is not None and tool["supplier"].lower() in [ + supplier_.value["code"].lower(), + supplier_.value["name"].lower(), + ]: + supplier = supplier_ + break tool = ModelTool( function=Function(tool.get("function", None)), diff --git a/aixplain/factories/pipeline_factory/utils.py b/aixplain/factories/pipeline_factory/utils.py index 08954571..2a7de16b 100644 --- a/aixplain/factories/pipeline_factory/utils.py +++ b/aixplain/factories/pipeline_factory/utils.py @@ -14,6 +14,8 @@ Route, Script, Link, + BareSegmentor, + BareReconstructor, ) from typing import Dict @@ -32,7 +34,9 @@ def build_from_response(response: Dict, load_architecture: bool = False) -> Pipe response["api_key"] = config.TEAM_API_KEY # instantiating pipeline generic info - pipeline = Pipeline(response["id"], response["name"], response["api_key"]) + pipeline = Pipeline( + id=response["id"], name=response["name"], api_key=response["api_key"], status=response.get("status", "draft") + ) if load_architecture is True: try: # instantiating nodes @@ -45,28 +49,45 @@ def build_from_response(response: Dict, load_architecture: bool = False) -> Pipe elif node_json["type"].lower() == "asset": if node_json["functionType"] == "metric": node = BareMetric(asset_id=node_json["assetId"]) + elif node_json["functionType"] == "reconstructor": + node = BareReconstructor(asset_id=node_json["assetId"]) + 
elif node_json["functionType"] == "segmentor": + node = BareSegmentor(asset_id=node_json["assetId"]) else: node = BareAsset(asset_id=node_json["assetId"]) - elif node_json["type"].lower() == "segmentor": - raise NotImplementedError() - elif node_json["type"].lower() == "reconstructor": - raise NotImplementedError() elif node_json["type"].lower() == "decision": - node = Decision(routes=[Route(**route) for route in node_json["routes"]]) + node = Decision( + routes=[Route(**route) for route in node_json["routes"]] + ) elif node_json["type"].lower() == "router": - node = Router(routes=[Route(**route) for route in node_json["routes"]]) + node = Router( + routes=[Route(**route) for route in node_json["routes"]] + ) elif node_json["type"].lower() == "script": - node = Script(fileId=node_json["fileId"], fileMetadata=node_json["fileMetadata"]) + node = Script( + fileId=node_json["fileId"], + fileMetadata=node_json["fileMetadata"], + ) elif node_json["type"].lower() == "output": node = Output() if "inputValues" in node_json: [ node.inputs.create_param( - data_type=DataType(input_param["dataType"]) if "dataType" in input_param else None, + data_type=( + DataType(input_param["dataType"]) + if "dataType" in input_param + else None + ), code=input_param["code"], - value=input_param["value"] if "value" in input_param else None, - is_required=input_param["isRequired"] if "isRequired" in input_param else False, + value=( + input_param["value"] if "value" in input_param else None + ), + is_required=( + input_param["isRequired"] + if "isRequired" in input_param + else False + ), ) for input_param in node_json["inputValues"] if input_param["code"] not in node.inputs @@ -74,10 +95,22 @@ def build_from_response(response: Dict, load_architecture: bool = False) -> Pipe if "outputValues" in node_json: [ node.outputs.create_param( - data_type=DataType(output_param["dataType"]) if "dataType" in output_param else None, + data_type=( + DataType(output_param["dataType"]) + if "dataType" in 
output_param + else None + ), code=output_param["code"], - value=output_param["value"] if "value" in output_param else None, - is_required=output_param["isRequired"] if "isRequired" in output_param else False, + value=( + output_param["value"] + if "value" in output_param + else None + ), + is_required=( + output_param["isRequired"] + if "isRequired" in output_param + else False + ), ) for output_param in node_json["outputValues"] if output_param["code"] not in node.outputs diff --git a/aixplain/modules/pipeline/asset.py b/aixplain/modules/pipeline/asset.py index 10ee3bf0..88364873 100644 --- a/aixplain/modules/pipeline/asset.py +++ b/aixplain/modules/pipeline/asset.py @@ -25,6 +25,7 @@ import json import os import logging +from aixplain.enums.asset_status import AssetStatus from aixplain.modules.asset import Asset from aixplain.utils import config from aixplain.utils.file_utils import _request_with_retry @@ -56,6 +57,7 @@ def __init__( url: Text = config.BACKEND_URL, supplier: Text = "aiXplain", version: Text = "1.0", + status: AssetStatus = AssetStatus.DRAFT, **additional_info, ) -> None: """Create a Pipeline with the necessary information @@ -67,6 +69,7 @@ def __init__( url (Text, optional): running URL of platform. Defaults to config.BACKEND_URL. supplier (Text, optional): Pipeline supplier. Defaults to "aiXplain". version (Text, optional): version of the pipeline. Defaults to "1.0". + status (AssetStatus, optional): Pipeline status. Defaults to AssetStatus.DRAFT. 
**additional_info: Any additional Pipeline info to be saved """ if not name: @@ -75,6 +78,12 @@ def __init__( super().__init__(id, name, "", supplier, version) self.api_key = api_key self.url = f"{url}/assets/pipeline/execution/run" + if isinstance(status, str): + try: + status = AssetStatus(status) + except Exception: + status = AssetStatus.DRAFT + self.status = status self.additional_info = additional_info def __polling( @@ -224,7 +233,6 @@ def run( data_asset=data_asset, name=name, batch_mode=batch_mode, - version=version, **kwargs, ) @@ -235,20 +243,7 @@ def run( poll_url = response["url"] end = time.time() - response = self.__polling( - poll_url, name=name, timeout=timeout, wait_time=wait_time - ) - - if self._should_fallback_to_v2(response, version): - return self.run( - data, - data_asset=data_asset, - name=name, - batch_mode=batch_mode, - version=self.VERSION_2_0, - **kwargs, - ) - response["version"] = version + response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) return response except Exception as e: error_message = f"Error in request for {name}: {str(e)}" @@ -441,16 +436,6 @@ def run_async( if resp is not None: response["error"] = resp - if self._should_fallback_to_v2(response, version): - return self.run_async( - data, - data_asset=data_asset, - name=name, - batch_mode=batch_mode, - version=self.VERSION_2_0, - **kwargs, - ) - response["version"] = version return response def update( @@ -477,8 +462,7 @@ def update( stack = inspect.stack() if len(stack) > 2 and stack[1].function != "save": warnings.warn( - "update() is deprecated and will be removed in a future version. " - "Please use save() instead.", + "update() is deprecated and will be removed in a future version. " "Please use save() instead.", DeprecationWarning, stacklevel=2, ) @@ -566,9 +550,7 @@ def save( ), "Pipeline Update Error: Make sure the pipeline to be saved is in a JSON file." 
with open(pipeline) as f: pipeline = json.load(f) - self.update( - pipeline=pipeline, save_as_asset=save_as_asset, api_key=api_key - ) + self.update(pipeline=pipeline, save_as_asset=save_as_asset, api_key=api_key) for i, node in enumerate(pipeline["nodes"]): if "functionType" in node: @@ -591,12 +573,19 @@ def save( "Authorization": f"Token {api_key}", "Content-Type": "application/json", } - logging.info( - f"Start service for Save Pipeline - {url} - {headers} - {json.dumps(payload)}" - ) - r = _request_with_retry(method, url, headers=headers, json=payload) + logging.info(f"Start service for Save Pipeline - {url} - {headers} - {json.dumps(payload)}") + r = _request_with_retry("post", url, headers=headers, json=payload) response = r.json() self.id = response["id"] logging.info(f"Pipeline {response['id']} Saved.") except Exception as e: raise Exception(e) + + def deploy(self, api_key: Optional[Text] = None) -> None: + """Deploy the Pipeline.""" + assert self.status == "draft", "Pipeline Deployment Error: Pipeline must be in draft status." + assert self.status != "onboarded", "Pipeline Deployment Error: Pipeline is already onboarded." 
+ + pipeline = self.to_dict() + self.update(pipeline=pipeline, save_as_asset=True, api_key=api_key, name=self.name) + self.status = AssetStatus.ONBOARDED diff --git a/aixplain/modules/pipeline/designer/__init__.py b/aixplain/modules/pipeline/designer/__init__.py index 6a493aa4..7d880167 100644 --- a/aixplain/modules/pipeline/designer/__init__.py +++ b/aixplain/modules/pipeline/designer/__init__.py @@ -11,6 +11,8 @@ BaseMetric, BareAsset, BareMetric, + BareSegmentor, + BareReconstructor, ) from .pipeline import DesignerPipeline from .base import ( diff --git a/aixplain/modules/pipeline/designer/nodes.py b/aixplain/modules/pipeline/designer/nodes.py index 7e6e1803..fbe27991 100644 --- a/aixplain/modules/pipeline/designer/nodes.py +++ b/aixplain/modules/pipeline/designer/nodes.py @@ -474,19 +474,11 @@ class BaseReconstructor(AssetNode[TI, TO]): class ReconstructorInputs(Inputs): - data: InputParam = None - - def __init__(self, node: Node): - super().__init__(node) - self.data = self.create_param("data") + pass class ReconstructorOutputs(Outputs): - data: OutputParam = None - - def __init__(self, node: Node): - super().__init__(node) - self.data = self.create_param("data") + pass class BareReconstructor(BaseReconstructor[ReconstructorInputs, ReconstructorOutputs]): diff --git a/aixplain/modules/team_agent/__init__.py b/aixplain/modules/team_agent/__init__.py index 80729d80..b7094348 100644 --- a/aixplain/modules/team_agent/__init__.py +++ b/aixplain/modules/team_agent/__init__.py @@ -70,7 +70,7 @@ def __init__( version: Optional[Text] = None, cost: Optional[Dict] = None, use_mentalist_and_inspector: bool = True, - status: AssetStatus = AssetStatus.ONBOARDING, + status: AssetStatus = AssetStatus.DRAFT, **additional_info, ) -> None: """Create a FineTune with the necessary information. 
@@ -97,7 +97,7 @@ def __init__( try: status = AssetStatus(status) except Exception: - status = AssetStatus.ONBOARDING + status = AssetStatus.DRAFT self.status = status def run( @@ -286,8 +286,9 @@ def to_dict(self) -> Dict: "llmId": self.llm_id, "supervisorId": self.llm_id, "plannerId": self.llm_id if self.use_mentalist_and_inspector else None, - "supplier": self.supplier, + "supplier": self.supplier.value["code"] if isinstance(self.supplier, Supplier) else self.supplier, "version": self.version, + "status": self.status.value, } def validate(self) -> None: diff --git a/tests/functional/pipelines/create_test.py b/tests/functional/pipelines/create_test.py index 6cf3d718..2cad384a 100644 --- a/tests/functional/pipelines/create_test.py +++ b/tests/functional/pipelines/create_test.py @@ -43,6 +43,11 @@ def test_create_pipeline_from_string(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + assert pipeline.status.value == "draft" + + pipeline.deploy() + pipeline = PipelineFactory.get(pipeline.id) + assert pipeline.status.value == "onboarded" pipeline.delete() diff --git a/tests/functional/team_agent/team_agent_functional_test.py b/tests/functional/team_agent/team_agent_functional_test.py index e60e453a..a402f324 100644 --- a/tests/functional/team_agent/team_agent_functional_test.py +++ b/tests/functional/team_agent/team_agent_functional_test.py @@ -33,6 +33,7 @@ def read_data(data_path): return json.load(open(data_path, "r")) + @pytest.fixture(scope="function") def delete_agents_and_team_agents(): for team_agent in TeamAgentFactory.list()["results"]: @@ -94,6 +95,7 @@ def test_end2end(run_input_map, delete_agents_and_team_agents): team_agent.deploy() team_agent = TeamAgentFactory.get(team_agent.id) assert team_agent is not None + assert team_agent.status == AssetStatus.ONBOARDED response = team_agent.run(data=run_input_map["query"]) assert response is not None @@ -161,6 +163,7 @@ def test_fail_non_existent_llm(): ) assert str(exc_info.value) == "Large 
Language Model with ID 'non_existent_llm' not found." + def test_add_remove_agents_from_team_agent(run_input_map, delete_agents_and_team_agents): assert delete_agents_and_team_agents @@ -210,12 +213,12 @@ def test_add_remove_agents_from_team_agent(run_input_map, delete_agents_and_team assert new_agent.id in [agent.id for agent in team_agent.agents] assert len(team_agent.agents) == len(agents) + 1 - removed_agent = team_agent.agents.pop(0) + removed_agent = team_agent.agents.pop(0) team_agent.update() team_agent = TeamAgentFactory.get(team_agent.id) assert removed_agent.id not in [agent.id for agent in team_agent.agents] - assert len(team_agent.agents) == len(agents) + assert len(team_agent.agents) == len(agents) team_agent.delete() new_agent.delete() diff --git a/tests/unit/pipeline_test.py b/tests/unit/pipeline_test.py index d1b0f9b2..913fe295 100644 --- a/tests/unit/pipeline_test.py +++ b/tests/unit/pipeline_test.py @@ -96,3 +96,18 @@ def test_get_pipeline_error_response(): PipelineFactory.get(pipeline_id=pipeline_id) assert "Pipeline GET Error: Failed to retrieve pipeline test-pipeline-id. 
Status Code: 404" in str(excinfo.value) + + +def test_deploy_pipeline(): + with requests_mock.Mocker() as mock: + pipeline_id = "test-pipeline-id" + url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{pipeline_id}") + headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + + mock.put(url, headers=headers, json={"status": "SUCCESS", "id": pipeline_id}) + + pipeline = Pipeline(id=pipeline_id, api_key=config.TEAM_API_KEY, name="Test Pipeline", url=config.BACKEND_URL) + pipeline.deploy() + + assert pipeline.id == pipeline_id + assert pipeline.status.value == "onboarded" diff --git a/tests/unit/team_agent_test.py b/tests/unit/team_agent_test.py index 56564b73..e6901cec 100644 --- a/tests/unit/team_agent_test.py +++ b/tests/unit/team_agent_test.py @@ -186,11 +186,32 @@ def test_create_team_agent(): llm_id="6646261c6eb563165658bbb1", agents=[agent], ) - assert team_agent.id is not None - assert team_agent.name == team_ref_response["name"] - assert team_agent.description == team_ref_response["description"] - assert team_agent.llm_id == team_ref_response["llmId"] - assert team_agent.use_mentalist_and_inspector is True - assert team_agent.status == AssetStatus.DRAFT - assert len(team_agent.agents) == 1 - assert team_agent.agents[0].id == team_ref_response["agents"][0]["assetId"] + assert team_agent.id is not None + assert team_agent.name == team_ref_response["name"] + assert team_agent.description == team_ref_response["description"] + assert team_agent.llm_id == team_ref_response["llmId"] + assert team_agent.use_mentalist_and_inspector is True + assert team_agent.status == AssetStatus.DRAFT + assert len(team_agent.agents) == 1 + assert team_agent.agents[0].id == team_ref_response["agents"][0]["assetId"] + + url = urljoin(config.BACKEND_URL, f"sdk/agent-communities/{team_agent.id}") + team_ref_response = { + "id": "team_agent_123", + "name": "TEST Multi agent", + "status": "onboarded", + "teamId": 645, + "description": "TEST Multi 
agent", + "llmId": "6646261c6eb563165658bbb1", + "assets": [], + "agents": [{"assetId": "123", "type": "AGENT", "number": 0, "label": "AGENT"}], + "links": [], + "plannerId": "6646261c6eb563165658bbb1", + "supervisorId": "6646261c6eb563165658bbb1", + "createdAt": "2024-10-28T19:30:25.344Z", + "updatedAt": "2024-10-28T19:30:25.344Z", + } + mock.put(url, headers=headers, json=team_ref_response) + + team_agent.deploy() + assert team_agent.status.value == "onboarded"