In [32]:
from azureml.core import Dataset, Workspace, Experiment, Datastore
from azureml.core import Environment, Model
from azureml.core.runconfig import RunConfiguration
from azureml.data.datapath import DataPath
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

from sklearn.metrics import mean_squared_error

import pandas as pd 
import numpy as np

import os

In [33]:
# Connect to the Azure ML workspace described by the local config file
ws = Workspace.from_config(path='config.json')

In [34]:
# Folder that holds the scripts and environment file of the pipeline steps
experiment_folder = 'bike_pipeline'

# Create it when missing; an existing folder is left as it is
os.makedirs(name=experiment_folder, exist_ok=True)

print(experiment_folder)

bike_pipeline


In [35]:
%%writefile $experiment_folder/prep_bike.py
# Import libraries
import os 
import argparse 
import pandas as pd 
import pickle
from azureml.core import Run, Workspace
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context 
run = Run.get_context()
 
# load the data (passed as an input dataset)
print('Loading dataset...')
bike_data = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count()
row_count = (len(bike_data))
run.log('raw_rows', row_count)

# remove all nulls
bike_data = bike_data.dropna()

# Normalize numeric collumns
scaler = MinMaxScaler()
num_cols = ['temp' ,'atemp' ,'humidity' ,'windspeed' ,'weather' ,'holiday' , 'workingday', 'season']
bike_data[num_cols] = scaler.fit_transform(bike_data[num_cols])

# Log processed rows
row_count = (len(bike_data))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'bike_data.csv')
bike_data.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Overwriting bike_pipeline/prep_bike.py


In [36]:
%%writefile $experiment_folder/train_bike.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_log_error

from hyperopt import tpe, STATUS_OK, Trials, hp, fmin, STATUS_OK, space_eval

import xgboost as xgb

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data, 'bike_data.csv')
bike_data = pd.read_csv(file_path)

# Separate features and labels
X, y = bike_data[['temp' ,'atemp' ,'humidity' ,'windspeed' ,'weather' ,'holiday' , 'workingday', 'season']].values, bike_data['count'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

space = {
    'learning_rate':    hp.choice('learning_rate',    np.arange(0.05, 1.00, 0.05)),
    'max_depth':        hp.choice('max_depth',        np.arange(5, 16, 1, dtype=int)),
    'min_child_weight': hp.choice('min_child_weight', np.arange(1, 8, 1, dtype=int)),
    'colsample_bytree': hp.choice('colsample_bytree', np.arange(0.3, 0.8, 0.1)),
    'subsample':        hp.choice('subsample',        [0, 0.50, 0.8, 1]),
    'n_estimators':     hp.choice('n_estimators',     range(10, 500, 10)),
    'tree_method':      hp.choice('tree_method',      ['hist'])
}

# Objective function
def objective(params):
    # Instantiate model
    model = xgb.XGBRegressor(**params)

    # Fit and predict
    model.fit(X_train, y_train)
    y_hat = model.predict(X_test)
    y_hat = y_hat.clip(min=0)
    
    # Calculate the root mean squared error
    rmsle = np.sqrt(mean_squared_log_error(y_test, y_hat))

    # Retrun loss, status and model
    return {'loss': rmsle, 'status': STATUS_OK, 'model': model}

# Trials to track progress
bayes_trials = Trials()

# Optimize
best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=200, trials=bayes_trials)
params = space_eval(space, best)

# Train light gradient boosting model
print('Training a decision tree model...')
model = xgb.XGBRegressor(**params).fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
y_hat = y_hat.clip(min=0)
msle = mean_squared_log_error(y_test, y_hat)
print(msle)

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'bike_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'bike_model',
               tags={'Training context':'Pipeline'},
               properties={'MSE': np.float(msle)})

run.complete()

Overwriting bike_pipeline/train_bike.py


In [37]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = 'madison'

try:
    # Reuse the compute target when it already exists in the workspace
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster! Using it for this session.')
except ComputeTargetException:
    # No such compute target yet: provision a new one. Errors are left to
    # propagate on purpose; printing and carrying on would leave
    # `pipeline_cluster` undefined and fail later with a NameError.
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_A2_v2', max_nodes=2)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
    pipeline_cluster.wait_for_completion(show_output=True)

Found existing cluster! Using it for this session.


In [38]:
%%writefile $experiment_folder/experiment_env.yml
# Conda specification of the environment used by both pipeline steps
name: experiment_env
dependencies:
- python=3.7.2
# pip is listed as a conda dependency so the pip section below has an
# installer available inside the environment
- pip
- pip:
  - azureml-defaults
  - scikit-learn
  - pyarrow
  - hyperopt
  - ipykernel
  - matplotlib
  - pandas
  - xgboost

Overwriting bike_pipeline/experiment_env.yml


