In [None]:
import dagster

from dagster_contrib.dagster_examples.pandas_hello_world.pipeline import define_pipeline

In [None]:
# Build the pipeline and render its solid dependency graph.
# Only the last expression of a cell is displayed, so the value returned by
# build_graphviz_graph (presumably a graphviz object with a rich repr — TODO confirm)
# is this cell's output; a bare `sum_sq_pipeline` line before it would show nothing.
from dagster.graphviz import build_graphviz_graph

sum_sq_pipeline = define_pipeline()
build_graphviz_graph(sum_sq_pipeline)

Below we'll demonstrate different ways of executing pipelines that may be useful in an interactive context:

1) Execute a pipeline in memory up to a particular solid in the pipeline, via `execute_pipeline_through_solid` (provisional name; also referred to as `execute_solid_in_pipeline`).

2) Execute a pipeline in memory and collect the result of each step so that each one can be inspected individually (provisional name: `execute_pipeline_and_collect`). Note: the more elegantly named `execute_pipeline` returns an iterator, which is less convenient in a notebook because an iterator can only be consumed once.

In [None]:
# Execute the pipeline only through the named solid ('sum_sq').
# The "always_fails" solid is *not* executed because it is not in the
# dependency chain of "sum_sq".
from dagster.core.execution import ExecutionContext, execute_pipeline_through_solid

result = execute_pipeline_through_solid(
    ExecutionContext.create(),
    sum_sq_pipeline,
    {'num_csv': {'path': 'num.csv'}},
    'sum_sq',
)
result.result_value

In [None]:

# Execute the pipeline through 'sum_sq' and collect one result per executed step
# (the 'sum' step and the 'sum_sq' step).
# execute_pipeline may return a one-shot iterator. Materialize it with list() so the
# comprehension below does not exhaust it; the next cell iterates `results` again
# and would otherwise build an empty dict.
results = list(dagster.execute_pipeline(
    dagster.ExecutionContext.create(),
    sum_sq_pipeline,
    {'num_csv': {'path': 'num.csv'}},
    through_solids=['sum_sq'],
))

[result.name for result in results]

In [None]:
# Map each step result to the name of the solid that produced it.
result_dict = dict((step_result.name, step_result) for step_result in results)
# Intermediate value produced by the 'sum' solid.
result_dict['sum'].result_value

In [None]:
# Final value produced by the 'sum_sq' solid.
sum_sq_step = result_dict['sum_sq']
sum_sq_step.result_value

Now let's inject an error into the pipeline by executing it through the `always_fails` solid.

In [None]:

# Execute the pipeline through the 'always_fails' solid, which is expected to fail,
# and index the per-step results by solid name so each can be inspected below.
# list() guards against execute_pipeline returning a one-shot iterator.
results = list(dagster.execute_pipeline(
    dagster.ExecutionContext.create(),
    sum_sq_pipeline,
    {'num_csv': {'path': 'num.csv'}},
    through_solids=['always_fails'],
))

result_dict = {result.name: result for result in results}
result_dict

In [None]:
# Check whether the 'sum' step succeeded in this run.
sum_step = result_dict['sum']
sum_step.success

In [None]:
# Value produced by the 'sum' step in this run.
sum_step = result_dict['sum']
sum_step.result_value

In [None]:
# Check whether the 'always_fails' step succeeded.
failing_step = result_dict['always_fails']
failing_step.success

In [None]:
# The failure reason recorded on the 'always_fails' result.
failing_step = result_dict['always_fails']
failing_step.reason

In [None]:
# reraise_user_error() lets a result re-raise the user error it captured, with a
# clean callstack. The exception is caught and its traceback printed so the
# demonstration is still visible without aborting "Restart & Run All".
import traceback

try:
    result_dict['always_fails'].reraise_user_error()
except Exception:
    traceback.print_exc()

Now let's write the pipeline's output to a file.

In [None]:

# Write the 'sum_sq' output to a CSV file in the system temp directory.
# tempfile.gettempdir() is used instead of a hard-coded '/tmp' so the cell also
# works on platforms without /tmp.
import os
import tempfile

output_path = os.path.join(tempfile.gettempdir(), 'notebook_sum_sq.csv')

results = list(dagster.output_pipeline(
    dagster.ExecutionContext.create(),
    sum_sq_pipeline,
    input_arg_dicts={'num_csv': {'path': 'num.csv'}},
    output_arg_dicts={'sum_sq': {'CSV': {'path': output_path}}},
))

[result.name for result in results]

In [None]:
import pandas as pd

# Read back the CSV written to `output_path` to inspect its contents.
written_frame = pd.read_csv(output_path)
written_frame