In [1]:
#import os
import json
#import boto3
import sagemaker

from sagemaker.inputs import TrainingInput

from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.model import SKLearnModel

from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig
from sagemaker.workflow.lambda_step import LambdaStep, Lambda

#from sagemaker.workflow.step_collections import RegisterModel # Para registro y auditoría del modelo y luego despliegue fuera de un pipeline
from sagemaker.workflow.model_step import ModelStep # Para registro y auditoría del modelo como step de pipeline y luego despliegue
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline_context import PipelineSession

import sys

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/xdg-ubuntu/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/lromero/.config/sagemaker/config.yaml


In [2]:
default_bucket = "pipeline-test-ml-sklearn-randomforest-artifacts"

sagemaker_session = sagemaker.Session(default_bucket=default_bucket)
pipeline_session = PipelineSession(default_bucket=default_bucket)

sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
role = "arn:aws:iam::007863746889:role/sagemakerS3"

account_id = sagemaker_session.account_id()

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


In [3]:
prefix_input_data = "data/raw/"
base_job_prefix = "randomForest-pipeline"

processing_instance_count = 1
training_instance_count = 1

processing_instance_type = "ml.t3.large"
training_instance_type = "ml.m5.large"

input_data = ParameterString(name="InputRawData", default_value=f"s3://{default_bucket}/{prefix_input_data}")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="10d")

# Preprocessing

In [None]:
# Process the training data step using a python script.
# Split the training data set into train, test, and validation datasets

sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-LoanDefault-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
    
)

processor_args = {
    "outputs": [
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=f"s3://{default_bucket}/data/train/"
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=f"s3://{default_bucket}/data/validation/"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=f"s3://{default_bucket}/data/test/"
        )
    ],
    "inputs": [
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input/input_data",
            input_name="input_data",
            s3_input_mode="File"
        )
    ],
    "code": "code/preprocess.py",
    "arguments": ["--input-data", "/opt/ml/processing/input/input_data"]
}
processor_args = sklearn_processor.run(**processor_args)

step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    step_args=processor_args,
    cache_config=cache_config
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


# Training

In [None]:
min_samples_split = ParameterInteger(name="MinSamplesSplit", default_value=10)
class_weight = ParameterString(name="ClassWeight", default_value="{0:1, 1:25}")
max_depth = ParameterInteger(name="MaxDepth", default_value=5)
n_estimator = ParameterInteger(name="NEstimators", default_value=100)

sklearn_train_estimator = SKLearn(
    entry_point="code/train.py",
    framework_version="1.2-1",
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    output_path=f"s3://{default_bucket}/model_artifacts",
    script_mode=True,
    role=role,
    py_version="py3",
    base_job_name=f"{base_job_prefix}/sklearn-LoanDefault-training",
    sagemaker_session=pipeline_session,
    hyperparameters={
        "n-estimators": n_estimator,
        "max-depth": max_depth,
        "class-weight": class_weight,
        "min-samples-split": min_samples_split
    }
)

train_args = {
    "inputs": {
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type= "text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
}
train_args = sklearn_train_estimator.fit(**train_args)

step_train = TrainingStep(
    name="TrainSklearnLoanDefaultModel",
    step_args=train_args,
    cache_config=cache_config
)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


# Evaluation

In [6]:
evaluation_processor = ScriptProcessor(
    role=role,
    image_uri=sklearn_train_estimator.image_uri,
    instance_count=training_instance_count,
    instance_type=processing_instance_type,
    base_job_name="evaluationAbaloneModel",
    sagemaker_session=pipeline_session,
    command=["python3"]
)

evaluation_args = {
    "inputs":[
        ProcessingInput(
            input_name="Model",
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            input_name="train_data",
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/data/test"
        )
    ],
    "outputs":[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{default_bucket}/code/evaluation_report"
        )
    ],
    "code":"code/evaluation.py",
}
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

evaluation_args = evaluation_processor.run(**evaluation_args)

step_evaluation = ProcessingStep(
    name="EvaluateSKlearnAbaloneModel",
    step_args=evaluation_args,
    cache_config=cache_config,
    property_files=[evaluation_report]
)

# Model and conditional step

In [7]:
sklearn_model = SKLearnModel(
    entry_point="code/inference.py",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=sklearn_train_estimator.image_uri,
    sagemaker_session=pipeline_session
)

sklearn_model_args = dict(
    content_types=["text/csv"],
    response_types=["text/csv"],
    model_package_group_name="YourSKLearnModelGroup",
    approval_status="Approved"
)

sklearn_model_step_auto = ModelStep(
    name="AbaloneApprovedModel",
    step_args=sklearn_model.register(**sklearn_model_args)
)

sklearn_model_args["approval_status"] = "Rejected"
sklearn_model_step_rejected = sklearn_model_args = ModelStep(
    name="AbaloneRejectedModel",
    step_args=sklearn_model.register(**sklearn_model_args)
)

In [8]:
f1_score = JsonGet(
    step_name=step_evaluation.name,
    property_file=evaluation_report,
    json_path="classification_metrics.f1.value"
)

# Automatic lambda deplyment (severless)
lambda_deploy = LambdaStep(
    name="DeployModelWithLambda",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-east-1:007863746889:function:SeverlessDeploySagemakerPIpeline",
        session=pipeline_session,
    ),
    inputs={
        "model_package_arn": sklearn_model_step_auto.properties.ModelPackageArn
    }
)

condition_step = ConditionStep(
    name="CheckF1Score",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=f1_score,
            right=0.8
        )
    ],
    if_steps=[sklearn_model_step_auto, lambda_deploy],
    else_steps=[sklearn_model_step_rejected]
)

# Pipeline Definition

In [5]:
pipeline_instance = Pipeline(
    name="PipelineSkLernLoanDefault",
    parameters=[input_data],# n_estimators, max_depth],
    steps=[step_process],# step_train, step_evaluation, condition_step],
    sagemaker_session=pipeline_session
)

pipeline_definition = json.loads(pipeline_instance.definition())
pipeline_definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputRawData',
   'Type': 'String',
   'DefaultValue': 's3://pipeline-test-ml-sklearn-randomforest-artifacts/data/raw/'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'PreprocessAbaloneData',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.t3.large',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
     'ContainerArguments': ['--input-data',
      '/opt/ml/processing/input/input_data'],
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocess.py']},
    'RoleArn': 'arn:aws:iam::007863746889:role/sagemakerS3',
    'ProcessingInputs': [{'InputName': 'input_data',
      'AppManaged': False,
   

In [6]:
pipeline_instance.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:007863746889:pipeline/PipelineSkLernLoanDefault',
 'ResponseMetadata': {'RequestId': '7a350a8d-16ce-4eed-a49b-e0127be7bc6d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7a350a8d-16ce-4eed-a49b-e0127be7bc6d',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '115',
   'date': 'Fri, 25 Jul 2025 05:48:09 GMT'},
  'RetryAttempts': 0}}

In [7]:
execution = pipeline_instance.start()
execution.wait()