
Airflow tutorial to use functional DAGs #11308

Merged
merged 9 commits into apache:master on Oct 13, 2020

Conversation

vikramkoka
Contributor

Created a new Airflow ETL tutorial that uses functional DAGs. Also created a DAG that performs the same operations without functional DAGs, both to stay compatible with Airflow 1.10.x and to show the "functional" and "classic" approaches side by side.

I tried to keep this simple and not rely on any operators. I will create the HTML documentation page to cover this as soon as I get confirmation from @casassg and @turbaszek that this is in the right general direction. Also tagging @ashb for general review.

closes: #9041
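
For context, a minimal sketch of the functional (decorator-based) style the tutorial demonstrates, assuming Airflow 2.0's airflow.decorators module; the DAG name, task names, and values below are illustrative, not the exact tutorial code:

# Illustrative sketch only, not the tutorial file itself.
import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


@dag(schedule_interval=None, start_date=days_ago(2), tags=["example"])
def tutorial_etl():
    @task()
    def extract() -> dict:
        # The real tutorial would pull this from an external system.
        return json.loads('{"1001": 301.27, "1002": 433.21, "1003": 502.22}')

    @task()
    def load(order_data: dict):
        print(f"Total order value is: {sum(order_data.values()):.2f}")

    load(extract())


tutorial_etl_dag = tutorial_etl()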


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

@github-actions

github-actions bot commented Oct 6, 2020

The Build Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

Comment on lines 62 to 64
# [START documentation]
dag.doc_md = __doc__
# [END documentation]
Member


Should we move this to DAG invocation?
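
One way to read this suggestion (a sketch only; tutorial_etl is a stand-in for the @dag-decorated function, not the actual name in the PR):

# Sketch: attach the module docstring right where the DAG object is created,
# instead of in a separate "[START documentation]" block further down.
tutorial_etl_dag = tutorial_etl()
tutorial_etl_dag.doc_md = __doc__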

data_string = u'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return data_string
# [END extract]
extract.doc_md = """\
Member


If I'm not mistaken we should be able to do:

from airflow.decorators import task

@task(doc_md="here goes the docs")
def extract(**kwargs):
    ...

@turbaszek
Member

@vikramkoka I think this is a good example 👍

@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label Oct 6, 2020
@casassg
Contributor

casassg commented Oct 6, 2020

Looks good. I would try to make better use of multiple outputs instead of JSON dumping and such. That feels against what we have been trying to do. Let's return a dictionary and load the numbers directly.
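
A sketch of that idea, with illustrative task and key names rather than the exact tutorial code:

from airflow.decorators import task


@task(multiple_outputs=True)
def transform(order_data: dict) -> dict:
    # With multiple_outputs=True, each key of the returned dict becomes its own XCom.
    return {"total_order_value": sum(order_data.values())}


@task()
def load(total_order_value: float):
    print(f"Total order value is: {total_order_value:.2f}")


# Wired up inside the @dag-decorated function:
#     order_summary = transform(extract())
#     load(order_summary["total_order_value"])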

@turbaszek
Member

A thought: should we add the get_current_context function somewhere, just to showcase it?
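
A minimal sketch of such a showcase, assuming get_current_context from airflow.operators.python; the task name is illustrative:

from airflow.decorators import task
from airflow.operators.python import get_current_context


@task()
def print_run_date():
    # Fetch the runtime context inside a decorated task without **kwargs plumbing.
    context = get_current_context()
    print(f"This task instance runs for {context['ds']}")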

@vikramkoka
Contributor Author

Looks good. I would try to make better use of multiple outputs instead of JSON dumping and such. That feels against what we have been trying to do. Let's return a dictionary and load the numbers directly.

Thank you both for the detailed, prompt feedback! I really appreciate it and will make the changes.

@casassg
Contributor

casassg commented Oct 6, 2020

(Note that you may need to clean up the code; my code suggestions were mostly to point out how to use it.)

Also, it may be interesting to add a unit test that executes the DAG end to end?

@vikramkoka
Contributor Author

(Note that you may need to clean up the code; my code suggestions were mostly to point out how to use it.)

Also, it may be interesting to add a unit test that executes the DAG end to end?

Yes, absolutely. Will make the changes and keep updating the PR, just so that you have visibility.

@vikramkoka
Contributor Author

A thought: should we add the get_current_context function somewhere, just to showcase it?

Your point is well taken, and I should have stated up front that I would like to add one more, more complex example that uses both get_current_context and a BashOperator, to demonstrate how to use functional DAGs with non-Python (more classic) operators.
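
A rough sketch of what mixing the two styles could look like (operator and task names are illustrative; the actual example is still to be written):

from airflow.decorators import task
from airflow.operators.bash import BashOperator


@task()
def extract() -> dict:
    return {"1001": 301.27, "1002": 433.21, "1003": 502.22}


# Inside the @dag-decorated function (or a classic `with DAG(...)` block):
#     order_data = extract()
#     notify = BashOperator(task_id="notify", bash_command="echo 'extract finished'")
#     order_data >> notify  # the TaskFlow output can be wired to a classic operator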

@turbaszek
Member

Also, it may be interesting to add a unit test that executes the DAG end to end?

+1 for that; this should be as simple as adding this DAG id to:

@pytest.mark.system("core")
class TestExampleDagsSystem(SystemTest):
    @parameterized.expand([
        "example_bash_operator",
        "example_branch_operator"
    ])
    def test_dag_example(self, dag_id):
        self.run_dag(dag_id=dag_id)
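
For concreteness, assuming the tutorial DAG keeps the id suggested by its file name (tutorial_functional_etl_dag, an assumption rather than anything confirmed in this thread), the change would be one more entry in the parameterized list:

@pytest.mark.system("core")
class TestExampleDagsSystem(SystemTest):
    @parameterized.expand([
        "example_bash_operator",
        "example_branch_operator",
        "tutorial_functional_etl_dag",  # assumed dag id, taken from the example file name
    ])
    def test_dag_example(self, dag_id):
        self.run_dag(dag_id=dag_id)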

@github-actions

github-actions bot commented Oct 8, 2020

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

@github-actions

github-actions bot commented Oct 9, 2020

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.


airflow/example_dags/tutorial_functional_etl_dag.py (review comments outdated, resolved)

What's Next?
-------------
You have seen how simple it is to write DAGs using the Functional DAG paradigm within
Contributor


Would be nice to have this point more specifically to https://github.com/apache/airflow/blob/master/docs/concepts.rst#functional-dags.

Also, it may be interesting to link AIP-31 for more insight into this, and potentially my talk and Jonathan's article as "learn more" resources? CC @turbaszek for thoughts.

Member


+1 for linking AIP and this article for more insight:
https://medium.com/databand-ai/aip-31-airflow-functional-dag-definition-b34852a632d0

Contributor Author


I added the links to the AIP and to the Functional DAGs section of the Concept doc.

I was not sure whether it was OK to link from the Apache open source docs to an external site with a paywall (Medium), so I did not add that.

vikramkoka and others added 9 commits October 13, 2020 15:41
Created a new Airflow tutorial to use Decorated Flows (a.k.a. functional
DAGs). Also created a DAG to perform the same operations without using
functional DAGs to be compatible with Airflow 1.10.x and to show the
difference.
It makes sense to simplify the return variables being passed around without needlessly converting to JSON and then reconverting back.

Co-authored-by: Gerard Casas Saez <casassg@users.noreply.github.com>
Fixed data passing between tasks to be more natural without converting to JSON and converting back to variables.
Based on feedback on the PR, updated the DAG options (including the schedule) and fixed the task documentation to avoid indentation issues.
Added the tutorial documentation to the docs directory. Fixed linting errors in the example dags.
Tweaked some doc references in the example dags for inclusion into the tutorial documentation.
Added the example dags to example tests.
Had a multiple_outputs=True defined in the Extract task defn, which was unnecessary. - Removed based on feedback.
Added word summarization to the spelling list in the docs directory to
fix spell check error in tutorial. Also fixed another sentence error in
the tutorial document.
Contributor Author

@vikramkoka vikramkoka left a comment


Looks good to me @ashb

@ashb ashb marked this pull request as ready for review October 13, 2020 15:48
@ashb ashb added this to the Airflow 2.0.0-alpha1 milestone Oct 13, 2020
@ashb
Member

ashb commented Oct 13, 2020

Ran tests locally (this time on the right commit), so merging.

@ashb ashb merged commit 095756c into apache:master Oct 13, 2020
@ashb ashb deleted the func_tutorial branch October 13, 2020 15:59
Labels
AIP-31 Task Flow API for nicer DAG definition
Development

Successfully merging this pull request may close these issues.

[AIP-31] Update Airflow tutorial to use functional DAGs
5 participants