Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solid input requirements #22

Closed
freiksenet opened this issue Jun 5, 2018 · 7 comments
Closed

Solid input requirements #22

freiksenet opened this issue Jun 5, 2018 · 7 comments
Assignees

Comments

@freiksenet
Copy link
Contributor

To test --from parameter, I've added an intermediate input to one of the solids in pandas_hello_world as follows:

    def sum_transform_fn(num_csv):
        sum_df = num_csv.copy()
        sum_df['sum'] = sum_df['num1'] + sum_df['num2']
        return sum_df

    sum_solid = dagster_pd.dataframe_solid(
        name='sum', inputs=[dagster_pd.csv_input('num_csv')], transform_fn=sum_transform_fn
    )

    def sum_sq_transform_fn(sum):
        sum_sq = sum.copy()
        sum_sq['sum_sq'] = sum['sum']**2
        return sum_sq

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq',
        inputs=[
            dagster_pd.csv_input('sum_csv'),
            dagster_pd.depends_on(sum_solid),
        ],
        transform_fn=sum_sq_transform_fn
    )

When I try to run it, it complains about one of the inputs not being defined. I'm not sure that's an expected behavior, because I though only one input is needed per solid. I'm not sure how to run intermediate solids if one can't specify the results of intermediate computations as alternative input.

@schrockn
Copy link
Member

schrockn commented Jun 5, 2018

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq',
        inputs=[
            dagster_pd.depends_on(sum_solid),
        ],
        transform_fn=sum_sq_transform_fn
    )

@schrockn
Copy link
Member

schrockn commented Jun 5, 2018

sum:
   source:CSVORPARQUET
   args:
      format: CSV
      path: whatever

@schrockn schrockn assigned freiksenet and unassigned schrockn Jun 5, 2018
@freiksenet
Copy link
Contributor Author

freiksenet commented Jun 6, 2018

Here is summary of my proposal:

  • Get rid of typed solid inputs. Inputs are either other solids or named inputs
  • Named inputs have an expectation of OUTPUT format, for example dataframe or raw, but they don't have expectation of how exactly to get there
def define_pipeline():
    def sum_transform_fn(num_csv):
        sum_df = num_csv.copy()
        sum_df['sum'] = sum_df['num1'] + sum_df['num2']
        return sum_df

    sum_solid = dagster_pd.dataframe_solid(
        inputs=[dagster_pd.dataframe_input('num')
        name='sum')

    def sum_sq_transform_fn(sum):
        sum_sq = sum.copy()
        sum_sq['sum_sq'] = sum['sum']**2
        return sum_sq

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq', input=[dagster_pd.depends_on(sum_solid)], transform_fn=sum_sq_transform_fn
    )

    def always_fails_transform_fn(*_args, **_kwargs):
        raise Exception('I am a programmer and I make error')

    always_fails_solid = dagster_pd.dataframe_solid(
        name='always_fails',
        input=[dagster_pd.depends_on(sum_solid)],
        transform_fn=always_fails_transform_fn
    )

    return dagster.core.pipeline(name='pandas_hello_world', solids=[sum_solid, sum_sq_solid])

Env

environment:
  inputs:
    # either name dependencie name or name of output solid
    - input_name: num
     # Source defines how the input is going to be read into
      source: CSV
      args:
        path: "pandas_hello_world/num.csv"
    - input_name: sum
      # No CSVORPARQUET source cause the actual sournce type is only defined in env
      source: CSV
      args:
        path: "sum.csv"
        format: 'CSV'

This allows changing the source type

environment:
  inputs:
    # either name dependencie name or name of output solid
    - input_name: num
     # Source defines how the input is going to be read into
      source: SQL
      args:
        sql_query: "SELECT * FROM NUM"
    - input_name: sum
      # No CSVORPARQUET source cause the actual sournce type is only defined in env
      source: CSV
      args:
        path: "sum.csv"
        format: 'CSV'

@freiksenet
Copy link
Contributor Author

This keeps the ability to have some expectations of what the source should return, while not binding pipelines to concrete inputs.

@freiksenet
Copy link
Contributor Author

def process_raw():
   # Some code to get raw binary data from file and return dataframe
   pass

# We can still typecheck inside the solid that output is valid
transforming_solid = dagster_pd.dataframe_solid(
        inputs=[
          dagster.raw_input('num')
        ],
        name='preprocessing'
        transform_fn=process_raw
)

@freiksenet
Copy link
Contributor Author

Ok, I'll just do it in even shorter way.

Currently you need to specify input=[dagster_pd.csv_input('num_csv')]. This binds pipeline and solid to concrete input_source.

In addition, when you specify input=[dagster_pd.depends_on(solid)], it gets weird CSVORPARQUET input source type, exactly because input has some opinions on what it's sources should be.

I propose that you can use any input source for any input, without specifying it from pipeline, as long as the result of that source is compatible. So the first thing becomes input=[dagster_pd.dataframe_input(name='num')]. Then you can specify CSV source for it. Or SQL source. Or parquet source. As long as all those sources return dataframe.

For raw data (like getting json file), we use dagster.raw_input or eg dagster.json_input. Again, actual source is specified in env.

@schrockn
Copy link
Member

schrockn commented Jun 6, 2018 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants