From 69bb343f376dc44e5e573d6f7ce003dcd74e32c2 Mon Sep 17 00:00:00 2001
From: Pankaj
Date: Wed, 28 Dec 2022 17:14:17 +0530
Subject: [PATCH] Add script to deploy Python SDK on Astro

Add Astro deploy CI job
---
 .github/workflows/ci-benchmark.yaml            |  17 +-
 .gitignore                                     |   4 +
 .../tests_integration/astro_deploy/Dockerfile  |  12 ++
 .../tests_integration/astro_deploy/deploy.sh   | 103 +++++++++
 .../astro_deploy/master_dag.py                 | 201 ++++++++++++++++++
 5 files changed, 336 insertions(+), 1 deletion(-)
 create mode 100644 python-sdk/tests_integration/astro_deploy/Dockerfile
 create mode 100644 python-sdk/tests_integration/astro_deploy/deploy.sh
 create mode 100644 python-sdk/tests_integration/astro_deploy/master_dag.py

diff --git a/.github/workflows/ci-benchmark.yaml b/.github/workflows/ci-benchmark.yaml
index 6861464334..8b00a8fffe 100644
--- a/.github/workflows/ci-benchmark.yaml
+++ b/.github/workflows/ci-benchmark.yaml
@@ -5,6 +5,7 @@ on:
       - labeled
   schedule:
     - cron: '0 0 * * 1'
+    - cron: '0 0 * * *'
 
 # This allows a subsequently queued workflow run to interrupt and cancel previous runs
 concurrency:
@@ -14,7 +15,7 @@ concurrency:
 jobs:
   Run-Benchmark:
     if: >-
