Skip to content

Commit

Permalink
use pyflyte execute
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw committed Feb 16, 2022
1 parent 7a2320b commit b09a9c7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 17 deletions.
11 changes: 1 addition & 10 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def _execute_map_task(
raw_output_data_prefix,
max_concurrency,
test,
is_aws_batch_single_job: bool,
resolver: str,
resolver_args: List[str],
checkpoint_path: Optional[str] = None,
Expand All @@ -360,7 +359,6 @@ def _execute_map_task(
:param output_prefix: Where to write primitive outputs
:param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes).
:param test: Dry run
:param is_aws_batch_single_job: True if the aws batch job type is Single job
:param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be
nested).
:param resolver_args: Args that will be passed to the aforementioned resolver's load_task function
Expand All @@ -382,11 +380,7 @@ def _execute_map_task(
_task_def = resolver_obj.load_task(loader_args=resolver_args)
if not isinstance(_task_def, PythonFunctionTask):
raise Exception("Map tasks cannot be run with instance tasks.", _task_def)

if is_aws_batch_single_job:
map_task = _task_def
else:
map_task = MapPythonTask(_task_def, max_concurrency)
map_task = MapPythonTask(_task_def, max_concurrency)

task_index = _compute_array_job_index()
output_prefix = _os.path.join(output_prefix, str(task_index))
Expand Down Expand Up @@ -514,7 +508,6 @@ def fast_execute_task_cmd(additional_distribution, dest_dir, task_execute_cmd):
@_click.option("--raw-output-data-prefix", required=False)
@_click.option("--max-concurrency", type=int, required=False)
@_click.option("--test", is_flag=True)
@_click.option("--is-aws-batch-single-job", is_flag=True)
@_click.option("--dynamic-addl-distro", required=False)
@_click.option("--dynamic-dest-dir", required=False)
@_click.option("--resolver", required=True)
Expand All @@ -531,7 +524,6 @@ def map_execute_task_cmd(
raw_output_data_prefix,
max_concurrency,
test,
is_aws_batch_single_job,
dynamic_addl_distro,
dynamic_dest_dir,
resolver,
Expand All @@ -551,7 +543,6 @@ def map_execute_task_cmd(
raw_output_data_prefix=raw_output_data_prefix,
max_concurrency=max_concurrency,
test=test,
is_aws_batch_single_job=is_aws_batch_single_job,
dynamic_addl_distro=dynamic_addl_distro,
dynamic_dest_dir=dynamic_dest_dir,
resolver=resolver,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(

def get_command(self, settings: SerializationSettings) -> List[str]:
container_args = [
"pyflyte-map-execute",
"pyflyte-execute",
"--inputs",
"{{.input}}",
"--output-prefix",
Expand Down
7 changes: 4 additions & 3 deletions plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,15 @@ def get_config(self, settings: SerializationSettings) -> Dict[str, str]:

def get_command(self, settings: SerializationSettings) -> List[str]:
container_args = [
"pyflyte-map-execute",
"pyflyte-execute",
"--inputs",
"{{.input}}",
"--output-prefix",
"{{.outputPrefix}}",
# FlytePropeller will always read the output from this directory (outputPrefix/0)
# More detail, see https://github.com/flyteorg/flyteplugins/blob/0dd93c23ed2edeca65d58e89b0edb613f88120e0/go/tasks/plugins/array/catalog.go#L501.
"{{.outputPrefix}}/0",
"--raw-output-data-prefix",
"{{.rawOutputDataPrefix}}",
"--is-aws-batch-single-job",
"--resolver",
self.task_resolver.location,
"--",
Expand Down
5 changes: 2 additions & 3 deletions plugins/flytekit-aws-batch/tests/test_aws_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ def t1(a: int) -> str:
)
assert t1.get_custom(settings) == config.to_dict()
assert t1.get_command(settings) == [
"pyflyte-map-execute",
"pyflyte-execute",
"--inputs",
"{{.input}}",
"--output-prefix",
"{{.outputPrefix}}",
"{{.outputPrefix}}/0",
"--raw-output-data-prefix",
"{{.rawOutputDataPrefix}}",
"--is-aws-batch-single-job",
"--resolver",
"flytekit.core.python_auto_container.default_task_resolver",
"--",
Expand Down

0 comments on commit b09a9c7

Please sign in to comment.