In [3]:
"""
    #Start by loading a workspace from configuration file
    #Import azure core package
"""
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.0.85 to work with ml-fullpipe-practice


In [13]:
"""
    #Prepare data by first uploading it to the cloud blob storage
"""
from azureml.core import Dataset

#Pre-transformation step: turn xlsx to .csv
import pandas as pd
pre_df = pd.read_excel("./data/shuttles.xlsx")
pre_df.to_csv("./data/shuttles.csv", sep=",")

#Begin uploading data to Azure Blob Storage
default_ds = ws.get_default_datastore()
default_ds.upload_files(files=['./data/companies.csv', './data/reviews.csv', './data/shuttles.csv'], # Upload the diabetes csv files in /data
                       target_path='space-data/', # Put it in a folder path in the datastore
                       overwrite=True, # Replace existing files of the same name
                       show_progress=True)

#Create a tabular dataset from the path on the datastore (this may take a short while)
companies_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'space-data/companies.csv'))
reviews_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'space-data/reviews.csv'))
shuttles_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'space-data/shuttles.csv'))

# Register the tabular dataset
# Data Extraction Step (Essentially)
companies_data_set = companies_data_set.register(workspace=ws, 
                           name='space companies dataset',
                           description='companies data',
                           tags = {'format':'CSV'},
                           create_new_version=True)
reviews_data_set = reviews_data_set.register(workspace=ws, 
                           name='space reviews dataset',
                           description='companies data',
                           tags = {'format':'CSV'},
                           create_new_version=True)
shuttles_data_set = shuttles_data_set.register(workspace=ws, 
                           name='space shuttles dataset',
                           description='shuttles data',
                           tags = {'format':'CSV'},
                           create_new_version=True)

print('Dataset ready.')


Uploading an estimated of 3 files
Uploading ./data/companies.csv
Uploading ./data/reviews.csv
Uploading ./data/shuttles.csv
Uploaded ./data/companies.csv, 1 files out of an estimated total of 3
Uploaded ./data/reviews.csv, 2 files out of an estimated total of 3
Uploaded ./data/shuttles.csv, 3 files out of an estimated total of 3
Uploaded 3 files
Dataset ready.


In [83]:
#companies_data_set.to_pandas_dataframe()
#shuttles_data_set.to_pandas_dataframe()

In [14]:
#Create a Folder to Store Pipeline Works
import os
# Create a folder for the pipeline step files
experiment_folder = 'space_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

space_pipeline


In [71]:
#Data Transformation Step
import pandas as pd

#Define function to transform our data.
def _is_true(x):
    return x == "t"


def _parse_percentage(x):
    if (x != ""):
        return float(x.replace("%", ""))/100.
    else:
        return 0.0


def _parse_money(x):
    return float(x.replace("$", "").replace(",", ""))


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data for companies.

        Args:
            companies: Source data.
        Returns:
            Preprocessed data.

    """

    companies["iata_approved"] = companies["iata_approved"].apply(_is_true)

    companies["company_rating"] = companies["company_rating"].apply(_parse_percentage)

    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data for shuttles.

        Args:
            shuttles: Source data.
        Returns:
            Preprocessed data.

    """
    shuttles["d_check_complete"] = shuttles["d_check_complete"].apply(_is_true)

    shuttles["moon_clearance_complete"] = shuttles["moon_clearance_complete"].apply(
        _is_true
    )

    shuttles["price"] = shuttles["price"].apply(_parse_money)

    return shuttles




In [77]:
#Run Preprocess step to cleanup data
preprocessed_companies_data_set = preprocess_companies(companies_data_set.to_pandas_dataframe())
preprocessed_shuttles_data_set = preprocess_shuttles(shuttles_data_set.to_pandas_dataframe())
preprocessed_reviews_data_set = reviews_data_set.to_pandas_dataframe()

"\npreprocessed_companies_data_set = companies_data_set.register(workspace=ws, \n                           name='space companies dataset',\n                           description='companies data',\n                           tags = {'format':'Pandas-DataFrame'},\n                           create_new_version=True)\n\npreprocessed_shuttles_data_set = shuttles_data_set.register(workspace=ws, \n                           name='space shuttles dataset',\n                           description='shuttles data',\n                           tags = {'format':'Pandas-DataFrame'},\n                           create_new_version=True)\n"

