diff --git a/Dockerfile b/Dockerfile index d470fc17dd994..ab9f8ec0016b2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -180,6 +180,41 @@ RUN HADOOP_DISTRO="cdh" \ ENV PATH "${PATH}:/opt/hive/bin" +# Install Singularity (for Singularity executor testing) +RUN apt-get update \ + && apt-get install --no-install-recommends -y \ + uuid-dev \ + libgpgme11-dev \ + squashfs-tools \ + libseccomp-dev \ + pkg-config \ + cryptsetup \ + && apt-get autoremove -yqq --purge \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +ENV GOLANG_VERSION=1.13.8 +ENV SINGULARITY_VERSION=3.5.2 + +RUN curl -L -o go${GOLANG_VERSION}.linux-amd64.tar.gz "https://dl.google.com/go/go${GOLANG_VERSION}.linux-amd64.tar.gz" \ + && tar -C /usr/local -xzvf "go${GOLANG_VERSION}.linux-amd64.tar.gz" \ + && rm "go${GOLANG_VERSION}.linux-amd64.tar.gz" + +ENV PATH="${PATH}:/usr/local/go/bin" + +WORKDIR /tmp + +RUN curl -L -o singularity-${SINGULARITY_VERSION}.tar.gz https://github.com/sylabs/singularity/releases/download/v${SINGULARITY_VERSION}/singularity-${SINGULARITY_VERSION}.tar.gz \ + && tar -xzf singularity-${SINGULARITY_VERSION}.tar.gz + +WORKDIR /tmp/singularity + +RUN ./mconfig \ + && make -C builddir \ + && make -C builddir install + +WORKDIR / + # Install Minicluster ENV MINICLUSTER_HOME="/opt/minicluster" diff --git a/airflow/providers/singularity/__init__.py b/airflow/providers/singularity/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/singularity/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/singularity/example_dags/__init__.py b/airflow/providers/singularity/example_dags/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/singularity/example_dags/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/singularity/example_dags/example_singularity_operator.py b/airflow/providers/singularity/example_dags/example_singularity_operator.py new file mode 100644 index 0000000000000..09057fa5ee115 --- /dev/null +++ b/airflow/providers/singularity/example_dags/example_singularity_operator.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime, timedelta + +from airflow.models import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.providers.singularity.operators.singularity import SingularityOperator + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime.utcnow(), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) +} + +dag = DAG( + 'singularity_sample', default_args=default_args, schedule_interval=timedelta(minutes=10)) + +t1 = BashOperator( + task_id='print_date', + bash_command='date', + dag=dag) + +t2 = BashOperator( + task_id='sleep', + bash_command='sleep 5', + retries=3, + dag=dag) + +t3 = SingularityOperator(command='/bin/sleep 30', + image='docker://busybox:1.30.1', + task_id='singularity_op_tester', + dag=dag) + +t4 = BashOperator( + task_id='print_hello', + bash_command='echo "hello world!!!"', + dag=dag) + +t1.set_downstream(t2) +t1.set_downstream(t3) +t3.set_downstream(t4) diff --git a/airflow/providers/singularity/operators/__init__.py b/airflow/providers/singularity/operators/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/singularity/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/singularity/operators/singularity.py b/airflow/providers/singularity/operators/singularity.py new file mode 100644 index 0000000000000..24412a09349fc --- /dev/null +++ b/airflow/providers/singularity/operators/singularity.py @@ -0,0 +1,181 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import ast +import os +import shutil +from typing import Any, Dict, List, Optional, Union + +from spython.main import Client + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class SingularityOperator(BaseOperator): + """ + Execute a command inside a Singularity container + + Singularity has more seamless connection to the host than Docker, so + no special binds are needed to ensure binding content in the user $HOME + and temporary directories. If the user needs custom binds, this can + be done with --volumes + + :param image: Singularity image or URI from which to create the container. + :type image: str + :param auto_remove: Delete the container when the process exits + The default is False. + :type auto_remove: bool + :param command: Command to be run in the container. (templated) + :type command: str or list + :param start_command: start command to pass to the container instance + :type start_command: string or list + :param environment: Environment variables to set in the container. (templated) + :type environment: dict + :param working_dir: Set a working directory for the instance. + :type working_dir: str + :param force_pull: Pull the image on every run. Default is False. + :type force_pull: bool + :param volumes: List of volumes to mount into the container, e.g. + ``['/host/path:/container/path', '/host/path2:/container/path2']``. + :param options: other flags (list) to provide to the instance start + :type options: list + :param working_dir: Working directory to + set on the container (equivalent to the -w switch the docker client) + :type working_dir: str + """ + template_fields = ('command', 'environment',) + template_ext = ('.sh', '.bash',) + + @apply_defaults + def __init__( # pylint: disable=too-many-arguments + self, + image: str, + command: Union[int, List[str]], + start_command: Optional[Union[str, List[str]]] = None, + environment: Optional[Dict[str, Any]] = None, + pull_folder: Optional[str] = None, + working_dir: Optional[str] = None, + force_pull: Optional[bool] = False, + volumes: Optional[List[str]] = None, + options: Optional[List[str]] = None, + auto_remove: Optional[bool] = False, + *args, + **kwargs) -> None: + + super(SingularityOperator, self).__init__(*args, **kwargs) + self.auto_remove = auto_remove + self.command = command + self.start_command = start_command + self.environment = environment or {} + self.force_pull = force_pull + self.image = image + self.instance = None + self.options = options or [] + self.pull_folder = pull_folder + self.volumes = volumes or [] + self.working_dir = working_dir + self.cli = None + self.container = None + + def execute(self, context): + + self.log.info('Preparing Singularity container %s', self.image) + self.cli = Client + + if not self.command: + raise AirflowException('You must define a command.') + + # Pull the container if asked, and ensure not a binary file + if self.force_pull and not os.path.exists(self.image): + self.log.info('Pulling container %s', self.image) + image = self.cli.pull(self.image, stream=True, pull_folder=self.pull_folder) + + # If we need to stream result for the user, returns lines + if isinstance(image, list): + lines = image.pop() + image = image[0] + for line in lines: + self.log.info(line) + + # Update the image to be a filepath on the system + self.image = image + + # Prepare list of binds + for bind in self.volumes: + self.options = self.options + ['--bind', bind] + + # Does the user want a custom working directory? + if self.working_dir is not None: + self.options = self.options + ['--workdir', self.working_dir] + + # Export environment before instance is run + for enkey, envar in self.environment.items(): + self.log.debug('Exporting %s=%s', envar, enkey) + os.putenv(enkey, envar) + os.environ[enkey] = envar + + # Create a container instance + self.log.debug('Options include: %s', self.options) + self.instance = self.cli.instance(self.image, + options=self.options, + args=self.start_command, + start=False) + + self.instance.start() + self.log.info(self.instance.cmd) + self.log.info('Created instance %s from %s', self.instance, self.image) + + self.log.info('Running command %s', self._get_command()) + self.cli.quiet = True + result = self.cli.execute(self.instance, + self._get_command(), + return_result=True) + + # Stop the instance + self.log.info('Stopping instance %s', self.instance) + self.instance.stop() + + if self.auto_remove is True: + if self.auto_remove and os.path.exists(self.image): + shutil.rmtree(self.image) + + # If the container failed, raise the exception + if result['return_code'] != 0: + message = result['message'] + raise AirflowException(f'Singularity failed: {message}') + + self.log.info('Output from command %s', result['message']) + + def _get_command(self): + if self.command is not None and self.command.strip().find('[') == 0: + commands = ast.literal_eval(self.command) + else: + commands = self.command + return commands + + def on_kill(self): + if self.instance is not None: + self.log.info('Stopping Singularity instance') + self.instance.stop() + + # If an image exists, clean it up + if self.auto_remove is True: + if self.auto_remove and os.path.exists(self.image): + shutil.rmtree(self.image) diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst index 2ee78a63acfdb..0ea5eebd17f22 100644 --- a/docs/autoapi_templates/index.rst +++ b/docs/autoapi_templates/index.rst @@ -174,6 +174,8 @@ All operators are in the following packages: airflow/providers/sftp/sensors/index + airflow/providers/singularity/operators/index + airflow/providers/slack/operators/index airflow/providers/snowflake/operators/index diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 8456594bae951..aaef771868b39 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -1210,6 +1210,12 @@ These integrations allow you to perform various operations using various softwar - - + * - `Singularity `__ + - + - + - :mod:`airflow.providers.singularity.operators.singularity` + - + * - `SQLite `__ - - :mod:`airflow.providers.sqlite.hooks.sqlite` diff --git a/scripts/ci/in_container/entrypoint_ci.sh b/scripts/ci/in_container/entrypoint_ci.sh index aaa8959df36cd..1f5e37df318aa 100755 --- a/scripts/ci/in_container/entrypoint_ci.sh +++ b/scripts/ci/in_container/entrypoint_ci.sh @@ -164,7 +164,6 @@ if [[ ${RUNTIME:=""} == "kubernetes" ]]; then export AIRFLOW_KUBERNETES_IMAGE_TAG fi - if [[ "${ENABLE_KIND_CLUSTER}" == "true" ]]; then export CLUSTER_NAME="airflow-python-${PYTHON_VERSION}-${KUBERNETES_VERSION}" "${MY_DIR}/kubernetes/setup_kind_cluster.sh" diff --git a/setup.py b/setup.py index 4140c35174253..bceb3f6e77037 100644 --- a/setup.py +++ b/setup.py @@ -336,6 +336,7 @@ def write_version(filename: str = os.path.join(*["airflow", "git_version"])): 'blinker>=1.1', 'sentry-sdk>=0.8.0', ] +singularity = ['spython>=0.0.56'] slack = [ 'slackclient>=1.0.0,<2.0.0', ] @@ -424,8 +425,8 @@ def write_version(filename: str = os.path.join(*["airflow", "git_version"])): devel_all = (all_dbs + atlas + aws + azure + celery + cgroups + datadog + devel + doc + docker + druid + elasticsearch + gcp + grpc + jdbc + jenkins + kerberos + kubernetes + ldap + odbc + oracle + pagerduty + papermill + password + pinot + redis + salesforce + samba + segment + sendgrid + - sentry + slack + snowflake + ssh + statsd + tableau + virtualenv + webhdfs + yandexcloud + - zendesk) + sentry + singularity + slack + snowflake + ssh + statsd + tableau + virtualenv + webhdfs + + yandexcloud + zendesk) # Snakebite are not Python 3 compatible :'( if PY3: @@ -566,6 +567,7 @@ def do_setup(): 'segment': segment, 'sendgrid': sendgrid, 'sentry': sentry, + 'singularity': singularity, 'slack': slack, 'snowflake': snowflake, 'ssh': ssh, diff --git a/tests/providers/singularity/__init__.py b/tests/providers/singularity/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/providers/singularity/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/singularity/operators/__init__.py b/tests/providers/singularity/operators/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/providers/singularity/operators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/singularity/operators/test_singularity.py b/tests/providers/singularity/operators/test_singularity.py new file mode 100644 index 0000000000000..05f9796fb1c07 --- /dev/null +++ b/tests/providers/singularity/operators/test_singularity.py @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +import mock +import six +from parameterized import parameterized +from spython.instance import Instance + +from airflow.exceptions import AirflowException +from airflow.providers.singularity.operators.singularity import SingularityOperator + + +class SingularityOperatorTestCase(unittest.TestCase): + @mock.patch('airflow.providers.singularity.operators.singularity.Client') + def test_execute(self, client_mock): + instance = mock.Mock(autospec=Instance, **{ + 'start.return_value': 0, + 'stop.return_value': 0, + }) + + client_mock.instance.return_value = instance + client_mock.execute.return_value = {'return_code': 0, + 'message': 'message'} + + task = SingularityOperator( + task_id='task-id', + image="docker://busybox", + command="echo hello" + ) + task.execute({}) + + client_mock.instance.assert_called_once_with("docker://busybox", + options=[], + args=None, + start=False) + + client_mock.execute.assert_called_once_with(mock.ANY, + "echo hello", + return_result=True) + + execute_args, _ = client_mock.execute.call_args + self.assertIs(execute_args[0], instance) + + instance.start.assert_called_once_with() + instance.stop.assert_called_once_with() + + @parameterized.expand([ + ("",), + (None,), + ]) + def test_command_is_required(self, command): + task = SingularityOperator( + task_id='task-id', + image="docker://busybox", + command=command + ) + with six.assertRaisesRegex(self, AirflowException, "You must define a command."): + task.execute({}) + + @mock.patch('airflow.providers.singularity.operators.singularity.Client') + def test_image_should_be_pulled_when_not_exists(self, client_mock): + instance = mock.Mock(autospec=Instance, **{ + 'start.return_value': 0, + 'stop.return_value': 0, + }) + + client_mock.pull.return_value = '/tmp/busybox_latest.sif' + client_mock.instance.return_value = instance + client_mock.execute.return_value = {'return_code': 0, + 'message': 'message'} + + task = SingularityOperator( + task_id='task-id', + image="docker://busybox", + command="echo hello", + pull_folder="/tmp", + force_pull=True + ) + task.execute({}) + + client_mock.instance.assert_called_once_with( + "/tmp/busybox_latest.sif", options=[], args=None, start=False + ) + client_mock.pull.assert_called_once_with( + "docker://busybox", stream=True, pull_folder="/tmp" + ) + client_mock.execute.assert_called_once_with(mock.ANY, + "echo hello", + return_result=True) + + @parameterized.expand([ + (None, [], ), + ([], [], ), + (["AAA"], ['--bind', 'AAA'], ), + (["AAA", "BBB"], ['--bind', 'AAA', '--bind', 'BBB'], ), + (["AAA", "BBB", "CCC"], ['--bind', 'AAA', '--bind', 'BBB', '--bind', 'CCC'], ), + + ]) + @mock.patch('airflow.providers.singularity.operators.singularity.Client') + def test_bind_options(self, volumes, expected_options, client_mock): + instance = mock.Mock(autospec=Instance, **{ + 'start.return_value': 0, + 'stop.return_value': 0, + }) + client_mock.pull.return_value = 'docker://busybox' + client_mock.instance.return_value = instance + client_mock.execute.return_value = {'return_code': 0, + 'message': 'message'} + + task = SingularityOperator( + task_id='task-id', + image="docker://busybox", + command="echo hello", + force_pull=True, + volumes=volumes + ) + task.execute({}) + + client_mock.instance.assert_called_once_with( + "docker://busybox", options=expected_options, args=None, start=False + ) + + @parameterized.expand([ + (None, [], ), + ("", ['--workdir', ''], ), + ("/work-dir/", ['--workdir', '/work-dir/'], ), + ]) + @mock.patch('airflow.providers.singularity.operators.singularity.Client') + def test_working_dir(self, working_dir, expected_working_dir, client_mock): + instance = mock.Mock(autospec=Instance, **{ + 'start.return_value': 0, + 'stop.return_value': 0, + }) + client_mock.pull.return_value = 'docker://busybox' + client_mock.instance.return_value = instance + client_mock.execute.return_value = {'return_code': 0, + 'message': 'message'} + + task = SingularityOperator( + task_id='task-id', + image="docker://busybox", + command="echo hello", + force_pull=True, + working_dir=working_dir + ) + task.execute({}) + + client_mock.instance.assert_called_once_with( + "docker://busybox", options=expected_working_dir, args=None, start=False + ) + + +if __name__ == "__main__": + unittest.main()