Skip to content
Permalink
Browse files
Support glob syntax in .airflowignore files (#21392) (#22051)
A new configuration parameter "CORE_IGNORE_FILE_SYNTAX" is added to
allow patterns in .airflowignore files to be interpreted as either
regular expressions (the default) or glob expressions as found in
.gitignore files. This allows users to use patterns they will be
familiar with from tools such as git, helm and docker.

Glob expressions support wildcard matches ("*", "?") within a directory
as well as character classes ("[0-9]"). In addition, zero or more
directories can be matched using "**". Patterns can be negated by
prefixing a "!" at the beginning of the pattern.

The "fnmatch" library in core Python does not produce patterns that are
fully compliant with the kind of patterns that users will be used to
from gitignore or dockerignore files, so the globs are parsed using
the pathspec package from PyPI.

To aid with debugging ignorefile patterns a more helpful error
message is emitted in the logs for invalid patterns, which are
now skipped rather than causing a hard-to-read scheduler stack trace.

closes: #21392
  • Loading branch information
ianbuss committed Apr 13, 2022
1 parent 7331eef commit 0cd8833df74f4b0498026c4103bab130e1fc1068
Showing 16 changed files with 335 additions and 59 deletions.
@@ -233,6 +233,14 @@
type: string
example: ~
default: "True"
- name: dag_ignore_file_syntax
description: |
The pattern syntax used in the ".airflowignore" files in the DAG directories. Valid values are
``regexp`` or ``glob``.
version_added: 2.3.0
type: string
example: ~
default: "regexp"
- name: default_task_retries
description: |
The number of retries each task is going to have by default. Can be overridden at dag or task level.
@@ -139,6 +139,10 @@ dag_run_conf_overrides_params = True
# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = True

# The pattern syntax used in the ".airflowignore" files in the DAG directories. Valid values are
# ``regexp`` or ``glob``.
dag_ignore_file_syntax = regexp

# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

@@ -245,6 +245,7 @@ class AirflowConfigParser(ConfigParser):
_available_logging_levels = ['CRITICAL', 'FATAL', 'ERROR', 'WARN', 'WARNING', 'INFO', 'DEBUG']
enums_options = {
("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
('core', 'mp_start_method'): multiprocessing.get_all_start_methods(),
("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
("logging", "logging_level"): _available_logging_levels,
@@ -494,11 +494,12 @@ def collect_dags(
Note that if a ``.airflowignore`` file is found while processing
the directory, it will behave much like a ``.gitignore``,
ignoring files that match any of the regex patterns specified
ignoring files that match any of the patterns specified
in the file.
**Note**: The patterns in .airflowignore are treated as
un-anchored regexes, not shell-like glob patterns.
**Note**: The patterns in ``.airflowignore`` are interpreted as either
un-anchored regexes or gitignore-like glob expressions, depending on
the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
"""
if self.read_dags_from_db:
return
@@ -20,8 +20,12 @@
import os
import re
import zipfile
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Pattern, Union, overload
from typing import TYPE_CHECKING, Dict, Generator, List, NamedTuple, Optional, Pattern, Type, Union, overload

from pathspec.patterns import GitWildMatchPattern
from typing_extensions import Protocol

from airflow.configuration import conf

@@ -31,6 +35,88 @@
log = logging.getLogger(__name__)


class _IgnoreRule(Protocol):
"""Interface for ignore rules for structural subtyping"""

@staticmethod
def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional['_IgnoreRule']:
pass

@staticmethod
def match(path: Path, rules: List['_IgnoreRule']) -> bool:
pass


class _RegexpIgnoreRule(NamedTuple):
"""Typed namedtuple with utility functions for regexp ignore rules"""

pattern: Pattern
base_dir: Path

@staticmethod
def compile(pattern: str, base_dir: Path, definition_file: Path) -> Optional[_IgnoreRule]:
"""Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid"""
try:
return _RegexpIgnoreRule(re.compile(pattern), base_dir.resolve())
except re.error as e:
log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
return None

@staticmethod
def match(path: Path, rules: List[_IgnoreRule]) -> bool:
"""Match a list of ignore rules against the supplied path"""
test_path: Path = path.resolve()
for rule in rules:
if not isinstance(rule, _RegexpIgnoreRule):
raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
if rule.pattern.search(str(test_path.relative_to(rule.base_dir))) is not None:
return True
return False


class _GlobIgnoreRule(NamedTuple):
"""Typed namedtuple with utility functions for glob ignore rules"""

pattern: Pattern
raw_pattern: str
include: Optional[bool] = None
relative_to: Optional[Path] = None

@staticmethod
def compile(pattern: str, _, definition_file: Path) -> Optional[_IgnoreRule]:
"""Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid"""
relative_to: Optional[Path] = None
if pattern.strip() == "/":
# "/" doesn't match anything in gitignore
log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
return None
if pattern.startswith("/") or "/" in pattern.rstrip("/"):
# See https://git-scm.com/docs/gitignore
# > If there is a separator at the beginning or middle (or both) of the pattern, then the
# > pattern is relative to the directory level of the particular .gitignore file itself.
# > Otherwise the pattern may also match at any level below the .gitignore level.
relative_to = definition_file.resolve().parent
ignore_pattern = GitWildMatchPattern(pattern)
return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to)

@staticmethod
def match(path: Path, rules: List[_IgnoreRule]) -> bool:
"""Match a list of ignore rules against the supplied path"""
test_path: Path = path.resolve()
matched = False
for r in rules:
if not isinstance(r, _GlobIgnoreRule):
raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}")
rule: _GlobIgnoreRule = r # explicit typing to make mypy play nicely
rel_path = str(test_path.relative_to(rule.relative_to) if rule.relative_to else test_path.name)
if rule.raw_pattern.endswith("/") and test_path.is_dir():
# ensure the test path will potentially match a directory pattern if it is a directory
rel_path += "/"
if rule.include is not None and rule.pattern.match(rel_path) is not None:
matched = rule.include
return matched


def TemporaryDirectory(*args, **kwargs):
"""This function is deprecated. Please use `tempfile.TemporaryDirectory`"""
import warnings
@@ -108,46 +194,77 @@ def open_maybe_zipped(fileloc, mode='r'):
return open(fileloc, mode=mode)


def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Generator[str, None, None]:
def _find_path_from_directory(
base_dir_path: str,
ignore_file_name: str,
ignore_rule_type: Type[_IgnoreRule],
) -> Generator[str, None, None]:
"""
Search the file and return the path of the file that should not be ignored.
:param base_dir_path: the base path to be searched for.
:param ignore_file_name: the file name in which specifies a regular expression pattern is written.
Recursively search the base path and return the list of file paths that should not be ignored by
regular expressions in any ignore files at each directory level.
:param base_dir_path: the base path to be searched
:param ignore_file_name: the file name containing regular expressions for files that should be ignored.
:param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.
:return : file path not to be ignored.
:return: a generator of file paths which should not be ignored.
"""
patterns_by_dir: Dict[str, List[Pattern[str]]] = {}

for root, dirs, files in os.walk(str(base_dir_path), followlinks=True):
patterns: List[Pattern[str]] = patterns_by_dir.get(root, [])

ignore_file_path = os.path.join(root, ignore_file_name)
if os.path.isfile(ignore_file_path):
with open(ignore_file_path) as file:
lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in file.read().split("\n")]
patterns += [re.compile(line) for line in lines_no_comments if line]
patterns = list(set(patterns))

dirs[:] = [
subdir
for subdir in dirs
if not any(
p.search(os.path.join(os.path.relpath(root, str(base_dir_path)), subdir)) for p in patterns
)
]

patterns_by_dir.update({os.path.join(root, sd): patterns.copy() for sd in dirs})

for file in files: # type: ignore
patterns_by_dir: Dict[Path, List[_IgnoreRule]] = {}

for root, dirs, files in os.walk(base_dir_path, followlinks=True):
patterns: List[_IgnoreRule] = patterns_by_dir.get(Path(root), [])

ignore_file_path = Path(root) / ignore_file_name
if ignore_file_path.is_file():
with open(ignore_file_path) as ifile:
lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
# append new patterns and filter out "None" objects, which are invalid patterns
patterns += [
p
for p in [
ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
for line in lines_no_comments
if line
]
if p is not None
]
# evaluation order of patterns is important with negation
# so that later patterns can override earlier patterns
patterns = list(OrderedDict.fromkeys(patterns).keys())

dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]

patterns_by_dir.update({Path(root) / sd: patterns.copy() for sd in dirs})

for file in files:
if file == ignore_file_name:
continue
abs_file_path = os.path.join(root, str(file))
rel_file_path = os.path.join(os.path.relpath(root, str(base_dir_path)), str(file))
if any(p.search(rel_file_path) for p in patterns):
abs_file_path = Path(root) / file
if ignore_rule_type.match(abs_file_path, patterns):
continue
yield str(abs_file_path)


def find_path_from_directory(
base_dir_path: str,
ignore_file_name: str,
ignore_file_syntax: str = conf.get('core', 'DAG_IGNORE_FILE_SYNTAX', fallback="regexp"),
) -> Generator[str, None, None]:
"""
Recursively search the base path and return the list of file paths that should not be ignored.
:param base_dir_path: the base path to be searched
:param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
:param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
:return: a generator of file paths.
"""
if ignore_file_syntax == "glob":
return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
elif ignore_file_syntax == "regexp" or not ignore_file_syntax:
return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
else:
raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")


def list_py_file_paths(
directory: Union[str, "pathlib.Path"],
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
@@ -132,7 +132,7 @@ While both DAG constructors get called when the file is accessed, only ``dag_1``

To consider all Python files instead, disable the ``DAG_DISCOVERY_SAFE_MODE`` configuration flag.

You can also provide an ``.airflowignore`` file inside your ``DAG_FOLDER``, or any of its subfolders, which describes files for the loader to ignore. It covers the directory it's in plus all subfolders underneath it, and should be one regular expression per line, with ``#`` indicating comments.
You can also provide an ``.airflowignore`` file inside your ``DAG_FOLDER``, or any of its subfolders, which describes patterns of files for the loader to ignore. It covers the directory it's in plus all subfolders underneath it. See :ref:`.airflowignore <concepts:airflowignore>` below for details of the file syntax.


.. _concepts:dag-run:
@@ -700,26 +700,55 @@ Note that packaged DAGs come with some caveats:

In general, if you have a complex set of compiled dependencies and modules, you are likely better off using the Python ``virtualenv`` system and installing the necessary packages on your target systems with ``pip``.

.. _concepts:airflowignore:

``.airflowignore``
------------------

A ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER``
or ``PLUGINS_FOLDER`` that Airflow should intentionally ignore.
Each line in ``.airflowignore`` specifies a regular expression pattern,
and directories or files whose names (not DAG id) match any of the patterns
would be ignored (under the hood, ``Pattern.search()`` is used to match the pattern).
Overall it works like a ``.gitignore`` file.
Use the ``#`` character to indicate a comment; all characters
An ``.airflowignore`` file specifies the directories or files in ``DAG_FOLDER``
or ``PLUGINS_FOLDER`` that Airflow should intentionally ignore. Airflow supports
two syntax flavors for patterns in the file, as specified by the ``DAG_IGNORE_FILE_SYNTAX``
configuration parameter (*added in Airflow 2.3*): ``regexp`` and ``glob``.

.. note::

The default ``DAG_IGNORE_FILE_SYNTAX`` is ``regexp`` to ensure backwards compatibility.

For the ``regexp`` pattern syntax (the default), each line in ``.airflowignore``
specifies a regular expression pattern, and directories or files whose names (not DAG id)
match any of the patterns would be ignored (under the hood, ``Pattern.search()`` is used
to match the pattern). Use the ``#`` character to indicate a comment; all characters
on a line following a ``#`` will be ignored.

``.airflowignore`` file should be put in your ``DAG_FOLDER``.
For example, you can prepare a ``.airflowignore`` file with content
With the ``glob`` syntax, the patterns work just like those in a ``.gitignore`` file:

* The ``*`` character will any number of characters, except ``/``
* The ``?`` character will match any single character, except ``/``
* The range notation, e.g. ``[a-zA-Z]``, can be used to match one of the characters in a range
* A pattern can be negated by prefixing with ``!``. Patterns are evaluated in order so
a negation can override a previously defined pattern in the same file or patterns defined in
a parent directory.
* A double asterisk (``**``) can be used to match across directories. For example, ``**/__pycache__/``
will ignore ``__pycache__`` directories in each sub-directory to infinite depth.
* If there is a ``/`` at the beginning or middle (or both) of the pattern, then the pattern
is relative to the directory level of the particular .airflowignore file itself. Otherwise the
pattern may also match at any level below the .airflowignore level.

The ``.airflowignore`` file should be put in your ``DAG_FOLDER``. For example, you can prepare
a ``.airflowignore`` file using the ``regexp`` syntax with content

.. code-block::

project_a
tenant_[\d]

Or, equivalently, in the ``glob`` syntax

.. code-block::

**/*project_a*
tenant_[0-9]*
Then files like ``project_a_dag_1.py``, ``TESTING_project_a.py``, ``tenant_1.py``,
``project_a/dag_1.py``, and ``tenant_1/dag_1.py`` in your ``DAG_FOLDER`` would be ignored
(If a directory's name matches any of the patterns, this directory and all its subfolders
@@ -80,8 +80,8 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha
pass
Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file, so that the whole folder is
ignored by the scheduler when it looks for DAGs.
and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file (if using the regexp ignore
syntax), so that the whole folder is ignored by the scheduler when it looks for DAGs.


Dynamic DAGs with external configuration from a structured data file
@@ -117,9 +117,9 @@ In the case above, these are the ways you could import the python files:
You can see the ``.airflowignore`` file at the root of your folder. This is a file that you can put in your
``dags`` folder to tell Airflow which files from the folder should be ignored when the Airflow
scheduler looks for DAGs. It should contain regular expressions for the paths that should be ignored. You
do not need to have that file in any other folder in ``PYTHONPATH`` (and also you can only keep
shared code in the other folders, not the actual DAGs).
scheduler looks for DAGs. It should contain either regular expressions (the default) or glob expressions
for the paths that should be ignored. You do not need to have that file in any other folder in
``PYTHONPATH`` (and also you can only keep shared code in the other folders, not the actual DAGs).

In the example above the dags are only in ``my_custom_dags`` folder, the ``common_package`` should not be
scanned by scheduler when searching for DAGS, so we should ignore ``common_package`` folder. You also
@@ -131,6 +131,13 @@ from. Your ``.airflowignore`` should look then like this:
my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py

If ``DAG_IGNORE_FILE_SYNTAX`` is set to ``glob``, the equivalent ``.airflowignore`` file would be:

.. code-block:: none

my_company/common_package/
my_company/my_custom_dags/base_dag.py

Built-in ``PYTHONPATH`` entries in Airflow
------------------------------------------

@@ -142,6 +142,7 @@ install_requires =
markupsafe>=1.1.1,<2.1.0
marshmallow-oneofschema>=2.0.1
packaging>=14.0
pathspec~=0.9.0
pendulum>=2.0
pluggy>=1.0
psutil>=4.2.0
@@ -1 +1,3 @@
.*_invalid.* # Skip invalid files
subdir3 # Skip the nested subdir3 directory
# *badrule # This rule is an invalid regex. It would be warned about and skipped.

0 comments on commit 0cd8833

Please sign in to comment.