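# Preprocessing Pipeline

Defines and runs the SageMaker Pipeline `petfinder6000-preprocess-pipeline`. It runs the cleaning scripts to produce `interactions` and `users` datasets. It then writes two train/validation/test splits of the interactions to S3: one leave-some-users-out (LSUO) and one stratified.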

## Setup

In [1]:
import boto3
import pandas as pd
import numpy as np
import time
import sagemaker
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup

In [2]:
# Select the named AWS profile for later AWS CLI / boto3 calls (they read AWS_PROFILE from the environment).
%env AWS_PROFILE=aeroxye-sagemaker

env: AWS_PROFILE=aeroxye-sagemaker


In [3]:
# Sanity check: show which AWS account and role the active profile resolves to.
# NOTE(review): the output contains the account ID and role ARN — consider clearing it before sharing.
!aws sts get-caller-identity

{
    "UserId": "AROAWC4YSIQL5OBFCNGEX:botocore-session-1687363488",
    "Account": "418542404631",
    "Arn": "arn:aws:sts::418542404631:assumed-role/SageMaker-UserRole/botocore-session-1687363488"
}


In [4]:
# Resolve the IAM execution role. When the SDK cannot infer it (get_execution_role
# raises ValueError), fall back to looking the role up by name through IAM.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='SageMaker-UserRole')['Role']['Arn']

# One boto3 session (profile + region) shared by every client created below.
boto_session = boto3.Session()
region = boto_session.region_name
if region is None:
    # Fail here with an actionable message instead of deep inside a client constructor.
    raise ValueError(
        "No AWS region configured for the active profile; set AWS_DEFAULT_REGION "
        "or a region in the profile's AWS config."
    )
print(f'Current region: {region}')

sagemaker_session = sagemaker.Session(boto_session=boto_session)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
# NOTE(review): the response is discarded; this call only acts as an early
# connectivity/permission check against the SageMaker API.
sagemaker_client.list_feature_groups()

featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# SageMaker session that also carries the Feature Store runtime client.
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

Current region: ap-southeast-1


In [5]:
from sagemaker.workflow.pipeline_context import PipelineSession

# Build the pipeline against the same boto3 session/client as the rest of the
# notebook, so the profile and region cannot silently diverge.
# Processor .run() calls made with this session return step arguments instead of starting jobs.
pipeline_session = PipelineSession(boto_session=boto_session, sagemaker_client=sagemaker_client)

pipeline_name = "petfinder6000-preprocess-pipeline"  # SageMaker Pipeline name

## Define pipeline parameters

In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

bucket_name = "petfinder6000"
sklearn_framework_version = "1.2-1"
tf_framework_version = '2.12'
# UTC timestamp that namespaces this run's S3 output prefixes.
# NOTE(review): it is evaluated once, when this cell runs, and baked into the
# parameter defaults below — executions started later without overrides all
# write to this same timestamped prefix.
current_time = time.strftime("%Y%m%d-%H-%M-%S", time.gmtime())


def _split_uri_params(name_prefix, base_uri):
    """Return (train, validation, test) ParameterStrings rooted at ``base_uri``.

    Names follow ``<name_prefix><Split>S3Uri`` (e.g. ``LSUOTrainS3Uri``) and each
    default value is ``<base_uri>/<split>``.
    """
    return tuple(
        ParameterString(
            name=f"{name_prefix}{split.capitalize()}S3Uri",
            default_value=f"{base_uri}/{split}",
        )
        for split in ("train", "validation", "test")
    )


# --- output destinations ---
interactions_s3 = ParameterString(
    name="InteractionsS3Uri",
    default_value=f"s3://{bucket_name}/auxiliary/interactions/interactions-{current_time}",
)
users_s3 = ParameterString(
    name="UsersS3Uri",
    default_value=f"s3://{bucket_name}/auxiliary/users/users-{current_time}",
)

lsuo_base = f"s3://{bucket_name}/data/training/lsuo/{current_time}"
lsuo_train, lsuo_validation, lsuo_test = _split_uri_params("LSUO", lsuo_base)

strat_base = f"s3://{bucket_name}/data/training/strat/{current_time}"
strat_train, strat_validation, strat_test = _split_uri_params("Stratified", strat_base)

# --- processing-job sizing ---
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

## Create preprocessing step

In [7]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.processing import FrameworkProcessor, ProcessingOutput

# Runs cleaning_scripts/process_main.py in the SageMaker TensorFlow framework container.
# NOTE(review): as the warning printed below says, the container image is resolved
# from the *default* ProcessingInstanceType; overriding it at run time (e.g. with a
# GPU instance) still uses the image chosen for the default.
tf_processor = FrameworkProcessor(
    estimator_cls=TensorFlow,
    framework_version=tf_framework_version,
    py_version='py310',
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    # Processing job names allow only alphanumerics and hyphens, so no "/" here.
    base_job_name=f"{bucket_name}-data-auxiliary",
    role=role,
    sagemaker_session=pipeline_session,
    env={"REGION": region},
)

# With a PipelineSession, run() does not launch a job; it returns the step arguments.
preprocessor_args = tf_processor.run(
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/interactions", output_name="interactions", destination=interactions_s3),
        ProcessingOutput(source="/opt/ml/processing/users", output_name="users", destination=users_s3),
    ],
    code="process_main.py",
    source_dir="./cleaning_scripts",
    # NOTE(review): these modules already live in source_dir, which is uploaded as a
    # whole; listing them again is presumably redundant — confirm before removing.
    dependencies=[
        './cleaning_scripts/load_data.py',
        './cleaning_scripts/process_cats.py',
        './cleaning_scripts/process_users.py',
        './cleaning_scripts/process_interactions.py',
        './cleaning_scripts/process_images.py',
        './cleaning_scripts/store_feature.py',
    ],
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [8]:
from sagemaker.workflow.steps import ProcessingStep

# Turn the run() arguments captured for the TensorFlow cleaning job into the first pipeline step.
step_process = ProcessingStep(step_args=preprocessor_args, name="PreprocessData")

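## Create data splitting steps

Both steps read the `interactions` output of `PreprocessData` and write train/validation/test splits. One uses leave-some-users-out (LSUO) splitting; the other is stratified.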

In [9]:
from sagemaker.sklearn import SKLearnProcessor
from sagemaker.processing import ProcessingInput

# scikit-learn processor for the leave-some-users-out (LSUO) split.
sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    # Processing job names allow only alphanumerics and hyphens, so no "/" here.
    base_job_name=f"{bucket_name}-training-lsuo",
    role=role,
    sagemaker_session=pipeline_session,
)

# Referencing PreprocessData's output property is what makes this step run after it.
interactions_uri = step_process.properties.ProcessingOutputConfig.Outputs["interactions"].S3Output.S3Uri

lsuo_processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=interactions_uri, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/train", output_name="train", destination=lsuo_train),
        ProcessingOutput(source="/opt/ml/processing/validation", output_name="validation",
                         destination=lsuo_validation),
        ProcessingOutput(source="/opt/ml/processing/test", output_name="test", destination=lsuo_test),
    ],
    code="splitting_scripts/leave-some-users-out.py",
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [10]:
# LSUO split step, built from the run() arguments captured above.
step_split_lsuo = ProcessingStep(step_args=lsuo_processor_args, name="SplitLSUO")

In [11]:
# scikit-learn processor for the stratified split.
sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    # Processing job names allow only alphanumerics and hyphens, so no "/" here.
    base_job_name=f"{bucket_name}-training-strat",
    role=role,
    sagemaker_session=pipeline_session,
)

# Referencing PreprocessData's output property is what makes this step run after it.
interactions_uri = step_process.properties.ProcessingOutputConfig.Outputs["interactions"].S3Output.S3Uri

strat_processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=interactions_uri, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/train", output_name="train", destination=strat_train),
        ProcessingOutput(source="/opt/ml/processing/validation", output_name="validation",
                         destination=strat_validation),
        ProcessingOutput(source="/opt/ml/processing/test", output_name="test", destination=strat_test),
    ],
    code="splitting_scripts/stratified-split.py",
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [12]:
# Stratified split step, built from the run() arguments captured above.
step_split_strat = ProcessingStep(step_args=strat_processor_args, name="SplitStrat")

## Define Pipeline

In [13]:
from sagemaker.workflow.pipeline import Pipeline

# Declare every pipeline parameter the steps reference.
pipeline_parameters = [
    # output destinations
    interactions_s3, users_s3,
    lsuo_train, lsuo_validation, lsuo_test,
    strat_train, strat_validation, strat_test,
    # processing-job sizing
    processing_instance_type, processing_instance_count,
]

# Both split steps consume step_process outputs via property references, which
# determines execution order; the order of this list does not.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_process, step_split_lsuo, step_split_strat],
)

In [14]:
# Create the pipeline, or update its definition if it already exists; show the API response.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

Using provided s3_resource


INFO:sagemaker.processing:Uploaded ./cleaning_scripts to s3://sagemaker-ap-southeast-1-418542404631/petfinder6000-preprocess-pipeline/code/acd4ad4a8cd13e557ca9539e693d2341/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://sagemaker-ap-southeast-1-418542404631/petfinder6000-preprocess-pipeline/code/052ff1a4768ac08a5304c8ec11b99722/runproc.sh


Using provided s3_resource


{'PipelineArn': 'arn:aws:sagemaker:ap-southeast-1:418542404631:pipeline/petfinder6000-preprocess-pipeline',
 'ResponseMetadata': {'RequestId': '0e8cbd7e-ba7e-4806-9060-fd82704e7273',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0e8cbd7e-ba7e-4806-9060-fd82704e7273',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '106',
   'date': 'Wed, 21 Jun 2023 16:15:25 GMT'},
  'RetryAttempts': 0}}

In [15]:
# Start a run with every pipeline parameter at its default value (no overrides).
execution = pipeline.start(parameters=None)

In [16]:
# Block until the run finishes. The SDK waiter defaults (delay=30, max_attempts=60)
# give up after ~30 minutes, which may be too short for preprocessing, so poll for
# up to ~2 hours. Per-step status is displayed even if the wait fails or is interrupted.
try:
    execution.wait(delay=60, max_attempts=120)
finally:
    display(execution.list_steps())

KeyboardInterrupt: 