### Setting up Flyte environment

In [2]:
import os

from flytekit.configuration import set_flyte_config_file, platform

# Resolve the config file relative to the current user's home directory instead
# of hard-coding one developer's absolute path, so the notebook runs unchanged
# on any machine that keeps its Flyte config under ~/.flyte/.
set_flyte_config_file(os.path.expanduser("~/.flyte/notebook-staging.config"))
# Alternative: use a config file that sits next to this notebook.
#set_flyte_config_file("notebook.config")

# Echo the configured platform URL so the reader can see which deployment is targeted.
print("Connected to {}".format(platform.URL.get()))

def print_console_url(exc):
    """Print the Flyte console URL of an execution.

    The URL is built from the configured platform host (``platform.URL.get()``)
    and the ``project``, ``domain`` and ``name`` fields of ``exc.id``.

    :param exc: execution handle exposing ``id.project``, ``id.domain`` and ``id.name``.
    """
    url_parts = (platform.URL.get(), exc.id.project, exc.id.domain, exc.id.name)
    console_url = "http://{}/console/projects/{}/domains/{}/executions/{}".format(*url_parts)
    print(console_url)

Connected to flyte-staging.lyft.net


# SageMaker on Flyte -- Launching SageMaker TrainingJob and HPOJob from Flyte (Alpha)

To enable seamless and powerful machine learning use cases on Flyte, we are implementing a plugin for Flyte to allow users to leverage some of AWS SageMaker's key functionalities directly from within their Flyte workflows and tasks, so that they can enjoy the excellent data-processing and orchestration capability of Flyte at the same time.



## Defining a Simple Training Job

Users can leverage SageMaker's powerful built-in algorithms easily without needing to write any function or logic. They can achieve this simply by using Flytekit's `SdkSimpleTrainingJobTask` and supplying the settings and the spec of the target algorithm.


In [78]:
from flytekit.sdk.tasks import inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output
from flytekit.common.tasks.sagemaker import training_job_task, hpo_job_task
from flytekit.models.sagemaker import training_job as training_job_models, hpo_job as hpo_job_models
from flytekit.sdk.sagemaker import types as _sdk_sagemaker_types

# Static hyperparameter values consumed by the TrainingJob.
# Every value is passed as a string.
xgboost_hyperparameters = dict(
    num_round="6",
    base_score="0.5",
    booster="gbtree",
    csv_weights="0",
    dsplit="row",
    grow_policy="depthwise",
    lambda_bias="0.0",
    max_bin="256",
    normalize_type="tree",
    objective="reg:linear",
    one_drop="0",
    prob_buffer_row="1.0",
    process_type="default",
    refresh_leaf="1",
    sample_type="uniform",
    scale_pos_weight="1.0",
    silent="0",
    skip_drop="0.0",
    tree_method="auto",
    tweedie_variance_power="1.5",
    updater="grow_colmaker,prune",
)


# Built-in algorithm mode: no user-written training function or logic is needed.
# The specification below names the target algorithm, the version of the
# library (where applicable) and the metric the training should optimize for.
alg_spec = training_job_models.AlgorithmSpecification(
    algorithm_name=_sdk_sagemaker_types.AlgorithmName.XGBOOST,
    algorithm_version="0.72",
    input_mode=_sdk_sagemaker_types.InputMode.FILE,
    metric_definitions=[
        training_job_models.MetricDefinition(
            regex="validation:error",
            name="Minimize",
        ),
    ],
)


# A training-job task is declared with SdkSimpleTrainingJobTask by supplying the
# job settings together with the built-in algorithm spec defined above. Because
# SdkSimpleTrainingJobTask inherits from Flytekit's SdkTask, it can also benefit
# from features such as caching.
xgboost_train_task = training_job_task.SdkSimpleTrainingJobTask(
    algorithm_specification=alg_spec,
    training_job_config=training_job_models.TrainingJobConfig(
        instance_count=1,
        instance_type="ml.m4.xlarge",
        volume_size_in_gb=25,
    ),
    cacheable=True,
    cache_version="2",
)


## Doing faster iterations with single task execution -- running the TrainingJob Task standalone

The single-task execution capability in Flyte allows users to run their tasks without a workflow. This allows users to iterate faster, purely from inside their notebook.

In [81]:
from flytekit.models.sagemaker.training_job import StoppingCondition

# Inputs for a standalone run of the training task: the train/validation data
# locations, the fixed hyperparameters, and a stopping condition.
training_inputs = dict(
    train="s3://lyft-modelbuilder/test-datasets/pima-indians/train",
    validation="s3://lyft-modelbuilder/test-datasets/pima-indians/validation",
    static_hyperparameters=xgboost_hyperparameters,
    stopping_condition=StoppingCondition(
        max_runtime_in_seconds=43200,  # 43200 s = 12 hours
    ).to_flyte_idl(),
)



In [82]:
# Register the SdkSimpleTrainingJobTask and launch it as a single-task
# execution, then print a console link to that execution.
training_exc = xgboost_train_task.register_and_launch(
    "flyteexamples",
    "development",
    inputs=training_inputs,
)
print_console_url(training_exc)


# [A working example]
# https://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/soxtmrw4am

# [A failed example]
# http://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/w11i8a1njq


http://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/peetm5bfmf


In [None]:
# Presumably blocks until the training execution launched above has finished;
# TODO confirm the exact semantics against the flytekit documentation.
training_exc.wait_for_completion()

## Wrapping a Training Job inside a Hyperparameter Optimization Job

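Sometimes, training with a static set of hyperparameters might not yield the desired results. In that case, the training job can be wrapped inside a hyperparameter optimization (HPO) job, which launches multiple training jobs to search over ranges of hyperparameter values.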

In [90]:
# Wrap the SdkSimpleTrainingJobTask defined earlier in a SdkSimpleHPOJobTask
# and configure the HPO job: training-job limits, retries and caching.
xgboost_hpo_task = hpo_job_task.SdkSimpleHPOJobTask(
    training_job=xgboost_train_task,
    max_parallel_training_jobs=5,
    max_number_of_training_jobs=10,
    retries=2,
    cacheable=True,
    cache_version="2",
)

# Print the task's 'trainingJob' custom field to verify that the training-job
# definition was wrapped correctly.
print(xgboost_hpo_task.custom["trainingJob"])


{'algorithmSpecification': {'algorithmName': 'XGBOOST', 'algorithmVersion': '0.72', 'metricDefinitions': [{'name': 'Minimize', 'regex': 'validation:error'}]}, 'trainingJobConfig': {'instanceCount': '1', 'instanceType': 'ml.m4.xlarge', 'volumeSizeInGb': '25'}}
{'hpo_job_config': type {
  simple: BINARY
  metadata {
    fields {
      key: "pb_type"
      value {
        string_value: "flyteidl.plugins.sagemaker.hpo_job_pb2.HPOJobConfig"
      }
    }
  }
}
, 'static_hyperparameters': type {
  simple: STRUCT
}
, 'train': type {
  blob {
    format: "csv"
    dimensionality: MULTIPART
  }
}
, 'validation': type {
  blob {
    format: "csv"
    dimensionality: MULTIPART
  }
}
, 'stopping_condition': type {
  simple: BINARY
  metadata {
    fields {
      key: "pb_type"
      value {
        string_value: "flyteidl.plugins.sagemaker.training_job_pb2.StoppingCondition"
      }
    }
  }
}
}


## Let's run the Hyperparameter Optimization Job standalone!

In [94]:
from flytekit.models.sagemaker.training_job import StoppingCondition
from flytekit.models.sagemaker.hpo_job import HPOJobConfig, HyperparameterTuningObjective
from flytekit.models.sagemaker.parameter_ranges import ParameterRanges, CategoricalParameterRange, ContinuousParameterRange, IntegerParameterRange

# Launching a TrainingJob or an HPOJob requires defining its inputs. Inputs are
# the values directly related to the algorithm's outputs; together with the
# version information they decide cache hit/miss.
hpo_inputs = dict(
    train="s3://lyft-modelbuilder/test-datasets/pima-indians/train",
    validation="s3://lyft-modelbuilder/test-datasets/pima-indians/validation",
    static_hyperparameters=xgboost_hyperparameters,
    stopping_condition=StoppingCondition(max_runtime_in_seconds=43200).to_flyte_idl(),  # 12 hours
    hpo_job_config=HPOJobConfig(
        # Tunable hyperparameters and the range/set of possible values of each.
        hyperparameter_ranges=ParameterRanges(
            parameter_range_map=dict(
                max_depth=IntegerParameterRange(
                    min_value=5,
                    max_value=7,
                    scaling_type=_sdk_sagemaker_types.HyperparameterScalingType.LINEAR,
                ),
                # Untunable hyperparameter in XGBoost 0.72
                rate_drop=ContinuousParameterRange(
                    min_value=0.0,
                    max_value=0.1,
                    scaling_type=_sdk_sagemaker_types.HyperparameterScalingType.LINEAR,
                ),
                # Disabled range, kept for reference:
                # "gamma": ContinuousParameterRange(min_value=0.0, max_value=0.3,
                #                                   scaling_type=_sdk_sagemaker_types.HyperparameterScalingType.LINEAR),
            ),
        ),
        tuning_strategy=_sdk_sagemaker_types.HyperparameterTuningStrategy.BAYESIAN,
        tuning_objective=HyperparameterTuningObjective(
            metric_name="validation:error",
            objective_type=_sdk_sagemaker_types.HyperparameterTuningObjectiveType.MINIMIZE,
        ),
        training_job_early_stopping_type=_sdk_sagemaker_types.TrainingJobEarlyStoppingType.AUTO,
    ).to_flyte_idl(),
)

# Register the HPO task and launch it standalone, then print a console link
# to the resulting execution.
hpo_exc = xgboost_hpo_task.register_and_launch(
    "flyteexamples",
    "development",
    inputs=hpo_inputs,
)
print_console_url(hpo_exc)

# [A working example]
# https://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/zw6oqashn6
# https://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/zxr1n4911g

# [A failed example]
# https://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/pr8nzuxl7o

http://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/zxr1n4911g


In [None]:
# Presumably blocks until the HPO execution launched above has finished;
# TODO confirm the exact semantics against the flytekit documentation.
hpo_exc.wait_for_completion()

## Can I launch my SageMaker tasks in a Workflow too?
Of course! After the users are satisfied with the TrainingJob task and HPOJob task definitions and want to see how it plays with the other components in their pipelines, they can just directly invoke the same tasks they've been iterating on inside their workflows.

In this next cell, let's define and register some `SdkPrestoTask`s to fetch the data from Presto instead of relying on an external CSV file.


In [None]:
from flytekit.common.tasks.task import SdkTask
from flytekit.sdk.workflow import workflow_class, Input, Output
from flytekit.models.sagemaker.training_job import StoppingCondition
from flytekit.models.sagemaker.hpo_job import HPOJobConfig, HyperparameterTuningObjective
from flytekit.models.sagemaker.parameter_ranges import ParameterRanges, CategoricalParameterRange, ContinuousParameterRange, IntegerParameterRange
from flytekit.common.tasks.presto_task import SdkPrestoTask
from flytekit.sdk.tasks import inputs

from os import environ

# Version strings are stored in the process environment; the registration
# calls read them back via environ["version"] and environ["spec_version"].
environ.update(version="29", spec_version="29-1")

# Presto task selecting every row of the training table. It is registered
# under the name "get_train_data" at the version stored in environ["version"].
get_train_data2 = SdkPrestoTask(
    statement="""
    SELECT * 
    FROM hive.flyte.datacouncildemo_train
    """,
    task_inputs=inputs(),
    output_schema=Types.Schema(),
    discovery_version="3",
    discoverable=True,
)
get_train_data2.register(
    project="flytesnacks",
    domain="development",
    name="get_train_data",
    version=environ["version"],
)

# Presto task selecting every row of the validation table. It is registered
# under the name "get_validation_data" at the version stored in environ["version"].
get_validation_data = SdkPrestoTask(
    statement="""
    SELECT * 
    FROM hive.flyte.datacouncildemo_validation
    """,
    task_inputs=inputs(),
    output_schema=Types.Schema(),
    discovery_version="2",
    discoverable=True,
)
get_validation_data.register(
    project="flytesnacks",
    domain="development",
    name="get_validation_data",
    version=environ["version"],
)



### Don't reinvent the wheel
Note that the `SdkPrestoTask`s will generate parquet files, but the `TrainingJob` and `HPOJob` we defined earlier require their inputs to be in CSV format. Fortunately, somebody has already written a common Python task that transforms parquet to CSV. **Let's just import that task and use it.**

In [None]:
# Data transformation task: fetch the already-registered parquet-to-CSV task
# (pinned to version "24") instead of re-implementing it.
transform_parquet_to_csv = SdkTask.fetch(
    project="flytesnacks",
    domain="development",
    name="transform_parquet_to_csv",
    version="24",
)

## Let's create a workflow wrapping the  `SdkSimpleHPOJobTask` we defined earlier



In [None]:
import copy

# Workflow that chains the tasks defined earlier in this notebook:
#   1. fetch the train and validation data with the two Presto tasks,
#   2. convert each result to CSV with the fetched transform task,
#   3. run the HPO task on the two CSV outputs,
#   4. expose the HPO task's `model` output as the workflow output `model`.
# The workflow declares no Input, so it is launched with an empty inputs dict.
@workflow_class()
class TrainingWorkflow(object):    
    
    # Retrieve data
    train_data = get_train_data2()
    validation_data = get_validation_data()
    
    # Transform data
    # Each Presto node's `results` output is fed to the transform task's `input_parquet`.
    train_csv = transform_parquet_to_csv(input_parquet=train_data.outputs.results)
    validation_csv = transform_parquet_to_csv(input_parquet=validation_data.outputs.results)
    
    # Invoking the same HPO task we defined earlier
    train = xgboost_hpo_task(
        # Using the input we got from the Presto tasks
        train=train_csv.outputs.output_csv,
        validation=validation_csv.outputs.output_csv,
        
        # Fixed hyperparameters and a 43200 s (12 h) maximum runtime.
        static_hyperparameters=xgboost_hyperparameters,
        stopping_condition=StoppingCondition(max_runtime_in_seconds=43200).to_flyte_idl(),
        hpo_job_config=HPOJobConfig(    
            # Only max_depth is tuned here: integers from 5 to 7, linear scaling.
            hyperparameter_ranges=ParameterRanges(
                parameter_range_map={
                "max_depth": IntegerParameterRange(min_value=5, max_value=7, 
                                                   scaling_type=_sdk_sagemaker_types.HyperparameterScalingType.LINEAR),
                }
            ),
            tuning_strategy=_sdk_sagemaker_types.HyperparameterTuningStrategy.BAYESIAN,
            # Objective: minimize the metric named "validation:error".
            tuning_objective=HyperparameterTuningObjective(
                objective_type=_sdk_sagemaker_types.HyperparameterTuningObjectiveType.MINIMIZE,
                metric_name="validation:error",
            ),
            training_job_early_stopping_type=_sdk_sagemaker_types.TrainingJobEarlyStoppingType.AUTO
        ).to_flyte_idl(),
    )
    
    # Workflow output: the `model` output of the HPO node, typed as a Blob.
    model = Output(train.outputs.model, sdk_type=Types.Blob)
    
# Register the workflow, derive a launch plan from it, and register that launch
# plan under the same project/domain/name/version as the workflow.
TrainingWorkflow.register(
    project="flyteexamples",
    domain="development",
    name="TrainingWorkflow",
    version=environ["spec_version"],
)
TrainingWorkflow_lp = TrainingWorkflow.create_launch_plan()
TrainingWorkflow_lp.register(
    project="flyteexamples",
    domain="development",
    name="TrainingWorkflow",
    version=environ["spec_version"],
)

In [None]:
# Launch the registered launch plan (the workflow declares no inputs, hence the
# empty dict) and print a console link to the resulting execution.
# The handle is named `workflow_exc`, matching `training_exc` / `hpo_exc`, and
# not `exec`, which would shadow Python's built-in exec() for the rest of the
# notebook session.
workflow_exc = TrainingWorkflow_lp.launch(project="flyteexamples", domain="development", inputs={})
print_console_url(workflow_exc)

# [A working example]
# https://flyte-staging.lyft.net/console/projects/flytesnacks/domains/development/executions/f9e645a85e0164b0a9b3

'tsk:flytesnacks:development:get_validation_data:29'

### Don't reinvent the wheel
Note that the `SdkPrestoTask`s will generate parquet files, but the `TrainingJob` and `HPOJob` we defined earlier require their inputs to be in CSV format. Fortunately, somebody has already written a common Python task that transforms parquet to CSV. **Let's just import that task and use it.**

In [93]:
# Data transformation task: fetch the already-registered parquet-to-CSV task
# (pinned to version "24") instead of re-implementing it.
transform_parquet_to_csv = SdkTask.fetch(
    project="flytesnacks",
    domain="development",
    name="transform_parquet_to_csv",
    version="24",
)

## Let's create a workflow wrapping the  `SdkSimpleHPOJobTask` we defined earlier



In [72]:
import copy

# Workflow that chains the tasks defined earlier in this notebook:
#   1. fetch the train and validation data with the two Presto tasks,
#   2. convert each result to CSV with the fetched transform task,
#   3. run the HPO task on the two CSV outputs,
#   4. expose the HPO task's `model` output as the workflow output `model`.
# The workflow declares no Input, so it is launched with an empty inputs dict.
@workflow_class()
class TrainingWorkflow(object):    
    
    # Retrieve data
    train_data = get_train_data2()
    validation_data = get_validation_data()
    
    # Transform data
    # Each Presto node's `results` output is fed to the transform task's `input_parquet`.
    train_csv = transform_parquet_to_csv(input_parquet=train_data.outputs.results)
    validation_csv = transform_parquet_to_csv(input_parquet=validation_data.outputs.results)
    
    # Invoking the same HPO task we defined earlier
    train = xgboost_hpo_task(
        # Using the input we got from the Presto tasks
        train=train_csv.outputs.output_csv,
        validation=validation_csv.outputs.output_csv,
        
        # Fixed hyperparameters and a 43200 s (12 h) maximum runtime.
        static_hyperparameters=xgboost_hyperparameters,
        stopping_condition=StoppingCondition(max_runtime_in_seconds=43200).to_flyte_idl(),
        hpo_job_config=HPOJobConfig(    
            # Only max_depth is tuned here: integers from 5 to 7, linear scaling.
            hyperparameter_ranges=ParameterRanges(
                parameter_range_map={
                "max_depth": IntegerParameterRange(min_value=5, max_value=7, 
                                                   scaling_type=_sdk_sagemaker_types.HyperparameterScalingType.LINEAR),
                }
            ),
            tuning_strategy=_sdk_sagemaker_types.HyperparameterTuningStrategy.BAYESIAN,
            # Objective: minimize the metric named "validation:error".
            tuning_objective=HyperparameterTuningObjective(
                objective_type=_sdk_sagemaker_types.HyperparameterTuningObjectiveType.MINIMIZE,
                metric_name="validation:error",
            ),
            training_job_early_stopping_type=_sdk_sagemaker_types.TrainingJobEarlyStoppingType.AUTO
        ).to_flyte_idl(),
    )
    
    # Workflow output: the `model` output of the HPO node, typed as a Blob.
    model = Output(train.outputs.model, sdk_type=Types.Blob)
    
# Register the workflow, derive a launch plan from it, and register that launch
# plan under the same project/domain/name/version as the workflow.
TrainingWorkflow.register(
    project="flyteexamples",
    domain="development",
    name="TrainingWorkflow",
    version=environ["spec_version"],
)
TrainingWorkflow_lp = TrainingWorkflow.create_launch_plan()
TrainingWorkflow_lp.register(
    project="flyteexamples",
    domain="development",
    name="TrainingWorkflow",
    version=environ["spec_version"],
)

'lp:flyteexamples:development:TrainingWorkflow:29-1'

In [74]:
# Launch the registered launch plan (the workflow declares no inputs, hence the
# empty dict) and print a console link to the resulting execution.
# The handle is named `workflow_exc`, matching `training_exc` / `hpo_exc`, and
# not `exec`, which would shadow Python's built-in exec() for the rest of the
# notebook session.
workflow_exc = TrainingWorkflow_lp.launch(project="flyteexamples", domain="development", inputs={})
print_console_url(workflow_exc)

# [A working example]
# https://flyte-staging.lyft.net/console/projects/flytesnacks/domains/development/executions/f9e645a85e0164b0a9b3

http://flyte-staging.lyft.net/console/projects/flyteexamples/domains/development/executions/f5fe2f8f8dd104b3291f
