# Imports

In [None]:
from adal import AuthenticationContext
from azureml.data import OutputFileDatasetConfig
from azureml.core import Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.compute import SynapseCompute
from azureml.core.experiment import Experiment
from azureml.data import HDFSOutputDatasetConfig
from azureml.pipeline.core import Pipeline, PipelineData, PublishedPipeline, StepSequence
from azureml.pipeline.core.schedule import Schedule
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep, SynapseSparkStep
import os
import requests
from utils import *

# Synapse Parameters

In [None]:
# Fill in before running. This value is passed as `compute_target` to both
# SynapseSparkStep definitions further down in this notebook.
synapse_compute_name = ""

# Workspace Parameters

In [None]:
# Workspace / storage settings -- fill in every empty string before running.
# These values are passed to setup_workspace() and register_datastore() below.
account_key = ""  # passed to register_datastore(); do not commit a real key
blob_datastore_name = ""
CONTAINER_NAME = ""
# Directory (relative to the current working directory) handed to
# setup_workspace(). os.path.join guarantees a separator between the working
# directory and the name; a leading "/" or "\" is stripped so values written
# for the previous plain string concatenation keep resolving to the same place.
local_dir_name = "your_desired_local_path"
local_path = os.path.join(os.getcwd(), local_dir_name.lstrip("/\\"))
LOCATION = ""
RESOURCE_GROUP_NAME = ""
STORAGE_ACCOUNT_NAME = ""
subscription_id = ""
ws_name = ""

# Pipeline Parameters

In [None]:
# --- Data: blob names and registered dataset names -------------------------
# Passed to register_dataset() / retrieve_dataset() below.
BLOBNAME_train = ""
BLOBNAME_test = ""
train_dataset_name = ""
test_dataset_name = ""
encoding = "iso88591"

# Not referenced anywhere else in the visible part of this notebook.
prep_training_data = ""
prep_inference_data = ""

# --- Compute target and environment (arguments of prepare_pipeline) --------
compute_name = ""
compute_type = ""
vm_size = "STANDARD_D1_V2"
env_name = ""
pip_packages = [
    'pandas',
    'scikit-learn',
    'azureml-sdk',
    'nltk',
    'xgboost',
]

# --- Model and pipeline identifiers ----------------------------------------
model_name = ""     # forwarded to inference.py as --model_name
pipeline_name = ""  # experiment name used for the training run

# Steps

## Setup ML workspace

In [None]:
# Obtain the workspace object from the project helper `setup_workspace`
# (not defined in this notebook; presumably provided by `from utils import *`).
# All arguments come from the "Workspace Parameters" cell.
ws = setup_workspace(local_path, LOCATION, RESOURCE_GROUP_NAME, 
                     STORAGE_ACCOUNT_NAME, subscription_id, ws_name)
# Bare expression: shows the workspace object as the cell result.
ws

## Register new datastore (optional)

In [None]:
# Optional step: call the project helper `register_datastore` with the storage
# account key, datastore name, container name, storage account name and the
# workspace. The return value (if any) is not kept.
register_datastore(account_key, blob_datastore_name, CONTAINER_NAME,
                       STORAGE_ACCOUNT_NAME, ws)

## Register train and test datasets (optional)

In [None]:
# Optional step: call the project helper `register_dataset` for the train and
# test blobs on `blob_datastore_name`.
# NOTE(review): `dataset_type` is not assigned anywhere in the visible
# notebook. Presumably it is supplied by `from utils import *`; if not, this
# cell raises NameError and the variable must be defined first -- confirm.
register_dataset(blob_datastore_name, BLOBNAME_train, BLOBNAME_test, dataset_type,
                     encoding, test_dataset_name, train_dataset_name, ws)

## Retrieve datasets

In [None]:
# Fetch the two datasets through the project helper `retrieve_dataset`.
# Mind the ordering: the test dataset name is passed first, while the result
# is unpacked as (train, test).
train_ds, test_ds = retrieve_dataset(test_dataset_name, train_dataset_name, ws)

## Prepare Compute Target and Pipeline configuration

In [None]:
# The project helper `prepare_pipeline` returns three objects:
#   compute_target / run_config -- used by every PythonScriptStep below,
#   environment                 -- not referenced again in the visible notebook.
compute_target, run_config, environment = prepare_pipeline(compute_name, compute_type, env_name, 
                                              vm_size, ws, pip_packages=pip_packages)

