Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a second example for TaskFlow API #12170

Closed
wants to merge 3 commits into from

Conversation

vikramkoka
Copy link
Contributor

Added a second example DAG for the TaskFlow API, this time using Pandas dataframes and files to be more realistic. This also illustrates the mix of task dependencies between task decorators and sensors.

@kaxil @turbaszek @casassg @ashb


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

order_data_df = pd.read_csv(order_data_file)

# convert to JSON so that it be returned using xcom
order_data_df_str = order_data_df.to_json(orient='split')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho this example would show the full capabilities of TaskFlow API if it would use custom XCom backend for persisting data frame on external storage like gcs or s3. I know that @marclamberti was working on something around that 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks cool. Yeah, I generally pass xcoms with blob paths or filters (for parquet datasets) in order pass data to other tasks and would not really ever pass an actual dataframe or even json version of the dataframe through xcom.

Is it feasible for Airflow to support generating tasks similar to .pipe() with pandas (https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html)? Or how your write queries with Spark.

etl = (extract().pipe(transform).pipe(load, var="total_order_value"))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turbaszek Yeah, Vikram is going to do a follow-up to show usage of Custom XCom backends in more depth and how users can use it too :) For now this PR shows how you can do it without needing to write a Custom XCom backend.

Imho this example would show the full capabilities of TaskFlow API if it would use custom XCom backend for persisting data frame on external storage like gcs or s3. I know that @marclamberti was working on something around that 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now this PR shows how you can do it without needing to write a Custom XCom backend.

@kaxil I think this PR shows "bad" example of using XCom - the dataframe can be arbitrary large and impact performance of Airflow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely a valid concern, I think the plan was to add a notice / warning to show all the use cases but was missed:

  1. Using XCom to pass Pandas Dataframe using TaskGroup for small data that fits DB (48 KB)
  2. To use XCom Backend to store it on GCS / S3 and still pass pandas Dataframe for larger data
  3. Using pickling vs json to (de)serialize XCom data.

The other nice to have would be to move the comments from the DAG file into docs similar to how you did in #11308 @vikramkoka so that we can have NOTES / WARNINGS that are highlighted and more readable too. I can help out in this effort.

How does that sound @turbaszek ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil Yes, I was planning on adding the 1,2, 3 use cases to the tutorial document as soon as I finished the PR for the custom Xcom backend.

I had not thought about adding that to this example file as well, but could do so.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with 2 and 3 but 1 still concerns me as this may make users to demand higher memory allowance or just forget about it ("I did as it was in example and it doesn't work").

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 7, 2020
@github-actions
Copy link

github-actions bot commented Nov 7, 2020

The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!

Copy link
Member

@turbaszek turbaszek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have objections - showing that XCom can be used to store dump of arbitrary large dataframes in my opinion is not a best practice and may confuse users.

@github-actions github-actions bot removed the full tests needed We need to run full set of tests for this PR to merge label Nov 7, 2020
# [END instantiate_dag]

# [START documentation]
dag.doc_md = __doc__
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't needed any more I think - the decorator got changed to do this automatically I think.

Right @casassg ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to use the @dag decorator for that. I would strongly suggest doing so instead as its more in the TaskFlow API

@stale
Copy link

stale bot commented Dec 25, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 25, 2020
@potiuk potiuk added this to the Airflow 2.0 clean-up milestone Dec 25, 2020
@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 25, 2020
@potiuk
Copy link
Member

potiuk commented Dec 25, 2020

Should you please complete it :) @vikramkoka @casassg ?

@vikramkoka
Copy link
Contributor Author

vikramkoka commented Dec 26, 2020 via email

Added a second example DAG for the TaskFlow API, this time using Pandas dataframe and files to be more realistic. This also illustrates the mix of task dependencies between task decorators and sensors.
Updated this taskflow API example to use DAG decorator and doc cleanup
Updated second taskflow api example with DAG decorator and added custom xcom backend example
which extends base Xcom functionality to support persistence of Pandas Dataframes to local filesystem
@kaxil kaxil requested a review from turbaszek January 11, 2021 17:38
Copy link
Member

@turbaszek turbaszek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general this looks good to me. I'm not sure if using dbm is not an overkill (especially if we want explain what is crucial for custom XCom).

Also, do we plan to add docs (I see the start/end anchors) or we just want to keep and example DAG?

@vikramkoka
Copy link
Contributor Author

In general this looks good to me. I'm not sure if using dbm is not an overkill (especially if we want explain what is crucial for custom XCom).

Also, do we plan to add docs (I see the start/end anchors) or we just want to keep and example DAG?

I was planning on adding a doc after this was approved by everyone as being a reasonable approach.

@@ -0,0 +1,149 @@
#
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion on where this file should live @turbaszek?

It can stay here too -- If we decide to keep it here, we need to apply the following diff as one of the tests is failing:

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index da27016a0..f8b6f3811 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3235,6 +3235,7 @@ class TestSchedulerJob(unittest.TestCase):

         ignored_files = {
             'helper.py',
+            'tutorial_xcom_pandas.py',
         }
         example_dag_folder = airflow.example_dags.__path__[0]
         for root, _, files in os.walk(example_dag_folder):  # pylint: disable=too-many-nested-blocks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil there was discussion on devlist and I think we've never reached consensus. But in general we were against having custom XCom backend in code, but we may revisit it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turbaszek It is possible that I misunderstood the email thread on the devlist, but I thought the conclusion from that thread was that including examples of custom XCom backend is good. And that is exactly what this attempts to be - an example.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vikramkoka I have no strong opinion. I would only consider adding .airflowignore to avoid parsing non-dag files in example directories. Also if the example is there in code, we should make sure that it can be used by anyone.

@vikramkoka vikramkoka closed this Jan 18, 2021
@vikramkoka vikramkoka deleted the taskflow_api branch January 18, 2021 18:44
@vikramkoka
Copy link
Contributor Author

Closing pull request and withdrawing change of custom xcom backends

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants