# <B> Monitor - SageMaker pipeline </B>
* Container: conda_python3

## AutoReload

In [1]:
%load_ext autoreload
%autoreload 2

## 0. Install packages

In [3]:
# Set to True only for the first run: the install cell further down upgrades packages
# and then restarts the kernel whenever this flag is truthy.
# After that first run, change the value to False so re-running the notebook skips the install/restart.
install_needed = True

In [4]:
%%bash
#!/bin/bash

# Add "data-root" and "default-shm-size" to the Docker daemon config (once) and restart Docker.
DAEMON_PATH="/etc/docker"
MEMORY_SIZE=10G
DAEMON_JSON="$DAEMON_PATH/daemon.json"

# jq prints "true" when the key already exists; FLAG stays empty if the file is missing or unparsable.
FLAG=$(jq 'has("data-root")' "$DAEMON_JSON")
# echo $FLAG

if [ "$FLAG" == true ]; then
    echo "Already revised"
else
    echo "Add data-root and default-shm-size=$MEMORY_SIZE"
    sudo cp "$DAEMON_JSON" "$DAEMON_JSON.bak"

    # Render the new config into a temp file first. Piping jq straight into `tee daemon.json`
    # would truncate the live config even when jq fails or prints nothing.
    TMP_JSON=$(mktemp)
    if sudo cat "$DAEMON_JSON.bak" \
        | jq --arg shm "$MEMORY_SIZE" '. += {"data-root":"/home/ec2-user/SageMaker/.container/docker","default-shm-size":$shm}' > "$TMP_JSON" \
        && [ -s "$TMP_JSON" ]; then
        # Only replace the config (and restart Docker) once a non-empty result exists.
        sudo cp "$TMP_JSON" "$DAEMON_JSON"
        sudo service docker restart
        echo "Docker Restart"
    else
        echo "Failed to build new $DAEMON_JSON; leaving the existing file unchanged" >&2
    fi
    rm -f "$TMP_JSON"
fi

Already revised


In [5]:
import sys
import IPython

if install_needed:
    print("installing deps and restarting kernel")
    !{sys.executable} -m pip install -U pip
    !{sys.executable} -m pip install -U smdebug sagemaker-experiments
    !{sys.executable} -m pip install -U sagemaker
    !{sys.executable} -m pip uninstall pycodestyle -y

    IPython.Application.instance().kernel.do_shutdown(True)

