diff --git a/.github/workflows/ci-benchmark.yaml b/.github/workflows/ci-benchmark.yaml index 6861464334..8b00a8fffe 100644 --- a/.github/workflows/ci-benchmark.yaml +++ b/.github/workflows/ci-benchmark.yaml @@ -5,6 +5,7 @@ on: - labeled schedule: - cron: '0 0 * * 1' + - cron: '0 0 * * *' # This allows a subsequently queued workflow run to interrupt and cancel previous runs concurrency: @@ -14,7 +15,7 @@ concurrency: jobs: Run-Benchmark: if: >- - github.event_name == 'schedule' || + (github.event_name == 'schedule' && github.event.schedule != '0 0 * * *') || ( github.event.label.name == 'run_benchmark' && github.event.name != 'pull_request_target' @@ -64,3 +65,17 @@ jobs: ( echo "cat <python-sdk/test-connections.yaml"; cat .github/ci-test-connections.yaml; ) >python-sdk/test-connections.yaml && . python-sdk/test-connections.yaml python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }} cd python-sdk/tests/benchmark && make + + Astro-deploy: + if: >- + (github.event_name == 'schedule' && github.event.schedule != '0 0 * * 1') + env: + ASTRO_DOCKER_REGISTRY: ${{ secrets.ASTRO_DOCKER_REGISTRY }} + ASTRO_ORGANIZATION_ID: ${{ secrets.ASTRO_ORGANIZATION_ID }} + ASTRO_DEPLOYMENT_ID: ${{ secrets.ASTRO_DEPLOYMENT_ID }} + ASTRO_KEY_ID: ${{ secrets.ASTRO_KEY_ID }} + ASTRO_KEY_SECRET: ${{ secrets.ASTRO_KEY_SECRET }} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: cd python-sdk/tests_integration/astro_deploy && sh deploy.sh $ASTRO_DOCKER_REGISTRY $ASTRO_ORGANIZATION_ID $ASTRO_DEPLOYMENT_ID $ASTRO_KEY_ID $ASTRO_KEY_SECRET diff --git a/.gitignore b/.gitignore index bc8da2bd59..ab9ef13752 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,11 @@ python-sdk/docs/autoapi/ **/dev/logs **/dev/plugins +# Astro deployment related +python-sdk/tests_integration/astro_deploy/example_dags +python-sdk/tests_integration/astro_deploy/python-sdk +python-sdk/tests_integration/astro_deploy/tests + # Generated by tests sql-cli/tests/.airflow/dags/* !sql-cli/tests/.airflow/dags/sql/.gitkeep diff --git a/python-sdk/example_dags/example_amazon_s3_postgres.py b/python-sdk/example_dags/example_amazon_s3_postgres.py index 9f7241ec48..920bedab06 100644 --- a/python-sdk/example_dags/example_amazon_s3_postgres.py +++ b/python-sdk/example_dags/example_amazon_s3_postgres.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta +from datetime import datetime from airflow.models import DAG from pandas import DataFrame @@ -20,8 +20,9 @@ dag_id="example_amazon_s3_postgres", start_date=datetime(2019, 1, 1), max_active_runs=3, - schedule_interval=timedelta(minutes=30), + schedule_interval=None, default_args=default_args, + catchup=False, ) diff --git a/python-sdk/example_dags/example_amazon_s3_postgres_load_and_save.py b/python-sdk/example_dags/example_amazon_s3_postgres_load_and_save.py index dca3b6b8d0..8ff5b2767f 100644 --- a/python-sdk/example_dags/example_amazon_s3_postgres_load_and_save.py +++ b/python-sdk/example_dags/example_amazon_s3_postgres_load_and_save.py @@ -21,6 +21,7 @@ schedule_interval=None, start_date=timezone.utcnow(), tags=["demo"], + catchup=False, ) def example_amazon_s3_postgres_load_and_save(): t1 = aql.load_file( diff --git a/python-sdk/example_dags/example_amazon_s3_snowflake_transform.py b/python-sdk/example_dags/example_amazon_s3_snowflake_transform.py index 0d7152bf5f..59ca1f4a42 100644 --- a/python-sdk/example_dags/example_amazon_s3_snowflake_transform.py +++ 
b/python-sdk/example_dags/example_amazon_s3_snowflake_transform.py @@ -40,7 +40,7 @@ def aggregate_data(df: pd.DataFrame): @dag( start_date=datetime(2021, 1, 1), max_active_runs=1, - schedule_interval="@daily", + schedule_interval=None, default_args={ "email_on_failure": False, "retries": 0, diff --git a/python-sdk/example_dags/example_append.py b/python-sdk/example_dags/example_append.py index 4a390803e4..0b6d804523 100644 --- a/python-sdk/example_dags/example_append.py +++ b/python-sdk/example_dags/example_append.py @@ -1,5 +1,5 @@ import pathlib -from datetime import datetime, timedelta +from datetime import datetime from airflow.models import DAG @@ -19,8 +19,9 @@ dag_id="example_append", start_date=datetime(2019, 1, 1), max_active_runs=3, - schedule_interval=timedelta(minutes=30), + schedule_interval=None, default_args=default_args, + catchup=False, ) DATA_DIR = str(CWD) + "/data/" diff --git a/python-sdk/example_dags/example_dataframe_api.py b/python-sdk/example_dags/example_dataframe_api.py index dd79ee26a5..889a0c2294 100644 --- a/python-sdk/example_dags/example_dataframe_api.py +++ b/python-sdk/example_dags/example_dataframe_api.py @@ -3,6 +3,7 @@ from io import StringIO from typing import List +import airflow import pandas as pd import requests from airflow import DAG @@ -17,7 +18,13 @@ @task or @aql.transform tasks. """ -ALLOWED_DESERIALIZATION_CLASSES = os.getenv("AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES") +# Airflow 2.5 requires AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES to be given a value, whereas Airflow 2.2.5 does not. +if airflow.__version__ == "2.2.5": + ALLOWED_DESERIALIZATION_CLASSES = os.getenv("AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES") +else: + ALLOWED_DESERIALIZATION_CLASSES = os.getenv( + "AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES", default="airflow\\.* astro\\.*" + ) def _load_covid_data(): diff --git a/python-sdk/example_dags/example_google_bigquery_gcs_load_and_save.py b/python-sdk/example_dags/example_google_bigquery_gcs_load_and_save.py index 8fbbb97a66..142de8bd06 100644 --- a/python-sdk/example_dags/example_google_bigquery_gcs_load_and_save.py +++ b/python-sdk/example_dags/example_google_bigquery_gcs_load_and_save.py @@ -26,6 +26,7 @@ dag_id="example_google_bigquery_gcs_load_and_save", schedule_interval=None, start_date=timezone.datetime(2022, 1, 1), + catchup=False, ) as dag: # [START load_file_http_example] t1 = aql.load_file( diff --git a/python-sdk/example_dags/example_load_file.py b/python-sdk/example_dags/example_load_file.py index 1bdf68da27..1696c1d6fa 100644 --- a/python-sdk/example_dags/example_load_file.py +++ b/python-sdk/example_dags/example_load_file.py @@ -18,7 +18,7 @@ """ import os import pathlib -from datetime import datetime, timedelta +from datetime import datetime import sqlalchemy from airflow.models import DAG @@ -51,8 +51,9 @@ dag_id="example_load_file", start_date=datetime(2019, 1, 1), max_active_runs=3, - schedule_interval=timedelta(minutes=30), + schedule_interval=None, default_args=default_args, + catchup=False, ) diff --git a/python-sdk/example_dags/example_merge.py b/python-sdk/example_dags/example_merge.py index 5c73c6ac9a..a44e742abe 100644 --- a/python-sdk/example_dags/example_merge.py +++ b/python-sdk/example_dags/example_merge.py @@ -1,5 +1,5 @@ import pathlib -from datetime import datetime, timedelta +from datetime import datetime from airflow.models import DAG from pandas import DataFrame @@ -22,8 +22,9 @@ dag_id="example_merge_bigquery", start_date=datetime(2019, 1, 1), max_active_runs=3, -
schedule_interval=timedelta(minutes=30), + schedule_interval=None, default_args=default_args, + catchup=False, ) diff --git a/python-sdk/example_dags/example_snowflake_partial_table_with_append.py b/python-sdk/example_dags/example_snowflake_partial_table_with_append.py index 9eca4fe5b1..ac3bdd16c7 100644 --- a/python-sdk/example_dags/example_snowflake_partial_table_with_append.py +++ b/python-sdk/example_dags/example_snowflake_partial_table_with_append.py @@ -69,9 +69,9 @@ def create_table(table: Table): """ -@dag(start_date=datetime(2021, 12, 1), schedule_interval="@daily", catchup=False) +@dag(start_date=datetime(2021, 12, 1), schedule_interval=None, catchup=False) def example_snowflake_partial_table_with_append(): - homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID) + homes_reporting = Table(name="homes_reporting", temp=True, conn_id=SNOWFLAKE_CONN_ID) create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID) # [END howto_run_raw_sql_snowflake_1] diff --git a/python-sdk/tests_integration/astro_deploy/Dockerfile b/python-sdk/tests_integration/astro_deploy/Dockerfile new file mode 100644 index 0000000000..5f142f21e2 --- /dev/null +++ b/python-sdk/tests_integration/astro_deploy/Dockerfile @@ -0,0 +1,15 @@ +FROM quay.io/astronomer/astro-runtime:7.2.0-base + +COPY python-sdk /tmp/python-sdk +RUN pip install /tmp/python-sdk[all] +RUN pip install apache-airflow[slack] + +RUN mkdir -p ${AIRFLOW_HOME}/dags +RUN mkdir -p ${AIRFLOW_HOME}/tests + +COPY example_dags/ ${AIRFLOW_HOME}/dags/ +COPY master_dag.py ${AIRFLOW_HOME}/dags/ +COPY example_snowflake_cleanup.py ${AIRFLOW_HOME}/dags/ +COPY tests/ ${AIRFLOW_HOME}/tests/ + +RUN ls ${AIRFLOW_HOME}/dags/ diff --git a/python-sdk/tests_integration/astro_deploy/deploy.sh b/python-sdk/tests_integration/astro_deploy/deploy.sh new file mode 100644 index 0000000000..b284d3acce --- /dev/null +++ b/python-sdk/tests_integration/astro_deploy/deploy.sh @@ -0,0 +1,107 @@ +#!/usr/bin/env bash + +# Make the script exit with the status if one of the commands fails. Without this, the Airflow task calling this script +# will be marked as 'success' and the DAG will proceed on to the subsequent tasks. +set -e + +# This script deploys to an already existing Astro Cloud Airflow deployment. +# It currently does not support creating a new deployment. +# +# Execute the script with the positional params below +# bash deploy.sh +# - ASTRO_DOCKER_REGISTRY: Docker registry domain. Script will push the docker image here. +# - ASTRO_ORGANIZATION_ID: Astro cloud deployment organization Id. Get it from UI. +# - ASTRO_DEPLOYMENT_ID: Astro cloud deployment Id. Get it from UI. +# - ASTRO_KEY_ID: Astro cloud deployment service account API key Id. +# - ASTRO_KEY_SECRET: Astro cloud deployment service account API key secret. + +SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" +PROJECT_PATH=${SCRIPT_PATH}/../..
+ +function echo_help() { + echo "Usage:" + echo "ASTRO_DOCKER_REGISTRY: Docker registry" + echo "ASTRO_ORGANIZATION_ID Astro cloud organization Id" + echo "ASTRO_DEPLOYMENT_ID Astro cloud Deployment id" + echo "ASTRO_KEY_ID Astro cloud service account API key id" + echo "ASTRO_KEY_SECRET Astro cloud service account API key secret" + echo "bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>" +} + +if [ "$1" == "-h" ]; then + echo_help + exit +fi + +# Delete old source files if they exist +function clean() { + if [ -d "${SCRIPT_PATH}"/python-sdk ]; then rm -Rf "${SCRIPT_PATH}"/python-sdk; fi + if [ -d "${SCRIPT_PATH}"/example_dags ]; then rm -Rf "${SCRIPT_PATH}"/example_dags; fi + if [ -d "${SCRIPT_PATH}"/tests ]; then rm -Rf "${SCRIPT_PATH}"/tests; fi +} + +ASTRO_DOCKER_REGISTRY=$1 +ASTRO_ORGANIZATION_ID=$2 +ASTRO_DEPLOYMENT_ID=$3 +ASTRO_KEY_ID=$4 +ASTRO_KEY_SECRET=$5 + +clean + + +# Copy source files +mkdir "${SCRIPT_PATH}"/python-sdk +mkdir "${SCRIPT_PATH}"/tests +cp -r "${PROJECT_PATH}"/src "${SCRIPT_PATH}"/python-sdk +cp -r "${PROJECT_PATH}"/pyproject.toml "${SCRIPT_PATH}"/python-sdk +cp -r "${PROJECT_PATH}"/README.md "${SCRIPT_PATH}"/python-sdk +cp -r "${PROJECT_PATH}"/example_dags "${SCRIPT_PATH}"/example_dags +cp -r "${PROJECT_PATH}"/tests/data "${SCRIPT_PATH}"/tests/data + + +# Build image and deploy +BUILD_NUMBER=$(awk 'BEGIN {srand(); print srand()}') +IMAGE_NAME=${ASTRO_DOCKER_REGISTRY}/${ASTRO_ORGANIZATION_ID}/${ASTRO_DEPLOYMENT_ID}:ci-${BUILD_NUMBER} +docker build --platform=linux/amd64 -t "${IMAGE_NAME}" -f "${SCRIPT_PATH}"/Dockerfile "${SCRIPT_PATH}" +docker login "${ASTRO_DOCKER_REGISTRY}" -u "${ASTRO_KEY_ID}" -p "${ASTRO_KEY_SECRET}" +docker push "${IMAGE_NAME}" + +TOKEN=$( curl --location --request POST "https://auth.astronomer.io/oauth/token" \ + --header "content-type: application/json" \ + --data-raw "{ + \"client_id\": \"$ASTRO_KEY_ID\", + \"client_secret\": \"$ASTRO_KEY_SECRET\", + \"audience\": \"astronomer-ee\", + \"grant_type\": \"client_credentials\"}" | jq -r '.access_token' ) + +# Step 5. Create the Image +echo "get image id" +IMAGE=$( curl --location --request POST "https://api.astronomer.io/hub/v1" \ + --header "Authorization: Bearer $TOKEN" \ + --header "Content-Type: application/json" \ + --data-raw "{ + \"query\" : \"mutation imageCreate(\n \$input: ImageCreateInput!\n) {\n imageCreate (\n input: \$input\n) {\n id\n tag\n repository\n digest\n env\n labels\n deploymentId\n }\n}\", + \"variables\" : { + \"input\" : { + \"deploymentId\" : \"$ASTRO_DEPLOYMENT_ID\", + \"tag\" : \"ci-$BUILD_NUMBER\" + } + } + }" | jq -r '.data.imageCreate.id') +# Step 6.
Deploy the Image +echo "deploy image" +curl --location --request POST "https://api.astronomer.io/hub/v1" \ + --header "Authorization: Bearer $TOKEN" \ + --header "Content-Type: application/json" \ + --data-raw "{ + \"query\" : \"mutation imageDeploy(\n \$input: ImageDeployInput!\n ) {\n imageDeploy(\n input: \$input\n ) {\n id\n deploymentId\n digest\n env\n labels\n name\n tag\n repository\n }\n}\", + \"variables\" : { + \"input\" : { + \"id\" : \"$IMAGE\", + \"tag\" : \"ci-$BUILD_NUMBER\", + \"repository\" : \"images.astronomer.cloud/$ASTRO_ORGANIZATION_ID/$ASTRO_DEPLOYMENT_ID\" + } + } + }" + +clean diff --git a/python-sdk/tests_integration/astro_deploy/example_snowflake_cleanup.py b/python-sdk/tests_integration/astro_deploy/example_snowflake_cleanup.py new file mode 100644 index 0000000000..fcce66a8cd --- /dev/null +++ b/python-sdk/tests_integration/astro_deploy/example_snowflake_cleanup.py @@ -0,0 +1,51 @@ +import os +from datetime import date, datetime, timedelta +from typing import Any + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator + +query = f"""SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE +TABLE_SCHEMA = '{os.environ.get("SNOWFLAKE_SCHEMA", "ASTROFLOW_CI")}' and +TABLE_OWNER = '{os.environ.get("SNOWFLAKE_TABLE_OWNER","AIRFLOW_TEST_USER")}' and + TABLE_NAME LIKE '_TMP_%' and CREATED < '{date.today() - timedelta(days = 1)}'""" + + +def make_drop_statements(task_instance: Any): + temp_tables = task_instance.xcom_pull(key="return_value", task_ids=["snowflake_op_sql_str"])[0] + delete_temp_tables = "" + # convert the list of dictionaries into SQL DROP statements + for temp_table in temp_tables: + temp_table = "DROP TABLE IF EXISTS " + temp_table["TABLE_NAME"] + ";" + delete_temp_tables += temp_table + print(len(delete_temp_tables)) + if len(delete_temp_tables) == 0: + delete_temp_tables = "Select 1" + return delete_temp_tables + + +def handle_result(result: Any): + return result.fetchall() + + +with DAG( + dag_id="example_snowflake_cleanup", + start_date=datetime(2021, 1, 1), + default_args={"snowflake_conn_id": "snowflake_conn"}, + tags=["example"], + schedule="@once", + catchup=False, +) as dag: + snowflake_op_sql_str = SnowflakeOperator(task_id="snowflake_op_sql_str", sql=query, handler=handle_result) + + create_drop_statement = PythonOperator( + task_id="create_drop_statement", python_callable=make_drop_statements + ) + + snowflake_op_sql_multiple_stmts = SnowflakeOperator( + task_id="snowflake_op_sql_multiple_stmts", + sql="{{ task_instance.xcom_pull(task_ids='create_drop_statement', dag_id='example_snowflake_cleanup', key='return_value') }}", # noqa: E501 + ) + + snowflake_op_sql_str >> create_drop_statement >> snowflake_op_sql_multiple_stmts # skipcq PYL-W0104 diff --git a/python-sdk/tests_integration/astro_deploy/master_dag.py b/python-sdk/tests_integration/astro_deploy/master_dag.py new file mode 100644 index 0000000000..e0665eca75 --- /dev/null +++ b/python-sdk/tests_integration/astro_deploy/master_dag.py @@ -0,0 +1,232 @@ +import logging +import os +import time +from datetime import datetime +from typing import Any, List + +from airflow import DAG +from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator +from airflow.models import DagRun +from airflow.models.baseoperator import chain +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator +from
airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.utils.session import create_session + +SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#provider-alert") +SLACK_WEBHOOK_CONN = os.getenv("SLACK_WEBHOOK_CONN", "http_slack") +SLACK_USERNAME = os.getenv("SLACK_USERNAME", "airflow_app") + + +def get_report(dag_run_ids: List[str], **context: Any) -> None: + """Fetch DAG run details and generate a report""" + with create_session() as session: + last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all() + message_list: List[str] = [] + + airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version") + airflow_version_message = f"Airflow version for the astro-sdk run below is `{airflow_version}` \n\n" + message_list.append(airflow_version_message) + + for dr in last_dags_runs: + dr_status = f" *{dr.dag_id} : {dr.get_state()}* \n" + message_list.append(dr_status) + for ti in dr.get_task_instances(): + task_code = ":black_circle: " + if ti.task_id not in ["end", "get_report"]: + if ti.state == "success": + task_code = ":large_green_circle: " + elif ti.state == "failed": + task_code = ":red_circle: " + elif ti.state == "upstream_failed": + task_code = ":large_orange_circle: " + task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n" + message_list.append(task_message_str) + + logging.info("%s", "".join(message_list)) + # Send dag run report on Slack + try: + SlackWebhookOperator( + task_id="slack_alert", + http_conn_id=SLACK_WEBHOOK_CONN, + message="".join(message_list), + channel=SLACK_CHANNEL, + username=SLACK_USERNAME, + ).execute(context=None) + except Exception as exception: + logging.exception("Error occurred while sending Slack alert.") + raise exception + + +def prepare_dag_dependency(task_info, execution_time): + """Prepare a list of TriggerDagRunOperator tasks and DAG run ids for DAGs of the same provider""" + _dag_run_ids = [] + _task_list = [] + for _example_dag in task_info: + _task_id = list(_example_dag.keys())[0] + + _run_id = f"{_task_id}_{_example_dag.get(_task_id)}_" + execution_time + _dag_run_ids.append(_run_id) + _task_list.append( + TriggerDagRunOperator( + task_id=_task_id, + trigger_dag_id=_example_dag.get(_task_id), + trigger_run_id=_run_id, + wait_for_completion=True, + reset_dag_run=True, + execution_date=execution_time, + allowed_states=["success", "failed", "skipped"], + ) + ) + return _task_list, _dag_run_ids + + +with DAG( + dag_id="example_master_dag", + schedule_interval="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, + tags=["master_dag"], +) as dag: + start = PythonOperator( + task_id="start", + python_callable=lambda: time.sleep(30), + ) + + list_installed_pip_packages = BashOperator( + task_id="list_installed_pip_packages", bash_command="pip freeze" + ) + + get_airflow_version = BashOperator( + task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True + ) + + dag_run_ids = [] + + load_file_task_info = [ + {"example_google_bigquery_gcs_load_and_save": "example_google_bigquery_gcs_load_and_save"}, + {"example_amazon_s3_postgres_load_and_save": "example_amazon_s3_postgres_load_and_save"}, + {"example_amazon_s3_postgres": "example_amazon_s3_postgres"}, + {"example_load_file": "example_load_file"}, + ] + + load_file_trigger_tasks, ids = prepare_dag_dependency(load_file_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*load_file_trigger_tasks) + + transform_task_info = [ + {"example_transform": "example_transform"}, + {"example_transform_file":
"example_transform_file"}, + {"example_amazon_s3_snowflake_transform": "example_amazon_s3_snowflake_transform"}, + {"calculate_popular_movies": "calculate_popular_movies"}, + {"example_transform_mssql": "example_transform_mssql"}, + {"example_sqlite_load_transform": "example_sqlite_load_transform"}, + { + "example_duckdb_load_transform_dataframe_and_save": "example_duckdb_load_transform_dataframe_and_save" + }, + ] + + transform_trigger_tasks, ids = prepare_dag_dependency(transform_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*transform_trigger_tasks) + + dataframe_task_info = [ + {"example_dataframe": "example_dataframe"}, + {"calculate_top_2_movies_using_dataframe": "calculate_top_2_movies_using_dataframe"}, + ] + + dataframe_trigger_tasks, ids = prepare_dag_dependency(dataframe_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*dataframe_trigger_tasks) + + append_task_info = [ + {"example_append": "example_append"}, + {"example_snowflake_partial_table_with_append": "example_snowflake_partial_table_with_append"}, + ] + + append_trigger_tasks, ids = prepare_dag_dependency(append_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*append_trigger_tasks) + + merge_trigger_tasks = [{"example_merge_bigquery": "example_merge_bigquery"}] + + merge_trigger_tasks, ids = prepare_dag_dependency(merge_trigger_tasks, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*merge_trigger_tasks) + + dynamic_task_info = [ + {"example_dynamic_map_task": "example_dynamic_map_task"}, + {"example_dynamic_task_template": "example_dynamic_task_template"}, + ] + + dynamic_task_trigger_tasks, ids = prepare_dag_dependency(dynamic_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*dynamic_task_trigger_tasks) + + data_validation_dags_info = [ + {"data_validation_check_table": "data_validation_check_table"}, + {"data_validation_check_column": "data_validation_check_column"}, + ] + + data_validation_trigger_tasks, ids = prepare_dag_dependency(data_validation_dags_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*data_validation_trigger_tasks) + + dataset_dags_info = [ + {"example_dataset_producer": "example_dataset_producer"}, + ] + + dataset_trigger_tasks, ids = prepare_dag_dependency(dataset_dags_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*dataset_trigger_tasks) + + cleanup_snowflake_task_info = [{"example_snowflake_cleanup": "example_snowflake_cleanup"}] + + cleanup_snowflake_trigger_tasks, ids = prepare_dag_dependency(cleanup_snowflake_task_info, "{{ ds }}") + dag_run_ids.extend(ids) + chain(*cleanup_snowflake_trigger_tasks) + + report = PythonOperator( + task_id="get_report", + python_callable=get_report, + op_kwargs={"dag_run_ids": dag_run_ids}, + trigger_rule="all_done", + provide_context=True, + ) + + end = DummyOperator( + task_id="end", + trigger_rule="all_success", + ) + + start >> [ # skipcq PYL-W0104 + list_installed_pip_packages, + get_airflow_version, + load_file_trigger_tasks[0], + transform_trigger_tasks[0], + dataframe_trigger_tasks[0], + append_trigger_tasks[0], + merge_trigger_tasks[0], + dynamic_task_trigger_tasks[0], + data_validation_trigger_tasks[0], + dataset_trigger_tasks[0], + cleanup_snowflake_trigger_tasks[0], + ] + + last_task = [ + list_installed_pip_packages, + get_airflow_version, + load_file_trigger_tasks[-1], + transform_trigger_tasks[-1], + dataframe_trigger_tasks[-1], + append_trigger_tasks[-1], + merge_trigger_tasks[-1], + dynamic_task_trigger_tasks[-1], + data_validation_trigger_tasks[-1], + dataset_trigger_tasks[-1], + 
cleanup_snowflake_trigger_tasks[-1], + ] + + last_task >> end # skipcq PYL-W0104 + last_task >> report # skipcq PYL-W0104
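
Note on the deploy flow added in deploy.sh: the script exchanges the deployment API key for an OAuth token (client-credentials grant against https://auth.astronomer.io/oauth/token), registers the pushed image with the imageCreate GraphQL mutation on https://api.astronomer.io/hub/v1, and then rolls the deployment over to that image with imageDeploy. The sketch below is only an illustration of the same three calls in Python with the requests library, to make the GraphQL round trips easier to review; the hard-coded tag, the assumption that the ASTRO_* values are exported as environment variables, and the absence of error handling are all placeholders, and the shell script remains the actual implementation.

# Hypothetical sketch mirroring the curl calls in deploy.sh; not part of the PR.
import os

import requests

ASTRO_KEY_ID = os.environ["ASTRO_KEY_ID"]
ASTRO_KEY_SECRET = os.environ["ASTRO_KEY_SECRET"]
ASTRO_DEPLOYMENT_ID = os.environ["ASTRO_DEPLOYMENT_ID"]
ASTRO_ORGANIZATION_ID = os.environ["ASTRO_ORGANIZATION_ID"]
TAG = "ci-example"  # deploy.sh derives this from the current epoch seconds

# 1. Exchange the deployment API key for an access token (client_credentials grant).
token = requests.post(
    "https://auth.astronomer.io/oauth/token",
    json={
        "client_id": ASTRO_KEY_ID,
        "client_secret": ASTRO_KEY_SECRET,
        "audience": "astronomer-ee",
        "grant_type": "client_credentials",
    },
).json()["access_token"]

headers = {"Authorization": f"Bearer {token}"}

# 2. Register the pushed image with the deployment (imageCreate mutation).
image_create = """
mutation imageCreate($input: ImageCreateInput!) {
  imageCreate(input: $input) { id tag repository }
}
"""
image_id = requests.post(
    "https://api.astronomer.io/hub/v1",
    headers=headers,
    json={
        "query": image_create,
        "variables": {"input": {"deploymentId": ASTRO_DEPLOYMENT_ID, "tag": TAG}},
    },
).json()["data"]["imageCreate"]["id"]

# 3. Roll the deployment over to the new image (imageDeploy mutation).
image_deploy = """
mutation imageDeploy($input: ImageDeployInput!) {
  imageDeploy(input: $input) { id deploymentId tag repository }
}
"""
result = requests.post(
    "https://api.astronomer.io/hub/v1",
    headers=headers,
    json={
        "query": image_deploy,
        "variables": {
            "input": {
                "id": image_id,
                "tag": TAG,
                "repository": f"images.astronomer.cloud/{ASTRO_ORGANIZATION_ID}/{ASTRO_DEPLOYMENT_ID}",
            }
        },
    },
).json()
print(result["data"]["imageDeploy"]["deploymentId"])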