In [80]:
#More DataEngineer Step
#Create Master Table
def create_master_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a master table.

        Args:
            shuttles: Preprocessed data for shuttles.
            companies: Preprocessed data for companies.
            reviews: Source data for reviews.
        Returns:
            Master table.

    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")

    with_companies = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )

    master_table = with_companies.drop(["shuttle_id", "company_id"], axis=1)
    master_table = master_table.dropna()
    return master_table



In [88]:
#Create Master Table and Store it in the blob storage
master_table = create_master_table(preprocessed_shuttles_data_set, preprocessed_companies_data_set, preprocessed_reviews_data_set)
master_table.to_csv("./data/space_master.csv", sep=",")


#Upload it to blob storage
default_ds.upload_files(files=['./data/space_master.csv'], # Upload the diabetes csv files in /data
                       target_path='space-data/', # Put it in a folder path in the datastore
                       overwrite=True, # Replace existing files of the same name
                       show_progress=True)

#Get Master Data Table
master_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'space-data/space_master.csv'))

# Register the tabular dataset for future usage
master_data_set = master_data_set.register(workspace=ws, 
                           name='space master dataset',
                           description='master data',
                           tags = {'format':'CSV'},
                           create_new_version=True)



Uploading an estimated of 1 files
Uploading ./data/space_master.csv
Uploaded ./data/space_master.csv, 1 files out of an estimated total of 1
Uploaded 1 files


Unnamed: 0,Column1,id_x,shuttle_location,shuttle_type,engine_type,engine_vendor,engines,passenger_capacity,cancellation_policy,crew,...,review_scores_crew,review_scores_location,review_scores_price,number_of_reviews,reviews_per_month,id_y,company_rating,company_location,total_fleet_count,iata_approved
0,0,63561,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,strict,1.0,...,10.0,9.0,10.0,133,1.65,35029,1.00,Niue,4.0,False
1,0,63561,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,strict,1.0,...,10.0,9.0,10.0,133,1.65,35029,1.00,Niue,4.0,False
2,0,63561,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,strict,1.0,...,10.0,9.0,10.0,133,1.65,35029,1.00,Niue,4.0,False
3,0,63561,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,strict,1.0,...,10.0,9.0,10.0,133,1.65,35029,1.00,Niue,4.0,False
4,1353,53260,Niue,Type V5,Quantum,"Banks, Wood and Phillips",1.0,2,strict,1.0,...,10.0,9.0,10.0,37,0.48,35029,1.00,Niue,4.0,False
5,1353,53260,Niue,Type V5,Quantum,"Banks, Wood and Phillips",1.0,2,strict,1.0,...,10.0,9.0,10.0,37,0.48,35029,1.00,Niue,4.0,False
6,1353,53260,Niue,Type V5,Quantum,"Banks, Wood and Phillips",1.0,2,strict,1.0,...,10.0,9.0,10.0,37,0.48,35029,1.00,Niue,4.0,False
7,1353,53260,Niue,Type V5,Quantum,"Banks, Wood and Phillips",1.0,2,strict,1.0,...,10.0,9.0,10.0,37,0.48,35029,1.00,Niue,4.0,False
8,1984,51019,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,flexible,1.0,...,10.0,9.0,9.0,10,0.15,35029,1.00,Niue,4.0,False
9,1984,51019,Niue,Type V5,Quantum,ThetaBase Services,1.0,2,flexible,1.0,...,10.0,9.0,9.0,10,0.15,35029,1.00,Niue,4.0,False


In [89]:
#Rerturn all versions of given dataset
print("Datasets:")
for dataset_name in list(ws.datasets.keys()):
    dataset = Dataset.get_by_name(ws, dataset_name)
    print("\t", dataset.name, 'version', dataset.version)

Datasets:
	 space master dataset version 1
	 space shuttles dataset version 1
	 space reviews dataset version 1
	 space companies dataset version 1
	 diabetes file dataset version 1
	 diabetes dataset version 1


In [109]:
%%writefile $experiment_folder/train_space.py
"""
    #Data Science Step
    #Generate a script that create/train a model of simple linear regression
"""
# Import libraries
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np


import joblib
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score


# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="space_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
space = run.input_datasets['space_master'].to_pandas_dataframe()

# Separate features and labels
X, y = space[['engines','passenger_capacity','crew','d_check_complete','moon_clearance_complete']].values, space['price'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=3)

# Train Linear Regression model
print('Training a Linear Regression model')
model = LinearRegression().fit(X_train, y_train)

