
Added windows extensions #16110

Merged
merged 19 commits into from Dec 20, 2021

Conversation

casra-developers
Contributor

This PR was created after a discussion in this post. @potiuk asked to be mentioned here so he can work with us to integrate those gradual changes.

The aim of this PR is to gradually enable support for the Windows platform. The changes in this PR allow Airflow to be used on a Windows system as a Dask worker. Other things like the web server and task scheduling are not possible because of the way processes are handled in Airflow. Next steps would be to find suitable alternatives to those POSIX process management concepts that work on Windows.

@boring-cyborg boring-cyborg bot added area:core-operators Operators, Sensors and hooks within Core Airflow area:logging area:Scheduler Scheduler or dag parsing Issues labels May 27, 2021
@boring-cyborg

boring-cyborg bot commented May 27, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@potiuk
Member

potiuk commented May 27, 2021

Looks like a weekend adventure to take a look (and possibly finally use the dusty dual-boot Windows partition on my Linux box)

@casra-developers
Contributor Author

@potiuk please let me know how I can assist you (sadly not during the weekend, since I have to book my hours ^^). I am aware that the changes so far are more workarounds than actual platform support.

@potiuk
Member

potiuk commented May 27, 2021

Thanks @casra-developers. We can work in an asynchronous fashion right here. And I believe adding full Windows support to get Airflow up and running on Windows will take quite some time (if it happens at all). For now, we can add partial support:

  1. for development purposes
  2. to run particular parts of Airflow - mainly workers - on Windows, to run particular Windows-only tasks (which is your case)

And both 1) and 2) are mostly about a series of hacks to make it work for those particular scenarios. So we are well aligned here I think :P

Member

@uranusjr uranusjr left a comment

Some comments on the implementation style. I have a hunch the windows_extensions submodule could have legal implications but that’s another topic…

Comment on lines 380 to 393
# find python executable folder
candidates = [os.path.join(tmp_dir, 'bin'), os.path.join(tmp_dir, 'scripts')]
python_folder = None
for candidate in candidates:
    if os.path.isdir(candidate):
        python_folder = candidate
        break

if python_folder is None:
    raise AirflowException(f'Unable to find python executable in "{tempdir}"')

execute_in_subprocess(
    cmd=[
        f'{tmp_dir}/bin/python',
        os.path.join(python_folder, 'python'),
Member

I think this can be simplified with shutil.which. Instead of trying to find a parent directory to os.path.join later:

candidate_dirs = [os.path.join(tmp_dir, "bin"), os.path.join(tmp_dir, "Scripts")]
python_executable = shutil.which("python", path=os.pathsep.join(candidate_dirs))
if not python_executable:
    raise AirflowException(...)
execute_in_subprocess(cmd=[python_executable, ...])

Also, I feel the AirflowException should show tmp_dir instead. Showing tempdir can be misleading because the Python executable isn’t directly in the temp directory, but in the subdirectory tmp_dir.

Contributor Author

Very nice, I didn't know about this function. Updated in the latest commit.

Comment on lines 3 to 7
def is_windows() -> bool:
    """
    Returns true if executing system is Windows
    """
    return platform.system() == 'Windows'
Member

Would be a good idea to functools.lru_cache this.
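For context, a sketch of what the functools.lru_cache version would look like (the result cannot change during a process's lifetime, so caching is safe):

```python
import functools
import platform


@functools.lru_cache(maxsize=None)
def is_windows() -> bool:
    """Return True if the executing system is Windows.

    lru_cache memoises the result, so platform.system() is only called once
    per process no matter how often is_windows() is used.
    """
    return platform.system() == "Windows"
```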

Contributor Author

Obsolete now since we are using a constant in the latest commit but I will keep this in mind.

@@ -86,6 +87,10 @@ def __init__(self, local_task_job):

        # pylint: disable=consider-using-with
        self._error_file = NamedTemporaryFile(delete=True)

        # HOTFIX: When reporting exceptions, this file was usually locked because it was still opened by this process
        self._error_file.close()
Member

This looks wrong. The documentation says this would destroy the file immediately, rendering this attribute useless.

Contributor Author

On Windows we basically just create an empty text file. Not closing it immediately raised an exception while logging since the file was opened by another python process.

Contributor Author

Ok nevermind... I removed the line and it still works on our Dask-Worker.
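For anyone hitting the same issue: the locking behaviour described above is a well-known Windows limitation of NamedTemporaryFile (while the delete=True handle is open, the file cannot be reopened by another process). A hedged sketch of the usual portable workaround, not the code this PR ended up with:

```python
import os
from tempfile import NamedTemporaryFile

# On Windows, a NamedTemporaryFile held open with delete=True is locked, so a
# second process (e.g. a task runner writing its traceback) cannot open it.
# delete=False lets us close our handle right away and clean up explicitly.
error_file = NamedTemporaryFile(delete=False)
error_file.close()  # any process may now open error_file.name

with open(error_file.name, "w") as f:  # stands in for the child process
    f.write("some traceback")

with open(error_file.name) as f:
    contents = f.read()

os.unlink(error_file.name)  # manual cleanup replaces delete=True
```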

Comment on lines 142 to 165
if is_windows():
    # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
    proc = subprocess.Popen(
        full_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        close_fds=True,
        env=os.environ.copy(),
    )
else:
    # pylint: disable=subprocess-popen-preexec-fn,consider-using-with
    proc = subprocess.Popen(
        full_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        close_fds=True,
        env=os.environ.copy(),
        preexec_fn=os.setsid,  # does not exist on Windows
    )
Member

@uranusjr uranusjr May 27, 2021

I think we can use start_new_session=True instead. This option is shorthand for preexec_fn=os.setsid where available, and silently ignored on Windows (if I’m understanding the documentation correctly).
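A runnable sketch of the suggested portable call; the child command and its session-leader check are illustrative (and POSIX-only, hence the hasattr guard):

```python
import subprocess
import sys

# start_new_session=True calls setsid() in the child on POSIX and is silently
# ignored on Windows, so a single Popen call covers both platforms.
child_code = (
    "import os\n"
    "print(os.getpid() == os.getsid(0) if hasattr(os, 'getsid') else True)\n"
)
proc = subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    universal_newlines=True,
    close_fds=True,
    start_new_session=True,
)
out, _ = proc.communicate()
```

On POSIX the child prints True because setsid() made it its own session leader; on Windows the option is a no-op and the guard keeps the check harmless.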

Contributor Author

Seems to work, I will change it. Thanks!

'which are configured differently to not find the log files.'
]))
result_name = result_name.replace(':', '_')
return result_name
Member

I wonder if we should just always replace : everywhere. It’s only there because of the datetime.

Member

I'd also be for it. The warning is fine to keep as it is, but we could simply change the default log_filename_template configuration to contain the replace filter. This keeps backwards compatibility for people who already have Airflow, while all new installations will use the one with the replace.
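To illustrate the replacement being discussed: the colons come from the ISO-8601 execution date in the log path, and colons are not allowed in Windows filenames. The dag/task names and the Jinja filter spelling below are assumptions for illustration, not the actual default template:

```python
# In a Jinja-based log_filename_template the same effect could be written as
# "{{ ts | replace(':', '_') }}"; this is the plain-Python equivalent.
execution_date = "2021-05-27T10:00:00+00:00"  # made-up example timestamp
result_name = f"my_dag/my_task/{execution_date}/1.log".replace(":", "_")
```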

Comment on lines 30 to 31
if is_windows():
    from airflow.windows_extensions import termios, tty, pty
else:
    import pty
    import termios
    import tty
Member

I feel we should have a global shim (maybe a new airflow.platform that works similarly to airflow.compat) and use it in all places instead of doing if is_windows() everywhere.
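A sketch of what such a shim could look like. airflow.platform is the hypothetical module name from the comment, and the Windows branch assumes the stand-in modules from this PR:

```python
import platform

# The platform check happens once, here; every other module then simply does
# "from airflow.platform import pty, termios, tty" instead of branching itself.
IS_WINDOWS = platform.system() == "Windows"

if IS_WINDOWS:
    # Hypothetical pure-Python stand-ins shipped with Airflow (from this PR).
    from airflow.windows_extensions import pty, termios, tty  # noqa: F401
else:
    import pty  # noqa: F401
    import termios  # noqa: F401
    import tty  # noqa: F401
```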

Contributor Author

I will do that. If I understand you correctly, you basically do the import distinction in one file and then just import from there.

@@ -955,7 +955,7 @@ def __deepcopy__(self, memo):

for k, v in self.__dict__.items():
    if k not in shallow_copy:
        setattr(result, k, copy.deepcopy(v, memo))  # noqa
        setattr(result, k, v if type(v).__name__ == 'module' else copy.deepcopy(v, memo))  # modules cannot be pickled
Member

What module is being copied? We've already got the shallow_copy_attrs list -- so anything not deep-copyable should probably go in that list
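To make the failure mode concrete: deep-copying a module really does raise TypeError (deepcopy falls back to pickling, and modules cannot be pickled), and types.ModuleType is a cleaner check than comparing type names. A sketch; safe_copy is an illustrative helper, not Airflow code:

```python
import copy
import sys
import types

# Demonstrate why the special case exists at all.
try:
    copy.deepcopy(sys)
    module_deepcopy_ok = True
except TypeError:  # "cannot pickle 'module' object"
    module_deepcopy_ok = False


def safe_copy(value, memo=None):
    """Shallow-copy modules, deep-copy everything else (illustrative only).

    Listing such attributes in shallow_copy_attrs, as suggested above,
    avoids needing this special case in __deepcopy__ at all.
    """
    if isinstance(value, types.ModuleType):  # clearer than type(v).__name__ == 'module'
        return value
    return copy.deepcopy(value, memo)
```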

Contributor Author

Good point. I am not sure if this comes from the differing implementations of some dependencies between Windows and other systems. This was basically one approach to get rid of the errors popping up without having to go through all the offending attributes. I will see if I can make this a bit less "hacky".

Member

Would be great to get clarity here indeed.

@@ -0,0 +1,16 @@
import platform

def is_windows() -> bool:
Member

@ashb ashb May 27, 2021

We've already got airflow.utils.platform -- put all this in there instead of creating a new module.

Comment on lines 3 to 7
def is_windows() -> bool:
    """
    Returns true if executing system is Windows
    """
    return platform.system() == 'Windows'
Member

We don't need a function for this either.

Suggested change
def is_windows() -> bool:
    """
    Returns true if executing system is Windows
    """
    return platform.system() == 'Windows'
IS_WINDOWS = platform.system() == 'Windows'
"""True if operating system is Windows"""

Contributor Author

True, has been changed in the latest commit.

:param posix_option: Option for Windows system (e.g. path to executable)
:returns: Choice based on platform
"""
return windows_option if is_windows() else posix_option
Member

New line missing at end of file.

@@ -0,0 +1,35 @@
import enum

class Signals(enum.Enum):
Member

Why do we need this on Windows? It doesn't support most of these signals, does it?

Contributor Author

True, this was basically just to satisfy some of the scripts in airflow.utils. We tried to approximate the behavior of the various signal methods by using things like threads for timers. Certainly not ideal; I am very open to better options.
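As a concrete example of the thread-based approximation mentioned above: threading.Timer can stand in for signal.alarm/SIGALRM, which do not exist on Windows. A minimal sketch:

```python
import threading

# A rough cross-platform substitute for "run this callback after N seconds".
fired = []


def on_timeout():
    fired.append("timed out")


timer = threading.Timer(0.05, on_timeout)
timer.start()
timer.join()  # real timeout handling would not block here; this is for the demo
```

Unlike SIGALRM, the callback runs in a separate thread rather than interrupting the main thread, so it cannot forcibly abort a stuck operation - which is exactly why this is only an approximation.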

@@ -0,0 +1 @@
from airflow.windows_extensions.Signals import Signals
Member

Suggested change
from airflow.windows_extensions.Signals import Signals
from airflow.windows_extensions.signals import Signals

Module names should be lower case by convention.

Member

Wait -- we have Signals and signals.

Don't do that. Put them both in signals.

Member

Actually it’s Signals and signal, but that’s still (more?) confusing

Contributor Author

Oops, my mistake

# Only tested on Linux.
# See: W. Richard Stevens. 1992. Advanced Programming in the
# UNIX Environment. Chapter 19.
# Author: Steen Lumholt -- with additions by Guido.
Member

Where did this file come from? What is it licensed under?

Contributor Author

This file is copied from the installation directory of Python. Under Windows this is "...\Python37\Lib\pty.py". The license should be this one https://docs.python.org/3/license.html. We put it there to have our own tty module injected instead of the one it uses normally.

Comment on lines 49 to 59
def _open_terminal():
    """Open pty master and return (master_fd, tty_name)."""
    for x in 'pqrstuvwxyzPQRST':
        for y in '0123456789abcdef':
            pty_name = '/dev/pty' + x + y
            try:
                fd = os.open(pty_name, os.O_RDWR)
            except OSError:
                continue
            return (fd, '/dev/tty' + x + y)
    raise OSError('out of pty devices')
Member

Is this going to work on Windows?!

Contributor Author

This is a very good question. I would have assumed as much, since this file comes with every Python installation on Windows, but then again, the reason we pasted it in here was that the tty module did not work properly.

Member

@ashb ashb left a comment

Nice to see Windows support coming in, but there is some refactoring to do on this PR, as uranusjr has suggested:

  • Remove the is_windows() checks from the code base and instead just import the "right" thing from airflow.platform etc.
  • I'm not sure that the pty module you have will work on Windows?

@casra-developers
Contributor Author

Thank you so much for all the feedback. I will look into it and try to address all the various points.

@casra-developers
Contributor Author

Some comments on the implementation style. I have a hunch the windows_extensions submodule could have legal implications but that’s another topic…

When we made this module internally we did not really consider this, but you are right. I'm not sure about the nomenclature, to be honest, but maybe "win_extensions" or "nt_extensions" would be less problematic.

@uranusjr
Member

The module name should be fine, I think. I was actually referring to the code inside it; Apache has some pretty strict rules on what code can be used in a project, and that module seems like copied from other projects which would be problematic.

@ashb
Member

ashb commented May 27, 2021

The module name should be fine, I think. I was actually referring to the code inside it; Apache has some pretty strict rules on what code can be used in a project, and that module seems like copied from other projects which would be problematic.

So long as it is appropriately licensed and you can attribute it, then it's fine, and we'll put a section in the NOTICE file

@@ -34,6 +35,15 @@ def main():
    os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
    os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')

    # if dags folder has to be set to configured value, make sure it is set properly (needed on Dask-Workers)
Contributor Author

This is probably not an ideal solution, and I forgot to port this from our repository in the first commit. On Windows, the Dask worker tries to get the DAG at the location where it is stored on the Airflow server (e.g. /home/airflow/dags). So here we actually force it to use the DAG path in the config file... I don't know how this is handled in a setup involving only Linux machines.

Member

@ashb ashb Jun 10, 2021

This is not the right fix for this.

There's a bug in a previous feature where the "dag folder" should be replaced with DAG_FOLDER, which is then automatically replaced, but this isn't working.

We should fix that rather than adding a new config and a new way of making this work.

See

def process_subdir(subdir: Optional[str]):
    """Expands path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc."""
    if subdir:
        if not settings.DAGS_FOLDER:
            raise ValueError("DAGS_FOLDER variable in settings should be filled.")
        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
        subdir = os.path.abspath(os.path.expanduser(subdir))
    return subdir
for where the code is (the problem is in the command we send to the executor).

#8061

Member

Yep. Also see #16423 - might be the right time to fix it.

Member

One more comment here. If we are talking about the scheduler running on Linux/Unix and the worker running on Windows, we also have a problem with the directory separator (/ -> \) for DAGs that are in sub-folders.

I thought we could make Airflow accept a file URI instead on the command line: https://datatracker.ietf.org/doc/html/rfc8089, but it does not support relative paths (officially at least).

So probably the best approach is to make Airflow replace / with \ on Windows.

Member

The Windows API can handle both forward and backward slashes (except some very old legacy stuff, none of which is used by Python’s path libraries), so it should be fine if we use forward slashes everywhere. For rare cases where some third-party packages rely on legacy APIs, we can call os.path.normpath to convert the slashes.
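This can be demonstrated portably with the two os.path flavours (ntpath is the Windows implementation behind os.path, posixpath the POSIX one); the DAG path below is a made-up example:

```python
import ntpath
import posixpath

# normpath converts the forward slashes a Linux scheduler would send into
# backslashes on Windows, and leaves POSIX paths untouched.
relative_dag_path = "dags/team_a/my_dag.py"  # hypothetical example path
windows_style = ntpath.normpath(relative_dag_path)
posix_style = posixpath.normpath(relative_dag_path)
```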

Member

Ah cool. Did not know that (not a Windows user :D)

I thought it was a Java-specific behaviour only (it was part of the specification from the beginning). So no problem here.

@casra-developers
Contributor Author

How should we go about the licensing topic? Should I copy the license note from python.org somewhere?

@potiuk
Member

potiuk commented Dec 15, 2021

I feel your frustration in the question. So let me try to be helpful and explain a bit.

I'd advise you to continue rebasing and ping us when you see the tests succeed. There are conflicts now, as you can see. And ping everyone here when it succeeds. In a way, when you are a PR "author", it's part of the author's "job" to draw the attention of others when you see your PR is ready to merge (green, or you only see unrelated issues). This is the best you can do.

It becomes quite obvious if you try to put yourself in our shoes. For you, this is the only (or maybe one of a few) PRs that you care about. Yet we as committers had 340 (!) PRs merged over the course of 1 month (https://github.com/apache/airflow/pulse/monthly). There are just a handful of committers, and this is ~20 PRs merged per working day, some of them taking 20-30 comments. As you can imagine, none of us is on the lookout to merge PRs the moment they succeed. We likely have 100-200 completed PR jobs a day. This is also a reason why main is sometimes broken, as things slip through.

So it's simply much easier for you to pay attention and ping us when you see things are "good" for the PR you "care" about.

I understand you would like to work in "fire and forget" mode, but this is simply difficult (though possibly the merge queue feature which I wrote about earlier will help with that).

Just as a general note, there is also the old saying: if something is painful, do it more often. If you rebase more often, it hurts less overall, because a) you do it in smaller chunks, b) you learn how to do it fast. I've learned how to rebase my commits and resolve conflicts quickly while working on Airflow.

So, apologies if it takes longer than you thought and that you have to do it several times, but this is how it works, and the best you can do is to "vet" your PR and be a little annoying (not after some time - immediately, when you see it is ready to merge).

I hope that is helpful and provides you the right context on why things are like that. Thanks for your understanding.

@casra-developers
Contributor Author

@potiuk thank you for your very helpful advice. In my case it is less about frustration and more about the time I am able to spend on this topic. Since I am part of an SD dev team at CASRA, these are company resources being invested. The company has decided that it wants to support open source projects such as Airflow, but obviously we have to commit to a budget.

With that said, it is true that the rebasing gets less painful every time I do it and therefore it is now something that I can do quite comfortably besides other tasks (including the testing on our Airflow-Linux-Windows test environment).

Something which I probably should have asked earlier is whether it is expected of me to investigate those 4 failing checks, or if this is something that will be fixed by just rebasing onto the current main branch? To be honest, I looked at the checks but I cannot really see how they relate to my changes, therefore I did not feel responsible to investigate further.

Please let me know if I can assist in any way. Airflow is an awesome platform and I am happy to help within the scope available to me.

@potiuk
Member

potiuk commented Dec 16, 2021

Sure. Yeah. I understand the time constraints. If in doubt - just rebase :)

@potiuk
Member

potiuk commented Dec 16, 2021

If it does not help - ask here

@potiuk potiuk closed this Dec 16, 2021
@potiuk potiuk reopened this Dec 16, 2021
@potiuk
Member

potiuk commented Dec 16, 2021

Ah .. closed/reopened by accident :)

@casra-developers
Contributor Author

casra-developers commented Dec 16, 2021

@potiuk Resolved all conflicts and tested within our Linux-Windows setup.

@potiuk
Member

potiuk commented Dec 16, 2021

🤞

@potiuk
Member

potiuk commented Dec 16, 2021

You got unlucky with a temporary Docker unavailability :(. But there was also a Linux-only static check failure; I added a fixup to ignore it and committed it to re-run. 🤞 again.

@casra-developers
Contributor Author

Did another rebase 👍

@potiuk
Member

potiuk commented Dec 17, 2021

Did another rebase 👍

I told you it gets easier the more often you do it ;)

@potiuk
Member

potiuk commented Dec 17, 2021

Seems like the unused pwname is the last problem @casra-developers

@potiuk
Member

potiuk commented Dec 19, 2021

Added a fixup to fix the static-checks

@casra-developers
Contributor Author

@potiuk thank you very much for adding the #noqa in base_task_runner.py. I've rebased the branch 👍

@potiuk potiuk merged commit ee68a25 into apache:main Dec 20, 2021
@boring-cyborg

boring-cyborg bot commented Dec 20, 2021

Awesome work, congrats on your first merged pull request!

@potiuk
Member

potiuk commented Dec 20, 2021

🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉

@casra-developers
Contributor Author

Awesome! Thanks everyone for the patience and support :-)

@potiuk
Member

potiuk commented Dec 20, 2021

Likewise!

@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Apr 14, 2022
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:logging area:Scheduler Scheduler or dag parsing Issues type:new-feature Changelog: New Features
9 participants