Skip to content
Permalink
Browse files

Put airflow DAG generation under test

Test Plan: Changes are under test

Reviewers: max, schrockn

Reviewed By: schrockn

Differential Revision: https://dagster.phacility.com/D113
  • Loading branch information...
natekupp committed May 13, 2019
1 parent 4d653a6 commit 34559f1a06fdd7f3600bc5ff662e919c96df9553
@@ -175,14 +175,15 @@ def airflow_tests():
.run(
"cd python_modules/dagster-airflow/dagster_airflow_tests/test_project",
"./build.sh",
"mkdir -p /home/circleci/airflow",
"mkdir -p /airflow",
"export AIRFLOW_HOME=/airflow",
"cd ../../",
"pip install tox",
"tox -e {ver}".format(ver=TOX_MAP[version]),
"mv .coverage {file}".format(file=coverage),
"buildkite-agent artifact upload {file}".format(file=coverage),
)
.on_integration_image(version)
.on_integration_image(version, ['AIRFLOW_HOME'])
.build()
)
return tests
@@ -1,9 +1,11 @@
import contextlib
import os
import sys

import click
import yaml

from dagster.core.definitions.entrypoint import LoaderEntrypoint
from dagster.utils.indenting_printer import IndentingStringIoPrinter

if sys.version_info.major >= 3:
@@ -23,7 +25,8 @@ def _construct_yml(config_file, dag_name):
'filesystem': {'base_dir': '/tmp/dagster-airflow/{}'.format(dag_name)}
}

with StringIO() as f:
# See http://bit.ly/309sTOu
with contextlib.closing(StringIO()) as f:
yaml.dump(configs, f, default_flow_style=False, allow_unicode=True)
return f.getvalue()

@@ -53,6 +56,10 @@ def scaffold(dag_name, module_name, fn_name, output_path, config_file):
if not output_path:
raise 'You must specify --output-path or set AIRFLOW_HOME to use this script.'

# Load the pipeline to determine the pipeline name
pipeline = LoaderEntrypoint.from_module_target(module_name, fn_name).perform_load()
pipeline_name = pipeline.name

# We construct the YAML config and then put it directly in the DAG file
yml = _construct_yml(config_file, dag_name)

@@ -71,19 +78,9 @@ def scaffold(dag_name, module_name, fn_name, output_path, config_file):
printer.line('import datetime')
printer.line('import yaml')
printer.blank_line()

printer.line('from dagster import RepositoryTargetInfo')
printer.line('from dagster_airflow.factory import make_airflow_dag')
printer.blank_line()
printer.comment(
'NOTE: you must ensure that {module_name} is installed or available on sys.path.'.format(
module_name=module_name
)
)
printer.comment('otherwise, this import will fail.')
printer.line(
'from {module_name} import {fn_name}'.format(module_name=module_name, fn_name=fn_name)
)
printer.blank_line()
printer.blank_line()
printer.line('CONFIG = \'\'\'')
printer.line(yml)
@@ -103,13 +100,29 @@ def scaffold(dag_name, module_name, fn_name, output_path, config_file):
printer.blank_line()
printer.line('dag, tasks = make_airflow_dag(')
with printer.with_indent():
printer.line('pipeline={fn_name}(),'.format(fn_name=fn_name))
printer.comment(
'NOTE: you must ensure that {module_name} is installed or available on sys.path'.format(
module_name=module_name
)
)
printer.comment('otherwise, this import will fail.')
printer.line('repository_target_info=RepositoryTargetInfo(')
with printer.with_indent():
printer.line(
'module_name=\'{module_name}\', fn_name=\'{fn_name}\''.format(
module_name=module_name, fn_name=fn_name
)
)
printer.line('),')
printer.line('pipeline_name=\'{pipeline_name}\','.format(pipeline_name=pipeline_name))
printer.line("env_config=yaml.load(CONFIG),")
printer.line("dag_kwargs={'default_args': DEFAULT_ARGS, 'max_active_runs': 1}")
printer.line(')')

# Ensure output_path/dags exists
os.makedirs(os.path.join(output_path, 'dags'), exist_ok=True)
dags_path = os.path.join(output_path, 'dags')
if not os.path.isdir(dags_path):
os.makedirs(dags_path)

dag_file = os.path.join(output_path, 'dags', dag_name + '.py')
with open(dag_file, 'wb') as f:
@@ -0,0 +1,33 @@
import os
import subprocess


def test_build_dags():
    '''Exercise the dagster-airflow codegen path end to end.

    Runs scripts/create_airflow_dags.sh, which generates Airflow DAGs for several
    pipelines in examples/toys and writes them into $AIRFLOW_HOME/dags. Then
    instantiating DagBag() triggers an Airflow DAG refresh; any DAG that fails to
    parse shows up in DagBag().import_errors. This verifies both that our codegen
    emits valid Airflow DAG files and that Airflow can parse them.
    '''
    here = os.path.dirname(os.path.realpath(__file__))
    script = os.path.join(here, '..', 'scripts/create_airflow_dags.sh')

    proc = subprocess.Popen(script, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()

    # Surface the script output in the pytest log for debugging failures.
    print(out)
    print(err)

    assert proc.returncode == 0

    # Importing and constructing DagBag forces Airflow to (re)scan the dags
    # folder; see https://stackoverflow.com/a/50356956/11295366
    from airflow.models import DagBag

    # Airflow records any import/parse failure in this dict; it must be empty.
    assert not DagBag().import_errors
@@ -1,8 +1,8 @@
#!/bin/bash

# Generate Airflow DAG files for the examples/toys pipelines. Each `scaffold`
# invocation writes a DAG definition into $AIRFLOW_HOME/dags (or --output-path).
# -e: abort on first failure; -x: echo each command for CI logs.
set -ex

dagster-airflow scaffold --dag-name toys_log_spew --module-name toys.log_spew --fn-name define_spew_pipeline
dagster-airflow scaffold --dag-name toys_many_events --module-name toys.many_events --fn-name define_many_events_pipeline
dagster-airflow scaffold --dag-name toys_resources --module-name toys.resources --fn-name define_resource_pipeline
dagster-airflow scaffold --dag-name toys_sleepy --module-name toys.sleepy --fn-name define_sleepy_pipeline

# This forces Airflow to refresh DAGs; see https://stackoverflow.com/a/50356956/11295366
# (constructing DagBag makes Airflow parse the dags folder, so a broken
# generated DAG fails this script thanks to `set -e`)
python -c "from airflow.models import DagBag; d = DagBag();"
@@ -14,6 +14,7 @@ deps =
-e ../dagster
-e ../dagster-graphql
-e ../libraries/dagster-aws
-e ../../examples/toys
-r dev-requirements.txt
commands =
{envpython} --version

0 comments on commit 34559f1

Please sign in to comment.
You can’t perform that action at this time.