From e8673ef99d1fb7b2f523ac2bb644f6e798511b30 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 4 Dec 2021 00:17:07 +0800 Subject: [PATCH 01/15] Init plugin Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 38 ++++- flytekit/core/map_task.py | 2 +- plugins/flytekit-aws-batch/README.md | 9 ++ .../flytekitplugins/awsbatch/__init__.py | 1 + .../flytekitplugins/awsbatch/task.py | 71 +++++++++ plugins/flytekit-aws-batch/requirements.in | 2 + plugins/flytekit-aws-batch/requirements.txt | 148 ++++++++++++++++++ plugins/flytekit-aws-batch/setup.py | 34 ++++ plugins/flytekit-aws-batch/tests/__init__.py | 0 .../tests/test_aws_batch.py | 52 ++++++ 10 files changed, 353 insertions(+), 4 deletions(-) create mode 100644 plugins/flytekit-aws-batch/README.md create mode 100644 plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py create mode 100644 plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py create mode 100644 plugins/flytekit-aws-batch/requirements.in create mode 100644 plugins/flytekit-aws-batch/requirements.txt create mode 100644 plugins/flytekit-aws-batch/setup.py create mode 100644 plugins/flytekit-aws-batch/tests/__init__.py create mode 100644 plugins/flytekit-aws-batch/tests/test_aws_batch.py diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 6153ed6fa9..10bcc2ae97 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -57,7 +57,9 @@ def _compute_array_job_index(): offset = 0 if _os.environ.get("BATCH_JOB_ARRAY_INDEX_OFFSET"): offset = int(_os.environ.get("BATCH_JOB_ARRAY_INDEX_OFFSET")) - return offset + int(_os.environ.get(_os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME"))) + if _os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME"): + return offset + int(_os.environ.get(_os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME"))) + return offset def _map_job_index_to_child_index(local_input_dir, datadir, index): @@ -380,11 +382,34 @@ def _execute_map_task( raw_output_data_prefix, max_concurrency, test, + is_aws_batch_single_job: bool, dynamic_addl_distro: str, dynamic_dest_dir: str, resolver: str, resolver_args: List[str], ): + """ + This function should be called by map task and aws-batch task + resolver should be something like: + flytekit.core.python_auto_container.default_task_resolver + resolver args should be something like + task_module app.workflows task_name task_1 + have dashes seems to mess up click, like --task_module seems to interfere + + :param inputs: Where to read inputs + :param output_prefix: Where to write primitive outputs + :param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes). + :param test: Dry run + :param is_aws_batch_single_job: True if the aws batch job type is Single job + :param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be + nested). + :param resolver_args: Args that will be passed to the aforementioned resolver's load_task function + :param dynamic_addl_distro: In the case of parent tasks executed using the 'fast' mode this captures where the + compressed code archive has been uploaded. + :param dynamic_dest_dir: In the case of parent tasks executed using the 'fast' mode this captures where compressed + code archives should be installed in the flyte task container. + :return: + """ if len(resolver_args) < 1: raise Exception(f"Resolver args cannot be <1, got {resolver_args}") @@ -394,8 +419,12 @@ def _execute_map_task( # Use the resolver to load the actual task object _task_def = resolver_obj.load_task(loader_args=resolver_args) if not isinstance(_task_def, PythonFunctionTask): - raise Exception("Map tasks cannot be run with instance tasks.") - map_task = MapPythonTask(_task_def, max_concurrency) + raise Exception("Map tasks cannot be run with instance tasks.", _task_def) + + if is_aws_batch_single_job: + map_task = _task_def + else: + map_task = MapPythonTask(_task_def, max_concurrency) task_index = _compute_array_job_index() output_prefix = _os.path.join(output_prefix, str(task_index)) @@ -508,6 +537,7 @@ def fast_execute_task_cmd(additional_distribution, dest_dir, task_execute_cmd): @_click.option("--raw-output-data-prefix", required=False) @_click.option("--max-concurrency", type=int, required=False) @_click.option("--test", is_flag=True) +@_click.option("--is-aws-batch-single-job", is_flag=True) @_click.option("--dynamic-addl-distro", required=False) @_click.option("--dynamic-dest-dir", required=False) @_click.option("--resolver", required=True) @@ -522,6 +552,7 @@ def map_execute_task_cmd( raw_output_data_prefix, max_concurrency, test, + is_aws_batch_single_job, dynamic_addl_distro, dynamic_dest_dir, resolver, @@ -535,6 +566,7 @@ def map_execute_task_cmd( raw_output_data_prefix, max_concurrency, test, + is_aws_batch_single_job, dynamic_addl_distro, dynamic_dest_dir, resolver, diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index fa48b474ca..96a8682bc6 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -52,7 +52,7 @@ def __init__( collection_interface = transform_interface_to_list_interface(python_function_task.python_interface) instance = next(self._ids) - name = f"{python_function_task._task_function.__module__}.mapper_{python_function_task._task_function.__name__}_{instance}" + name = f"{python_function_task.task_function.__module__}.mapper_{python_function_task.task_function.__name__}_{instance}" self._run_task = python_function_task self._max_concurrency = concurrency diff --git a/plugins/flytekit-aws-batch/README.md b/plugins/flytekit-aws-batch/README.md new file mode 100644 index 0000000000..b56663237b --- /dev/null +++ b/plugins/flytekit-aws-batch/README.md @@ -0,0 +1,9 @@ +# Flytekit AWS Batch Plugin + +Flyte backend can be connected with AWS batch. Once enabled, it allows you to run flyte task on AWS batch service + +To install the plugin, run the following command: + +```bash +pip install flytekitplugins-awsbatch +``` diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py new file mode 100644 index 0000000000..09859ac3fd --- /dev/null +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py @@ -0,0 +1 @@ +from .task import AWSBatch diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py new file mode 100644 index 0000000000..63c3b3414a --- /dev/null +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Union + +from dataclasses_json import dataclass_json +from google.protobuf import json_format +from google.protobuf.struct_pb2 import Struct + +from flytekit import PythonFunctionTask, TaskMetadata +from flytekit.extend import SerializationSettings, TaskPlugins + + +@dataclass_json +@dataclass +class AWSBatch(object): + """ + Use this to configure a job definition for a AWS batch job. Task's marked with this will automatically execute + natively onto AWS batch service. + Refer to AWS job definition template for more detail: https://docs.aws.amazon.com/batch/latest/userguide/job-definition-template.html + """ + + parameters: Optional[Dict[str, str]] = None + schedulingPriority: Optional[int] = None + PlatformCapabilities: Optional[List[str]] = None + PropagateTags: Optional[bool] = None + RetryStrategy: Optional[Dict[str, Union[str, int, dict]]] = None + Tags: Optional[Dict[str, str]] = None + Timeout: Optional[Dict[str, int]] = None + + +class AWSBatchFunctionTask(PythonFunctionTask): + """ + Actual Plugin that transforms the local python code for execution within AWS batch job + """ + + _AWS_BATCH_TASK_TYPE = "aws-batch" + + def __init__(self, task_config: AWSBatch, task_function: Callable, **kwargs): + if task_config is None: + task_config = AWSBatch() + super(AWSBatchFunctionTask, self).__init__( + task_config=task_config, task_type=self._AWS_BATCH_TASK_TYPE, task_function=task_function, **kwargs + ) + self._run_task = PythonFunctionTask(task_config=None, task_function=task_function) + self._task_config = task_config + + def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + s = Struct() + s.update(self._task_config.to_dict()) + return json_format.MessageToDict(s) + + def get_command(self, settings: SerializationSettings) -> List[str]: + container_args = [ + "pyflyte-map-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--is-aws-batch-single-job", + "--resolver", + self._run_task.task_resolver.location, + "--", + *self._run_task.task_resolver.loader_args(settings, self._run_task), + ] + + return container_args + + +# Inject the AWS batch plugin into flytekits dynamic plugin loading system +TaskPlugins.register_pythontask_plugin(AWSBatch, AWSBatchFunctionTask) diff --git a/plugins/flytekit-aws-batch/requirements.in b/plugins/flytekit-aws-batch/requirements.in new file mode 100644 index 0000000000..86c87a0a26 --- /dev/null +++ b/plugins/flytekit-aws-batch/requirements.in @@ -0,0 +1,2 @@ +. +-e file:.#egg=flytekitplugins-aws-batch diff --git a/plugins/flytekit-aws-batch/requirements.txt b/plugins/flytekit-aws-batch/requirements.txt new file mode 100644 index 0000000000..b31eaacb21 --- /dev/null +++ b/plugins/flytekit-aws-batch/requirements.txt @@ -0,0 +1,148 @@ +# +# This file is autogenerated by pip-compile with python 3.9 +# To update, run: +# +# pip-compile requirements.in +# +-e file:.#egg=flytekitplugins-aws-batch + # via -r requirements.in +arrow==1.2.1 + # via jinja2-time +binaryornot==0.4.4 + # via cookiecutter +certifi==2021.10.8 + # via requests +chardet==4.0.0 + # via binaryornot +charset-normalizer==2.0.7 + # via requests +checksumdir==1.2.0 + # via flytekit +click==7.1.2 + # via + # cookiecutter + # flytekit +cloudpickle==2.0.0 + # via flytekit +cookiecutter==1.7.3 + # via flytekit +croniter==1.0.15 + # via flytekit +dataclasses-json==0.5.6 + # via flytekit +decorator==5.1.0 + # via retry +deprecated==1.2.13 + # via flytekit +diskcache==5.2.1 + # via flytekit +docker-image-py==0.1.12 + # via flytekit +docstring-parser==0.12 + # via flytekit +flyteidl==0.21.8 + # via flytekit +flytekit==0.24.0 + # via flytekitplugins-aws-batch +grpcio==1.41.1 + # via flytekit +idna==3.3 + # via requests +importlib-metadata==4.8.2 + # via keyring +jinja2==3.0.3 + # via + # cookiecutter + # jinja2-time +jinja2-time==0.2.0 + # via cookiecutter +keyring==23.2.1 + # via flytekit +markupsafe==2.0.1 + # via jinja2 +marshmallow==3.14.0 + # via + # dataclasses-json + # marshmallow-enum + # marshmallow-jsonschema +marshmallow-enum==1.5.1 + # via dataclasses-json +marshmallow-jsonschema==0.13.0 + # via flytekit +mypy-extensions==0.4.3 + # via typing-inspect +natsort==8.0.0 + # via flytekit +numpy==1.21.4 + # via + # pandas + # pyarrow +pandas==1.3.4 + # via flytekit +poyo==0.5.0 + # via cookiecutter +protobuf==3.19.1 + # via + # flyteidl + # flytekit +py==1.11.0 + # via retry +pyarrow==6.0.0 + # via flytekit +python-dateutil==2.8.1 + # via + # arrow + # croniter + # flytekit + # pandas +python-json-logger==2.0.2 + # via flytekit +python-slugify==5.0.2 + # via cookiecutter +pytimeparse==1.1.8 + # via flytekit +pytz==2018.4 + # via + # flytekit + # pandas +regex==2021.11.10 + # via docker-image-py +requests==2.26.0 + # via + # cookiecutter + # flytekit + # responses +responses==0.15.0 + # via flytekit +retry==0.9.2 + # via flytekit +six==1.16.0 + # via + # cookiecutter + # flytekit + # grpcio + # python-dateutil + # responses +sortedcontainers==2.4.0 + # via flytekit +statsd==3.3.0 + # via flytekit +text-unidecode==1.3 + # via python-slugify +typing-extensions==3.10.0.2 + # via typing-inspect +typing-inspect==0.7.1 + # via dataclasses-json +urllib3==1.26.7 + # via + # flytekit + # requests + # responses +wheel==0.37.0 + # via flytekit +wrapt==1.13.3 + # via + # deprecated + # flytekit +zipp==3.6.0 + # via importlib-metadata diff --git a/plugins/flytekit-aws-batch/setup.py b/plugins/flytekit-aws-batch/setup.py new file mode 100644 index 0000000000..1def5b768b --- /dev/null +++ b/plugins/flytekit-aws-batch/setup.py @@ -0,0 +1,34 @@ +from setuptools import setup + +PLUGIN_NAME = "awsbatch" + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["flytekit>=0.19.0,<1.0.0"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="flyteorg", + author_email="admin@flyte.org", + description="This package holds the AWS Batch plugins for flytekit", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.7", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/plugins/flytekit-aws-batch/tests/__init__.py b/plugins/flytekit-aws-batch/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py new file mode 100644 index 0000000000..3f4d11c7e4 --- /dev/null +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -0,0 +1,52 @@ +from flytekitplugins.awsbatch import AWSBatch + +from flytekit import PythonFunctionTask, task +from flytekit.extend import Image, ImageConfig, SerializationSettings + +config = AWSBatch( + parameters={"codec": "mp4"}, + PlatformCapabilities=["EC2"], + PropagateTags=True, + RetryStrategy={"attempts": 10}, + Tags={"hello": "world"}, + Timeout={"attemptDurationSeconds": 60}, +) + + +def test_spark_task(): + @task(task_config=config) + def t1(a: int) -> str: + inc = a + 2 + return str(inc) + + assert t1.task_config is not None + assert t1.task_config == config + assert t1.task_type == "aws-batch" + assert isinstance(t1, PythonFunctionTask) + + default_img = Image(name="default", fqn="test", tag="tag") + settings = SerializationSettings( + project="project", + domain="domain", + version="version", + env={"FOO": "baz"}, + image_config=ImageConfig(default_image=default_img, images=[default_img]), + ) + assert t1.get_custom(settings) == config.to_dict() + assert t1.get_command(settings) == [ + "pyflyte-map-execute", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", + "--is-aws-batch-single-job", + "--resolver", + "flytekit.core.python_auto_container.default_task_resolver", + "--", + "task-module", + "tests.test_aws_batch", + "task-name", + "t1", + ] From 01cf848c6389b03e2290c68eb68f6d96b17a537d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 17 Dec 2021 15:39:27 +0800 Subject: [PATCH 02/15] Fixed lint Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 63c3b3414a..3da66f7559 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -5,7 +5,7 @@ from google.protobuf import json_format from google.protobuf.struct_pb2 import Struct -from flytekit import PythonFunctionTask, TaskMetadata +from flytekit import PythonFunctionTask from flytekit.extend import SerializationSettings, TaskPlugins From 7de5364eba9f281d682762eaf15556929f9b5b3f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 20 Jan 2022 00:52:08 +0800 Subject: [PATCH 03/15] address comment Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 1 + .../flytekitplugins/awsbatch/__init__.py | 2 +- .../flytekitplugins/awsbatch/task.py | 30 ++++++----- plugins/flytekit-aws-batch/requirements.in | 2 +- plugins/flytekit-aws-batch/requirements.txt | 52 +++++++++---------- plugins/flytekit-aws-batch/setup.py | 2 + .../tests/test_aws_batch.py | 14 ++--- 7 files changed, 55 insertions(+), 48 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 924e33fa63..891693f671 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -58,6 +58,7 @@ jobs: python-version: [3.8, 3.9] plugin-names: - flytekit-aws-athena + - flytekit-aws-batch - flytekit-aws-sagemaker - flytekit-data-fsspec - flytekit-dolt diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py index 09859ac3fd..244716ebe7 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/__init__.py @@ -1 +1 @@ -from .task import AWSBatch +from .task import AWSBatchConfig diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 3da66f7559..005b1908e2 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -11,20 +11,26 @@ @dataclass_json @dataclass -class AWSBatch(object): +class AWSBatchConfig(object): """ Use this to configure a job definition for a AWS batch job. Task's marked with this will automatically execute natively onto AWS batch service. - Refer to AWS job definition template for more detail: https://docs.aws.amazon.com/batch/latest/userguide/job-definition-template.html + Refer to AWS job definition template for more detail: https://docs.aws.amazon.com/batch/latest/userguide/job-definition-template.html, + and https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.register_job_definition """ parameters: Optional[Dict[str, str]] = None schedulingPriority: Optional[int] = None - PlatformCapabilities: Optional[List[str]] = None - PropagateTags: Optional[bool] = None - RetryStrategy: Optional[Dict[str, Union[str, int, dict]]] = None - Tags: Optional[Dict[str, str]] = None - Timeout: Optional[Dict[str, int]] = None + platformCapabilities: Optional[List[str]] = None + propagateTags: Optional[bool] = None + retryStrategy: Optional[Dict[str, Union[str, int, dict]]] = None + tags: Optional[Dict[str, str]] = None + timeout: Optional[Dict[str, int]] = None + + def to_dict(self): + s = Struct() + s.update(self.to_dict()) + return json_format.MessageToDict(s) class AWSBatchFunctionTask(PythonFunctionTask): @@ -34,9 +40,9 @@ class AWSBatchFunctionTask(PythonFunctionTask): _AWS_BATCH_TASK_TYPE = "aws-batch" - def __init__(self, task_config: AWSBatch, task_function: Callable, **kwargs): + def __init__(self, task_config: AWSBatchConfig, task_function: Callable, **kwargs): if task_config is None: - task_config = AWSBatch() + task_config = AWSBatchConfig() super(AWSBatchFunctionTask, self).__init__( task_config=task_config, task_type=self._AWS_BATCH_TASK_TYPE, task_function=task_function, **kwargs ) @@ -44,9 +50,7 @@ def __init__(self, task_config: AWSBatch, task_function: Callable, **kwargs): self._task_config = task_config def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: - s = Struct() - s.update(self._task_config.to_dict()) - return json_format.MessageToDict(s) + return self._task_config.to_dict() def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ @@ -68,4 +72,4 @@ def get_command(self, settings: SerializationSettings) -> List[str]: # Inject the AWS batch plugin into flytekits dynamic plugin loading system -TaskPlugins.register_pythontask_plugin(AWSBatch, AWSBatchFunctionTask) +TaskPlugins.register_pythontask_plugin(AWSBatchConfig, AWSBatchFunctionTask) diff --git a/plugins/flytekit-aws-batch/requirements.in b/plugins/flytekit-aws-batch/requirements.in index 86c87a0a26..4c2c1fae97 100644 --- a/plugins/flytekit-aws-batch/requirements.in +++ b/plugins/flytekit-aws-batch/requirements.in @@ -1,2 +1,2 @@ . --e file:.#egg=flytekitplugins-aws-batch +-e file:.#egg=flytekitplugins-awsbatch diff --git a/plugins/flytekit-aws-batch/requirements.txt b/plugins/flytekit-aws-batch/requirements.txt index b31eaacb21..9181490234 100644 --- a/plugins/flytekit-aws-batch/requirements.txt +++ b/plugins/flytekit-aws-batch/requirements.txt @@ -4,7 +4,7 @@ # # pip-compile requirements.in # --e file:.#egg=flytekitplugins-aws-batch +-e file:.#egg=flytekitplugins-awsbatch # via -r requirements.in arrow==1.2.1 # via jinja2-time @@ -14,7 +14,7 @@ certifi==2021.10.8 # via requests chardet==4.0.0 # via binaryornot -charset-normalizer==2.0.7 +charset-normalizer==2.0.10 # via requests checksumdir==1.2.0 # via flytekit @@ -26,29 +26,29 @@ cloudpickle==2.0.0 # via flytekit cookiecutter==1.7.3 # via flytekit -croniter==1.0.15 +croniter==1.2.0 # via flytekit dataclasses-json==0.5.6 # via flytekit -decorator==5.1.0 +decorator==5.1.1 # via retry deprecated==1.2.13 # via flytekit -diskcache==5.2.1 +diskcache==5.4.0 # via flytekit docker-image-py==0.1.12 # via flytekit -docstring-parser==0.12 +docstring-parser==0.13 # via flytekit -flyteidl==0.21.8 +flyteidl==0.21.23 # via flytekit -flytekit==0.24.0 - # via flytekitplugins-aws-batch -grpcio==1.41.1 +flytekit==0.26.0 + # via flytekitplugins-awsbatch +grpcio==1.43.0 # via flytekit idna==3.3 # via requests -importlib-metadata==4.8.2 +importlib-metadata==4.10.1 # via keyring jinja2==3.0.3 # via @@ -56,11 +56,11 @@ jinja2==3.0.3 # jinja2-time jinja2-time==0.2.0 # via cookiecutter -keyring==23.2.1 +keyring==23.5.0 # via flytekit markupsafe==2.0.1 # via jinja2 -marshmallow==3.14.0 +marshmallow==3.14.1 # via # dataclasses-json # marshmallow-enum @@ -71,23 +71,23 @@ marshmallow-jsonschema==0.13.0 # via flytekit mypy-extensions==0.4.3 # via typing-inspect -natsort==8.0.0 +natsort==8.0.2 # via flytekit -numpy==1.21.4 +numpy==1.22.1 # via # pandas # pyarrow -pandas==1.3.4 +pandas==1.3.5 # via flytekit poyo==0.5.0 # via cookiecutter -protobuf==3.19.1 +protobuf==3.19.3 # via # flyteidl # flytekit py==1.11.0 # via retry -pyarrow==6.0.0 +pyarrow==6.0.1 # via flytekit python-dateutil==2.8.1 # via @@ -101,18 +101,18 @@ python-slugify==5.0.2 # via cookiecutter pytimeparse==1.1.8 # via flytekit -pytz==2018.4 +pytz==2021.3 # via # flytekit # pandas -regex==2021.11.10 +regex==2022.1.18 # via docker-image-py -requests==2.26.0 +requests==2.27.1 # via # cookiecutter # flytekit # responses -responses==0.15.0 +responses==0.17.0 # via flytekit retry==0.9.2 # via flytekit @@ -129,20 +129,20 @@ statsd==3.3.0 # via flytekit text-unidecode==1.3 # via python-slugify -typing-extensions==3.10.0.2 +typing-extensions==4.0.1 # via typing-inspect typing-inspect==0.7.1 # via dataclasses-json -urllib3==1.26.7 +urllib3==1.26.8 # via # flytekit # requests # responses -wheel==0.37.0 +wheel==0.37.1 # via flytekit wrapt==1.13.3 # via # deprecated # flytekit -zipp==3.6.0 +zipp==3.7.0 # via importlib-metadata diff --git a/plugins/flytekit-aws-batch/setup.py b/plugins/flytekit-aws-batch/setup.py index 1def5b768b..43613fe244 100644 --- a/plugins/flytekit-aws-batch/setup.py +++ b/plugins/flytekit-aws-batch/setup.py @@ -25,6 +25,8 @@ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", "Topic :: Scientific/Engineering", "Topic :: Scientific/Engineering :: Artificial Intelligence", "Topic :: Software Development", diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py index 3f4d11c7e4..af4c547edb 100644 --- a/plugins/flytekit-aws-batch/tests/test_aws_batch.py +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -1,15 +1,15 @@ -from flytekitplugins.awsbatch import AWSBatch +from flytekitplugins.awsbatch import AWSBatchConfig from flytekit import PythonFunctionTask, task from flytekit.extend import Image, ImageConfig, SerializationSettings -config = AWSBatch( +config = AWSBatchConfig( parameters={"codec": "mp4"}, - PlatformCapabilities=["EC2"], - PropagateTags=True, - RetryStrategy={"attempts": 10}, - Tags={"hello": "world"}, - Timeout={"attemptDurationSeconds": 60}, + platformCapabilities=["EC2"], + propagateTags=True, + retryStrategy={"attempts": 10}, + tags={"hello": "world"}, + timeout={"attemptDurationSeconds": 60}, ) From 85c85525f278d29698aa5d6f0a58ccc9af48024f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 20 Jan 2022 19:41:05 +0800 Subject: [PATCH 04/15] Fixed typo Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/tests/test_aws_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py index af4c547edb..9ccfe24f82 100644 --- a/plugins/flytekit-aws-batch/tests/test_aws_batch.py +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -13,7 +13,7 @@ ) -def test_spark_task(): +def test_aws_batch_task(): @task(task_config=config) def t1(a: int) -> str: inc = a + 2 From 97508bbb50dff27824f16d673cf1b187704d1d31 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 22 Jan 2022 23:50:36 +0800 Subject: [PATCH 05/15] Updated AWS config --- .../flytekitplugins/awsbatch/task.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 005b1908e2..188b81749b 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -13,15 +13,13 @@ @dataclass class AWSBatchConfig(object): """ - Use this to configure a job definition for a AWS batch job. Task's marked with this will automatically execute + Use this to configure SubmitJobInput for a AWS batch job. Task's marked with this will automatically execute natively onto AWS batch service. - Refer to AWS job definition template for more detail: https://docs.aws.amazon.com/batch/latest/userguide/job-definition-template.html, - and https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.register_job_definition + Refer to AWS SubmitJobInput for more detail: https://docs.aws.amazon.com/sdk-for-go/api/service/batch/#SubmitJobInput """ - parameters: Optional[Dict[str, str]] = None schedulingPriority: Optional[int] = None - platformCapabilities: Optional[List[str]] = None + platformCapabilities: str = "EC2" propagateTags: Optional[bool] = None retryStrategy: Optional[Dict[str, Union[str, int, dict]]] = None tags: Optional[Dict[str, str]] = None @@ -52,6 +50,9 @@ def __init__(self, task_config: AWSBatchConfig, task_function: Callable, **kwarg def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: return self._task_config.to_dict() + def get_config(self, settings: SerializationSettings) -> Dict[str, str]: + return {"platformCapabilities": self._task_config.platformCapabilities} + def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ "pyflyte-map-execute", From f81516b82c24b0a291987bee1ef868f9e44829a5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 24 Jan 2022 17:22:04 +0800 Subject: [PATCH 06/15] Fixed lint Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 188b81749b..0c6d3d95c7 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -17,6 +17,7 @@ class AWSBatchConfig(object): natively onto AWS batch service. Refer to AWS SubmitJobInput for more detail: https://docs.aws.amazon.com/sdk-for-go/api/service/batch/#SubmitJobInput """ + parameters: Optional[Dict[str, str]] = None schedulingPriority: Optional[int] = None platformCapabilities: str = "EC2" From cbded6c0b09206e73cea73156acdc5dd1bffd77e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 24 Jan 2022 18:26:25 +0800 Subject: [PATCH 07/15] Added comment Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 0c6d3d95c7..68efb89d8a 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -49,9 +49,12 @@ def __init__(self, task_config: AWSBatchConfig, task_function: Callable, **kwarg self._task_config = task_config def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + # task_config will be used to create SubmitJobInput in propeller except platformCapabilities. return self._task_config.to_dict() def get_config(self, settings: SerializationSettings) -> Dict[str, str]: + # Parameters in taskTemplate config will be used to create aws job definition. + # More detail about job definition: https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html return {"platformCapabilities": self._task_config.platformCapabilities} def get_command(self, settings: SerializationSettings) -> List[str]: From 68fd00edd2911bd8578e6b3dc22795b16b7d54b7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 27 Jan 2022 00:05:00 +0800 Subject: [PATCH 08/15] Update config Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 68efb89d8a..2f2b8d5852 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -22,9 +22,7 @@ class AWSBatchConfig(object): schedulingPriority: Optional[int] = None platformCapabilities: str = "EC2" propagateTags: Optional[bool] = None - retryStrategy: Optional[Dict[str, Union[str, int, dict]]] = None tags: Optional[Dict[str, str]] = None - timeout: Optional[Dict[str, int]] = None def to_dict(self): s = Struct() From 35f8464d4af012ef6ad9398c0f98d92f5aa30da4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 10 Feb 2022 00:39:42 +0800 Subject: [PATCH 09/15] Fixed tests Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 2 +- plugins/flytekit-aws-batch/tests/test_aws_batch.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 2f2b8d5852..3b19f15404 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional from dataclasses_json import dataclass_json from google.protobuf import json_format diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py index 9ccfe24f82..44264dcacb 100644 --- a/plugins/flytekit-aws-batch/tests/test_aws_batch.py +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -5,11 +5,9 @@ config = AWSBatchConfig( parameters={"codec": "mp4"}, - platformCapabilities=["EC2"], + platformCapabilities="EC2", propagateTags=True, - retryStrategy={"attempts": 10}, tags={"hello": "world"}, - timeout={"attemptDurationSeconds": 60}, ) From 7629730151683ebba2752cb81bcebfaa12520028 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 10 Feb 2022 17:15:49 +0800 Subject: [PATCH 10/15] Fixed tests Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 3f5f07af13..411bb9e4b7 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -551,6 +551,7 @@ def map_execute_task_cmd( raw_output_data_prefix=raw_output_data_prefix, max_concurrency=max_concurrency, test=test, + is_aws_batch_single_job=is_aws_batch_single_job, dynamic_addl_distro=dynamic_addl_distro, dynamic_dest_dir=dynamic_dest_dir, resolver=resolver, From 7a2320b5ef157c34cce4bfe81559dcbc854be5fe Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 10 Feb 2022 17:30:49 +0800 Subject: [PATCH 11/15] Fixed tests Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index 3b19f15404..fd9e63fe89 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -43,7 +43,6 @@ def __init__(self, task_config: AWSBatchConfig, task_function: Callable, **kwarg super(AWSBatchFunctionTask, self).__init__( task_config=task_config, task_type=self._AWS_BATCH_TASK_TYPE, task_function=task_function, **kwargs ) - self._run_task = PythonFunctionTask(task_config=None, task_function=task_function) self._task_config = task_config def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: @@ -66,9 +65,9 @@ def get_command(self, settings: SerializationSettings) -> List[str]: "{{.rawOutputDataPrefix}}", "--is-aws-batch-single-job", "--resolver", - self._run_task.task_resolver.location, + self.task_resolver.location, "--", - *self._run_task.task_resolver.loader_args(settings, self._run_task), + *self.task_resolver.loader_args(settings, self), ] return container_args From b09a9c76aac65c65af1c43d32060bf01585bf812 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 16 Feb 2022 10:50:03 +0800 Subject: [PATCH 12/15] use pyflyte execute Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 11 +---------- flytekit/core/map_task.py | 2 +- .../flytekitplugins/awsbatch/task.py | 7 ++++--- plugins/flytekit-aws-batch/tests/test_aws_batch.py | 5 ++--- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 411bb9e4b7..d629196d6b 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -340,7 +340,6 @@ def _execute_map_task( raw_output_data_prefix, max_concurrency, test, - is_aws_batch_single_job: bool, resolver: str, resolver_args: List[str], checkpoint_path: Optional[str] = None, @@ -360,7 +359,6 @@ def _execute_map_task( :param output_prefix: Where to write primitive outputs :param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes). :param test: Dry run - :param is_aws_batch_single_job: True if the aws batch job type is Single job :param resolver: The task resolver to use. This needs to be loadable directly from importlib (and thus cannot be nested). :param resolver_args: Args that will be passed to the aforementioned resolver's load_task function @@ -382,11 +380,7 @@ def _execute_map_task( _task_def = resolver_obj.load_task(loader_args=resolver_args) if not isinstance(_task_def, PythonFunctionTask): raise Exception("Map tasks cannot be run with instance tasks.", _task_def) - - if is_aws_batch_single_job: - map_task = _task_def - else: - map_task = MapPythonTask(_task_def, max_concurrency) + map_task = MapPythonTask(_task_def, max_concurrency) task_index = _compute_array_job_index() output_prefix = _os.path.join(output_prefix, str(task_index)) @@ -514,7 +508,6 @@ def fast_execute_task_cmd(additional_distribution, dest_dir, task_execute_cmd): @_click.option("--raw-output-data-prefix", required=False) @_click.option("--max-concurrency", type=int, required=False) @_click.option("--test", is_flag=True) -@_click.option("--is-aws-batch-single-job", is_flag=True) @_click.option("--dynamic-addl-distro", required=False) @_click.option("--dynamic-dest-dir", required=False) @_click.option("--resolver", required=True) @@ -531,7 +524,6 @@ def map_execute_task_cmd( raw_output_data_prefix, max_concurrency, test, - is_aws_batch_single_job, dynamic_addl_distro, dynamic_dest_dir, resolver, @@ -551,7 +543,6 @@ def map_execute_task_cmd( raw_output_data_prefix=raw_output_data_prefix, max_concurrency=max_concurrency, test=test, - is_aws_batch_single_job=is_aws_batch_single_job, dynamic_addl_distro=dynamic_addl_distro, dynamic_dest_dir=dynamic_dest_dir, resolver=resolver, diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 42ba785a41..1d321b8d6c 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -71,7 +71,7 @@ def __init__( def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index fd9e63fe89..a5ed7c8d52 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -56,14 +56,15 @@ def get_config(self, settings: SerializationSettings) -> Dict[str, str]: def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", - "{{.outputPrefix}}", + # FlytePropeller will always read the output from this directory (outputPrefix/0) + # More detail, see https://github.com/flyteorg/flyteplugins/blob/0dd93c23ed2edeca65d58e89b0edb613f88120e0/go/tasks/plugins/array/catalog.go#L501. + "{{.outputPrefix}}/0", "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", - "--is-aws-batch-single-job", "--resolver", self.task_resolver.location, "--", diff --git a/plugins/flytekit-aws-batch/tests/test_aws_batch.py b/plugins/flytekit-aws-batch/tests/test_aws_batch.py index 44264dcacb..5cd1e5e5c2 100644 --- a/plugins/flytekit-aws-batch/tests/test_aws_batch.py +++ b/plugins/flytekit-aws-batch/tests/test_aws_batch.py @@ -32,14 +32,13 @@ def t1(a: int) -> str: ) assert t1.get_custom(settings) == config.to_dict() assert t1.get_command(settings) == [ - "pyflyte-map-execute", + "pyflyte-execute", "--inputs", "{{.input}}", "--output-prefix", - "{{.outputPrefix}}", + "{{.outputPrefix}}/0", "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", - "--is-aws-batch-single-job", "--resolver", "flytekit.core.python_auto_container.default_task_resolver", "--", From 92920993101cda883123a400cb5a41804885b925 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 16 Feb 2022 16:52:23 +0800 Subject: [PATCH 13/15] Fixed tests Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 1d321b8d6c..42ba785a41 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -71,7 +71,7 @@ def __init__( def get_command(self, settings: SerializationSettings) -> List[str]: container_args = [ - "pyflyte-execute", + "pyflyte-map-execute", "--inputs", "{{.input}}", "--output-prefix", From 6c4cd486b5158be1e740484beaa0a53fdebac2f2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 16 Feb 2022 17:01:31 +0800 Subject: [PATCH 14/15] Fixed tests Signed-off-by: Kevin Su --- flytekit/bin/entrypoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index d629196d6b..9d07d38a63 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -379,7 +379,7 @@ def _execute_map_task( # Use the resolver to load the actual task object _task_def = resolver_obj.load_task(loader_args=resolver_args) if not isinstance(_task_def, PythonFunctionTask): - raise Exception("Map tasks cannot be run with instance tasks.", _task_def) + raise Exception("Map tasks cannot be run with instance tasks.") map_task = MapPythonTask(_task_def, max_concurrency) task_index = _compute_array_job_index() From caa6f1ea998bab12b9d0a2384af646b930bf66b2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 17 Feb 2022 18:06:30 +0800 Subject: [PATCH 15/15] Added comment Signed-off-by: Kevin Su --- plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py index a5ed7c8d52..c9b30b7af1 100644 --- a/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py +++ b/plugins/flytekit-aws-batch/flytekitplugins/awsbatch/task.py @@ -60,7 +60,9 @@ def get_command(self, settings: SerializationSettings) -> List[str]: "--inputs", "{{.input}}", "--output-prefix", - # FlytePropeller will always read the output from this directory (outputPrefix/0) + # As of FlytePropeller v0.16.28, aws array batch plugin support to run single job. + # This task will call aws batch plugin to execute the task on aws batch service. + # For single job, FlytePropeller will always read the output from this directory (outputPrefix/0) # More detail, see https://github.com/flyteorg/flyteplugins/blob/0dd93c23ed2edeca65d58e89b0edb613f88120e0/go/tasks/plugins/array/catalog.go#L501. "{{.outputPrefix}}/0", "--raw-output-data-prefix",