In [None]:
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Trigger Airflow DAG in Cloud Composer from a Vertex Pipeline

Apache Airflow is the most popular choice for data pipelining in general. However, it is arguably not a good choice for running machine learning pipelines because it lacks ML metadata tracking, artifact lineage, tracking of ML metrics across runs, etc. [Vertex Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) solves this problem: it automates, monitors, and governs your ML systems by orchestrating your ML workflow in a serverless manner and storing your workflow's artifacts using Vertex ML Metadata.

In this notebook, we will show you how to trigger a data pipeline, i.e. an Airflow DAG on Cloud Composer, from an ML pipeline running on Vertex Pipelines.

![Trigger Airflow DAG on Cloud Composer from Vertex Pipeline](images/trigger-airflow-dag-on-cloud-composer-from-vertex-pipeline.png)

Following are high level steps:

1. Create Cloud Composer environment
2. Upload Airflow DAG to Composer environment that performs data processing
3. Create a Vertex Pipeline that triggers the Airflow DAG

### Installing packages

Start with installing KFP SDK and Google Cloud Pipeline components in the environment

In [None]:
# Extra flag interpolated into the pip commands in the next cell.
USER_FLAG = "--user"

In [None]:
# Install pinned versions of the Vertex AI SDK and the Google Cloud Pipeline
# Components, plus the KFP SDK.
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.1 --upgrade

After installing these packages you'll need to restart the kernel:

In [None]:
import os

# Restart the kernel after the installs above, unless the IS_TESTING
# environment variable is set to a non-empty value.
if not os.environ.get("IS_TESTING"):
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.6:

In [None]:
# Print the installed versions of the KFP SDK and the pipeline components package.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

### Set your project ID and bucket

Throughout this notebook you'll reference your Cloud project ID and the bucket you created earlier. Next we'll create variables for each of those.

If you don't know your project ID you may be able to get it by running the following:

In [None]:
import google.auth

# google.auth.default() yields a (credentials, project ID) pair; the project ID
# may be empty, which the next cell handles.
creds, PROJECT_ID = google.auth.default()
# Region used below for the bucket, the Composer environment and the
# Vertex Pipelines client.
REGION = 'us-central1'

Otherwise, set it here:

In [None]:
# Fall back to a manually entered project ID when none was detected.
if not PROJECT_ID:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

In [None]:
# Echo the resolved settings so they can be checked before any resources are created.
print(f"PROJECT_ID = {PROJECT_ID}")
print(f"REGION = {REGION}")

Then create a variable to store your bucket name and create the bucket if it does not exist already.

In [None]:
# Cloud Storage bucket (as a gs:// URI) that holds the pipeline root.
# NOTE(review): this is a fixed sample name — replace it with a bucket name of your own.
BUCKET_NAME = "gs://" + "cloud-ai-platform-2f444b6a-a742-444b-b91a-c7519f51bd77"

In [None]:
# Run only if the bucket does not exist already.
!gsutil mb -l $REGION $BUCKET_NAME

## Create Composer Environment

