From b09a9c76aac65c65af1c43d32060bf01585bf812 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 16 Feb 2022 10:50:03 +0800 Subject: [PATCH] use pyflyte execute Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 11 +---------- flytekit/core/map_task.py | 2 +- .../flytekitplugins/awsbatch/task.py | 7 ++++--- plugins/flytekit-aws-batch/tests/test_aws_batch.py | 5 ++--- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 411bb9e4b7..d629196d6b 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -340,7 +340,6 @@ def _execute_map_task( raw_output_data_prefix, max_concurrency, test, - is_aws_batch_single_job: bool, resolver: str, resolver_args: List[str], checkpoint_path: Optional[str] = None, @@ -360,7 +359,6 @@ def _execute_map_task( :param output_prefix: Where to write primitive outputs :param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes). :param test: Dry run - :param is_aws_batch_single_job: True if the aws batch job type is Single job :param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be nested). 
:param resolver_args: Args that will be passed to the aforementioned resolver's load_task function @@ -382,11 +380,7 @@ def _execute_map_task( _task_def = resolver_obj.load_task(loader_args=resolver_args) if not isinstance(_task_def, PythonFunctionTask): raise Exception("Map tasks cannot be run with instance tasks.", _task_def) - - if is_aws_batch_single_job: - map_task = _task_def - else: - map_task = MapPythonTask(_task_def, max_concurrency) + map_task = MapPythonTask(_task_def, max_concurrency) task_index = _compute_array_job_index() output_prefix = _os.path.join(output_prefix, str(task_index)) @@ -514,7 +508,6 @@ def fast_execute_task_cmd(additional_distribution, dest_dir, task_execute_cmd): @_click.option("--raw-output-data-prefix", required=False) @_click.option("--max-concurrency", type=int, required=False) @_click.option("--test", is_flag=True) -@_click.option("--is-aws-batch-single-job", is_flag=True) @_click.option("--dynamic-addl-distro", required=False) @_click.option("--dynamic-dest-dir", required=False) @_click.option("--resolver", required=True) @@ -531,7 +524,6 @@ def map_execute_task_cmd( raw_output_data_prefix, max_concurrency, test, - is_aws_batch_single_job, dynamic_addl_distro, dynamic_dest_dir, resolver, @@ -551,7 +543,6 @@ def map_execute_task_cmd( raw_output_data_prefix=raw_output_data_prefix, max_concurrency=max_concurrency, test=test, - is_aws_batch_single_job=is_aws_batch_single_job, dynamic_addl_distro=dynamic_addl_distro, dynamic_dest_dir=dynamic_dest_dir, resolver=resolver, diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 42ba785a41..1d321b8d6c 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -71,7 +71,7 @@ def __init__( def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py 
b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index fd9e63fe89..a5ed7c8d52 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -56,14 +56,15 @@ def get_config(self, settings: SerializationSettings) -> Dict[str, str]: def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", - "{{.outputPrefix}}", + # FlytePropeller will always read the output from this directory (outputPrefix/0) + # For more detail, see https://github.com/flyteorg/flyteplugins/blob/0dd93c23ed2edeca65d58e89b0edb613f88120e0/go/tasks/plugins/array/catalog.go#L501 + "{{.outputPrefix}}/0", "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", - "--is-aws-batch-single-job", "--resolver", self.task_resolver.location, "--", diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py index 44264dcacb..5cd1e5e5c2 100644 --- a/plugins/flytekit-aws-batch/tests/test_aws_batch.py +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -32,14 +32,13 @@ def t1(a: int) -> str: ) assert t1.get_custom(settings) == config.to_dict() assert t1.get_command(settings) == [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", - "{{.outputPrefix}}", + "{{.outputPrefix}}/0", "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", - "--is-aws-batch-single-job", "--resolver", "flytekit.core.python_auto_container.default_task_resolver", "--",