Set inlets and outlets for operators #808

Merged: 1 commit merged into main on Sep 9, 2022
Conversation

@kaxil (Collaborator) commented on Sep 8, 2022

This PR adds inlets and outlets to the operators where it is trivial and clear what the input and output datasets are.

If users pass their own inlets and outlets to the operator, the SDK will use those.

part of: #611

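For readers skimming the diff hunks quoted below, here is a minimal, self-contained sketch of what a helper like kwargs_with_datasets (defined in python-sdk/src/astro/airflow/datasets.py and used by the operators touched in this PR) does conceptually. This is an illustration only, not the code merged in this PR, and the Airflow-version handling shown is an assumption:

from typing import Optional

try:
    # The Dataset class for data-aware scheduling exists from Airflow 2.4 onwards.
    from airflow.datasets import Dataset  # noqa: F401
    DATASETS_SUPPORTED = True
except ImportError:
    DATASETS_SUPPORTED = False


def kwargs_with_datasets(kwargs: Optional[dict] = None, input_ds=None, output_ds=None) -> dict:
    """Sketch: keep user-supplied inlets/outlets; otherwise derive them from the datasets."""
    kwargs = dict(kwargs or {})
    if not DATASETS_SUPPORTED:
        # On Airflow < 2.4 there is nothing to wire up; pass kwargs through unchanged.
        return kwargs
    if "inlets" not in kwargs and input_ds is not None:
        kwargs["inlets"] = input_ds if isinstance(input_ds, list) else [input_ds]
    if "outlets" not in kwargs and output_ds is not None:
        kwargs["outlets"] = output_ds if isinstance(output_ds, list) else [output_ds]
    return kwargs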

Does this introduce a breaking change?

No

Checklist

  • Created tests which fail without the change (if possible)
  • Extended the README / documentation, if necessary

super().__init__(
    task_id=task_id,
    **kwargs_with_datasets(
        kwargs=kwargs, input_ds=source_table, output_ds=target_table
    ),
)
@tatiana (Collaborator) commented on Sep 9, 2022:

Shouldn't the input_ds be both target_table and source_table in this case?
It feels like target_table is also an append input (it is a dataset expected to exist before calling this method).

@kaxil (Collaborator, Author) replied:

I don't think so, target_table is just the destination

@kaxil (Collaborator, Author) added:

Otherwise it wouldn't be clear what happened: there would be two inputs, and the output would be the same as one of the input datasets.

Collaborator replied:

The task does change one of the inputs (the table being appended to), so from this perspective the output is one of the inputs (likely modified, since source_table has content). Otherwise, at a conceptual level, it could seem that this task only generates this dataset, without relying on its pre-existence.
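
For concreteness, the two lineage options discussed in this thread would roughly look as follows when expressed against the illustrative kwargs_with_datasets sketch above (source_table and target_table stand for the SDK table objects; list support for input_ds is an assumption):

# Approach taken in this PR: only the source is an inlet, the target is the outlet.
kwargs_with_datasets(kwargs={}, input_ds=source_table, output_ds=target_table)
# -> {"inlets": [source_table], "outlets": [target_table]}

# Approach suggested in this thread: the target is also an inlet, because the append
# relies on the target table already existing.
kwargs_with_datasets(kwargs={}, input_ds=[source_table, target_table], output_ds=target_table)
# -> {"inlets": [source_table, target_table], "outlets": [target_table]}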

task_id=task_id,
**kwargs_with_datasets(
    kwargs=kwargs,
    input_ds=source_table,
Collaborator commented:

Shouldn't the input_ds be both target_table and source_table in this case?
It feels like target_table is also a merge input (it is a dataset expected to exist before calling this method).

@kaxil (Collaborator, Author) commented on Sep 9, 2022:

Same comment as for append - #808 (comment)

Otherwise we would have to do the same for load_file, where the destination already exists, but it doesn't make sense to add it to input_ds.

Collaborator replied:

In the case of load_file, it would only make sense to add it if the mode is "append" - but that is a good point.

):
    """
    Extract inlets and outlets from kwargs if users have passed them. If not, set the input datasets as inlets
    and the output dataset as outlets.
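
As a quick illustration of the precedence described in this docstring, again against the hypothetical sketch above (upstream_ds and downstream_ds are illustrative Dataset objects, not names from this PR):

# If the user already passed inlets/outlets, they are kept untouched.
user_kwargs = {"inlets": [upstream_ds], "outlets": [downstream_ds], "retries": 2}
kwargs_with_datasets(kwargs=user_kwargs, input_ds=source_table, output_ds=target_table)
# -> inlets stay [upstream_ds], outlets stay [downstream_ds], retries is preserved

# If the user did not pass them, they are derived from the input/output datasets.
kwargs_with_datasets(kwargs={"retries": 2}, input_ds=source_table, output_ds=target_table)
# -> {"retries": 2, "inlets": [source_table], "outlets": [target_table]}
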
@phanikumv (Collaborator) commented on Sep 9, 2022:

Do we have a note somewhere in the documentation that we support the Datasets feature with the Python SDK on Airflow 2.4? I know we are adding docstrings, but I think an explicit call-out in the docs would be helpful to users.

Contributor replied:

+1

@kaxil (Collaborator, Author) replied:

Yeah, I have that planned for a separate PR, as mentioned in #611 (comment).

@feluelle (Member) left a comment:

LGTM in general 👍

Resolved review threads (outdated):
python-sdk/src/astro/airflow/datasets.py
python-sdk/tests/airflow/test_datasets.py (2 threads)
@sunank200 (Contributor) left a comment:

Overall LGTM. Added a few suggestions.

):
    """
    Extract inlets and outlets from kwargs if users have passed them. If not, set the input datasets as inlets
    and the output dataset as outlets.
Contributor replied:

+1

Resolved review threads (outdated):
python-sdk/src/astro/airflow/datasets.py (2 threads)
python-sdk/tests/airflow/test_datasets.py
@kaxil merged commit 190db7b into main on Sep 9, 2022
@kaxil deleted the set-inlets-outlets branch on September 9, 2022 at 13:52

6 participants