
Initial k8s operator and split out of shared functionality

Summary:
1. Code reorganization: split our Python and Docker operators out into two separate files (new import paths sketched below)
2. Vendor Airflow k8s operator
3. Add James' `DagsterKubernetesPodOperator`

As a follow-up, we will need to figure out how to add tests for (3).
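
For (1) and (3), the operators now live in separate modules instead of a single `operators` module. A minimal sketch of the resulting import paths, taken from the relative imports in the diff below (the `dagster_airflow` package prefix is an assumption here, by analogy with `dagster_airflow.factory`):

```
# Sketch of the post-split import paths; module names come from the diff,
# the dagster_airflow package prefix is assumed.
from dagster_airflow.operators.python_operator import DagsterPythonOperator
from dagster_airflow.operators.docker_operator import DagsterDockerOperator
from dagster_airflow.operators.kubernetes_operator import DagsterKubernetesPodOperator
```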

Test Plan:
Tested manually with local Airflow + minikube, using the following DAG scaffold:

```
'''
The airflow DAG scaffold for dagster_examples.toys.optional_outputs.optional_outputs

Note that this docstring must contain the strings "airflow" and "DAG" for
Airflow to properly detect it as a DAG
See: http://bit.ly/307VMum
'''
import datetime
import yaml

from dagster_airflow.factory import make_airflow_dag_kubernetized

ENVIRONMENT = '''
loggers:
  console:
    config:
      log_level: DEBUG

solids:
  multiply_the_word:
    inputs:
      word:
        value: bar
    config:
      factor: 2

storage:
  s3:
    config:
      s3_bucket: dagster-airflow-scratch

'''

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2019, 5, 7),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

dag, tasks = make_airflow_dag_kubernetized(
    module_name='dagster_airflow_tests.test_project.dagster_airflow_demo',
    pipeline_name='demo_pipeline',
    environment_dict=yaml.load(ENVIRONMENT),
    dag_kwargs={'default_args': DEFAULT_ARGS, 'max_active_runs': 1},
    image='dagster-airflow-demo',
    namespace='default',
)
```
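
Note the `storage:` section in `ENVIRONMENT` above: per the operator changes below, a run with no `filesystem` or `s3` storage configured raises an `AirflowException`, and `in_memory` storage is rejected outright. A filesystem-based alternative to the S3 block, sketched by analogy with the YAML above (the exact nesting and the shared mount path are assumptions):

```
storage:
  filesystem:
    config:
      base_dir: /some/shared/volume/mount/special_place
```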

Reviewers: #ft, schrockn

Reviewed By: #ft, schrockn

Subscribers: max, schrockn

Differential Revision: https://dagster.phacility.com/D801
Eronarn authored and natekupp committed Jul 27, 2019
1 parent a8adcc7 commit 0c6d7f755c79dab88449537b65588942d73832a4
@@ -1 +0,0 @@
from .operators import DagsterDockerOperator, DagsterPythonOperator
@@ -8,7 +8,9 @@
from dagster.core.execution.api import create_execution_plan

from .compile import coalesce_execution_steps
from .operators import DagsterDockerOperator, DagsterPythonOperator
from .operators.python_operator import DagsterPythonOperator
from .operators.docker_operator import DagsterDockerOperator


DEFAULT_ARGS = {
    'depends_on_past': False,
@@ -240,3 +242,39 @@ def make_airflow_dag_containerized_for_handle(
        op_kwargs=op_kwargs,
        operator=DagsterDockerOperator,
    )


def make_airflow_dag_kubernetized(
    module_name,
    pipeline_name,
    image,
    namespace,
    environment_dict=None,
    mode=None,
    dag_id=None,
    dag_description=None,
    dag_kwargs=None,
    op_kwargs=None,
):
    from .operators.kubernetes_operator import DagsterKubernetesPodOperator

    check.str_param(module_name, 'module_name')

    handle = ExecutionTargetHandle.for_pipeline_module(module_name, pipeline_name)

    # See: https://github.com/dagster-io/dagster/issues/1663
    op_kwargs = check.opt_dict_param(op_kwargs, 'op_kwargs', key_type=str)
    op_kwargs['image'] = image
    op_kwargs['namespace'] = namespace

    return _make_airflow_dag(
        handle=handle,
        pipeline_name=pipeline_name,
        environment_dict=environment_dict,
        mode=mode,
        dag_id=dag_id,
        dag_description=dag_description,
        dag_kwargs=dag_kwargs,
        op_kwargs=op_kwargs,
        operator=DagsterKubernetesPodOperator,
    )
@@ -1,44 +1,21 @@
'''The dagster-airflow operators.'''
import ast
import json
import logging
import os

from contextlib import contextmanager

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowException
from airflow.utils.file import TemporaryDirectory
from docker import APIClient, from_env

from dagster import DagsterEventType, check, seven
from dagster.core.events import DagsterEvent
from docker import APIClient, from_env

from dagster_graphql.client.mutations import execute_start_pipeline_execution_query
from dagster import check, seven
from dagster_airflow.vendor.docker_operator import DockerOperator
from dagster_graphql.client.query import START_PIPELINE_EXECUTION_QUERY

from .util import airflow_storage_exception, construct_variables, parse_raw_res

DOCKER_TEMPDIR = '/tmp'

DEFAULT_ENVIRONMENT = {
    'AWS_ACCESS_KEY_ID': os.getenv('AWS_ACCESS_KEY_ID'),
    'AWS_SECRET_ACCESS_KEY': os.getenv('AWS_SECRET_ACCESS_KEY'),
}

LINE_LENGTH = 100


def skip_self_if_necessary(events):
    '''Using AirflowSkipException is a canonical way for tasks to skip themselves; see example
    here: http://bit.ly/2YtigEm
    '''
    check.list_param(events, 'events', of_type=DagsterEvent)
from .util import construct_variables, get_aws_environment, parse_raw_res, skip_self_if_necessary

    skipped = any([e.event_type_value == DagsterEventType.STEP_SKIPPED.value for e in events])

    if skipped:
        raise AirflowSkipException('Dagster emitted skip event, skipping execution in Airflow')
DOCKER_TEMPDIR = '/tmp'


class ModifiedDockerOperator(DockerOperator):
@@ -160,7 +137,17 @@ def __init__(
        host_tmp_dir = kwargs.pop('host_tmp_dir', seven.get_system_temp_directory())

        if 'storage' not in environment_dict:
            raise airflow_storage_exception(tmp_dir)
            raise AirflowException(
                'No storage config found -- must configure either filesystem or s3 storage for '
                'the DagsterDockerOperator. Ex.: \n'
                'storage:\n'
                '  filesystem:\n'
                '    base_dir: \'/some/shared/volume/mount/special_place\''
                '\n\n --or--\n\n'
                'storage:\n'
                '  s3:\n'
                '    s3_bucket: \'my-s3-bucket\'\n'
            )

        check.invariant(
            'in_memory' not in environment_dict.get('storage', {}),
@@ -193,7 +180,7 @@ def __init__(
        self.airflow_ts = kwargs.get('ts')

        if 'environment' not in kwargs:
            kwargs['environment'] = DEFAULT_ENVIRONMENT
            kwargs['environment'] = get_aws_environment()

        super(DagsterDockerOperator, self).__init__(
            task_id=task_id, dag=dag, tmp_dir=tmp_dir, host_tmp_dir=host_tmp_dir, *args, **kwargs
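
For context: the `DEFAULT_ENVIRONMENT` dict deleted above was the previous source of these values. The new `get_aws_environment()` helper lives in `.util` and its body is not shown in this diff, so the following is only a sketch of what it presumably does, mirroring the removed dict:

```
import os

def get_aws_environment():
    # Assumed implementation: forward the host's AWS credentials into the
    # container environment, as the removed DEFAULT_ENVIRONMENT dict did.
    return {
        'AWS_ACCESS_KEY_ID': os.getenv('AWS_ACCESS_KEY_ID'),
        'AWS_SECRET_ACCESS_KEY': os.getenv('AWS_SECRET_ACCESS_KEY'),
    }
```
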
@@ -259,7 +246,7 @@ def execute(self, context):

        except ImportError:
            raise AirflowException(
                'To use the DagsterPythonOperator, dagster and dagster_graphql must be installed '
                'To use the DagsterDockerOperator, dagster and dagster_graphql must be installed '
                'in your Airflow environment.'
            )
        if 'run_id' in self.params:
@@ -293,53 +280,3 @@ def __get_tls_config(self):
    @contextmanager
    def get_host_tmp_dir(self):
        yield self.host_tmp_dir


class DagsterPythonOperator(PythonOperator):
    def __init__(
        self,
        task_id,
        handle,
        pipeline_name,
        environment_dict,
        mode,
        step_keys,
        dag,
        *args,
        **kwargs
    ):
        if 'storage' not in environment_dict:
            raise airflow_storage_exception('/tmp/special_place')

        check.invariant(
            'in_memory' not in environment_dict.get('storage', {}),
            'Cannot use in-memory storage with Airflow, must use filesystem or S3',
        )

        def python_callable(ts, dag_run, **kwargs):  # pylint: disable=unused-argument
            run_id = dag_run.run_id

            # TODO: https://github.com/dagster-io/dagster/issues/1342
            redacted = construct_variables(mode, 'REDACTED', pipeline_name, run_id, ts, step_keys)
            logging.info(
                'Executing GraphQL query: {query}\n'.format(query=START_PIPELINE_EXECUTION_QUERY)
                + 'with variables:\n'
                + seven.json.dumps(redacted, indent=2)
            )
            events = execute_start_pipeline_execution_query(
                handle,
                construct_variables(mode, environment_dict, pipeline_name, run_id, ts, step_keys),
            )

            skip_self_if_necessary(events)

            return events

        super(DagsterPythonOperator, self).__init__(
            task_id=task_id,
            provide_context=True,
            python_callable=python_callable,
            dag=dag,
            *args,
            **kwargs
        )
