In [2]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.session import Session

# Shared SageMaker context: default session, its region, and the notebook's IAM role.
sagemaker_session = Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()

# Passed to the processors/estimators below so that their .run()/.fit() calls
# yield step arguments (consumed via step_args=) for the pipeline.
pipeline_session = PipelineSession()

# Model package group that the register step adds new model versions to.
model_package_group_name = "BidsModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [3]:
# Restore values persisted earlier with `%store` (presumably by a data-prep
# notebook): the S3 bucket/prefix and the S3 folders holding each dataset split.
%store -r bucket
%store -r prefix
%store -r s3_train_data
%store -r s3_test_data
%store -r s3_val_data
%store -r s3_prod_data

In [6]:
from pathlib import Path

# Folder that receives code/preprocessing.py and code/evaluation.py from the
# %%writefile cells. Pure-Python equivalent of `!mkdir -p code`, so it does not
# depend on a POSIX shell being available.
Path("code").mkdir(parents=True, exist_ok=True)

In [4]:
def _s3_join(prefix, key):
    """Join an S3 prefix and an object key with exactly one '/' between them.

    Plain concatenation silently produces a wrong key when the stored prefix
    has no trailing slash; this gives the same result when it does.
    """
    return f"{prefix.rstrip('/')}/{key}"


# Full S3 URIs of the input CSVs; the first three become the defaults of the
# TrainData / ValData / TestData pipeline parameters.
input_data_uri = _s3_join(s3_train_data, "train_data.csv")
val_data_uri = _s3_join(s3_val_data, "validation_data.csv")
test_data_uri = _s3_join(s3_test_data, "test_data_with_outcome.csv")
# NOTE(review): batch_data_uri is not referenced by the pipeline cells shown
# here; the transform step reads the processing step's "proc_prod_out" output.
batch_data_uri = _s3_join(s3_prod_data, "prod_data.csv")

In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Pipeline parameters. Each default below can be overridden for a single run,
# e.g. pipeline.start(parameters=dict(AccuracyThreshold=...)).
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
# Despite the generic Python name, this is the *training* instance type.
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# S3 locations of the raw CSVs handed to the processing step.
train_data = ParameterString(name="TrainData", default_value=input_data_uri)
val_data = ParameterString(name="ValData", default_value=val_data_uri)
test_data = ParameterString(name="TestData", default_value=test_data_uri)

# Minimum test accuracy for the condition step to take the register/create/transform branch.
acc_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.9)

In [6]:
%%writefile code/preprocessing.py
"""Prepare the datasets consumed by the Bids pipeline.

Reads the raw CSVs mounted by the BidsProcess step and writes:
  train2/train.csv            training rows, unchanged
  validation2/validation.csv  validation rows, unchanged
  test2/test.csv              first TEST_FRACTION of the test rows (column 0 kept)
  prod2/prod.csv              the remaining test rows, with column 0 dropped
"""
import pandas as pd

# Share of the labelled test rows kept for evaluation; the remainder becomes
# the "prod" batch scored by the transform step.
TEST_FRACTION = 0.8


def split_test_prod(test, test_fraction=TEST_FRACTION):
    """Split ``test`` into disjoint evaluation and production frames.

    Args:
        test: DataFrame whose column 0 is the label (the column the
            evaluation script reads as ``y_test``).
        test_fraction: share of rows, taken from the top, kept for evaluation.

    Returns:
        ``(test_part, prod_part)``. ``prod_part`` has column 0 removed.
    """
    # Compute the cut point once on the *full* frame. Slicing prod from an
    # already-truncated frame would make every prod row also a test row.
    split_at = round(len(test) * test_fraction)
    test_part = test.iloc[:split_at]
    prod_part = test.iloc[split_at:].drop(test.columns[0], axis=1)
    return test_part, prod_part


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    train = pd.read_csv(f"{base_dir}/train1/train_data.csv", header=None)
    validation = pd.read_csv(f"{base_dir}/val1/validation_data.csv", header=None)
    test_full = pd.read_csv(f"{base_dir}/test1/test_data_with_outcome.csv", header=None)

    test, prod = split_test_prod(test_full)
    print(test.shape)
    print(prod.shape)

    train.to_csv(f"{base_dir}/train2/train.csv", header=False, index=False)
    validation.to_csv(f"{base_dir}/validation2/validation.csv", header=False, index=False)
    test.to_csv(f"{base_dir}/test2/test.csv", header=False, index=False)
    prod.to_csv(f"{base_dir}/prod2/prod.csv", header=False, index=False)

Overwriting code/preprocessing.py


In [7]:
from sagemaker.sklearn.processing import SKLearnProcessor


# scikit-learn processing container that runs code/preprocessing.py.
framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    base_job_name="sklearn-bids-process",
    framework_version=framework_version,
    role=role,
    instance_type="ml.m5.xlarge",
    # ProcessingInstanceCount pipeline parameter, resolved per execution.
    instance_count=processing_instance_count,
    sagemaker_session=pipeline_session,
)

In [8]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Raw CSVs (pipeline parameters) mounted at the paths preprocessing.py reads from.
proc_inputs = [
    ProcessingInput(input_name="proc_train_in", source=train_data, destination="/opt/ml/processing/train1"),
    ProcessingInput(input_name="proc_validation_in", source=val_data, destination="/opt/ml/processing/val1"),
    ProcessingInput(input_name="proc_test_in", source=test_data, destination="/opt/ml/processing/test1"),
]

# Local directories the script writes; later steps reference each one by output_name.
proc_outputs = [
    ProcessingOutput(output_name="proc_train_out", source="/opt/ml/processing/train2"),
    ProcessingOutput(output_name="proc_validation_out", source="/opt/ml/processing/validation2"),
    ProcessingOutput(output_name="proc_test_out", source="/opt/ml/processing/test2"),
    ProcessingOutput(output_name="proc_prod_out", source="/opt/ml/processing/prod2"),
]

# Under the PipelineSession, run() yields the step arguments consumed below.
processor_args = sklearn_processor.run(
    code="code/preprocessing.py",
    inputs=proc_inputs,
    outputs=proc_outputs,
)
step_process = ProcessingStep(name="BidsProcess", step_args=processor_args)



## Define a Training Step to Train a Model

In [9]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# S3 prefix that receives the training job's model artifact.
model_path = f"s3://{bucket}/facebook-recruiting-iv-human-or-bot/BidsTrain"

# Built-in SageMaker XGBoost (1.0-1) container; the evaluation and model cells
# further down reuse this same image_uri.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region,
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type=instance_type,  # the TrainingInstanceType pipeline parameter
    output_path=model_path,
    sagemaker_session=pipeline_session,
)

# "binary:hinge" makes XGBoost emit hard 0/1 predictions instead of
# probabilities, so evaluation.py can compare them directly with the labels.
xgb_train.set_hyperparameters(
    objective="binary:hinge",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

# Feed the processing step's outputs into the training channels. Referencing
# step properties rather than literal S3 paths also makes training depend on processing.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs
train_channel = TrainingInput(
    s3_data=processed_outputs["proc_train_out"].S3Output.S3Uri,
    content_type="text/csv",
)
validation_channel = TrainingInput(
    s3_data=processed_outputs["proc_validation_out"].S3Output.S3Uri,
    content_type="text/csv",
)
train_args = xgb_train.fit(inputs={"train": train_channel, "validation": validation_channel})

In [10]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Training step built from the deferred xgb_train.fit(...) arguments.
step_train = TrainingStep(step_args=train_args, name="BidsTrain")

## Define a Model Evaluation Step

In [11]:
%%writefile code/evaluation.py
"""Score the trained XGBoost model on the held-out test split.

Run by the BidsEval processing step. Reads the model archive from
/opt/ml/processing/model and the test CSV from /opt/ml/processing/test2, then
writes /opt/ml/processing/evaluation/evaluation.json.
"""
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score


if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    # NOTE(review): extractall() trusts the member paths inside the archive;
    # acceptable only because this is the pipeline's own training output.
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # The archive is expected to contain a pickled booster named "xgboost-model".
    # pickle can execute arbitrary code on load, so only use trusted artifacts.
    # The context manager closes the file handle (previously leaked).
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)

    test_path = "/opt/ml/processing/test2/test.csv"
    df = pd.read_csv(test_path, header=None)

    # Column 0 is the label; every other column is a feature.
    y_test = df.iloc[:, 0].to_numpy()
    features = df.drop(df.columns[0], axis=1)

    X_test = xgboost.DMatrix(features.values)

    predictions = model.predict(X_test)

    # float() turns the NumPy scalar into a plain Python float for JSON.
    acc = float(accuracy_score(y_test, predictions, normalize=True))
    # The "regression_metrics.acc" key path is what the pipeline's condition
    # step reads via JsonGet; keep it stable even though the value is a
    # classification accuracy.
    report_dict = {
        "regression_metrics": {
            "acc": acc,
        },
    }
    # Printed so the value is visible in the processing job's output.
    print(acc)

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        json.dump(report_dict, f)

Overwriting code/evaluation.py


In [12]:
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Runs code/evaluation.py inside the same XGBoost image (image_uri) used for training.
script_eval = ScriptProcessor(
    base_job_name="script-bids-eval",
    image_uri=image_uri,
    command=["python3"],
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
)

# model.tar.gz from the training step, mounted where evaluation.py opens it.
model_artifact_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
# Held-out test split produced by the processing step.
test_split_input = ProcessingInput(
    source=step_process.properties.ProcessingOutputConfig.Outputs["proc_test_out"].S3Output.S3Uri,
    destination="/opt/ml/processing/test2",
)

eval_args = script_eval.run(
    inputs=[model_artifact_input, test_split_input],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
    code="code/evaluation.py",
)

In [13]:
from sagemaker.workflow.properties import PropertyFile


# Exposes evaluation.json from the "evaluation" output so the condition step
# can read values out of it with JsonGet.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    step_args=eval_args,
    property_files=[evaluation_report],
    name="BidsEval",
)

## Define a Create Model Step to Create a Model

In [14]:
from sagemaker.model import Model

# Wraps the trained artifact with the XGBoost image; reused below both by the
# CreateModel step (model.create) and by the register step (model.register).
model = Model(
    role=role,
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
)

Define the `ModelStep` by providing the return values from `model.create()` as the step arguments.

In [15]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

# Create a SageMaker Model from the trained artifact; the transform step uses its name.
# No accelerator_type is passed: Elastic Inference (e.g. "ml.eia1.medium") is
# deprecated by AWS and never supported XGBoost, so requesting it here was
# misleading and had no effect on this image-based Model.
step_create_model = ModelStep(
    name="BidsCreateModel",
    step_args=model.create(instance_type="ml.m5.large"),
)

## Define a Transform Step to Perform Batch Transformation

In [16]:
from sagemaker.transformer import Transformer


# Batch transform runner for the model created by step_create_model; results
# are written under s3://<bucket>/BidsTransform.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    output_path=f"s3://{bucket}/BidsTransform",
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

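Pass in the transformer instance and a `TransformInput` that points at the processing step's `proc_prod_out` output: the held-out production split. The preprocessing script drops that split's first column, which holds the label.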

In [17]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


# Batch-score the "prod" split written by preprocessing.py (column 0 dropped).
prod_split_uri = step_process.properties.ProcessingOutputConfig.Outputs["proc_prod_out"].S3Output.S3Uri

step_transform = TransformStep(
    name="BidsTransform",
    transformer=transformer,
    inputs=TransformInput(data=prod_split_uri, content_type="text/csv"),
)

## Define a Register Model Step to Create a Model Package

In [18]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# evaluation.json sits under the S3 destination of BidsEval's first (and only)
# processing output; attach it to the model package as its statistics.
eval_s3_prefix = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(eval_s3_prefix),
        content_type="application/json",
    )
)

# Register a new version in the model package group. The approval status comes
# from the ModelApprovalStatus pipeline parameter.
register_args = model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)
step_register = ModelStep(name="BidsRegisterModel", step_args=register_args)



## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

In [19]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Terminal step for the low-accuracy branch; the message embeds the threshold
# value used by the current execution.
fail_message = Join(on=" ", values=["Execution failed due to Accuracy <", acc_threshold])
step_fail = FailStep(name="BidsAccFail", error_message=fail_message)

## Define a Condition Step: If Accuracy Meets the Threshold, Register the Model, Create It, and Run a Batch Transformation; Otherwise Terminate the Execution in a Failed State

In [20]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# Passes when the accuracy in evaluation.json is >= the AccuracyThreshold
# parameter. (Renamed from `cond_lte`, which described the opposite comparison.)
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.acc",  # key path written by code/evaluation.py
    ),
    right=acc_threshold,
)

# True: register the model, create it, and batch-transform the prod split.
# False: fail the execution with the threshold in the error message.
step_cond = ConditionStep(
    name="BidsAccCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

## Define a Pipeline of Parameters, Steps, and Conditions

In [21]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "BidsPipeline"

# Every parameter that can be overridden per execution must be declared here.
pipeline_parameters = [
    processing_instance_count,
    instance_type,
    model_approval_status,
    train_data,
    val_data,
    test_data,
    acc_threshold,
]

# Steps nested in step_cond (register/create/transform/fail) are reached
# through the condition step, so only the top-level steps are listed.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_process, step_train, step_eval, step_cond],
)

In [22]:
import json


# Parse the generated pipeline definition JSON for inspection; the bare
# expression on the last line renders it as the cell output.
definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'TrainData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-215730597255/sagemaker-featurestore/train/train_data.csv'},
  {'Name': 'ValData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-215730597255/sagemaker-featurestore/validation/validation_data.csv'},
  {'Name': 'TestData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-215730597255/sagemaker-featurestore/test/test_data_with_outcome.csv'},
  {'Name': 'AccuracyThreshold', 'Type': 'Float', 'DefaultValue': 0.9}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExe

## Submit the pipeline to SageMaker and start execution

In [23]:
# Create the pipeline in SageMaker, or update it if one with this name already exists.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:215730597255:pipeline/BidsPipeline',
 'ResponseMetadata': {'RequestId': '7ce7d796-0256-4ead-9747-190ea3c3e138',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7ce7d796-0256-4ead-9747-190ea3c3e138',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '80',
   'date': 'Sun, 23 Jun 2024 19:52:56 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all the default parameters.

In [24]:
# Start an execution using every parameter's default value.
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [25]:
# Show the execution's metadata, including its current PipelineExecutionStatus.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:215730597255:pipeline/BidsPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:215730597255:pipeline/BidsPipeline/execution/6l0ls1m7zpp3',
 'PipelineExecutionDisplayName': 'execution-1719172377271',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 6, 23, 19, 52, 57, 199000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 6, 23, 19, 52, 57, 199000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:215730597255:user-profile/d-vg5uuw61idko/afrederick',
  'UserProfileName': 'afrederick',
  'DomainId': 'd-vg5uuw61idko',
  'IamIdentity': {'Arn': 'arn:aws:sts::215730597255:assumed-role/LabRole/SageMaker',
   'PrincipalId': 'AROATEOULVWDV3CB446P2:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:215730597255:user-profile/d-vg5uuw61idko/afrederick',
  'UserProfileName': 'afrederick',
  'DomainId': 'd-vg5uuw61idko',
  'IamIdentity

In [26]:
# Block until the pipeline execution finishes. The waiter raises if the
# execution ends in a failed state (see the threshold test further down).
execution.wait()

In [27]:
# List the execution's steps with their status, start/end times and metadata.
execution.list_steps()

[{'StepName': 'BidsTransform',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 0, 23, 961000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 5, 57, 853000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:215730597255:transform-job/pipelines-6l0ls1m7zpp3-BidsTransform-KGP3aR7piP'}},
  'AttemptCount': 1},
 {'StepName': 'BidsRegisterModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 0, 22, 208000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 0, 23, 240000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:215730597255:model-package/BidsModelPackageGroupName/7'}},
  'AttemptCount': 1},
 {'StepName': 'BidsCreateModel-CreateModel',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 0, 22, 208000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 0, 23, 504000, tzinfo=tzlocal()),


### Examining the Evaluation

In [28]:
# Examine the model evaluation after the pipeline completes: download the
# evaluation.json written by the BidsEval step from S3 and print the report.
from pprint import pprint


eval_output_s3_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
evaluation_json = sagemaker.s3.S3Downloader.read_file(f"{eval_output_s3_uri}/evaluation.json")
report = json.loads(evaluation_json)
pprint(report)



{'regression_metrics': {'acc': 0.9415584415584416}}


### Testing Failure to Meet Accuracy Threshold

In [29]:
# Re-run with a threshold above the accuracy observed earlier (about 0.94), so
# the condition step takes the FailStep branch.
execution = pipeline.start(parameters=dict(AccuracyThreshold=.96))

In [30]:
# This execution is expected to end in "Failed" (threshold above the achieved
# accuracy), which makes the waiter raise. Swallow only that expected outcome;
# re-raise anything else, e.g. a waiter timeout while the run is still executing.
try:
    execution.wait()
except Exception as error:
    if execution.describe()["PipelineExecutionStatus"] != "Failed":
        raise
    print(error)

Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"


In [31]:
# Expect BidsAccCond to report Outcome 'False' and BidsAccFail to be 'Failed'.
execution.list_steps()

[{'StepName': 'BidsAccFail',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 16, 2, 926000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 16, 3, 426000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': 'Execution failed due to Accuracy < 0.96',
  'Metadata': {'Fail': {'ErrorMessage': 'Execution failed due to Accuracy < 0.96'}},
  'AttemptCount': 1},
 {'StepName': 'BidsAccCond',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 16, 1, 743000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 16, 2, 135000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'False'}},
  'AttemptCount': 1},
 {'StepName': 'BidsEval',
  'StartTime': datetime.datetime(2024, 6, 23, 20, 13, 28, 417000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 6, 23, 20, 16, 1, 43000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:215730597255:processing

# Release Resources

In [32]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<!-- Hidden button bound to the "kernelmenu:shutdown" command; the script below clicks it. -->
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    // Click the first hidden command button to request a kernel shutdown.
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp: failures are deliberately ignored (e.g. no matching button in this frontend).
}    
</script>