In [None]:
# !pip install boto3
# !pip install pandas
# !pip install openpyxl

In [2]:
"""
============================================================================
COMPLETE AWS SAGEMAKER PIPELINE - CUSTOMER CHURN PREDICTION
============================================================================
Real-life scenario: A telecom company wants to predict which customers 
will cancel their subscription (churn). The pipeline automatically:
1. Preprocesses customer data daily
2. Trains a model to predict churn
3. Evaluates model performance
4. Registers the model if it performs well

Once scheduled (see the EventBridge section at the end of this notebook), this runs automatically every night to keep predictions fresh.
============================================================================
"""



In [None]:
import logging

import boto3
from sagemaker import get_execution_role, Session
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.parameters import ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [4]:
# ============================================================================
# CONFIGURATION AND SETUP
# ============================================================================

# Logging comes first so every later cell can report progress via `logger`
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# AWS context shared by all pipeline steps
role = get_execution_role()  # IAM role with SageMaker permissions
region = boto3.Session().region_name
sagemaker_session = Session()
default_bucket = sagemaker_session.default_bucket()  # S3 bucket for storing data

logger.info(f"Region: {region}")
logger.info(f"Role: {role}")
logger.info(f"Default bucket: {default_bucket}")

# S3 layout: everything the pipeline reads or writes lives under one prefix
base_path = f"s3://{default_bucket}/churn-pipeline"
model_path = f"{base_path}/models"
input_data_path = f"{base_path}/input-data"
output_data_path = f"{base_path}/output-data"

INFO:__main__:Region: ap-south-1
INFO:__main__:Role: arn:aws:iam::542810186858:role/service-role/AmazonSageMaker-ExecutionRole-20251230T121523
INFO:__main__:Default bucket: sagemaker-ap-south-1-542810186858


In [5]:
# ============================================================================
# PIPELINE PARAMETERS
# ============================================================================
# These objects are passed to Pipeline(parameters=...) further down, so their
# values can be changed at runtime without code changes.

# Where the raw customer CSV is read from (update for new data batches)
input_data = ParameterString(
    name="InputDataUrl", default_value=f"{input_data_path}/customer_data.csv"
)

# Compute sizes for the processing and training jobs
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.xlarge"
)

# Registration gate: only models with accuracy >= this value are registered
# (default 0.75, i.e. 75% accuracy minimum)
model_approval_threshold = ParameterFloat(
    name="ModelApprovalThreshold", default_value=0.75
)

logger.info("Pipeline parameters configured")

INFO:__main__:Pipeline parameters configured


In [None]:
# ============================================================================
# STEP 1: DATA PREPROCESSING
# ============================================================================
# Runs src/preprocessing.py as a SageMaker processing job. The script itself is
# not part of this notebook; it is expected to clean the raw data, handle
# missing values, encode categories and write train/test/validation sets.

logger.info("Configuring preprocessing step...")

# Compute resources for the job
preprocessing_processor = SKLearnProcessor(
    sagemaker_session=sagemaker_session,
    role=role,
    framework_version="1.2-1",  # Scikit-learn container version
    instance_type=processing_instance_type,
    instance_count=1,  # Can be raised for large data
    base_job_name="churn-preprocessing",
)

# Raw customer data: S3 -> container-local directory
raw_data_input = ProcessingInput(
    input_name="raw_data",
    source=input_data,
    destination="/opt/ml/processing/input",
)

# One output per dataset split: container-local directory -> S3
train_output = ProcessingOutput(
    output_name="train",
    source="/opt/ml/processing/train",
    destination=f"{output_data_path}/train",
)
test_output = ProcessingOutput(
    output_name="test",
    source="/opt/ml/processing/test",
    destination=f"{output_data_path}/test",
)
validation_output = ProcessingOutput(
    output_name="validation",
    source="/opt/ml/processing/validation",
    destination=f"{output_data_path}/validation",
)

# Assemble the step; `name` is what the SageMaker UI shows
preprocessing_step = ProcessingStep(
    name="PreprocessCustomerData",
    processor=preprocessing_processor,
    code="src/preprocessing.py",
    inputs=[raw_data_input],
    outputs=[train_output, test_output, validation_output],
    # 70% train / 20% test; the remaining 10% is presumably the validation
    # set -- confirm against src/preprocessing.py
    job_arguments=["--train-split", "0.7", "--test-split", "0.2"],
)

logger.info("Preprocessing step configured")


INFO:__main__:Configuring preprocessing step...
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:__main__:Preprocessing step configured


In [7]:
# ============================================================================
# STEP 2: MODEL TRAINING
# ============================================================================
# Trains the churn model by running src/training.py on the datasets produced
# by the preprocessing step.

logger.info("Configuring training step...")

# Passed to training.py as hyperparameters; the names suggest a Random Forest
rf_hyperparameters = {
    "n_estimators": 100,      # Number of trees
    "max_depth": 10,          # Tree depth
    "min_samples_split": 4    # Minimum samples to split node
}

sklearn_estimator = SKLearn(
    sagemaker_session=sagemaker_session,
    role=role,
    source_dir="src",
    entry_point="training.py",
    framework_version="1.2-1",
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=rf_hyperparameters,
    base_job_name="churn-training",
    output_path=model_path,  # Where the trained model artifact is saved
)

