
Surface step skipped in Airflow

Test Plan: unit

Reviewers: #ft, schrockn

Reviewed By: #ft, schrockn

Subscribers: schrockn

Differential Revision: https://dagster.phacility.com/D712
natekupp committed Jul 26, 2019
1 parent 4d01d8e commit 9cf1616c61a7393b9f1ddaba5a2eadadd50e5608
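
In brief: this change surfaces Dagster's skipped steps as skipped tasks in Airflow. A new DagsterSkipMixin inspects the list of DagsterEvents returned from pipeline execution and, when any STEP_SKIPPED event is present, calls Airflow's SkipMixin.skip to mark the current task as skipped. Both DagsterDockerOperator and DagsterPythonOperator mix it in; coalesce_execution_steps gains an invariant guarding the one-step-per-solid assumption the skip logic relies on; the test fixtures now yield a mapping from TaskInstance to results so tests can assert Airflow task states; and a new optional_outputs demo pipeline exercises the skip path end to end.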
@@ -2,6 +2,8 @@

from collections import defaultdict, OrderedDict

+from dagster import check


def _coalesce_solid_order(execution_plan):
    solid_order = [s.solid_handle.to_string() for s in execution_plan.topological_steps()]
@@ -23,6 +25,12 @@ def coalesce_execution_steps(execution_plan):
    for solid_handle, solid_steps in itertools.groupby(
        execution_plan.topological_steps(), lambda x: x.solid_handle.to_string()
    ):
-        steps[solid_handle] += list(solid_steps)
+        solid_steps = list(solid_steps)
+        steps[solid_handle] += solid_steps
+        check.invariant(
+            len(solid_steps) == 1,
+            'Saw {num_steps} execution steps for solid {solid_handle}, expected only one.'.format(
+                num_steps=len(solid_steps), solid_handle=solid_handle
+            ),
+        )

    return OrderedDict([(solid_handle, steps[solid_handle]) for solid_handle in solid_order])
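
Context for the invariant above: coalesce_execution_steps groups the execution plan's steps by solid so that each solid maps to a single Airflow task, and the new skip logic decides skips per task. The check.invariant guards the assumption that each solid contributes exactly one execution step; otherwise a single task could mix skipped and executed steps. A standalone sketch of the grouping, where the Step namedtuple is a stand-in for the real execution-plan step objects:

# Standalone illustration of the grouping performed above: group plan
# steps by solid handle, then assert the one-step-per-solid invariant.
import itertools
from collections import namedtuple

Step = namedtuple('Step', 'solid_handle key')  # stand-in for real plan steps

plan = [Step('foo', 'foo.compute'), Step('bar', 'bar.compute')]

for handle, group in itertools.groupby(plan, lambda s: s.solid_handle):
    group = list(group)
    # Mirrors check.invariant: exactly one execution step per solid, so a
    # skipped step can be mapped to skipping the whole Airflow task.
    assert len(group) == 1, 'expected exactly one step for {}'.format(handle)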
@@ -1,18 +1,21 @@
'''The dagster-airflow operators.'''
import ast
+import datetime
import json
import logging
import os

from contextlib import contextmanager

from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator, SkipMixin
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.file import TemporaryDirectory
from docker import APIClient, from_env

-from dagster import check, seven
+from dagster import check, seven, DagsterEventType
+from dagster.core.events import DagsterEvent
from dagster_graphql.client.mutations import execute_start_pipeline_execution_query
from dagster_graphql.client.query import START_PIPELINE_EXECUTION_QUERY

@@ -29,6 +32,18 @@
LINE_LENGTH = 100


+class DagsterSkipMixin(SkipMixin):
+    def skip_self_if_necessary(self, events, execution_date, task):
+        check.list_param(events, 'events', of_type=DagsterEvent)
+        check.inst_param(execution_date, 'execution_date', datetime.datetime)
+        check.inst_param(task, 'task', BaseOperator)
+
+        skipped = any([e.event_type_value == DagsterEventType.STEP_SKIPPED.value for e in events])
+
+        if skipped:
+            self.skip(None, execution_date, [task])
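
SkipMixin is Airflow's own helper from airflow.models; in the 1.10-era API this code targets, skip(dag_run, execution_date, tasks) persists the 'skipped' state for the given tasks, and passing None for the dag_run with the current task in a one-element list lets an operator skip itself. A self-contained toy of the control flow, using stand-ins rather than real Airflow or Dagster objects:

# Toy stand-ins for SkipMixin and the mixin's predicate: skip() should
# fire only when a STEP_SKIPPED event is present. The real skip() writes
# the 'skipped' state to Airflow's metadata database; here we record it.
class FakeSkipMixin(object):
    def __init__(self):
        self.skipped = []

    def skip(self, dag_run, execution_date, tasks):
        self.skipped.extend(tasks)


class FakeOperator(FakeSkipMixin):
    def skip_self_if_necessary(self, events, execution_date, task):
        # Mirrors the any(...) test on event_type_value above.
        if any(e == 'STEP_SKIPPED' for e in events):
            self.skip(None, execution_date, [task])


op = FakeOperator()
op.skip_self_if_necessary(['STEP_SUCCESS'], None, 'first_consumer')
op.skip_self_if_necessary(['STEP_SKIPPED'], None, 'second_consumer')
assert op.skipped == ['second_consumer']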


class ModifiedDockerOperator(DockerOperator):
"""ModifiedDockerOperator supports host temporary directories on OSX.
@@ -116,7 +131,7 @@ def __get_tls_config(self):
        return super(ModifiedDockerOperator, self)._DockerOperator__get_tls_config()


-class DagsterDockerOperator(ModifiedDockerOperator):
+class DagsterDockerOperator(ModifiedDockerOperator, DagsterSkipMixin):
    '''Dagster operator for Apache Airflow.
    Wraps a modified DockerOperator incorporating https://github.com/apache/airflow/pull/4315.
@@ -155,11 +170,11 @@ def __init__(
            'Cannot use in-memory storage with Airflow, must use S3',
        )

-        self.docker_conn_id_set = kwargs.get('docker_conn_id') is not None
        self.environment_dict = environment_dict
        self.pipeline_name = pipeline_name
        self.mode = mode
        self.step_keys = step_keys
+        self.docker_conn_id_set = kwargs.get('docker_conn_id') is not None
        self._run_id = None

        # These shenanigans are so we can override DockerOperator.get_hook in order to configure
@@ -262,7 +277,11 @@ def execute(self, context):
            res = parse_raw_res(raw_res)

            handle_start_pipeline_execution_errors(res)
-            return handle_start_pipeline_execution_result(res)
+            events = handle_start_pipeline_execution_result(res)
+
+            self.skip_self_if_necessary(events, context['execution_date'], context['task'])
+
+            return events

        finally:
            self._run_id = None
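
Note the shape of this change: execute now holds on to the event list rather than returning it immediately, consults it via skip_self_if_necessary, and only then returns the events, so the operator's return contract is unchanged for callers that don't care about skips.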
@@ -279,7 +298,7 @@ def get_host_tmp_dir(self):
        yield self.host_tmp_dir


-class DagsterPythonOperator(PythonOperator):
+class DagsterPythonOperator(PythonOperator, DagsterSkipMixin):
    def __init__(
        self,
        task_id,
@@ -310,11 +329,15 @@ def python_callable(ts, dag_run, **kwargs): # pylint: disable=unused-argument
                + 'with variables:\n'
                + seven.json.dumps(redacted, indent=2)
            )
-            return execute_start_pipeline_execution_query(
+            events = execute_start_pipeline_execution_query(
                handle,
                construct_variables(mode, environment_dict, pipeline_name, run_id, ts, step_keys),
            )
+
+            self.skip_self_if_necessary(events, kwargs['execution_date'], kwargs['task'])
+
+            return events

        super(DagsterPythonOperator, self).__init__(
            task_id=task_id,
            provide_context=True,
@@ -58,14 +58,12 @@ def test_pipeline_results(dagster_airflow_python_operator_pipeline):
    for task in tasks:
        assert isinstance(task, PythonOperator)

-    results = []
+    results = {}
    for task in tasks:
        ti = TaskInstance(task=task, execution_date=execution_date)
        context = ti.get_template_context()
        context['dag_run'] = namedtuple('_', 'run_id')(run_id=run_id)

-        res = task.execute(context)
-        results.append(res)
+        results[ti] = task.execute(context)

    yield results

@@ -115,13 +113,11 @@ def test_pipeline_results(dagster_airflow_docker_operator_pipeline):
    for task in tasks:
        assert isinstance(task, DagsterDockerOperator)

-    results = []
+    results = {}
    for task in tasks:
        ti = TaskInstance(task=task, execution_date=execution_date)
        context = ti.get_template_context()
        context['dag_run'] = namedtuple('_', 'run_id')(run_id=run_id)

-        res = task.execute(context)
-        results.append(res)
+        results[ti] = task.execute(context)

    yield results
@@ -15,7 +15,6 @@

from dagster_airflow_tests.conftest import IMAGE
from dagster_airflow_tests.marks import nettest
-from dagster_airflow_tests.test_project.dagster_airflow_demo import demo_pipeline


AIRFLOW_DEMO_EVENTS = {
@@ -34,7 +33,7 @@

def validate_pipeline_execution(pipeline_exc_result):
    seen_events = set()
-    for result in pipeline_exc_result:
+    for result in pipeline_exc_result.values():
        for event in result:
            seen_events.add((event.event_type_value, event.step_key))

@@ -127,3 +126,55 @@ def test_rename_for_airflow():

    for before, after in pairs:
        assert after == _rename_for_airflow(before)


+class TestExecuteSkipsPythonOperator(object):
+    pipeline_name = 'optional_outputs'
+    handle = ExecutionTargetHandle.for_pipeline_module(
+        'dagster_airflow_tests.test_project.dagster_airflow_demo', pipeline_name
+    )
+    environment_yaml = [script_relative_path('test_project/env_filesystem.yaml')]
+    run_id = str(uuid.uuid4())
+    execution_date = datetime.datetime.utcnow()
+
+    # pylint: disable=redefined-outer-name
+    def test_execute_dag(self, dagster_airflow_python_operator_pipeline):
+        expected_airflow_task_states = {
+            ('foo', None),
+            ('first_consumer', None),
+            ('second_consumer', 'skipped'),
+            ('third_consumer', 'skipped'),
+        }
+
+        seen = {
+            (ti.task_id, ti.current_state())
+            for ti in dagster_airflow_python_operator_pipeline.keys()
+        }
+        assert seen == expected_airflow_task_states
+
+
+class TestExecuteSkipsContainerized(object):
+    pipeline_name = 'optional_outputs'
+    handle = ExecutionTargetHandle.for_pipeline_module(
+        'dagster_airflow_tests.test_project.dagster_airflow_demo', pipeline_name
+    )
+    environment_yaml = [script_relative_path('test_project/env_filesystem.yaml')]
+    run_id = str(uuid.uuid4())
+    execution_date = datetime.datetime.utcnow()
+    op_kwargs = {'host_tmp_dir': '/tmp'}
+    image = IMAGE
+
+    # pylint: disable=redefined-outer-name
+    def test_execute_dag_containerized(self, dagster_airflow_docker_operator_pipeline):
+        expected_airflow_task_states = {
+            ('foo', None),
+            ('first_consumer', None),
+            ('second_consumer', 'skipped'),
+            ('third_consumer', 'skipped'),
+        }
+
+        seen = {
+            (ti.task_id, ti.current_state())
+            for ti in dagster_airflow_docker_operator_pipeline.keys()
+        }
+        assert seen == expected_airflow_task_states
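
A note on the expected states: the fixtures call task.execute(context) directly instead of running the tasks through the Airflow scheduler, so a task that simply executes never has a state recorded and ti.current_state() comes back None; only the skip path, which goes through SkipMixin.skip, persists a state. That is why second_consumer and third_consumer read back as 'skipped' while the tasks that actually ran read back as None.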
@@ -1,10 +1,13 @@
# pylint: disable=no-value-for-parameter, no-member
from collections import defaultdict

from dagster import (
    Field,
+    InputDefinition,
    Int,
    ModeDefinition,
+    Output,
+    OutputDefinition,
    RepositoryDefinition,
    String,
    lambda_solid,
@@ -50,7 +53,32 @@ def demo_error_pipeline():
    error_solid()


+@solid(
+    output_defs=[
+        OutputDefinition(Int, 'out_1', is_optional=True),
+        OutputDefinition(Int, 'out_2', is_optional=True),
+        OutputDefinition(Int, 'out_3', is_optional=True),
+    ]
+)
+def foo(_):
+    yield Output(1, 'out_1')
+
+
+@solid
+def bar(_, input_arg):
+    return input_arg
+
+
+@pipeline
+def optional_outputs():
+    foo_res = foo()
+    bar.alias('first_consumer')(input_arg=foo_res.out_1)
+    bar.alias('second_consumer')(input_arg=foo_res.out_2)
+    bar.alias('third_consumer')(input_arg=foo_res.out_3)


def define_demo_execution_repo():
    return RepositoryDefinition(
-        name='demo_execution_repo', pipeline_defs=[demo_pipeline, demo_error_pipeline]
+        name='demo_execution_repo',
+        pipeline_defs=[demo_pipeline, demo_error_pipeline, optional_outputs],
    )
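
Why this pipeline exercises the skip path: foo declares three optional outputs but yields only out_1, so the steps consuming out_2 and out_3 never receive an input and Dagster emits STEP_SKIPPED for them. A toy model of the resulting task states (Dagster's real decision logic lives in its execution engine, not here):

# foo yields only out_1 of its three declared optional outputs, so the
# consumers wired to out_2 and out_3 have nothing to run on.
declared = ['out_1', 'out_2', 'out_3']
yielded = ['out_1']

consumers = {
    'first_consumer': 'out_1',
    'second_consumer': 'out_2',
    'third_consumer': 'out_3',
}

states = {
    name: ('executed' if out in yielded else 'skipped')
    for name, out in consumers.items()
}
assert states == {
    'first_consumer': 'executed',
    'second_consumer': 'skipped',
    'third_consumer': 'skipped',
}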