-      github.event_name == 'schedule' ||
+      (github.event_name == 'schedule' && github.event.schedule != '0 0 * * *') ||
       (
         github.event.label.name == 'run_benchmark' &&
         github.event.name != 'pull_request_target'
@@ -64,3 +65,17 @@
           ( echo "cat <python-sdk/test-connections.yaml"; cat .github/ci-test-connections.yaml; ) >python-sdk/test-connections.yaml && . python-sdk/test-connections.yaml
           python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
           cd python-sdk/tests/benchmark && make
+
+  Astro-deploy:
+    if: >-
+      (github.event_name == 'schedule' && github.event.schedule != '0 0 * * 1')
+    env:
+      ASTRO_DOCKER_REGISTRY: ${{ secrets.ASTRO_DOCKER_REGISTRY }}
+      ASTRO_ORGANIZATION_ID: ${{ secrets.ASTRO_ORGANIZATION_ID }}
+      ASTRO_DEPLOYMENT_ID: ${{ secrets.ASTRO_DEPLOYMENT_ID }}
+      ASTRO_KEY_ID: ${{ secrets.ASTRO_KEY_ID }}
+      ASTRO_KEY_SECRET: ${{ secrets.ASTRO_KEY_SECRET }}
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - run: cd python-sdk/tests_integration/astro_deploy && sh deploy.sh $ASTRO_DOCKER_REGISTRY $ASTRO_ORGANIZATION_ID $ASTRO_DEPLOYMENT_ID $ASTRO_KEY_ID $ASTRO_KEY_SECRET
diff --git a/.gitignore b/.gitignore
index bc8da2bd59..35276e552e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,6 +48,10 @@ python-sdk/docs/autoapi/
 **/dev/logs
 **/dev/plugins
 
+# Astro deployment related
+python-sdk/tests_integration/astro_deploy/example_dags
+python-sdk/tests_integration/astro_deploy/python-sdk
+
 # Generated by tests
 sql-cli/tests/.airflow/dags/*
 !sql-cli/tests/.airflow/dags/sql/.gitkeep
diff --git a/python-sdk/tests_integration/astro_deploy/Dockerfile b/python-sdk/tests_integration/astro_deploy/Dockerfile
new file mode 100644
index 0000000000..2c8e85ec18
--- /dev/null
+++ b/python-sdk/tests_integration/astro_deploy/Dockerfile
@@ -0,0 +1,12 @@
+FROM quay.io/astronomer/astro-runtime:7.0.0-base
+
+COPY python-sdk /tmp/python-sdk
+RUN pip install /tmp/python-sdk[all]
+RUN pip install apache-airflow[slack]
+
+RUN mkdir -p ${AIRFLOW_HOME}/dags
+
+COPY example_dags/ ${AIRFLOW_HOME}/dags/
+COPY master_dag.py ${AIRFLOW_HOME}/dags/
+
+RUN ls ${AIRFLOW_HOME}/dags/
diff --git a/python-sdk/tests_integration/astro_deploy/deploy.sh b/python-sdk/tests_integration/astro_deploy/deploy.sh
new file mode 100644
index 0000000000..efcdf8ce7b
--- /dev/null
+++ b/python-sdk/tests_integration/astro_deploy/deploy.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+
+# Make the script exit with the status if one of the commands fails. Without this, the Airflow task calling this script
+# will be marked as 'success' and the DAG will proceed on to the subsequent tasks.
+set -e
+
+# This script deploys to an already existing Astro Cloud Airflow deployment.
+# It currently does not support creating a new deployment.
+#
+# Execute the script with the below positional params
+#     bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>
+#     - ASTRO_DOCKER_REGISTRY: Docker registry domain. The script will push the Docker image here.
+#     - ASTRO_ORGANIZATION_ID: Astro cloud deployment organization Id. Get it from the UI.
+#     - ASTRO_DEPLOYMENT_ID: Astro cloud deployment Id. Get it from the UI.
+#     - ASTRO_KEY_ID: Astro cloud deployment service account API key Id.
+#     - ASTRO_KEY_SECRET: Astro cloud deployment service account API key secret.
+
+SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+PROJECT_PATH=${SCRIPT_PATH}/../..
+
+function echo_help() {
+    echo "Usage:"
+    echo "ASTRO_DOCKER_REGISTRY: Docker registry"
+    echo "ASTRO_ORGANIZATION_ID: Astro cloud organization Id"
+    echo "ASTRO_DEPLOYMENT_ID: Astro cloud deployment Id"
+    echo "ASTRO_KEY_ID: Astro cloud service account API key Id"
+    echo "ASTRO_KEY_SECRET: Astro cloud service account API key secret"
+    echo "bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>"
+}
+
+if [ "$1" == "-h" ]; then
+    echo_help
+    exit
+fi
+
+# Delete old source files if they exist
+function clean() {
+    if [ -d "${SCRIPT_PATH}"/python-sdk ]; then rm -Rf "${SCRIPT_PATH}"/python-sdk; fi
+    if [ -d "${SCRIPT_PATH}"/example_dags ]; then rm -Rf "${SCRIPT_PATH}"/example_dags; fi
+}
+
+ASTRO_DOCKER_REGISTRY=$1
+ASTRO_ORGANIZATION_ID=$2
+ASTRO_DEPLOYMENT_ID=$3
+ASTRO_KEY_ID=$4
+ASTRO_KEY_SECRET=$5
+
+clean
+
+
+# Copy source files
+mkdir "${SCRIPT_PATH}"/python-sdk
+cp -r "${PROJECT_PATH}"/src "${SCRIPT_PATH}"/python-sdk
+cp -r "${PROJECT_PATH}"/pyproject.toml "${SCRIPT_PATH}"/python-sdk
+cp -r "${PROJECT_PATH}"/README.md "${SCRIPT_PATH}"/python-sdk
+cp -r "${PROJECT_PATH}"/example_dags "${SCRIPT_PATH}"/example_dags
+
+# Build image and deploy
+BUILD_NUMBER=$(awk 'BEGIN {srand(); print srand()}')
+IMAGE_NAME=${ASTRO_DOCKER_REGISTRY}/${ASTRO_ORGANIZATION_ID}/${ASTRO_DEPLOYMENT_ID}:ci-${BUILD_NUMBER}
+docker build --platform=linux/amd64 -t "${IMAGE_NAME}" -f "${SCRIPT_PATH}"/Dockerfile "${SCRIPT_PATH}"
+docker login "${ASTRO_DOCKER_REGISTRY}" -u "${ASTRO_KEY_ID}" -p "${ASTRO_KEY_SECRET}"
+docker push "${IMAGE_NAME}"
+
+TOKEN=$( curl --location --request POST "https://auth.astronomer.io/oauth/token" \
+    --header "content-type: application/json" \
+    --data-raw "{
+        \"client_id\": \"$ASTRO_KEY_ID\",
+        \"client_secret\": \"$ASTRO_KEY_SECRET\",
+        \"audience\": \"astronomer-ee\",
+        \"grant_type\": \"client_credentials\"}" | jq -r '.access_token' )
+
+# Create the image
+echo "get image id"
+IMAGE=$( curl --location --request POST "https://api.astronomer.io/hub/v1" \
+    --header "Authorization: Bearer $TOKEN" \
+    --header "Content-Type: application/json" \
+    --data-raw "{
+        \"query\" : \"mutation imageCreate(\n    \$input: ImageCreateInput!\n) {\n    imageCreate (\n    input: \$input\n) {\n    id\n    tag\n    repository\n    digest\n    env\n    labels\n    deploymentId\n  }\n}\",
+        \"variables\" : {
+            \"input\" : {
+                \"deploymentId\" : \"$ASTRO_DEPLOYMENT_ID\",
+                \"tag\" : \"ci-$BUILD_NUMBER\"
+                }
+            }
+        }" | jq -r '.data.imageCreate.id')
+# Deploy the image
+echo "deploy image"
+curl --location --request POST "https://api.astronomer.io/hub/v1" \
+    --header "Authorization: Bearer $TOKEN" \
+    --header "Content-Type: application/json" \
+    --data-raw "{
+        \"query\" : \"mutation imageDeploy(\n    \$input: ImageDeployInput!\n  ) {\n    imageDeploy(\n      input: \$input\n    ) {\n      id\n      deploymentId\n      digest\n      env\n      labels\n      name\n      tag\n      repository\n    }\n}\",
+        \"variables\" : {
+            \"input\" : {
+                \"id\" : \"$IMAGE\",
+                \"tag\" : \"ci-$BUILD_NUMBER\",
+                \"repository\" : \"images.astronomer.cloud/$ASTRO_ORGANIZATION_ID/$ASTRO_DEPLOYMENT_ID\"
+                }
+            }
+        }"
+
+clean
diff --git a/python-sdk/tests_integration/astro_deploy/master_dag.py b/python-sdk/tests_integration/astro_deploy/master_dag.py
new file mode 100644
index 0000000000..8edf732b65
--- /dev/null
+++ b/python-sdk/tests_integration/astro_deploy/master_dag.py
@@ -0,0 +1,201 @@
+import logging
+import os
+import time
+from datetime import datetime
+from typing import Any, List
+
+from airflow import DAG
+from airflow.models import DagRun
+from airflow.models.baseoperator import chain
+from airflow.operators.bash import BashOperator
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.python import PythonOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
+from airflow.utils.session import create_session
+
+SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#provider-alert")
+SLACK_WEBHOOK_CONN = os.getenv("SLACK_WEBHOOK_CONN", "http_slack")
+SLACK_USERNAME = os.getenv("SLACK_USERNAME", "airflow_app")
+
+
+def get_report(dag_run_ids: List[str], **context: Any) -> None:
+    """Fetch DAG run details and generate a report."""
+    with create_session() as session:
+        last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all()
+        message_list: List[str] = []
+
+        airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version")
+        airflow_version_message = f"Airflow version for the below run is `{airflow_version}` \n\n"
+        message_list.append(airflow_version_message)
+
+        for dr in last_dags_runs:
+            dr_status = f" *{dr.dag_id} : {dr.get_state()}* \n"
+            message_list.append(dr_status)
+            for ti in dr.get_task_instances():
+                task_code = ":black_circle: "
+                if not ((ti.task_id == "end") or (ti.task_id == "get_report")):
+                    if ti.state == "success":
+                        task_code = ":large_green_circle: "
+                    elif ti.state == "failed":
+                        task_code = ":red_circle: "
+                    elif ti.state == "upstream_failed":
+                        task_code = ":large_orange_circle: "
+                    task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n"
+                    message_list.append(task_message_str)
+
+        logging.info("%s", "".join(message_list))
+        # Send the DAG run report on Slack
+        try:
+            SlackWebhookOperator(
+                task_id="slack_alert",
+                http_conn_id=SLACK_WEBHOOK_CONN,
+                message="".join(message_list),
+                channel=SLACK_CHANNEL,
+                username=SLACK_USERNAME,
+            ).execute(context=None)
+        except Exception as exception:
+            logging.exception("Error occurred while sending the Slack alert.")
+            raise exception
+
+
+def prepare_dag_dependency(task_info, execution_time):
+    """Prepare a list of TriggerDagRunOperator tasks and DAG run ids for the given example DAGs."""
+    _dag_run_ids = []
+    _task_list = []
+    for _example_dag in task_info:
+        _task_id = list(_example_dag.keys())[0]
+
+        _run_id = f"{_task_id}_{_example_dag.get(_task_id)}_" + execution_time
+        _dag_run_ids.append(_run_id)
+        _task_list.append(
+            TriggerDagRunOperator(
+                task_id=_task_id,
+                trigger_dag_id=_example_dag.get(_task_id),
+                trigger_run_id=_run_id,
+                wait_for_completion=True,
+                reset_dag_run=True,
+                execution_date=execution_time,
+                allowed_states=["success", "failed", "skipped"],
+            )
+        )
+    return _task_list, _dag_run_ids
+
+
+with DAG(
+    dag_id="example_master_dag",
+    schedule_interval="@daily",
+    start_date=datetime(2023, 1, 1),
+    catchup=False,
+    tags=["master_dag"],
+) as dag:
+    start = PythonOperator(
+        task_id="start",
+        python_callable=lambda: time.sleep(30),
+    )
+
+    list_installed_pip_packages = BashOperator(
+        task_id="list_installed_pip_packages", bash_command="pip freeze"
+    )
+
+    get_airflow_version = BashOperator(
+        task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True
+    )
+
+    dag_run_ids = []
+
+    load_file_task_info = [
+        {"example_load_file": "example_load_file"},
+        {"example_google_bigquery_gcs_load_and_save": "example_google_bigquery_gcs_load_and_save"},
+        {"example_amazon_s3_postgres_load_and_save": "example_amazon_s3_postgres_load_and_save"},
+        {"example_amazon_s3_postgres": "example_amazon_s3_postgres"},
+    ]
+
+    load_file_trigger_tasks, ids = prepare_dag_dependency(load_file_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*load_file_trigger_tasks)
+
+    transform_task_info = [
+        {"example_transform": "example_transform"},
+        {"example_transform_file": "example_transform_file"},
+        {"example_sqlite_load_transform": "example_sqlite_load_transform"},
+        {"example_amazon_s3_snowflake_transform": "example_amazon_s3_snowflake_transform"},
+        {"calculate_popular_movies": "calculate_popular_movies"},
+    ]
+
+    transform_trigger_tasks, ids = prepare_dag_dependency(transform_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*transform_trigger_tasks)
+
+    dataframe_task_info = [
+        {"example_dataframe": "example_dataframe"},
+        {"calculate_top_2_movies_using_dataframe": "calculate_top_2_movies_using_dataframe"},
+    ]
+
+    dataframe_trigger_tasks, ids = prepare_dag_dependency(dataframe_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*dataframe_trigger_tasks)
+
+    append_task_info = [
+        {"example_append": "example_append"},
+        {"example_snowflake_partial_table_with_append": "example_snowflake_partial_table_with_append"},
+    ]
+
+    append_trigger_tasks, ids = prepare_dag_dependency(append_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*append_trigger_tasks)
+
+    merge_task_info = [{"example_merge_bigquery": "example_merge_bigquery"}]
+
+    merge_trigger_tasks, ids = prepare_dag_dependency(merge_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*merge_trigger_tasks)
+
+    dynamic_task_info = [
+        {"example_dynamic_map_task": "example_dynamic_map_task"},
+        {"example_dynamic_task_template": "example_dynamic_task_template"},
+    ]
+
+    dynamic_task_trigger_tasks, ids = prepare_dag_dependency(dynamic_task_info, "{{ ds }}")
+    dag_run_ids.extend(ids)
+    chain(*dynamic_task_trigger_tasks)
+
+    # example_dataset_producer [TODO]
+
+    report = PythonOperator(
+        task_id="get_report",
+        python_callable=get_report,
+        op_kwargs={"dag_run_ids": dag_run_ids},
+        trigger_rule="all_done",
+        provide_context=True,
+    )
+
+    end = DummyOperator(
+        task_id="end",
+        trigger_rule="all_success",
+    )
+
+    start >> [
+        list_installed_pip_packages,
+        get_airflow_version,
+        load_file_trigger_tasks[0],
+        transform_trigger_tasks[0],
+        dataframe_trigger_tasks[0],
+        append_trigger_tasks[0],
+        merge_trigger_tasks[0],
+        dynamic_task_trigger_tasks[0],
+    ]
+
+    last_task = [
+        list_installed_pip_packages,
+        get_airflow_version,
+        load_file_trigger_tasks[-1],
+        transform_trigger_tasks[-1],
+        dataframe_trigger_tasks[-1],
+        append_trigger_tasks[-1],
+        merge_trigger_tasks[-1],
+        dynamic_task_trigger_tasks[-1],
+    ]
+
+    last_task >> end
+    last_task >> report