Please follow the instructions in the [document](https://cloud.google.com/composer/docs/how-to/managing/creating#) to create a Composer Environment with the configuration you need. For this sample demonstration, we create a bare minimum Composer environment. 

To trigger an Airflow DAG from a Vertex Pipeline, we will use the Airflow web server REST API. By default, the API authentication feature is disabled in Airflow 1.10.11 and above, which denies all requests made to the Airflow web server. To trigger the DAG, we enable this feature by overriding the `auth_backend` configuration in the Composer environment to `airflow.api.auth.backend.default`.

**NOTE:** Cloud Composer environment creation may take up to 30 min. Grab your favorite beverage until then.

In [None]:
# Name of the Composer environment to create.
COMPOSER_ENV_NAME = "test-composer-env"
# Zone passed to the create command below.
# NOTE(review): presumably this zone must lie inside REGION — confirm when changing either value.
ZONE = "us-central1-f"

In [None]:
# Create the Composer environment; --airflow-configs overrides the Airflow
# `auth_backend` setting in the `api` section.
!gcloud beta composer environments create $COMPOSER_ENV_NAME \
    --location $REGION \
    --zone $ZONE\
    --machine-type n1-standard-2 \
    --image-version composer-latest-airflow-1.10.15 \
    --airflow-configs=api-auth_backend=airflow.api.auth.backend.default

### Get Composer Environment configuration

We will get the Composer environment configuration, such as the web server URL, the DAG folder location and the client ID, to use in the Vertex Pipeline using the script `get_composer_config.py`.

In [None]:
# This code is a modified version of https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py

# Capture the script's output lines. The first three lines are read as the
# web server URL, the DAG folder GCS path and the client ID, in that order.
# NOTE(review): the script itself is not shown here — confirm it prints exactly
# these three values; an error message would be assigned silently instead.
shell_output=!python3 get_composer_config.py $PROJECT_ID $REGION $COMPOSER_ENV_NAME
COMPOSER_WEB_URI = shell_output[0]
COMPOSER_DAG_GCS = shell_output[1]
COMPOSER_CLIENT_ID = shell_output[2]

print(f"COMPOSER_WEB_URI = {COMPOSER_WEB_URI}")
print(f"COMPOSER_DAG_GCS = {COMPOSER_DAG_GCS}")
print(f"COMPOSER_CLIENT_ID = {COMPOSER_CLIENT_ID}")

You can navigate to Airflow webserver by going to this URL

In [None]:
COMPOSER_WEB_URI

## Upload DAG to Cloud Composer environment

We have a sample data processing DAG `data_orchestration_bq_example_dag.py` that reads a CSV file from a GCS bucket and writes it to BigQuery. We will add this file to the GCS bucket configured for the Composer environment, which Airflow watches.

In [None]:
# DAG ID used when triggering the DAG through the Airflow REST API; it must
# match the ID passed to DAG(...) in the DAG file written below.
COMPOSER_DAG_NAME = "dag_gcs_to_bq_orch"
# Local file name the DAG source is written to before being copied to Cloud Storage.
COMPOSER_DAG_FILENAME = "data_orchestration_bq_example_dag.py"

In [None]:
%%writefile $COMPOSER_DAG_FILENAME

"""An example Composer workflow integrating GCS and BigQuery.

A .csv is read from a GCS bucket to a BigQuery table; a query is made, and the
result is written back to a different BigQuery table within a new dataset.
"""

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.bash_operator import BashOperator

YESTERDAY = datetime.combine(
    datetime.today() - timedelta(days=1), datetime.min.time())
BQ_DATASET_NAME = 'bq_demos'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Solution: pass a schedule_interval argument to DAG instantiation.
with DAG('dag_gcs_to_bq_orch', default_args=default_args,
         schedule_interval=None) as dag:
  create_bq_dataset_if_not_exist = """
    bq ls {0}
    if [ $? -ne 0 ]; then
      bq mk {0}
    fi
  """.format(BQ_DATASET_NAME)

  # Create destination dataset.
  t1 = BashOperator(
      task_id='create_destination_dataset',
      bash_command=create_bq_dataset_if_not_exist,
      dag=dag)

  # Create a bigquery table from a .csv file located in a GCS bucket
  # (gs://example-datasets/game_data_condensed.csv).
  # Store it in our dataset.
  t2 = GoogleCloudStorageToBigQueryOperator(
      task_id='gcs_to_bq',
      bucket='example-datasets',
      source_objects=['game_data_condensed.csv'],
      destination_project_dataset_table='{0}.composer_game_data_table'
      .format(BQ_DATASET_NAME),
      schema_fields=[
          {'name': 'name', 'type': 'string', 'mode': 'nullable'},
          {'name': 'team', 'type': 'string', 'mode': 'nullable'},
          {'name': 'total_score', 'type': 'integer', 'mode': 'nullable'},
          {'name': 'timestamp', 'type': 'integer', 'mode': 'nullable'},
          {'name': 'window_start', 'type': 'string', 'mode': 'nullable'},
      ],
      write_disposition='WRITE_TRUNCATE')

  # Run example query (http://shortn/_BdF1UTEYOb) and save result to the
  # destination table.
  t3 = BigQueryOperator(
      task_id='bq_example_query',
      bql=f"""
        SELECT
          name, team, total_score
        FROM
          {BQ_DATASET_NAME}.composer_game_data_table
        WHERE total_score > 15
        LIMIT 100;
      """,
      destination_dataset_table='{0}.gcp_example_query_result'
      .format(BQ_DATASET_NAME),
      write_disposition='WRITE_TRUNCATE')

  t1 >> t2 >> t3

In [None]:
# Copy the DAG file into the Composer environment's DAG folder in Cloud Storage.
!gsutil cp $COMPOSER_DAG_FILENAME $COMPOSER_DAG_GCS/

In [None]:
# Confirm that the DAG file is now present in the DAG folder.
!gsutil ls -l $COMPOSER_DAG_GCS/$COMPOSER_DAG_FILENAME

You should see the DAG in your Airflow webserver

![](images/airflow_webserver_with_dag.png)

![](images/airflow_dag.png)

## Vertex Pipelines setup

### Import libraries

Add the following to import the libraries we'll be using throughout this codelab:

In [None]:
from typing import NamedTuple
import re

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

### Define constants

Before building the pipeline define some constant variables:

- `PIPELINE_ROOT` is the Cloud Storage path where the artifacts created by the pipeline will be written. We're using us-central1 as the region here, but if you used a different region when you created your bucket, update the REGION variable in the code above

In [None]:
# Append the user-level bin directory to PATH for this kernel.
# NOTE(review): presumably needed so tools installed with `pip --user` are
# found; the /home/jupyter path is specific to the notebook host — confirm.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Cloud Storage location where the pipeline's artifacts are written.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

After running the code above, you should see the root directory for your pipeline printed. This is the Cloud Storage location where the artifacts from your pipeline will be written. It will be in the format of `gs://BUCKET_NAME/pipeline_root/`

### Create a Python function based component to trigger Airflow DAG

Using the KFP SDK, we can create components based on Python functions. The component takes the Airflow DAG name `dag_name` (a string), the Composer client ID and the Airflow web server URL as input, and returns the response from the Airflow web server as an `Artifact` that contains the Airflow DAG run information. The component makes a request to the Airflow REST API of your Cloud Composer environment. Airflow processes this request and runs the DAG. Information about the triggered DAG run is logged as an artifact (you can output it as a string as well).

In [None]:
@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:0.1.3",
    output_component_file="composer-trigger-dag-component.yaml",
    packages_to_install=["requests"],
)
def trigger_airflow_dag(
    dag_name: str,
    composer_client_id: str,
    composer_webserver_id: str,
    response: Output[Artifact]
):
    """Trigger an Airflow DAG run through the Composer web server REST API.

    Sends a POST request to ``/api/experimental/dags/<dag_name>/dag_runs`` on
    the Airflow web server, which is protected by Identity-Aware Proxy (IAP),
    and saves the response body as ``airflow_response.json`` in the directory
    ``response.path``.

    Args:
        dag_name: ID of the Airflow DAG to trigger.
        composer_client_id: Client ID used by Identity-Aware Proxy; it is the
            audience of the OpenID Connect token sent with the request.
        composer_webserver_id: Base URL of the Airflow web server. A trailing
            slash is tolerated.
        response: Output artifact that receives the response file. May be
            ``None`` when the function is called directly outside a pipeline;
            the response is then only printed.

    Raises:
        Exception: If the web server answers with HTTP 403 or with any other
            status code different from 200.
    """
    # [START composer_trigger]

    # Imports are kept inside the function body so the function is
    # self-contained when it is packaged as a component.
    import json
    import os

    import requests
    from google.auth.transport.requests import Request
    from google.oauth2 import id_token

    # Passed through as the ``conf`` of the DAG run (a JSON-encoded string).
    data = '{"replace_microseconds":"false"}'

    # Form webserver URL to make REST API calls. Stripping a trailing slash
    # avoids a double slash in the request path.
    webserver_base = composer_webserver_id.rstrip('/')
    webserver_url = f'{webserver_base}/api/experimental/dags/{dag_name}/dag_runs'

    # This code is adapted from
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
    # START COPIED IAP CODE
    def make_iap_request(url, client_id, method='GET', **kwargs):
        """Makes a request to an application protected by Identity-Aware Proxy.
        Args:
          url: The Identity-Aware Proxy-protected URL to fetch.
          client_id: The client ID used by Identity-Aware Proxy.
          method: The request method to use
                  ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
          **kwargs: Any of the parameters defined for the request function:
                    https://github.com/requests/requests/blob/master/requests/api.py
                    If no timeout is provided, it is set to 90 by default.
        Returns:
          The page body, or raises an exception if the page couldn't be retrieved.
        """
        # Set the default timeout, if missing
        kwargs.setdefault('timeout', 90)

        # Obtain an OpenID Connect (OIDC) token from metadata server or using service
        # account.
        google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

        # Fetch the Identity-Aware Proxy-protected URL, including an
        # Authorization header containing "Bearer " followed by a
        # Google-issued OpenID Connect token for the service account.
        resp = requests.request(
            method, url,
            headers={'Authorization': 'Bearer {}'.format(
                google_open_id_connect_token)}, **kwargs)
        if resp.status_code == 403:
            raise Exception('Service account does not have permission to '
                            'access the IAP-protected application.')
        if resp.status_code != 200:
            raise Exception(
                'Bad response from application: {!r} / {!r} / {!r}'.format(
                    resp.status_code, resp.headers, resp.text))
        return resp.text

    # END COPIED IAP CODE

    # Make a POST request to IAP which then Triggers the DAG
    response_text = make_iap_request(
        webserver_url, composer_client_id, method='POST',
        json={"conf": data, "replace_microseconds": 'false'})
    print(f"response = {response_text}")

    # Persist the response only when an output artifact was supplied; a direct
    # call with response=None must not fail after the DAG was triggered.
    if response is not None:
        # exist_ok: the artifact directory may already be present.
        os.makedirs(response.path, exist_ok=True)
        # Store the body as real JSON rather than a JSON-encoded string; fall
        # back to the raw text when the body is not valid JSON.
        try:
            payload = json.loads(response_text)
        except ValueError:
            payload = response_text
        with open(os.path.join(response.path, "airflow_response.json"), 'w') as f:
            json.dump(payload, f)

    # [END composer_trigger]

