In [26]:
import sagemaker

from sagemaker.processing import ProcessingInput, ProcessingOutput

from sagemaker.workflow.parameters import Parameter

from sagemaker.workflow.pipeline import Pipeline

from sagemaker.workflow.steps import ProcessingStep, TrainingStep

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker import get_execution_role

from sagemaker.estimator import Estimator

from sagemaker.inputs import TrainingInput

import boto3

# 1. Parameters (could be passed into the pipeline)

processing_instance_type = Parameter("ProcessingInstanceType", default_value="ml.m5.xlarge")

training_instance_type = Parameter("TrainingInstanceType", default_value="ml.m5.2xlarge")

model_approval_status = Parameter("ModelApprovalStatus", default_value="Pending")

role = get_execution_role()

region = "us-west-2"

default_bucket = "databucket-us-west-2-0205956924056955"

def get_session(region, default_bucket):

    """Gets the sagemaker session based on the region.

 

    Args:

        region: the aws region to start the session

        default_bucket: the bucket to use for storing the artifacts

 

    Returns:

        `sagemaker.session.Session instance

    """

    boto_session = boto3.Session(region_name=region)

 

    sagemaker_client = boto_session.client("sagemaker")

    runtime_client = boto_session.client("sagemaker-runtime")

    return sagemaker.session.Session(

        boto_session=boto_session,

        sagemaker_client=sagemaker_client,

        sagemaker_runtime_client=runtime_client,

        default_bucket=default_bucket,

    )


# 2. Preprocessing Step

processor = SKLearnProcessor(

    framework_version="0.20.0",
    
    role = role,
    
    instance_type = training_instance_type,
    
    instance_count = 1,
    

    base_job_name="stc-preprocessing",


)

 

step_process = ProcessingStep(

    name="PreprocessData",

    processor=processor,
    
     outputs=[

        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),

        ProcessingOutput(output_name="validation", source="/opt/ml/processing/output/validation"),

        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),

    ],
    
    code="preprocessing.py",
    
      inputs=[

        ProcessingInput(source="s3://databucket-us-west-2-0205956924056955/churn.csv", destination="/opt/ml/processing/input"),

    ]

)

 

# 3. Training Step



image_uri = sagemaker.image_uris.retrieve(

        framework="xgboost",

        region=region,

        version="1.0-1",

        py_version="py3",

        instance_type=training_instance_type,

    )

 
model_path = "s3://databucket-us-west-2-0205956924056955/model/"
base_job_prefix = "stc-"
sagemaker_session = get_session(region, default_bucket)

xgb_train = Estimator(

    image_uri=image_uri,

    instance_type=training_instance_type,

    instance_count=1,

    output_path=model_path,

    base_job_name=f"{base_job_prefix}/stc-train",

    sagemaker_session=sagemaker_session,

    role=role,

)

 

xgb_train.set_hyperparameters(

    objective="binary:logistic",

    num_round=50,

    max_depth=5,

    eta=0.2,

    gamma=4,

    min_child_weight=6,

    subsample=0.7,

    silent=0,

)

 

step_train = TrainingStep(

    name="trainstcmodel",

    depends_on=["PreprocessData"],

    estimator=xgb_train,

    inputs={

        "train": TrainingInput(

            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[

                "train"

            ].S3Output.S3Uri,

            content_type="text/csv",

        ),

        "validation": TrainingInput(

            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[

                "validation"

            ].S3Output.S3Uri,

            content_type="text/csv",

        ),

    },

)
 

# 4. Evaluation Step (Using a Processing Job)

# script_eval = ScriptProcessor(

#     image_uri="your-evaluation-image-uri",  # Docker image for evaluation

#     command=["python"],

#     instance_type=processing_instance_type,

#     base_job_name="sagemaker-evaluation",

#     inputs=[

#         ProcessingInput(source=training_step.properties.ModelArtifacts.S3ModelArtifacts,

#                         destination="/opt/ml/processing/model"),

#         ProcessingInput(source=processing_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,

#                         destination="/opt/ml/processing/test"),

#     ],

#     outputs=[

#         ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),

#     ],

# )

 

# evaluation_step = ProcessingStep(

#     name="EvaluateModel",

#     processor=script_eval

# )

 

# 5. Condition Step (Check evaluation metric like AUC)

# condition_auc = ConditionGreaterThanOrEqualTo(

#     left=evaluation_step.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,

#     right=0.8  # Example threshold

# )

 

# 6. Model Registration Step

# model = sagemaker.Model(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts)

# model_register_step = sagemaker.workflow.steps.CreateModelStep(

#     name="RegisterModel",

#     model=model,

#     model_approval_status=model_approval_status,

#     condition=condition_auc

# )

 

# 7. Pipeline Construction

import random as rd

pipeline = Pipeline(

    name=f"SageMakerMLPipeline-{rd.randint(1,10000)}",

    parameters=[processing_instance_type, training_instance_type, model_approval_status],

    steps=[step_process, step_train],
    #steps=[processing_step, training_step, evaluation_step, model_register_step],

)

pipeline.create(role_arn=role)
execution = pipeline.start()
execution.wait()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml




WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"