In [None]:
pip install -U sagemaker

In [1]:
import sagemaker
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.processing import Processor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.sklearn.estimator import SKLearn
from time import gmtime, strftime
import boto3
from sagemaker import Session

from sagemaker.workflow.pipeline_context import PipelineSession
# from sagemaker.workflow.steps import CreateEndpointConfigStep, CreateEndpointStep

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Name under which the pipeline is registered.
pipeline_name = "SklearnPipeline"

# Pipeline parameters together with their default values.
instance_type = ParameterString(
    name="InstanceType",
    default_value="ml.t3.medium",
)
model_name = ParameterString(
    name="ModelName",
    default_value="model",
)
instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1,
)

# Execution role of the current SageMaker environment, echoed for reference.
role = sagemaker.get_execution_role()
print(role)


arn:aws:iam::767397996001:role/service-role/AmazonSageMaker-ExecutionRole-20250120T122462


In [3]:
# Local training script and its destination in S3.
bucket_name = 'ml-ops-zenon'
script_path = 'train.py'
s3_key = 'scripts/train.py'

# Session object used later when building pipeline-aware SDK objects.
pipeline_session = PipelineSession()

# Copy the script into the bucket and confirm completion.
s3 = boto3.client('s3')
s3.upload_file(Filename=script_path, Bucket=bucket_name, Key=s3_key)
print("Done")

Done


In [4]:
# Scikit-learn estimator describing the training job.
#
# `entry_point` refers to the local `train.py`; the SDK packages the script and
# stages it in S3 on its own. No S3 `source_dir` is passed: an S3 `source_dir`
# must point to a tar.gz archive, not to a prefix that contains loose scripts,
# otherwise the training container cannot fetch the code.
sklearn_estimator = SKLearn(
    entry_point='train.py',
    role=role,  # SageMaker execution role
    instance_type='ml.t3.medium',
    instance_count=1,
    framework_version='1.2-1',  # scikit-learn container version
    py_version='py3',
    output_path=f's3://{bucket_name}/output',  # where model artifacts are written
)


In [5]:
# Pipeline step that runs the scikit-learn estimator, feeding it the CSV
# dataset through the "train" channel.
training_step = TrainingStep(
    name="TrainingStep",
    inputs=dict(
        train=TrainingInput(
            content_type="text/csv",
            s3_data=f's3://{bucket_name}/Input/diabetes-dev-1.csv',
        ),
    ),
    estimator=sklearn_estimator,
)

In [6]:
# Region of the default boto3 session, used to look up the container image.
session = boto3.Session()
region = session.region_name

# URI of the scikit-learn 1.2-1 (Python 3) container image.
image_uri = sagemaker.image_uris.retrieve(
    "sklearn",
    region,
    version="1.2-1",
    py_version="py3",
    instance_type="ml.t3.medium",
)

# Reference to the model artifact location reported by the training step.
model_artifact_uri = training_step.properties.ModelArtifacts.S3ModelArtifacts

# Model definition: the container image plus the trained artifact, served
# through `inference.py` from the relative `./scripts` directory.
model = Model(
    sagemaker_session=pipeline_session,
    role=role,
    image_uri=image_uri,
    model_data=model_artifact_uri,
    source_dir="./scripts",
    entry_point="inference.py",
)

# Arguments for the model-creation request, captured via the pipeline session.
create_model_args = model.create(instance_type="ml.t3.medium")

# Pipeline step that creates the SageMaker model from those arguments.
create_model_step = ModelStep(
    step_args=create_model_args,
    name="CreateModelStep",
)




In [7]:
# Step collection that registers the trained model in the SageMaker Model Registry.
#
# `model_data` is wired to the training step's output so the registered package
# points at the artifact produced by this pipeline run; without it the package
# has no model artifact location.
# NOTE(review): no `model_package_group_name` is supplied — confirm whether the
# package should be versioned under a model package group.
# NOTE(review): `repack_model` is forwarded through **kwargs — confirm it is an
# accepted option for the installed SDK version.
register_model_step = RegisterModel(
    name="RegisterModelStep",
    estimator=sklearn_estimator,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t3.medium"],
    transform_instances=["ml.t3.medium"],
    repack_model=False
)


In [None]:
# Define the endpoint configuration step
# endpoint_config_step = CreateEndpointConfigStep(
#     name="CreateEndpointConfigStep",
#     endpoint_config_name=f"{model_name}_EndpointConfig",  # Endpoint config name
#     model_name=create_model_step.properties.ModelName,  # Use the model name from the model step
#     instance_type="ml.t3.medium",  # Inference instance type
#     initial_instance_count=1
# )

# sagemaker_client.create_endpoint_config(
#     EndpointConfigName=endpoint_config_name,
#     ProductionVariants=[
#         {
#             "VariantName": "AllTraffic",
#             "ModelName": create_model_step.properties.ModelName,
#             "InitialInstanceCount": 1,
#             "InstanceType": "ml.t3.medium",
#             "InitialVariantWeight": 1.0
#         }
#     ]
# )

In [8]:
# Build a unique endpoint name by appending the current UTC timestamp.
endpoint_name = "sklearn-endpoint-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# endpoint_step = CreateEndpointStep(
#     name="CreateEndpointStep",
#     endpoint_name=endpoint_name,
#     endpoint_config_name=endpoint_config_step.properties.EndpointConfigName  # Use config from previous step
# )

# Call sagemaker_client.create_endpoint() to create the endpoint
# sagemaker_client.create_endpoint(
#     EndpointName=endpoint_name,
#     EndpointConfigName=endpoint_config_name
# )

In [9]:
# Assemble the pipeline definition, reusing the `pipeline_name` constant instead
# of repeating the literal so the name is defined in exactly one place.
# NOTE(review): only the training step is included; `create_model_step` and
# `register_model_step` are defined in this notebook but are not part of the
# pipeline — confirm whether that is intentional.
pipeline = Pipeline(
    name=pipeline_name,
    steps=[training_step],
)


In [10]:
# Create the pipeline definition (or update it if it already exists) under the
# execution role.
pipeline.upsert(role_arn=role)

# Launch a run and block until it finishes.
execution = pipeline.start()
execution.wait()

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7fb767d393d0>>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 


KeyboardInterrupt: 

In [None]:
# Fetch the execution description and report its current status.
execution_details = execution.describe()
status = execution_details["PipelineExecutionStatus"]
print("Pipeline execution status: {}".format(status))