Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-1561] Fix scheduler to pick up example DAGs without other DAGs #2635

Merged
merged 1 commit into from
Nov 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
simple_dags = []

try:
dagbag = models.DagBag(file_path)
dagbag = models.DagBag(file_path, include_examples=False)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr('dag_file_refresh_error', 1, 1)
Expand Down
12 changes: 4 additions & 8 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,7 @@ def __init__(
self.import_errors = {}
self.has_logged = False

if include_examples:
example_dag_folder = os.path.join(
os.path.dirname(__file__),
'example_dags')
self.collect_dags(example_dag_folder)
self.collect_dags(dag_folder)
self.collect_dags(dag_folder, include_examples)

def size(self):
"""
Expand Down Expand Up @@ -531,7 +526,8 @@ def bag_dag(self, dag, parent_dag, root_dag):
def collect_dags(
self,
dag_folder=None,
only_if_updated=True):
only_if_updated=True,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES')):
"""
Given a file path or a folder, this method looks for python modules,
imports them and adds them to the dagbag collection.
Expand All @@ -551,7 +547,7 @@ def collect_dags(
stats = []
FileLoadStat = namedtuple(
'FileLoadStat', "file duration dag_num task_num dags")
for filepath in list_py_file_paths(dag_folder):
for filepath in list_py_file_paths(dag_folder, include_examples):
try:
ts = timezone.utcnow()
found_dags = self.process_file(
Expand Down
7 changes: 6 additions & 1 deletion airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ def get_dag(self, dag_id):
return self.dag_id_to_simple_dag[dag_id]


def list_py_file_paths(directory, safe_mode=True):
def list_py_file_paths(directory, safe_mode=True,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES')):
"""
Traverse a directory and look for Python files.

Expand Down Expand Up @@ -284,6 +285,10 @@ def list_py_file_paths(directory, safe_mode=True):
except Exception:
log = LoggingMixin().log
log.exception("Error while examining %s", f)
if include_examples:
import airflow.example_dags
example_dag_folder = airflow.example_dags.__path__[0]
file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False))
return file_paths


Expand Down
14 changes: 13 additions & 1 deletion tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow import AirflowException, settings, models
from airflow import configuration
from airflow.bin import cli
import airflow.example_dags
from airflow.executors import BaseExecutor, SequentialExecutor
from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
Expand Down Expand Up @@ -3335,7 +3336,18 @@ def test_list_py_file_paths(self):
if file_name not in ignored_files:
expected_files.add(
'{}/{}'.format(TEST_DAGS_FOLDER, file_name))
for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
for file_path in list_py_file_paths(TEST_DAGS_FOLDER, include_examples=False):
detected_files.add(file_path)
self.assertEqual(detected_files, expected_files)

example_dag_folder = airflow.example_dags.__path__[0]
for root, dirs, files in os.walk(example_dag_folder):
for file_name in files:
if file_name.endswith('.py') or file_name.endswith('.zip'):
if file_name not in ['__init__.py']:
expected_files.add(os.path.join(root, file_name))
detected_files.clear()
for file_path in list_py_file_paths(TEST_DAGS_FOLDER, include_examples=True):
detected_files.add(file_path)
self.assertEqual(detected_files, expected_files)

Expand Down