In [None]:
import os
import re
from IPython.core.display import display, HTML
from datetime import datetime
import mlflow
import pymysql

In [None]:
# Jupyter magic that renders a cell as a Jinja template to create a Python file with variable substitution.
# Dictionaries for substituted variables: env[] for OS environment variables and var[] for global variables.
from IPython.core.magic import register_line_cell_magic
from jinja2 import Template

@register_line_cell_magic
def writetemplate(line, cell):
    """Render the cell body as a Jinja2 template and write it to a file.

    Inside the template, ``env`` is ``os.environ`` and ``var`` is the dict
    returned by ``globals()`` in this module.

    Args:
        line: Target file path (the text following the magic name).
            Surrounding whitespace is ignored.
        cell: Template source (the cell body).
    """
    target_path = line.strip()

    # Create missing parent directories; exist_ok avoids the check-then-create race.
    parent_dir = os.path.dirname(target_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Render before opening the file so that a template error does not
    # truncate a previously generated file.
    rendered = Template(cell).render({'env': os.environ, 'var': globals()})

    # Explicit UTF-8: templates may contain non-ASCII characters, and the
    # platform default encoding is not guaranteed to be able to encode them.
    with open(target_path, 'w', encoding='utf-8') as f:
        f.write(rendered)

In [None]:
# Set the active MLflow experiment for this notebook. The same name is later
# substituted into the generated DAG file through var['experiment_name'].
experiment_name = "chicago-taxi"
mlflow.set_experiment(experiment_name)

mlflow_tracking_uri = mlflow.get_tracking_uri()
# Environment variables are read with os.environ[...], so a missing variable
# raises KeyError here instead of failing later.
MLFLOW_TRACKING_EXTERNAL_URI = os.environ["MLFLOW_TRACKING_EXTERNAL_URI"]

REGION=os.environ["MLOPS_REGION"]
# ML_IMAGE_URI is not used in this cell; the DAG template reads it via var['ML_IMAGE_URI'].
ML_IMAGE_URI = os.environ["ML_IMAGE_URI"]
COMPOSER_NAME = os.environ["MLOPS_COMPOSER_NAME"]
MLFLOW_GCS_ROOT_URI = os.environ["MLFLOW_GCS_ROOT_URI"]

print(f"Cloud Composer instance name: {COMPOSER_NAME}")
print(f"Cloud Composer region: {REGION}")
print(f"MLflow tracking server URI: {mlflow_tracking_uri}")
print(f"MLflow GCS root: {MLFLOW_GCS_ROOT_URI}")

# Drop the gs:// scheme so the remainder can be appended to the Cloud Console storage browser URL.
experiment_path = MLFLOW_GCS_ROOT_URI.replace("gs://","")
display(HTML('<hr>You can check results of this test in MLflow and GCS folder:'))
display(HTML(f'<h4><a href="{MLFLOW_TRACKING_EXTERNAL_URI}" rel="noopener noreferrer" target="_blank">Click to open MLflow UI</a></h4>'))
display(HTML(f'<h4><a href="https://console.cloud.google.com/storage/browser/{experiment_path}/experiments" rel="noopener noreferrer" target="_blank">Click to open MLFlow GCS folder</a></h4>'))

# Create the local package layout: ./package/training with an empty __init__.py
# so that the training directory is importable as a Python package.
!mkdir -p ./package/training
!touch ./package/training/__init__.py

In [None]:
%%writefile ./package/setup.py
"""Setuptools packaging script for the training package."""
from setuptools import find_packages, setup

# Pinned dependencies installed together with the package.
REQUIRED_PACKAGES = [
    'mlflow==1.11.0',
    'PyMySQL==0.9.3',
]

setup(
    name='trainer',
    version='0.1',
    description='Customer training setup.',
    packages=find_packages(),
    include_package_data=True,
    install_requires=REQUIRED_PACKAGES,
)

In [None]:
%%writetemplate ./package/training/task.py

import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.linear_model import LogisticRegression
import sys, stat
import argparse
import os

def train_model(args):
    """Fit a model and record its score and artifact in an MLflow run.

    Args:
        args: Parsed command-line namespace. Only ``experiment_name`` is read.

    NOTE(review): the previous body logged ``score`` and ``lr`` without ever
    defining them, so every invocation failed with NameError. The tiny
    in-memory dataset below is a placeholder that makes the step runnable
    end to end; it is not taxi data.
    """
    print("Taxi fare estimation model training step started...")
    mlflow.set_experiment(args.experiment_name)
    with mlflow.start_run(nested=True):
        # TODO: replace the placeholder samples with the exported taxi trip data.
        features = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
        labels = np.array([0, 0, 1, 1, 1, 0])

        lr = LogisticRegression()
        lr.fit(features, labels)
        # Mean accuracy on the same samples used for fitting (placeholder metric).
        score = lr.score(features, labels)

        mlflow.log_metric("score", score)
        mlflow.sklearn.log_model(lr, "model")
    print("Training finished.")

def main():
    """Parse the command-line options, configure MLflow and run the training step.

    Options that the parser does not declare are tolerated and ignored.
    """
    cli_arguments = sys.argv[1:]
    print("Training arguments: " + " ".join(cli_arguments))

    # Declared options as (flag, type) pairs; none of them is required.
    option_specs = (
        ("--epochs", int),
        ("--job-dir", str),
        ("--local_data", str),
        ("--experiment_name", str),
    )
    parser = argparse.ArgumentParser()
    for flag, value_type in option_specs:
        parser.add_argument(flag, type=value_type)
    args, _ = parser.parse_known_args()

    # CLOUD_ML_JOB contains other CAIP Training runtime parameters as a JSON object
    # job = os.environ["CLOUD_ML_JOB"]

    # MLflow is reached through a local address.
    mlflow.set_tracking_uri("http://127.0.0.1:80")

    print("Training main started")
    train_model(args)

    # If --job-dir is provided in the 'ai-platform jobs submit' command,
    # any training result can be uploaded to that location:
    # if args.job_dir:
    # args.local_data, args.job_dir


if __name__ == "__main__":
    main()

In [None]:
# Upload the local ./package directory to the Composer environment's data
# storage under dual_model_trainer_dag; the DAG's training command points its
# --package-path at that location.
!gcloud composer environments storage data import \
    --environment {COMPOSER_NAME} \
    --location {REGION} \
    --source ./package \
    --destination dual_model_trainer_dag


In [None]:
%%writetemplate dual_model_trainer_dag.py
import os
import logging
from datetime import (datetime, timedelta)
#import tensorflow_data_validation as tfdv
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator

# Working with Airflow 1.10.10 version!
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator

# The two values below are filled in by the notebook's writetemplate magic
# (from notebook globals) when this file is generated.
experiment_name = "{{ var['experiment_name'] }}"
ML_IMAGE_URI = "{{ var['ML_IMAGE_URI'] }}"
# The GCS root is substituted at generation time from the notebook's environment;
# experiment_name is interpolated by the f-string when this module is loaded.
job_experiment_root = f"{{ env['MLFLOW_GCS_ROOT_URI'] }}/experiments/{experiment_name}"

# Read from the environment of the process that loads this file; os.getenv yields None when unset.
PROJECT_ID = os.getenv("GCP_PROJECT")
REGION = os.getenv("COMPOSER_LOCATION")

# NOTE(review): default_args is defined here but is not passed to the DAG
# constructor in this file, so retries=1 does not appear to take effect - confirm intent.
default_args = dict(retries=1,start_date=airflow.utils.dates.days_ago(0))

# Postfixes for temporary BQ tables and output CSV files
# NOTE(review): these three constants are not referenced elsewhere in this file.
TRAINING_POSTFIX = "_training"
EVAL_POSTFIX = "_eval"
VALIDATION_POSTFIX = "_validation"

BQ_DATASET = "chicago_taxi_trips"
BQ_TABLE = "taxi_trips"
# Query template: the single positional placeholder after the MOD(...) expression
# is filled with a per-split range clause via str.format.
BQ_QUERY = """
    SELECT unique_key, taxi_id, trip_start_timestamp, trip_end_timestamp, trip_seconds, trip_miles, pickup_census_tract, 
        dropoff_census_tract, pickup_community_area, dropoff_community_area, fare, tips, tolls, extras, trip_total, 
        payment_type, company, pickup_latitude, pickup_longitude, pickup_location, dropoff_latitude, dropoff_longitude, dropoff_location
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` 
    WHERE
      dropoff_latitude IS NOT NULL and
      dropoff_longitude IS NOT NULL and
      dropoff_location  IS NOT NULL and
      MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) {}
    LIMIT 100
    """

# Shell command template for submitting a training job; the named placeholders
# (job_name, region, job_dir, ml_image_uri, experiment_name) are filled with
# str.format when the trainer operators are created. Arguments after the bare
# "--" are forwarded to the training module.
training_command="""gcloud ai-platform jobs submit training {job_name} \
    --region {region} \
    --scale-tier BASIC \
    --job-dir {job_dir} \
    --package-path /home/airflow/gcs/data/dual_model_trainer_dag/package/training/ \
    --module-name training.task \
    --master-image-uri {ml_image_uri} \
    -- \
    --experiment_name {experiment_name} \
    --epochs 2"""

def generate_tfdv_statistics(gcs_file_name, **kwargs):
    """Placeholder for TFDV statistics generation; currently only logs the file name.

    Args:
        gcs_file_name: Name of the file to process.
        **kwargs: Extra keyword arguments; accepted and ignored.

    Returns:
        None.
    """
    logging.info("Processing %s", gcs_file_name)
    # The TFDV calls are currently skipped because of pip module versions in Airflow:
    #train_stats = tfdv.generate_statistics_from_csv(gcs_file_name)
    #tfdv.WriteStatisticsToTFRecord(output_path = gcs_file_name + ".tfrecord")

def joiner_1(training_gcs_file_name, eval_gcs_file_name, **kwargs):
    """Join point after data preparation; only logs the two file names.

    Args:
        training_gcs_file_name: Name of the training split file.
        eval_gcs_file_name: Name of the eval split file.
        **kwargs: Extra keyword arguments; accepted and ignored.

    Returns:
        None.
    """
    logging.info(
        "Joining %s, eval GCS files %s",
        training_gcs_file_name,
        eval_gcs_file_name,
    )

def model_trainer(training_gcs_file_name, eval_gcs_file_name, model_file, **kwargs):
    """Placeholder trainer callable; only logs the two data file names.

    Args:
        training_gcs_file_name: Name of the training split file.
        eval_gcs_file_name: Name of the eval split file.
        model_file: Accepted but not used by the current body.
        **kwargs: Extra keyword arguments; accepted and ignored.

    Returns:
        None.
    """
    logging.info(
        "Training %s, eval GCS files %s",
        training_gcs_file_name,
        eval_gcs_file_name,
    )
    
with DAG("dual_model_trainer",
         description = "Train evaluate and validate two models on taxi fare dataset. Select the best one and register it to Mlflow v0.02",
         schedule_interval = None, # manual trigger
         start_date = datetime(1969, 9, 1),
         catchup = False,
         doc_md = __doc__
         ) as dag:

    # Per-split configuration. Each "dataset_range" is appended to
    # MOD(ABS(FARM_FINGERPRINT(unique_key)), 100) in BQ_QUERY, whose result is
    # a bucket number in 0..99. SQL BETWEEN is inclusive on both ends, so the
    # ranges must not share a boundary value; otherwise the same rows would
    # land in two splits (80/15/5 percent of the buckets here).
    tasks = {
        "training" : {
            "dataset_range" : "between 0 and 79"
            },
        "eval":{
            "dataset_range" : "between 80 and 94"
            },
        "validation": {
            "dataset_range" : "between 95 and 99"
        }}

    # Define the data-preparation operators for every split. The created
    # operators are stored back into the split's own dict for wiring below.
    for task_key, task in tasks.items():
        # Note: fixed table names cause a race condition when the DAG is triggered before the previous run has finished.
        table_name = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}_{task_key}"
        task["gcs_file_name"] = f"{job_experiment_root}/data/ds_{task_key}.csv"

        # Deletes previous training temporary tables
        task["delete_table"] = BigQueryTableDeleteOperator(
            task_id = "delete_table_" + task_key,
            deletion_dataset_table = table_name,
            ignore_if_missing = True)

        # Splits and copies the source BQ table into 'dataset_range' sized segments
        task["split_table"] = BigQueryOperator(
            task_id = "split_table_" + task_key,
            use_legacy_sql=False,
            destination_dataset_table = table_name,
            sql = BQ_QUERY.format(task["dataset_range"]),
            location = REGION)

        # Extracts the split tables to CSV files in GCS
        task["extract_to_gcs"] = BigQueryToCloudStorageOperator(
            task_id = "extract_to_gcs_" + task_key,
            source_project_dataset_table = table_name,
            destination_cloud_storage_uris = [task["gcs_file_name"]],
            field_delimiter = '|')

        # Generates statistics by TFDV (the dict key keeps its historical spelling)
        task["tfdv_statisctics"] = PythonOperator(
            task_id = "tfdv_statistics_for_" + task_key,
            python_callable = generate_tfdv_statistics,
            provide_context = True,
            op_kwargs={'gcs_file_name': task["gcs_file_name"]})

    # The operator gets its own variable name so that the joiner_1 callable
    # defined above is not shadowed; the Airflow task_id is unchanged.
    joiner_1_task = PythonOperator(
        task_id = "joiner_1",
        python_callable = joiner_1,
        provide_context = True,
        op_kwargs={ 'training_gcs_file_name': tasks["training"]["gcs_file_name"],
                    'eval_gcs_file_name': tasks["eval"]["gcs_file_name"]})

    # Model trainers