# Referencing the preprocessing outputs wires this step to run after it
preprocessed_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs

train_channel = TrainingInput(
    s3_data=preprocessed_outputs["train"].S3Output.S3Uri,
    content_type="text/csv",
)
validation_channel = TrainingInput(
    s3_data=preprocessed_outputs["validation"].S3Output.S3Uri,
    content_type="text/csv",
)

training_step = TrainingStep(
    name="TrainChurnModelV2",
    estimator=sklearn_estimator,
    inputs={"train": train_channel, "validation": validation_channel},
)

logger.info("Training step configured")

INFO:__main__:Configuring training step...
INFO:__main__:Training step configured


In [None]:
# ============================================================================
# STEP 3: MODEL EVALUATION
# ============================================================================
# Runs src/evaluation.py as a processing job against the held-out test data
# and the trained model artifact, and exposes the resulting evaluation.json to
# later steps through a PropertyFile.

logger.info("Configuring evaluation step...")

# Create processor for evaluation
evaluation_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    # Evaluation is a processing job too, so size it with the shared
    # ProcessingInstanceType pipeline parameter (default "ml.m5.xlarge")
    # instead of a hard-coded literal that cannot be overridden at runtime.
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="churn-evaluation",
    sagemaker_session=sagemaker_session
)

# Declares evaluation.json, inside the "evaluation" output below, as a file
# whose contents other steps can query (the condition step reads accuracy)
evaluation_report = PropertyFile(
    name="ChurnEvaluationReport",
    output_name="evaluation",  # Must match the ProcessingOutput name below
    path="evaluation.json"  # Script will create this JSON file
)

# Define evaluation step
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="src/evaluation.py",  # Evaluation script

    inputs=[
        # Test data written by the preprocessing step
        ProcessingInput(
            source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        ),
        # Trained model artifact from the training step
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        )
    ],

    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"{output_data_path}/evaluation"
        )
    ],

    property_files=[evaluation_report]  # Makes metrics available to next steps
)

logger.info("Evaluation step configured")


INFO:__main__:Configuring evaluation step...
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:__main__:Evaluation step configured


In [None]:
# ============================================================================
# STEP 4: CONDITIONAL MODEL REGISTRATION
# ============================================================================
# Registers the trained model in the model package group only when the accuracy
# reported by the evaluation step reaches the ModelApprovalThreshold parameter.

logger.info("Configuring conditional registration...")

# Condition: metrics.accuracy (read from evaluation.json) >= threshold
model_approval_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy"  # Extract accuracy from evaluation.json
    ),
    right=model_approval_threshold
)

MODEL_PACKAGE_GROUP_NAME = "ChurnModelPackageGroup"

# Attach the evaluation report to the registered model package. The URI is the
# evaluation step's S3 output destination plus the report file name.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f"{output_data_path}/evaluation/evaluation.json",
        content_type="application/json"
    )
)

# Model registration step (only runs if condition is met).
# RegisterModel is imported in the import cell at the top of the notebook.
try:
    register_step = RegisterModel(
        name="RegisterChurnModel",
        estimator=sklearn_estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name=MODEL_PACKAGE_GROUP_NAME,
        approval_status="PendingManualApproval",  # Can be auto-approved if needed
        # entry_point="inference.py",
        # source_s3_uri="s3://my-bucket-5428/src/",
        model_metrics=model_metrics
    )
except Exception as e:
    # Report at ERROR level with the traceback (previously logged at INFO,
    # where a failure was easy to miss), then re-raise so the cell still fails.
    logger.exception(f"Registration failed: {e}")
    logger.error(f"Check ModelPackageGroup: {MODEL_PACKAGE_GROUP_NAME}")
    raise

# Conditional step: register only if accuracy is good
condition_step = ConditionStep(
    name="CheckModelAccuracy",
    conditions=[model_approval_condition],
    if_steps=[register_step],  # Register if condition true
    else_steps=[]  # Do nothing if condition false
)

logger.info("Conditional registration configured")


INFO:__main__:Configuring conditional registration...
INFO:__main__:Conditional registration configured


In [10]:
# ============================================================================
# CREATE AND EXECUTE PIPELINE
# ============================================================================
# Collects the parameters and steps defined above into one Pipeline object and
# creates or updates that pipeline definition in SageMaker.

logger.info("Creating pipeline...")

# Runtime-overridable parameters exposed by the pipeline
pipeline_parameters = [
    input_data,
    processing_instance_type,
    training_instance_type,
    model_approval_threshold,
]

# Steps listed in the order preprocess -> train -> evaluate -> gate
pipeline_steps = [
    preprocessing_step,
    training_step,
    evaluation_step,
    condition_step,
]

