In [1]:
import boto3
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

from sagemaker.workflow.pipeline_context import LocalPipelineSession # local_run

from sagemaker import get_execution_role
import sagemaker.session

In [2]:
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

from sagemaker.workflow.parameters import ParameterInteger,ParameterString, ParameterFloat,ParameterBoolean

# Create the directory that will hold the Dockerfile (-p: no error if it already exists).
!mkdir -p docker

%%writefile docker/Dockerfile
# Container image used by the SageMaker ScriptProcessor steps of this pipeline.
FROM python:3.9-buster

RUN pip install --upgrade pip
# Dependencies of the processing scripts.
# The PyPI project is "scikit-learn"; the old "sklearn" alias is deprecated
# and installing it fails, so the real package name is used here.
RUN pip install pandas==1.4.2 scikit-learn pyarrow joblib s3fs lightgbm
# Disable Python output buffering so script output is emitted immediately.
ENV PYTHONUNBUFFERED=TRUE
# Make files uploaded to the processing input directory importable as modules.
ENV PYTHONPATH="${PYTHONPATH}:/opt/ml/processing/input/"

# The processor supplies the script path and its arguments after the interpreter.
ENTRYPOINT ["python3"]

https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/create-a-custom-docker-container-image-for-sagemaker-and-use-it-for-model-training-in-aws-step-functions.html

Important: Before running the following cells, make sure that you’ve created a Dockerfile and stored it in the directory called docker. Also, make sure that you’ve created an Amazon ECR repository, and that you replace the ecr_repository value in the first cell with your repository’s name.

In [4]:
# Build the URI of the processing image stored in Amazon ECR:
#   <account_id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.Session().region_name
if not region:
    # boto3 reports None when no default region is configured; stop here
    # instead of silently producing a URI that contains the text "None".
    raise ValueError(
        "No AWS region is configured for this session; set AWS_DEFAULT_REGION "
        "or configure a default region before building the image URI."
    )
ecr_repository = 'sagemaker-container-simple-model-pipe'  # ECR repository name
tag = ':latest'  # includes the leading ':' so it can be appended to the repository name
processing_repository_uri = f'{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository}{tag}'
print(processing_repository_uri)

Building docker image

In [36]:
# Processor that runs the pipeline's Python scripts inside the custom ECR image.
#
# Local run: create `session = LocalPipelineSession()`, pass it here as
# `sagemaker_session=session`, and use `instance_type='local'`.
#
# NOTE(review): the role ARN below is hard-coded to a specific AWS account —
# confirm it is the intended role before reusing this notebook elsewhere.
script_processor = ScriptProcessor(
    image_uri=processing_repository_uri,
    command=['python3'],
    role='arn:aws:iam::838325439115:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole',
    instance_type='ml.t3.medium',
    instance_count=1,
)

In [58]:
# Pipeline parameters and their default values.
# Only MainPath is currently wired into the steps and the Pipeline object;
# the remaining parameters are declared here but their use is commented out.
MainPath = ParameterString(
    name="main_path",
    default_value="s3://tenant-k/simple_pipe/model",
)
PredictionsPath = ParameterString(
    name="predictions_path",
    default_value="s3://tenant-k/simple_pipe/model/predictions",
)
TrainingData = ParameterString(
    name="training_data",
    default_value="diabetes",
)
TargetColumn = ParameterString(
    name="target_column",
    default_value="Outcome",
)
TestSize = ParameterFloat(
    name="test_size",
    default_value=0.25,
)
ModelParamsPath = ParameterString(
    name="model_params_path",
    default_value="s3://tenant-k/simple_pipe/model/model_params",
)
SaveTheModel = ParameterBoolean(
    name="save_the_model",
    default_value=True,
)

In [59]:
# Step 1 ("DataPrepAndTraining"): runs prep_step.py with the shared processor.