## Training Pipeline

#### Default datastore

In [None]:
# Handle to the datastore named "workspaceblobstore"; used as the destination
# of the Spark step output below.
# NOTE(review): `Datastore` is not imported explicitly in this notebook;
# presumably it arrives through `from utils import *` -- confirm.
def_blob_store = Datastore(ws, "workspaceblobstore")

#### Output logs file to datastore

In [None]:
# Output location for the run log: folder 'runLogs' on the datastore named
# "synapse_datastore". The config is handed to log.py via --outputfolder.
_log_destination = (Datastore(ws, "synapse_datastore"), 'runLogs')
log_data = OutputFileDatasetConfig(destination=_log_destination)

#### Input data and output data for SynapseSparkStep 1 and input data for PythonScriptStep 2

#### step1_input1 is a registered Dataset

In [None]:
# Input of the Spark step: the registered training dataset under a fixed name.
step1_input1 = train_ds.as_named_input('TrainingDataSynapse')

# Output of the Spark step: written to "train" on the default blob store and
# registered as dataset "TrainDTM" when the step completes.
_train_dtm_config = HDFSOutputDatasetConfig(destination=(def_blob_store, "train"))
step1_output = _train_dtm_config.register_on_complete(name="TrainDTM")

# The same output, consumed as a mounted input by the training step.
_train_dtm_consumption = step1_output.as_input("TrainDTM")
step2_input = _train_dtm_consumption.as_mount()

In [None]:
# Step 01 of the training pipeline: run spark_data_prep.py on the Synapse
# compute in "Training" mode, reading step1_input1 and writing step1_output.
_train_prep_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'

_train_prep_arguments = [
    "--input", step1_input1,
    "--output_dir", step1_output,
    "--process", "Training",
]

# Spark driver/executor sizing, kept together so it is easy to tune.
_train_prep_resources = {
    "driver_memory": "7g",
    "driver_cores": 4,
    "executor_memory": "7g",
    "executor_cores": 4,
    "num_executors": 2,
}

dataprep_step = SynapseSparkStep(
    name='01 Data Preprocessing',
    file='spark_data_prep.py',
    source_directory=_train_prep_source_dir,
    compute_target=synapse_compute_name,
    inputs=[step1_input1],
    outputs=[step1_output],
    arguments=_train_prep_arguments,
    allow_reuse=False,
    **_train_prep_resources
)

In [None]:
# Step 02 of the training pipeline: run train.py on the Azure ML compute
# target. The mounted Spark output is both declared as an input and passed
# as the script's only argument.
_training_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'

train_step = PythonScriptStep(
    name='02 Training',
    script_name="train.py",
    source_directory=_training_source_dir,
    compute_target=compute_target,
    runconfig=run_config,
    inputs=[step2_input],
    arguments=[step2_input],
    allow_reuse=False,
)

In [None]:
# Step 03 of the training pipeline: run log.py, telling it which process is
# being logged ("Training") and where to write (the log_data output config).
_training_log_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'
_training_log_arguments = [
    "--process", "Training",
    "--outputfolder", log_data,
]

log_step = PythonScriptStep(
    name='03 Logging Run Status',
    script_name="log.py",
    source_directory=_training_log_source_dir,
    compute_target=compute_target,
    runconfig=run_config,
    arguments=_training_log_arguments,
    allow_reuse=False,
)

In [None]:
# Assemble and run the training pipeline.
# The experiment is named after `pipeline_name` from the parameters cell.
experiment = Experiment(workspace=ws, name=pipeline_name)
# StepSequence chains the three steps in list order: prep -> train -> log.
step_sequence = StepSequence(steps=[dataprep_step, train_step, log_step])
train_pipeline = Pipeline(workspace=ws, steps=step_sequence)
# continue_on_step_failure=True is requested for the submitted run.
train_pipeline_run = experiment.submit(train_pipeline, continue_on_step_failure=True)
# Wait for the run to finish, with show_output enabled.
train_pipeline_run.wait_for_completion(show_output=True)

## Publish Pipeline