installing deps and restarting kernel
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
[0m

## 1. parameter store 설정

In [2]:
import boto3
from utils.ssm import parameter_store

In [3]:
# Region of the default boto3 session; the parameter store helper is opened with it.
boto_session = boto3.Session()
strRegionName = boto_session.region_name
pm = parameter_store(strRegionName)

# Value stored under the "PREFIX" key; later cells prepend it to every other key name.
strPrefix = pm.get_params(key="PREFIX")

In [4]:
# Keys are "<prefix>-<NAME>"; read the bucket name and the SageMaker execution role ARN.
strBucketName = pm.get_params(key=f"{strPrefix}-BUCKET")
strExecutionRole = pm.get_params(key=f"{strPrefix}-SAGEMAKER-ROLE-ARN")

In [5]:
print (f'strExecutionRole: {strExecutionRole}')

strExecutionRole: arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20240211T155431


## 2. EventBridge - Lambda - SNS 설정

In [17]:
import boto3

### 2.1 SNS Topic 생성

1. 토픽생성 : Topic - Create Topic
2. 구독생성 : Subscriptions - Create subscription 
    1. Topic ARN - 1.에서 생성한 Topic ARN 선택
    2. Protocol - Email 선택
    3. Endpoint - 알람을 받을 이메일 입력
    4. Endpoint의 이메일을 받으면 Confirm subscription 선택

In [18]:
# ARN of the SNS topic created in the steps above; replace it with your own topic's ARN.
# It is passed to the Lambda function as the TOPIC_ARN environment variable.
# NOTE(review): this ARN is in us-east-1, while the session region printed later in this
# notebook is us-west-2 — confirm the topic's region is the intended one.
strSNSTopicArn = "arn:aws:sns:us-east-1:419974056037:Monitor-SageMaker-Pipeline" # <Your ARN>

### 2-1. Lambda function 생성

In [19]:
import shutil
import zipfile
import tempfile
from utils.lambda_func import lambda_handler

In [20]:
lam_handler = lambda_handler(region_name=strRegionName)

In [21]:
%%writefile ./monitor/lambda/pipeline_monitor_lambda.py

import os
import boto3
import json
from pprint import pprint

TOPIC_ARN = os.environ['TOPIC_ARN']

def _region_from_arn(arn):
    """Return the region field of an ARN (``arn:partition:service:region:...``), or None if absent."""
    parts = arn.split(":")
    return parts[3] if len(parts) > 3 and parts[3] else None


def lambda_handler(event, context):
    """Publish an SNS notification describing a SageMaker pipeline step event.

    Args:
        event: Event dict; every field used is read from ``event["detail"]``.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded ``body``.

    Raises:
        KeyError: if ``detail`` or one of ``pipelineArn``, ``stepName``,
            ``currentStepStatus`` is missing from the event.
    """
    pprint (event)
    print ("==")

    detail = event["detail"]
    strPipelineArn = detail["pipelineArn"]
    strStepName = detail["stepName"]
    strCurrentStepStatus = detail["currentStepStatus"]
    # These fields are read defensively so that a missing one cannot abort the
    # handler before the notification is sent.
    strFailReasion = detail.get("failureReason", "N/A")
    strEndTime = detail.get("stepEndTime", "N/A")
    strMetaData = str(detail.get("metadata", {}))

    print (f'strPipelineArn: {strPipelineArn}')
    print (f'strStepName: {strStepName}')
    print (f'strMetaData: {strMetaData}')
    print (f'strCurrentStepStatus: {strCurrentStepStatus}')
    print (f'strFailReasion: {strFailReasion}')
    print (f'strEndTime: {strEndTime}')

    print ("TOPIC_ARN", TOPIC_ARN)

    # Message body sent to the SNS topic, one "label: value" pair per line
    msg = "\n".join(
        [
            f'Pipeline ARN: {strPipelineArn}',
            f'Step Name: {strStepName}',
            f'Job Name: {strMetaData}',
            f'Status: {strCurrentStepStatus}',
            f'Fail Reason: {strFailReasion}',
            f'End time: {strEndTime}'
        ]
    )

    # The topic may live in a different region than this function, so the client is
    # created in the region encoded in the topic ARN; fall back to the default region
    # when the ARN carries none.
    topic_region = _region_from_arn(TOPIC_ARN)
    client_kwargs = {"region_name": topic_region} if topic_region else {}
    sns_client = boto3.client('sns', **client_kwargs)
    sns_client.publish(
        TopicArn=TOPIC_ARN,
        Subject='[AWS Notification] Monitor for SageMaker pipleine',
        Message=msg
    )

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Overwriting ./monitor/lambda/pipeline_monitor_lambda.py


In [22]:
# Source directory to zip and the "<module>.<function>" entry point of the file written above.
strLambdaSrcDir = "./monitor/lambda"
strLambdaHandler = "pipeline_monitor_lambda.lambda_handler"

# Function name and role ARN are both derived from the project prefix.
strLambdaFuncName = f"{strPrefix}-LAMBDA"
strLambdaRoleArn = pm.get_params(key=f"{strPrefix}-LAMBDA-ROLE-ARN")

In [23]:
print (f'strLambdaRoleArn: {strLambdaRoleArn}')
print (f'strRegionName: {strRegionName}')

strLambdaRoleArn: arn:aws:iam::419974056037:role/DJ-SM-IMD-LabmdaRole
strRegionName: us-west-2


In [24]:
import os

# Zip the Lambda source directory, create the function from the zipped bytes,
# and record the function name in the parameter store.
with tempfile.TemporaryDirectory() as tempDirPath:

    # Build the archive INSIDE the temporary directory. make_archive appends ".zip"
    # to base_name, so passing the directory path itself would write a sibling
    # "<tempdir>.zip" that is not removed when the context manager exits.
    lambda_archive_path = shutil.make_archive(
        base_name=os.path.join(tempDirPath, "lambda"),
        format="zip",
        root_dir=strLambdaSrcDir,
    )

    with open(lambda_archive_path, 'rb') as f:
        zipped_code = f.read()

    # create_function comes from utils.lambda_func (not shown here); its return value
    # is used below as the function ARN.
    strLambdaArn = lam_handler.create_function(
        Code=dict(ZipFile=zipped_code),
        Description='SageMaker IMD: Lambda for Automating Amazon SageMaker with Amazon EventBridge',
        Environment={
           "Variables": {
               "REGION":strRegionName,
               "TOPIC_ARN":strSNSTopicArn
           },
        },
        FunctionName=strLambdaFuncName,
        Handler=strLambdaHandler,
        Publish=True,
        Role=strLambdaRoleArn,
        Runtime='python3.9',
    )

print (f'LambdaArn: {strLambdaArn}')
print (f'strLambdaFuncName: {strLambdaFuncName}')
pm.put_params(key="-".join([strPrefix, "LAMBDA-PIPELINE-MONITOR"]), value=strLambdaFuncName, overwrite=True)

== CREATE LAMBDA FUNCTION ==
Argments for lambda below:

{'Architectures': ['x86_64'],
 'CodeSha256': 'Uub8ZPdrJATUEX9TV1yIk/+sDe1hX5W15p+P5Y3b/Lg=',
 'CodeSize': 789,
 'Description': 'SageMaker IMD: Lambda for Automating Amazon SageMaker with '
                'Amazon EventBridge',
 'Environment': {'Variables': {'REGION': 'us-west-2',
                               'TOPIC_ARN': 'arn:aws:sns:us-east-1:419974056037:Monitor-SageMaker-Pipeline'}},
 'EphemeralStorage': {'Size': 512},
 'FunctionArn': 'arn:aws:lambda:us-west-2:419974056037:function:DJ-SM-IMD-LAMBDA',
 'FunctionName': 'DJ-SM-IMD-LAMBDA',
 'Handler': 'pipeline_monitor_lambda.lambda_handler',
 'LastModified': '2025-01-10T09:03:33.310+0000',
 'LoggingConfig': {'LogFormat': 'Text',
                   'LogGroup': '/aws/lambda/DJ-SM-IMD-LAMBDA'},
 'MemorySize': 128,
 'PackageType': 'Zip',
 'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '1538',
               

'Store suceess'

### 2-1. Event Rule 생성
* [Automating Amazon SageMaker with Amazon EventBridge](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html#eventbridge-pipeline)
* [BOTO3 for eventbridge](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events.html)

In [25]:
client_events = boto3.client('events')

In [26]:
# Event pattern for the rule, kept as a JSON string (despite the "dic" prefix) because
# it is passed as-is to put_rule(EventPattern=...). It selects SageMaker pipeline step
# status-change events whose currentStepStatus is "Failed".
# NOTE: "your pipeline arn" is a placeholder — replace it with the real pipeline ARN
# (or remove the "resources" key); as written it is not expected to match any pipeline.
dicEventPattern = '''
{
    "source": ["aws.sagemaker"],
    "detail-type": ["SageMaker Model Building Pipeline Execution Step Status Change"],
    "detail": {
        "currentStepStatus": ["Failed"]
    },
    "resources": ["your pipeline arn"]
}
'''
# Fixed rule name, plus the EventBridge role ARN read from the parameter store.
strEventRuleName = "SAGEMAKER-PIPELINE-STEP-MONITOR"
strEventBridgeRole = pm.get_params(key=f"{strPrefix}-CODE-EVENTBRIDGE-ROLE-ARN")

# Record the rule name under "<prefix>-EVENT-RULE-NAME".
pm.put_params(key=f"{strPrefix}-EVENT-RULE-NAME", value=strEventRuleName, overwrite=True)

'Store suceess'

In [27]:
print (f'strEventBridgeRole: {strEventBridgeRole}')
print (f'strEventRuleName: {strEventRuleName}')

strEventBridgeRole: arn:aws:iam::419974056037:role/DJ-SM-IMD-EventBridgeRole
strEventRuleName: SAGEMAKER-PIPELINE-STEP-MONITOR


In [28]:
# Arguments for the EventBridge rule; State may be "ENABLED" or "DISABLED".
put_rule_kwargs = {
    "Name": strEventRuleName,
    "EventPattern": dicEventPattern,
    "State": "ENABLED",
    "Description": "Trigger when currentStepStatus is Failed",
    "RoleArn": strEventBridgeRole,
}

# Submit the rule; the response's "RuleArn" is used later when granting invoke permission.
rule_response = client_events.put_rule(**put_rule_kwargs)
rule_response

{'RuleArn': 'arn:aws:events:us-west-2:419974056037:rule/SAGEMAKER-PIPELINE-STEP-MONITOR',
 'ResponseMetadata': {'RequestId': 'd37c5f7c-7a9c-49c7-8877-76496ee9849e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd37c5f7c-7a9c-49c7-8877-76496ee9849e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Fri, 10 Jan 2025 09:03:34 GMT'},
  'RetryAttempts': 0}}

### 2.2 target 설정

In [29]:
# Register the Lambda function as the single target of the rule;
# the function name doubles as the target Id.
lambda_target = {'Id': strLambdaFuncName, 'Arn': strLambdaArn}
target_response = client_events.put_targets(
    Rule=strEventRuleName,
    Targets=[lambda_target],
)

In [30]:
target_response

{'FailedEntryCount': 0,
 'FailedEntries': [],
 'ResponseMetadata': {'RequestId': 'dff78766-9d44-450e-8a94-33e601ef7bc1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dff78766-9d44-450e-8a94-33e601ef7bc1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '41',
   'date': 'Fri, 10 Jan 2025 09:03:36 GMT'},
  'RetryAttempts': 0}}

In [31]:
# Grant the rule permission to invoke the Lambda function, scoped to the rule's ARN.
permission_kwargs = dict(
    strLambdaArn=strLambdaArn,
    strLambdaFuncName=strLambdaFuncName,
    SourceArn=rule_response["RuleArn"],
)
add_lambda_permission = lam_handler.add_permission(**permission_kwargs)
add_lambda_permission

{'ResponseMetadata': {'RequestId': 'f7a5f9a1-34ea-4584-8e2d-157f97ec7611',
  'HTTPStatusCode': 201,
  'HTTPHeaders': {'date': 'Fri, 10 Jan 2025 09:03:38 GMT',
   'content-type': 'application/json',
   'content-length': '370',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f7a5f9a1-34ea-4584-8e2d-157f97ec7611'},
  'RetryAttempts': 0},
 'Statement': '{"Sid":"62026DJ-SM-IMD-LAMBDA","Effect":"Allow","Principal":{"Service":"events.amazonaws.com"},"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-west-2:419974056037:function:DJ-SM-IMD-LAMBDA","Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:events:us-west-2:419974056037:rule/SAGEMAKER-PIPELINE-STEP-MONITOR"}}}'}

In [68]:
aa = {'version': '0', 'id': '317a4633-524f-2ac3-2cbb-f038c637920e', 'detail-type': 'SageMaker Model Building Pipeline Execution Step Status Change', 'source': 'aws.sagemaker', 'account': '419974056037', 'time': '2023-05-04T15:31:33Z', 'region': 'us-east-1', 'resources': ['arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline', 'arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline/execution/x1xt2felyccl'], 'detail': {'failureReason': 'ClientError: Failed to invoke sagemaker:CreateTrainingJob. Error Details: No S3 objects found under S3 URL "s3://sagemaker-us-east-1-419974056037/dataset/train_.csv" given in input data source. Please ensure that the bucket exists in the selected region (us-east-1), that objects exist under that S3 prefix, and that the role "arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436" has "s3:ListBucket" permissions on bucket "sagemaker-us-east-1-419974056037".', 'metadata': {}, 'stepStartTime': '2023-05-04T15:31:32Z', 'stepEndTime': '2023-05-04T15:31:33Z', 'stepName': 'TrainingProcess', 'stepType': 'Training', 'previousStepStatus': 'Starting', 'currentStepStatus': 'Failed', 'pipelineArn': 'arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline', 'pipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:419974056037:pipeline/dj-sm-imd-pipeline/execution/x1xt2felyccl'}}

In [97]:
# Pull the fields used in the notification out of the sample event's "detail" section.
sample_detail = aa["detail"]
(strPipelineArn, strStepName, strCurrentStepStatus, strFailReasion, strEndTime) = (
    sample_detail[field]
    for field in ("pipelineArn", "stepName", "currentStepStatus", "failureReason", "stepEndTime")
)

In [69]:
import pprint

In [71]:
pprint.pprint(aa)

{'account': '419974056037',
 'detail': {'currentStepStatus': 'Failed',
            'failureReason': 'ClientError: Failed to invoke '
                             'sagemaker:CreateTrainingJob. Error Details: No '
                             'S3 objects found under S3 URL '
                             '"s3://sagemaker-us-east-1-419974056037/dataset/train_.csv" '
                             'given in input data source. Please ensure that '
                             'the bucket exists in the selected region '
                             '(us-east-1), that objects exist under that S3 '
                             'prefix, and that the role '
                             '"arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436" '
                             'has "s3:ListBucket" permissions on bucket '
                             '"sagemaker-us-east-1-419974056037".',
            'metadata': {},
            'pipelineArn': 'arn:aws:sagemaker:us-east

In [3]:
meta = {'processingJob': {'arn': 'arn:aws:sagemaker:us-east-1:419974056037:processing-job/pipelines-a1bzloczzdgz-PreprocessingProcess-Z7ov8YlR7l'}}

In [1]:
res = {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '419974056037.dkr.ecr.us-east-1.amazonaws.com/mlops-image-prep:latest', 'ContainerArguments': ['--prefix_prep', '/opt/ml/processing/', '--region', 'us-east-1'], 'ContainerEntrypoint': ['/bin/bash', '/opt/ml/processing/input/entrypoint/runproc.sh']}, 'RoleArn': 'arn:aws:iam::419974056037:role/service-role/AmazonSageMaker-ExecutionRole-20221206T163436', 'ProcessingInputs': [{'InputName': 'input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-DATA', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/preprocessing/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/preprocessing/source/runproc.sh', 'LocalPath': '/opt/ml/processing/input/entrypoint', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train-data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/train-data', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'validation-data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/validation-data', 'LocalPath': '/opt/ml/processing/output/validation', 'S3UploadMode': 
'EndOfJob'}}, {'OutputName': 'test-data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/test-data', 'LocalPath': '/opt/ml/processing/output/test', 'S3UploadMode': 'EndOfJob'}}]}}

