In [None]:
import os
PROJECT = "PROJECT" # REPLACE WITH YOUR PROJECT ID
BUCKET = "BUCKET" # REPLACE WITH A BUCKET NAME (PUT YOUR PROJECT ID AND WE CREATE THE BUCKET ITSELF NEXT)
REGION = "us-central1" # REPLACE WITH YOUR REGION e.g. us-central1

# Do not change this: export the settings as environment variables so that the
# %%bash cells can read them as $PROJECT, $BUCKET and $REGION.
os.environ.update(PROJECT=PROJECT, BUCKET=BUCKET, REGION=REGION)

In [None]:
%%bash
# Point the gcloud CLI at the project and compute region exported by the
# configuration cell above ($PROJECT and $REGION come from os.environ).
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

In [None]:
# %%bash
# OUTDIR=gs://${BUCKET}/taxifare/trained_model/yellow_trips
# JOBNAME=taxifare_yellow_trips_$(date -u +%y%m%d_%H%M%S)
# echo $OUTDIR $REGION $JOBNAME
# # Clear the Cloud Storage Bucket used for the training job
# gcloud storage rm --recursive --continue-on-error $OUTDIR
# gcloud ml-engine jobs submit training $JOBNAME \
#   --region=$REGION \
#   --module-name=trainer.task \
#   --package-path=${PWD}/cloud_composer_automated_ml_pipeline_taxifare_module/trainer \
#   --job-dir=$OUTDIR \
#   --staging-bucket=gs://$BUCKET \
#   --scale-tier=BASIC \
#   --runtime-version="1.13" \
#   -- \
#   --train_data_paths=gs://$BUCKET/taxifare/data/yellow_trips/train-* \
#   --eval_data_paths=gs://$BUCKET/taxifare/data/yellow_trips/valid-* \
#   --output_dir=$OUTDIR \
#   --train_steps=500

In [None]:
%%bash
# Build a source distribution of the trainer package. The archive is written to
# cloud_composer_automated_ml_pipeline_taxifare_module/dist/, which is where the
# next cell uploads it from.
cd cloud_composer_automated_ml_pipeline_taxifare_module
# Creates an empty README.md if none exists (presumably setup.py or the sdist
# step expects the file - TODO confirm).
touch README.md
python setup.py sdist

In [None]:
%%bash
# Upload the trainer package to gs://$BUCKET/taxifare/code/.
# NOTE: the DAG files written later in this notebook read the package from
# "gs://" + <project id> + "-bucket" + "/taxifare/code/taxifare-0.1.tar.gz",
# so BUCKET must be "<project id>-bucket" for the training task to find it.
gcloud storage cp cloud_composer_automated_ml_pipeline_taxifare_module/dist/taxifare-0.1.tar.gz gs://$BUCKET/taxifare/code/

***
# Part Two: Set up a scheduled workflow with Cloud Composer
In this section you will complete a partially written training.py DAG file and copy it to the DAGS folder in your Composer instance.

