In [1]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

# Model Registry group that model versions from this pipeline are registered under.
model_package_group_name = "EndEndPackage"

# Regular session: used for calls made directly from the notebook (bucket lookup, uploads).
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket()

# Pipeline session: passed to processors/estimators whose run()/fit() results
# are used as step arguments for the pipeline definition.
pipeline_session = PipelineSession()

# Execution role and region of the current environment.
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Steps
1. Preprocessing
2. Training
3. Evaluation
4. Condition Evaluation (MSE threshold check)
5. Model Registration
6. Model Creation
7. Batch Transformation

In [2]:
# Downlaod the dataset
# https://archive.ics.uci.edu/dataset/1/abalone

!mkdir -p data
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    local_path)

In [3]:
# Show the session's default S3 bucket; the dataset uploads below are written under it.
default_bucket

'sagemaker-eu-north-1-919751357950'

In [4]:
# Stage the training dataset under the project prefix of the default bucket;
# the returned S3 URI becomes the default value of the InputData parameter.
base_uri = f"s3://{default_bucket}/aws-mlops-live"
input_data_uri = sagemaker.s3.S3Uploader.upload(local_path, base_uri)
print(input_data_uri)

s3://sagemaker-eu-north-1-919751357950/aws-mlops-live/abalone-dataset.csv


In [5]:
# Download the second dataset, used as input for the batch-transform step.
# A dedicated variable holds its local path so that `local_path` keeps pointing
# at the training dataset: re-running the training-data upload cell after this
# one would otherwise upload the batch file as the training input.
batch_local_path = "data/abalone-dataset-batch.csv"

s3 = boto3.resource("s3")
# The object key has no ".csv" suffix; only the local copy is given one.
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    batch_local_path)

# Upload next to the training dataset; the returned URI becomes the default
# value of the BatchData pipeline parameter.
base_uri = f"s3://{default_bucket}/aws-mlops-live"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=batch_local_path,
    desired_s3_uri=base_uri)
print(batch_data_uri)


s3://sagemaker-eu-north-1-919751357950/aws-mlops-live/abalone-dataset-batch.csv


# Definition of Pipeline Parameters
- processing_instance_count
- input_data
- batch_data
- model_approval_status

In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# Pipeline parameters; each default below is what appears as `DefaultValue`
# in the rendered pipeline definition.

# S3 locations of the training dataset and of the batch-transform dataset.
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

# Number of instances used by the preprocessing job.
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)

# Approval status assigned to the model version at registration time.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Defining the processing step

In [7]:
# Directory for the pipeline scripts referenced later (abalone/preprocessing.py,
# abalone/evaluation.py).
# NOTE(review): no cell in this notebook writes those scripts — presumably they
# are created outside the notebook; confirm they exist before building the pipeline.
!mkdir -p abalone

In [8]:
import pandas as pd
# Load without a header row (header=None), so the columns get integer labels
# until they are renamed.
df = pd.read_csv(
    "data/abalone-dataset.csv",
    header=None,
)
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [9]:
# Give the nine positional columns descriptive names:
# one categorical feature, seven numeric measurements, and the target.
df.columns = [
    "sex",
    "length", "diameter", "height",
    "whole_weight", "shucked_weight", "viscera_weight", "shell_weight",
    "rings",
]

In [10]:
# Confirm the new column names line up with the data.
df.head()

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [11]:
# Show rows flagged by `duplicated()`; an empty frame means no duplicated rows.
df[df.duplicated()]

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings


In [12]:
# Inspect dtypes and non-null counts per column.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4177 entries, 0 to 4176
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   sex             4177 non-null   object 
 1   length          4177 non-null   float64
 2   diameter        4177 non-null   float64
 3   height          4177 non-null   float64
 4   whole_weight    4177 non-null   float64
 5   shucked_weight  4177 non-null   float64
 6   viscera_weight  4177 non-null   float64
 7   shell_weight    4177 non-null   float64
 8   rings           4177 non-null   int64  
dtypes: float64(7), int64(1), object(1)
memory usage: 293.8+ KB


In [13]:
# Summarize only the object-dtype columns (here: `sex`).
df.describe(include='O')

Unnamed: 0,sex
count,4177
unique,3
top,M
freq,1528


In [14]:
# Count missing values per column.
df.isna().sum()

sex               0
length            0
diameter          0
height            0
whole_weight      0
shucked_weight    0
viscera_weight    0
shell_weight      0
rings             0
dtype: int64

In [15]:
# Number of distinct values per column.
df.nunique()

sex                  3
length             134
diameter           111
height              51
whole_weight      2429
shucked_weight    1515
viscera_weight     880
shell_weight       926
rings               28
dtype: int64

In [16]:
# List the category levels of `sex` that the one-hot encoder will expand.
df.sex.unique()

array(['M', 'F', 'I'], dtype=object)

