In [2]:
import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TuningStep, TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, CreateModelStep, TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

# --- Session and storage configuration ---
# IAM role handed to the processing, training and model resources defined
# below, and to the pipeline itself.
role = "arn:aws:iam::339712893183:role/SagemakerNotebookRole"

# A regular Session and a PipelineSession are both created; later statements
# refer to each of them by name.
sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()

# S3 bucket used for data and artifacts, and the key prefix that is later
# passed to get_top_model_s3_uri() when locating the tuned model.
bucket, prefix = "s3-churn-predictor", "output"

# Preprocessing step
# Runs scripts/preprocessing.py with python3 in the scikit-learn 0.23-1 image
# on one ml.m5.large instance. The raw CSV is mounted at
# /opt/ml/processing/input and the script's train/test directories are
# exported to s3://<bucket>/data/{train,test}.
preprocessing_script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve(
        framework='sklearn',
        region=sagemaker_session.boto_region_name,
        version='0.23-1',
    ),
    command=['python3'],
    instance_type='ml.m5.large',
    instance_count=1,
    role=role,
    # Use the pipeline session, matching the evaluation processor and the
    # Pipeline object, so every step is defined against the same session type.
    sagemaker_session=pipeline_session,
)

preprocessing_step = ProcessingStep(
    name='PreprocessingStep',
    processor=preprocessing_script_processor,
    inputs=[
        # Built from `bucket` (not a repeated literal) so the input and the
        # outputs below cannot drift to different buckets.
        ProcessingInput(
            source=f's3://{bucket}/data/data.csv',
            destination='/opt/ml/processing/input',
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source='/opt/ml/processing/train',
            destination=f's3://{bucket}/data/train',
        ),
        ProcessingOutput(
            output_name="test",
            source='/opt/ml/processing/test',
            destination=f's3://{bucket}/data/test',
        ),
    ],
    code='scripts/preprocessing.py',
)
# Training estimator
# Model artifacts are written under s3://<bucket>/<prefix>. The same `prefix`
# is passed to get_top_model_s3_uri() when the best model is looked up, so the
# output location is derived from it instead of repeating the 'output' literal;
# otherwise changing one without the other would point the lookup at the wrong
# S3 key.
model_path = f"s3://{bucket}/{prefix}"

pytorch_estimator = PyTorch(
    entry_point='train.py',  # training script inside source_dir
    source_dir='scripts',
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    framework_version='1.8.1',
    py_version='py3',
    sagemaker_session=sagemaker_session,
    hyperparameters={
        # Static hyperparameter forwarded to the training script.
        'bucket-name': bucket,
    },
    output_path=model_path,  # where training jobs store their model artifacts
)

# Objective for the tuning job: the value captured by the regex below is
# reported as 'validation:auc' and maximised.
objective_metric = 'validation:auc'
auc_metric_definition = {
    'Name': objective_metric,
    'Regex': 'Validation AUC: ([0-9\\.]+)',
}

# Search space: a continuous learning rate and an integer batch size.
hyperparameter_ranges = {
    'lr': ContinuousParameter(0.0001, 0.1),
    'batch-size': IntegerParameter(32, 128),
}

# Tuner wrapping the PyTorch estimator: 10 training jobs in total, at most 4
# running at the same time.
tuner = HyperparameterTuner(
    estimator=pytorch_estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_metric_name=objective_metric,
    objective_type='Maximize',
    metric_definitions=[auc_metric_definition],
    max_parallel_jobs=4,
    max_jobs=10,
)



# Tuning step
# Each training channel ("train", "test") reads the S3 URI of the
# preprocessing output with the same name, as CSV.
preprocessing_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs
tuning_channels = {
    channel: TrainingInput(
        s3_data=preprocessing_outputs[channel].S3Output.S3Uri,
        content_type="text/csv",
    )
    for channel in ("train", "test")
}

tuning_step = TuningStep(
    name='ChurnPredictorTuning',
    tuner=tuner,
    inputs=tuning_channels,
)

# S3 location of the model artifact produced by the top-ranked training job
# (index 0), resolved under the given bucket and key prefix.
top_model_s3_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=bucket,
    prefix=prefix,
)


