-
Notifications
You must be signed in to change notification settings - Fork 13.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AIRFLOW-4030] second attempt to add singularity to airflow (#7191)
* adding singularity operator and tests Signed-off-by: Vanessa Sochat <vsochat@stanford.edu> * removing encoding pragmas and fixing up dockerfile to pass linting Signed-off-by: Vanessa Sochat <vsochat@stanford.edu> * make workdir in /tmp because AIRFLOW_SOURCES not defined yet Signed-off-by: Vanessa Sochat <vsochat@stanford.edu> * curl needs to follow redirects with -L Signed-off-by: Vanessa Sochat <vsochat@stanford.edu> * moving files to where they are supposed to be, more changes to mock, no clue Signed-off-by: vsoch <vsochat@stanford.edu> * removing trailing whitespace, moving example_dag for singularity, adding licenses to empty init files Signed-off-by: vsoch <vsochat@stanford.edu> * ran isort on example dags file Signed-off-by: vsoch <vsochat@stanford.edu> * adding missing init in example_dags folder for singularity Signed-off-by: vsoch <vsochat@stanford.edu> * removing code from __init__.py files for singularity operator to fix documentation generation Signed-off-by: vsoch <vsochat@stanford.edu> * forgot to update link to singularity in operators and hooks ref Signed-off-by: vsoch <vsochat@stanford.edu> * command must have been provided on init of singularity operator instance Signed-off-by: vsoch <vsochat@stanford.edu> * I guess I'm required to have a task_id? Signed-off-by: vsoch <vsochat@stanford.edu> * try adding working_dir to singularity operator type definitions Signed-off-by: vsoch <vsochat@stanford.edu> * disable too many arguments for pylint of singularity operator init Signed-off-by: vsoch <vsochat@stanford.edu> * move pylint disable up to line 64 - doesnt catch at end of statement like other examples Signed-off-by: vsoch <vsochat@stanford.edu> * two spaces before inline comment Signed-off-by: vsoch <vsochat@stanford.edu> * I dont see task_id as a param for other providers, removing for singularity operator Signed-off-by: vsoch <vsochat@stanford.edu> * adding debug print Signed-off-by: vsoch <vsochat@stanford.edu> * allow for return of just image and/or lines Signed-off-by: vsoch <vsochat@stanford.edu> * dont understand how mock works, but the image should exist after its pulled.... Signed-off-by: vsoch <vsochat@stanford.edu> * try removing shutil, the client should handle pull folder instead Signed-off-by: vsoch <vsochat@stanford.edu> * try changing pull-file to same uri that is expected to be pulled Signed-off-by: vsoch <vsochat@stanford.edu> * import of AirflowException moved to exceptions Signed-off-by: vsoch <vsochat@stanford.edu> * DAG module was moved to airflow.models Signed-off-by: vsoch <vsochat@stanford.edu> * ensure pull is called with pull_folder Signed-off-by: vsoch <vsochat@stanford.edu>
- Loading branch information
Showing
13 changed files
with
546 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
62 changes: 62 additions & 0 deletions
62
airflow/providers/singularity/example_dags/example_singularity_operator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
from datetime import datetime, timedelta | ||
|
||
from airflow.models import DAG | ||
from airflow.operators.bash_operator import BashOperator | ||
from airflow.providers.singularity.operators.singularity import SingularityOperator | ||
|
||
default_args = { | ||
'owner': 'airflow', | ||
'depends_on_past': False, | ||
'start_date': datetime.utcnow(), | ||
'email': ['airflow@example.com'], | ||
'email_on_failure': False, | ||
'email_on_retry': False, | ||
'retries': 1, | ||
'retry_delay': timedelta(minutes=5) | ||
} | ||
|
||
dag = DAG( | ||
'singularity_sample', default_args=default_args, schedule_interval=timedelta(minutes=10)) | ||
|
||
t1 = BashOperator( | ||
task_id='print_date', | ||
bash_command='date', | ||
dag=dag) | ||
|
||
t2 = BashOperator( | ||
task_id='sleep', | ||
bash_command='sleep 5', | ||
retries=3, | ||
dag=dag) | ||
|
||
t3 = SingularityOperator(command='/bin/sleep 30', | ||
image='docker://busybox:1.30.1', | ||
task_id='singularity_op_tester', | ||
dag=dag) | ||
|
||
t4 = BashOperator( | ||
task_id='print_hello', | ||
bash_command='echo "hello world!!!"', | ||
dag=dag) | ||
|
||
t1.set_downstream(t2) | ||
t1.set_downstream(t3) | ||
t3.set_downstream(t4) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
import ast | ||
import os | ||
import shutil | ||
from typing import Any, Dict, List, Optional, Union | ||
|
||
from spython.main import Client | ||
|
||
from airflow.exceptions import AirflowException | ||
from airflow.models import BaseOperator | ||
from airflow.utils.decorators import apply_defaults | ||
|
||
|
||
class SingularityOperator(BaseOperator): | ||
""" | ||
Execute a command inside a Singularity container | ||
Singularity has more seamless connection to the host than Docker, so | ||
no special binds are needed to ensure binding content in the user $HOME | ||
and temporary directories. If the user needs custom binds, this can | ||
be done with --volumes | ||
:param image: Singularity image or URI from which to create the container. | ||
:type image: str | ||
:param auto_remove: Delete the container when the process exits | ||
The default is False. | ||
:type auto_remove: bool | ||
:param command: Command to be run in the container. (templated) | ||
:type command: str or list | ||
:param start_command: start command to pass to the container instance | ||
:type start_command: string or list | ||
:param environment: Environment variables to set in the container. (templated) | ||
:type environment: dict | ||
:param working_dir: Set a working directory for the instance. | ||
:type working_dir: str | ||
:param force_pull: Pull the image on every run. Default is False. | ||
:type force_pull: bool | ||
:param volumes: List of volumes to mount into the container, e.g. | ||
``['/host/path:/container/path', '/host/path2:/container/path2']``. | ||
:param options: other flags (list) to provide to the instance start | ||
:type options: list | ||
:param working_dir: Working directory to | ||
set on the container (equivalent to the -w switch the docker client) | ||
:type working_dir: str | ||
""" | ||
template_fields = ('command', 'environment',) | ||
template_ext = ('.sh', '.bash',) | ||
|
||
@apply_defaults | ||
def __init__( # pylint: disable=too-many-arguments | ||
self, | ||
image: str, | ||
command: Union[int, List[str]], | ||
start_command: Optional[Union[str, List[str]]] = None, | ||
environment: Optional[Dict[str, Any]] = None, | ||
pull_folder: Optional[str] = None, | ||
working_dir: Optional[str] = None, | ||
force_pull: Optional[bool] = False, | ||
volumes: Optional[List[str]] = None, | ||
options: Optional[List[str]] = None, | ||
auto_remove: Optional[bool] = False, | ||
*args, | ||
**kwargs) -> None: | ||
|
||
super(SingularityOperator, self).__init__(*args, **kwargs) | ||
self.auto_remove = auto_remove | ||
self.command = command | ||
self.start_command = start_command | ||
self.environment = environment or {} | ||
self.force_pull = force_pull | ||
self.image = image | ||
self.instance = None | ||
self.options = options or [] | ||
self.pull_folder = pull_folder | ||
self.volumes = volumes or [] | ||
self.working_dir = working_dir | ||
self.cli = None | ||
self.container = None | ||
|
||
def execute(self, context): | ||
|
||
self.log.info('Preparing Singularity container %s', self.image) | ||
self.cli = Client | ||
|
||
if not self.command: | ||
raise AirflowException('You must define a command.') | ||
|
||
# Pull the container if asked, and ensure not a binary file | ||
if self.force_pull and not os.path.exists(self.image): | ||
self.log.info('Pulling container %s', self.image) | ||
image = self.cli.pull(self.image, stream=True, pull_folder=self.pull_folder) | ||
|
||
# If we need to stream result for the user, returns lines | ||
if isinstance(image, list): | ||
lines = image.pop() | ||
image = image[0] | ||
for line in lines: | ||
self.log.info(line) | ||
|
||
# Update the image to be a filepath on the system | ||
self.image = image | ||
|
||
# Prepare list of binds | ||
for bind in self.volumes: | ||
self.options = self.options + ['--bind', bind] | ||
|
||
# Does the user want a custom working directory? | ||
if self.working_dir is not None: | ||
self.options = self.options + ['--workdir', self.working_dir] | ||
|
||
# Export environment before instance is run | ||
for enkey, envar in self.environment.items(): | ||
self.log.debug('Exporting %s=%s', envar, enkey) | ||
os.putenv(enkey, envar) | ||
os.environ[enkey] = envar | ||
|
||
# Create a container instance | ||
self.log.debug('Options include: %s', self.options) | ||
self.instance = self.cli.instance(self.image, | ||
options=self.options, | ||
args=self.start_command, | ||
start=False) | ||
|
||
self.instance.start() | ||
self.log.info(self.instance.cmd) | ||
self.log.info('Created instance %s from %s', self.instance, self.image) | ||
|
||
self.log.info('Running command %s', self._get_command()) | ||
self.cli.quiet = True | ||
result = self.cli.execute(self.instance, | ||
self._get_command(), | ||
return_result=True) | ||
|
||
# Stop the instance | ||
self.log.info('Stopping instance %s', self.instance) | ||
self.instance.stop() | ||
|
||
if self.auto_remove is True: | ||
if self.auto_remove and os.path.exists(self.image): | ||
shutil.rmtree(self.image) | ||
|
||
# If the container failed, raise the exception | ||
if result['return_code'] != 0: | ||
message = result['message'] | ||
raise AirflowException(f'Singularity failed: {message}') | ||
|
||
self.log.info('Output from command %s', result['message']) | ||
|
||
def _get_command(self): | ||
if self.command is not None and self.command.strip().find('[') == 0: | ||
commands = ast.literal_eval(self.command) | ||
else: | ||
commands = self.command | ||
return commands | ||
|
||
def on_kill(self): | ||
if self.instance is not None: | ||
self.log.info('Stopping Singularity instance') | ||
self.instance.stop() | ||
|
||
# If an image exists, clean it up | ||
if self.auto_remove is True: | ||
if self.auto_remove and os.path.exists(self.image): | ||
shutil.rmtree(self.image) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.