pipeline = Pipeline(
    name="CustomerChurnPredictionPipelineV2",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

# Create the pipeline if it does not exist yet, otherwise update it
logger.info("Upserting pipeline...")
pipeline.upsert(role_arn=role)
logger.info("Pipeline created/updated successfully!")

INFO:__main__:Creating pipeline...
INFO:__main__:Upserting pipeline...
INFO:__main__:Pipeline created/updated successfully!


In [11]:
# ============================================================================
# EXECUTE PIPELINE
# ============================================================================
# Starts one execution of the pipeline upserted above. start() is called
# without arguments, so no parameter overrides are supplied for this run.

logger.info("Starting pipeline execution...")
# Keep the returned handle: its ARN identifies this run, e.g. as the argument
# to stop_pipeline_execution() defined later in this notebook
execution = pipeline.start()
logger.info(f"Pipeline execution started: {execution.arn}")
# logger.info(f"View execution in console: https://{region}.console.aws.amazon.com/sagemaker/home?region={region}#/pipelines")

INFO:__main__:Starting pipeline execution...
INFO:__main__:Pipeline execution started: arn:aws:sagemaker:ap-south-1:542810186858:pipeline/CustomerChurnPredictionPipelineV2/execution/ilczan7529ic


In [12]:
# Wait for completion (optional - remove for async execution)
# execution.wait(delay=30, max_attempts=60)
# logger.info(f"Pipeline execution completed with status: {execution.describe()['PipelineExecutionStatus']}")

In [None]:
# ============================================================================
# SCHEDULE AUTOMATIC RUNS
# ============================================================================

def schedule_daily_pipeline_run(pipeline_name=None):
    """
    Schedule the pipeline to run automatically every day at 2 AM UTC
    using AWS EventBridge (CloudWatch Events).

    Creates (or overwrites) the EventBridge rule 'DailyChurnPipelineRun' and
    attaches the SageMaker pipeline as its target, passing the default
    InputDataUrl as a pipeline parameter.

    Args:
        pipeline_name: Name of the SageMaker pipeline to trigger. Defaults to
            the name of the `pipeline` object built in this notebook, so the
            rule targets the pipeline that was actually upserted. (The
            previous hard-coded name lacked the "V2" suffix and therefore
            pointed at a different pipeline.)
    """
    if pipeline_name is None:
        pipeline_name = pipeline.name

    events_client = boto3.client('events', region_name=region)

    rule_name = 'DailyChurnPipelineRun'

    logger.info(f"Creating EventBridge rule: {rule_name}")

    # Create rule to trigger daily
    events_client.put_rule(
        Name=rule_name,
        Description='Trigger churn prediction pipeline daily',
        ScheduleExpression='cron(0 2 * * ? *)',  # 2 AM UTC daily
        State='ENABLED'
    )

    # Get AWS account ID (needed to build the ARNs below)
    sts_client = boto3.client('sts')
    account_id = sts_client.get_caller_identity()['Account']

    pipeline_arn = f'arn:aws:sagemaker:{region}:{account_id}:pipeline/{pipeline_name}'

    # Add pipeline as target.
    # NOTE(review): RoleArn reuses the SageMaker execution role. EventBridge
    # must be able to assume this role and start the pipeline -- confirm the
    # role's trust policy allows events.amazonaws.com.
    events_client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                'Id': '1',
                'Arn': pipeline_arn,
                'RoleArn': role,
                'SageMakerPipelineParameters': {
                    'PipelineParameterList': [
                        {
                            'Name': 'InputDataUrl',
                            'Value': f'{input_data_path}/customer_data.csv'
                        }
                    ]
                }
            }
        ]
    )

    logger.info("Pipeline scheduled to run daily at 2 AM UTC")
    logger.info(f"Rule ARN: arn:aws:events:{region}:{account_id}:rule/{rule_name}")

# Uncomment to schedule automatic runs:
# schedule_daily_pipeline_run()

# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def list_pipeline_executions(pipeline_name=None, max_results=10):
    """Print the ARN, status and start time of recent pipeline executions.

    Args:
        pipeline_name: Name of the SageMaker pipeline to query. Defaults to
            the name of the `pipeline` object built in this notebook. (The
            previous hard-coded name lacked the "V2" suffix, so it did not
            match the pipeline this notebook creates.)
        max_results: Maximum number of executions to request (default 10).
    """
    if pipeline_name is None:
        pipeline_name = pipeline.name

    sm_client = boto3.client('sagemaker')
    response = sm_client.list_pipeline_executions(
        PipelineName=pipeline_name,
        MaxResults=max_results
    )
    # `summary` rather than `execution`, to avoid confusion with the
    # notebook-level `execution` handle returned by pipeline.start()
    for summary in response['PipelineExecutionSummaries']:
        print(f"Execution: {summary['PipelineExecutionArn']}")
        print(f"Status: {summary['PipelineExecutionStatus']}")
        print(f"Start Time: {summary['StartTime']}")
        print("---")

def stop_pipeline_execution(execution_arn):
    """Ask SageMaker to stop the pipeline execution with the given ARN.

    Args:
        execution_arn: ARN of the pipeline execution to stop.
    """
    boto3.client('sagemaker').stop_pipeline_execution(
        PipelineExecutionArn=execution_arn
    )
    logger.info(f"Stopped execution: {execution_arn}")

# Example usage:
# list_pipeline_executions()
# stop_pipeline_execution("arn:aws:sagemaker:...")