## Copy your Airflow bucket name
1. Navigate to your Cloud Composer [instance](https://console.cloud.google.com/composer/environments?project=)<br/><br/>
2. Select __DAGs Folder__<br/><br/>
3. You will be taken to the Google Cloud Storage bucket that Cloud Composer has created automatically for your Airflow instance<br/><br/>
4. __Copy the bucket name__ into the variable below (example: us-central1-composer-08f6edeb-bucket)

In [None]:
AIRFLOW_BUCKET = "AIRFLOW_BUCKET" # REPLACE WITH AIRFLOW BUCKET NAME
# Export the name so that shell cells can refer to it as $AIRFLOW_BUCKET.
os.environ.update(AIRFLOW_BUCKET=AIRFLOW_BUCKET)

## Complete the training.py DAG file
Apache Airflow orchestrates tasks out to other services through a [DAG (Directed Acyclic Graph)](https://airflow.apache.org/concepts.html) file, which specifies what services to call, what to do, and when to run these tasks. DAG files are written in Python and are loaded automatically into Airflow once they are present in the Airflow/dags/ folder in your Cloud Composer bucket.

Execute the code cells to create the files.

## Multi

In [None]:
%%writefile airflow/dags/taxifare_multi.py
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""DAG definition for taxifare automated pipeline."""

import airflow
from airflow import DAG

# Reference for all available airflow operators: 
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.base_hook import BaseHook

from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator, MLEngineModelOperator, MLEngineVersionOperator
from airflow.models import TaskInstance

import datetime
import logging

def _get_project_id():
  """Get project ID from default GCP connection.

  Reads the "extra" JSON of the Airflow connection named
  "google_cloud_default" and returns its
  "extra__google_cloud_platform__project" entry.

  Returns:
    The project ID stored on the connection.

  Raises:
    ValueError: If the connection has no project configured.
  """

  extras = BaseHook.get_connection("google_cloud_default").extra_dejson
  key = "extra__google_cloud_platform__project"
  if key not in extras:
    # Raise a real exception: raising a plain string is itself a TypeError in
    # Python 3 and would hide this message.
    raise ValueError("Must configure project_id in google_cloud_default "
                     "connection from Airflow Console")
  return extras[key]

# Resolved once, every time this DAG file is parsed.
PROJECT_ID = _get_project_id()

# Data set constants, used in BigQuery tasks. You can change these
# to conform to your data.

# Specify your source BigQuery project, dataset, and table names.
# Each entry is "dataset.table"; one set of tasks is generated per entry.
SOURCE_BQ_PROJECT = "nyc-tlc"
SOURCE_DATASET_TABLE_NAMES = "yellow.trips,green.trips_2014,green.trips_2015".split(",")

# Specify your destination BigQuery dataset
DESTINATION_DATASET = "taxifare"

# GCS bucket name and region, can also be changed.
# The bucket is derived from the project ID: gs://<project id>-bucket
BUCKET = "gs://" + PROJECT_ID + "-bucket"
REGION = "us-east1"

# GCS location of the trainer code package handed to the ML Engine training
# task, and the parent folder for the per-job directories.
PACKAGE_URI = BUCKET + "/taxifare/code/taxifare-0.1.tar.gz"
JOB_DIR = BUCKET + "/jobs"

# Arguments applied to every task in the DAG.
default_args = {
  "owner": "airflow",
  "depends_on_past": False,
  "start_date": airflow.utils.dates.days_ago(2),
  "email": ["airflow@example.com"],
  "email_on_failure": True,
  "email_on_retry": False,
  "retries": 5,
  "retry_delay": datetime.timedelta(minutes=5)
}

# Default schedule interval using cronjob syntax - can be customized here
# or in the Airflow console.

# Specify a schedule interval in CRON syntax to run once a day at 2100 hours (9pm)
# Reference: https://airflow.apache.org/scheduler.html
# NOTE: this value is not passed to the DAG below, which is created with
# schedule_interval=None.
schedule_interval = "00 21 * * *"

# Title your DAG
dag = DAG(
  "taxifare_multi", 
  default_args=default_args,
  schedule_interval=None
)

# Use the module docstring as the DAG's Markdown documentation.
dag.doc_md = __doc__


#
#
# Task Definition
#
#

for model in SOURCE_DATASET_TABLE_NAMES:
  # Source tables are named "dataset.table"; task ids, destination tables and
  # GCS folders use the same name with the dot replaced by an underscore.
  model_slug = model.replace(".","_")
  train_table = "{}.{}_train_data".format(DESTINATION_DATASET, model_slug)
  eval_table = "{}.{}_eval_data".format(DESTINATION_DATASET, model_slug)

  # BigQuery data query: feature columns plus a string key used for the split
  bql = """
  SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    EXTRACT(DAYOFWEEK FROM pickup_datetime) * 1.0 AS dayofweek,
    EXTRACT(HOUR FROM pickup_datetime) * 1.0 AS hourofday,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count*1.0 AS passengers,
    CONCAT(CAST(pickup_datetime AS STRING), CAST(pickup_longitude AS STRING), CAST(pickup_latitude AS STRING), CAST(dropoff_latitude AS STRING), CAST(dropoff_longitude AS STRING)) AS key
  FROM
    `{0}.{1}`
  WHERE
    trip_distance > 0
    AND fare_amount >= 2.5
    AND pickup_longitude > -78
    AND pickup_longitude < -70
    AND dropoff_longitude > -78
    AND dropoff_longitude < -70
    AND pickup_latitude > 37
    AND pickup_latitude < 45
    AND dropoff_latitude > 37
    AND dropoff_latitude < 45
    AND passenger_count > 0
    AND rand() < 0.00001
  """.format(SOURCE_BQ_PROJECT, model)

  # Split on a hash of the key: buckets 0-3 go to training, bucket 4 to evaluation
  bql_train = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) < 4".format(bql)
  bql_eval = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) = 4".format(bql)

  # Materialize both splits, truncating the destination tables if they already exist
  # Reference: https://airflow.apache.org/integration.html#bigqueryoperator
  bq_train_data_op = BigQueryOperator(
    task_id="bq_train_data_{}_task".format(model_slug),
    bql=bql_train,
    destination_dataset_table=train_table,
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    dag=dag
  )

  bq_eval_data_op = BigQueryOperator(
    task_id="bq_eval_data_{}_task".format(model_slug),
    bql=bql_eval,
    destination_dataset_table=eval_table,
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    dag=dag
  )

  # Row-count query template: project, dataset, table
  row_count_sql = """
  SELECT
    COUNT(*)
  FROM
    [{0}:{1}.{2}]
  """

  # Check to make sure that the data tables won't be empty
  bq_check_train_data_op = BigQueryCheckOperator(
    task_id="bq_check_train_data_{}_task".format(model_slug),
    sql=row_count_sql.format(PROJECT_ID, DESTINATION_DATASET, model_slug + "_train_data"),
    dag=dag
  )

  bq_check_eval_data_op = BigQueryCheckOperator(
    task_id="bq_check_eval_data_{}_task".format(model_slug),
    sql=row_count_sql.format(PROJECT_ID, DESTINATION_DATASET, model_slug + "_eval_data"),
    dag=dag
  )

  # Remove CSV files left in this model's data folder by earlier runs; the
  # command succeeds when the folder does not exist yet
  remove_old_data_cmd = "if gcloud storage ls {0}/taxifare/data/{1} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/taxifare/data/{1}/*; else true; fi".format(BUCKET, model_slug)
  bash_remove_old_data_op = BashOperator(
    task_id="bash_remove_old_data_{}_task".format(model_slug),
    bash_command=remove_old_data_cmd,
    dag=dag
  )

  # Per-model GCS wildcard URIs of the exported CSV shards. The same URIs are
  # used as the BigQuery export destinations and as the trainer's input paths,
  # so training and evaluation each read only the files exported for this
  # model's own split rather than the shared ".../taxifare/data/" folder.
  model_slug = model.replace(".","_")
  data_dir = BUCKET + "/taxifare/data/"
  train_files = data_dir + "{}/train-*.csv".format(model_slug)
  eval_files = data_dir + "{}/eval-*.csv".format(model_slug)

  # Takes a BigQuery dataset and table as input and exports it to GCS as a CSV
  bq_export_gcs_train_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_train_csv_{}_task".format(model_slug),
    source_project_dataset_table="{}.{}_train_data".format(DESTINATION_DATASET, model_slug),
    destination_cloud_storage_uris=[train_files],
    export_format="CSV",
    print_header=False,
    dag=dag
  )

  bq_export_gcs_eval_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_eval_csv_{}_task".format(model_slug),
    source_project_dataset_table="{}.{}_eval_data".format(DESTINATION_DATASET, model_slug),
    destination_cloud_storage_uris=[eval_files],
    export_format="CSV",
    print_header=False,
    dag=dag
  )


  # ML Engine training job
  # The job id carries a timestamp taken when this file is parsed.
  job_id = "taxifare_{}_{}".format(model_slug, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
  output_dir = BUCKET + "/taxifare/trained_model/{}".format(model_slug)
  job_dir = JOB_DIR + "/" + job_id
  training_args = [
    "--job-dir", job_dir,
    "--train_data_paths", train_files,
    "--eval_data_paths", eval_files,
    "--output_dir", output_dir,
    "--train_steps", str(500),
    "--train_batch_size", str(32),
    "--eval_steps", str(500),
    "--eval_batch_size", str(32),
    "--nbuckets", str(8),
    "--hidden_units", "128,32,4"
  ]

  # Submit the training job for this model, running the "trainer.task" module
  # from the uploaded code package.
  # Reference: https://airflow.apache.org/integration.html#cloud-ml-engine
  ml_engine_training_op = MLEngineTrainingOperator(
    task_id="ml_engine_training_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID,
    job_id=job_id,
    package_uris=[PACKAGE_URI],
    training_python_module="trainer.task",
    training_args=training_args,
    region=REGION,
    scale_tier="BASIC",
    runtime_version="1.13", 
    python_version="3.5",
    dag=dag
  )

  # Deployment naming: the model is MODEL_NAME + <dataset_table>, and the saved
  # model is staged under MODEL_LOCATION + <dataset_table>.
  MODEL_NAME = "taxifare_"
  MODEL_VERSION = "v1"
  MODEL_LOCATION = BUCKET + "/taxifare/saved_model/"

  # Empty the staging folder if it exists; succeed if it does not.
  bash_remove_old_saved_model_op = BashOperator(
    task_id="bash_remove_old_saved_model_{}_task".format(model.replace(".","_")),
    bash_command="if gcloud storage ls {0} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/*; else true; fi".format(MODEL_LOCATION + model.replace(".","_")),
    dag=dag
  )

  # Sync the last entry listed under <output_dir>/export/exporter/ (tail -1)
  # into the staging folder. Presumably that is the most recent export because
  # the export folders are timestamp-named - TODO confirm against the trainer.
  bash_copy_new_saved_model_op = BashOperator(
    task_id="bash_copy_new_saved_model_{}_task".format(model.replace(".","_")),
    bash_command="gcloud storage rsync --delete-unmatched-destination-objects --recursive `gcloud storage ls {0}/export/exporter/ | tail -1` {1}".format(output_dir, MODEL_LOCATION + model.replace(".","_")),
    dag=dag
  )

  # Create model on ML-Engine
  # List the models whose name matches this model. xcom_push=True hands the
  # command's output to the branch task below (the BashOperator pushes the last
  # line of output - verify for your Airflow version).
  bash_ml_engine_models_list_op = BashOperator(
    task_id="bash_ml_engine_models_list_{}_task".format(model.replace(".","_")),
    xcom_push=True,
    bash_command="gcloud ml-engine models list --filter='name:{0}'".format(MODEL_NAME + model.replace(".","_")),
    dag=dag
  )

  def check_if_model_already_exists(templates_dict, **kwargs):
    """Choose the branch to follow depending on whether the model exists.

    Args:
      templates_dict: Dict with a "model" entry naming the current model.
      **kwargs: Airflow task context; "ti" is used to pull the XCom value
        pushed by the models-list task.

    Returns:
      The task id of the create-model task when the pulled listing is empty
      or equals "Listed 0 items.", otherwise the task id of the
      dont-create-model branch task.
    """
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_models_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_models_list_{}_task".format(cur_model))
    logging.info("check_if_model_already_exists: {}: ml_engine_models_list = \n{}".format(cur_model, ml_engine_models_list))
    create_model_task = "ml_engine_create_model_{}_task".format(cur_model)
    dont_create_model_task = "dont_create_model_dummy_branch_{}_task".format(cur_model)
    if len(ml_engine_models_list) == 0 or ml_engine_models_list == "Listed 0 items.":
      return create_model_task
    return dont_create_model_task

  # Branch on the listing: the returned task id must match one of the task ids
  # defined below.
  check_if_model_already_exists_op = BranchPythonOperator(
    task_id="check_if_model_already_exists_{}_task".format(model.replace(".","_")),
    templates_dict={"model": model.replace(".","_")},
    python_callable=check_if_model_already_exists,
    provide_context=True,
    dag=dag
  )

  # "Create" branch: create the model.
  ml_engine_create_model_op = MLEngineModelOperator(
    task_id="ml_engine_create_model_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model={"name": MODEL_NAME + model.replace(".","_")}, 
    operation="create",
    dag=dag
  )

  # End of the "create" branch. trigger_rule="all_done" lets it run once its
  # upstream task has finished, whatever that task's final state.
  create_model_dummy_op = DummyOperator(
    task_id="create_model_dummy_{}_task".format(model.replace(".","_")),
    trigger_rule="all_done",
    dag=dag
  )

  # "Don't create" branch: the target returned by the branch callable.
  dont_create_model_dummy_branch_op = DummyOperator(
    task_id="dont_create_model_dummy_branch_{}_task".format(model.replace(".","_")),
    dag=dag
  )

  # End of the "don't create" branch, also with trigger_rule="all_done".
  dont_create_model_dummy_op = DummyOperator(
    task_id="dont_create_model_dummy_{}_task".format(model.replace(".","_")),
    trigger_rule="all_done",
    dag=dag
  )

  # Create version of model on ML-Engine
  # List this model's versions whose name matches MODEL_VERSION and hand the
  # command's output to the branch task below through XCom.
  bash_ml_engine_versions_list_op = BashOperator(
    task_id="bash_ml_engine_versions_list_{}_task".format(model.replace(".","_")),
    xcom_push=True,
    bash_command="gcloud ml-engine versions list --model {0} --filter='name:{1}'".format(MODEL_NAME + model.replace(".","_"), MODEL_VERSION),
    dag=dag
  )

  def check_if_model_version_already_exists(templates_dict, **kwargs):
    """Choose which version-creation task to run for the current model.

    Args:
      templates_dict: Dict with a "model" entry naming the current model.
      **kwargs: Airflow task context; "ti" is used to pull the XCom value
        pushed by the versions-list task.

    Returns:
      The task id of the create-version task when the pulled listing is empty
      or equals "Listed 0 items.", otherwise the task id of the
      create-other-version task.
    """
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_versions_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_versions_list_{}_task".format(cur_model))
    logging.info("check_if_model_version_already_exists: {}: ml_engine_versions_list = \n{}".format(cur_model, ml_engine_versions_list))
    create_version_task = "ml_engine_create_version_{}_task".format(cur_model)
    create_other_version_task = "ml_engine_create_other_version_{}_task".format(cur_model)
    if len(ml_engine_versions_list) == 0 or ml_engine_versions_list == "Listed 0 items.":
      return create_version_task
    return create_other_version_task

  # Branch on the listing: the returned task id must match one of the
  # version-creation task ids defined further down.
  check_if_model_version_already_exists_op = BranchPythonOperator(
    task_id="check_if_model_version_already_exists_{}_task".format(model.replace(".","_")), 
    templates_dict={"model": model.replace(".","_")},
    python_callable=check_if_model_version_already_exists,
    provide_context=True,
    dag=dag
  )

  # Name of the extra version created when MODEL_VERSION already exists:
  # "v_" followed by the DAG run's execution date as YYYYmmddHHMM.
  # It is a Jinja template rendered per task instance, so the "create" and
  # "set_default" tasks of one DAG run always agree on the name. A
  # datetime.now() value computed while this file is parsed can differ between
  # those two tasks, because the file is parsed again for each of them.
  OTHER_VERSION_NAME = "v_{{ execution_date.strftime('%Y%m%d%H%M') }}"

  model_slug = model.replace(".","_")
  full_model_name = MODEL_NAME + model_slug
  deployment_uri = MODEL_LOCATION + model_slug

  # Create MODEL_VERSION from the staged saved model (taken when that version
  # does not exist yet).
  ml_engine_create_version_op = MLEngineVersionOperator(
    task_id="ml_engine_create_version_{}_task".format(model_slug),
    project_id=PROJECT_ID,
    model_name=full_model_name,
    version_name=MODEL_VERSION,
    version={
      "name": MODEL_VERSION,
      "deploymentUri": deployment_uri,
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    },
    operation="create",
    dag=dag
  )

  # Create the timestamped version instead (taken when MODEL_VERSION exists).
  ml_engine_create_other_version_op = MLEngineVersionOperator(
    task_id="ml_engine_create_other_version_{}_task".format(model_slug),
    project_id=PROJECT_ID,
    model_name=full_model_name,
    version_name=OTHER_VERSION_NAME,
    version={
      "name": OTHER_VERSION_NAME,
      "deploymentUri": deployment_uri,
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    },
    operation="create",
    dag=dag
  )

  # Make whichever version was created the model's default.
  ml_engine_set_default_version_op = MLEngineVersionOperator(
    task_id="ml_engine_set_default_version_{}_task".format(model_slug),
    project_id=PROJECT_ID,
    model_name=full_model_name,
    version_name=MODEL_VERSION,
    version={"name": MODEL_VERSION},
    operation="set_default",
    dag=dag
  )

  ml_engine_set_default_other_version_op = MLEngineVersionOperator(
    task_id="ml_engine_set_default_other_version_{}_task".format(model_slug),
    project_id=PROJECT_ID,
    model_name=full_model_name,
    version_name=OTHER_VERSION_NAME,
    version={"name": OTHER_VERSION_NAME},
    operation="set_default",
    dag=dag
  )

  # Build dependency graph. Each entry pairs a task with the task (or list of
  # tasks) that must run before it; the entries are applied in order.
  upstream_of = [
    # preprocessing: query -> check -> clean up old CSVs -> export
    (bq_check_train_data_op, bq_train_data_op),
    (bq_check_eval_data_op, bq_eval_data_op),
    (bash_remove_old_data_op, [bq_check_train_data_op, bq_check_eval_data_op]),
    (bq_export_gcs_train_csv_op, [bash_remove_old_data_op]),
    (bq_export_gcs_eval_csv_op, [bash_remove_old_data_op]),
    # training, then staging of the exported saved model
    (ml_engine_training_op, [bq_export_gcs_train_csv_op, bq_export_gcs_eval_csv_op]),
    (bash_remove_old_saved_model_op, ml_engine_training_op),
    (bash_copy_new_saved_model_op, bash_remove_old_saved_model_op),
    # model creation branch
    (bash_ml_engine_models_list_op, ml_engine_training_op),
    (check_if_model_already_exists_op, bash_ml_engine_models_list_op),
    (ml_engine_create_model_op, check_if_model_already_exists_op),
    (create_model_dummy_op, ml_engine_create_model_op),
    (dont_create_model_dummy_branch_op, check_if_model_already_exists_op),
    (dont_create_model_dummy_op, dont_create_model_dummy_branch_op),
    # version creation branch
    (bash_ml_engine_versions_list_op, [dont_create_model_dummy_op, create_model_dummy_op]),
    (check_if_model_version_already_exists_op, bash_ml_engine_versions_list_op),
    (ml_engine_create_version_op, [bash_copy_new_saved_model_op, check_if_model_version_already_exists_op]),
    (ml_engine_create_other_version_op, [bash_copy_new_saved_model_op, check_if_model_version_already_exists_op]),
    # promote the newly created version
    (ml_engine_set_default_version_op, ml_engine_create_version_op),
    (ml_engine_set_default_other_version_op, ml_engine_create_other_version_op),
  ]
  for downstream_task, upstream_tasks in upstream_of:
    downstream_task.set_upstream(upstream_tasks)

## Module

In [None]:
%%writefile airflow/dags/module/taxifare_module.py
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""DAG definition for taxifare automated pipeline."""

import airflow
from airflow import DAG

# Reference for all available airflow operators: 
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.hooks.base_hook import BaseHook

from airflow.models import TaskInstance

import datetime

from module import preprocess
from module import training
from module import deploy


def _get_project_id():
  """Get project ID from default GCP connection.

  Reads the "extra" JSON of the Airflow connection named
  "google_cloud_default" and returns its
  "extra__google_cloud_platform__project" entry.

  Returns:
    The project ID stored on the connection.

  Raises:
    ValueError: If the connection has no project configured.
  """

  extras = BaseHook.get_connection("google_cloud_default").extra_dejson
  key = "extra__google_cloud_platform__project"
  if key not in extras:
    # Raise a real exception: raising a plain string is itself a TypeError in
    # Python 3 and would hide this message.
    raise ValueError("Must configure project_id in google_cloud_default "
                     "connection from Airflow Console")
  return extras[key]

# Constants
# Get project ID and GCS bucket. The project ID is resolved every time this
# file is parsed; the bucket is derived from it: gs://<project id>-bucket
PROJECT_ID = _get_project_id()
BUCKET = "gs://" + PROJECT_ID + "-bucket"

# Specify your source BigQuery dataset and table names.
# Each entry is "dataset.table"; one set of tasks is generated per entry.
SOURCE_DATASET_TABLE_NAMES = "yellow.trips,green.trips_2014,green.trips_2015".split(",")

# Where to write out data in GCS (keep the trailing slash: file patterns are
# appended to this prefix)
DATA_DIR = BUCKET + "/taxifare/data/"

# Base model parameters, passed to the training and deploy task builders
MODEL_NAME = "taxifare_"
MODEL_VERSION = "v1"
MODEL_LOCATION = BUCKET + "/taxifare/saved_model/"

# Arguments applied to every task in the DAG.
default_args = {
  "owner": "airflow",
  "depends_on_past": False,
  "start_date": airflow.utils.dates.days_ago(2),
  "email": ["airflow@example.com"],
  "email_on_failure": True,
  "email_on_retry": False,
  "retries": 5,
  "retry_delay": datetime.timedelta(minutes=5)
}

# Default schedule interval using cronjob syntax - can be customized here
# or in the Airflow console.

# Specify a schedule interval in CRON syntax to run once a day at 2100 hours (9pm)
# Reference: https://airflow.apache.org/scheduler.html
# NOTE: this value is not passed to the DAG below, which is created with
# schedule_interval=None.
schedule_interval = "00 21 * * *"

# Title your DAG
dag = DAG(
  "taxifare_module", 
  default_args=default_args,
  schedule_interval=None
)

# Use the module docstring as the DAG's Markdown documentation.
dag.doc_md = __doc__


#
#
# Task Definition
#
#

for model in SOURCE_DATASET_TABLE_NAMES:
  # Preprocessing: the builder hands back the two CSV export tasks
  export_ops = preprocess.preprocess_tasks(model, dag, PROJECT_ID, BUCKET, DATA_DIR)
  bq_export_gcs_train_csv_op, bq_export_gcs_eval_csv_op = export_ops

  # Training: the training job and the task that stages the saved model
  training_ops = training.training_tasks(
      model, dag, PROJECT_ID, BUCKET, DATA_DIR, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION)
  ml_engine_training_op, bash_copy_new_saved_model_op = training_ops

  # Deployment: the entry task plus the version branch and its two targets
  deploy_ops = deploy.deploy_tasks(
      model, dag, PROJECT_ID, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION)
  (bash_ml_engine_models_list_op,
   check_if_model_version_already_exists_op,
   ml_engine_create_version_op,
   ml_engine_create_other_version_op) = deploy_ops

  # Connect the three stages: export -> training -> deployment
  ml_engine_training_op.set_upstream([bq_export_gcs_train_csv_op, bq_export_gcs_eval_csv_op])
  bash_ml_engine_models_list_op.set_upstream(ml_engine_training_op)

  # Either version-creation task needs the staged saved model and the branch decision
  for create_version_op in (ml_engine_create_version_op, ml_engine_create_other_version_op):
    create_version_op.set_upstream([bash_copy_new_saved_model_op, check_if_model_version_already_exists_op])

In [None]:
%%writefile airflow/dags/module/preprocess.py
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.bash_operator import BashOperator


def preprocess_tasks(model, dag, PROJECT_ID, BUCKET, DATA_DIR):
  """Create the BigQuery preprocessing and CSV export tasks for one source table.

  The tasks query the source table into train and eval tables, check that both
  tables have rows, clear the model's old CSV files from GCS, and export the
  two tables as CSV.

  Args:
    model: Source table as "dataset.table" within the nyc-tlc project.
    dag: DAG the tasks are attached to.
    PROJECT_ID: Project holding the destination dataset; used in the
      row-count checks.
    BUCKET: Bucket URI whose taxifare/data/<dataset_table> folder is cleared
      before the export.
    DATA_DIR: GCS prefix, ending in "/", under which the CSV files are written.

  Returns:
    A tuple (train export task, eval export task); these are the last tasks of
    the preprocessing stage.
  """
  # Constants
  # Specify your source BigQuery project
  SOURCE_BQ_PROJECT = "nyc-tlc"

  # Specify your destination BigQuery dataset
  DESTINATION_DATASET = "taxifare"

  # Task ids, destination tables and GCS folders use the table name with the
  # dot replaced by an underscore.
  model_slug = model.replace(".","_")
  train_table = "{}.{}_train_data".format(DESTINATION_DATASET, model_slug)
  eval_table = "{}.{}_eval_data".format(DESTINATION_DATASET, model_slug)

  # BigQuery data query
  bql="""
  SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    EXTRACT(DAYOFWEEK FROM pickup_datetime) * 1.0 AS dayofweek,
    EXTRACT(HOUR FROM pickup_datetime) * 1.0 AS hourofday,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count*1.0 AS passengers,
    CONCAT(CAST(pickup_datetime AS STRING), CAST(pickup_longitude AS STRING), CAST(pickup_latitude AS STRING), CAST(dropoff_latitude AS STRING), CAST(dropoff_longitude AS STRING)) AS key
  FROM
    `{0}.{1}`
  WHERE
    trip_distance > 0
    AND fare_amount >= 2.5
    AND pickup_longitude > -78
    AND pickup_longitude < -70
    AND dropoff_longitude > -78
    AND dropoff_longitude < -70
    AND pickup_latitude > 37
    AND pickup_latitude < 45
    AND dropoff_latitude > 37
    AND dropoff_latitude < 45
    AND passenger_count > 0
    AND rand() < 0.00001
  """

  bql = bql.format(SOURCE_BQ_PROJECT, model)

  # Split on a hash of the key: buckets 0-3 go to training, bucket 4 to evaluation
  bql_train = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) < 4".format(bql)
  bql_eval = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) = 4".format(bql)

  # Complete the BigQueryOperator task to truncate the table if it already exists before writing
  # Reference: https://airflow.apache.org/integration.html#bigqueryoperator
  bq_train_data_op = BigQueryOperator(
    task_id="bq_train_data_{}_task".format(model_slug),
    bql=bql_train,
    destination_dataset_table=train_table,
    write_disposition="WRITE_TRUNCATE", # specify to truncate on writes
    use_legacy_sql=False,
    dag=dag
  )

  bq_eval_data_op = BigQueryOperator(
    task_id="bq_eval_data_{}_task".format(model_slug),
    bql=bql_eval,
    destination_dataset_table=eval_table,
    write_disposition="WRITE_TRUNCATE", # specify to truncate on writes
    use_legacy_sql=False,
    dag=dag
  )

  # Row-count query template: project, dataset, table
  sql = """
  SELECT
    COUNT(*)
  FROM
    [{0}:{1}.{2}]
  """

  # Check to make sure that the data tables won't be empty
  bq_check_train_data_op = BigQueryCheckOperator(
    task_id="bq_check_train_data_{}_task".format(model_slug),
    sql=sql.format(PROJECT_ID, DESTINATION_DATASET, model_slug + "_train_data"),
    dag=dag
  )

  bq_check_eval_data_op = BigQueryCheckOperator(
    task_id="bq_check_eval_data_{}_task".format(model_slug),
    sql=sql.format(PROJECT_ID, DESTINATION_DATASET, model_slug + "_eval_data"),
    dag=dag
  )

  # BigQuery training data export to GCS
  # Remove CSV files left by earlier runs; succeeds if the folder does not exist
  bash_remove_old_data_op = BashOperator(
    task_id="bash_remove_old_data_{}_task".format(model_slug),
    bash_command="if gcloud storage ls {0}/taxifare/data/{1} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/taxifare/data/{1}/*; else true; fi".format(BUCKET, model_slug),
    dag=dag
  )

  # Takes a BigQuery dataset and table as input and exports it to GCS as a CSV
  bq_export_gcs_train_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_train_csv_{}_task".format(model_slug),
    source_project_dataset_table=train_table,
    destination_cloud_storage_uris=[DATA_DIR + "{}/train-*.csv".format(model_slug)],
    export_format="CSV",
    print_header=False,
    dag=dag
  )

  bq_export_gcs_eval_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_eval_csv_{}_task".format(model_slug),
    source_project_dataset_table=eval_table,
    destination_cloud_storage_uris=[DATA_DIR + "{}/eval-*.csv".format(model_slug)],
    export_format="CSV",
    print_header=False,
    dag=dag
  )

  # Build dependency graph, set_upstream dependencies for all tasks
  bq_check_train_data_op.set_upstream(bq_train_data_op)
  bq_check_eval_data_op.set_upstream(bq_eval_data_op)

  bash_remove_old_data_op.set_upstream([bq_check_train_data_op, bq_check_eval_data_op])

  bq_export_gcs_train_csv_op.set_upstream(bash_remove_old_data_op)
  bq_export_gcs_eval_csv_op.set_upstream(bash_remove_old_data_op)

  return (bq_export_gcs_train_csv_op,
          bq_export_gcs_eval_csv_op)