#    trainer_1 = PythonOperator(
#        task_id = "trainer_1",
#        python_callable = model_trainer,
#        provide_context = True,
#        op_kwargs={ 'training_gcs_file_name': tasks["training"]["gcs_file_name"],
#                    'eval_gcs_file_name': tasks["eval"]["gcs_file_name"],
#                    'model_file': f"{job_experiment_root}/data/model1.joblib"}
#    )
#
#    trainer_2 = PythonOperator(
#        task_id = "trainer_2",
#        python_callable = model_trainer,
#        provide_context = True,
#        op_kwargs={ 'training_gcs_file_name': tasks["training"]["gcs_file_name"],
#                    'eval_gcs_file_name': tasks["eval"]["gcs_file_name"],
#                    'model_file': f"{job_experiment_root}/data/model2.joblib"}
#    )

    # NOTE(review): the timestamp is taken whenever this file is executed
    # (i.e. at DAG parse time), not once per DAG run - confirm this is intended.
    submit_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    job_name = f"training_job_{submit_time}"
    job_dir = f"{job_experiment_root}/dmt_{submit_time}"
    print(f"Training job: '{job_name}' will produce output to: {job_dir}")

    # Two separate training job submissions, distinguished by the _1 / _2 suffix.
    trainer_1 = BashOperator(
        task_id="trainer_1",
        bash_command=training_command.format(region = REGION,
                                             job_name = job_name+'_1',
                                             job_dir = job_dir+'_1',
                                             ml_image_uri = ML_IMAGE_URI,
                                             experiment_name = experiment_name)
    )
    trainer_2 = BashOperator(
        task_id="trainer_2",
        bash_command=training_command.format(region = REGION,
                                             job_name = job_name+'_2',
                                             job_dir = job_dir+'_2',
                                             ml_image_uri = ML_IMAGE_URI,
                                             experiment_name = experiment_name)
    )

    # Execute tasks: per-split preparation chain, then join, then both trainers.
    for task_key, task in tasks.items():
        task["delete_table"] >> task["split_table"] >> task["extract_to_gcs"] >> task["tfdv_statisctics"]
    [tasks["training"]["tfdv_statisctics"], tasks["eval"]["tfdv_statisctics"]] >> joiner_1_task
    joiner_1_task >> [trainer_1, trainer_2]
#    [tasks["training"]["tfdv_statisctics"], tasks["eval"]["tfdv_statisctics"]] >> trainer_1
#    [tasks["training"]["tfdv_statisctics"], tasks["eval"]["tfdv_statisctics"]] >> trainer_2

    # Train two models (two separate AI Platform Training Jobs) (PythonOperator)
    #  Input: data in GCS
    #  Output: model1.joblib model2.joblib
    #  Note: eval metric (one eval split) is stored in MLflow

    # Evaluate the previous model on the current eval split
    #  Input: experiment Id (fetch the last (registered) model)
    #  Output: eval stored in MLflow for the previous model

    # Validate the model (PythonOperator)
    #  Input: MLflow metric
    #  Output: which model (path) to register

    # Register the model (PythonOperator)
    #  Input: Path of the winning model
    #  Output: Model in specific GCS location

In [None]:
# Upload the generated DAG file to the DAGs folder of the Composer environment.
!gcloud composer environments storage dags import \
  --environment {COMPOSER_NAME}  \
  --location {REGION} \
  --source dual_model_trainer_dag.py