Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added windows extensions #16110

Merged
merged 19 commits into from Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7ae0ac9
Added support to use airflow dask workers on Windows
dominik-werner-casra Oct 18, 2021
b1248f8
Added support to use airflow dask workers on Windows
dominik-werner-casra Oct 18, 2021
78e055f
Fixed isort static check
dominik-werner-casra Oct 21, 2021
0ee2341
Merge branch 'master' of https://github.com/casra-developers/airflow
dominik-werner-casra Oct 21, 2021
b216b7e
Fixed issues introduced by flawed merge.
dominik-werner-casra Oct 22, 2021
bd31d29
Reverted some files to main
dominik-werner-casra Oct 22, 2021
b3a62e9
Added support to use airflow dask workers on Windows
dominik-werner-casra Oct 18, 2021
881ffc9
Fixed isort static check
dominik-werner-casra Oct 21, 2021
d0167a2
Added support to use airflow dask workers on Windows
dominik-werner-casra Oct 18, 2021
cc861d5
Fixed issues introduced by flawed merge.
dominik-werner-casra Oct 22, 2021
069cf41
Reverted some files to main
dominik-werner-casra Oct 22, 2021
24276df
Merge branch 'master' of https://github.com/casra-developers/airflow
dominik-werner-casra Nov 8, 2021
ebeba90
Merge branch 'main' of https://github.com/apache/airflow
dominik-werner-casra Dec 6, 2021
3ab7939
Merge branch 'master' of https://github.com/casra-developers/airflow;…
dominik-werner-casra Dec 16, 2021
cac5ff6
Update airflow/task/task_runner/base_task_runner.py
potiuk Dec 16, 2021
89f3b54
Merge branch 'master' of https://github.com/casra-developers/airflow;…
dominik-werner-casra Dec 17, 2021
a62b972
fixup! Merge branch 'master' of https://github.com/casra-developers/a…
potiuk Dec 19, 2021
c0b5caf
Merge branch 'main' of https://github.com/apache/airflow
dominik-werner-casra Dec 20, 2021
1fa59e2
Merge branch 'master' of https://github.com/casra-developers/airflow
dominik-werner-casra Dec 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 26 additions & 9 deletions airflow/task/task_runner/base_task_runner.py
Expand Up @@ -19,6 +19,13 @@
import os
import subprocess
import threading

from airflow.utils.platform import IS_WINDOWS

if not IS_WINDOWS:
# ignored to avoid flake complaining on Linux
from pwd import getpwnam # noqa

from tempfile import NamedTemporaryFile
from typing import Optional, Union

Expand Down Expand Up @@ -136,15 +143,25 @@ def run_command(self, run_with=None):
self.log.info("Running on host: %s", get_hostname())
self.log.info('Running: %s', full_cmd)

proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)
if IS_WINDOWS:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
)
else:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)

# Start daemon thread to read subprocess logging output
log_reader = threading.Thread(
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/configuration.py
Expand Up @@ -21,6 +21,7 @@
from tempfile import mkstemp

from airflow.configuration import conf
from airflow.utils.platform import IS_WINDOWS


def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
Expand All @@ -44,7 +45,7 @@ def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):

with os.fdopen(temp_fd, 'w') as temp_file:
# Set the permissions before we write anything to it.
if chmod is not None:
if chmod is not None and not IS_WINDOWS:
os.fchmod(temp_fd, chmod)
json.dump(cfg_dict, temp_file)

Expand Down
3 changes: 3 additions & 0 deletions airflow/utils/platform.py
Expand Up @@ -20,8 +20,11 @@
import logging
import os
import pkgutil
import platform
import sys

IS_WINDOWS = platform.system() == 'Windows'

log = logging.getLogger(__name__)


Expand Down
11 changes: 8 additions & 3 deletions airflow/utils/process_utils.py
Expand Up @@ -20,14 +20,19 @@
import errno
import logging
import os
import pty
import select
import shlex
import signal
import subprocess
import sys
import termios
import tty

from airflow.utils.platform import IS_WINDOWS

if not IS_WINDOWS:
import tty
import termios
import pty

from contextlib import contextmanager
from typing import Dict, List

Expand Down
37 changes: 36 additions & 1 deletion airflow/utils/timeout.py
Expand Up @@ -18,12 +18,41 @@

import os
import signal
from threading import Timer
from typing import ContextManager, Optional, Type

from airflow.exceptions import AirflowTaskTimeout
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.platform import IS_WINDOWS

_timeout = ContextManager[None]

class timeout(LoggingMixin):

class _timeout_windows(_timeout, LoggingMixin):
def __init__(self, seconds=1, error_message='Timeout'):
super().__init__()
self._timer: Optional[Timer] = None
self.seconds = seconds
self.error_message = error_message + ', PID: ' + str(os.getpid())

def handle_timeout(self, *args): # pylint: disable=unused-argument
"""Logs information and raises AirflowTaskTimeout."""
self.log.error("Process timed out, PID: %s", str(os.getpid()))
raise AirflowTaskTimeout(self.error_message)

def __enter__(self):
if self._timer:
self._timer.cancel()
self._timer = Timer(self.seconds, self.handle_timeout)
self._timer.start()

def __exit__(self, type_, value, traceback):
if self._timer:
self._timer.cancel()
self._timer = None


class _timeout_posix(_timeout, LoggingMixin):
"""To be used in a ``with`` block and timeout its content."""

def __init__(self, seconds=1, error_message='Timeout'):
Expand All @@ -48,3 +77,9 @@ def __exit__(self, type_, value, traceback):
signal.setitimer(signal.ITIMER_REAL, 0)
except ValueError:
self.log.warning("timeout can't be used in the current context", exc_info=True)


if IS_WINDOWS:
timeout: Type[_timeout] = _timeout_windows
else:
timeout = _timeout_posix