In [1]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import CacheConfig
from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter
from sagemaker.inputs import TrainingInput
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep
from sagemaker.xgboost import XGBoost
from sagemaker.estimator import InstanceGroup
from sagemaker.workflow.functions import Join
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn import SKLearnModel
from sagemaker import ModelPackage

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [6]:
class ChurnPredictionPipeline:
    """Build, run and deploy a SageMaker pipeline for churn prediction.

    The pipeline consists of a scikit-learn processing step, two
    hyperparameter tuning steps (random forest and logistic regression, both
    driven by ``scripts/train.py``) and a condition step that registers the
    model whose tuner reports the higher best "Validation F1 Score" in the
    ``ChurnPredictionModelGroup`` model package group.
    """

    # Step names are needed both when the pipeline is built and when the
    # finished execution is inspected, so they are defined exactly once.
    _CONDITION_STEP_NAME = "ChooseAndRegisterBestModelStep"
    _RF_REGISTER_STEP_NAME = "RegisterRFModelConditional"
    _LOGISTIC_REGISTER_STEP_NAME = "RegisterLogisticModelConditional"

    def __init__(self, raw_data_s3_uri, output_bucket, output_prefix,
                 artifacts_path):
        """Store the S3 locations and create the SageMaker sessions.

        Args:
            raw_data_s3_uri: S3 URI of the raw data fed to the processing step.
            output_bucket: Bucket name used to build the estimators' output path.
            output_prefix: Key prefix appended to ``output_bucket``.
            artifacts_path: ``bucket/prefix`` string (no ``s3://`` scheme) passed
                to ``TuningStep.get_top_model_s3_uri`` to locate model artifacts.
        """
        self.execution = None
        self.sagemaker_session = sagemaker.Session()
        self.role = sagemaker.get_execution_role()
        # bucket and region are recorded but not read anywhere else in this class.
        self.bucket = self.sagemaker_session.default_bucket()
        self.region = self.sagemaker_session.boto_region_name
        self.raw_data_s3_uri = raw_data_s3_uri
        self.output_bucket = output_bucket
        self.output_prefix = output_prefix
        self.artifacts_path = artifacts_path
        self.pipeline_session = PipelineSession()
        self.cache_config = CacheConfig(enable_caching=True, expire_after="30d")

    def create_data_prep_stage(self):
        """Create the processing step that runs ``scripts/data_ingestion.py``.

        Returns:
            ProcessingStep named ``ChurnDataPrep`` with ``train`` and ``test``
            outputs.
        """
        sklearn_processor = SKLearnProcessor(
            framework_version="1.2-1",
            role=self.role,
            instance_type="ml.t3.medium",
            instance_count=1,
        )
        # NOTE(review): the output destinations are fixed S3 URIs and do not
        # follow output_bucket/output_prefix - confirm this is intended.
        data_prep = ProcessingStep(
            name="ChurnDataPrep",
            processor=sklearn_processor,
            code="scripts/data_ingestion.py",
            inputs=[
                ProcessingInput(source=self.raw_data_s3_uri, destination="/opt/ml/processing/input")
            ],
            outputs=[
                ProcessingOutput(
                    output_name="train",
                    source="/opt/ml/processing/output/train",
                    destination="s3://sagemakerantdata/smallchurndataset/processed/train"
                ),
                ProcessingOutput(
                    output_name="test",
                    source="/opt/ml/processing/output/test",
                    destination="s3://sagemakerantdata/smallchurndataset/processed/test"
                ),
            ],
        )
        return data_prep

    def _build_tuning_step(self, step_name, model_type, hyperparameter_ranges,
                           max_jobs, data_prep, output_path):
        """Create one tuning step for ``scripts/train.py`` and one model type."""
        estimator = SKLearn(
            entry_point="scripts/train.py",
            framework_version="1.2-1",
            instance_type="ml.m5.large",
            instance_count=1,
            role=self.role,
            hyperparameters={"model_type": model_type},
            output_path=output_path,
        )
        tuner = HyperparameterTuner(
            estimator=estimator,
            objective_metric_name="Validation F1 Score",
            hyperparameter_ranges=hyperparameter_ranges,
            metric_definitions=[
                {"Name": "Validation F1 Score", "Regex": "Validation F1 Score: ([0-9\\.]+)"}
            ],
            max_jobs=max_jobs,
            max_parallel_jobs=2,
            objective_type="Maximize",
        )
        processed = data_prep.properties.ProcessingOutputConfig.Outputs
        return TuningStep(
            name=step_name,
            tuner=tuner,
            inputs={
                "train": TrainingInput(s3_data=processed["train"].S3Output.S3Uri),
                "test": TrainingInput(s3_data=processed["test"].S3Output.S3Uri),
            },
            cache_config=self.cache_config,
        )

    def model_traing_and_tuning(self, data_prep):
        """Create the random-forest and logistic-regression tuning steps.

        The misspelled method name is kept so existing callers keep working.

        Args:
            data_prep: Processing step whose ``train`` and ``test`` outputs
                feed both tuners.

        Returns:
            ``[tune_step_rf, tune_step_logistic]``.
        """
        output_path = f"s3://{self.output_bucket}/{self.output_prefix}"

        tune_step_rf = self._build_tuning_step(
            step_name="TuneRandomForest",
            model_type="randomforest",
            hyperparameter_ranges={
                "n_estimators": IntegerParameter(100, 300),
                "max_depth": IntegerParameter(5, 20),
            },
            max_jobs=6,
            data_prep=data_prep,
            output_path=output_path,
        )
        tune_step_logistic = self._build_tuning_step(
            step_name="TuneLogisticRegression",
            model_type="logistic",
            hyperparameter_ranges={
                "C": ContinuousParameter(0.001, 10.0),
                # penalty can only take ["l1", "l2"], so not tunable like numeric params
            },
            max_jobs=4,
            data_prep=data_prep,
            output_path=output_path,
        )
        return [tune_step_rf, tune_step_logistic]

    def _build_register_step(self, step_name, tune_step):
        """Create the ModelStep that registers the best model of ``tune_step``."""
        model = SKLearnModel(
            # top_k is a 0-based rank: index 0 is the best training job, which
            # is the job whose metric the condition step compares.
            model_data=tune_step.get_top_model_s3_uri(
                top_k=0,
                s3_bucket=self.artifacts_path,
            ),
            role=self.role,
            entry_point="scripts/inference.py",
            framework_version="1.2-1",
            sagemaker_session=self.pipeline_session,
        )
        register_args = model.register(
            content_types=["text/csv"],
            response_types=["text/csv"],
            inference_instances=["ml.m5.large"],
            transform_instances=["ml.m5.large"],
            model_package_group_name="ChurnPredictionModelGroup",
        )
        return ModelStep(name=step_name, step_args=register_args)

    def register_best_model(self, tune_step_rf, tune_step_logistic):
        """Create the condition step that registers the better of the two models.

        The random-forest model is registered only when its best objective
        metric is strictly greater than the logistic one; on a tie the
        logistic model is registered.

        Args:
            tune_step_rf: Tuning step of the random-forest model.
            tune_step_logistic: Tuning step of the logistic-regression model.

        Returns:
            ConditionStep wrapping both registration steps.
        """
        best_rf_f1 = (
            tune_step_rf.properties.BestTrainingJob
            .FinalHyperParameterTuningJobObjectiveMetric.Value
        )
        best_logistic_f1 = (
            tune_step_logistic.properties.BestTrainingJob
            .FinalHyperParameterTuningJobObjectiveMetric.Value
        )
        step_register_rf_model = self._build_register_step(
            self._RF_REGISTER_STEP_NAME, tune_step_rf
        )
        step_register_logistic_model = self._build_register_step(
            self._LOGISTIC_REGISTER_STEP_NAME, tune_step_logistic
        )
        return ConditionStep(
            name=self._CONDITION_STEP_NAME,
            conditions=[ConditionGreaterThan(left=best_rf_f1,
                                             right=best_logistic_f1)],
            if_steps=[step_register_rf_model],
            else_steps=[step_register_logistic_model],
        )

    def create_pipeline(self):
        """Assemble all steps into the ``ChurnPredictionPipelineV2`` pipeline.

        Returns:
            The Pipeline object; it is neither upserted nor started here.
        """
        data_prep = self.create_data_prep_stage()
        tune_step_rf, tune_step_logistic = self.model_traing_and_tuning(data_prep)
        choose_best_model_step = self.register_best_model(tune_step_rf,
                                                          tune_step_logistic)
        pipeline = Pipeline(
            name="ChurnPredictionPipelineV2",
            steps=[
                data_prep,           # Data preparation step
                tune_step_rf,        # RF hyperparameter tuning
                tune_step_logistic,  # Logistic hyperparameter tuning
                choose_best_model_step  # Conditional step (contains the register steps)
            ],
            sagemaker_session=self.sagemaker_session,
        )
        return pipeline

    def run_pipeline(self):
        """Upsert the pipeline definition, start it and keep the execution."""
        pipeline = self.create_pipeline()
        pipeline.upsert(role_arn=self.role)
        self.execution = pipeline.start()
        self.execution.describe()

    def check_pipeline_status(self):
        """Print name and status (plus failure reason, if any) of every step.

        Raises:
            RuntimeError: If ``run_pipeline`` has not started an execution yet.
        """
        if self.execution is None:
            raise RuntimeError("Pipeline has not been started yet")
        steps = self.execution.list_steps()
        for step in steps:
            print(f"{step['StepName']} → {step['StepStatus']}")
            if 'FailureReason' in step:
                print(f"   Reason: {step['FailureReason']}")

    def get_model_arn(self, execution=None):
        """Return the ARN of the model package registered by an execution.

        Args:
            execution: Pipeline execution to inspect. Defaults to the
                execution started by ``run_pipeline``.

        Returns:
            The model package ARN recorded by the registration step that the
            condition step selected.

        Raises:
            RuntimeError: If no execution is available, the condition step
                has no outcome yet, or the selected registration step has
                not recorded a model ARN.
        """
        if execution is None:
            execution = self.execution
        if execution is None:
            raise RuntimeError("Pipeline has not been started yet")

        steps = execution.list_steps()
        outcome = None
        for step in steps:
            if step["StepName"] == self._CONDITION_STEP_NAME:
                # Metadata may be absent while the step has not finished.
                outcome = step.get("Metadata", {}).get("Condition", {}).get("Outcome")
                print("Condition outcome:", outcome)
        if outcome is None:
            raise RuntimeError(
                f"Step '{self._CONDITION_STEP_NAME}' has not produced an outcome yet"
            )

        # The outcome is reported as the string "True"/"False"; "True" means
        # the random forest scored strictly higher than logistic regression.
        if outcome == "True":
            register_step_name = self._RF_REGISTER_STEP_NAME
        else:
            register_step_name = self._LOGISTIC_REGISTER_STEP_NAME

        # Substring match, as the listed step name may carry a suffix.
        best_model_step = next(
            (s for s in steps if register_step_name in s["StepName"]), None
        )
        if best_model_step is None:
            raise RuntimeError(
                f"Registration step '{register_step_name}' not found in the execution"
            )
        model_arn = best_model_step.get("Metadata", {}).get("RegisterModel", {}).get("Arn")
        if model_arn is None:
            raise RuntimeError(
                f"Registration step '{register_step_name}' has not registered a model yet"
            )
        print("Best model ARN:", model_arn)
        return model_arn

    def sagemaker_endpoint(self):
        """Deploy the registered model package to a real-time endpoint.

        Returns:
            The predictor returned by ``ModelPackage.deploy`` for the endpoint
            named ``churn-prediction-endpoint``.
        """
        model_package = self.get_model_arn()
        model = ModelPackage(
            role=self.role,
            model_package_arn=model_package,
            sagemaker_session=self.sagemaker_session,
        )

        predictor = model.deploy(
            initial_instance_count=1,
            instance_type="ml.m5.large",
            endpoint_name="churn-prediction-endpoint",
        )
        return predictor

    def deploy_model_with_sagemaker_endpoint(self):
        """Convenience wrapper around ``sagemaker_endpoint``."""
        predictor = self.sagemaker_endpoint()
        return predictor