In [None]:
%%writefile airflow/dags/module/training.py
import datetime

from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
from airflow.operators.bash_operator import BashOperator


def training_tasks(model, dag, PROJECT_ID, BUCKET, DATA_DIR, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION):
  """Create the training job and saved-model staging tasks for one source table.

  Args:
    model: Source table as "dataset.table".
    dag: DAG the tasks are attached to.
    PROJECT_ID: Project the training job is submitted to.
    BUCKET: Bucket URI holding the code package, job directories and trained
      model output.
    DATA_DIR: GCS prefix, ending in "/", that contains the exported CSV files.
    MODEL_NAME: Not used by this function.
    MODEL_VERSION: Not used by this function.
    MODEL_LOCATION: GCS prefix under which the saved model is staged.

  Returns:
    A tuple (training task, task that copies the new saved model into place).
  """
  # Constants
  # The code package name comes from the model code in the module directory
  REGION = "us-east1"
  PACKAGE_URI = BUCKET + "/taxifare/code/taxifare-0.1.tar.gz"
  JOB_DIR = BUCKET + "/jobs"

  # Task ids and GCS folders use the table name with "." replaced by "_"
  model_slug = model.replace(".","_")
  saved_model_dir = MODEL_LOCATION + model_slug

  # ML Engine training job; the job id carries a timestamp taken when this
  # function runs
  timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  job_id = "taxifare_{}_{}".format(model_slug, timestamp)
  job_dir = JOB_DIR + "/" + job_id
  output_dir = BUCKET + "/taxifare/trained_model/{}".format(model_slug)
  train_files = DATA_DIR + "{}/train-*.csv".format(model_slug)
  eval_files = DATA_DIR + "{}/eval-*.csv".format(model_slug)

  training_args = [
    "--job-dir", job_dir,
    "--train_data_paths", train_files,
    "--eval_data_paths", eval_files,
    "--output_dir", output_dir,
    "--train_steps", str(500),
    "--train_batch_size", str(32),
    "--eval_steps", str(500),
    "--eval_batch_size", str(32),
    "--nbuckets", str(8),
    "--hidden_units", "128,32,4"
  ]

  # Reference: https://airflow.apache.org/integration.html#cloud-ml-engine
  ml_engine_training_op = MLEngineTrainingOperator(
    task_id="ml_engine_training_{}_task".format(model_slug),
    project_id=PROJECT_ID,
    job_id=job_id,
    package_uris=[PACKAGE_URI],
    training_python_module="trainer.task",
    training_args=training_args,
    region=REGION,
    scale_tier="BASIC",
    runtime_version="1.13",
    python_version="3.5",
    dag=dag
  )

  # Empty the staging folder if it exists; succeed if it does not
  remove_old_saved_model_cmd = "if gcloud storage ls {0} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/*; else true; fi".format(saved_model_dir)
  bash_remove_old_saved_model_op = BashOperator(
    task_id="bash_remove_old_saved_model_{}_task".format(model_slug),
    bash_command=remove_old_saved_model_cmd,
    dag=dag
  )

  # Sync the last entry listed under <output_dir>/export/exporter/ into the staging folder
  copy_new_saved_model_cmd = "gcloud storage rsync --delete-unmatched-destination-objects --recursive `gcloud storage ls {0}/export/exporter/ | tail -1` {1}".format(output_dir, saved_model_dir)
  bash_copy_new_saved_model_op = BashOperator(
    task_id="bash_copy_new_saved_model_{}_task".format(model_slug),
    bash_command=copy_new_saved_model_cmd,
    dag=dag
  )

  # Build dependency graph: train -> clear staging folder -> copy new model
  bash_remove_old_saved_model_op.set_upstream(ml_engine_training_op)
  bash_copy_new_saved_model_op.set_upstream(bash_remove_old_saved_model_op)

  return (ml_engine_training_op,
          bash_copy_new_saved_model_op)

