# Custom Pipeline: Image Feature Engineering for chained GroundTruth Job 

In this pipeline we will create an MLOps workflow that pre-processes images for a GroundTruth Job, then kicks off a GroundTruth Labeling Job.

In this example the ML project uses the SageMaker Segmentation algorithm, for which 512x512 labelled images are recommended. We also want to clean files prior to labeling. Other project types may have other cleaning requirements.

The pipeline will perform computer vision parsing over the supplied 'drop' files and add them to a pool of 'groundtruth-input-images'.

Upon completion, the pipeline will set up a chained GroundTruth Labeling job (extending/improving previous labeling jobs for this project).

In [None]:
import boto3
import sagemaker

# --- AWS context -----------------------------------------------------------
# Region comes from the default boto3 session; the account id from STS.
region = boto3.Session().region_name

default_bucket = sagemaker.session.Session().default_bucket()
sagemaker_session = sagemaker.Session(default_bucket=default_bucket)

sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]

# --- Project identity ------------------------------------------------------
# Change these to reflect your project/business name, or to separate your
# ModelPackageGroup/Pipeline from the rest of your team.
project_friendly_name = "My Labeling Project"
project_prefix = "myproject-image-labeling"
pipeline_description = "Pipeline for preparing new images for labeling with GroundTruth"

pipeline_name = f"{project_prefix}-groundtruth-pipeline"

# Update with your private workforce ARN.
groundtruth_private_workforce_arn = "arn:aws:sagemaker:ap-southeast-2:XXXXXXXXX:workteam/private-crowd/myteam"

# Ground Truth streaming labeling requires its own policies to interact with
# SNS, SQS, S3, etc., so use the role created for this project by the setup
# CloudFormation script.
role = f"arn:aws:iam::{account_id}:role/{project_prefix}-sagemaker-execution-role"

# See CF script and readme.md setup - /pipelines/groundtruth_feature_engineering
sns_topic_arn_streaming_labeling = (
    f"arn:aws:sns:{region}:{account_id}:{project_prefix}-topic-streaming-labeling"
)

# --- S3 locations (every name is derived from project_prefix) --------------
s3_bucketname_drop = f"{project_prefix}-drop"
s3_bucketname_build_artifacts = f"{project_prefix}-buildartifacts"
s3_bucketname_labelinginstructions = f"{project_prefix}-labelinginstructions-publicwebsite"
s3_bucketname_streaming_labeling_input = f"{project_prefix}-streaminglabeling-input"
s3_bucketname_streaming_labeling_output = f"{project_prefix}-streaminglabeling-output"
s3_publicwebsite_labelinginstructions_url = (
    f"https://{project_prefix}-labelinginstructions-publicwebsite.s3.{region}.amazonaws.com"
)

### Get the pipeline instance

Here we define the pipeline parameters and steps, build the pipeline instance from them, and create (or update) it in SageMaker so that we can work with it.

In [None]:
# from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.sklearn.processing import SKLearnProcessor

from datetime import datetime

# Compute resources for the processing jobs.
param_processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge",
)
param_processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

# Entry-point scripts for the two steps, read from the build-artifacts bucket.
script_feature_engineering = (
    f"s3://{s3_bucketname_build_artifacts}/latest/smp_code/1_feature_engineering.py"
)
script_groundtruth_chain_job = (
    f"s3://{s3_bucketname_build_artifacts}/latest/smp_code/2_groundtruth_chain_job.py"
)

# Project identity and AWS context.
param_project_friendly_name = ParameterString(
    name="ProjectFriendlyName",
    default_value=project_friendly_name,
)
param_project_prefix = ParameterString(
    name="ProjectPrefix",
    default_value=project_prefix,
)
param_aws_region = ParameterString(
    name="AWSRegion",
    default_value=region,
)
param_default_bucket = ParameterString(
    name="SageMakerDefaultBucket",
    default_value=default_bucket,
)

# Ground Truth workforce that will perform the labeling.
param_groundtruth_private_workforce_arn = ParameterString(
    name="GroundTruthPrivateWorkforceArn",
    default_value=groundtruth_private_workforce_arn,
)

# S3 bucket names used by the steps.
param_s3bucketname_drop = ParameterString(
    name="S3BucketNameDrop",
    default_value=s3_bucketname_drop,
)
param_s3bucketname_streaming_labeling_input = ParameterString(
    name="S3BucketNameStreamingLabelingInput",
    default_value=s3_bucketname_streaming_labeling_input,
)
param_s3bucketname_streaming_labeling_output = ParameterString(
    name="S3BucketNameStreamingLabelingOutput",
    default_value=s3_bucketname_streaming_labeling_output,
)
param_s3bucketname_labelinginstructions = ParameterString(
    name="S3BucketNameLabelingInstructions",
    default_value=s3_bucketname_labelinginstructions,
)

# Public URL of the labeling-instructions website bucket.
param_s3_publicwebsite_labelinginstructions_url = ParameterString(
    name="S3PublicWebsiteLabelingInstructionsUrl",
    default_value=s3_publicwebsite_labelinginstructions_url,
)

# SNS topic for streaming labeling.
param_sns_topic_arn_streaming_labeling = ParameterString(
    name="SNSTopicArnStreamingLabeling",
    default_value=sns_topic_arn_streaming_labeling,
)