# The current working directory is provided to the job under the processing
# input path (the same path the Dockerfile appends to PYTHONPATH).
prep_inputs = [
    ProcessingInput(
        source='./',
        destination="/opt/ml/processing/input/",
    ),
]

# Whatever the script writes to this container path is exported as "output".
prep_outputs = [
    ProcessingOutput(
        output_name='output',
        source="/opt/ml/processing/output/data/",
    ),
]

# Only --main_path is passed for now. The other arguments (--training_data,
# --target_column, --test_size, --model_params_path, --save_the_model) are
# intentionally left disabled, matching the parameters not registered on the
# pipeline.
prep_arguments = ['--main_path', MainPath]

data_prep_step = ProcessingStep(
    name="DataPrepAndTraining",
    processor=script_processor,
    code="prep_step.py",
    inputs=prep_inputs,
    outputs=prep_outputs,
    job_arguments=prep_arguments,
)

In [61]:
# Step 2 ("Predictions"): runs pred_step.py after the data-prep/training step.

# Same input layout as the first step: the working directory is provided to
# the job under the processing input path.
pred_inputs = [
    ProcessingInput(
        source='./',
        destination="/opt/ml/processing/input/",
    ),
]

# NOTE(review): the destination is a fixed S3 URI that happens to equal the
# default of the main_path parameter; it does not follow main_path if that
# parameter is overridden at execution time — confirm this is intended.
pred_outputs = [
    ProcessingOutput(
        output_name='models',
        source="/opt/ml/processing/output/data/",
        destination="s3://tenant-k/simple_pipe/model",
    ),
]

# Only --main_path is passed; --predictions_path is currently disabled.
pred_arguments = ['--main_path', MainPath]

prediction = ProcessingStep(
    name="Predictions",
    processor=script_processor,
    code="pred_step.py",
    inputs=pred_inputs,
    outputs=pred_outputs,
    job_arguments=pred_arguments,
    # Explicit ordering: this step must not start before data_prep_step.
    depends_on=[data_prep_step],
)

Defining the pipeline and registering its parameters.

In [62]:
# Assemble the pipeline from the two processing steps.
pipeline_name = "SimpleModelPipe"

# Experiment tracking: the experiment is named after the pipeline, and each
# trial is named "SimpleModelPipe-<pipeline execution id>".
trial_name = Join(
    on="-",
    values=["SimpleModelPipe", ExecutionVariables.PIPELINE_EXECUTION_ID],
)
experiment_config = PipelineExperimentConfig(
    ExecutionVariables.PIPELINE_NAME,
    trial_name,
)

# Only MainPath is registered as a pipeline parameter; PredictionsPath,
# TrainingData, TargetColumn, TestSize, ModelParamsPath and SaveTheModel are
# currently left out. For a local run, also pass `sagemaker_session=session`.
pipeline = Pipeline(
    name=pipeline_name,
    pipeline_experiment_config=experiment_config,
    parameters=[MainPath],
    steps=[data_prep_step, prediction],
)

Saving (upserting) the pipeline definition using the execution role.

In [63]:
# Look up the execution role through the SageMaker SDK.
role = get_execution_role()
# Save the pipeline definition under that role; as the last expression of the
# cell, the service response (including the PipelineArn) is displayed below.
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:838325439115:pipeline/simplemodelpipe',
 'ResponseMetadata': {'RequestId': 'd6315f5a-6395-4422-b744-ace0eda7c10b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd6315f5a-6395-4422-b744-ace0eda7c10b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Mon, 21 Nov 2022 09:01:44 GMT'},
  'RetryAttempts': 0}}

Now we pass the parameter values to the pipeline and start an execution.

In [64]:
# Start a pipeline execution, passing main_path explicitly (here the same
# value as the parameter's default) and keeping a handle to the execution.
execution = pipeline.start(
    parameters={"main_path": "s3://tenant-k/simple_pipe/model"},
)

## Deleting a pipeline

In [None]:
# Remove the pipeline; run this cell only when the pipeline is no longer needed.
pipeline.delete()

# Pipeline Information