From ee68a25facb5541000d5d6524a4009d7e3895ae1 Mon Sep 17 00:00:00 2001 From: casra-developers <68997390+casra-developers@users.noreply.github.com> Date: Mon, 20 Dec 2021 13:33:27 +0100 Subject: [PATCH] Added windows extensions (#16110) --- airflow/task/task_runner/base_task_runner.py | 35 +++++++++++++----- airflow/utils/configuration.py | 3 +- airflow/utils/platform.py | 3 ++ airflow/utils/process_utils.py | 11 ++++-- airflow/utils/timeout.py | 37 +++++++++++++++++++- 5 files changed, 75 insertions(+), 14 deletions(-) diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py index 5551508f49f74..64d528b4cce57 100644 --- a/airflow/task/task_runner/base_task_runner.py +++ b/airflow/task/task_runner/base_task_runner.py @@ -19,6 +19,13 @@ import os import subprocess import threading + +from airflow.utils.platform import IS_WINDOWS + +if not IS_WINDOWS: + # ignored to avoid flake complaining on Linux + from pwd import getpwnam # noqa + from tempfile import NamedTemporaryFile from typing import Optional, Union @@ -136,15 +143,25 @@ def run_command(self, run_with=None): self.log.info("Running on host: %s", get_hostname()) self.log.info('Running: %s', full_cmd) - proc = subprocess.Popen( - full_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - close_fds=True, - env=os.environ.copy(), - preexec_fn=os.setsid, - ) + if IS_WINDOWS: + proc = subprocess.Popen( + full_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + close_fds=True, + env=os.environ.copy(), + ) + else: + proc = subprocess.Popen( + full_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + close_fds=True, + env=os.environ.copy(), + preexec_fn=os.setsid, + ) # Start daemon thread to read subprocess logging output log_reader = threading.Thread( diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py index cc9273c69ef6e..ec810a9d55d50 100644 --- a/airflow/utils/configuration.py +++ b/airflow/utils/configuration.py @@ -21,6 +21,7 @@ from tempfile import mkstemp from airflow.configuration import conf +from airflow.utils.platform import IS_WINDOWS def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): @@ -44,7 +45,7 @@ def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True): with os.fdopen(temp_fd, 'w') as temp_file: # Set the permissions before we write anything to it. - if chmod is not None: + if chmod is not None and not IS_WINDOWS: os.fchmod(temp_fd, chmod) json.dump(cfg_dict, temp_file) diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py index 0c1db2d3296a2..0b36946a74423 100644 --- a/airflow/utils/platform.py +++ b/airflow/utils/platform.py @@ -20,8 +20,11 @@ import logging import os import pkgutil +import platform import sys +IS_WINDOWS = platform.system() == 'Windows' + log = logging.getLogger(__name__) diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index 3b12115288da5..a369bd641cb27 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -20,14 +20,19 @@ import errno import logging import os -import pty import select import shlex import signal import subprocess import sys -import termios -import tty + +from airflow.utils.platform import IS_WINDOWS + +if not IS_WINDOWS: + import tty + import termios + import pty + from contextlib import contextmanager from typing import Dict, List diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py index 22a0faf4c7cfa..6b4464b96073a 100644 --- a/airflow/utils/timeout.py +++ b/airflow/utils/timeout.py @@ -18,12 +18,41 @@ import os import signal +from threading import Timer +from typing import ContextManager, Optional, Type from airflow.exceptions import AirflowTaskTimeout from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.platform import IS_WINDOWS +_timeout = ContextManager[None] -class timeout(LoggingMixin): + +class _timeout_windows(_timeout, LoggingMixin): + def __init__(self, seconds=1, error_message='Timeout'): + super().__init__() + self._timer: Optional[Timer] = None + self.seconds = seconds + self.error_message = error_message + ', PID: ' + str(os.getpid()) + + def handle_timeout(self, *args): # pylint: disable=unused-argument + """Logs information and raises AirflowTaskTimeout.""" + self.log.error("Process timed out, PID: %s", str(os.getpid())) + raise AirflowTaskTimeout(self.error_message) + + def __enter__(self): + if self._timer: + self._timer.cancel() + self._timer = Timer(self.seconds, self.handle_timeout) + self._timer.start() + + def __exit__(self, type_, value, traceback): + if self._timer: + self._timer.cancel() + self._timer = None + + +class _timeout_posix(_timeout, LoggingMixin): """To be used in a ``with`` block and timeout its content.""" def __init__(self, seconds=1, error_message='Timeout'): @@ -48,3 +77,9 @@ def __exit__(self, type_, value, traceback): signal.setitimer(signal.ITIMER_REAL, 0) except ValueError: self.log.warning("timeout can't be used in the current context", exc_info=True) + + +if IS_WINDOWS: + timeout: Type[_timeout] = _timeout_windows +else: + timeout = _timeout_posix