# Execution role handed to the Ground Truth chain-job script.
param_groundtruth_execution_role_arn = ParameterString(
    name="GroundTruthExecutionRoleArn",
    default_value=role,
)

# Cache configuration for workflow.
# Caching is currently disabled. `expire_after` must be an ISO 8601 duration
# string, so 30 days is spelled "P30D" (a bare "30d" lacks the leading "P").
# The value only takes effect once `enable_caching` is switched to True.
cache_config = CacheConfig(enable_caching=False, expire_after="P30D")

# One scikit-learn processor shared by both pipeline steps; instance type and
# count are pipeline parameters so they can be overridden per execution.
# NOTE(review): base_job_name contains "/", which is not a legal character in
# a SageMaker job name. It appears to be tolerated when the processor is only
# used inside pipeline steps - confirm before reusing this processor to launch
# a standalone processing job.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=param_processing_instance_type,
    instance_count=param_processing_instance_count,
    base_job_name=project_prefix + "/preprocessing",
    sagemaker_session=sagemaker_session,
    role=role,
)


# Command-line arguments passed to 1_feature_engineering.py.
feature_engineering_args = [
    "--project-prefix", param_project_prefix,
    "--s3bucketname-drop", param_s3bucketname_drop,
    "--s3bucketname-groundtruth-job-input", param_s3bucketname_streaming_labeling_input,
]

# Step 1: run the feature-engineering script on the shared processor.
step_feature_engineering = ProcessingStep(
    name="FeatureEngineeringProcess",
    display_name="{}-feature-engineering-step".format(project_prefix),
    description="Step to pick up any new images placed in drop folder, perform feature engineering ready for Ground Truth Labeling",
    processor=sklearn_processor,
    code=script_feature_engineering,
    job_arguments=feature_engineering_args,
    cache_config=cache_config,
)


# Command-line arguments passed to 2_groundtruth_chain_job.py.
groundtruth_chain_job_args = [
    "--project-friendly-name", param_project_friendly_name,
    "--project-prefix", param_project_prefix,
    "--region", param_aws_region,
    "--s3bucketname-groundtruth-labelinginstructions", param_s3bucketname_labelinginstructions,
    "--s3bucketname-groundtruth-job-input", param_s3bucketname_streaming_labeling_input,
    "--s3bucketname-groundtruth-job-output", param_s3bucketname_streaming_labeling_output,
    "--urlwebsite-labelinginstructions", param_s3_publicwebsite_labelinginstructions_url,
    "--sns-topic-arn-streaming-labeling", param_sns_topic_arn_streaming_labeling,
    "--groundtruth-execution-role-arn", param_groundtruth_execution_role_arn,
    "--groundtruth-private-workforce-arn", param_groundtruth_private_workforce_arn,
]

# Step 2: start the chained Ground Truth job. The explicit dependency makes it
# run only after the feature-engineering step.
step_groundtruth_chain_job = ProcessingStep(
    name="GroundTruthChainJob",
    display_name="{}-groundtruth-chain-job".format(project_prefix),
    description="Step to start a chained Ground Truth job for the project",
    processor=sklearn_processor,
    code=script_groundtruth_chain_job,
    job_arguments=groundtruth_chain_job_args,
    depends_on=[step_feature_engineering],
    cache_config=cache_config,
)

# Resource tags applied to the pipeline, in Key/Value form.
tags = [
    {"Key": tag_key, "Value": tag_value}
    for tag_key, tag_value in (
        ("Project", project_friendly_name),
        ("Purpose", "GroundTruth Labeling"),
        ("Environment", "Development"),
    )
]

# Parameters exposed on the pipeline; each can be overridden per execution.
registered_parameters = [
    param_processing_instance_type,
    param_processing_instance_count,
    param_project_friendly_name,
    param_project_prefix,
    param_s3bucketname_drop,
    param_s3bucketname_labelinginstructions,
    param_s3bucketname_streaming_labeling_input,
    param_s3bucketname_streaming_labeling_output,
    param_s3_publicwebsite_labelinginstructions_url,
    param_sns_topic_arn_streaming_labeling,
    param_aws_region,
    param_groundtruth_execution_role_arn,
    param_groundtruth_private_workforce_arn,
]

# Feature engineering first, then the chained Ground Truth job.
pipeline_steps = [step_feature_engineering, step_groundtruth_chain_job]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=registered_parameters,
    steps=pipeline_steps,
)

# upsert creates the pipeline, or updates it if it already exists.
pipeline.upsert(
    role_arn=role,
    description=pipeline_description,
    tags=tags,
)

### [Optional] Kick-off our pipeline manually

To kick off the pipeline manually, run the cell below. Just make sure there is at least 1 image in the `streaminglabeling-input` bucket first.

In [None]:
# Run the pipeline with its default parameter values. To override any of
# them, pass a `parameters={...}` dict to pipeline.start().

# Take ONE timestamp and zero-pad every field (YYYYMMDDHHMMSS): the display
# name then has a fixed width, is unambiguous, sorts chronologically, and
# cannot mix fields read on either side of a second/minute/day rollover.
execution_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

execution = pipeline.start(
    execution_display_name="{}-{}".format(project_prefix, execution_timestamp),
    execution_description="GroundTruth pipeline - preprocesses any new drop images and starts a Labeling Job with Ground Truth",
)
execution.describe()
# Block until the execution finishes, then list the outcome of each step.
execution.wait()
execution.list_steps()