diff --git a/aixplain/factories/pipeline_factory.py b/aixplain/factories/pipeline_factory.py index cc94ff79..fac42283 100644 --- a/aixplain/factories/pipeline_factory.py +++ b/aixplain/factories/pipeline_factory.py @@ -225,16 +225,13 @@ def list( return {"results": pipelines, "page_total": page_total, "page_number": page_number, "total": total} @classmethod - def create( - cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft", api_key: Optional[Text] = None - ) -> Pipeline: - """Pipeline Creation + def create(cls, name: Text, pipeline: Union[Text, Dict], api_key: Optional[Text] = None) -> Pipeline: + """Draft Pipeline Creation Args: name (Text): Pipeline Name pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file - status (Text, optional): Status of the pipeline. Currently only draft pipelines can be saved. Defaults to "draft". - api_key (Optional[Text], optional): API Key. Defaults to None. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Currently just the creation of draft pipelines are supported @@ -243,15 +240,17 @@ def create( Pipeline: instance of the new pipeline """ try: - assert status == "draft", "Pipeline Creation Error: Currently just the creation of draft pipelines are supported." if isinstance(pipeline, str) is True: _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Creation Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Creation Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload payload = {"name": name, "status": "draft", "architecture": pipeline} url = urljoin(cls.backend_url, "sdk/pipelines") diff --git a/aixplain/modules/model/llm_model.py b/aixplain/modules/model/llm_model.py index 349ea595..14b9c7f4 100644 --- a/aixplain/modules/model/llm_model.py +++ b/aixplain/modules/model/llm_model.py @@ -196,12 +196,12 @@ def run_async( payload = {"data": data} parameters.update( { - "context": context, - "prompt": prompt, - "history": history, - "temperature": temperature, - "max_tokens": max_tokens, - "top_p": top_p, + "context": payload["context"] if "context" in payload else context, + "prompt": payload["prompt"] if "prompt" in payload else prompt, + "history": payload["history"] if "history" in payload else history, + "temperature": payload["temperature"] if "temperature" in payload else temperature, + "max_tokens": payload["max_tokens"] if "max_tokens" in payload else max_tokens, + "top_p": payload["top_p"] if "top_p" in payload else top_p, } ) payload.update(parameters) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 3de49756..ed131018 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -101,12 +101,12 @@ def __polling( time.sleep(wait_time) if wait_time < 60: wait_time *= 1.1 - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: polling for {name} : Continue") if response_body and response_body["status"] == "SUCCESS": try: logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS - {response_body}") - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR - {response_body}") else: logging.error( @@ -130,7 +130,7 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: try: resp = r.json() logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}") - except Exception as e: + except Exception: resp = {"status": "FAILED"} return resp @@ -206,7 +206,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if isinstance(payload, int) is True or isinstance(payload, float) is True: payload = str(payload) payload = {"data": payload} - except Exception as e: + except Exception: payload = {"data": data} else: payload = {} @@ -251,7 +251,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if target_row.id == data[node_label]: data_found = True break - if data_found == True: + if data_found is True: break except Exception: data_asset_found = False @@ -303,17 +303,19 @@ def run_async( poll_url = resp["url"] response = {"status": "IN_PROGRESS", "url": poll_url} - except Exception as e: + except Exception: response = {"status": "FAILED"} if resp is not None: response["error"] = resp return response - def update(self, pipeline: Union[Text, Dict]): + def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False, api_key: Optional[Text] = None): """Update Pipeline Args: pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file + save_as_asset (bool, optional): Save as asset (True) or draft (False). Defaults to False. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Make sure the pipeline to be save is in a JSON file. @@ -323,17 +325,38 @@ def update(self, pipeline: Union[Text, Dict]): _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Update Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Update Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload - payload = {"name": self.name, "status": "draft", "architecture": pipeline} + status = "draft" + if save_as_asset is True: + status = "onboarded" + payload = {"name": self.name, "status": status, "architecture": pipeline} url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} logging.info(f"Start service for PUT Update Pipeline - {url} - {headers} - {json.dumps(payload)}") r = _request_with_retry("put", url, headers=headers, json=payload) response = r.json() logging.info(f"Pipeline {response['id']} Updated.") except Exception as e: raise Exception(e) + + def delete(self) -> None: + """Delete Dataset service""" + try: + url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") + headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + logging.info(f"Start service for DELETE Pipeline - {url} - {headers}") + r = _request_with_retry("delete", url, headers=headers) + if r.status_code != 200: + raise Exception() + except Exception: + message = "Pipeline Deletion Error: Make sure the pipeline exists and you are the owner." + logging.error(message) + raise Exception(f"{message}") diff --git a/tests/functional/pipelines/create_test.py b/tests/functional/pipelines/create_test.py index f2c1a9c9..6431bd41 100644 --- a/tests/functional/pipelines/create_test.py +++ b/tests/functional/pipelines/create_test.py @@ -30,6 +30,7 @@ def test_create_pipeline_from_json(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_from_string(): @@ -42,6 +43,7 @@ def test_create_pipeline_from_string(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_update_pipeline(): @@ -52,13 +54,14 @@ def test_update_pipeline(): pipeline_name = str(uuid4()) pipeline = PipelineFactory.create(name=pipeline_name, pipeline=pipeline_dict) - pipeline.update(pipeline=pipeline_json) + pipeline.update(pipeline=pipeline_json, save_as_asset=True) assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_wrong_path(): pipeline_name = str(uuid4()) with pytest.raises(Exception): - pipeline = PipelineFactory.create(name=pipeline_name, pipeline="/") + PipelineFactory.create(name=pipeline_name, pipeline="/") diff --git a/tests/functional/pipelines/run_test.py b/tests/functional/pipelines/run_test.py index e8bc4d9c..25fadaf4 100644 --- a/tests/functional/pipelines/run_test.py +++ b/tests/functional/pipelines/run_test.py @@ -224,7 +224,7 @@ def test_run_router(input_data: str, output_data: str, version: str): @pytest.mark.parametrize( - "input_data,output_data", + "input_data,output_data,version", [ ("I love it.", "PositiveOutput", "2.0"), ("I hate it.", "NegativeOutput", "2.0"), diff --git a/tests/unit/pipeline_test.py b/tests/unit/pipeline_test.py index 68a399aa..e983a298 100644 --- a/tests/unit/pipeline_test.py +++ b/tests/unit/pipeline_test.py @@ -24,7 +24,6 @@ from aixplain.factories import PipelineFactory from aixplain.modules import Pipeline from urllib.parse import urljoin -import pytest def test_create_pipeline(): @@ -34,6 +33,6 @@ def test_create_pipeline(): ref_response = {"id": "12345"} mock.post(url, headers=headers, json=ref_response) ref_pipeline = Pipeline(id="12345", name="Pipeline Test", api_key=config.TEAM_API_KEY) - hyp_pipeline = PipelineFactory.create(pipeline={}, name="Pipeline Test") + hyp_pipeline = PipelineFactory.create(pipeline={"nodes": []}, name="Pipeline Test") assert hyp_pipeline.id == ref_pipeline.id assert hyp_pipeline.name == ref_pipeline.name