In [None]:
%%writefile airflow/dags/module/deploy.py
import datetime
import logging

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.mlengine_operator import MLEngineModelOperator, MLEngineVersionOperator


def deploy_tasks(model, dag, PROJECT_ID, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION):
  """Create the ML Engine deployment tasks for one model inside `dag`.

  The tasks list the existing ML Engine models, branch to create the model
  only when that listing is empty, list the versions matching MODEL_VERSION,
  and then offer two alternatives: create MODEL_VERSION, or create a
  timestamped fallback version. Whichever version is created is afterwards
  made the default.

  Args:
    model: Source table name such as "yellow.trips"; dots are replaced with
      underscores to build task ids, the model name and the deployment URI.
    dag: DAG every task is attached to.
    PROJECT_ID: Project passed to the ML Engine operators.
    MODEL_NAME: Prefix prepended to the sanitized model to form the ML Engine
      model name.
    MODEL_VERSION: Preferred version name.
    MODEL_LOCATION: Prefix prepended to the sanitized model to form the
      version's deploymentUri.

  Returns:
    Tuple of (models-list task, version-branch task, create-version task,
    create-other-version task). The two create-version tasks are not wired to
    the version branch in this function; presumably the caller connects them.
  """
  model_id = model.replace(".", "_")
  ml_engine_model_name = MODEL_NAME + model_id
  deployment_uri = MODEL_LOCATION + model_id

  def task_id(stem):
    """Build the per-model task id for a task name stem."""
    return "{}_{}_task".format(stem, model_id)

  def nothing_listed(gcloud_output):
    """Return True when a pulled `gcloud ... list` XCom shows no matching item."""
    # xcom_pull returns None when nothing was pushed, so test truthiness
    # rather than calling len() on the value.
    return not gcloud_output or gcloud_output == "Listed 0 items."

  def version_spec(version_name):
    """Build the version body used when creating a version."""
    return {
      "name": version_name,
      "deploymentUri": deployment_uri,
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    }

  # Constants
  # The fallback version name is a Jinja template rendered per DAG run. The DAG
  # file is re-parsed for every task execution, so a name derived from
  # datetime.now() at parse time could differ between the task that creates
  # the version and the task that makes it the default.
  OTHER_VERSION_NAME = "v_{{ execution_date.strftime('%Y%m%d%H%M') }}"

  # Create model on ML-Engine
  bash_ml_engine_models_list_op = BashOperator(
    task_id=task_id("bash_ml_engine_models_list"),
    xcom_push=True,
    bash_command="gcloud ml-engine models list --filter='name:{0}'".format(ml_engine_model_name),
    dag=dag
  )

  def check_if_model_already_exists(templates_dict, **kwargs):
    """Branch callable: pick the create-model task only if no model is listed."""
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_models_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_models_list_{}_task".format(cur_model))
    logging.info("check_if_model_already_exists: {}: ml_engine_models_list = \n{}".format(cur_model, ml_engine_models_list))
    create_model_task = "ml_engine_create_model_{}_task".format(cur_model)
    dont_create_model_task = "dont_create_model_dummy_branch_{}_task".format(cur_model)
    if nothing_listed(ml_engine_models_list):
      return create_model_task
    return dont_create_model_task

  check_if_model_already_exists_op = BranchPythonOperator(
    task_id=task_id("check_if_model_already_exists"),
    templates_dict={"model": model_id},
    python_callable=check_if_model_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_model_op = MLEngineModelOperator(
    task_id=task_id("ml_engine_create_model"),
    project_id=PROJECT_ID,
    model={"name": ml_engine_model_name},
    operation="create",
    dag=dag
  )

  # The "all_done" dummies let the two branch arms join again even though one
  # arm is skipped by the branch operator.
  create_model_dummy_op = DummyOperator(
    task_id=task_id("create_model_dummy"),
    trigger_rule="all_done",
    dag=dag
  )

  dont_create_model_dummy_branch_op = DummyOperator(
    task_id=task_id("dont_create_model_dummy_branch"),
    dag=dag
  )

  dont_create_model_dummy_op = DummyOperator(
    task_id=task_id("dont_create_model_dummy"),
    trigger_rule="all_done",
    dag=dag
  )

  # Create version of model on ML-Engine
  bash_ml_engine_versions_list_op = BashOperator(
    task_id=task_id("bash_ml_engine_versions_list"),
    xcom_push=True,
    bash_command="gcloud ml-engine versions list --model {0} --filter='name:{1}'".format(ml_engine_model_name, MODEL_VERSION),
    dag=dag
  )

  def check_if_model_version_already_exists(templates_dict, **kwargs):
    """Branch callable: pick MODEL_VERSION if it is free, else the fallback."""
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_versions_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_versions_list_{}_task".format(cur_model))
    logging.info("check_if_model_version_already_exists: {}: ml_engine_versions_list = \n{}".format(cur_model, ml_engine_versions_list))
    create_version_task = "ml_engine_create_version_{}_task".format(cur_model)
    create_other_version_task = "ml_engine_create_other_version_{}_task".format(cur_model)
    if nothing_listed(ml_engine_versions_list):
      return create_version_task
    return create_other_version_task

  check_if_model_version_already_exists_op = BranchPythonOperator(
    task_id=task_id("check_if_model_version_already_exists"),
    templates_dict={"model": model_id},
    python_callable=check_if_model_version_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_create_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=MODEL_VERSION,
    version=version_spec(MODEL_VERSION),
    operation="create",
    dag=dag
  )

  ml_engine_create_other_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_create_other_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=OTHER_VERSION_NAME,
    version=version_spec(OTHER_VERSION_NAME),
    operation="create",
    dag=dag
  )

  ml_engine_set_default_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_set_default_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=MODEL_VERSION,
    version={"name": MODEL_VERSION},
    operation="set_default",
    dag=dag
  )

  ml_engine_set_default_other_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_set_default_other_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=OTHER_VERSION_NAME,
    version={"name": OTHER_VERSION_NAME},
    operation="set_default",
    dag=dag
  )

  # Build dependency graph, set_upstream dependencies for all tasks
  check_if_model_already_exists_op.set_upstream(bash_ml_engine_models_list_op)

  ml_engine_create_model_op.set_upstream(check_if_model_already_exists_op)
  create_model_dummy_op.set_upstream(ml_engine_create_model_op)
  dont_create_model_dummy_branch_op.set_upstream(check_if_model_already_exists_op)
  dont_create_model_dummy_op.set_upstream(dont_create_model_dummy_branch_op)

  bash_ml_engine_versions_list_op.set_upstream([dont_create_model_dummy_op, create_model_dummy_op])
  check_if_model_version_already_exists_op.set_upstream(bash_ml_engine_versions_list_op)

  ml_engine_set_default_version_op.set_upstream(ml_engine_create_version_op)
  ml_engine_set_default_other_version_op.set_upstream(ml_engine_create_other_version_op)

  return (bash_ml_engine_models_list_op,
          check_if_model_version_already_exists_op,
          ml_engine_create_version_op,
          ml_engine_create_other_version_op)

