Deploy python-sdk on astro #1499

Merged: 19 commits, Feb 8, 2023
17 changes: 16 additions & 1 deletion .github/workflows/ci-benchmark.yaml
@@ -5,6 +5,7 @@ on:
- labeled
schedule:
- cron: '0 0 * * 1'
- cron: '0 0 * * *'

# This allows a subsequently queued workflow run to interrupt and cancel previous runs
concurrency:
@@ -14,7 +15,7 @@ concurrency:
jobs:
Run-Benchmark:
if: >-
github.event_name == 'schedule' ||
(github.event_name == 'schedule' && github.event.schedule != '0 0 * * *') ||
(
github.event.label.name == 'run_benchmark' &&
github.event.name != 'pull_request_target'
@@ -64,3 +65,17 @@ jobs:
( echo "cat <<EOF >python-sdk/test-connections.yaml"; cat .github/ci-test-connections.yaml; ) >python-sdk/test-connections.yaml && . python-sdk/test-connections.yaml
python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
cd python-sdk/tests/benchmark && make

Astro-deploy:
if: >-
(github.event_name == 'schedule' && github.event.schedule != '0 0 * * 1')
env:
ASTRO_DOCKER_REGISTRY: ${{ secrets.ASTRO_DOCKER_REGISTRY }}
ASTRO_ORGANIZATION_ID: ${{ secrets.ASTRO_ORGANIZATION_ID }}
ASTRO_DEPLOYMENT_ID: ${{ secrets.ASTRO_DEPLOYMENT_ID }}
ASTRO_KEY_ID: ${{ secrets.ASTRO_KEY_ID }}
ASTRO_KEY_SECRET: ${{ secrets.ASTRO_KEY_SECRET }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: cd python-sdk/tests_integration/astro_deploy && sh deploy.sh $ASTRO_DOCKER_REGISTRY $ASTRO_ORGANIZATION_ID $ASTRO_DEPLOYMENT_ID $ASTRO_KEY_ID $ASTRO_KEY_SECRET
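
Note on the scheduling change: with two crons on one workflow, github.event.schedule is what routes each trigger. The weekly '0 0 * * 1' cron still runs Run-Benchmark, while the new daily '0 0 * * *' cron runs only Astro-deploy. A small Python sketch of that routing decision (illustrative only, covering just the schedule-triggered path and not the run_benchmark label path):

def jobs_to_run(event_name: str, schedule: str = "") -> list:
    # Mirrors the two "if:" conditions above for schedule-triggered events.
    jobs = []
    if event_name == "schedule" and schedule != "0 0 * * *":
        jobs.append("Run-Benchmark")
    if event_name == "schedule" and schedule != "0 0 * * 1":
        jobs.append("Astro-deploy")
    return jobs

print(jobs_to_run("schedule", "0 0 * * 1"))  # ['Run-Benchmark']
print(jobs_to_run("schedule", "0 0 * * *"))  # ['Astro-deploy']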
5 changes: 5 additions & 0 deletions .gitignore
@@ -48,6 +48,11 @@ python-sdk/docs/autoapi/
**/dev/logs
**/dev/plugins

# Astro deployment related
python-sdk/tests_integration/astro_deploy/example_dags
python-sdk/tests_integration/astro_deploy/python-sdk
python-sdk/tests_integration/astro_deploy/tests

# Generated by tests
sql-cli/tests/.airflow/dags/*
!sql-cli/tests/.airflow/dags/sql/.gitkeep
5 changes: 3 additions & 2 deletions python-sdk/example_dags/example_amazon_s3_postgres.py
@@ -1,5 +1,5 @@
import os
from datetime import datetime, timedelta
from datetime import datetime

from airflow.models import DAG
from pandas import DataFrame
@@ -20,8 +20,9 @@
dag_id="example_amazon_s3_postgres",
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30),
schedule_interval=None,
default_args=default_args,
catchup=False,
)


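
Note: the example DAGs touched in this PR share the same two-line change: the timedelta-based schedule_interval becomes None and catchup=False is added, so the examples run only when triggered explicitly and never backfill from start_date. A condensed sketch of the resulting pattern, with a placeholder dag_id:

from datetime import datetime

from airflow.models import DAG

# Pattern applied across the example DAGs: no schedule and no catchup, so the
# DAG runs only when triggered explicitly and never backfills old intervals.
dag = DAG(
    dag_id="example_manual_trigger",  # placeholder name for illustration
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=None,  # previously timedelta(minutes=30) or "@daily"
    catchup=False,
)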
@@ -21,6 +21,7 @@
schedule_interval=None,
start_date=timezone.utcnow(),
tags=["demo"],
catchup=False,
)
def example_amazon_s3_postgres_load_and_save():
t1 = aql.load_file(
@@ -40,7 +40,7 @@ def aggregate_data(df: pd.DataFrame):
@dag(
start_date=datetime(2021, 1, 1),
max_active_runs=1,
schedule_interval="@daily",
schedule_interval=None,
default_args={
"email_on_failure": False,
"retries": 0,
5 changes: 3 additions & 2 deletions python-sdk/example_dags/example_append.py
@@ -1,5 +1,5 @@
import pathlib
from datetime import datetime, timedelta
from datetime import datetime

from airflow.models import DAG

@@ -19,8 +19,9 @@
dag_id="example_append",
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30),
schedule_interval=None,
default_args=default_args,
catchup=False,
)

DATA_DIR = str(CWD) + "/data/"
9 changes: 8 additions & 1 deletion python-sdk/example_dags/example_dataframe_api.py
@@ -3,6 +3,7 @@
from io import StringIO
from typing import List

import airflow
import pandas as pd
import requests
from airflow import DAG
@@ -17,7 +18,13 @@
@task or @aql.transform tasks.
"""

ALLOWED_DESERIALIZATION_CLASSES = os.getenv("AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES")
# Airflow 2.5 needs the allowed deserialization classes set explicitly, whereas Airflow 2.2.5 does not.
if airflow.__version__ == "2.2.5":
ALLOWED_DESERIALIZATION_CLASSES = os.getenv("AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES")
else:
ALLOWED_DESERIALIZATION_CLASSES = os.getenv(
"AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES", default="airflow\\.* astro\\.*"
)


def _load_covid_data():
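
Context note (not part of the diff): the AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES value appears to be a whitespace-separated list of regular-expression patterns matched against fully qualified class names, which is why the fallback value escapes the dots. A rough, standalone illustration of that gating idea; this is not Airflow's actual implementation:

import re

# Illustration only: split the configured value into patterns and allow a
# class if its dotted path matches any of them.
ALLOWED = "airflow\\.* astro\\.*"
PATTERNS = [re.compile(pattern) for pattern in ALLOWED.split()]


def is_allowed(qualified_name: str) -> bool:
    return any(pattern.match(qualified_name) for pattern in PATTERNS)


print(is_allowed("astro.table.Table"))  # True
print(is_allowed("pandas.DataFrame"))   # False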
@@ -26,6 +26,7 @@
dag_id="example_google_bigquery_gcs_load_and_save",
schedule_interval=None,
start_date=timezone.datetime(2022, 1, 1),
catchup=False,
) as dag:
# [START load_file_http_example]
t1 = aql.load_file(
5 changes: 3 additions & 2 deletions python-sdk/example_dags/example_load_file.py
@@ -18,7 +18,7 @@
"""
import os
import pathlib
from datetime import datetime, timedelta
from datetime import datetime

import sqlalchemy
from airflow.models import DAG
@@ -51,8 +51,9 @@
dag_id="example_load_file",
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30),
schedule_interval=None,
default_args=default_args,
catchup=False,
)


5 changes: 3 additions & 2 deletions python-sdk/example_dags/example_merge.py
@@ -1,5 +1,5 @@
import pathlib
from datetime import datetime, timedelta
from datetime import datetime

from airflow.models import DAG
from pandas import DataFrame
@@ -22,8 +22,9 @@
dag_id="example_merge_bigquery",
start_date=datetime(2019, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=30),
schedule_interval=None,
default_args=default_args,
catchup=False,
)


@@ -69,9 +69,9 @@ def create_table(table: Table):
"""


@dag(start_date=datetime(2021, 12, 1), schedule_interval="@daily", catchup=False)
@dag(start_date=datetime(2021, 12, 1), schedule_interval=None, catchup=False)
def example_snowflake_partial_table_with_append():
homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
homes_reporting = Table(name="homes_reporting", temp=True, conn_id=SNOWFLAKE_CONN_ID)
create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)
# [END howto_run_raw_sql_snowflake_1]

15 changes: 15 additions & 0 deletions python-sdk/tests_integration/astro_deploy/Dockerfile
@@ -0,0 +1,15 @@
FROM quay.io/astronomer/astro-runtime:7.2.0-base

COPY python-sdk /tmp/python-sdk
RUN pip install /tmp/python-sdk[all]
RUN pip install apache-airflow[slack]

RUN mkdir -p ${AIRFLOW_HOME}/dags
RUN mkdir -p ${AIRFLOW_HOME}/tests

COPY example_dags/ ${AIRFLOW_HOME}/dags/
COPY master_dag.py ${AIRFLOW_HOME}/dags/
COPY example_snowflake_cleanup.py ${AIRFLOW_HOME}/dags/
COPY tests/ ${AIRFLOW_HOME}/tests/

RUN ls ${AIRFLOW_HOME}/dags/
107 changes: 107 additions & 0 deletions python-sdk/tests_integration/astro_deploy/deploy.sh
@@ -0,0 +1,107 @@
#!/usr/bin/env bash

# Make the script exit with the status if one of the commands fails. Without this, the Airflow task calling this script
# will be marked as 'success' and the DAG will proceed on to the subsequent tasks.
set -e

# This script deploys to an already existing Astro Cloud Airflow deployment.
# It currently does not support creating a new deployment.
#
# Execute the script with the positional params below:
# bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>
# - ASTRO_DOCKER_REGISTRY: Docker registry domain. Script will push the docker image here.
# - ASTRO_ORGANIZATION_ID: Astro cloud deployment organization Id. Get it from UI.
# - ASTRO_DEPLOYMENT_ID: Astro cloud deployment Id. Get it from UI.
# - ASTRO_KEY_ID: Astro cloud deployment service account API key Id.
# - ASTRO_KEY_SECRET: Astro cloud deployment service account API key secret.

SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
PROJECT_PATH=${SCRIPT_PATH}/../..

function echo_help() {
echo "Usage:"
echo "ASTRO_DOCKER_REGISTRY: Docker registry"
echo "ASTRO_ORGANIZATION_ID Astro cloud organization Id"
echo "ASTRO_DEPLOYMENT_ID Astro cloud Deployment id"
echo "ASTRO_KEY_ID Astro cloud service account API key id"
echo "ASTRO_KEY_SECRET Astro cloud service account API key secret"
echo "bash deploy.sh <ASTRO_DOCKER_REGISTRY> <ASTRO_ORGANIZATION_ID> <ASTRO_DEPLOYMENT_ID> <ASTRO_KEY_ID> <ASTRO_KEY_SECRET>"
}

if [ "$1" == "-h" ]; then
echo_help
exit
fi

# Delete old source files if they exist
function clean() {
if [ -d "${SCRIPT_PATH}"/python-sdk ]; then rm -Rf "${SCRIPT_PATH}"/python-sdk; fi
if [ -d "${SCRIPT_PATH}"/example_dags ]; then rm -Rf "${SCRIPT_PATH}"/example_dags; fi
if [ -d "${SCRIPT_PATH}"/tests ]; then rm -Rf "${SCRIPT_PATH}"/tests; fi
}

ASTRO_DOCKER_REGISTRY=$1
ASTRO_ORGANIZATION_ID=$2
ASTRO_DEPLOYMENT_ID=$3
ASTRO_KEY_ID=$4
ASTRO_KEY_SECRET=$5

clean


# Copy source files
mkdir "${SCRIPT_PATH}"/python-sdk
mkdir "${SCRIPT_PATH}"/tests
cp -r "${PROJECT_PATH}"/src "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/pyproject.toml "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/README.md "${SCRIPT_PATH}"/python-sdk
cp -r "${PROJECT_PATH}"/example_dags "${SCRIPT_PATH}"/example_dags
cp -r "${PROJECT_PATH}"/tests/data "${SCRIPT_PATH}"/tests/data


# Build image and deploy
BUILD_NUMBER=$(awk 'BEGIN {srand(); print srand()}')
IMAGE_NAME=${ASTRO_DOCKER_REGISTRY}/${ASTRO_ORGANIZATION_ID}/${ASTRO_DEPLOYMENT_ID}:ci-${BUILD_NUMBER}
docker build --platform=linux/amd64 -t "${IMAGE_NAME}" -f "${SCRIPT_PATH}"/Dockerfile "${SCRIPT_PATH}"
docker login "${ASTRO_DOCKER_REGISTRY}" -u "${ASTRO_KEY_ID}" -p "${ASTRO_KEY_SECRET}"
docker push "${IMAGE_NAME}"

TOKEN=$( curl --location --request POST "https://auth.astronomer.io/oauth/token" \
--header "content-type: application/json" \
--data-raw "{
\"client_id\": \"$ASTRO_KEY_ID\",
\"client_secret\": \"$ASTRO_KEY_SECRET\",
\"audience\": \"astronomer-ee\",
\"grant_type\": \"client_credentials\"}" | jq -r '.access_token' )

# Step 5. Create the Image
echo "get image id"
IMAGE=$( curl --location --request POST "https://api.astronomer.io/hub/v1" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data-raw "{
\"query\" : \"mutation imageCreate(\n \$input: ImageCreateInput!\n) {\n imageCreate (\n input: \$input\n) {\n id\n tag\n repository\n digest\n env\n labels\n deploymentId\n }\n}\",
\"variables\" : {
\"input\" : {
\"deploymentId\" : \"$ASTRO_DEPLOYMENT_ID\",
\"tag\" : \"ci-$BUILD_NUMBER\"
}
}
}" | jq -r '.data.imageCreate.id')
# Step 6. Deploy the Image
echo "deploy image"
curl --location --request POST "https://api.astronomer.io/hub/v1" \
--header "Authorization: Bearer $TOKEN" \
--header "Content-Type: application/json" \
--data-raw "{
\"query\" : \"mutation imageDeploy(\n \$input: ImageDeployInput!\n ) {\n imageDeploy(\n input: \$input\n ) {\n id\n deploymentId\n digest\n env\n labels\n name\n tag\n repository\n }\n}\",
\"variables\" : {
\"input\" : {
\"id\" : \"$IMAGE\",
\"tag\" : \"ci-$BUILD_NUMBER\",
\"repository\" : \"images.astronomer.cloud/$ASTRO_ORGANIZATION_ID/$ASTRO_DEPLOYMENT_ID\"
}
}
}"

clean
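
For readers more comfortable in Python, below is a minimal sketch (assuming the requests library) of the same three API calls the script makes: fetch an OAuth token, register the image with the imageCreate mutation, then roll it out with imageDeploy. The GraphQL selection sets are trimmed to the id field and the function names are illustrative only:

import requests

ASTRO_API = "https://api.astronomer.io/hub/v1"


def get_token(key_id: str, key_secret: str) -> str:
    # Same request as the curl call to auth.astronomer.io above.
    response = requests.post(
        "https://auth.astronomer.io/oauth/token",
        json={
            "client_id": key_id,
            "client_secret": key_secret,
            "audience": "astronomer-ee",
            "grant_type": "client_credentials",
        },
    )
    response.raise_for_status()
    return response.json()["access_token"]


def graphql(token: str, query: str, variables: dict) -> dict:
    response = requests.post(
        ASTRO_API,
        headers={"Authorization": f"Bearer {token}"},
        json={"query": query, "variables": variables},
    )
    response.raise_for_status()
    return response.json()["data"]


def deploy_image(key_id, key_secret, organization_id, deployment_id, tag):
    token = get_token(key_id, key_secret)
    # Step 5 in the script: create the image record for the deployment.
    image_id = graphql(
        token,
        "mutation imageCreate($input: ImageCreateInput!) { imageCreate(input: $input) { id } }",
        {"input": {"deploymentId": deployment_id, "tag": tag}},
    )["imageCreate"]["id"]
    # Step 6 in the script: deploy the image that was pushed to the registry.
    graphql(
        token,
        "mutation imageDeploy($input: ImageDeployInput!) { imageDeploy(input: $input) { id } }",
        {
            "input": {
                "id": image_id,
                "tag": tag,
                "repository": f"images.astronomer.cloud/{organization_id}/{deployment_id}",
            }
        },
    )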
@@ -0,0 +1,51 @@
import os
from datetime import date, datetime, timedelta
from typing import Any

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

query = f"""SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE
TABLE_SCHEMA = '{os.environ.get("SNOWFLAKE_SCHEMA", "ASTROFLOW_CI")}' and
TABLE_OWNER = '{os.environ.get("SNOWFLAKE_TABLE_OWNER","AIRFLOW_TEST_USER")}' and
TABLE_NAME LIKE '_TMP_%' and CREATED < '{date.today() - timedelta(days = 1)}'"""


def make_drop_statements(task_instance: Any):
temp_tables = task_instance.xcom_pull(key="return_value", task_ids=["snowflake_op_sql_str"])[0]
delete_temp_tables = ""
# convert the list of dictionaries into SQL DROP statements
for temp_table in temp_tables:
temp_table = "DROP TABLE IF EXISTS " + temp_table["TABLE_NAME"] + ";"
delete_temp_tables += temp_table
print(len(delete_temp_tables))
if len(delete_temp_tables) == 0:
delete_temp_tables = "Select 1"
return delete_temp_tables


def handle_result(result: Any):
return result.fetchall()


with DAG(
dag_id="example_snowflake_cleanup",
start_date=datetime(2021, 1, 1),
default_args={"snowflake_conn_id": "snowflake_conn"},
tags=["example"],
schedule="@once",
catchup=False,
) as dag:
snowflake_op_sql_str = SnowflakeOperator(task_id="snowflake_op_sql_str", sql=query, handler=handle_result)

create_drop_statement = PythonOperator(
task_id="create_drop_statement", python_callable=make_drop_statements
)

snowflake_op_sql_multiple_stmts = SnowflakeOperator(
task_id="snowflake_op_sql_multiple_stmts",
sql="{{ task_instance.xcom_pull(task_ids='create_drop_statement', dag_id='example_snowflake_cleanup', key='return_value') }}", # noqa: E501
)

snowflake_op_sql_str >> create_drop_statement >> snowflake_op_sql_multiple_stmts # skipcq PYL-W0104
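
For clarity, a small standalone example of what make_drop_statements produces, using a hypothetical XCom payload in place of the real SnowflakeOperator result. FakeTaskInstance and the table names exist only for this illustration:

from typing import Any


class FakeTaskInstance:
    # Stand-in for Airflow's TaskInstance; only xcom_pull is needed here.
    def __init__(self, rows):
        self._rows = rows

    def xcom_pull(self, key: str, task_ids: Any):
        # Mirrors the real call: one result list per requested task id.
        return [self._rows]


def make_drop_statements(task_instance: Any):
    # Same logic as in the DAG above.
    temp_tables = task_instance.xcom_pull(key="return_value", task_ids=["snowflake_op_sql_str"])[0]
    delete_temp_tables = ""
    for temp_table in temp_tables:
        delete_temp_tables += "DROP TABLE IF EXISTS " + temp_table["TABLE_NAME"] + ";"
    if len(delete_temp_tables) == 0:
        delete_temp_tables = "Select 1"
    return delete_temp_tables


ti = FakeTaskInstance([{"TABLE_NAME": "_TMP_A1B2"}, {"TABLE_NAME": "_TMP_C3D4"}])
print(make_drop_statements(ti))
# DROP TABLE IF EXISTS _TMP_A1B2;DROP TABLE IF EXISTS _TMP_C3D4;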