# Evaluation step
# Runs scripts/evaluate.py in the scikit-learn 0.23-1 image against the best
# tuned model and the preprocessed test split.
evaluation_image_uri = sagemaker.image_uris.retrieve(
    framework='sklearn',
    region=sagemaker_session.boto_region_name,
    version='0.23-1',
)
evaluation_script_processor = ScriptProcessor(
    role=role,
    image_uri=evaluation_image_uri,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.large',
    sagemaker_session=pipeline_session,
)

# Property file exposing evaluation.json from the "evaluation" output so a
# later step can read values out of it.
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Inputs: the tuned model artifact and the test split from preprocessing.
evaluation_inputs = [
    ProcessingInput(
        source=top_model_s3_uri,
        destination='/opt/ml/processing/models',
    ),
    ProcessingInput(
        source=preprocessing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        destination='/opt/ml/processing/test',
    ),
]

# Output: whatever the script writes to /opt/ml/processing/evaluation is
# exported to s3://<bucket>/evaluation.
evaluation_outputs = [
    ProcessingOutput(
        output_name="evaluation",
        source='/opt/ml/processing/evaluation',
        destination=f's3://{bucket}/evaluation',
    ),
]

evaluation_step = ProcessingStep(
    name='EvaluationStep',
    processor=evaluation_script_processor,
    inputs=evaluation_inputs,
    outputs=evaluation_outputs,
    code='scripts/evaluate.py',
    property_files=[evaluation_report],
)

# Model backed by the best artifact found by the tuning step.
# NOTE(review): the container image is the estimator's *training* image;
# confirm that image is suitable for hosting/inference before deploying.
training_image = pytorch_estimator.training_image_uri()
model = Model(
    model_data=top_model_s3_uri,
    image_uri=training_image,
    role=role,
    sagemaker_session=pipeline_session,
)

# Pipeline step that creates the SageMaker model for an ml.m5.large instance.
create_model_step = CreateModelStep(
    name='CreateModelStep',
    model=model,
    inputs=sagemaker.inputs.CreateModelInput(instance_type='ml.m5.large'),
)

# Metrics attached to the registered model: the JSON report exported by the
# evaluation step to s3://<bucket>/evaluation. The URI is built from `bucket`
# so it follows the evaluation output destination instead of repeating the
# bucket name as a literal.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"s3://{bucket}/evaluation/evaluation.json",
        content_type="application/json",
    )
)

# Model package group that receives the registered model versions.
model_package_group_name = "ChurnModelPackageGroup"

# Register the best tuned model. The group name above was previously defined
# but never handed to RegisterModel; it is now passed explicitly so the package
# is registered into that group.
# NOTE(review): confirm the group exists (or that the pipeline role is allowed
# to create it) in the target account.
register_model_step = RegisterModel(
    name="RegisterChurnModel",
    estimator=pytorch_estimator,
    model_data=top_model_s3_uri,  # artifact of the top-ranked tuning job
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)



# Quality gate on the evaluation report.
# The AUC value is read from the evaluation step's property file at
# classification_metrics.auc_score.value.
auc_score = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="classification_metrics.auc_score.value",
)
auc_meets_threshold = ConditionGreaterThanOrEqualTo(left=auc_score, right=0.75)

# When AUC >= 0.75 the model is created and registered; otherwise nothing
# further runs.
condition_step = ConditionStep(
    name="CheckAUC",
    conditions=[auc_meets_threshold],
    if_steps=[create_model_step, register_model_step],
    else_steps=[],
)

# Assemble the pipeline: preprocessing -> tuning -> evaluation -> AUC gate.
# The create-model and register-model steps are not listed here because they
# are nested inside the condition step's if_steps.
pipeline = Pipeline(
    name='ChurnPredictorPipeline',
    parameters=[],
    steps=[preprocessing_step, tuning_step, evaluation_step, condition_step],
    sagemaker_session=pipeline_session,
)

# upsert() instead of create(): create() fails once a pipeline with this name
# already exists (e.g. when this cell is re-run), whereas upsert() creates the
# pipeline on the first run and updates its definition afterwards.
pipeline.upsert(role_arn=role)

# Start an execution; left as the final expression so the notebook displays
# the returned execution object.
pipeline.start()


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:339712893183:pipeline/ChurnPredictorPipeline/execution/jubmpx3lqdn0', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x7facf4c4cd00>)