# Automated Machine Learning 
**Continuous retraining using Pipelines**


## Introduction
In this example we use AutoML and Pipelines to enable continuous retraining of a model based on updates to the training dataset. We will create two pipelines:
* The first pipeline demonstrates a training dataset that gets updated over time.
* The second pipeline uses a pipeline `Schedule` to trigger continuous retraining.

In this notebook you will learn how to:
* Create an Experiment in an existing Workspace.
* Configure AutoML using AutoMLConfig.
* Create a data ingestion pipeline to update a dataset.
* Create a training pipeline to prepare data, run AutoML, register the model, and set up pipeline triggers.

## Setup
As part of the setup you have already created an Azure ML `Workspace` object. For AutoML you will need to create an `Experiment` object, which is a named object in a `Workspace` used to run experiments.

In [1]:
import logging

from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from sklearn import datasets

import azureml.core
from azureml.core.experiment import Experiment
from azureml.core.workspace import Workspace
from azureml.train.automl import AutoMLConfig

This sample notebook may use features that are not available in previous versions of the Azure ML SDK.

In [2]:
print("This notebook was created using version 1.18.0 of the Azure ML SDK")
print("You are currently using version", azureml.core.VERSION, "of the Azure ML SDK")

This notebook was created using version 1.18.0 of the Azure ML SDK
You are currently using version 1.11.0 of the Azure ML SDK


Accessing the Azure ML workspace requires authentication with Azure.

The default authentication is interactive authentication using the default tenant. Executing the `ws = Workspace.from_config()` line in the cell below will prompt for authentication the first time that it is run.

If you have multiple Azure tenants, you can specify the tenant by replacing the `ws = Workspace.from_config()` line in the cell below with the following:
```
from azureml.core.authentication import InteractiveLoginAuthentication
auth = InteractiveLoginAuthentication(tenant_id = 'mytenantid')
ws = Workspace.from_config(auth = auth)
```
If you need to run in an environment where interactive login is not possible, you can use Service Principal authentication by replacing the `ws = Workspace.from_config()` line in the cell below with the following:
```
from azureml.core.authentication import ServicePrincipalAuthentication
auth = ServicePrincipalAuthentication('mytenantid', 'myappid', 'mypassword')
ws = Workspace.from_config(auth = auth)
```
For more details, see aka.ms/aml-notebook-auth
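
One way to choose between the two options at run time is to look for service principal credentials in the environment and fall back to interactive login when they are absent. The sketch below shows only that selection logic. The variable names `AML_TENANT_ID`, `AML_APP_ID`, and `AML_APP_SECRET` and the helper `choose_auth_mode` are hypothetical and are not part of the Azure ML SDK.

```python
import os

# Hypothetical environment variable names, chosen for this sketch only.
SP_VARS = ("AML_TENANT_ID", "AML_APP_ID", "AML_APP_SECRET")


def choose_auth_mode(env=None):
    """Return 'service_principal' if all credentials are set, else 'interactive'."""
    env = os.environ if env is None else env
    if all(env.get(name) for name in SP_VARS):
        return "service_principal"
    return "interactive"


# With no credentials available, the sketch falls back to interactive login.
print(choose_auth_mode({}))  # interactive
```

When the result is `service_principal`, the three values can be passed to `ServicePrincipalAuthentication` as in the snippet above. Otherwise `Workspace.from_config()` is used unchanged.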

In [5]:
ws = Workspace.from_config()
output = {}
output['Subscription ID'] = ws.subscription_id
output['Workspace'] = ws.name
output['Resource Group'] = ws.resource_group
output['Location'] = ws.location
print(output)

{'Subscription ID': '52061d21-01dd-4f9e-aca9-60fff4d67ee2', 'Workspace': 'mlops', 'Resource Group': 'MLOpsWorkshop', 'Location': 'eastus'}


In [6]:
dstor = ws.get_default_datastore()

# Choose a name for the run history container in the workspace.
experiment_name = 'ar-factoring-2class-autoretrain'
experiment = Experiment(ws, experiment_name)

output['Run History Name'] = experiment_name
pd.set_option('display.max_colwidth', -1)
outputDf = pd.DataFrame(data = output, index = [''])
outputDf.T

| | |
|---|---|
| Subscription ID | 52061d21-01dd-4f9e-aca9-60fff4d67ee2 |
| Workspace | mlops |
| Resource Group | MLOpsWorkshop |
| Location | eastus |
| Run History Name | ar-factoring-2class-autoretrain |


If you look in your workspace the experiment is not yet created.  

## Compute 

#### Create or Attach existing AmlCompute

You will need to create a compute target for your AutoML run, or use an existing one.
#### Creation of AmlCompute takes approximately 5 minutes. 
If the AmlCompute with that name is already in your workspace this code will skip the creation process.


In [7]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster, or use the existing compute cluster
amlcompute_cluster_name = "automl"

# Verify that cluster does not exist already
try:
    compute_target = ComputeTarget(workspace=ws, name=amlcompute_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4)
    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)

compute_target.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


## Run Configuration

In [8]:
from azureml.core.runconfig import CondaDependencies, RunConfiguration

# create a new RunConfig object
conda_run_config = RunConfiguration(framework="python")

# Set compute target to AmlCompute
conda_run_config.target = compute_target

conda_run_config.environment.docker.enabled = True

cd = CondaDependencies.create(pip_packages=['azureml-sdk[automl]', 'applicationinsights', 'azureml-opendatasets', 'azureml-defaults'], 
                              conda_packages=['numpy==1.16.2'], 
                              pin_sdk_version=False)
conda_run_config.environment.python.conda_dependencies = cd

print('run config is ready')

run config is ready


## Data Ingestion Pipeline 
For this lab, we simply pull a copy of the existing data directly from our GitHub repo, overwriting the existing data. In the real world we would pull the latest data into the registered dataset. Making a copy is sufficient because the copy updates the dataset's last-modified time, and we can use that timestamp later to decide whether to start a retraining run.

The next cell contains a small Python program that copies the data to the existing dataset/datastore. An AMLS pipeline step must run a Python script, so the `%%writefile` magic on the first line of the cell writes the cell's contents to a script file for us. Change any variables you need and run this cell. You should see that `upload_latest_data.py` is updated.


In [None]:
%%writefile upload_latest_data.py

import argparse
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
import traceback
from azureml.core import Dataset
from azureml.core.run import Run, _OfflineRun
from azureml.core import Workspace
from azureml.opendatasets import NoaaIsdWeather

run = Run.get_context()
ws = None
if isinstance(run, _OfflineRun):
    ws = Workspace.from_config()
else:
    ws = run.experiment.workspace

usaf_list = ['725724', '722149', '723090', '722159', '723910', '720279',
             '725513', '725254', '726430', '720381', '723074', '726682',
             '725486', '727883', '723177', '722075', '723086', '724053',
             '725070', '722073', '726060', '725224', '725260', '724520',
             '720305', '724020', '726510', '725126', '722523', '703333',
             '722249', '722728', '725483', '722972', '724975', '742079',
             '727468', '722193', '725624', '722030', '726380', '720309',
             '722071', '720326', '725415', '724504', '725665', '725424',
             '725066']


def get_noaa_data(start_time, end_time):
    columns = ['usaf', 'wban', 'datetime', 'latitude', 'longitude', 'elevation',
               'windAngle', 'windSpeed', 'temperature', 'stationName', 'p_k']
    isd = NoaaIsdWeather(start_time, end_time, cols=columns)
    noaa_df = isd.to_pandas_dataframe()
    df_filtered = noaa_df[noaa_df["usaf"].isin(usaf_list)]
    # reset_index returns a new DataFrame, so the result must be kept
    df_filtered = df_filtered.reset_index(drop=True)
    print("Received {0} rows of training data between {1} and {2}".format(
        df_filtered.shape[0], start_time, end_time))
    return df_filtered


print("Check for new data and prepare the data")

parser = argparse.ArgumentParser("split")
parser.add_argument("--ds_name", help="name of the Dataset to update")
args = parser.parse_args()

print("Argument 1(ds_name): %s" % args.ds_name)

dstor = ws.get_default_datastore()
register_dataset = False
try:
    ds = Dataset.get_by_name(ws, args.ds_name)
    end_time_last_slice = ds.data_changed_time.replace(tzinfo=None)
    print("Dataset {0} last updated on {1}".format(args.ds_name,
                                                   end_time_last_slice))
except Exception as e:
    print(traceback.format_exc())
    print("Dataset with name {0} not found, registering new dataset.".format(args.ds_name))
    register_dataset = True
    # Use UTC here as well so the start and end of the query window share a clock
    end_time_last_slice = datetime.utcnow() - relativedelta(weeks=2)

end_time = datetime.utcnow()
train_df = get_noaa_data(end_time_last_slice, end_time)

if train_df.size > 0:
    print("Received {0} rows of new data after {1}.".format(
        train_df.shape[0], end_time_last_slice))
    folder_name = "{}/{:04d}/{:02d}/{:02d}/{:02d}/{:02d}/{:02d}".format(args.ds_name, end_time.year,
                                                                        end_time.month, end_time.day,
                                                                        end_time.hour, end_time.minute,
                                                                        end_time.second)
    file_path = "{0}/data.csv".format(folder_name)

    # Add a new partition to the registered dataset
    os.makedirs(folder_name, exist_ok=True)
    train_df.to_csv(file_path, index=False)

    dstor.upload_files(files=[file_path],
                       target_path=folder_name,
                       overwrite=True,
                       show_progress=True)
else:
    print("No new data since {0}.".format(end_time_last_slice))

if register_dataset:
    ds = Dataset.Tabular.from_delimited_files(dstor.path("{}/**/*.csv".format(
        args.ds_name)), partition_format='/{partition_date:yyyy/MM/dd/HH/mm/ss}/data.csv')
    ds.register(ws, name=args.ds_name)


In [None]:
# The name and target column of the Dataset to create 
dataset = "NOAA-Weather-DS4"
target_column_name = "temperature"


### Upload Data Step
The data ingestion pipeline has a single step with a script that queries the latest weather data and uploads it to the blob store. During the first run, the script creates and registers a time-series based `TabularDataset` with the past two weeks of weather data (the script's fallback window is `relativedelta(weeks=2)`). For each subsequent run, the script creates a new partition in the blob store by querying NOAA for weather data newer than the last modified time of the dataset (`dataset.data_changed_time`) and writing it to a `data.csv` file.
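
The partition layout can be illustrated without touching Azure. The sketch below rebuilds the folder name that the upload script derives from the upload time, and shows that the timestamp can be recovered from the path, which is what the `partition_format` in the script relies on. The helper names `partition_folder` and `partition_time` are hypothetical and exist only for this illustration.

```python
from datetime import datetime

# Same layout the upload script writes: <ds_name>/yyyy/MM/dd/HH/mm/ss
PARTITION_FMT = "%Y/%m/%d/%H/%M/%S"


def partition_folder(ds_name, ts):
    """Build the blob folder for a slice uploaded at time `ts`."""
    return "{}/{}".format(ds_name, ts.strftime(PARTITION_FMT))


def partition_time(ds_name, file_path):
    """Recover the upload time from a path like <ds_name>/.../data.csv."""
    prefix = ds_name + "/"
    suffix = "/data.csv"
    if not (file_path.startswith(prefix) and file_path.endswith(suffix)):
        raise ValueError("unexpected path: {}".format(file_path))
    return datetime.strptime(file_path[len(prefix):-len(suffix)], PARTITION_FMT)


ts = datetime(2020, 11, 3, 7, 5, 9)
folder = partition_folder("NOAA-Weather-DS4", ts)
print(folder)  # NOAA-Weather-DS4/2020/11/03/07/05/09
print(partition_time("NOAA-Weather-DS4", folder + "/data.csv") == ts)  # True
```

Because every run writes to a folder named after its own upload time, slices never overwrite each other.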

In [None]:
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

ds_name = PipelineParameter(name="ds_name", default_value=dataset)
# script_name must match the file written by the %%writefile cell above
upload_data_step = PythonScriptStep(script_name="upload_latest_data.py",
                                    allow_reuse=False,
                                    name="upload_weather_data",
                                    arguments=["--ds_name", ds_name],
                                    compute_target=compute_target,
                                    runconfig=conda_run_config)

### Submit Pipeline Run

In [None]:
data_pipeline = Pipeline(
    description="pipeline_with_uploaddata",
    workspace=ws,    
    steps=[upload_data_step])
data_pipeline_run = experiment.submit(data_pipeline, pipeline_parameters={"ds_name":dataset})

In [None]:
data_pipeline_run.wait_for_completion(show_output=False)

## Training Pipeline
### Prepare Training Data Step

This step runs a script that checks whether new data has become available since the model was last trained. If no new data is available, we cancel the remaining pipeline steps. We need to set the `allow_reuse` flag to `False` so that the step runs even when its inputs have not changed. We also need the name of the model so the script can look up when the model was last trained.
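
The `check_data.py` script itself is not shown in this notebook. As a rough sketch of the decision it has to make, the helper below compares the dataset's last change time with the time the model was last trained. The function names are hypothetical. In a real script the two timestamps would presumably come from the dataset's `data_changed_time` and from the registered model, and the pipeline run would be cancelled when the function returns `False`.

```python
from datetime import datetime, timezone


def to_naive_utc(ts):
    """Convert to UTC and drop tzinfo so aware and naive values compare safely."""
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts


def needs_retraining(data_changed, last_trained):
    """True if the dataset changed after the model was last trained.

    `last_trained` is None when no model has been registered yet.
    """
    if last_trained is None:
        return True
    return to_naive_utc(data_changed) > to_naive_utc(last_trained)


trained = datetime(2020, 11, 2, 12, 0, tzinfo=timezone.utc)
print(needs_retraining(datetime(2020, 11, 3, 6, 0), trained))  # True
print(needs_retraining(datetime(2020, 11, 1, 6, 0), trained))  # False
```

Naive timestamps are treated as UTC here, which matches how the upload script strips `tzinfo` from `data_changed_time`.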

In [None]:
from azureml.pipeline.core import PipelineData

# The model name with which to register the trained model in the workspace.
model_name = PipelineParameter("model_name", default_value="noaaweatherds")

In [None]:
data_prep_step = PythonScriptStep(script_name="check_data.py",
                                  allow_reuse=False,
                                  name="check_data",
                                  arguments=["--ds_name", ds_name,
                                             "--model_name", model_name],
                                  compute_target=compute_target,
                                  runconfig=conda_run_config)

In [None]:
from azureml.core import Dataset
train_ds = Dataset.get_by_name(ws, dataset)
train_ds = train_ds.drop_columns(["partition_date"])

### AutoMLStep
Create an AutoMLConfig and a training step.

In [None]:
from azureml.train.automl import AutoMLConfig
from azureml.pipeline.steps import AutoMLStep

automl_settings = {
    "iteration_timeout_minutes": 10,
    "experiment_timeout_hours": 0.25,
    "n_cross_validations": 3,
    "primary_metric": 'r2_score',
    "max_concurrent_iterations": 3,
    "max_cores_per_iteration": -1,
    "verbosity": logging.INFO,
    "enable_early_stopping": True
}

automl_config = AutoMLConfig(task = 'regression',
                             debug_log = 'automl_errors.log',
                             path = ".",
                             compute_target=compute_target,
                             training_data = train_ds,
                             label_column_name = target_column_name,
                             **automl_settings
                            )
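
With the settings above, the whole experiment has a budget of 0.25 hours (15 minutes), each iteration is capped at 10 minutes, and up to 3 iterations run in parallel on the 4-node cluster. The sketch below is a hypothetical sanity check for these numbers and is not part of AutoML. The rule that concurrent iterations should not exceed the number of cluster nodes is an assumption based on AmlCompute running one iteration per node.

```python
def check_automl_budget(settings, cluster_max_nodes):
    """Return a list of warnings for settings that look inconsistent."""
    warnings = []
    experiment_minutes = settings["experiment_timeout_hours"] * 60
    if settings["iteration_timeout_minutes"] > experiment_minutes:
        warnings.append("a single iteration may outlast the whole experiment")
    if settings["max_concurrent_iterations"] > cluster_max_nodes:
        warnings.append("more concurrent iterations than cluster nodes")
    return warnings


settings = {
    "iteration_timeout_minutes": 10,
    "experiment_timeout_hours": 0.25,
    "max_concurrent_iterations": 3,
}
# The notebook's values pass both checks for a cluster with max_nodes=4.
print(check_automl_budget(settings, cluster_max_nodes=4))  # []
```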

In [None]:
from azureml.pipeline.core import PipelineData, TrainingOutput

metrics_output_name = 'metrics_output'
best_model_output_name = 'best_model_output'

metrics_data = PipelineData(name='metrics_data',
                            datastore=dstor,
                            pipeline_output_name=metrics_output_name,
                            training_output=TrainingOutput(type='Metrics'))
model_data = PipelineData(name='model_data',
                          datastore=dstor,
                          pipeline_output_name=best_model_output_name,
                          training_output=TrainingOutput(type='Model'))

In [None]:
automl_step = AutoMLStep(
    name='automl_module',
    automl_config=automl_config,
    outputs=[metrics_data, model_data],
    allow_reuse=False)

### Register Model Step
This step runs a script that registers the trained model in the workspace.

In [None]:
register_model_step = PythonScriptStep(script_name="register_model.py",
                                       name="register_model",
                                       allow_reuse=False,
                                       arguments=["--model_name", model_name, "--model_path", model_data, "--ds_name", ds_name],
                                       inputs=[model_data],
                                       compute_target=compute_target,
                                       runconfig=conda_run_config)
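
The `register_model.py` script is not shown in this notebook. As a minimal sketch, assuming it reads its inputs with `argparse`, the three arguments passed by the step above could be parsed as follows. The function name `parse_register_args` and the placeholder path are hypothetical. At run time the pipeline substitutes the actual location of `model_data` for `--model_path`.

```python
import argparse


def parse_register_args(argv=None):
    """Parse the arguments that the register_model step passes to its script."""
    parser = argparse.ArgumentParser("register_model")
    parser.add_argument("--model_name", required=True,
                        help="name under which to register the model")
    parser.add_argument("--model_path", required=True,
                        help="location of the best model produced by AutoML")
    parser.add_argument("--ds_name", required=True,
                        help="name of the dataset the model was trained on")
    return parser.parse_args(argv)


# Placeholder values for illustration only.
args = parse_register_args(["--model_name", "noaaweatherds",
                            "--model_path", "/tmp/model_data",
                            "--ds_name", "NOAA-Weather-DS4"])
print(args.model_name, args.model_path, args.ds_name)
```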

### Submit Pipeline Run

In [None]:
training_pipeline = Pipeline(
    description="training_pipeline",
    workspace=ws,    
    steps=[data_prep_step, automl_step, register_model_step])

In [None]:
training_pipeline_run = experiment.submit(training_pipeline, pipeline_parameters={
        "ds_name": dataset, "model_name": "noaaweatherds"})

In [None]:
training_pipeline_run.wait_for_completion(show_output=False)

### Publish Retraining Pipeline and Schedule
Once we are happy with the pipeline, we can publish the training pipeline to the workspace and create a schedule to trigger on blob change. The schedule polls the blob store where the data is being uploaded and runs the retraining pipeline if there is a data change. A new version of the model will be registered to the workspace once the run is complete.

In [None]:
pipeline_name = "Retraining-Pipeline-NOAAWeather"

published_pipeline = training_pipeline.publish(
    name=pipeline_name, 
    description="Pipeline that retrains AutoML model")

published_pipeline

In [None]:
from azureml.pipeline.core import Schedule
schedule = Schedule.create(workspace=ws, name="RetrainingSchedule",
                           pipeline_parameters={"ds_name": dataset, "model_name": "noaaweatherds"},
                           pipeline_id=published_pipeline.id, 
                           experiment_name=experiment_name, 
                           datastore=dstor,
                           wait_for_provisioning=True,
                           polling_interval=1440)

## Test Retraining
Here we set up the data ingestion pipeline to run on a schedule, to verify that the retraining pipeline runs as expected.

Note: 
* Azure NOAA Weather data is updated daily and retraining will not trigger if there is no new data available. 
* Depending on the polling interval set in the schedule, the retraining may take some time to trigger after the data ingestion pipeline completes.
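
To get a feel for the delay, the sketch below works through the arithmetic for a polling interval of 1440 minutes (one day). It assumes, as a simplification, that polls happen at fixed multiples of the interval counted from the schedule's start time. The actual timing is controlled by the service and may differ. The helper name is hypothetical.

```python
from datetime import datetime, timedelta


def first_poll_at_or_after(schedule_start, polling_interval_minutes, change_time):
    """Return the first poll time that can observe a change made at `change_time`."""
    interval = timedelta(minutes=polling_interval_minutes)
    if change_time <= schedule_start:
        return schedule_start
    elapsed = change_time - schedule_start
    # Ceiling division on timedeltas: number of whole intervals needed.
    polls = -(-elapsed // interval)
    return schedule_start + polls * interval


start = datetime(2020, 11, 3, 0, 0)
change = datetime(2020, 11, 3, 0, 30)  # data uploaded 30 minutes after a poll
poll = first_poll_at_or_after(start, 1440, change)
print(poll)           # 2020-11-04 00:00:00
print(poll - change)  # 23:30:00
```

Under this model, data uploaded just after a poll waits almost a full interval before retraining starts, so a shorter `polling_interval` reduces the delay at the cost of more frequent polling.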

In [None]:
pipeline_name = "DataIngestion-Pipeline-NOAAWeather"

# Publish the data ingestion pipeline here, not the training pipeline
published_pipeline = data_pipeline.publish(
    name=pipeline_name, 
    description="Pipeline that updates NOAAWeather Dataset")

published_pipeline

In [None]:
from azureml.pipeline.core import Schedule
schedule = Schedule.create(workspace=ws, name="RetrainingSchedule-DataIngestion",
                           pipeline_parameters={"ds_name":dataset},
                           pipeline_id=published_pipeline.id, 
                           experiment_name=experiment_name, 
                           datastore=dstor,
                           wait_for_provisioning=True,
                           polling_interval=1440)