In [None]:
# Publish the training pipeline so it can be triggered via REST or a schedule.
# Pipeline.publish() names its flag `continue_on_failure` (unlike
# Experiment.submit / Schedule.create, which use `continue_on_step_failure`);
# passing the latter here raises TypeError.
published_pipeline = train_pipeline.publish(
    name="Training Pipeline",
    description="Training pipeline",
    version="1.0",
    continue_on_failure=True,
)

In [None]:
# Read the `endpoint` attribute of the published training pipeline and print it.
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

# Triggers

In [None]:
# Trigger settings -- fill in before running the cells below.
# `pipeline_id` is used by PublishedPipeline.get() and Schedule.create();
# `experiment_name` is sent with the REST call and used by Schedule.create().
pipeline_id = ""
experiment_name = ""
# Datastore handed to Schedule.create(); its name must be filled in.
datastore = Datastore(workspace=ws, name="")

## Run pipeline with requests

In [None]:
# Look up the published pipeline by `pipeline_id`; its `endpoint` is the URL
# posted to in the REST cell below.
p = PublishedPipeline.get(ws, id=pipeline_id)

In [None]:
# Authenticate interactively and build the HTTP header used for the REST call.
interactive_auth = InteractiveLoginAuthentication()

# The header is deliberately NOT echoed as the cell result: it carries the
# caller's access token, which would otherwise be stored in the saved
# notebook output and could leak through version control.
auth_header = interactive_auth.get_authentication_header()

In [None]:
# Trigger the published pipeline through its REST endpoint.
# An explicit timeout (seconds) is set because `requests` otherwise waits
# indefinitely when the service does not answer, which would hang the cell.
response = requests.post(
    p.endpoint,
    json={"ExperimentName": experiment_name},
    headers=auth_header,
    timeout=60,
)
# Print the decoded JSON body of the service's reply.
print(response.json())

## Create change based event (on Blob change)

In [None]:
# Fill in before running: passed as `path_on_datastore` to Schedule.create()
# in the next cell.
path_on_datastore = ""

In [None]:
# Create a schedule for `pipeline_id` that is tied to `datastore` and
# `path_on_datastore` (see the description string: "Based on input file
# change."). Runs are submitted under `experiment_name`.
# polling_interval=1 -- presumably minutes; confirm against the
# Schedule.create documentation.
reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                                    pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, 
                                    continue_on_step_failure=True, path_on_datastore=path_on_datastore,
                                    polling_interval=1)

In [None]:
# Bare expression: shows the schedule object as the cell result.
reactive_schedule

In [None]:
# Call disable() on the schedule created above.
reactive_schedule.disable()

# Inference Pipeline

In [None]:
# Handle to the datastore named "workspaceblobstore" (same lookup as in the
# training section, repeated so the inference section can be run on its own).
def_blob_store = Datastore(ws, "workspaceblobstore")

#### Output data csv

In [None]:
# Output locations on the datastore named "synapse_datastore":
#   'runLogs'     -- written by log.py (via --outputfolder)
#   'Predictions' -- written by inference.py (via --outputfolder)
_inference_log_destination = (Datastore(ws, "synapse_datastore"), 'runLogs')
log_data = OutputFileDatasetConfig(destination=_inference_log_destination)

_prediction_destination = (Datastore(ws, "synapse_datastore"), 'Predictions')
prediction_data = OutputFileDatasetConfig(destination=_prediction_destination)

#### Input/output datasets during pipeline run

#### train_dtm_input is a Registered Dataset

In [None]:
# Load the dataset registered as "TrainDTM" (the training pipeline registers
# its Spark output under that name) and expose it to the Spark step as an
# HDFS input named 'TrainDTM'.
_registered_train_dtm = Dataset.get_by_name(ws, "TrainDTM")
_named_train_dtm = _registered_train_dtm.as_named_input('TrainDTM')
train_dtm_input = _named_train_dtm.as_hdfs()

#### Input data and output data for SynapseSparkStep 1 and input data for PythonScriptStep 2

In [None]:
# Input of the inference Spark step: the test dataset under a fixed name.
step1_test_input1 = test_ds.as_named_input('InferenceDataSynapse')

# Output of the inference Spark step: written to "inference" on the default
# blob store (not registered as a dataset, unlike the training output).
_inference_destination = (def_blob_store, "inference")
step1_test_output = HDFSOutputDatasetConfig(destination=_inference_destination)