In [39]:
# Build the environment from the conda specification file
experiment_env = Environment.from_conda_specification(
    name='experiment_env',
    file_path=experiment_folder + '/experiment_env.yml',
)

# Register the environment, then read the registered definition back
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Run configuration for the pipeline: the registered environment plus the
# compute target set up earlier
pipeline_run_config = RunConfiguration()
pipeline_run_config.environment = registered_env
pipeline_run_config.target = pipeline_cluster

print('Running config created')

Running config created


In [40]:
# Get the training dataset. get_by_name raises when the dataset is not
# registered, whereas ws.datasets.get(...) would return None and only fail
# later with an AttributeError.
bike_ds = Dataset.get_by_name(ws, name='london_bike_data')

# Create an OutputFileDatasetConfig (temp data reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig('prepped_data')

# Step 1, run the data prep script
prep_step = PythonScriptStep(
    name='Prepare data',
    source_directory=experiment_folder,
    script_name='prep_bike.py',
    arguments=[
        '--input-data', bike_ds.as_named_input('raw_data'),
        '--prepped-data', prepped_data],
    # Set explicitly on both steps so they are configured the same way
    compute_target=pipeline_cluster,
    runconfig=pipeline_run_config,
    allow_reuse=True
)

# Step 2, run the training script on the output of step 1
train_step = PythonScriptStep(
    name='Train and register model',
    source_directory=experiment_folder,
    script_name='train_bike.py',
    arguments=['--training-data', prepped_data.as_input()],
    compute_target=pipeline_cluster,
    runconfig=pipeline_run_config,
    allow_reuse=True
)

print('Pipeline steps defined')

Pipeline steps defined


In [41]:
# Assemble the two steps into a pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(
    workspace=ws,
    steps=pipeline_steps,
)
print('Pipeline is built!')

# Submit the pipeline as a run of the 'bike_pipeline' experiment
experiment = Experiment(
    workspace=ws,
    name='bike_pipeline',
)
pipeline_run = experiment.submit(
    pipeline,
    regenerate_outputs=True,
)
print('Pipeline submitted for execution.')

# Block until the run finishes, streaming its output into the notebook.
# (RunDetails(pipeline_run).show() is not used here.)
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built!
Created step Prepare data [9378776c][9f91cb71-c259-480b-b468-9e95c73fbe12], (This step will run and generate new outputs)
Created step Train and register model [90fd1d46][4df32f26-873e-4acd-a84d-5cdaf4f4c062], (This step will run and generate new outputs)
Submitted PipelineRun ca78698d-8369-4c86-bddd-c5003de18695
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ca78698d-8369-4c86-bddd-c5003de18695?wsid=/subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourcegroups/ml_group/workspaces/ml_enviroment&tid=08548f02-0216-4325-938b-fd30f6829e55
Pipeline submitted for execution.
PipelineRunId: ca78698d-8369-4c86-bddd-c5003de18695
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ca78698d-8369-4c86-bddd-c5003de18695?wsid=/subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourcegroups/ml_group/workspaces/ml_enviroment&tid=08548f02-0216-4325-938b-fd30f6829e55
PipelineRun Status: Running


StepRunId: 3c9dcbd6-855b-4ece-bada-1338d9bac4e6
Link

ExperimentExecutionException: ExperimentExecutionException:
	Message: The output streaming for the run interrupted.
But the run is still executing on the compute target. 
Details for canceling the run can be found here: https://aka.ms/aml-docs-cancel-run
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "The output streaming for the run interrupted.\nBut the run is still executing on the compute target. \nDetails for canceling the run can be found here: https://aka.ms/aml-docs-cancel-run"
    }
}

In [None]:
# Print the metrics recorded by each child run of the pipeline run
for child_run in pipeline_run.get_children():
    print(child_run.name, ':')
    for metric_name, metric_value in child_run.get_metrics().items():
        print('\t', metric_name, ':', metric_value)

Train and register model :
Prepare data :
	 raw_rows : 10886
	 processed_rows : 10886


In [None]:
# List every model registered in the workspace with its version, tags and properties
for model in Model.list(ws):
    print(model.name, 'version', model.version)
    for tag_name, tag in model.tags.items():
        print('\t', tag_name, ':', tag)
    for prop_name, prop in model.properties.items():
        print('\t', prop_name, ':', prop)
    print('\n')

bike_model version 6
	 Training context : Pipeline
	 MSE : 21783.041721689464


bike_model version 5
	 Training context : Pipeline
	 MSE : 774685.1528476852


bike_model version 4
	 Training context : Pipeline
	 MSE : 0.012539497713253425


windpred_automl_lightgbm version 1
	 azureml.datastoreId : /subscriptions/5a361d37-b562-4eee-981b-0936493063e9/resourceGroups/ml_group/providers/Microsoft.MachineLearningServices/workspaces/ml_enviroment/datastores/workspaceartifactstore


