In [None]:
from azureml.core import Workspace
ws = Workspace.from_config()

In [None]:
# Provision compute context for the pipeline
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "mlopsbootcamp"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS2_v2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

In [None]:
import os

# Create a folder for the files
capstone_folder = os.path.join(os.path.dirname(os.getcwd()),'pytown_energymonitor')
print(capstone_folder)

In [43]:
%%writefile $capstone_folder\score.py

import os
import pandas as pd

def init():
    # Runs when the pipeline step is initialized
    global model


def get_classification(df_pred, col_energy, col_prediction):
    df_pred['difference'] = df_pred[col_energy] - df_pred[col_prediction]
    df_pred['class'] = 0 # add a class column with 0 as low charge (1 is middle charge  and 2 is normal charge)
    df_pred.loc[df_pred['difference'] > 0, 'class'] = 1 
    df_pred.loc[df_pred['difference'] > 5000, 'class'] = 2 
    return df_pred

def run(mini_batch):
    # mini_batch is the pandas dataframe    
  
    #calculate naive forecast
    prediction_df = (
        mini_batch
        .set_index('data_index_')
        .groupby('dayofweek')
        .load_actuals_mw
        .rolling(3, closed = 'left')
        .mean()
        .reset_index()
    ).rename(columns ={'load_actuals_mw' : 'load_pred_mw'})

    df_complete = (
        pd.merge(prediction_df, mini_batch, on='data_index_')
        .drop('dayofweek_y', axis=1)
        .sort_values(by='data_index_')
        .set_index('data_index_')
    )

    return get_classification(df_complete, 'total_gen', 'load_pred_mw')



Overwriting c:\Users\meira\Projects\PyLadiesMLOpsCapstone\pytown_energymonitor\score.py


In [25]:
%%writefile $capstone_folder\capstone_environment.yml
name: capstone_environment
dependencies:
- python=3.8
- numpy
- pandas
- scikit-learn
- pip:
  - azureml-core
  - azureml-dataset-runtime[fuse]
  - azureml-pipeline-core
  - azureml-pipeline-steps
  - azureml-dataprep

Overwriting c:\Users\meira\Projects\PyLadiesMLOpsCapstone\pytown_energymonitor\capstone_environment.yml


In [22]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the pipeline to run
capstone_env = Environment.from_conda_specification("capstone_env", capstone_folder + "/capstone_environment.yml")
capstone_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


In [45]:
# run the batch pipeline: execute py script and save results to txt in the output folder
from datetime import datetime

from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.runconfig import DockerConfiguration

# Get the batch dataset for input
input_data_set = ws.datasets['daily_load_and_wind'] # Tabular dataset -> this means the mini-batch is a pd.DataFrame

# Set the output location
default_ds = ws.get_default_datastore()
output_dir = OutputFileDatasetConfig(name='capstone_inferences')

# Define the parallel run step step configuration
parallel_run_config = ParallelRunConfig(
    source_directory=capstone_folder,
    entry_script="score.py",
    mini_batch_size="10MB",
    error_threshold=10,
    output_action="append_row",
    environment=capstone_env,
    compute_target=inference_cluster,
    node_count=2)

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name=parallel_step_name,
    parallel_run_config=parallel_run_config,
    inputs=[input_data_set.as_named_input('daily_load_and_wind')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')


Steps defined


ParallelRunStep requires azureml-dataset-runtime[fuse,pandas] for tabular dataset.
Please add relevant package in CondaDependencies.


In [47]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

# Run the pipeline as an experiment
pipeline_run = Experiment(ws, 'capstone-naive-forecast-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

In [None]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('capstone-batch-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('capstone_inferences')
prediction_output.download(local_path='capstone-batch-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('capstone-batch-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

In [None]:
# publish the pipeline
published_pipeline = pipeline_run.publish_pipeline(name='Linear_regression_batch_prediction_pipeline',
                                                   description='Batch scoring using linear regression model',
                                                   version='1.0')

published_pipeline

In [None]:
# check all published pipelines
from azureml.pipeline.core import PublishedPipeline

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in  published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

In [None]:
# schedule the pipeline
from azureml.pipeline.core import ScheduleRecurrence, Schedule


recurrence = ScheduleRecurrence(frequency='Daily', interval=1)
pipeline_schedule = Schedule.create(ws, name='Daily Naive Predictions',
                                        description='capstone naive forecast batch inferencing',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='Capstone_Batch_Prediction',
                                        recurrence=recurrence)