Understanding the component structure
- The **`@component`** decorator compiles this function to a component when the pipeline is run. You'll use this anytime you write a custom component.
- The **`base_image`** parameter specifies the container image this component will use.
- The **`output_component_file`** parameter is optional, and specifies the yaml file to write the compiled component to.
- The **`packages_to_install`** parameter installs required python packages in the container to run the component

### Test Triggering Airflow DAG from Notebook

In [None]:
# Before running, comment out the @component decorator in the cell above so
# that trigger_airflow_dag is a plain Python function.
# There is no pipeline-managed output artifact when calling the function
# directly, hence response=None.
trigger_airflow_dag(
    dag_name=COMPOSER_DAG_NAME,
    composer_client_id=COMPOSER_CLIENT_ID,
    composer_webserver_id=COMPOSER_WEB_URI,
    response=None
)

In [None]:
COMPOSER_WEB_URI

### Adding the components to a pipeline

In [None]:
# Used as the pipeline name and as the base name of the compiled JSON job spec.
PIPELINE_NAME = "pipeline-trigger-airflow-dag"

In [None]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description="Trigger Airflow DAG from Vertex Pipelines",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline():
    # Single-step pipeline: ask the Composer web server to start the data
    # processing DAG. The DAG name, client ID and web server URL come from the
    # notebook-level constants.
    trigger_airflow_dag(
        dag_name=COMPOSER_DAG_NAME,
        composer_client_id=COMPOSER_CLIENT_ID,
        composer_webserver_id=COMPOSER_WEB_URI,
    )

