Skip to content

Commit

Permalink
[AIRFLOW-3932] Update unit tests and documentation for safe mode flag. (
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcarp authored and Tao Feng committed Feb 25, 2019
1 parent 31cd02f commit c50a851
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
15 changes: 10 additions & 5 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ def __init__(
self,
dag_folder=None,
executor=None,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES')):
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):

# do not use default arg in signature, to fix import cycle on plugin load
if executor is None:
Expand All @@ -304,7 +305,10 @@ def __init__(
self.import_errors = {}
self.has_logged = False

self.collect_dags(dag_folder=dag_folder, include_examples=include_examples)
self.collect_dags(
dag_folder=dag_folder,
include_examples=include_examples,
safe_mode=safe_mode)

def size(self):
"""
Expand Down Expand Up @@ -539,7 +543,8 @@ def collect_dags(
self,
dag_folder=None,
only_if_updated=True,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES')):
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
"""
Given a file path or a folder, this method looks for python modules,
imports them and adds them to the dagbag collection.
Expand All @@ -560,13 +565,13 @@ def collect_dags(
FileLoadStat = namedtuple(
'FileLoadStat', "file duration dag_num task_num dags")

safe_mode = configuration.conf.getboolean('core', 'dag_discovery_safe_mode')
for filepath in list_py_file_paths(dag_folder, safe_mode=safe_mode,
include_examples=include_examples):
try:
ts = timezone.utcnow()
found_dags = self.process_file(
filepath, only_if_updated=only_if_updated)
filepath, only_if_updated=only_if_updated,
safe_mode=safe_mode)

td = timezone.utcnow() - ts
td = td.total_seconds() + (
Expand Down
6 changes: 4 additions & 2 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ the ``DAG`` objects. You can have as many DAGs as you want, each describing an
arbitrary number of tasks. In general, each one should correspond to a single
logical workflow.

.. note:: When searching for DAGs, Airflow will only consider files where the string
"airflow" and "DAG" both appear in the contents of the ``.py`` file.
.. note:: When searching for DAGs, Airflow only considers Python files
that contain both of the strings "airflow" and "DAG" by default. To
consider all Python files instead, disable the ``DAG_DISCOVERY_SAFE_MODE``
configuration flag.

Scope
-----
Expand Down
38 changes: 37 additions & 1 deletion tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import unittest
import urllib
import uuid
import shutil
from tempfile import NamedTemporaryFile, mkdtemp

import pendulum
Expand Down Expand Up @@ -1443,7 +1444,7 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
os.rmdir(cls.empty_dir)
shutil.rmtree(cls.empty_dir)

def test_get_existing_dag(self):
"""
Expand Down Expand Up @@ -1479,6 +1480,41 @@ def test_dont_load_example(self):

self.assertEqual(dagbag.size(), 0)

def test_safe_mode_heuristic_match(self):
    """Safe mode on: a file containing both marker strings is picked up.

    A temporary ``.py`` file containing "airflow" and "DAG" is created in
    ``self.empty_dir``. The DagBag built over that folder must report
    exactly one file-load stat, and it must name that file.
    """
    markers = ("# airflow", "# DAG")
    with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as dag_file:
        for marker in markers:
            dag_file.write(marker.encode())
        # Push the buffered markers out so readers of the file can see them.
        dag_file.flush()

        bag = models.DagBag(
            dag_folder=self.empty_dir,
            include_examples=False,
            safe_mode=True)
        stats = bag.dagbag_stats
        self.assertEqual(len(stats), 1)

        expected_file = "/{}".format(os.path.basename(dag_file.name))
        self.assertEqual(stats[0].file, expected_file)

def test_safe_mode_heuristic_mismatch(self):
    """Safe mode on: a file without the marker strings is ignored.

    An empty temporary ``.py`` file is created in ``self.empty_dir``; the
    DagBag built over that folder must report no file-load stats.
    """
    # The temp file only needs to exist while the DagBag scans the folder,
    # so its handle is never bound to a name.
    with NamedTemporaryFile(dir=self.empty_dir, suffix=".py"):
        bag = models.DagBag(
            dag_folder=self.empty_dir,
            include_examples=False,
            safe_mode=True)
        discovered = len(bag.dagbag_stats)
        self.assertEqual(discovered, 0)

def test_safe_mode_disabled(self):
    """Safe mode off: even an empty Python file is picked up.

    An empty temporary ``.py`` file is created in ``self.empty_dir``. With
    ``safe_mode=False`` the DagBag must report exactly one file-load stat,
    and it must name that file.
    """
    with NamedTemporaryFile(dir=self.empty_dir, suffix=".py") as dag_file:
        bag = models.DagBag(
            dag_folder=self.empty_dir,
            include_examples=False,
            safe_mode=False)
        stats = bag.dagbag_stats
        self.assertEqual(len(stats), 1)

        expected_file = "/{}".format(os.path.basename(dag_file.name))
        self.assertEqual(stats[0].file, expected_file)

def test_process_file_that_contains_multi_bytes_char(self):
"""
test that we're able to parse file that contains multi-byte char
Expand Down

0 comments on commit c50a851

Please sign in to comment.