In [3]:
res.keys()

dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig'])

In [7]:
res["ProcessingOutputConfig"]["Outputs"]

[{'OutputName': 'train-data',
  'AppManaged': False,
  'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/train-data',
   'LocalPath': '/opt/ml/processing/output/train',
   'S3UploadMode': 'EndOfJob'}},
 {'OutputName': 'validation-data',
  'AppManaged': False,
  'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/validation-data',
   'LocalPath': '/opt/ml/processing/output/validation',
   'S3UploadMode': 'EndOfJob'}},
 {'OutputName': 'test-data',
  'AppManaged': False,
  'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-419974056037/DJ-SM-PIPELINE-MODEL-1/preprocessing/output/test-data',
   'LocalPath': '/opt/ml/processing/output/test',
   'S3UploadMode': 'EndOfJob'}}]

In [1]:
# Scratch check of the notification layout: every field gets the placeholder value "23".
(strPipelineArn, strStepName, strMetaData,
 strCurrentStepStatus, strFailReasion, strEndTime) = ("23",) * 6

# (label, value) pairs in the order they appear in the message
labelled_fields = (
    ("Pipeline ARN", strPipelineArn),
    ("Step Name", strStepName),
    ("Job Name", strMetaData),
    ("Status", strCurrentStepStatus),
    ("Fail Reason", strFailReasion),
    ("End time", strEndTime),
)

# One "label: value" line per field
msg = "\n".join(f"{label}: {value}" for label, value in labelled_fields)

In [2]:
msg

'Pipeline ARN: 23\nStep Name: 23\nJob Name: 23\nStatus: 23\nFail Reason: 23\nEnd time: 23'