# Azure ML Training Pipeline for Water Demand Forecasting
This notebook defines an Azure Machine Learning pipeline for a single training experiment and submits the pipeline as an experiment to be run on an Azure virtual machine. It then publishes the pipeline in the workspace.

In [None]:
# Import statements
import azureml.core
from azureml.core import Experiment
from azureml.core import Workspace, Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.environment import Environment
from azureml.core.runconfig import RunConfiguration
import shutil
import datetime

### Load the workspace and configure its Python environment.

In [None]:
# Load the workspace described by the local config file
ws = Workspace.from_config("./ws_config.json")

# Build the run environment from the project's pip requirements file, then
# add two further pip packages on top of it before registering the environment
env = Environment.from_pip_requirements(
    name="Water_env",
    file_path="./../requirements.txt",
)
for _extra_pkg in ("azureml-core", "sendgrid"):
    env.python.conda_dependencies.add_pip_package(_extra_pkg)
env.register(workspace=ws)

# Run configuration passed to the pipeline steps; print the resolved
# dependency specification so it can be inspected in the notebook output
runconfig = RunConfiguration(conda_dependencies=env.python.conda_dependencies)
print(env.python.conda_dependencies.serialize_to_string())

# Copy the AML ignore file into the parent (root) folder and the root
# .gitignore into this folder. Both copies are moved back once the pipeline
# has been submitted.
aml_ignore_path = shutil.copy(src='./.amlignore', dst='./../.amlignore')
gitignore_path = shutil.copy(src='./../.gitignore', dst='./.gitignore')

### Create references to persistent and intermediate data
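Create DataReference objects that point to our persistent data on the blob datastore, and a PipelineData object for the intermediate data passed between pipeline steps.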

In [None]:
# Look up the blob datastore associated with this workspace
water_blob_ds = Datastore(ws, name='water_forecast_ds')


def _blob_reference(reference_name, datastore_path):
    """Return a DataReference named ``reference_name`` for ``datastore_path`` on the blob datastore."""
    return DataReference(
        datastore=water_blob_ds,
        data_reference_name=reference_name,
        path_on_datastore=datastore_path,
    )


# Data references to folders (and one CSV file) on the blob
new_raw_data_dr = _blob_reference("new_raw_data_files", "raw_data/quarterly/")
inter_raw_datasets_dr = _blob_reference("intermediate_raw_datasets", "raw_data/intermediate/")
merged_raw_datasets_dr = _blob_reference("merged_raw_dataset", "raw_data/full_raw_data.csv")
preprocessed_data_dr = _blob_reference("preprocessed_datasets", "preprocessed_data/")
outputs_dr = _blob_reference("outputs", "outputs/")

# Intermediate pipeline storage: written by the preprocessing step and
# consumed by the training step
preprocessed_data_pd = PipelineData(
    name="preprocessed_output",
    datastore=water_blob_ds,
    output_name="preprocessed_datasets",
    output_mode="mount",
)

### Compute Target
Specify and configure the compute target for this workspace. If a compute cluster by the name we specified does not exist, create a new compute cluster.

In [None]:
# Cluster settings
CT_NAME = "d16v3-train"      # Name of our compute cluster
VM_SIZE = "STANDARD_D16_V3"  # Azure VM size used to execute the pipeline
MIN_NODES = 0                # Lower bound on compute nodes in the cluster
MAX_NODES = 3                # Upper bound on compute nodes in the cluster

# Reuse the cluster if one with this name already exists; otherwise provision it
try:
    compute_target = AmlCompute(workspace=ws, name=CT_NAME)
except ComputeTargetException:
    print("Creating new compute target")
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        min_nodes=MIN_NODES,
        max_nodes=MAX_NODES,
    )
    compute_target = ComputeTarget.create(ws, CT_NAME, provisioning_config)

    # Block until the new cluster has finished provisioning (up to 20 minutes)
    compute_target.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=20,
    )
else:
    print("Found existing compute target.")

print("Azure Machine Learning Compute attached")
print("Compute targets: ", ws.compute_targets)

# Re-fetch the target through the workspace's registry of compute targets
compute_target = ws.compute_targets[CT_NAME]

### Define pipeline and submit experiment.
Define the steps of an Azure machine learning pipeline. Create an Azure Experiment that will run our pipeline. Submit the experiment to the execution environment.

In [None]:
# Define some constants
TEST_DAYS = 183                # Test trained model on most recent half year
FORECAST_DAYS = 20 * 365 + 5    # Obtain forecast for the next 20 years
PREPROCESS_STRATEGY = 'complete'  # Set to 'complete' to loop over all consumption data. Set to 'quick' to preprocess newly uploaded data and append to old preprocessed data.
EXPERIMENT_NAME = 'Train-Forecast-Experiment-v2'

# The ignore files were copied out of place in an earlier cell so that they are
# in effect while the pipeline is built and submitted. Building and submitting
# inside try/finally guarantees they are moved back even when step
# construction, validation or submission raises.
try:
    # Define preprocessing step in the ML pipeline
    step1 = PythonScriptStep(name="preprocess_step",
                             script_name="azure/preprocess_step/preprocess_step.py",
                             arguments=["--newrawdatadir", new_raw_data_dr, "--intermediaterawdatasets", inter_raw_datasets_dr,
                                        "--rawmergeddataset", merged_raw_datasets_dr, "--preprocessedoutputdir", preprocessed_data_dr, "--preprocessedintermediatedir", preprocessed_data_pd],
                             inputs=[new_raw_data_dr, inter_raw_datasets_dr, merged_raw_datasets_dr, preprocessed_data_dr],
                             outputs=[preprocessed_data_pd],
                             compute_target=compute_target,
                             source_directory="./../",
                             runconfig=runconfig,
                             params={"PREPROCESS_STRATEGY": PREPROCESS_STRATEGY},
                             allow_reuse=False)

    # Define training step in the ML pipeline. It consumes the intermediate
    # output of the preprocessing step, which orders it after step1.
    step2 = PythonScriptStep(name="train_step",
                             script_name="azure/train_step/train_step.py",
                             arguments=["--preprocessedintermediatedir", preprocessed_data_pd, "--trainoutputdir", outputs_dr, "--preprocessedoutputdir", preprocessed_data_dr],
                             inputs=[preprocessed_data_pd, outputs_dr, preprocessed_data_dr],
                             outputs=[],
                             compute_target=compute_target,
                             source_directory="./../",
                             runconfig=runconfig,
                             params={"TEST_DAYS": TEST_DAYS, "FORECAST_DAYS": FORECAST_DAYS},
                             allow_reuse=False)

    # Construct the ML pipeline from the steps
    steps = [step1, step2]
    single_train_pipeline = Pipeline(workspace=ws, steps=steps)
    single_train_pipeline.validate()

    # Define a new experiment and submit a new pipeline run to the compute target.
    experiment = Experiment(workspace=ws, name=EXPERIMENT_NAME)
    run = experiment.submit(single_train_pipeline, regenerate_outputs=False)
    print("Pipeline is submitted for execution.")
finally:
    # Move the ignore files back to their original folders
    aml_ignore_path = shutil.move(aml_ignore_path, './.amlignore')
    gitignore_path = shutil.move(gitignore_path, './../.gitignore')

### Publish the pipeline
Wait for the pipeline run to finish. Then publish the pipeline. The pipeline will be visible as an endpoint in the Pipelines tab in the workspace on Azure Machine Learning studio. The training compute cluster is deleted in the next section to prevent further cost.

In [None]:
# Block until the submitted pipeline run has finished.
run.wait_for_completion()

# Metadata under which the pipeline is published in the workspace.
_publish_kwargs = {
    "name": "Water Demand Forecasting Training Pipeline",
    "description": "Azure ML Pipeline that trains a Prophet model for water demand and produces a forecast.",
    "version": "1.1",
}

# Publish the pipeline behind the finished run.
published_pipeline = run.publish_pipeline(**_publish_kwargs)

### Delete the compute target
Delete the training compute cluster to prevent further cost.

In [None]:
# Wait for the pipeline to finish running, so the cluster is not deleted
# while the run may still be using it. The publish cell above issues the
# same wait; repeating it here lets this cell be run on its own.
run.wait_for_completion()

# Delete compute cluster to avoid extra charges
compute_target.delete()