In [7]:
def entry_point(
    raw_data_s3_uri="s3://sagemakerantdata/smallchurndataset/raw/",
    output_bucket="sagemakerantdata",
    output_prefix="smallchurndataset/artifacts",
    artifacts_path=None,
):
    """Create a ChurnPredictionPipeline for the given S3 locations.

    Called without arguments it uses the same locations that were previously
    hard-coded in this function.

    Args:
        raw_data_s3_uri: S3 URI of the raw churn dataset.
        output_bucket: Bucket name used for training output.
        output_prefix: Key prefix inside ``output_bucket``.
        artifacts_path: ``bucket/prefix`` string (no ``s3://`` scheme) handed to
            the pipeline. Defaults to ``{output_bucket}/{output_prefix}`` so it
            cannot drift from the two values above.

    Returns:
        The constructed ChurnPredictionPipeline; nothing is started yet.
    """
    if artifacts_path is None:
        artifacts_path = f"{output_bucket}/{output_prefix}"
    return ChurnPredictionPipeline(
        raw_data_s3_uri=raw_data_s3_uri,
        output_bucket=output_bucket,
        output_prefix=output_prefix,
        artifacts_path=artifacts_path,
    )

In [9]:
# Build the pipeline wrapper using the S3 locations configured in entry_point().
pipeline_instance = entry_point()

In [10]:
# Upsert the pipeline definition and start an execution; the execution handle
# is stored on pipeline_instance.execution for the status checks below.
pipeline_instance.run_pipeline()

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [13]:
# Print the name and current status of every step of the started execution
# (raises RuntimeError if run_pipeline() has not been called yet).
pipeline_instance.check_pipeline_status()

TuneLogisticRegression → Executing
TuneRandomForest → Executing
ChurnDataPrep → Succeeded
