diff --git a/aixplain/modules/pipeline/asset.py b/aixplain/modules/pipeline/asset.py
index 0e9ed56e..f9e29235 100644
--- a/aixplain/modules/pipeline/asset.py
+++ b/aixplain/modules/pipeline/asset.py
@@ -139,6 +139,11 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict:
         r = _request_with_retry("get", poll_url, headers=headers)
         try:
             resp = r.json()
+            if isinstance(resp["data"], str):
+                try:
+                    resp["data"] = json.loads(resp["data"])["response"]
+                except Exception:
+                    resp = r.json()
             logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}")
         except Exception:
             resp = {"status": "FAILED"}
@@ -151,6 +156,7 @@ def run(
         name: Text = "pipeline_process",
         timeout: float = 20000.0,
         wait_time: float = 1.0,
+        batch_mode: bool = True,
         **kwargs,
     ) -> Dict:
         """Runs a pipeline call.
@@ -161,6 +167,7 @@ def run(
             name (Text, optional): ID given to a call. Defaults to "pipeline_process".
             timeout (float, optional): total polling time. Defaults to 20000.0.
             wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0.
+            batch_mode (bool, optional): Whether to run the pipeline in batch mode or online. Defaults to True.
             kwargs: A dictionary of keyword arguments. The keys are the argument names

         Returns:
@@ -168,7 +175,7 @@
         """
         start = time.time()
         try:
-            response = self.run_async(data, data_asset=data_asset, name=name, **kwargs)
+            response = self.run_async(data, data_asset=data_asset, name=name, batch_mode=batch_mode, **kwargs)
             if response["status"] == "FAILED":
                 end = time.time()
                 response["elapsed_time"] = end - start
@@ -297,7 +304,12 @@ def __prepare_payload(
         return payload

     def run_async(
-        self, data: Union[Text, Dict], data_asset: Optional[Union[Text, Dict]] = None, name: Text = "pipeline_process", **kwargs
+        self,
+        data: Union[Text, Dict],
+        data_asset: Optional[Union[Text, Dict]] = None,
+        name: Text = "pipeline_process",
+        batch_mode: bool = True,
+        **kwargs,
     ) -> Dict:
         """Runs asynchronously a pipeline call.

@@ -305,6 +317,7 @@ def run_async(
             data (Union[Text, Dict]): link to the input data
             data_asset (Optional[Union[Text, Dict]], optional): Data asset to be processed by the pipeline. Defaults to None.
             name (Text, optional): ID given to a call. Defaults to "pipeline_process".
+            batch_mode (bool, optional): Whether to run the pipeline in batch mode or online. Defaults to True.
             kwargs: A dictionary of keyword arguments. The keys are the argument names

         Returns:
@@ -316,6 +329,7 @@
         }

         payload = self.__prepare_payload(data=data, data_asset=data_asset)
+        payload["batchmode"] = batch_mode
         payload.update(kwargs)
         payload = json.dumps(payload)
         call_url = f"{self.url}/{self.id}"
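For reference, a minimal sketch of the behavior the new `poll` branch adds (the payload below is illustrative, not taken from the codebase): when the backend serializes `data` as a JSON-encoded string, the inner `response` field is unwrapped; any parsing failure falls back to the raw payload.

```python
import json

# Illustrative poll payload; assumes the backend may return "data"
# as a JSON string wrapping the actual outputs under "response".
resp = {
    "status": "SUCCESS",
    "data": '{"response": [{"index": 0, "success": true}]}',
}

# Same unwrapping logic as the patched poll():
if isinstance(resp["data"], str):
    try:
        resp["data"] = json.loads(resp["data"])["response"]
    except Exception:
        pass  # keep the raw payload (poll() re-reads r.json() here)

print(resp["data"])  # -> [{'index': 0, 'success': True}]
```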
diff --git a/tests/functional/pipelines/run_test.py b/tests/functional/pipelines/run_test.py
index 25fadaf4..dbdb76fa 100644
--- a/tests/functional/pipelines/run_test.py
+++ b/tests/functional/pipelines/run_test.py
@@ -51,7 +51,7 @@ def test_get_pipeline():
 def test_run_single_str(batchmode: bool, version: str):
     pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0]

-    response = pipeline.run(data="Translate this thing", **{"batchmode": batchmode, "version": version})
+    response = pipeline.run(data="Translate this thing", batch_mode=batchmode, **{"version": version})

     assert response["status"] == "SUCCESS"
@@ -71,7 +71,7 @@ def test_run_single_local_file(batchmode: bool, version: str):
     with open(fname, "w") as f:
         f.write("Translate this thing")

-    response = pipeline.run(data=fname, **{"batchmode": batchmode, "version": version})
+    response = pipeline.run(data=fname, batch_mode=batchmode, **{"version": version})
     os.remove(fname)

     assert response["status"] == "SUCCESS"
@@ -90,7 +90,8 @@ def test_run_with_url(batchmode: bool, version: str):

     response = pipeline.run(
         data="https://aixplain-platform-assets.s3.amazonaws.com/data/dev/64c81163f8bdcac7443c2dad/data/f8.txt",
-        **{"batchmode": batchmode, "version": version}
+        batch_mode=batchmode,
+        **{"version": version}
     )

     assert response["status"] == "SUCCESS"
@@ -110,7 +111,7 @@ def test_run_with_dataset(batchmode: bool, version: str):
     data_id = dataset.source_data["en"].id
     pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0]

-    response = pipeline.run(data=data_id, data_asset=data_asset_id, **{"batchmode": batchmode, "version": version})
+    response = pipeline.run(data=data_id, data_asset=data_asset_id, batch_mode=batchmode, **{"version": version})

     assert response["status"] == "SUCCESS"
@@ -128,7 +129,8 @@ def test_run_multipipe_with_strings(batchmode: bool, version: str):

     response = pipeline.run(
         data={"Input": "Translate this thing.", "Reference": "Traduza esta coisa."},
-        **{"batchmode": batchmode, "version": version}
+        batch_mode=batchmode,
+        **{"version": version}
     )

     assert response["status"] == "SUCCESS"
@@ -154,7 +156,8 @@ def test_run_multipipe_with_datasets(batchmode: bool, version: str):
     response = pipeline.run(
         data={"Input": input_id, "Reference": reference_id},
         data_asset={"Input": data_asset_id, "Reference": data_asset_id},
-        **{"batchmode": batchmode, "version": version}
+        batch_mode=batchmode,
+        **{"version": version}
     )

     assert response["status"] == "SUCCESS"
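A hedged usage sketch of the new keyword (the pipeline lookup mirrors the functional tests; the query string and input data are placeholders): `batch_mode` is now an explicit parameter on `run`/`run_async` rather than a `batchmode` entry passed through `**kwargs`, and it is forwarded to the service as the `batchmode` payload field.

```python
from aixplain.factories import PipelineFactory

# Lookup as done in the tests above; query string is a placeholder.
pipeline = PipelineFactory.list(query="SingleNodePipeline")["results"][0]

# batch_mode defaults to True; pass False to request the online
# (non-batch) execution path. run() forwards it to run_async(),
# which sends it as the "batchmode" field of the request payload.
response = pipeline.run(data="Translate this thing", batch_mode=False)
assert response["status"] == "SUCCESS"
```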