## Setting up the environment
This is common setup: it loads the right Flyte configuration and connects to the right endpoint.

In [28]:
from flytekit.configuration import set_flyte_config_file, platform
#set_flyte_config_file("/Users/kumare/.ssh/notebook-staging.config")
set_flyte_config_file("../flyte-production.config")

print("Connected to {} {}".format(platform.URL.get(), platform.AUTH.get()))

def print_console_url(exc):
    print("http://{}/console/projects/{}/domains/{}/executions/{}".format(platform.URL.get(), exc.id.project, exc.id.domain, exc.id.name))

Connected to flyte.lyft.net False
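
The URL format used by `print_console_url` can be checked without a live connection. The following is a minimal sketch, assuming a hard-coded host and a stand-in execution object built with `SimpleNamespace`; `console_url` and `fake_exec` are hypothetical names used only for illustration and are not part of flytekit.

```python
from types import SimpleNamespace


def console_url(host, execution):
    """Build the console URL for an execution, mirroring print_console_url."""
    ident = execution.id
    return "http://{}/console/projects/{}/domains/{}/executions/{}".format(
        host, ident.project, ident.domain, ident.name
    )


# Stand-in for an execution object; real ones are returned by flytekit.
fake_exec = SimpleNamespace(
    id=SimpleNamespace(project="flytesnacks", domain="development", name="ctwv59t4g9")
)

url = console_url("flyte.lyft.net", fake_exec)
print(url)
```

Returning the string instead of printing it makes the format easy to assert on in a test.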


## Defining an XGBoost Training job
We will create a job that trains an XGBoost model using the prebuilt algorithms in SageMaker. Refer to the [SageMaker XGBoost docs](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html). To learn more about XGBoost, refer to the [XGBoost documentation](https://xgboost.readthedocs.io/en/latest/). To dive deeper into the Flytekit API, refer to the [docs](https://lyft.github.io/flyte/flytekit/flytekit.common.tasks.sagemaker.html?highlight=sagemaker#module-flytekit.common.tasks.sagemaker).


In [20]:
from flytekit.sdk.tasks import inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output
from flytekit.common.tasks.sagemaker import built_in_training_job_task
from flytekit.models.sagemaker import training_job as training_job_models

# Define the hyperparameter values that the TrainingJob will use.
# These hyperparameters are commonly used by the XGBoost algorithm; here we bootstrap them with default values.
# Usually the values are either left at their defaults or tuned (see the section on optimizing the hyper-parameters).
xgboost_hyperparameters = {
    "num_round": "100",
    "base_score": "0.5",
    "booster": "gbtree",
    "csv_weights": "0",
    "dsplit": "row",
    "grow_policy": "depthwise",
    "lambda_bias": "0.0",
    "max_bin": "256",
    "normalize_type": "tree",
    "objective": "reg:linear",
    "one_drop": "0",
    "process_type": "default",
    "refresh_leaf": "1",
    "sample_type": "uniform",
    "scale_pos_weight": "1.0",
    "silent": "0",
    "skip_drop": "0.0",
    "tree_method": "auto",
    "tweedie_variance_power": "1.5",
    "updater": "grow_colmaker,prune",
}

# Here we define the actual algorithm (XGBOOST) and version of the algorithm to use
alg_spec = training_job_models.AlgorithmSpecification(
    input_mode=training_job_models.InputMode.FILE,
    algorithm_name=training_job_models.AlgorithmName.XGBOOST,
    algorithm_version="0.90",
    input_content_type=training_job_models.InputContentType.TEXT_CSV,
)

# Finally, use the Flytekit plugin SdkBuiltinAlgorithmTrainingJobTask to create a task that wraps the algorithm.
# This task has no user-defined function, because the algorithm itself is predefined in SageMaker.
# It still has the same set of properties as any other Flyte task:
# - Caching
# - Resource specification
# - Versioning, etc.
xgboost_train_task = built_in_training_job_task.SdkBuiltinAlgorithmTrainingJobTask(
    training_job_resource_config=training_job_models.TrainingJobResourceConfig(
        instance_type="ml.m4.xlarge",
        instance_count=1,
        volume_size_in_gb=25,
    ),
    algorithm_specification=alg_spec,
    cache_version='blah9',
    cacheable=True,
)
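
The hyperparameter dictionary above stores every value as a string, including numbers and flags. If the values start out as native Python types, a small helper can convert them. This is a minimal sketch; `stringify_hyperparameters` is a hypothetical helper, not a flytekit function, and encoding booleans as `"1"`/`"0"` is an assumption based on flags such as `"silent": "0"` above.

```python
def stringify_hyperparameters(params):
    """Return a copy of params with every value converted to a string."""
    out = {}
    for key, value in params.items():
        if isinstance(value, bool):
            # Assumption: flags are encoded as "1"/"0", like "silent" above.
            out[key] = "1" if value else "0"
        else:
            out[key] = str(value)
    return out


native = {"num_round": 100, "base_score": 0.5, "booster": "gbtree", "silent": False}
as_strings = stringify_hyperparameters(native)
print(as_strings)
```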


## Execute the training job
You can use [Single task Execution](https://lyft.github.io/flyte/user/features/single_task_execution.html?highlight=single%20task%20execution) to trigger an execution of the training task defined above. To trigger an execution, you need to provide
1. Project (flytesnacks)
1. Domain (development)
1. Inputs

Pre-built algorithms have a restrictive set of inputs. They always expect
1. Training data set
1. Validation data set
1. Static set of hyperparameters as a dictionary

In this case, let's reuse the input generated for the single-location training in the previous example. To get that data, we fetch the previous execution and read its outputs.

In [21]:
from flytekit.common.workflow_execution import SdkWorkflowExecution
exe_id = "ctwv59t4g9" # Replace this
exe = SdkWorkflowExecution.fetch(project="flytesnacks", domain="development", name=exe_id)
print_console_url(exe)
exe.outputs

http://flyte.lyft.net/console/projects/flytesnacks/domains/development/executions/ctwv59t4g9


{'model': metadata {
   type {
   }
 }
 uri: "s3://lyft-modelbuilder/3g/ctwv59t4g9-fit-task-0/0e1a85186c9f6255216738e5a2810fb6",
 'accuracy': 0.0}

### We have the execution now
As we can see, the outputs are available locally. But we don't want the outputs of the workflow; we want the intermediate data, which is the output of a node execution. The node in this case is "split".

In [22]:
nodes = exe.get_node_executions()
op = nodes["split"].outputs
train = op["train"]
val = op["val"]

In [26]:
training_inputs={
    "train": train.uri,
    "validation": val.uri,
    "static_hyperparameters": xgboost_hyperparameters,
}

# Invoking the SdkBuiltinAlgorithmTrainingJobTask
training_exc = xgboost_train_task.register_and_launch("flytesnacks", "development", inputs=training_inputs)
print_console_url(training_exc)

http://flyte.lyft.net/console/projects/flytesnacks/domains/development/executions/o4zh55a04r
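
Because the prebuilt algorithm accepts a fixed set of inputs, it can help to check the dictionary before launching. The following is a minimal sketch, assuming the three keys used in this notebook are the complete set; `check_training_inputs` is a hypothetical helper and the S3 URIs are placeholders.

```python
REQUIRED_KEYS = {"train", "validation", "static_hyperparameters"}


def check_training_inputs(inputs):
    """Raise if the inputs dict does not have exactly the expected keys."""
    missing = REQUIRED_KEYS - set(inputs)
    unexpected = set(inputs) - REQUIRED_KEYS
    if missing or unexpected:
        raise ValueError(
            "missing={}, unexpected={}".format(sorted(missing), sorted(unexpected))
        )
    if not isinstance(inputs["static_hyperparameters"], dict):
        raise TypeError("static_hyperparameters must be a dict")
    return inputs


# Placeholder URIs; in the notebook these come from train.uri and val.uri.
checked = check_training_inputs({
    "train": "s3://example-bucket/train",
    "validation": "s3://example-bucket/val",
    "static_hyperparameters": {"num_round": "100"},
})
print(sorted(checked))
```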


## Optimize the hyper-parameters
Amazon SageMaker offers automatic black-box hyperparameter optimization through its [HPO Service](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html). This technique is effective for finding a set of hyperparameters that improves the overall accuracy of the model (or minimizes the error).

Flyte makes it straightforward to optimize a model using Amazon SageMaker HPO. In this example we look at how this can be done for the prebuilt algorithm training from the previous section.

### Define an HPO Task that wraps the training task
To start hyperparameter optimization once a training task has been created, pass the training task to **SdkSimpleHyperparameterTuningJobTask** as the **training_job** attribute.

In [24]:
from flytekit.models.sagemaker import hpo_job as hpo_job_models
from flytekit.common.tasks.sagemaker import hpo_job_task
from flytekit.models.sagemaker.hpo_job import (
    HyperparameterTuningJobConfig,
    HyperparameterTuningObjectiveType,
    HyperparameterTuningStrategy,
    TrainingJobEarlyStoppingType,
    HyperparameterTuningObjective,
)
from flytekit.models.sagemaker.parameter_ranges import (
    HyperparameterScalingType,
    ParameterRanges,
    CategoricalParameterRange,
    ContinuousParameterRange,
    IntegerParameterRange,
)

xgboost_hpo_task = hpo_job_task.SdkSimpleHyperparameterTuningJobTask(
    training_job=xgboost_train_task,
    max_number_of_training_jobs=10,
    max_parallel_training_jobs=5,
    cache_version='blah9',
    retries=2,
    cacheable=True,
)
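
The two job limits bound how the tuning run can proceed: with at most 5 jobs running at once and 10 jobs in total, the jobs cannot all finish in fewer than two batches. The arithmetic below is a rough lower bound only; how SageMaker actually schedules the jobs is not described in this notebook.

```python
import math

max_number_of_training_jobs = 10
max_parallel_training_jobs = 5

# Lower bound on the number of sequential batches needed to run every job.
min_batches = math.ceil(max_number_of_training_jobs / max_parallel_training_jobs)
print(min_batches)
```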

### Launch the HPO Job
Just like a Training Job, an HPO job can be launched directly from the notebook.
The inputs for an HPO job that wraps a training job are the inputs of the training job combined with the tuning configuration, i.e.
 1. "train" dataset, "validation" dataset and "static hyper parameters" for the Training job
 1. HyperparameterTuningJobConfig, which consists of ParameterRanges for the parameters that should be tuned,
 1. tuning strategy - Bayesian OR Random (or others as described in SageMaker)
 1. Stopping condition and
 1. Objective metric name and type (whether to minimize etc)
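
To make the idea of parameter ranges concrete, the sketch below draws candidate configurations uniformly at random from the three ranges used in this notebook. It illustrates a plain random strategy with linear scaling in pure Python; it is not the Bayesian strategy and does not call SageMaker. `RANGES` and `sample_candidate` are hypothetical names.

```python
import random

# Ranges used in this notebook's tuning config: (kind, min_value, max_value).
RANGES = {
    "num_round": ("int", 3, 10),
    "max_depth": ("int", 5, 7),
    "gamma": ("float", 0.0, 0.3),
}


def sample_candidate(ranges, rng):
    """Draw one value per hyperparameter, uniformly within its range."""
    candidate = {}
    for name, (kind, low, high) in ranges.items():
        if kind == "int":
            candidate[name] = rng.randint(low, high)  # both ends inclusive
        else:
            candidate[name] = rng.uniform(low, high)
    return candidate


rng = random.Random(0)  # fixed seed so repeated runs draw the same candidates
candidates = [sample_candidate(RANGES, rng) for _ in range(10)]
for candidate in candidates:
    print(candidate)
```

Each candidate would correspond to one training job; a tuning service then compares the objective metric across the jobs.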

In [25]:
# When launching the TrainingJob and HPOJob, we need to define the inputs.
# Inputs are the values that directly affect the algorithm's outputs. The inputs
# and the cache version together decide whether there is a cache hit or miss.

hpo_inputs={
    "train": train.uri,
    "validation": val.uri,
    "static_hyperparameters": xgboost_hyperparameters,
    "hyperparameter_tuning_job_config": HyperparameterTuningJobConfig(
        
        #############################################
        # Define the tunable hyperparameters and the 
        # range/set of possible values of each hp
        #############################################
        hyperparameter_ranges=ParameterRanges(
            parameter_range_map={
                "num_round": IntegerParameterRange(min_value=3, max_value=10, 
                                                   scaling_type=HyperparameterScalingType.LINEAR),
                "max_depth": IntegerParameterRange(min_value=5, max_value=7, 
                                                   scaling_type=HyperparameterScalingType.LINEAR),
                "gamma": ContinuousParameterRange(min_value=0.0, max_value=0.3,
                                                  scaling_type=HyperparameterScalingType.LINEAR),
            }
        ),
        tuning_strategy=HyperparameterTuningStrategy.BAYESIAN,
        tuning_objective=HyperparameterTuningObjective(
            objective_type=HyperparameterTuningObjectiveType.MINIMIZE,
            metric_name="validation:error",
        ),
        training_job_early_stopping_type=TrainingJobEarlyStoppingType.AUTO
    ).to_flyte_idl(),
}

# Register and launch the task standalone
hpo_exc = xgboost_hpo_task.register_and_launch("flytesnacks", "development", inputs=hpo_inputs)
print_console_url(hpo_exc)

http://flyte.lyft.net/console/projects/flytesnacks/domains/development/executions/mhgakfah9h
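
The comment in the cell above says that the inputs and the version information decide a cache hit or miss. The sketch below illustrates that idea only; it assumes a key formed by hashing the task name, cache version and inputs, which is not necessarily how Flyte computes its cache keys. `cache_key` and the S3 URIs are hypothetical.

```python
import hashlib
import json


def cache_key(task_name, cache_version, inputs):
    """Hash the task name, cache version and inputs into a stable key."""
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


inputs = {"train": "s3://example-bucket/train", "validation": "s3://example-bucket/val"}
reordered = dict(reversed(list(inputs.items())))

key_same = cache_key("xgboost_hpo_task", "blah9", inputs)
key_reordered = cache_key("xgboost_hpo_task", "blah9", reordered)
key_bumped = cache_key("xgboost_hpo_task", "blah10", inputs)

print(key_same == key_reordered)
print(key_same == key_bumped)
```

Under this model, identical inputs with the same `cache_version` map to the same key, while changing `cache_version` forces a miss even when the inputs are unchanged.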


## We can actually create a pipeline dynamically in the notebook
Let us fetch the task that generates the data,
"demo.house_price_predictor.generate_and_split_data",
and then create a workflow that chains this task with the SageMaker training task. (Of course, you can do this for the HPO task as well.)

In [38]:
from flytekit.common.tasks.task import SdkTask
generate_data = SdkTask.fetch("flytesnacks", "development", "demo.house_price_predictor.generate_and_split_data", version="85dc5898ae0d9b375e194aa1246b548ab95242d7")
generate_data

Flyte python-task: inputs {
  variables {
    key: "loc"
    value {
      type {
        simple: STRING
      }
    }
  }
  variables {
    key: "number_of_houses"
    value {
      type {
        simple: INTEGER
      }
    }
  }
  variables {
    key: "seed"
    value {
      type {
        simple: INTEGER
      }
    }
  }
}
outputs {
  variables {
    key: "test"
    value {
      type {
        blob {
          format: "csv"
        }
      }
    }
  }
  variables {
    key: "train"
    value {
      type {
        blob {
          format: "csv"
          dimensionality: MULTIPART
        }
      }
    }
  }
  variables {
    key: "val"
    value {
      type {
        blob {
          format: "csv"
          dimensionality: MULTIPART
        }
      }
    }
  }
}

In [39]:
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output, workflow

@workflow_class
class WorkflowFromSharedTasks():
    loc = Input(Types.String, help="Location for where to train the model.")
    seed = Input(Types.Integer, default=7, help="Seed to use for splitting.")
    num_houses = Input(Types.Integer, default=1000, help="Number of houses to generate data for")
    generate_data_task = generate_data(loc=loc, number_of_houses=num_houses, seed=seed)
    fit_task = xgboost_train_task(train=generate_data_task.outputs.train, validation=generate_data_task.outputs.val, static_hyperparameters=xgboost_hyperparameters)

    # Output: the serialized model produced by the training task
    model = Output(fit_task.outputs.model, sdk_type=Types.Blob)
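
The workflow above is a two-node graph: `fit_task` consumes the `train` and `val` outputs of `generate_data_task`, so it can only run afterwards. The sketch below shows that ordering with the standard-library `graphlib` module (Python 3.9+). It is an illustration of the dependency structure, not of how Flyte schedules nodes.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes whose outputs it consumes.
dependencies = {
    "generate_data_task": set(),
    "fit_task": {"generate_data_task"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```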

In [34]:
sagemaker_lp = WorkflowFromSharedTasks.create_launch_plan()

In [35]:
WorkflowFromSharedTasks.register(project="flytesnacks", domain="development", name="SagemakerShared", version="1")
sagemaker_lp.register(project="flytesnacks", domain="development", name="SagemakerShared", version="1")

'lp:flytesnacks:development:SagemakerShared:1'

In [37]:
exe=sagemaker_lp.launch(project="flytesnacks", domain="development", inputs={"loc":"SFO", "seed":5, "num_houses":1000})
print_console_url(exe)

http://flyte.lyft.net/console/projects/flytesnacks/domains/development/executions/fe249ec555287416dadc


In [41]:
exe.wait_for_completion()
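
Waiting for completion amounts to checking the execution state repeatedly until it is done. The sketch below shows one way to write such a loop with a timeout. It is an assumption about the general pattern, not flytekit's implementation of `wait_for_completion`; `FakeExecution` and `wait_until_complete` are hypothetical stand-ins.

```python
import time


class FakeExecution:
    """Hypothetical stand-in that completes after a fixed number of polls."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done

    def is_complete(self):
        self._remaining -= 1
        return self._remaining <= 0


def wait_until_complete(execution, poll_interval=0.01, timeout=1.0):
    """Poll until the execution completes; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        polls += 1
        if execution.is_complete():
            return polls
        if time.monotonic() >= deadline:
            raise TimeoutError("execution did not complete within {}s".format(timeout))
        time.sleep(poll_interval)


polls_needed = wait_until_complete(FakeExecution(polls_until_done=3))
print(polls_needed)
```

A timeout keeps a notebook cell from blocking indefinitely if an execution stalls.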