@josh-fell
Contributor

Closes #10285

  • Updated example DAG files so that xcom_pull() calls use an operator's .output property, and so that TaskInstance objects are accessed through the get_current_context() function rather than through the context passed in via kwargs
  • Added comments noting which task dependencies, if any, are handled and/or created via XComArgs, for transparency
  • Removed or refactored the default_args pattern where necessary, as requested by @ashb (i.e. removed the separate default_args declaration in favor of declaring it as part of the DAG object)
  • Other miscellaneous updates based on .output refactoring

Note: There are several instances where the xcom_pull() call was not updated. These instances involve accessing a specific value within the XCom or calling user-defined macros with an XCom value. See #16618, an open issue to enhance XComArg so that it behaves like the classic xcom_pull() method in these cases.

Note: Not all DAGs were tested functionally (i.e. executed against live integrations with source systems). However, each DAG was tested locally to confirm that it compiles and generates the expected DAG graph.

A detailed summary of all changes made as part of this PR can be found below:

DAG File Converted xcom_pull()? Other Updates? Comments
airflow/example_dags/example_bash_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_branch_datetime_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_branch_day_of_week_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_branch_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_branch_python_dop_operator_3.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_dag_decorator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_kubernetes_executor_config.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_kubernetes_executor.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_passing_params_via_test_command.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_python_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_short_circuit_operator.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_skip_dag.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_trigger_controller_dag.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/example_trigger_target_dag.py No Yes Removed unneeded default_args pattern.

Updated to use get_current_context().
airflow/example_dags/example_xcom.py Yes Yes Included examples for context retrieval using get_current_context() based on previous PR comments to show new and old means of XCom access.

Removed explicit task dependencies that are created via XComArgs.

Removed unneeded default_args pattern.
airflow/example_dags/example_xcomargs.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/tutorial_etl_dag.py No No Not updated based on earlier comment to keep this file unchanged during initial PR.
airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py No Yes Removed unneeded default_args pattern.
airflow/example_dags/tutorial_taskflow_api_etl.py No Yes Removed unneeded default_args pattern.
airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Removed unneeded default_args pattern.
airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Refactored default_args pattern.
airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py No Yes Refactored default_args pattern.
airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Refactored default_args pattern.
airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}".

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.

Refactored default_args pattern.
airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py No No Current XComArg object does not support accessing specific values of an iterable XCom value. Could not update any occurrences in this DAG.
airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Updated to use get_current_context().
airflow/providers/apache/beam/example_dags/example_beam.py No No Current XComArg object does not support accessing specific values of an iterable XCom value. Could not update any occurrences in this DAG.
airflow/providers/apache/cassandra/example_dags/example_cassandra_dag.py No Yes Removed unneeded default_args pattern.
airflow/providers/apache/hive/example_dags/example_twitter_dag.py No Yes Refactored default_args pattern.
airflow/providers/apache/kylin/example_dags/example_kylin_dag.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Removed unneeded default_args pattern.
airflow/providers/apache/pig/example_dags/example_pig.py No Yes Removed unneeded default_args pattern.
airflow/providers/apache/spark/example_dags/example_spark_dag.py No Yes Removed unneeded default_args pattern.
airflow/providers/asana/example_dags/example_asana.py No Yes Removed unneeded default_args pattern.
airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py No Yes Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"".

Removed unneeded default_args pattern.
airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py No Yes Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}".

Refactored default_args pattern.
airflow/providers/dingding/example_dags/example_dingding.py No Yes Refactored default_args pattern.
airflow/providers/docker/example_dags/example_docker_copy_data.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Refactored default_args pattern.

Refactored ShortCircuitOperator callable logic.
airflow/providers/docker/example_dags/example_docker_swarm.py No Yes Refactored default_args pattern.
airflow/providers/docker/example_dags/example_docker.py No Yes Refactored default_args pattern.
airflow/providers/google/cloud/example_dags/example_automl_nl_text_classification.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_nl_text_extraction.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_nl_text_sentiment.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_tables.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update the following occurrences:
"{{ extract_object_id(task_instance.xcom_pull('list_tables_spec_task')[0]) }}"
'{{ task_instance.xcom_pull("create_dataset_task")["name"] }}'
"{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'), target) }}"

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_translation.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_classification.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_tracking.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_vision_classification.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_automl_vision_object_detection.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py No Yes Refactored default_args pattern.
airflow/providers/google/cloud/example_dags/example_bigquery_dts.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_bigquery_operations.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_bigquery_queries.py Yes Yes Added globals assignment so both example DAGs are exposed rather than only the last one in the loop.
airflow/providers/google/cloud/example_dags/example_cloud_build.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py Yes ** No ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"

All other xcom_pull() calls have been updated.
airflow/providers/google/cloud/example_dags/example_datacatalog.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\""

All other xcom_pull() calls have been updated.
airflow/providers/google/cloud/example_dags/example_cloud_sql.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Removed duplicate/redundant task.
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py No Yes Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.

Removed unused default_args variable.
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update this occurrence:
"echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\""

All other xcom_pull() calls have been updated.
airflow/providers/google/cloud/example_dags/example_dataflow.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_dataproc.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_datastore.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update these occurrences:
"{{task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}"
"{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}"

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_functions.py No Yes Added default_args to the DAG; existing logic populated the dict, but it was never applied.
airflow/providers/google/cloud/example_dags/example_dlp.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_gcs.py Yes No
airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_mlengine.py Yes Yes Removed unneeded default_args pattern.
airflow/providers/google/cloud/example_dags/example_natural_language.py Yes No
airflow/providers/google/cloud/example_dags/example_pubsub.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update the following occurrence:
"""{% for m in task_instance.xcom_pull('pull_messages') %} echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}" {% endfor %}"""

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_stackdriver.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_tasks.py Yes No
airflow/providers/google/cloud/example_dags/example_translate.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_video_intelligence.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/cloud/example_dags/example_vision.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update the following occurrences:
"echo {{ task_instance.xcom_pull('annotate_image')['logoAnnotations'][0]['description'] }}"
"echo {{ task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}"
"echo {{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}"
"echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0] }}"

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/cloud/example_dags/example_workflows.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/leveldb/example_dags/example_leveldb.py No Yes Removed unneeded default_args pattern.
airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/marketing_platform/example_dags/example_display_video.py Yes ** Yes ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update the following occurrence:
'{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'

All other xcom_pull() calls have been updated.

Removed explicit task dependencies that are created via XComArgs.

Updated this instance to reference the correct task rather than a nonexistent one:
'{{ task_instance.xcom_pull("upload_sdf_to_bigquery")}}'
airflow/providers/google/marketing_platform/example_dags/example_search_ads.py Yes Yes Removed explicit task dependencies that are created via XComArgs.
airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py No No Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update any occurrences in this DAG.
airflow/providers/google/suite/example_dags/example_sheets.py Yes ** No ** Current XComArg object does not support accessing specific values of an iterable XCom value elegantly. Did not update the following occurrences:
"{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}"

All other xcom_pull() calls have been updated.
airflow/providers/http/example_dags/example_http.py No Yes Refactored default_args pattern.
airflow/providers/jdbc/example_dags/example_jdbc_queries.py No Yes Refactored default_args pattern.
airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py Yes Yes Removed explicit task dependencies that are created via XComArgs.

Refactored the grab_artifact_from_jenkins() function to take an XComArg from a previous task as input.

Refactored default_args pattern.
airflow/providers/microsoft/azure/example_dags/example_azure_container_instances.py No Yes Refactored default_args pattern.
airflow/providers/microsoft/azure/example_dags/example_azure_cosmosdb.py No Yes Refactored default_args pattern.
airflow/providers/microsoft/winrm/example_dags/example_winrm.py No Yes Removed default_args pattern.
airflow/providers/mysql/example_dags/example_mysql.py No Yes Removed default_args pattern.
airflow/providers/neo4j/example_dags/example_neo4j.py No Yes Removed default_args pattern.
airflow/providers/papermill/example_dags/example_papermill.py No Yes Removed default_args pattern.
airflow/providers/plexus/example_dags/example_plexus.py No Yes Refactored default_args pattern.
airflow/providers/postgres/example_dags/example_postgres.py No Yes Removed default_args pattern.
airflow/providers/qubole/example_dags/example_qubole.py No Yes Updated to use get_current_context().
airflow/providers/singularity/example_dags/example_singularity.py No Yes Refactored default_args pattern.
airflow/providers/snowflake/example_dags/example_snowflake.py No Yes Removed default_args pattern.
airflow/providers/sqlite/example_dags/example_sqlite.py No Yes Removed default_args pattern.
airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py No Yes Refactored default_args pattern.
airflow/providers/telegram/example_dags/example_telegram.py No Yes Removed default_args pattern.
airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py No Yes Removed default_args pattern.
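
The `example_bigquery_queries.py` row above mentions adding a globals assignment so that every DAG built in a loop is exposed. Airflow discovers DAGs by looking for DAG objects among a module's top-level names, so a name rebound on each loop iteration leaves only the last DAG visible. The following is a minimal sketch of that idea; `FakeDag`, the loop values, and the naming scheme are hypothetical stand-ins so the snippet runs without Airflow installed.

```python
class FakeDag:
    """Stand-in for airflow.DAG (hypothetical, for illustration only)."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


for location in ("us", "eu"):  # hypothetical loop values
    dag_id = f"example_queries_{location}"  # hypothetical naming scheme
    dag = FakeDag(dag_id)
    # Without this line, only the final `dag` binding survives the loop.
    globals()[dag_id] = dag
```

After the loop, each DAG is reachable under its own module-level name, while `dag` itself still points only at the last one created.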


josh-fell and others added 12 commits June 18, 2021 17:36
- Converted `xcom_pull()` calls to use `XComArg` where applicable
- Updated `context` access via `kwargs` to use `get_current_context`
- Removed explicit task dependencies which are created implicitly via `XComArgs`
- Added comments on the implicit `XComArg` task dependencies
- Removed and refactored `default_args` code pattern
@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:providers provider:Apache provider:amazon AWS/Amazon - related issues labels Jul 6, 2021
@ephraimbuddy
Contributor

I think this PR should be broken down into smaller PRs.
My suggestion:
1 PR for core examples.
1 PR for each of the provider's example dags.

Reviewing 86 files is not going to be easy.

@josh-fell
Contributor Author

> I think this PR should be broken down into smaller PRs.
> My suggestion:
> 1 PR for core examples.
> 1 PR for each of the provider's example dags.
>
> Reviewing 86 files is not going to be easy.

Solid suggestion. I will close this PR and open smaller, more manageable ones. Thanks for the feedback!

@josh-fell josh-fell closed this Jul 6, 2021
@mik-laj mik-laj mentioned this pull request Oct 6, 2021
Labels

area:providers provider:amazon AWS/Amazon - related issues provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Development

Successfully merging this pull request may close these issues.

Update example DAG files to latest syntax

2 participants