In [1]:
from datetime import datetime

import sagemaker
import secrets
import boto3
import os

In [4]:
def get_sagemaker_session():
    """Create the SageMaker handles used by the rest of the notebook.

    Prints the execution role as a side effect.

    Returns:
        tuple: ``(session, role, bucket)`` -- a new ``sagemaker.Session``,
        the result of ``sagemaker.get_execution_role()``, and the session's
        default bucket.
    """
    session = sagemaker.Session()
    execution_role = sagemaker.get_execution_role()
    default_bucket = session.default_bucket()

    print(f"Role : {execution_role}")
    return session, execution_role, default_bucket

# Create the session once; the three handles are reused by the cells below.
sagemaker_session, role, bucket = get_sagemaker_session()

Role : arn:aws:iam::436376758376:role/service-role/SageMaker-MLOpsEngineer1


In [5]:
# Display the default bucket returned by get_sagemaker_session().
bucket

'sagemaker-eu-west-1-436376758376'

In [6]:
# Key prefix inserted after the bucket name in the S3 URIs built in the cells below.
prefix = "projects/mlops"

In [7]:
# Build the identifier for this run: <prefix>-<YYYY-MM-DD>-<32 random hex chars>.
model_id_prefix = "sklearn-dummy"
date_str = datetime.now().strftime("%Y-%m-%d")
logs = {
    # token_hex(nbytes=16) yields 32 hex characters, making the id unique per run.
    "model_id": "-".join([model_id_prefix, date_str, secrets.token_hex(nbytes=16)]),
}
print(logs["model_id"])

sklearn-dummy-2023-08-21-e3b9520dccff4036b738fb79d1e0d5ec


In [8]:
# Timestamp captured once; later cells format it as %Y/%m/%d inside S3 output paths.
# NOTE(review): the model id takes its date from a separate datetime.now() call,
# so the two dates could differ if the cells run across midnight -- confirm this is acceptable.
now = datetime.now()

In [9]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Names for this pipeline. base_job_prefix and model_package_group_name are
# not referenced by the cells visible in this notebook export.
pipeline_name = "TrainingPipelineMLOpsTest"  # SageMaker Pipeline name
model_package_group_name = "MLOpsTestModel"
base_job_prefix = "mlops-test"

# Compute settings used when constructing the processors and the estimator.
# These are plain Python values, fixed when the pipeline definition is built.
processing_instance_type = training_instance_type = evaluation_instance_type = "ml.m5.large"
processing_instance_count = evaluation_instance_count = 1

# Pipeline parameter object for the model approval status.
model_approval_status = ParameterString(
    default_value="PendingManualApproval",
    name="ModelApprovalStatus",
)

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [11]:
# scikit-learn container version shared by the processors defined in this notebook.
sklearn_framework_version = "1.2-1"

# Processor used by the preprocessing step.
sklearn_processor = SKLearnProcessor(
    role=role,
    base_job_name="mlops-test-nb-pipeline-preprocess",
    framework_version=sklearn_framework_version,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    volume_size_in_gb=5,
)

In [12]:
# Root S3 location for this run's data:
#   s3://<bucket>/<prefix>/challenger/<YYYY/MM/DD>/<model_id>/data
# Joined with "/" explicitly: S3 URIs always use forward slashes, whereas
# os.path.join uses the separator of whatever OS runs the notebook.
challenger_data_uri = "/".join(
    [
        f"s3://{bucket}",
        prefix,
        "challenger",
        now.strftime("%Y/%m/%d"),
        logs["model_id"],
        "data",
    ]
)

# Preprocessing step: runs preprocess.py and uploads the container's train/test
# output directories to the run-specific S3 locations above.
processing_step = ProcessingStep(
    name="MLOpsTestProcessing",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/train",
            destination=f"{challenger_data_uri}/train",
        ),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/test",
            destination=f"{challenger_data_uri}/test",
        ),
    ],
    # Relative path to the script; resolved against the notebook's working directory.
    code="../../train/code/preprocess.py",
)

In [13]:
from sagemaker.sklearn.estimator import SKLearn

# Estimator for the training step: runs train.py from ../../train/code/ and
# writes model output and packaged code under s3://<bucket>/<prefix>/training/.
sklearn = SKLearn(
    entry_point="train.py",
    source_dir="../../train/code/",
    # Use the version pinned for the processors instead of a second literal, so
    # training and evaluation cannot drift onto different scikit-learn containers.
    framework_version=sklearn_framework_version,
    instance_type=training_instance_type,
    instance_count=1,
    role=role,
    sagemaker_session=sagemaker_session,
    output_path=f"s3://{bucket}/{prefix}/training/output",
    code_location=f"s3://{bucket}/{prefix}/training/code",
)


In [14]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# The "train" channel reads the "train_data" output of the processing step;
# referencing the step property links the two steps in the pipeline.
train_channel = TrainingInput(
    s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
    content_type="text/csv",
)

step_train = TrainingStep(
    name="mlops-pipeline-test-model",
    estimator=sklearn,
    inputs={"train": train_channel},
)

