## Smart Grid SageMaker Pipeline

The Smart Grid System pipeline that is a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, batch transformation, and model registration:

![A typical ML Application pipeline](img/pipeline-full.png)

In [3]:
pip install -q --upgrade pip

[0mNote: you may need to restart the kernel to use updated packages.


In [4]:
!pip install -q -U sagemaker

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
distributed 2022.7.0 requires tornado<6.2,>=6.0.3, but you have tornado 6.4 which is incompatible.[0m[31m
[0m

In [5]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"SmartGridModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [6]:
import pandas as pd
input_dataT_uri = f"s3://{default_bucket}/Smart_Grid/data/dataT.csv"
print(input_dataT_uri)

s3://sagemaker-us-east-1-911199926915/Smart_Grid/data/dataT.csv


In [7]:
local_path = "test_data/batch_data.csv"

base_uri = f"s3://{default_bucket}/Smart_Grid/data"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-911199926915/Smart_Grid/data/batch_data.csv


In [8]:
weatherdata_uri = f"s3://{default_bucket}/Smart_Grid/data/weather_hourly_darksky.csv"
holidaydata_uri = f"s3://{default_bucket}/Smart_Grid/data/uk_bank_holidays.csv"
print(weatherdata_uri)
print(holidaydata_uri)

s3://sagemaker-us-east-1-911199926915/Smart_Grid/data/weather_hourly_darksky.csv
s3://sagemaker-us-east-1-911199926915/Smart_Grid/data/uk_bank_holidays.csv


## Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `processing_instance_count` - The instance count of the processing job.
* `instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - The approval status to register with the trained model for CI/CD purposes ("PendingManualApproval" is the default).
* `input_data` - The S3 bucket URI location of the input data.
* `batch_data` - The S3 bucket URI location of the batch data.
* `mse_threshold` - The Mean Squared Error (MSE) threshold used to verify the accuracy of a model.

![Define Parameters](img/pipeline-1.png)

In [9]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
dataT_uri = ParameterString(
    name="DataTUri",
    default_value=input_dataT_uri,
)
weather_hourly_uri = ParameterString(
    name="WeatherHourlyUri",
    default_value=weatherdata_uri,
)
uk_bank_holidays_uri = ParameterString(
    name="UkBankHolidaysUri",
    default_value=holidaydata_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=0.001669)

## Define a Processing Step for Feature Engineering

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

In [10]:
!mkdir -p src

In [45]:
%%writefile src/preprocessing.py
import os
import pandas as pd
from sklearn import preprocessing

def load_and_process_weather_data(datapath):
    weather_hourly_darksky = pd.read_csv(os.path.join(datapath, 'weather_hourly_darksky.csv'), index_col='time', parse_dates=True)
    temperaturedata = weather_hourly_darksky['temperature']
    return temperaturedata

def load_and_process_holiday_data(datapath):
    dfholiday = pd.read_csv(os.path.join(datapath, 'uk_bank_holidays.csv'))
    dfholiday['Bank holidays'] = pd.to_datetime(dfholiday['Bank holidays'])
    dfholiday = dfholiday.rename(columns={'Bank holidays': 'date', 'Type': 'Holiday'})
    calendarencoder = preprocessing.LabelEncoder()
    dfholiday['Holiday'] = calendarencoder.fit_transform(dfholiday['Holiday'])
    return dfholiday

def process_energy_data(df):
    houseAVG = df.mean(axis=1).to_frame()
    houseAVG.columns = ['kWh']
    return houseAVG

def merge_data(houseAVG, temperaturedata, dfholiday):
    meterdataset = houseAVG.copy()
    meterdataset['timestamp'] = meterdataset.index
    meterdataset['date'] = pd.to_datetime(meterdataset.index.date)
    meterdataset['weekday'] = meterdataset.index.weekday
    meterdataset['hour'] = meterdataset.index.hour + meterdataset.index.minute / 60
    meterdataset = meterdataset.merge(temperaturedata, left_index=True, right_index=True, how='left')
    meterdataset = meterdataset.merge(dfholiday, on='date', how='left')
    meterdataset = meterdataset.drop('date', axis=1)
    meterdataset['Holiday'] = meterdataset['Holiday'].fillna(-1)
    meterdataset['year'] = pd.DatetimeIndex(meterdataset['timestamp']).year
    meterdataset['month'] = pd.DatetimeIndex(meterdataset['timestamp']).month
    meterdataset.set_index('timestamp', inplace=True)
    meterdataset = meterdataset.dropna()
    return meterdataset

def split_data(meterdataset):
    # Split the data into training, validation, and testing sets
    traindata = meterdataset.loc['2013-01':'2013-12'].copy()
    validationdata = meterdataset.loc['2014-01':].copy()
    testdata = meterdataset.loc['2014-02':].copy()

    return traindata, validationdata, testdata

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    # Load data
    temperaturedata = load_and_process_weather_data(os.path.join(base_dir, "weather"))
    dfholiday = load_and_process_holiday_data(os.path.join(base_dir, "holidays"))

    # Load energy data
    df = pd.read_csv(os.path.join(base_dir, 'dataT', 'dataT.csv'), index_col='time', parse_dates=True)

    houseAVG = process_energy_data(df)
    meterdataset = merge_data(houseAVG, temperaturedata, dfholiday)

    # Split data
    traindata, validationdata, testdata = split_data(meterdataset)

    # Save processed data
    output_dir = os.path.join(base_dir, 'output')
    os.makedirs(output_dir, exist_ok=True)
    traindata.to_csv(os.path.join(output_dir, 'train_data.csv'))
    validationdata.to_csv(os.path.join(output_dir, 'validation_data.csv'))
    testdata.to_csv(os.path.join(output_dir, 'test_data.csv'))

Overwriting src/preprocessing.py


In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-smartgrid-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=dataT_uri, destination="/opt/ml/processing/dataT"),
        ProcessingInput(source=weather_hourly_uri, destination="/opt/ml/processing/weather"),
        ProcessingInput(source=uk_bank_holidays_uri, destination="/opt/ml/processing/holidays"),
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train_data.csv"),
        ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation_data.csv"),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test_data.csv"),
    ],
    code="src/preprocessing.py",
)

step_process = ProcessingStep(name="SmartGridProcess", step_args=processor_args)




## Define a Training Step to Train a Model

![Define a Training Step to Train a Model](img/pipeline-3.png)

In [14]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/SmartGridTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [15]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="SmartGridTrain",
    step_args=train_args,
)

## Define a Model Evaluation Step to Evaluate the Trained Model

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

In [16]:
%%writefile src/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Writing src/evaluation.py


In [24]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-SmartGrid-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="src/evaluation.py",
)

In [26]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="SmartGridEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Define a Create Model Step to Create a Model

![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](img/pipeline-5.png)

### Define a Create Model Step to Create a Model

In [27]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [28]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="SmartGridCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

### Define a Transform Step to Perform Batch Transformation

In [29]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/SmartGridTransform",
)

In [30]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="SmartGridTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

### Define a Register Model Step to Create a Model Package

In [31]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="SmartGridRegisterModel", step_args=register_args)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

![Define a Fail Step to Terminate the Execution in Failed State](img/pipeline-8.png)

In [32]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="SmartGridMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](img/pipeline-6.png)

In [33]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="SmartGridMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Define a Pipeline of Parameters, Steps, and Conditions

![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)

In [35]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"SmartGridPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        dataT_uri,
        weather_hourly_uri,
        uk_bank_holidays_uri,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

## Examining the pipeline definition

In [36]:
import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'DataTUri',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-911199926915/Smart_Grid/data/dataT.csv'},
  {'Name': 'WeatherHourlyUri',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-911199926915/Smart_Grid/data/weather_hourly_darksky.csv'},
  {'Name': 'UkBankHolidaysUri',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-911199926915/Smart_Grid/data/uk_bank_holidays.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-911199926915/Smart_Grid/data/batch_data.csv'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 0.001669}],
 'PipelineExperimentC

## Submit the pipeline to SageMaker and start execution

In [37]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:911199926915:pipeline/SmartGridPipeline',
 'ResponseMetadata': {'RequestId': '494acf91-2b66-409b-a8bb-c5e948266d73',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '494acf91-2b66-409b-a8bb-c5e948266d73',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Tue, 27 Feb 2024 01:33:19 GMT'},
  'RetryAttempts': 0}}

In [46]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [47]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:911199926915:pipeline/SmartGridPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:911199926915:pipeline/SmartGridPipeline/execution/c774wm3eento',
 'PipelineExecutionDisplayName': 'execution-1708998572086',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'smartgridpipeline',
  'TrialName': 'c774wm3eento'},
 'CreationTime': datetime.datetime(2024, 2, 27, 1, 49, 32, 34000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 27, 1, 49, 32, 34000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:911199926915:user-profile/d-gpqar2kj1qba/blu',
  'UserProfileName': 'blu',
  'DomainId': 'd-gpqar2kj1qba'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:911199926915:user-profile/d-gpqar2kj1qba/blu',
  'UserProfileName': 'blu',
  'DomainId': 'd-gpqar2kj1qba'},
 'ResponseMetadata': {'RequestId': 'f8fc6e08-6cd2-40d4-9583-348cfedd3290'

In [48]:
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
execution.list_steps()

## Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [2]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

NameError: name 'sagemaker' is not defined

## Lineage

Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

## Parametrized Executions

In [None]:
execution = pipeline.start(
    parameters=dict(
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()

In [None]:
execution = pipeline.start(parameters=dict(MseThreshold=3.0))

In [None]:
try:
    execution.wait()
except Exception as error:
    print(error)

In [None]:
execution.list_steps()