In [17]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler,OneHotEncoder

# Numeric features: fill missing values with the column median, then standardize.
numeric_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scalar", StandardScaler()),
    ]
)

# Categorical features: fill missing values with the constant "missing",
# then one-hot encode.
categorical_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
        ("onehot", OneHotEncoder()),
    ]
)

In [18]:
# List the column names before splitting them into numeric and categorical groups.
df.columns

Index(['sex', 'length', 'diameter', 'height', 'whole_weight', 'shucked_weight',
       'viscera_weight', 'shell_weight', 'rings'],
      dtype='object')

In [19]:
# Target column the model predicts.
label_column = "rings"

# Numeric features are every column except the categorical `sex` column and
# the label. Built in one pass, reusing `label_column` instead of repeating the
# literal. Unlike list.remove(), this does not raise if a name is absent.
numeric_cols = [col for col in df.columns if col not in ("sex", label_column)]

In [20]:
# Display the numeric feature names handed to the numeric transformer.
numeric_cols

['length',
 'diameter',
 'height',
 'whole_weight',
 'shucked_weight',
 'viscera_weight',
 'shell_weight']

In [21]:
# Categorical feature names handed to the categorical (one-hot) transformer.
categoric_cols = ["sex"]

In [22]:
from sklearn.compose import ColumnTransformer

# Apply each transformer to its own column group and concatenate the results
# (numeric block first, then the encoded categorical block).
preprocess = ColumnTransformer(
    transformers=[
        ("nums", numeric_transformer, numeric_cols),
        ("cat", categorical_transformer, categoric_cols),
    ]
)

In [23]:
# Separate the target series from the feature frame.
y = df["rings"]
X = df.drop(columns=["rings"])

In [24]:
# Fit the column transformer on the features and keep the transformed matrix.
X_preprocessing = preprocess.fit_transform(X)
X_preprocessing

array([[-0.57455813, -0.43214879, -1.06442415, ...,  0.        ,
         0.        ,  1.        ],
       [-1.44898585, -1.439929  , -1.18397831, ...,  0.        ,
         0.        ,  1.        ],
       [ 0.05003309,  0.12213032, -0.10799087, ...,  1.        ,
         0.        ,  0.        ],
       ...,
       [ 0.6329849 ,  0.67640943,  1.56576738, ...,  0.        ,
         0.        ,  1.        ],
       [ 0.84118198,  0.77718745,  0.25067161, ...,  1.        ,
         0.        ,  0.        ],
       [ 1.54905203,  1.48263359,  1.32665906, ...,  0.        ,
         0.        ,  1.        ]])

In [25]:
# Output column names, prefixed with the transformer name ("nums__" / "cat__").
preprocess.get_feature_names_out()

array(['nums__length', 'nums__diameter', 'nums__height',
       'nums__whole_weight', 'nums__shucked_weight',
       'nums__viscera_weight', 'nums__shell_weight', 'cat__sex_F',
       'cat__sex_I', 'cat__sex_M'], dtype=object)

In [26]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Processor for the preprocessing job. The instance count is the
# ProcessingInstanceCount pipeline parameter rather than a fixed number.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-job",
    sagemaker_session=pipeline_session,
)

In [27]:
from sagemaker.processing import ProcessingInput,ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# The dataset given by the InputData parameter is made available to the job
# under /opt/ml/processing/input.
preprocess_inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]

# One named output per data split, each collected from its own directory.
# NOTE(review): presumably abalone/preprocessing.py writes the splits to these
# paths — confirm against the script.
preprocess_outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
]

processor_args = sklearn_processor.run(
    inputs=preprocess_inputs,
    outputs=preprocess_outputs,
    code="abalone/preprocessing.py",
)



In [28]:
# Inspect what run() returned when called with the pipeline session.
processor_args

<sagemaker.workflow.pipeline_context._StepArguments at 0x7f966698f320>

In [29]:
# Pipeline step wrapping the preprocessing job arguments.
step_process = ProcessingStep(
    step_args=processor_args,
    name="Preprocessing",
)

In [30]:
# training
# S3 prefix in the default bucket, passed to the estimator as its output path.
model_path = f"s3://{default_bucket}/AbaloneTrain"

In [31]:
from sagemaker.estimator import Estimator

# Resolve the XGBoost container image for this region and version. The same
# image is reused later by the evaluation processor and the model.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge"
)

# Estimator for the training job; model artifacts are written under `model_path`.
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

xgb_train.set_hyperparameters(
    # "reg:squarederror" is the current name of the squared-error regression
    # objective; "reg:linear" is its deprecated alias.
    objective="reg:squarederror",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0)

In [32]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Outputs of the preprocessing step, looked up by output name. These are
# property references taken from the step, not literal S3 strings.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs

