Skip to content

Commit

Permalink
Update events pipeline with new API and preset definition
Browse files Browse the repository at this point in the history
Test Plan: unit

Reviewers: schrockn

Reviewed By: schrockn

Differential Revision: https://dagster.phacility.com/D438
  • Loading branch information
Nate Kupp committed Jun 20, 2019
1 parent d9944b0 commit a3fbff1
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 33 deletions.
4 changes: 2 additions & 2 deletions examples/dagster_examples/__init__.py
Expand Up @@ -12,7 +12,7 @@ def define_demo_repo():
define_airline_demo_ingest_pipeline,
define_airline_demo_warehouse_pipeline,
)
from dagster_examples.event_pipeline_demo.pipelines import define_event_ingest_pipeline
from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline
from dagster_examples.pyspark_pagerank.pyspark_pagerank_pipeline import define_pipeline
from dagster_pandas.examples import define_papermill_pandas_hello_world_pipeline

Expand All @@ -27,7 +27,7 @@ def define_demo_repo():
'composition': define_composition_pipeline,
'airline_demo_ingest_pipeline': define_airline_demo_ingest_pipeline,
'airline_demo_warehouse_pipeline': define_airline_demo_warehouse_pipeline,
'event_ingest_pipeline': define_event_ingest_pipeline,
'event_ingest_pipeline': event_ingest_pipeline,
'pyspark_pagerank': define_pipeline,
'papermill_pandas_hello_world_pipeline': define_papermill_pandas_hello_world_pipeline,
},
Expand Down
2 changes: 1 addition & 1 deletion examples/dagster_examples/event_pipeline_demo/__init__.py
@@ -1 +1 @@
from .pipelines import define_event_ingest_pipeline
from .pipelines import event_ingest_pipeline
43 changes: 20 additions & 23 deletions examples/dagster_examples/event_pipeline_demo/pipelines.py
Expand Up @@ -5,20 +5,20 @@
import shutil

from dagster import (
file_relative_path,
lambda_solid,
pipeline,
solid,
Bool,
DependencyDefinition,
Dict,
Field,
InputDefinition,
List,
ModeDefinition,
OutputDefinition,
PresetDefinition,
Path,
PipelineDefinition,
SolidInvocation,
String,
lambda_solid,
solid,
)
from dagster.core.types.runtime import Stringish
from dagster.utils import safe_isfile, mkdir_p
Expand Down Expand Up @@ -119,7 +119,19 @@ def gunzipper(gzip_file):
return [path_prefix]


def define_event_ingest_pipeline():
@pipeline(
mode_definitions=[
ModeDefinition(name='local', resources={'s3': s3_resource, 'snowflake': snowflake_resource})
],
preset_definitions=[
PresetDefinition(
name='local',
mode='local',
environment_files=[file_relative_path(__file__, 'environments/default.yaml')],
)
],
)
def event_ingest_pipeline():
event_ingest = SparkSolidDefinition(
name='event_ingest',
main_class='io.dagster.events.EventPipeline',
Expand All @@ -133,20 +145,5 @@ def define_event_ingest_pipeline():
src='file:///tmp/dagster/events/data/output/2019/01/01/*.parquet',
table='events',
)

return PipelineDefinition(
name='event_ingest_pipeline',
solid_defs=[download_from_s3_to_file, gunzipper, event_ingest, snowflake_load],
dependencies={
SolidInvocation('gunzipper'): {
'gzip_file': DependencyDefinition('download_from_s3_to_file')
},
SolidInvocation('event_ingest'): {'spark_inputs': DependencyDefinition('gunzipper')},
SolidInvocation('snowflake_load'): {
'start': DependencyDefinition('event_ingest', 'paths')
},
},
mode_definitions=[
ModeDefinition(resources={'s3': s3_resource, 'snowflake': snowflake_resource})
],
)
# pylint: disable=no-value-for-parameter
snowflake_load(start=event_ingest(spark_inputs=gunzipper(gzip_file=download_from_s3_to_file())))
4 changes: 2 additions & 2 deletions examples/dagster_examples/event_pipeline_demo/repository.py
@@ -1,10 +1,10 @@
from dagster import RepositoryDefinition

from .pipelines import define_event_ingest_pipeline
from .pipelines import event_ingest_pipeline


def define_repo():
return RepositoryDefinition(
name='event_pipeline_demo_repo',
pipeline_dict={'event_ingest_pipeline': define_event_ingest_pipeline},
pipeline_dict={'event_ingest_pipeline': event_ingest_pipeline},
)
@@ -1,3 +1,3 @@
repository:
module: event_pipeline_demo.repository
module: dagster_examples.event_pipeline_demo.repository
fn: define_repo
Expand Up @@ -6,16 +6,19 @@
# pylint: disable=unused-import
from dagster_airflow.test_fixtures import dagster_airflow_python_operator_pipeline

from dagster_examples.event_pipeline_demo.pipelines import define_event_ingest_pipeline
from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline


@pytest.mark.skip
class TestAirflowizedEventPipeline(object):
config_yaml = [
script_relative_path('../../dagster_examples/airline_demo/environments/default.yaml')
]
handle = ExecutionTargetHandle.for_pipeline_fn(define_event_ingest_pipeline)

pipeline_name = 'event_ingest_pipeline'
handle = ExecutionTargetHandle.for_pipeline_module(
'dagster_examples.event_pipeline_demo', pipeline_name
)

# pylint: disable=redefined-outer-name
def test_airflowized_event_pipeline(self, dagster_airflow_python_operator_pipeline):
Expand Down
Expand Up @@ -7,7 +7,7 @@
from dagster import execute_pipeline
from dagster.seven import mock
from dagster.utils import load_yaml_from_globs, script_relative_path
from dagster_examples.event_pipeline_demo.pipelines import define_event_ingest_pipeline
from dagster_examples.event_pipeline_demo.pipelines import event_ingest_pipeline


def create_mock_connector(*_args, **_kwargs):
Expand Down Expand Up @@ -56,7 +56,7 @@ def test_event_pipeline(snowflake_connect):
'../../dagster_examples/event_pipeline_demo/environments/default.yaml'
)
)
result_pipeline = execute_pipeline(define_event_ingest_pipeline(), config)
result_pipeline = execute_pipeline(event_ingest_pipeline, config)
assert result_pipeline.success

# We're not testing Snowflake loads here, so at least test that we called the connect
Expand Down

0 comments on commit a3fbff1

Please sign in to comment.