# The same output, consumed as a mounted input by the prediction step.
_inference_dtm_consumption = step1_test_output.as_input("InferenceDTM")
step2_test_input = _inference_dtm_consumption.as_mount()

In [None]:
# Step 01 of the inference pipeline: run spark_data_prep.py on the Synapse
# compute in "Inference" mode. It reads both the registered TrainDTM dataset
# and the test dataset, and writes step1_test_output.
_inference_prep_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'

_inference_prep_arguments = [
    "--train_input", train_dtm_input,
    "--input", step1_test_input1,
    "--output_dir", step1_test_output,
    "--process", "Inference",
]

# Spark driver/executor sizing, kept together so it is easy to tune.
_inference_prep_resources = {
    "driver_memory": "7g",
    "driver_cores": 4,
    "executor_memory": "7g",
    "executor_cores": 4,
    "num_executors": 2,
}

dataprep_step = SynapseSparkStep(
    name='01 Data Preprocessing',
    file='spark_data_prep.py',
    source_directory=_inference_prep_source_dir,
    compute_target=synapse_compute_name,
    inputs=[train_dtm_input, step1_test_input1],
    outputs=[step1_test_output],
    arguments=_inference_prep_arguments,
    allow_reuse=False,
    **_inference_prep_resources
)

In [None]:
# Step 02 of the inference pipeline: run inference.py with the model name and
# the prediction output location. The mounted Spark output is declared as an
# input but is not passed on the command line.
_predict_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'
_predict_arguments = [
    "--model_name", model_name,
    "--outputfolder", prediction_data,
]

predict_step = PythonScriptStep(
    name='02 Predict',
    script_name="inference.py",
    source_directory=_predict_source_dir,
    compute_target=compute_target,
    runconfig=run_config,
    inputs=[step2_test_input],
    arguments=_predict_arguments,
)

In [None]:
# Step 03 of the inference pipeline: run log.py, telling it which process is
# being logged ("Inference") and where to write (the log_data output config).
_inference_log_source_dir = os.getcwd() + '/AzureML_SparkPreprocessing'
_inference_log_arguments = [
    "--process", "Inference",
    "--outputfolder", log_data,
]

log_step = PythonScriptStep(
    name='03 Logging Run Status',
    script_name="log.py",
    source_directory=_inference_log_source_dir,
    compute_target=compute_target,
    runconfig=run_config,
    arguments=_inference_log_arguments,
    allow_reuse=False,
)

In [None]:
# Assemble and run the inference pipeline.
# Unlike the training run, the experiment name is hard-coded here.
experiment = Experiment(workspace=ws, name='spark-inference-pipeline-test')
# StepSequence chains the three steps in list order: prep -> predict -> log.
step_sequence = StepSequence(steps=[dataprep_step, predict_step, log_step])
inference_pipeline = Pipeline(workspace=ws, steps=step_sequence)
# continue_on_step_failure=True is requested for the submitted run.
inference_pipeline_run = experiment.submit(inference_pipeline, continue_on_step_failure=True)
# Wait for the run to finish, with show_output enabled.
inference_pipeline_run.wait_for_completion(show_output=True)

In [None]:
# Publish the inference pipeline so it can be triggered via REST or a schedule.
# Pipeline.publish() names its flag `continue_on_failure` (unlike
# Experiment.submit / Schedule.create, which use `continue_on_step_failure`);
# passing the latter here raises TypeError.
inf_published_pipeline = inference_pipeline.publish(
    name="Inference Pipeline",
    description="Inference pipeline",
    version="1.0",
    continue_on_failure=True,
)

## Create change based event (on Blob change)

In [None]:
# Fill in before running: passed as `path_on_datastore` to Schedule.create()
# in the next cell. Overwrites the value set in the training "Triggers" section.
path_on_datastore = ""

In [None]:
# Create a schedule tied to `datastore` / `path_on_datastore`; rebinds
# `reactive_schedule`, so the earlier schedule object is no longer reachable
# through that name.
# NOTE(review): `pipeline_id`, `experiment_name` and `datastore` are still the
# values set in the "Triggers" section, and the schedule name is the same as
# the training schedule. Presumably `pipeline_id` should be set to the
# inference pipeline (e.g. inf_published_pipeline.id) before running -- confirm.
reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                                    pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, 
                                    continue_on_step_failure=True, path_on_datastore=path_on_datastore,
                                    polling_interval=1)