Support fnmatch pattern in SFTP Hook and Sensor #17436
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
I think my #15409 (comment) still applies.
We are using ghcr.io as the image cache for our CI builds and Breeze, and it seems ghcr.io is being "rebuilt" while running. We had been using the "airflow-.." image naming convention before, because multiple nesting levels of images were not supported. However, we recently experienced errors pushing 2.1 images (https://issues.apache.org/jira/browse/INFRA-22124), and during investigation it turned out that it is now possible to use "/" in the name of the image. While this still does not introduce multiple nesting levels or a folder structure, the GitHub UI treats it as if it did: if an image name starts with "airflow/", the airflow prefix is stripped out, and you can add even more "/" characters in the name to introduce further hierarchy. Since we have to change the image naming convention due to the (still unresolved) bug preventing us from pushing the v2-1-test image, we decided to change the naming convention for all our cache images to follow this now-available "/" convention, to make it better structured and easier to manage and understand. Some more optimisations are implemented: Python, prod-build and ci-manifest images are only pushed when the "latest" image is prepared. They are not needed for the COMMIT builds because we only need final images for those builds. This simplified the code quite a bit. The push of the cache image in CI is done in one job for both CI and PROD images, and the image is rebuilt again with the latest constraints, to account for those constraints while making sure that UPGRADE_TO_NEWER_DEPENDENCIES is not set during the build (which would invalidate the cache for subsequent non-upgrade builds). Backwards compatibility was implemented to allow PRs that have not been rebased to continue building after this one is merged, and a workaround has been implemented to make this change work even before it is merged to main. This "legacy" mode will be removed in about a week, once everybody has rebased on top of main. Documentation is updated to reflect these changes.
Add missing exception handling in success/retry/failure callbacks
…orceHook (#17399) Adding other auth type inputs to SalesforceHook
Adding myself to INTHEWILD
CeleryKubernetesExecutor lets us use both celery and kubernetes executors. KEDA lets us scale down to zero when there are no celery tasks running. If we have no celery tasks running, and we run a k8s task, then KEDA will launch a worker even though there are still no celery tasks. We can prevent this from happening by ignoring the kubernetes queue in the KEDA query.
We are preparing the Helm chart from the main branch only and we never run it from airflow version branches (similarly to providers). This change disables Helm Chart tests when the default branch is different from main.
This test covers the failure that was missed and manifested in 2.1.1 in #16810.
There was a change in #16521 that introduced the schema field in DBApiHook, but unfortunately using it in provider Hooks deriving from DBApiHook is backwards incompatible for Airflow 2.1 and below. This made the Postgres provider 2.1.0 release backwards incompatible and caused failures on Airflow 2.1.0. Since the change is small and most DBApi-derived hooks already set the schema field on their own, the best approach is to make the schema field private in DBApiHook and change the Postgres Hook to store the schema in the same way as all other operators. Fixes: #17422
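To illustrate the direction described above, here is a minimal sketch, assuming simplified DbApiHook and PostgresHook classes; the attribute names and constructor details are illustrative, not the exact provider code.

from typing import Optional

from airflow.hooks.base import BaseHook


class DbApiHook(BaseHook):
    def __init__(self, *args, schema: Optional[str] = None, **kwargs) -> None:
        super().__init__()
        # Name-mangled (private) attribute, so hooks written against Airflow 2.1
        # and below that define their own `schema` are not affected by the base class.
        self.__schema = schema


class PostgresHook(DbApiHook):
    def __init__(self, *args, **kwargs) -> None:
        # Store the schema on the subclass itself, the same way the other
        # DBApi-derived hooks already do.
        self.schema: Optional[str] = kwargs.pop("schema", None)
        super().__init__(*args, **kwargs)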
…#17486) We have a global limit (60 seconds) for individual test execution, however 'test_airflow_context' of the Python Virtualenv tests might take longer when tests are run in parallel, because it uses dill serialization, including a lot of serializable data from the context of the task. We now give the test 120 seconds to complete.
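As a hedged sketch of how such a per-test override can look, assuming the global limit comes from the pytest-timeout plugin (the exact marker used in Airflow's test suite may differ):

import pytest


# Give this single slow test a larger budget than the global default, since
# dill serialization of the task context is expensive in parallel runs.
@pytest.mark.timeout(120)
def test_airflow_context():
    ...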
When we build the production image during tests in main, we install providers from current sources, to make sure that all the tests, including Helm Chart/Kubernetes tests, use the latest provider sources. However, when we build the prod image in v* branches, we want to build it using the latest released providers instead, because this is the way it will be built shortly when we release it. We do not run provider tests nor Helm Chart tests in those branches, so it is more important to build the image the way it will be built for releases: we then run verification and install dependencies in the very same way it will be done during a release.
We get a lot of bug reports for providers, and it's rare that the versions being used are in the initial report. Let's ask for them.
Update INTHEWILD with our organization.
The resource check in Breeze was slow (3 docker commands instead of one) and it used an extra image which needed to be downloaded. The new check uses the already available airflow CI image and performs all checks in one docker command, so it is a lot faster and also verifies the image at the same time.
There are several ways people might get the quick-start docker-compose setup messed up (especially on Linux): 1) they do not run the initialization steps and go straight to docker-compose up, 2) they do not run the docker-compose init step first. Also, on MacOS/Windows the default memory/disk settings are not enough to run Airflow via docker-compose, and people report "Airflow not working" when they simply have not allocated enough resources. Finally, the docker compose file does not support all versions of airflow, and various problems might occur when you use it with an old version of airflow. This change adds the following improvements: * automated check of the minimum supported airflow version * mkdir -p in the directory-creation instructions * automated check whether AIRFLOW_UID has been set (printing an error and an instruction link in case it is not) * a warning about too-low memory, CPU or disk allocation, with an instruction link explaining where to read about it * automated fixing of ownership of the created directories in case they were not created initially and ended up owned by the root user
This reverts commit 763860c.
It seems like your rebase went wrong.
very wrong.... Sorry for this!
Should I close this PR and create a cleaner one?
airflow/providers/sftp/hooks/sftp.py
Outdated
def retrieve_file(
    self, remote_full_path: str, local_full_path: str, fnmatch_pattern: Optional[str] = None
) -> None:
airflow/providers/sftp/hooks/sftp.py
Outdated
if fnmatch_pattern:
    for file in conn.listdir(path):
        if fnmatch(file, fnmatch_pattern):
            path = file
            break
@uranusjr, how about this method called by the SFTPSensor?
Should we create a different method to be able to catch all the files matching the pattern, or do we just wait for the first one in the list, as is implemented now?
if not self.fnmatch_pattern:
    self.log.info('Poking for %s', self.path)
    try:
        mod_time = self.hook.get_mod_time(self.path)
        self.log.info('Found File %s last modified: %s', str(self.path), str(mod_time))
    except OSError as e:
        if e.errno != SFTP_NO_SUCH_FILE:
            raise e
        return False
else:
    self.log.info('Poking for files matching pattern %s into %s', self.fnmatch_pattern, self.path)
    try:
        mod_time = self.hook.get_mod_time_pattern(self.path, self.fnmatch_pattern)
        self.log.info('Found Files last modified %s', str(mod_time))
    except OSError as e:
        if e.errno != SFTP_NO_SUCH_FILE:
            raise e
        return False
Here we have two options:
- fnmatch_pattern not specified
- fnmatch_pattern specified
In the first case, the parameter path is, as usual, the full path to the file. In the second case, path will be the path to the directory, whilst fnmatch_pattern will be used to catch all the files we are interested in. In this second case, the new method get_mod_time_pattern is called; it returns a dictionary with full paths to files as keys and their modification times as values.
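A rough sketch of what get_mod_time_pattern could look like, based on this description. The name and the return shape come from the comment above; the body is illustrative (shown here as a plain function taking the pysftp connection directly, instead of the hook's self.get_conn()), not the PR's exact code:

import datetime
from fnmatch import fnmatch
from typing import Dict


def get_mod_time_pattern(conn, dir_path: str, fnmatch_pattern: str) -> Dict[str, str]:
    """Return {full_path: modification_time} for every file matching the pattern."""
    mod_times = {}
    for file in conn.listdir(dir_path):
        if fnmatch(file, fnmatch_pattern):
            full_path = f"{dir_path}/{file}"
            # Same timestamp format the existing get_mod_time uses.
            mtime = conn.stat(full_path).st_mtime
            mod_times[full_path] = datetime.datetime.fromtimestamp(mtime).strftime('%Y%m%d%H%M%S')
    return mod_times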
:type fnmatch_pattern: str
"""
conn = self.get_conn()
for file in conn.listdir(dir_path):
This is wrong because it should raise an error when no matches are found; at the moment it does not.
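To illustrate the point, a small hedged sketch of how the lookup could fail loudly when nothing matches; the names (conn, dir_path) mirror the snippet above and this is not the PR's code:

from fnmatch import fnmatch
from typing import List


def _matching_files(conn, dir_path: str, fnmatch_pattern: str) -> List[str]:
    # Collect every entry in the directory that matches the pattern.
    matches = [f for f in conn.listdir(dir_path) if fnmatch(f, fnmatch_pattern)]
    if not matches:
        # Fail loudly instead of silently returning an empty result.
        raise FileNotFoundError(f"No file matching {fnmatch_pattern!r} found in {dir_path!r}")
    return matches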
I'm going ahead and closing this because I realized that the contribution is far from ready to merge.
Effort to give users the possibility to operate on specific files using the SFTP provider. Please refer to the old PR below for details.
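A hedged usage sketch of what the feature would look like from a DAG author's point of view; the fnmatch_pattern parameter is the one proposed in this PR and does not exist in released provider versions:

from airflow.providers.sftp.sensors.sftp import SFTPSensor

wait_for_csv = SFTPSensor(
    task_id="wait_for_csv",
    sftp_conn_id="sftp_default",
    path="/upload/",            # with a pattern, path points to a directory
    fnmatch_pattern="*.csv",    # proposed parameter: match any CSV in that directory
    poke_interval=60,
)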
related
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.