# calculate R2 Score
y_pred = model.predict(X_test)
score = r2_score(y_test, y_pred)
print('Regressor Model R2 Score:', score)
run.log('R2 Score', np.float(score))

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

Overwriting space_pipeline/train_space.py


In [110]:
%%writefile $experiment_folder/register_space.py
"""
    #Data Science Step
    #Register the model
"""
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="space_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'space_model',
               tags={'Training context':'Pipeline'})

run.complete()

Overwriting space_pipeline/register_space.py


In [111]:
#Prepare Compute Enviornment and Cluster

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "aml-cluster"

# Verify that cluster exists
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4,
                                                           idle_seconds_before_scaledown=1800)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


In [112]:
#Configure Enviornment + Dependencies
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("space-pipeline-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas'],
                                             pip_packages=['azureml-sdk'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (just in case you want to use it again)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'space-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


In [113]:
#Pipeline generation

from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Get the training dataset
space_ds = ws.datasets.get("space master dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_space.py')

# Step 1, run the estimator to train the model
train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[space_ds.as_named_input('space_master')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_space.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


In [114]:
#Building the pipeline with the steps defined previously

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'space-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

Pipeline is built.
Created step Train Model [1b1506cd][7e0ecade-2e2a-4d01-bfcb-5c9a4cfca0f3], (This step will run and generate new outputs)
Created step Register Model [4156d2e7][9d1ec792-38e0-4650-b09a-8dc903da8dc3], (This step will run and generate new outputs)
Submitted PipelineRun 1426257e-e951-435e-88f7-4b996f28ecf4
Link to Azure Machine Learning studio: https://ml.azure.com/experiments/space-training-pipeline/runs/1426257e-e951-435e-88f7-4b996f28ecf4?wsid=/subscriptions/812800e3-0579-4b9a-b1a6-a53215573bf6/resourcegroups/ml-full-pipeline/workspaces/ml-fullpipe-practice
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: 1426257e-e951-435e-88f7-4b996f28ecf4
Link to Portal: https://ml.azure.com/experiments/space-training-pipeline/runs/1426257e-e951-435e-88f7-4b996f28ecf4?wsid=/subscriptions/812800e3-0579-4b9a-b1a6-a53215573bf6/resourcegroups/ml-full-pipeline/workspaces/ml-fullpipe-practice
PipelineRun Status: Running


StepRunId: 5c6acb72-ef46-41d0-b7a6-fc5043d24c1d
Link to Portal: https://ml.azure.com/experiments/space-training-pipeline/runs/5c6acb72-ef46-41d0-b7a6-fc5043d24c1d?wsid=/subscriptions/812800e3-0579-4b9a-b1a6-a53215573bf6/resourcegroups/ml-full-pipeline/workspaces/ml-fullpipe-practice
StepRun( Train Model ) Status: NotStarted
StepRun( Train Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_d5efd74c435bd3393ef162fb499b62454610c93c31bcff24d58dc8762dd588a3_d.txt
2020-03-05T21:36:32Z Starting output-watcher...
2020-03-05T21:36:32Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
Login Succeeded
Using default tag: latest
latest: Pulling from a

'Finished'

In [115]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

space_model version: 1
	 Training context : Pipeline


diabetes_model version: 4
	 Training context : Pipeline


diabetes_model version: 3
	 Training context : Pipeline


diabetes_model version: 2
	 Training context : Parameterized SKLearn Estimator
	 AUC : 0.8483904671874223
	 Accuracy : 0.7736666666666666


diabetes_model version: 1
	 Training context : Estimator
	 AUC : 0.8483203144435048
	 Accuracy : 0.774




In [116]:
published_pipeline = pipeline.publish(name="Space_Training_Pipeline",
                                      description="Trains space model",
                                      version="1.0")
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://eastus.aether.ms/api/v1.0/subscriptions/812800e3-0579-4b9a-b1a6-a53215573bf6/resourceGroups/ml-full-pipeline/providers/Microsoft.MachineLearningServices/workspaces/ml-fullpipe-practice/PipelineRuns/PipelineSubmit/836fb06e-60ef-4e72-86a7-6cbdfeb31d97


In [117]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

In [118]:
import requests
experiment_name = 'Run-diabetes-pipeline'

response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'b0e91367-171d-4c13-825d-25e27f55b551'

In [119]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
RunDetails(published_pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …