
Simplify Airflow operators

Summary: This diff simplifies the two base Airflow operators we use, in advance of implementing correct Airflow task skip behavior when a step skip event is returned by Dagster step execution.

Test Plan: unit

Reviewers: #ft, max

Reviewed By: #ft, max

Subscribers: max

Differential Revision: https://dagster.phacility.com/D685
natekupp committed Jul 24, 2019
1 parent 1fc0976 commit fa38767633b3f42c48e9002bb24866097750cb90
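
This change removes the DagsterOperator.operator_for_solid classmethod indirection: the factory now accepts any airflow BaseOperator subclass and constructs the two Dagster operators directly. A minimal usage sketch, assuming the public make_airflow_dag wrapper around _make_airflow_dag from dagster-airflow of this era, with hypothetical module and pipeline names:

    from dagster_airflow.factory import make_airflow_dag  # assumed public entry point

    # 'my_pipelines' / 'my_pipeline' are hypothetical; environment_dict must
    # configure filesystem or s3 storage (see the checks added below).
    dag, tasks = make_airflow_dag(
        module_name='my_pipelines',
        pipeline_name='my_pipeline',
        environment_dict={'storage': {'filesystem': {'base_dir': '/tmp/dagster'}}},
    )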
@@ -2,11 +2,12 @@
import re

from airflow import DAG
+from airflow.operators import BaseOperator

from dagster import check, ExecutionTargetHandle, RunConfig
from dagster.core.execution.api import create_execution_plan

-from .operators import DagsterDockerOperator, DagsterOperator, DagsterPythonOperator
+from .operators import DagsterDockerOperator, DagsterPythonOperator
from .compile import coalesce_execution_steps


@@ -64,7 +65,8 @@ def _make_airflow_dag(
dag_description = check.opt_str_param(
dag_description, 'dag_description', _make_dag_description(pipeline_name)
)
-check.subclass_param(operator, 'operator', DagsterOperator)
+check.subclass_param(operator, 'operator', BaseOperator)

# black 18.9b0 doesn't support py27-compatible formatting of the below invocation (omitting
# the trailing comma after **check.opt_dict_param...) -- black 19.3b0 supports multiple python
# versions, but currently doesn't know what to do with from __future__ import print_function --
@@ -97,17 +99,33 @@ def _make_airflow_dag(

step_keys = [step.key for step in solid_steps]

-task = operator.operator_for_solid(
-handle=handle,
-pipeline_name=pipeline_name,
-environment_dict=environment_dict,
-mode=mode,
-solid_handle=solid_handle,
-step_keys=step_keys,
-dag=dag,
-dag_id=dag_id,
-op_kwargs=op_kwargs,
-)
+# We separately construct the Airflow operators here with the appropriate args, because if
+# Airflow gets extraneous args/kwargs it emits a warning every time it parses the DAG (and
+# future Airflow versions will mark this a failure).
+# see https://github.com/ambv/black/issues/768
+# fmt: off
+if operator == DagsterPythonOperator:
+task = operator(
+handle=handle,
+pipeline_name=pipeline_name,
+environment_dict=environment_dict,
+mode=mode,
+task_id=solid_handle,
+step_keys=step_keys,
+dag=dag,
+**op_kwargs
+)
+else:
+task = operator(
+pipeline_name=pipeline_name,
+environment_dict=environment_dict,
+mode=mode,
+task_id=solid_handle,
+step_keys=step_keys,
+dag=dag,
+**op_kwargs
+)
+# fmt: on

tasks[solid_handle] = task
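
For context on the comment above: Airflow 1.10's BaseOperator emits an 'Invalid arguments were passed' PendingDeprecationWarning whenever an operator receives kwargs it does not recognize, so the factory builds each operator with exactly the arguments it accepts instead of one shared call. A small illustration (DummyOperator standing in for any operator; the stray kwarg is hypothetical):

    from airflow.operators.dummy_operator import DummyOperator

    # Warns on every DAG parse in Airflow 1.10; later Airflow versions
    # intend to treat unexpected arguments as an error.
    task = DummyOperator(task_id='example', handle=None)  # 'handle' is not a DummyOperator arg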

@@ -4,21 +4,19 @@
import logging
import os

-from abc import ABCMeta, abstractmethod
from contextlib import contextmanager

-from six import with_metaclass

from airflow.exceptions import AirflowException
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.file import TemporaryDirectory
from docker import APIClient, from_env

from dagster import check, seven
-from dagster.seven.json import JSONDecodeError
+from dagster_graphql.client.mutations import execute_start_pipeline_execution_query
+from dagster_graphql.client.query import START_PIPELINE_EXECUTION_QUERY

-from .util import convert_airflow_datestr_to_epoch_ts
+from .util import airflow_storage_exception, construct_variables, parse_raw_res


DOCKER_TEMPDIR = '/tmp'
@@ -31,94 +29,6 @@
LINE_LENGTH = 100


-def parse_raw_res(raw_res):
-res = None
-# FIXME
-# Unfortunately, log lines don't necessarily come back in order...
-# This is error-prone, if something else logs JSON
-lines = list(filter(None, reversed(raw_res.split('\n'))))
-
-for line in lines:
-try:
-res = json.loads(line)
-break
-# If we don't get a GraphQL response, check the next line
-except JSONDecodeError:
-continue
-
-return res
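
For reference, this relocated helper scans the container log from the last line backwards and returns the first line that parses as JSON, or None if no line does. A minimal illustration:

    raw_res = 'INFO some log line\n{"data": {"startPipelineExecution": {}}}\ntrailing noise'
    assert parse_raw_res(raw_res) == {'data': {'startPipelineExecution': {}}}
    assert parse_raw_res('no json here') is None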


-def airflow_storage_exception(tmp_dir):
-return AirflowException(
-'No storage config found -- must configure either filesystem or s3 storage for '
-'the DagsterPythonOperator. Ex.: \n'
-'storage:\n'
-' filesystem:\n'
-' base_dir: \'{tmp_dir}\''
-'\n\n --or--\n\n'
-'storage:\n'
-' s3:\n'
-' s3_bucket: \'my-s3-bucket\'\n'.format(tmp_dir=tmp_dir)
-)
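
Both operators now raise this exception at construction time when environment_dict lacks a storage key. A config that satisfies the check, with hypothetical values:

    environment_dict = {
        'storage': {
            'filesystem': {'base_dir': '/tmp/dagster'}
            # --or-- 's3': {'s3_bucket': 'my-s3-bucket'}
        }
    }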


-def construct_variables(mode, environment_dict, pipeline_name, run_id, ts, step_keys):
-check.str_param(mode, 'mode')
-# env dict could be either string 'REDACTED' or dict
-check.str_param(pipeline_name, 'pipeline_name')
-check.str_param(run_id, 'run_id')
-check.opt_str_param(ts, 'ts')
-check.list_param(step_keys, 'step_keys', of_type=str)
-
-variables = {
-'executionParams': {
-'environmentConfigData': environment_dict,
-'mode': mode,
-'selector': {'name': pipeline_name},
-'executionMetadata': {'runId': run_id},
-'stepKeys': step_keys,
-}
-}
-
-# If an Airflow timestamp string is provided, stash it (and the converted version) in tags
-if ts is not None:
-variables['executionParams']['executionMetadata']['tags'] = [
-{'key': 'airflow_ts', 'value': ts},
-{
-'key': 'execution_epoch_time',
-'value': '%f' % convert_airflow_datestr_to_epoch_ts(ts),
-},
-]
-
-return variables
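
For reference, given mode='default', pipeline_name='my_pipeline', run_id='abc123', the Airflow timestamp ts='2019-07-24T00:00:00+00:00', and a hypothetical step key, this helper returns a GraphQL variables dict shaped like (epoch value illustrative):

    {
        'executionParams': {
            'environmentConfigData': {...},  # the environment_dict passed in
            'mode': 'default',
            'selector': {'name': 'my_pipeline'},
            'executionMetadata': {
                'runId': 'abc123',
                'tags': [
                    {'key': 'airflow_ts', 'value': '2019-07-24T00:00:00+00:00'},
                    {'key': 'execution_epoch_time', 'value': '1563926400.000000'},
                ],
            },
            'stepKeys': ['my_solid.compute'],
        }
    }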


-class DagsterOperator(with_metaclass(ABCMeta)):  # pylint:disable=no-init
-'''Abstract base class for Dagster operators.
-Implement operator_for_solid to support dynamic generation of Airflow operators corresponding to
-the execution plan steps generated by a Dagster solid.
-'''
-
-@classmethod
-@abstractmethod
-def operator_for_solid(
-cls,
-handle,
-pipeline_name,
-environment_dict,
-mode,
-solid_handle,
-step_keys,
-dag,
-dag_id,
-op_kwargs,
-):
-pass


# pylint: disable=len-as-condition
class ModifiedDockerOperator(DockerOperator):
"""ModifiedDockerOperator supports host temporary directories on OSX.
@@ -206,7 +116,7 @@ def __get_tls_config(self):
return super(ModifiedDockerOperator, self)._DockerOperator__get_tls_config()


-class DagsterDockerOperator(ModifiedDockerOperator, DagsterOperator):
+class DagsterDockerOperator(ModifiedDockerOperator):
'''Dagster operator for Apache Airflow.
Wraps a modified DockerOperator incorporating https://github.com/apache/airflow/pull/4315.
@@ -221,26 +131,35 @@ class DagsterDockerOperator(ModifiedDockerOperator, DagsterOperator):
# pylint: disable=keyword-arg-before-vararg
def __init__(
self,
-step=None,
+task_id,
environment_dict=None,
pipeline_name=None,
mode=None,
step_keys=None,
+s3_bucket_name=None,
+dag=None,
*args,
**kwargs
):
check.str_param(pipeline_name, 'pipeline_name')
step_keys = check.opt_list_param(step_keys, 'step_keys', of_type=str)
environment_dict = check.opt_dict_param(environment_dict, 'environment_dict', key_type=str)

-self.step = step
+tmp_dir = kwargs.pop('tmp_dir', DOCKER_TEMPDIR)
+host_tmp_dir = kwargs.pop('host_tmp_dir', seven.get_system_temp_directory())

+if 'storage' not in environment_dict:
+raise airflow_storage_exception(tmp_dir)

+check.invariant(
+'in_memory' not in environment_dict.get('storage', {}),
+'Cannot use in-memory storage with Airflow, must use S3',
+)

self.environment_dict = environment_dict
self.pipeline_name = pipeline_name
self.mode = mode
self.step_keys = step_keys
self.docker_conn_id_set = kwargs.get('docker_conn_id') is not None
+self.s3_bucket_name = s3_bucket_name
self._run_id = None

# These shenanigans are so we can override DockerOperator.get_hook in order to configure
@@ -264,45 +183,9 @@ def __init__(
if 'environment' not in kwargs:
kwargs['environment'] = DEFAULT_ENVIRONMENT

-super(DagsterDockerOperator, self).__init__(*args, **kwargs)

-@classmethod
-def operator_for_solid(
-cls,
-handle,
-pipeline_name,
-environment_dict,
-mode,
-solid_handle,
-step_keys,
-dag,
-dag_id,
-op_kwargs,
-):
-tmp_dir = op_kwargs.pop('tmp_dir', DOCKER_TEMPDIR)
-host_tmp_dir = op_kwargs.pop('host_tmp_dir', seven.get_system_temp_directory())
-
-if 'storage' not in environment_dict:
-raise airflow_storage_exception(tmp_dir)
-
-# black 18.9b0 doesn't support py27-compatible formatting of the below invocation (omitting
-# the trailing comma after **op_kwargs) -- black 19.3b0 supports multiple python versions,
-# but currently doesn't know what to do with from __future__ import print_function -- see
-# https://github.com/ambv/black/issues/768
-# fmt: off
-return DagsterDockerOperator(
-step=solid_handle,
-environment_dict=environment_dict,
-dag=dag,
-tmp_dir=tmp_dir,
-pipeline_name=pipeline_name,
-mode=mode,
-step_keys=step_keys,
-task_id=solid_handle,
-host_tmp_dir=host_tmp_dir,
-**op_kwargs
+super(DagsterDockerOperator, self).__init__(
+task_id=task_id, dag=dag, tmp_dir=tmp_dir, host_tmp_dir=host_tmp_dir, *args, **kwargs
+)
-# fmt: on
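
A note on the pylint disable at the top of this constructor: pylint flags defaulted parameters placed before *args because a caller can overwrite them positionally without noticing, but this shape is a common py27-compatible way to accept both explicit arguments and Airflow's pass-through args. An illustration of the hazard (g is a hypothetical stand-in with the same shape):

    def g(a, b=None, *args, **kwargs):
        return a, b, args, kwargs

    g(1, 2, 3)  # -> (1, 2, (3,), {}): the positional 2 silently lands in b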

@property
def run_id(self):
@@ -313,15 +196,6 @@ def run_id(self):

@property
def query(self):
-try:
-from dagster_graphql.client.query import START_PIPELINE_EXECUTION_QUERY
-
-except ImportError:
-raise AirflowException(
-'To use the DagsterDockerOperator, dagster and dagster_graphql must be installed '
-'in your Airflow environment.'
-)
-
# TODO: https://github.com/dagster-io/dagster/issues/1342
redacted = construct_variables(
self.mode, 'REDACTED', self.pipeline_name, self.run_id, self.airflow_ts, self.step_keys
@@ -386,6 +260,7 @@ def execute(self, context):
self.log.info('Finished executing container.')

res = parse_raw_res(raw_res)

handle_start_pipeline_execution_errors(res)
return handle_start_pipeline_execution_result(res)

@@ -404,18 +279,26 @@ def get_host_tmp_dir(self):
yield self.host_tmp_dir


-class DagsterPythonOperator(PythonOperator, DagsterOperator):
-@classmethod
-def make_python_callable(cls, handle, pipeline_name, mode, environment_dict, step_keys):
-try:
-from dagster_graphql.client.mutations import execute_start_pipeline_execution_query
-from dagster_graphql.client.query import START_PIPELINE_EXECUTION_QUERY
+class DagsterPythonOperator(PythonOperator):
+def __init__(
+self,
+task_id,
+handle,
+pipeline_name,
+environment_dict,
+mode,
+step_keys,
+dag,
+*args,
+**kwargs
+):
+if 'storage' not in environment_dict:
+raise airflow_storage_exception('/tmp/special_place')

-except ImportError:
-raise AirflowException(
-'To use the DagsterPythonOperator, dagster and dagster_graphql must be installed '
-'in your Airflow environment.'
-)
+check.invariant(
+'in_memory' not in environment_dict.get('storage', {}),
+'Cannot use in-memory storage with Airflow, must use filesystem or S3',
+)

def python_callable(ts, dag_run, **kwargs): # pylint: disable=unused-argument
run_id = dag_run.run_id
@@ -432,40 +315,11 @@ def python_callable(ts, dag_run, **kwargs): # pylint: disable=unused-argument
construct_variables(mode, environment_dict, pipeline_name, run_id, ts, step_keys),
)

-return python_callable

-@classmethod
-def operator_for_solid(
-cls,
-handle,
-pipeline_name,
-environment_dict,
-mode,
-solid_handle,
-step_keys,
-dag,
-dag_id,
-op_kwargs,
-):
-if 'storage' not in environment_dict:
-raise airflow_storage_exception('/tmp/special_place')
-
-# black 18.9b0 doesn't support py27-compatible formatting of the below invocation (omitting
-# the trailing comma after **op_kwargs) -- black 19.3b0 supports multiple python versions,
-# but currently doesn't know what to do with from __future__ import print_function -- see
-# https://github.com/ambv/black/issues/768
-# fmt: off
-return PythonOperator(
-task_id=solid_handle,
+super(DagsterPythonOperator, self).__init__(
+task_id=task_id,
provide_context=True,
-python_callable=cls.make_python_callable(
-handle,
-pipeline_name,
-mode,
-environment_dict,
-step_keys
-),
+python_callable=python_callable,
dag=dag,
-**op_kwargs
+*args,
+**kwargs
)
-# fmt: on
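
A hedged usage sketch of the simplified operator; the module, pipeline, and solid names are hypothetical, and ExecutionTargetHandle.for_pipeline_module is assumed from the dagster API of this era:

    from airflow import DAG
    from dagster import ExecutionTargetHandle
    from dagster_airflow.operators import DagsterPythonOperator

    dag = DAG(dag_id='my_pipeline')
    task = DagsterPythonOperator(
        task_id='my_solid',
        handle=ExecutionTargetHandle.for_pipeline_module('my_pipelines', 'my_pipeline'),
        pipeline_name='my_pipeline',
        environment_dict={'storage': {'filesystem': {'base_dir': '/tmp/dagster'}}},
        mode='default',
        step_keys=['my_solid.compute'],
        dag=dag,
    )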