In [15]:
from sagemaker.workflow.properties import PropertyFile

# Property file pointing at evaluation.json inside the step output named
# "evaluation"; it is attached to the evaluation step via property_files.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)


In [16]:
# Processor used by the evaluation step; same container version as preprocessing.
evaluation = SKLearnProcessor(
    role=role,
    base_job_name="mlops-test-nb-pipeline-evaluate",
    framework_version=sklearn_framework_version,
    instance_count=evaluation_instance_count,
    instance_type=evaluation_instance_type,
    volume_size_in_gb=5,
)

In [17]:
# S3 location for this run's evaluation report:
#   s3://<bucket>/<prefix>/challenger/<YYYY/MM/DD>/<model_id>/data/evaluation_report
# Joined with "/" explicitly: S3 URIs always use forward slashes, whereas
# os.path.join uses the separator of whatever OS runs the notebook.
evaluation_report_uri = "/".join(
    [
        f"s3://{bucket}",
        prefix,
        "challenger",
        now.strftime("%Y/%m/%d"),
        logs["model_id"],
        "data",
        "evaluation_report",
    ]
)

# Evaluation step: runs evaluate.py against the trained model artifact and the
# test split, and uploads the contents of /opt/ml/processing/evaluation.
step_evaluate = ProcessingStep(
    name="EvaluatePerformance",
    processor=evaluation,
    inputs=[
        # Model artifact produced by the training step.
        # NOTE(review): the SageMaker API reference states that a processing
        # input's local path must begin with /opt/ml/processing/ -- confirm
        # /opt/ml/model is accepted and matches what evaluate.py reads.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/model",
        ),
        # Test split written by the preprocessing step.
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=evaluation_report_uri,
        ),
    ],
    # Exposes evaluation.json from the "evaluation" output as a property file.
    property_files=[evaluation_report],
    code="../../train/code/evaluate.py",
)


In [18]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline from the three steps: preprocess -> train -> evaluate.
# No `parameters` argument is passed, so the resulting definition declares no
# pipeline parameters.
pipeline = Pipeline(
    steps=[processing_step, step_train, step_evaluate],
    name=pipeline_name,
)

In [19]:
import json
# Parse the pipeline definition (a JSON string) into a dict so it can be inspected.
json.loads(pipeline.definition())

Using provided s3_resource


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'MLOpsTestProcessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.large',
      'InstanceCount': 1,
      'VolumeSizeInGB': 5}},
    'AppSpecification': {'ImageUri': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocess.py']},
    'RoleArn': 'arn:aws:iam::436376758376:role/service-role/SageMaker-MLOpsEngineer1',
    'ProcessingInputs': [{'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-436376758376/MLOpsTestProcessing-28a82667eb79a70202ead63468573f44/input/code/preprocess.py',
       'LocalPath': '/opt/ml/processing/input/code',
       

In [20]:
# Create the pipeline in SageMaker, or update it if it already exists, using the execution role.
pipeline.upsert(role_arn=role)

Using provided s3_resource


{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:436376758376:pipeline/TrainingPipelineMLOpsTest',
 'ResponseMetadata': {'RequestId': '441b2d97-3bbb-4868-a798-bcb431f97469',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '441b2d97-3bbb-4868-a798-bcb431f97469',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Mon, 21 Aug 2023 09:01:35 GMT'},
  'RetryAttempts': 0}}

In [23]:
# Start a pipeline execution; the returned object carries the execution ARN.
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:eu-west-1:436376758376:pipeline/TrainingPipelineMLOpsTest/execution/d3ew88p85ela', sagemaker_session=<sagemaker.session.Session object at 0x7f025703b550>)

In [24]:
# List SageMaker pipelines through a boto3 client to confirm the pipeline is registered.
sm = boto3.client('sagemaker')
sm.list_pipelines()

{'PipelineSummaries': [{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:436376758376:pipeline/TrainingPipelineMLOpsTest',
   'PipelineName': 'TrainingPipelineMLOpsTest',
   'PipelineDisplayName': 'TrainingPipelineMLOpsTest',
   'RoleArn': 'arn:aws:iam::436376758376:role/service-role/SageMaker-MLOpsEngineer1',
   'CreationTime': datetime.datetime(2023, 8, 21, 9, 1, 35, 568000, tzinfo=tzlocal()),
   'LastModifiedTime': datetime.datetime(2023, 8, 21, 9, 4, 36, 460000, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': '56cfc0fd-da51-490a-8e8a-ea2b14ec956a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '56cfc0fd-da51-490a-8e8a-ea2b14ec956a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '358',
   'date': 'Mon, 21 Aug 2023 09:04:37 GMT'},
  'RetryAttempts': 0}}

In [22]:
import sys

# Add ../../src/lambda/ to the import path so modules there can be imported from this notebook.
# NOTE(review): this cell's execution count (22) is lower than that of cells above it --
# confirm the notebook still works when run top to bottom on a fresh kernel.
sys.path.append("../../src/lambda/")