# Training and validation channels, both read as CSV.
train_channel = TrainingInput(
    s3_data=processed_outputs["train"].S3Output.S3Uri,
    content_type="text/csv",
)
validation_channel = TrainingInput(
    s3_data=processed_outputs["validation"].S3Output.S3Uri,
    content_type="text/csv",
)

train_args = xgb_train.fit(
    inputs={"train": train_channel, "validation": validation_channel},
)

step_train = TrainingStep(name="TrainingStep", step_args=train_args)

In [33]:
## Evaluation: run the evaluation script on the trained model and the test split

In [34]:
from sagemaker.processing import ScriptProcessor
# Evaluation runs in the same XGBoost image used for training.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-eval",
    sagemaker_session=pipeline_session,
)

# Inputs: the trained model artifacts and the test split from preprocessing.
eval_inputs = [
    ProcessingInput(
        source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        destination="/opt/ml/processing/model",
    ),
    ProcessingInput(
        source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination="/opt/ml/processing/test",
    ),
]

# Output: whatever is under the evaluation directory, exposed as the
# "evaluation" output.
eval_outputs = [
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
]

eval_args = script_eval.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="abalone/evaluation.py",
)

In [35]:
from sagemaker.workflow.properties import PropertyFile

# Property file pointing at evaluation.json in the "evaluation" output;
# the condition step reads the MSE from it.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [36]:
# Create the model
from sagemaker.model import Model

# Model definition pairing the XGBoost image with the artifacts produced by
# the training step.
model = Model(
    image_uri = image_uri,
    model_data = step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session = pipeline_session,
    role =role)

from sagemaker.inputs import CreateModelInput

# No accelerator_type is passed. Elastic Inference accelerator names have the
# form "ml.eia1.*" / "ml.eia2.*", and this model is only used by the batch
# transform on a CPU ml.m5.xlarge instance, so none is needed.
inputs = CreateModelInput(
    instance_type = "ml.m5.large",
)

In [37]:
from sagemaker.workflow.steps import CreateModelStep

# Step that creates the SageMaker model; its ModelName property is used by the
# batch transformer.
# NOTE(review): newer SageMaker SDK releases appear to deprecate CreateModelStep
# in favour of ModelStep — confirm against the installed SDK version.
step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=inputs,
)

In [38]:
# Batch Transformation

from sagemaker.transformer import Transformer

# Batch transformer bound to the model created by the CreateModel step;
# results are written under the AbaloneTransform prefix.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)
    

In [39]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# Batch-transform step; its input location is the BatchData pipeline parameter.
batch_input = TransformInput(data=batch_data)
step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=batch_input,
)

In [40]:
from sagemaker.model_metrics import MetricsSource,ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

# S3 location of the evaluation step's first (and only) output, taken from the
# step's request arguments.
evaluation_output_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]

# Attach the evaluation report to the registered model version.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(evaluation_output_uri),
        content_type="application/json",
    )
)

# Register the trained model in the model package group, with CSV in/out and
# the approval status taken from the ModelApprovalStatus parameter.
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)



In [41]:
# condition step

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# MSE read from the evaluation report produced by the evaluation step.
evaluation_mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,
    json_path="regression_metrics.mse.value",
)

# Condition: MSE <= 6.0.
cond_lte = ConditionLessThanOrEqualTo(left=evaluation_mse, right=6.0)

# If the condition holds, run registration, model creation and batch transform;
# otherwise run nothing further.
step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[],
)

In [42]:
# Imported under an alias so scikit-learn's `Pipeline` (used earlier to build
# the feature transformers) is not shadowed in the notebook namespace.
# With the shadowing, re-running the transformer cell after this one would fail.
from sagemaker.workflow.pipeline import Pipeline as SageMakerPipeline

pipeline_name = "Abalone-pipeline"

# Top-level steps only: registration, model creation and batch transform are
# reached through the condition step's if-branch.
pipeline = SageMakerPipeline(
    name=pipeline_name,
    parameters=[processing_instance_count, model_approval_status, input_data, batch_data],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [43]:
import json

# Parse the pipeline definition JSON into a dict to inspect the parameters and steps.
json.loads(pipeline.definition())




{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-north-1-919751357950/aws-mlops-live/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-eu-north-1-919751357950/aws-mlops-live/abalone-dataset-batch.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Preprocessing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '662702820516.dkr.ecr.eu-north-1.amazo

In [44]:
# Submit the pipeline definition to SageMaker under the execution role
# (upsert: create, or update an existing pipeline of the same name).
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:919751357950:pipeline/Abalone-pipeline',
 'ResponseMetadata': {'RequestId': '3f21bbfc-8b8d-4a41-b55e-b01a9de8a9dd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3f21bbfc-8b8d-4a41-b55e-b01a9de8a9dd',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Sun, 08 Feb 2026 10:26:46 GMT'},
  'RetryAttempts': 0}}