### Compile and run the pipeline

With your pipeline defined, you're ready to compile it. The following will generate a JSON file that you'll use to run the pipeline:

In [None]:
# Compile the pipeline function into a JSON job spec named after the pipeline.
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=f"{PIPELINE_NAME}.json"
)

Next, instantiate an API client:

In [None]:
# Client used below to submit the pipeline run and to look up its status.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

Finally, run the pipeline:

In [None]:
# Submit the compiled job spec; the 'name' entry of the returned value is used
# later to look up the job.
response = api_client.create_run_from_job_spec(
    job_spec_path=f"{PIPELINE_NAME}.json",
    # pipeline_root=PIPELINE_ROOT  # this argument is necessary if you did not specify PIPELINE_ROOT as part of the pipeline definition.
)

### Monitor Vertex Pipeline status

From Cloud Console, you can monitor the pipeline run status and view the output artifact

![](images/pipeline_run.png)

You can also use the API client to get the pipeline status and artifact information.

In [None]:
def get_job_id(job_name):
    """Extract the job ID from a pipeline job resource name.

    The name is searched for the pattern
    ``projects/<project>/locations/<region>/pipelineJobs/<job_id>``.

    Returns:
        The ``job_id`` part, or ``None`` when the pattern is not found.
    """
    match = re.search(
        'projects/(?P<project_id>.*)/locations/(?P<region>.*)/pipelineJobs/(?P<job_id>.*)',
        job_name,
    )
    if match is None:
        return None
    return match.group('job_id')

In [None]:
# Look up the job using the ID parsed from the resource name returned at
# submission, then print its current state.
job_status = api_client.get_job(get_job_id(response['name']))
print(f"JOB STATUS: {job_status['state']}")

Get Airflow DAG run instance from the output artifact

In [None]:
# URI of the first 'response' output artifact of the 'trigger-airflow-dag' task.
# NOTE(review): the trailing [0] raises IndexError when the job details contain
# no task with that name.
airflow_response_uri = [task['outputs']['response']['artifacts'][0]['uri'] for task in job_status['jobDetail']['taskDetails'] if task['taskName']=='trigger-airflow-dag'][0]
airflow_response_uri

In [None]:
# List the files stored under the artifact URI.
!gsutil ls $airflow_response_uri/

In [None]:
# Print the Airflow response file saved by the component.
!gsutil cat $airflow_response_uri/airflow_response.json

### Monitor Airflow DAG run

Go to Airflow webserver and monitor the status of data processing DAG. Airflow webserver URL is

In [None]:
COMPOSER_WEB_URI

![](images/airflow_dag_run.png)

## Clean Up

- Delete Cloud Storage bucket
- Delete Cloud Composer environment