In [4]:
!pip install -q sagemaker --upgrade

[0m

In [5]:
import json
import time
from time import gmtime, strftime

In [6]:
import sagemaker 
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput, CreateModelInput
from sagemaker.tensorflow.processing import TensorFlowProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.tensorflow import TensorFlow

from sagemaker import image_uris
from sagemaker.estimator import Estimator

In [7]:
print(sagemaker.__version__)

session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = session.default_bucket()

import os 
bucket = os.environ["S3_BUCKET"]

2.103.0


In [8]:
prefix = 'MNIST'
input_data_uri = session.upload_data("./input",bucket = bucket)

In [9]:
region = ParameterString(
    name='Region',
    default_value='us-west-2'
)

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)

processing_instance_type = ParameterString(
    name='ProcessingInstanceType',
    default_value='ml.t3.medium'
)

training_instance_type = ParameterString(
    name='TrainingInstanceType',
    default_value='ml.t3.medium'
)

training_instance_count = ParameterInteger(
    name='TrainingInstanceCount',
    default_value=1
)


model_approval_status = ParameterString(
    name='ModelApprovalStatus',
    default_value='PendingManualApproval'
)

input_data = ParameterString(
    name='InputData'
)

model_name = ParameterString(
    name='ModelName'
)

In [10]:
tensorflow_processor = TensorFlowProcessor(framework_version='2.3', role=role, instance_type=processing_instance_type, instance_count=processing_instance_count, py_version='py37')

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it.


In [16]:
step_process = ProcessingStep(
    name='preprocessing',
    code='scripts/preprocessing.py',
    processor=tensorflow_processor,
    inputs=[ProcessingInput(source=f"s3://{bucket}/data", destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source='/opt/ml/processing/output/')],
)

In [29]:
tf_estimator = TensorFlow(entry_point="./scripts/training.py",role=role,instance_count=1,instance_type="ml.m5.large",framework_version="2.3",py_version="py37")

In [30]:
step_train = TrainingStep(name='training',estimator=tf_estimator,inputs=f"s3://{bucket}/data")

In [31]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = 'testpipeline'

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        region,
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        training_instance_count,
        model_approval_status,
        input_data,
        model_name
    ],
    steps=[step_process, step_train]
)

In [32]:
pipeline.upsert(role_arn=role)
execution = pipeline.start(parameters=dict(InputData=input_data_uri,ModelName='test'))

In [33]:
execution.list_steps()

[{'StepName': 'preprocessing',
  'StartTime': datetime.datetime(2022, 8, 12, 11, 9, 33, 911000, tzinfo=tzlocal()),
  'StepStatus': 'Starting',
  'AttemptCount': 0,
  'Metadata': {}},
 {'StepName': 'training',
  'StartTime': datetime.datetime(2022, 8, 12, 11, 9, 33, 911000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:883411003189:training-job/pipelines-byqpl54u84vj-training-a7mi10xxdy'}}}]

In [34]:
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(session)
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'training', 'StartTime': datetime.datetime(2022, 8, 12, 11, 9, 33, 911000, tzinfo=tzlocal()), 'StepStatus': 'Executing', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:883411003189:training-job/pipelines-byqpl54u84vj-training-a7mi10xxdy'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://sagemaker-us-west-2-883411003189/data,Input,DataSet,ContributedTo,artifact
1,76310...aws.com/tensorflow-training:2.3-cpu-py37,Input,Image,ContributedTo,artifact


{'StepName': 'preprocessing', 'StartTime': datetime.datetime(2022, 8, 12, 11, 9, 33, 911000, tzinfo=tzlocal()), 'StepStatus': 'Executing', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:883411003189:processing-job/pipelines-byqpl54u84vj-preprocessing-qypgbskcj3'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...b5db09/input/entrypoint/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://sagemaker-us-west-2-883411003189/data,Input,DataSet,ContributedTo,artifact
2,76310...aws.com/tensorflow-training:2.3-cpu-py37,Input,Image,ContributedTo,artifact
3,s3://...4aa88b79e3fb86eb08b5db09/output/output-1,Output,DataSet,Produced,artifact
