In [1]:
import warnings

import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.functions import Join
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [2]:
# AWS handles shared by the rest of the notebook.
sm_client = boto3.client("sagemaker")  # low-level SageMaker API client
sagemaker_session = sagemaker.Session()  # regular SDK session
pipeline_session = PipelineSession()  # passed to the processor, estimator and model

# Execution role looked up through the SDK; passed to the processor,
# estimator, model and to the pipeline upsert call.
role = sagemaker.get_execution_role()

# Suppress every Python warning raised from this point on.
warnings.filterwarnings("ignore")

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


In [3]:
# Pipeline parameters: each has a default and is registered on the Pipeline
# object, so it can be overridden per execution.
input_data_uri = ParameterString(
    name="InputDataUri",
    default_value="s3://hemz-sagemaker-bucket/input-data/",
)
output_data_uri = ParameterString(
    name="OutputDataUri",
    default_value="s3://hemz-sagemaker-bucket/output-data/",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)

# Fixed settings (plain strings, not pipeline parameters).
training_image_uri = "654654222480.dkr.ecr.ap-south-1.amazonaws.com/prophet_training:latest"
model_package_group_name = "ProphetModelGroup"

In [4]:
# Processor for the preprocessing script: runs it with python3 inside the
# same container image that the training job uses.
script_processor = ScriptProcessor(
    role=role,
    image_uri=training_image_uri,
    command=["python3"],
    instance_type="ml.t3.medium",
    instance_count=1,
    sagemaker_session=pipeline_session,
)

In [5]:
# Preprocessing step: runs code/preprocess.py with the ScriptProcessor.
# Input : the InputDataUri parameter, made available at /opt/ml/processing/input.
# Output: the contents of /opt/ml/processing/output/train, uploaded to the
#         OutputDataUri parameter with "train" appended.
preprocessing_step = ProcessingStep(
    name="PreprocessData",
    processor=script_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=input_data_uri, destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train",
            # Join is resolved at execution time, so an OutputDataUri override
            # is honoured. Formatting `.default_value` into an f-string froze
            # the default prefix into the pipeline definition instead. With the
            # default parameter value the resulting URI is unchanged.
            destination=Join(on="", values=[output_data_uri, "train"]),
            source="/opt/ml/processing/output/train",
        )
    ],
    code="code/preprocess.py",  # path to the preprocessing script
)

In [6]:
# Estimator for the Prophet training job, built on the custom container image
# and driven by the code/train.py entry point.
prophet_estimator = sagemaker.estimator.Estimator(
    role=role,
    image_uri=training_image_uri,
    entry_point="code/train.py",
    script_mode=True,
    instance_type="ml.m5.large",
    instance_count=1,
    # NOTE(review): fixed S3 location; it does not follow the OutputDataUri
    # pipeline parameter — confirm this is intended.
    output_path="s3://hemz-sagemaker-bucket/output-data/model-artifacts/",
    sagemaker_session=pipeline_session,
)

In [7]:
# Training step: the "train" channel reads, as CSV, the S3 location of the
# preprocessing step's "train" output, which makes this step depend on it.
train_channel = TrainingInput(
    s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
        "train"
    ].S3Output.S3Uri,
    content_type="text/csv",
)

training_step = TrainingStep(
    name="TrainModel",
    estimator=prophet_estimator,
    inputs={"train": train_channel},
)

In [8]:
# Model built from the artifacts the training step produces, paired with the
# same container image that was used for training.
model = Model(
    role=role,
    image_uri=training_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
)

In [9]:
register_model_step = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
)

In [10]:
# Define model registration step
register_model_step = ModelStep(name="RegisterModel", step_args=register_model_step)

In [11]:
# Assemble the pipeline: the three parameters it exposes and its three steps
# (preprocess, train, register).
training_pipeline = Pipeline(
    name="TrainingPipeline",
    parameters=[
        input_data_uri,
        output_data_uri,
        model_approval_status,
    ],
    steps=[
        preprocessing_step,
        training_step,
        register_model_step,
    ],
)

In [12]:
# Create the pipeline definition in SageMaker, or update it if it already
# exists, using the notebook's execution role. This does not start a run.
training_pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:654654222480:pipeline/TrainingPipeline',
 'ResponseMetadata': {'RequestId': 'e72484e6-0cf1-43a5-b906-d57683d74fbb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e72484e6-0cf1-43a5-b906-d57683d74fbb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '85',
   'date': 'Sun, 18 Aug 2024 10:15:06 GMT'},
  'RetryAttempts': 0}}

In [13]:
# Start one execution of the pipeline. No parameter overrides are passed, so
# the parameter defaults apply.
execution = training_pipeline.start()

In [14]:
# Block this cell until the pipeline execution finishes.
# NOTE(review): called with the SDK's default polling settings — confirm they
# cover the expected run time of the training job.
execution.wait()