# Complete MLOps Pipeline with SageMaker Pipelines  
## Employee Bonus Prediction - End-to-End Machine Learning Workflow
This notebook builds an end-to-end MLOps pipeline with Amazon SageMaker Pipelines to predict employee bonuses. The pipeline preprocesses the raw data, splits it, trains a model, and registers the model in the SageMaker Model Registry with a manual approval status.  

### 🎯 What We'll Build 
A production-ready ML pipeline that:

✅ Preprocesses raw employee data automatically  
✅ Splits data into train/validation/test sets  
✅ Trains a Linear Learner model for regression  
✅ Registers the model in SageMaker Model Registry  
✅ Gates deployment behind a manual approval status in the Model Registry  

### Step 1: Environment Setup and Configuration 
#### 🔧 Purpose:  
Initialize SageMaker environment, configure AWS resources, and set up pipeline parameters.

#### 🎯 What happens here:
* Import required SageMaker libraries
* Set up execution roles and sessions
* Configure S3 buckets for data storage
* Define pipeline parameters for flexibility

In [48]:
import boto3 
import pandas as pd 
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession 
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

s3_client = boto3.client("s3")
pipeline_name = "emp-bonus-training-pipeline"
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
# Create the Model Package Group "EmpBonusPackageGroup" manually for this pipeline;
# the next pipeline will create it from within the pipeline itself.
model_package_group_name = "EmpBonusPackageGroup"

print(f"Default S3 Bucket Name: {default_bucket}")
print(f"Model Package Group Name: {model_package_group_name}")

Default S3 Bucket Name: sagemaker-us-east-1-654654157683
Model Package Group Name: EmpBonusPackageGroup
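
The comment in the cell above says the model package group is created manually. One way to do that from code is sketched below; it is an illustration rather than part of the original notebook. The helper name `ensure_model_package_group` and the default description are assumptions, and `sm_client` is expected to be a boto3 SageMaker client. The check only reads the first page of `list_model_package_groups`, which is enough for a small account but not a general solution.

```python
def ensure_model_package_group(sm_client, group_name,
                               description="Employee bonus prediction models"):
    """Create the model package group unless one with this exact name exists.

    Returns True if the group was created, False if it was already present.
    """
    # NameContains is a substring filter, so compare the names exactly afterwards.
    response = sm_client.list_model_package_groups(NameContains=group_name)
    existing = {
        group["ModelPackageGroupName"]
        for group in response["ModelPackageGroupSummaryList"]
    }
    if group_name in existing:
        return False

    sm_client.create_model_package_group(
        ModelPackageGroupName=group_name,
        ModelPackageGroupDescription=description,
    )
    return True
```

In this notebook it could be called as `ensure_model_package_group(boto3.client("sagemaker"), model_package_group_name)` before the pipeline is started.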


In [40]:
base_job_prefix = "emp-bonus"
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
input_data = "data/mock_data.csv"
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

### Step 2: Data Preprocessing
#### 🔧 Purpose:
Clean, transform, and prepare raw employee data for machine learning.  

####  🎯 What the preprocessing script does:
* Missing Value Handling: Fills missing age/salary with median values
* JSON Parsing: Extracts address, phone, email from profile JSON column
* Feature Engineering: Creates address_length, salary_category, age_group features
* Data Cleaning: Removes unnecessary columns and handles data types
* Output: Produces cleaned_data.csv and transformed_data.csv
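
`preprocessing_script.py` itself is not shown in this notebook. The sketch below only illustrates the steps listed above, using the standard library so that it runs without extra dependencies. The column names (`age`, `salary`, `profile`), the category thresholds, and the helper name `preprocess_rows` are assumptions made for the example, not details of the actual script.

```python
import json
from statistics import median


def preprocess_rows(rows):
    """Fill missing age/salary with the median, unpack the profile JSON, add features."""
    # Medians are computed only over rows where the value is present.
    age_median = median(r["age"] for r in rows if r["age"] is not None)
    salary_median = median(r["salary"] for r in rows if r["salary"] is not None)

    cleaned = []
    for r in rows:
        age = r["age"] if r["age"] is not None else age_median
        salary = r["salary"] if r["salary"] is not None else salary_median

        # The profile column is assumed to hold a JSON object serialized as a string.
        profile = json.loads(r["profile"]) if r.get("profile") else {}
        address = profile.get("address", "")

        # Thresholds below are hypothetical and chosen only for illustration.
        if salary >= 100_000:
            salary_category = "high"
        elif salary >= 50_000:
            salary_category = "medium"
        else:
            salary_category = "low"

        if age < 30:
            age_group = "under_30"
        elif age < 50:
            age_group = "30_to_49"
        else:
            age_group = "50_plus"

        cleaned.append({
            "age": age,
            "salary": salary,
            "address_length": len(address),
            "salary_category": salary_category,
            "age_group": age_group,
        })
    return cleaned


sample = [
    {"age": 25, "salary": 40_000, "profile": '{"address": "1 Main St", "phone": "555-0100"}'},
    {"age": None, "salary": 120_000, "profile": '{"address": "22 Oak Avenue"}'},
    {"age": 55, "salary": None, "profile": None},
]
result = preprocess_rows(sample)
```

A real script running in a processing job would read its input from `/opt/ml/processing/input` and write its CSV files to `/opt/ml/processing/output`, matching the paths configured in the step below.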

In [41]:
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="emp-pre-processing",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="processed-data",
            source="/opt/ml/processing/output",
            destination=f"s3://{default_bucket}/output/processed",
        ),
    ],
    code="preprocessing_script.py",
)
step_preprocess = ProcessingStep(name="EmpBonusPreProcessing", step_args=processor_args)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### Step 3: Data Splitting
#### 🔧 Purpose:
Split the preprocessed data into training, validation, and test sets for model development.

#### 🎯 What happens here:  
* Reads transformed data from preprocessing step
* Splits data into 70% train, 20% validation, 10% test
* Saves three separate CSV files to S3
* Maintains data consistency across splits
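
The splitting logic lives in a script that is not shown here. As a rough sketch of a 70/20/10 split, assuming a simple seeded shuffle (the real script may split differently, for example with scikit-learn), the helper `split_rows` below is hypothetical:

```python
import random


def split_rows(rows, train_frac=0.7, val_frac=0.2, seed=42):
    """Shuffle rows with a fixed seed and cut them into train/validation/test."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # a fixed seed keeps the split repeatable

    n_train = round(len(shuffled) * train_frac)
    n_val = round(len(shuffled) * val_frac)

    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_val]
    test = shuffled[n_train + n_val:]  # whatever remains, about 10%
    return train, validation, test


train, validation, test = split_rows(range(100))
print(len(train), len(validation), len(test))
```

Every row ends up in exactly one of the three sets, and rerunning with the same seed reproduces the same split.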

In [42]:
# Data Splitting Step

# Read the transformed data written by the preprocessing step
input_data = f"s3://{default_bucket}/output/processed/transformed_data.csv"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="emp-data-split",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",
                         destination=f"s3://{default_bucket}/output/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",
                         destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",
                         destination=f"s3://{default_bucket}/output/test"),
    ],
    # In this step the script is used to produce the train/validation/test splits
    code="model_training_script.py",
)
step_data_split = ProcessingStep(name="EmpBonusDataSplit", step_args=processor_args, depends_on=[step_preprocess])

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### Step 4: Model Training
#### 🔧 Purpose:
Train a Linear Learner regression model to predict employee bonus amounts.

#### 🎯 Why Linear Learner:
* Built-in Algorithm: Provided and maintained by AWS, so no custom training code is needed
* Regression Support: With predictor_type set to regressor, it fits continuous targets such as bonus amounts
* Automatic Normalization: Normalizes feature data by default, so no separate scaling step is required
* Fast Training: Efficient on structured, tabular data
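
When Linear Learner trains on `text/csv` input, it treats the first column as the label, and the files should not contain a header row. The split script is not shown in this notebook, so the sketch below is only one way to write rows in that layout; the `bonus` label name, the feature names, and the helper `to_training_csv` are assumptions.

```python
import csv
import io


def to_training_csv(records, label_key, feature_keys):
    """Write records as header-less CSV text with the label in the first column."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for record in records:
        writer.writerow([record[label_key]] + [record[key] for key in feature_keys])
    return buffer.getvalue()


records = [
    {"age": 25, "salary": 40000, "bonus": 2000},
    {"age": 40, "salary": 120000, "bonus": 9000},
]
csv_text = to_training_csv(records, label_key="bonus", feature_keys=["age", "salary"])
print(csv_text)
```

All values must be numeric, so categorical features such as `salary_category` or `age_group` would need to be encoded as numbers before training.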

In [43]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Specify the container image for Linear Learner (built-in algorithm)
linear_learner_container = sagemaker.image_uris.retrieve('linear-learner', sagemaker_session.boto_region_name)

# Define the LinearLearner estimator
linear_estimator = Estimator(
    image_uri=linear_learner_container,
    role=role,
    instance_count=1,
    instance_type=training_instance_type,
    output_path=f's3://{default_bucket}/model-output',
    sagemaker_session=pipeline_session,
    base_job_name="emp-bonus-linear-learner"
)

# Set hyperparameters for Linear Learner
linear_estimator.set_hyperparameters(
    predictor_type='regressor',  # We're solving a regression problem
    mini_batch_size=32,
    epochs=10
)

train_path = f"s3://{default_bucket}/output/train/train.csv"
val_path = f"s3://{default_bucket}/output/validation/validation.csv"

# Specify the input data channels
train_input = TrainingInput(
    s3_data=train_path, 
    content_type='text/csv'
)
val_input = TrainingInput(
    s3_data=val_path, 
    content_type='text/csv'
)

# Training Step
step_training = TrainingStep(
    name="EmpBonusTraining",
    estimator=linear_estimator,
    inputs={
        'train': train_input,
        'validation': val_input
    },
    depends_on=[step_data_split]
)


INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


### Step 5: Model Registration
#### 🔧 Purpose:
Register the trained model in SageMaker Model Registry for governance and deployment.

#### 🎯 Model Registry Benefits:
* Version Control: Track different model versions
* Approval Workflow: Require manual approval before deployment
* Metadata Storage: Store model performance metrics and descriptions
* Deployment Governance: Control which models go to production

In [44]:
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

# Retrieve the Linear Learner image URI for the region determined in Step 1
container_image_uri = sagemaker.image_uris.retrieve(framework="linear-learner", region=region)

model = Model(
    image_uri=container_image_uri,
    model_data=step_training.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

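# Reuse the ModelApprovalStatus pipeline parameter defined in Step 1, so the status
# can be overridden per execution instead of being hard-coded here.
customer_metadata_properties = {"ModelType": "EmpBonusPrediction"}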

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    customer_metadata_properties=customer_metadata_properties,
)

step_register = ModelStep(
    name="EmpBonusRegisterModel",
    step_args=register_args
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
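
A model registered as `PendingManualApproval` stays in that state until someone changes it. The sketch below shows one possible way to approve the most recently registered version from code rather than from the console. The helper name `approve_latest_model_package` is hypothetical, and `sm_client` is assumed to be a boto3 SageMaker client.

```python
def approve_latest_model_package(sm_client, group_name):
    """Set the newest model package in the group to Approved and return its ARN."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    packages = response["ModelPackageSummaryList"]
    if not packages:
        return None  # nothing has been registered in this group yet

    arn = packages[0]["ModelPackageArn"]
    sm_client.update_model_package(
        ModelPackageArn=arn,
        ModelApprovalStatus="Approved",
    )
    return arn
```

In practice the approval would follow a review of the model's evaluation results; this helper only performs the status change.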


### Step 6: Pipeline Creation and Configuration
#### 🔧 Purpose:
Assemble all steps into a cohesive, executable pipeline with proper dependencies.

#### 🎯 Pipeline Benefits:
* Automation: Run the entire workflow with one command
* Reproducibility: Every run executes the same defined steps with explicit parameters
* Extensibility: Steps can be added or replaced without rewriting the workflow
* Monitoring: Track each execution and debug failed steps

In [45]:
import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
    ],
    steps=[step_preprocess, step_data_split, step_training, step_register],
)
definition = json.loads(pipeline.definition())
# print(definition)
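
Printing the full definition is noisy. A small helper can pull out just the parameters and the step graph to confirm the pipeline is wired as intended. This is a sketch: `summarize_definition` is a hypothetical helper, and the `example` dictionary is a hand-written stand-in for `json.loads(pipeline.definition())`, trimmed to the keys the helper reads.

```python
def summarize_definition(definition):
    """Return parameter defaults and (name, type, depends_on) for each step."""
    params = {
        p["Name"]: p.get("DefaultValue")
        for p in definition.get("Parameters", [])
    }
    steps = [
        (s["Name"], s["Type"], s.get("DependsOn", []))
        for s in definition.get("Steps", [])
    ]
    return params, steps


# Hand-written stand-in for the real definition, for illustration only.
example = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
        {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"},
    ],
    "Steps": [
        {"Name": "EmpBonusPreProcessing", "Type": "Processing"},
        {"Name": "EmpBonusDataSplit", "Type": "Processing",
         "DependsOn": ["EmpBonusPreProcessing"]},
    ],
}
params, steps = summarize_definition(example)
```

The parameter names listed this way are the keys that can be overridden for a single run with `pipeline.start(parameters={...})`.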



In [46]:
# Create the pipeline, or update it if it already exists
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:654654157683:pipeline/emp-bonus-training-pipeline',
 'ResponseMetadata': {'RequestId': 'cc0ca534-9266-4f92-95e4-698fa6bf6f30',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cc0ca534-9266-4f92-95e4-698fa6bf6f30',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '117',
   'date': 'Sun, 03 Aug 2025 18:51:32 GMT'},
  'RetryAttempts': 0}}

### Step 7: Pipeline Execution
#### 🔧 Purpose:
Start an execution of the complete pipeline. Progress can then be followed in SageMaker Studio or through the execution object that `start()` returns.

In [47]:
# Start a pipeline execution
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:654654157683:pipeline/emp-bonus-training-pipeline/execution/zq1k7knn6eia', sagemaker_session=<sagemaker.session.Session object at 0x7fbae1477aa0>)
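
To follow progress from the notebook, the object returned by `pipeline.start()` can be kept, for example as `execution = pipeline.start()`. Its `wait()` method blocks until the run finishes, and `list_steps()` returns one record per step. The helper below is a sketch of how those records might be summarized; `summarize_steps` and the sample records are hypothetical, with the `StepName`, `StepStatus`, and `FailureReason` keys assumed to follow the step records that `list_steps()` returns.

```python
from collections import Counter


def summarize_steps(steps):
    """Count step statuses and collect the failure reason of each failed step."""
    counts = Counter(step["StepStatus"] for step in steps)
    failures = {
        step["StepName"]: step.get("FailureReason", "")
        for step in steps
        if step["StepStatus"] == "Failed"
    }
    return dict(counts), failures


# Hand-written sample in the assumed shape, for illustration only.
sample_steps = [
    {"StepName": "EmpBonusPreProcessing", "StepStatus": "Succeeded"},
    {"StepName": "EmpBonusDataSplit", "StepStatus": "Succeeded"},
    {"StepName": "EmpBonusTraining", "StepStatus": "Failed",
     "FailureReason": "ClientError: example failure message"},
]
counts, failures = summarize_steps(sample_steps)
```

With a live execution, the call would be `summarize_steps(execution.list_steps())` after `execution.wait()` returns or raises.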

## 🎉 What You've Built
Congratulations! You've built an end-to-end MLOps pipeline that:  
✅ Automatically preprocesses and splits employee data  
✅ Trains a Linear Learner regression model on managed infrastructure  
✅ Implements governance through the Model Registry and an approval status  
✅ Supports repeatable runs through pipeline parameters  
✅ Exposes each execution's step status for monitoring and debugging  