Add script to deploy Python SDK on Astro
Add a CI job that deploys the Python SDK and its example DAGs to an Astro deployment.
pankajastro committed Jan 18, 2023
1 parent 769901e commit 69bb343
Showing 5 changed files with 336 additions and 1 deletion.
17 changes: 16 additions & 1 deletion .github/workflows/ci-benchmark.yaml
@@ -5,6 +5,7 @@ on:
      - labeled
  schedule:
    - cron: '0 0 * * 1'
    - cron: '0 0 * * *'

# This allows a subsequently queued workflow run to interrupt and cancel previous runs
concurrency:
@@ -14,7 +15,7 @@ concurrency:
jobs:
  Run-Benchmark:
    if: >-
      github.event_name == 'schedule' ||
      (github.event_name == 'schedule' && github.event.schedule != '0 0 * * *') ||
      (
        github.event.label.name == 'run_benchmark' &&
        github.event.name != 'pull_request_target'
@@ -64,3 +65,17 @@ jobs:
          ( echo "cat <<EOF >python-sdk/test-connections.yaml"; cat .github/ci-test-connections.yaml; ) >python-sdk/test-connections.yaml && . python-sdk/test-connections.yaml
          python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
          cd python-sdk/tests/benchmark && make
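  # Nightly deploy job (runs on the '0 0 * * *' schedule): builds an image from the local python-sdk and example DAGs and deploys it to the Astro Cloud deployment via deploy.sh.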
  Astro-deploy:
    if: >-
      (github.event_name == 'schedule' && github.event.schedule != '0 0 * * 1')
    env:
      ASTRO_DOCKER_REGISTRY: ${{ secrets.ASTRO_DOCKER_REGISTRY }}
      ASTRO_ORGANIZATION_ID: ${{ secrets.ASTRO_ORGANIZATION_ID }}
      ASTRO_DEPLOYMENT_ID: ${{ secrets.ASTRO_DEPLOYMENT_ID }}
      ASTRO_KEY_ID: ${{ secrets.ASTRO_KEY_ID }}
      ASTRO_KEY_SECRET: ${{ secrets.ASTRO_KEY_SECRET }}
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - run: cd python-sdk/tests_integration/astro_deploy && sh deploy.sh $ASTRO_DOCKER_REGISTRY $ASTRO_ORGANIZATION_ID $ASTRO_DEPLOYMENT_ID $ASTRO_KEY_ID $ASTRO_KEY_SECRET
4 changes: 4 additions & 0 deletions .gitignore
@@ -48,6 +48,10 @@ python-sdk/docs/autoapi/
**/dev/logs
**/dev/plugins

# Astro deployment related
python-sdk/tests_integration/astro_deploy/example_dags
python-sdk/tests_integration/astro_deploy/python-sdk

# Generated by tests
sql-cli/tests/.airflow/dags/*
!sql-cli/tests/.airflow/dags/sql/.gitkeep
12 changes: 12 additions & 0 deletions python-sdk/tests_integration/astro_deploy/Dockerfile
@@ -0,0 +1,12 @@
FROM quay.io/astronomer/astro-runtime:7.0.0-base

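# Install the local python-sdk source with all optional extras, plus Airflow's Slack extra used for alert notifications.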
COPY python-sdk /tmp/python-sdk
RUN pip install /tmp/python-sdk[all]
RUN pip install apache-airflow[slack]

RUN mkdir -p ${AIRFLOW_HOME}/dags

COPY example_dags/ ${AIRFLOW_HOME}/dags/
COPY master_dag.py ${AIRFLOW_HOME}/dags/

RUN ls ${AIRFLOW_HOME}/dags/
103 changes: 103 additions & 0 deletions python-sdk/tests_integration/astro_deploy/deploy.sh
@@ -0,0 +1,103 @@
#!/usr/bin/env bash

# Make the script exit with the status if one of the commands fails. Without this, the Airflow task calling this script
# will be marked as 'success' and the DAG will proceed on to the subsequent tasks.
set -e

# This script deploys to an already existing Astro Cloud Airflow deployment.
# It currently does not support creating a new deployment.
#
# Execute the script with the positional parameters below:
# bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>
# - ASTRO_DOCKER_REGISTRY: Docker registry domain. The script pushes the Docker image here.
# - ASTRO_ORGANIZATION_ID: Astro Cloud organization ID (available in the UI).
# - ASTRO_DEPLOYMENT_ID: Astro Cloud deployment ID (available in the UI).
# - ASTRO_KEY_ID: Astro Cloud deployment service account API key ID.
# - ASTRO_KEY_SECRET: Astro Cloud deployment service account API key secret.
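#
# Example (hypothetical placeholder values):
#   bash deploy.sh images.astronomer.cloud my-org-id my-deployment-id my-key-id my-key-secret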

SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
PROJECT_PATH=${SCRIPT_PATH}/../..

function echo_help() {
    echo "Usage:"
    echo "ASTRO_DOCKER_REGISTRY: Docker registry"
    echo "ASTRO_ORGANIZATION_ID  Astro Cloud organization ID"
    echo "ASTRO_DEPLOYMENT_ID    Astro Cloud deployment ID"
    echo "ASTRO_KEY_ID           Astro Cloud service account API key ID"
    echo "ASTRO_KEY_SECRET       Astro Cloud service account API key secret"
    echo "bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>"
}

if [ "$1" == "-h" ]; then
    echo_help
    exit
fi

# Delete old source files if they exist
function clean() {
    if [ -d "${SCRIPT_PATH}"/python-sdk ]; then rm -Rf "${SCRIPT_PATH}"/python-sdk; fi
    if [ -d "${SCRIPT_PATH}"/example_dags ]; then rm -Rf "${SCRIPT_PATH}"/example_dags; fi
}

ASTRO_DOCKER_REGISTRY=$1
ASTRO_ORGANIZATION_ID=$2
ASTRO_DEPLOYMENT_ID=$3
ASTRO_KEY_ID=$4
ASTRO_KEY_SECRET=$5

clean


# Copy source files
mkdir "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/src "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/pyproject.toml "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/README.md "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/example_dags "${SCRIPT_PATH}"/example_dags

# Build image and deploy
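# Use the current Unix timestamp as a unique build number for the image tag (awk's srand() seeds from the clock and the second call returns that seed).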
BUILD_NUMBER=$(awk 'BEGIN {srand(); print srand()}')
IMAGE_NAME=${ASTRO_DOCKER_REGISTRY}/${ASTRO_ORGANIZATION_ID}/${ASTRO_DEPLOYMENT_ID}:ci-${BUILD_NUMBER}
docker build --platform=linux/amd64 -t "${IMAGE_NAME}" -f "${SCRIPT_PATH}"/Dockerfile "${SCRIPT_PATH}"
docker login "${ASTRO_DOCKER_REGISTRY}" -u "${ASTRO_KEY_ID}" -p "${ASTRO_KEY_SECRET}"
docker push "${IMAGE_NAME}"

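# Exchange the deployment API key ID and secret for an OAuth access token (client-credentials grant).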
TOKEN=$( curl --location --request POST "https://auth.astronomer.io/oauth/token" \
--header "content-type: application/json" \
--data-raw "{
\"client_id\": \"$ASTRO_KEY_ID\",
\"client_secret\": \"$ASTRO_KEY_SECRET\",
\"audience\": \"astronomer-ee\",
\"grant_type\": \"client_credentials\"}" | jq -r '.access_token' )

# Create the image record on the Astro deployment (imageCreate mutation)
echo "get image id"
IMAGE=$( curl --location --request POST "https://api.astronomer.io/hub/v1" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data-raw "{
\"query\" : \"mutation imageCreate(\n \$input: ImageCreateInput!\n) {\n imageCreate (\n input: \$input\n) {\n id\n tag\n repository\n digest\n env\n labels\n deploymentId\n }\n}\",
\"variables\" : {
\"input\" : {
\"deploymentId\" : \"$ASTRO_DEPLOYMENT_ID\",
\"tag\" : \"ci-$BUILD_NUMBER\"
}
}
}" | jq -r '.data.imageCreate.id')
# Deploy the image to the deployment (imageDeploy mutation)
echo "deploy image"
curl --location --request POST "https://api.astronomer.io/hub/v1" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data-raw "{
\"query\" : \"mutation imageDeploy(\n \$input: ImageDeployInput!\n ) {\n imageDeploy(\n input: \$input\n ) {\n id\n deploymentId\n digest\n env\n labels\n name\n tag\n repository\n }\n}\",
\"variables\" : {
\"input\" : {
\"id\" : \"$IMAGE\",
\"tag\" : \"ci-$BUILD_NUMBER\",
\"repository\" : \"images.astronomer.cloud/$ASTRO_ORGANIZATION_ID/$ASTRO_DEPLOYMENT_ID\"
}
}
}"

clean
201 changes: 201 additions & 0 deletions python-sdk/tests_integration/astro_deploy/master_dag.py
@@ -0,0 +1,201 @@
import logging
import os
import time
from datetime import datetime
from typing import Any, List

from airflow import DAG
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.models import DagRun
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.session import create_session

SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#provider-alert")
SLACK_WEBHOOK_CONN = os.getenv("SLACK_WEBHOOK_CONN", "http_slack")
SLACK_USERNAME = os.getenv("SLACK_USERNAME", "airflow_app")


def get_report(dag_run_ids: List[str], **context: Any) -> None:
    """Fetch DAG run details and generate a report."""
    with create_session() as session:
        last_dags_runs: List[DagRun] = session.query(DagRun).filter(DagRun.run_id.in_(dag_run_ids)).all()
        message_list: List[str] = []

        airflow_version = context["ti"].xcom_pull(task_ids="get_airflow_version")
        airflow_version_message = f"Airflow version for the below run is `{airflow_version}` \n\n"
        message_list.append(airflow_version_message)

        for dr in last_dags_runs:
            dr_status = f" *{dr.dag_id} : {dr.get_state()}* \n"
            message_list.append(dr_status)
            for ti in dr.get_task_instances():
                task_code = ":black_circle: "
                if not ((ti.task_id == "end") or (ti.task_id == "get_report")):
                    if ti.state == "success":
                        task_code = ":large_green_circle: "
                    elif ti.state == "failed":
                        task_code = ":red_circle: "
                    elif ti.state == "upstream_failed":
                        task_code = ":large_orange_circle: "
                    task_message_str = f"{task_code} {ti.task_id} : {ti.state} \n"
                    message_list.append(task_message_str)

        logging.info("%s", "".join(message_list))
        # Send the DAG run report on Slack
        try:
            SlackWebhookOperator(
                task_id="slack_alert",
                http_conn_id=SLACK_WEBHOOK_CONN,
                message="".join(message_list),
                channel=SLACK_CHANNEL,
                username=SLACK_USERNAME,
            ).execute(context=None)
        except Exception as exception:
            logging.exception("Error occurred while sending the Slack alert.")
            raise exception


def prepare_dag_dependency(task_info, execution_time):
    """Prepare the list of TriggerDagRunOperator tasks and DAG run IDs for DAGs of the same provider."""
    _dag_run_ids = []
    _task_list = []
    for _example_dag in task_info:
        _task_id = list(_example_dag.keys())[0]

        _run_id = f"{_task_id}_{_example_dag.get(_task_id)}_" + execution_time
        _dag_run_ids.append(_run_id)
        _task_list.append(
            TriggerDagRunOperator(
                task_id=_task_id,
                trigger_dag_id=_example_dag.get(_task_id),
                trigger_run_id=_run_id,
                wait_for_completion=True,
                reset_dag_run=True,
                execution_date=execution_time,
                allowed_states=["success", "failed", "skipped"],
            )
        )
    return _task_list, _dag_run_ids


with DAG(
    dag_id="example_master_dag",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["master_dag"],
) as dag:
    start = PythonOperator(
        task_id="start",
        python_callable=lambda: time.sleep(30),
    )

    list_installed_pip_packages = BashOperator(
        task_id="list_installed_pip_packages", bash_command="pip freeze"
    )

    get_airflow_version = BashOperator(
        task_id="get_airflow_version", bash_command="airflow version", do_xcom_push=True
    )

    dag_run_ids = []

    load_file_task_info = [
        {"example_load_file": "example_load_file"},
        {"example_google_bigquery_gcs_load_and_save": "example_google_bigquery_gcs_load_and_save"},
        {"example_amazon_s3_postgres_load_and_save": "example_amazon_s3_postgres_load_and_save"},
        {"example_amazon_s3_postgres": "example_amazon_s3_postgres"},
    ]

    load_file_trigger_tasks, ids = prepare_dag_dependency(load_file_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*load_file_trigger_tasks)

    transform_task_info = [
        {"example_transform": "example_transform"},
        {"example_transform_file": "example_transform_file"},
        {"example_sqlite_load_transform": "example_sqlite_load_transform"},
        {"example_amazon_s3_snowflake_transform": "example_amazon_s3_snowflake_transform"},
        {"calculate_popular_movies": "calculate_popular_movies"},
    ]

    transform_trigger_tasks, ids = prepare_dag_dependency(transform_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*transform_trigger_tasks)

    dataframe_task_info = [
        {"example_dataframe": "example_dataframe"},
        {"calculate_top_2_movies_using_dataframe": "calculate_top_2_movies_using_dataframe"},
    ]

    dataframe_trigger_tasks, ids = prepare_dag_dependency(dataframe_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*dataframe_trigger_tasks)

    append_task_info = [
        {"example_append": "example_append"},
        {"example_snowflake_partial_table_with_append": "example_snowflake_partial_table_with_append"},
    ]

    append_trigger_tasks, ids = prepare_dag_dependency(append_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*append_trigger_tasks)

    merge_task_info = [{"example_merge_bigquery": "example_merge_bigquery"}]

    merge_trigger_tasks, ids = prepare_dag_dependency(merge_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*merge_trigger_tasks)

    dynamic_task_info = [
        {"example_dynamic_map_task": "example_dynamic_map_task"},
        {"example_dynamic_task_template": "example_dynamic_task_template"},
    ]

    dynamic_task_trigger_tasks, ids = prepare_dag_dependency(dynamic_task_info, "{{ ds }}")
    dag_run_ids.extend(ids)
    chain(*dynamic_task_trigger_tasks)

    # example_dataset_producer [TODO]

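    # The report task uses trigger_rule="all_done" so it is generated even when some of the trigger tasks above fail.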
    report = PythonOperator(
        task_id="get_report",
        python_callable=get_report,
        op_kwargs={"dag_run_ids": dag_run_ids},
        trigger_rule="all_done",
        provide_context=True,
    )

    end = DummyOperator(
        task_id="end",
        trigger_rule="all_success",
    )

    start >> [
        list_installed_pip_packages,
        get_airflow_version,
        load_file_trigger_tasks[0],
        transform_trigger_tasks[0],
        dataframe_trigger_tasks[0],
        append_trigger_tasks[0],
        merge_trigger_tasks[0],
        dynamic_task_trigger_tasks[0],
    ]

    last_task = [
        list_installed_pip_packages,
        get_airflow_version,
        load_file_trigger_tasks[-1],
        transform_trigger_tasks[-1],
        dataframe_trigger_tasks[-1],
        append_trigger_tasks[-1],
        merge_trigger_tasks[-1],
        dynamic_task_trigger_tasks[-1],
    ]

    last_task >> end
    last_task >> report