## Subdag

In [None]:
%%writefile airflow/dags/subdag/taxifare_subdag.py
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""DAG definition for taxifare automated pipeline."""

import airflow
from airflow import DAG

# Reference for all available airflow operators: 
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.operators.subdag_operator import SubDagOperator
from airflow.hooks.base_hook import BaseHook

from airflow.models import TaskInstance

import datetime

from subdag import preprocess
from subdag import training
from subdag import deploy


def _get_project_id():
  """Get project ID from default GCP connection.

  Reads the "extra__google_cloud_platform__project" entry from the extras of
  the "google_cloud_default" Airflow connection.

  Returns:
    The project ID stored on the connection.

  Raises:
    ValueError: If the connection extras do not contain a project ID.
  """

  extras = BaseHook.get_connection("google_cloud_default").extra_dejson
  key = "extra__google_cloud_platform__project"
  if key not in extras:
    # A bare string cannot be raised in Python 3 (it produces a TypeError that
    # hides the message), so raise a real exception carrying the hint.
    raise ValueError("Must configure project_id in google_cloud_default "
                     "connection from Airflow Console")
  return extras[key]

# Constants
# The project ID is read from the Airflow connection; the GCS bucket name is derived from it
PROJECT_ID = _get_project_id()
BUCKET = "".join(["gs://", PROJECT_ID, "-bucket"])

# Source BigQuery tables, written as "<dataset>.<table>"
SOURCE_DATASET_TABLE_NAMES = [
  "yellow.trips",
  "green.trips_2014",
  "green.trips_2015",
]

# GCS prefix the exported data is written under
DATA_DIR = "{}/taxifare/data/".format(BUCKET)

# Base model parameters
MODEL_NAME = "taxifare_"
MODEL_VERSION = "v1"
MODEL_LOCATION = "{}/taxifare/saved_model/".format(BUCKET)

# Arguments handed to the DAG (and forwarded to each SubDAG) as default_args
default_args = dict(
  owner="airflow",
  depends_on_past=False,
  start_date=airflow.utils.dates.days_ago(2),
  email=["airflow@example.com"],
  email_on_failure=True,
  email_on_retry=False,
  retries=5,
  retry_delay=datetime.timedelta(minutes=5),
)

# CRON expression for a daily run at 2100 hours (9pm)
# Reference: https://airflow.apache.org/scheduler.html
# NOTE: this value is not passed to the DAG below, which is created with
# schedule_interval=None.
schedule_interval = "00 21 * * *"

# Title your DAG
DAG_NAME = "taxifare_subdag"

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval=None)

# Show the module docstring as the DAG's documentation
dag.doc_md = __doc__


#
#
# Task Definition
#
#

# Every source table gets its own preprocess -> training -> deploy chain of SubDAGs.
for model in SOURCE_DATASET_TABLE_NAMES:
  # Task ids cannot reuse the dotted table name, so dots become underscores.
  model_id = model.replace(".", "_")
  preprocess_task_id = "subdag_preprocess_{}_task".format(model_id)
  training_task_id = "subdag_training_{}_task".format(model_id)
  deploy_task_id = "subdag_deploy_{}_task".format(model_id)

  # Each SubDAG is built with the same name as the task that wraps it.
  subdag_preprocess_op = SubDagOperator(
    task_id=preprocess_task_id,
    subdag=preprocess.preprocess_tasks(
      model, DAG_NAME, preprocess_task_id, default_args,
      PROJECT_ID, BUCKET, DATA_DIR),
    dag=dag
  )

  subdag_training_op = SubDagOperator(
    task_id=training_task_id,
    subdag=training.training_tasks(
      model, DAG_NAME, training_task_id, default_args,
      PROJECT_ID, BUCKET, DATA_DIR,
      MODEL_NAME, MODEL_VERSION, MODEL_LOCATION),
    dag=dag
  )

  subdag_deploy_op = SubDagOperator(
    task_id=deploy_task_id,
    subdag=deploy.deploy_tasks(
      model, DAG_NAME, deploy_task_id, default_args,
      PROJECT_ID, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION),
    dag=dag
  )

  # Build dependency graph: preprocess, then training, then deploy
  subdag_preprocess_op.set_downstream(subdag_training_op)
  subdag_training_op.set_downstream(subdag_deploy_op)

In [None]:
%%writefile airflow/dags/subdag/preprocess.py
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.bash_operator import BashOperator


def preprocess_tasks(model, parent_dag_name, child_dag_name, default_args, PROJECT_ID, BUCKET, DATA_DIR):
  """Build the preprocessing SubDAG for one source table.

  The SubDAG queries the source table into train and eval tables in the
  "taxifare" dataset, checks both tables with a COUNT(*) query, removes any
  previously exported files for this model and exports both tables to GCS as
  header-less CSV shards.

  Args:
    model: Source table as "<dataset>.<table>", e.g. "yellow.trips".
    parent_dag_name: Name of the parent DAG.
    child_dag_name: Name of this SubDAG; the inner DAG is named
      "<parent_dag_name>.<child_dag_name>".
    default_args: default_args passed to the inner DAG.
    PROJECT_ID: Project that owns the destination tables being checked.
    BUCKET: Not used by this function; kept so existing callers keep working.
    DATA_DIR: GCS prefix (ending in "/") under which "<model>/train-*.csv"
      and "<model>/eval-*.csv" are written.

  Returns:
    The inner DAG containing all preprocessing tasks.
  """
  # Create inner dag
  dag = DAG(
    "{0}.{1}".format(parent_dag_name, child_dag_name),
    default_args=default_args,
    schedule_interval=None
  )

  # Table and task names cannot reuse the dotted source name
  model_id = model.replace(".", "_")

  # Constants
  # Specify your source BigQuery project
  SOURCE_BQ_PROJECT = "nyc-tlc"

  # Specify your destination BigQuery dataset
  DESTINATION_DATASET = "taxifare"

  train_table = "{}.{}_train_data".format(DESTINATION_DATASET, model_id)
  eval_table = "{}.{}_eval_data".format(DESTINATION_DATASET, model_id)

  # Cleanup and export share this one location. Deriving the cleanup path from
  # BUCKET instead would leave stale shards behind whenever DATA_DIR is
  # customised.
  data_path = DATA_DIR + model_id

  # BigQuery data query
  bql = """
  SELECT
    (tolls_amount + fare_amount) AS fare_amount,
    EXTRACT(DAYOFWEEK FROM pickup_datetime) * 1.0 AS dayofweek,
    EXTRACT(HOUR FROM pickup_datetime) * 1.0 AS hourofday,
    pickup_longitude AS pickuplon,
    pickup_latitude AS pickuplat,
    dropoff_longitude AS dropofflon,
    dropoff_latitude AS dropofflat,
    passenger_count*1.0 AS passengers,
    CONCAT(CAST(pickup_datetime AS STRING), CAST(pickup_longitude AS STRING), CAST(pickup_latitude AS STRING), CAST(dropoff_latitude AS STRING), CAST(dropoff_longitude AS STRING)) AS key
  FROM
    `{0}.{1}`
  WHERE
    trip_distance > 0
    AND fare_amount >= 2.5
    AND pickup_longitude > -78
    AND pickup_longitude < -70
    AND dropoff_longitude > -78
    AND dropoff_longitude < -70
    AND pickup_latitude > 37
    AND pickup_latitude < 45
    AND dropoff_latitude > 37
    AND dropoff_latitude < 45
    AND passenger_count > 0
    AND rand() < 0.00001
  """

  bql = bql.format(SOURCE_BQ_PROJECT, model)

  # Split on a hash of the row key: buckets 0-3 go to train, bucket 4 to eval
  bql_train = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) < 4".format(bql)
  bql_eval = "SELECT * EXCEPT (key) FROM({0}) WHERE ABS(MOD(FARM_FINGERPRINT(key), 5)) = 4".format(bql)

  # Complete the BigQueryOperator task to truncate the table if it already exists before writing
  # Reference: https://airflow.apache.org/integration.html#bigqueryoperator
  bq_train_data_op = BigQueryOperator(
    task_id="bq_train_data_{}_task".format(model_id),
    bql=bql_train,
    destination_dataset_table=train_table,
    write_disposition="WRITE_TRUNCATE", # specify to truncate on writes
    use_legacy_sql=False,
    dag=dag
  )

  bq_eval_data_op = BigQueryOperator(
    task_id="bq_eval_data_{}_task".format(model_id),
    bql=bql_eval,
    destination_dataset_table=eval_table,
    write_disposition="WRITE_TRUNCATE", # specify to truncate on writes
    use_legacy_sql=False,
    dag=dag
  )

  sql = """
  SELECT
    COUNT(*)
  FROM
    [{0}:{1}.{2}]
  """

  # Check to make sure that the data tables won't be empty
  bq_check_train_data_op = BigQueryCheckOperator(
    task_id="bq_check_train_data_{}_task".format(model_id),
    sql=sql.format(PROJECT_ID, DESTINATION_DATASET, model_id + "_train_data"),
    dag=dag
  )

  bq_check_eval_data_op = BigQueryCheckOperator(
    task_id="bq_check_eval_data_{}_task".format(model_id),
    sql=sql.format(PROJECT_ID, DESTINATION_DATASET, model_id + "_eval_data"),
    dag=dag
  )

  # BigQuery training data export to GCS
  # Remove earlier exports first; the `ls` guard skips the removal when the
  # path does not exist yet.
  bash_remove_old_data_op = BashOperator(
    task_id="bash_remove_old_data_{}_task".format(model_id),
    bash_command="if gcloud storage ls {0} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/*; else true; fi".format(data_path),
    dag=dag
  )

  # Takes a BigQuery dataset and table as input and exports it to GCS as a CSV
  bq_export_gcs_train_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_train_csv_{}_task".format(model_id),
    source_project_dataset_table=train_table,
    destination_cloud_storage_uris=[data_path + "/train-*.csv"],
    export_format="CSV",
    print_header=False,
    dag=dag
  )

  bq_export_gcs_eval_csv_op = BigQueryToCloudStorageOperator(
    task_id="bq_export_gcs_eval_csv_{}_task".format(model_id),
    source_project_dataset_table=eval_table,
    destination_cloud_storage_uris=[data_path + "/eval-*.csv"],
    export_format="CSV",
    print_header=False,
    dag=dag
  )

  # Build dependency graph, set_upstream dependencies for all tasks
  bq_check_train_data_op.set_upstream(bq_train_data_op)
  bq_check_eval_data_op.set_upstream(bq_eval_data_op)

  # Old files are only removed once both new tables passed their checks
  bash_remove_old_data_op.set_upstream([bq_check_train_data_op, bq_check_eval_data_op])

  bq_export_gcs_train_csv_op.set_upstream(bash_remove_old_data_op)
  bq_export_gcs_eval_csv_op.set_upstream(bash_remove_old_data_op)

  return dag

In [None]:
%%writefile airflow/dags/subdag/training.py
import datetime

from airflow import DAG
from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
from airflow.operators.bash_operator import BashOperator


def training_tasks(model, parent_dag_name, child_dag_name, default_args, PROJECT_ID, BUCKET, DATA_DIR, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION):
  """Build the training SubDAG for one source table.

  The SubDAG submits an ML Engine training job, then clears the model's
  saved-model location and syncs the last listed export of the job into it.

  Args:
    model: Source table as "<dataset>.<table>"; dots become underscores in
      task ids, job ids and GCS paths.
    parent_dag_name: Name of the parent DAG.
    child_dag_name: Name of this SubDAG; the inner DAG is named
      "<parent_dag_name>.<child_dag_name>".
    default_args: default_args passed to the inner DAG.
    PROJECT_ID: Project the training job is submitted to.
    BUCKET: GCS bucket URL holding the code package, job dirs and outputs.
    DATA_DIR: GCS prefix holding "<model>/train-*.csv" and "<model>/eval-*.csv".
    MODEL_NAME: Not used by this function.
    MODEL_VERSION: Not used by this function.
    MODEL_LOCATION: GCS prefix the saved model is copied under.

  Returns:
    The inner DAG containing the training tasks.
  """
  model_id = model.replace(".", "_")

  # Inner DAG, named "<parent>.<child>"
  dag = DAG("{0}.{1}".format(parent_dag_name, child_dag_name), default_args=default_args, schedule_interval=None)

  # Constants
  # The code package name comes from the model code in the module directory
  REGION = "us-east1"
  PACKAGE_URI = BUCKET + "/taxifare/code/taxifare-0.1.tar.gz"
  JOB_DIR = BUCKET + "/jobs"

  # ML Engine training job; the id carries a timestamp taken when this code runs
  timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  job_id = "taxifare_{}_{}".format(model_id, timestamp)
  output_dir = BUCKET + "/taxifare/trained_model/{}".format(model_id)
  saved_model_dir = MODEL_LOCATION + model_id

  # (flag, value) pairs, flattened into the argument list for the trainer
  flag_values = [
    ("--job-dir", JOB_DIR + "/" + job_id),
    ("--train_data_paths", DATA_DIR + "{}/train-*.csv".format(model_id)),
    ("--eval_data_paths", DATA_DIR + "{}/eval-*.csv".format(model_id)),
    ("--output_dir", output_dir),
    ("--train_steps", str(500)),
    ("--train_batch_size", str(32)),
    ("--eval_steps", str(500)),
    ("--eval_batch_size", str(32)),
    ("--nbuckets", str(8)),
    ("--hidden_units", "128,32,4"),
  ]
  training_args = [token for pair in flag_values for token in pair]

  # Reference: https://airflow.apache.org/integration.html#cloud-ml-engine
  ml_engine_training_op = MLEngineTrainingOperator(
    task_id="ml_engine_training_{}_task".format(model_id),
    project_id=PROJECT_ID,
    job_id=job_id,
    package_uris=[PACKAGE_URI],
    training_python_module="trainer.task",
    training_args=training_args,
    region=REGION,
    scale_tier="BASIC",
    runtime_version="1.13",
    python_version="3.5",
    dag=dag
  )

  # Clear the previous saved model; the `ls` guard skips removal if nothing is there
  remove_command = "if gcloud storage ls {0} 2> /dev/null; then gcloud storage rm --recursive --continue-on-error {0}/*; else true; fi".format(saved_model_dir)
  bash_remove_old_saved_model_op = BashOperator(
    task_id="bash_remove_old_saved_model_{}_task".format(model_id),
    bash_command=remove_command,
    dag=dag
  )

  # Sync the last entry listed under export/exporter/ into the saved-model location
  copy_command = "gcloud storage rsync --delete-unmatched-destination-objects --recursive `gcloud storage ls {0}/export/exporter/ | tail -1` {1}".format(output_dir, saved_model_dir)
  bash_copy_new_saved_model_op = BashOperator(
    task_id="bash_copy_new_saved_model_{}_task".format(model_id),
    bash_command=copy_command,
    dag=dag
  )

  # Build dependency graph: train, then remove the old model, then copy the new one
  bash_remove_old_saved_model_op.set_upstream(ml_engine_training_op)
  bash_copy_new_saved_model_op.set_upstream(bash_remove_old_saved_model_op)

  return dag

In [None]:
%%writefile airflow/dags/subdag/deploy.py
import datetime
import logging

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.mlengine_operator import MLEngineModelOperator, MLEngineVersionOperator


def deploy_tasks(model, parent_dag_name, child_dag_name, default_args, PROJECT_ID, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION):
  """Build the deployment SubDAG for one model.

  The SubDAG lists the existing ML Engine models, branches to create the
  model only when that listing is empty, lists the versions matching
  MODEL_VERSION, and branches again to create either MODEL_VERSION or a
  timestamped fallback version. Whichever version is created is then made the
  default.

  Args:
    model: Source table name such as "yellow.trips"; dots are replaced with
      underscores to build task ids, the model name and the deployment URI.
    parent_dag_name: Name of the parent DAG.
    child_dag_name: Name of this SubDAG; the inner DAG is named
      "<parent_dag_name>.<child_dag_name>".
    default_args: default_args passed to the inner DAG.
    PROJECT_ID: Project passed to the ML Engine operators.
    MODEL_NAME: Prefix prepended to the sanitized model to form the ML Engine
      model name.
    MODEL_VERSION: Preferred version name.
    MODEL_LOCATION: Prefix prepended to the sanitized model to form the
      version's deploymentUri.

  Returns:
    The inner DAG containing all deployment tasks.
  """
  # Create inner dag
  dag = DAG(
    "{0}.{1}".format(parent_dag_name, child_dag_name),
    default_args=default_args,
    schedule_interval=None
  )

  model_id = model.replace(".", "_")
  ml_engine_model_name = MODEL_NAME + model_id
  deployment_uri = MODEL_LOCATION + model_id

  def task_id(stem):
    """Build the per-model task id for a task name stem."""
    return "{}_{}_task".format(stem, model_id)

  def nothing_listed(gcloud_output):
    """Return True when a pulled `gcloud ... list` XCom shows no matching item."""
    # xcom_pull returns None when nothing was pushed, so test truthiness
    # rather than calling len() on the value.
    return not gcloud_output or gcloud_output == "Listed 0 items."

  def version_spec(version_name):
    """Build the version body used when creating a version."""
    return {
      "name": version_name,
      "deploymentUri": deployment_uri,
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    }

  # Constants
  # The fallback version name is a Jinja template rendered per DAG run. The DAG
  # file is re-parsed for every task execution, so a name derived from
  # datetime.now() at parse time could differ between the task that creates
  # the version and the task that makes it the default.
  OTHER_VERSION_NAME = "v_{{ execution_date.strftime('%Y%m%d%H%M') }}"

  # Create model on ML-Engine
  bash_ml_engine_models_list_op = BashOperator(
    task_id=task_id("bash_ml_engine_models_list"),
    xcom_push=True,
    bash_command="gcloud ml-engine models list --filter='name:{0}'".format(ml_engine_model_name),
    dag=dag
  )

  def check_if_model_already_exists(templates_dict, **kwargs):
    """Branch callable: pick the create-model task only if no model is listed."""
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_models_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_models_list_{}_task".format(cur_model))
    logging.info("check_if_model_already_exists: {}: ml_engine_models_list = \n{}".format(cur_model, ml_engine_models_list))
    create_model_task = "ml_engine_create_model_{}_task".format(cur_model)
    dont_create_model_task = "dont_create_model_dummy_branch_{}_task".format(cur_model)
    if nothing_listed(ml_engine_models_list):
      return create_model_task
    return dont_create_model_task

  check_if_model_already_exists_op = BranchPythonOperator(
    task_id=task_id("check_if_model_already_exists"),
    templates_dict={"model": model_id},
    python_callable=check_if_model_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_model_op = MLEngineModelOperator(
    task_id=task_id("ml_engine_create_model"),
    project_id=PROJECT_ID,
    model={"name": ml_engine_model_name},
    operation="create",
    dag=dag
  )

  # The "all_done" dummies let the two branch arms join again even though one
  # arm is skipped by the branch operator.
  create_model_dummy_op = DummyOperator(
    task_id=task_id("create_model_dummy"),
    trigger_rule="all_done",
    dag=dag
  )

  dont_create_model_dummy_branch_op = DummyOperator(
    task_id=task_id("dont_create_model_dummy_branch"),
    dag=dag
  )

  dont_create_model_dummy_op = DummyOperator(
    task_id=task_id("dont_create_model_dummy"),
    trigger_rule="all_done",
    dag=dag
  )

  # Create version of model on ML-Engine
  bash_ml_engine_versions_list_op = BashOperator(
    task_id=task_id("bash_ml_engine_versions_list"),
    xcom_push=True,
    bash_command="gcloud ml-engine versions list --model {0} --filter='name:{1}'".format(ml_engine_model_name, MODEL_VERSION),
    dag=dag
  )

  def check_if_model_version_already_exists(templates_dict, **kwargs):
    """Branch callable: pick MODEL_VERSION if it is free, else the fallback."""
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_versions_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_versions_list_{}_task".format(cur_model))
    logging.info("check_if_model_version_already_exists: {}: ml_engine_versions_list = \n{}".format(cur_model, ml_engine_versions_list))
    create_version_task = "ml_engine_create_version_{}_task".format(cur_model)
    create_other_version_task = "ml_engine_create_other_version_{}_task".format(cur_model)
    if nothing_listed(ml_engine_versions_list):
      return create_version_task
    return create_other_version_task

  check_if_model_version_already_exists_op = BranchPythonOperator(
    task_id=task_id("check_if_model_version_already_exists"),
    templates_dict={"model": model_id},
    python_callable=check_if_model_version_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_create_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=MODEL_VERSION,
    version=version_spec(MODEL_VERSION),
    operation="create",
    dag=dag
  )

  ml_engine_create_other_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_create_other_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=OTHER_VERSION_NAME,
    version=version_spec(OTHER_VERSION_NAME),
    operation="create",
    dag=dag
  )

  ml_engine_set_default_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_set_default_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=MODEL_VERSION,
    version={"name": MODEL_VERSION},
    operation="set_default",
    dag=dag
  )

  ml_engine_set_default_other_version_op = MLEngineVersionOperator(
    task_id=task_id("ml_engine_set_default_other_version"),
    project_id=PROJECT_ID,
    model_name=ml_engine_model_name,
    version_name=OTHER_VERSION_NAME,
    version={"name": OTHER_VERSION_NAME},
    operation="set_default",
    dag=dag
  )

  # Build dependency graph, set_upstream dependencies for all tasks
  check_if_model_already_exists_op.set_upstream(bash_ml_engine_models_list_op)

  ml_engine_create_model_op.set_upstream(check_if_model_already_exists_op)
  create_model_dummy_op.set_upstream(ml_engine_create_model_op)
  dont_create_model_dummy_branch_op.set_upstream(check_if_model_already_exists_op)
  dont_create_model_dummy_op.set_upstream(dont_create_model_dummy_branch_op)

  bash_ml_engine_versions_list_op.set_upstream([dont_create_model_dummy_op, create_model_dummy_op])
  check_if_model_version_already_exists_op.set_upstream(bash_ml_engine_versions_list_op)

  ml_engine_create_version_op.set_upstream(check_if_model_version_already_exists_op)
  ml_engine_create_other_version_op.set_upstream(check_if_model_version_already_exists_op)

  ml_engine_set_default_version_op.set_upstream(ml_engine_create_version_op)
  ml_engine_set_default_other_version_op.set_upstream(ml_engine_create_other_version_op)

  return dag

### Copy local Airflow DAG file and plugins into the DAGs folder

In [None]:
%%bash
# Recursively copy everything under airflow/dags/ into the dags/ folder of the Composer bucket.
# NOTE(review): expands ${AIRFLOW_BUCKET}; presumably it is exported to the environment earlier in the notebook - confirm.
gcloud storage cp --recursive airflow/dags/* gs://${AIRFLOW_BUCKET}/dags # overwrite if it exists

1. Navigate to your Cloud Composer [instance](https://console.cloud.google.com/composer/environments?project=)<br/><br/>

2. Trigger a __manual run__ of your DAG for testing<br/><br/>

3. Confirm that your DAG run succeeds: every node should be outlined in dark green and show the 'success' status