# LightGBM Model Training, Deployment, and SageMaker Features

This notebook demonstrates an end-to-end machine learning workflow using Amazon SageMaker AI, focusing on the following key steps:

1. Environment Setup: Importing necessary libraries and setting up the SageMaker session.
2. Data Preparation: Generating synthetic data for a regression problem.
3. Model Training: Training a LightGBM model using SageMaker's built-in algorithm.
4. Model Deployment: Deploying the trained model to a SageMaker endpoint for real-time inference.
5. Inference Simulation: Simulating thousands of inference requests to the deployed endpoint.
6. SageMaker Features:
   - Training a second model with different hyperparameters.
   - Conducting a shadow test to compare the performance of two model versions.
   - Implementing a canary deployment to gradually shift traffic to the new model version.

This notebook showcases best practices for model development, testing, and deployment using Amazon SageMaker AI.

## Setup Environment

Import the required libraries and set up the SageMaker session.

In [None]:
import boto3
import sagemaker
import pandas as pd
import numpy as np
from sagemaker import get_execution_role, image_uris, script_uris, model_uris
from sagemaker.session import Session
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from datetime import datetime

# Set up SageMaker session
role = get_execution_role()
session = sagemaker.Session()
bucket = session.default_bucket()
prefix = 'sagemaker/lightgbm-demo'
aws_region = boto3.Session().region_name
print(f"SageMaker session initialized. Using bucket: {bucket}")

## Data Preparation and Upload

Generate synthetic data for a regression problem, split it into training and test sets, and upload to S3.

The synthetic data is generated using the following equation:

y = 3 * X₁ + 2 * X₂ - 5 * X₃ + ε

where X₁, X₂, X₃ are the first three features, and ε is Gaussian noise with a standard deviation of 0.1.
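
Because the features are drawn independently from a standard normal distribution (the `np.random.randn` call in the next cell), the equation fixes how much of the target's variance is signal and how much is noise. The short calculation below is a sketch of that arithmetic, not part of the original workflow; the variable names are illustrative.

```python
import math

# Coefficients and noise level taken from the equation above.
coefficients = [3.0, 2.0, -5.0]
noise_std = 0.1

# For independent standard-normal features, Var(a * X) = a**2.
signal_variance = sum(c ** 2 for c in coefficients)   # 9 + 4 + 25 = 38
noise_variance = noise_std ** 2                       # 0.01
total_variance = signal_variance + noise_variance

# The noise term cannot be predicted, so these are best-case scores
# in expectation on held-out data.
best_rmse = math.sqrt(noise_variance)
best_r2 = 1.0 - noise_variance / total_variance

print(f"Target std: {math.sqrt(total_variance):.3f}")
print(f"Best achievable RMSE: {best_rmse:.3f}")
print(f"Best achievable R^2: {best_r2:.5f}")
```

An RMSE near 0.1 on held-out data would therefore indicate that a model has captured essentially all of the learnable signal.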

In [None]:
# Generate synthetic data (replace this with your own data)
np.random.seed(0)
n_samples = 1000
X = np.random.randn(n_samples, 5)
y = 3 * X[:, 0] + 2 * X[:, 1] - 5 * X[:, 2] + np.random.randn(n_samples) * 0.1

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Combine features and target for training data
train_data = np.column_stack((y_train, X_train_scaled))

# Save the training data to a CSV file
train_file = 'train.csv'
np.savetxt(train_file, train_data, delimiter=',')

# Upload the training data to S3
train_s3_path = session.upload_data(train_file, bucket=bucket, key_prefix=prefix)

print(f"Training data uploaded to: {train_s3_path}")
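
The `np.column_stack((y_train, X_train_scaled))` call above writes the target as the first column of each row, followed by the features, with no header row. The sketch below reproduces that layout with the standard library only so the file format is easy to inspect; `to_training_rows` is a hypothetical helper and the two sample rows are made up.

```python
import csv
import io

def to_training_rows(targets, features):
    """Prepend each target to its feature row (hypothetical helper)."""
    return [[t, *row] for t, row in zip(targets, features)]

# Tiny illustrative dataset: two samples with five features each.
targets = [1.5, -0.2]
features = [
    [0.1, 0.2, 0.3, 0.4, 0.5],
    [1.0, 0.9, 0.8, 0.7, 0.6],
]

buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")
writer.writerows(to_training_rows(targets, features))  # no header row
csv_text = buffer.getvalue()
print(csv_text)

# Reading it back: column 0 is the target, the rest are features.
parsed = [[float(v) for v in line] for line in csv.reader(io.StringIO(csv_text))]
```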

## Configure LightGBM Model

This section sets up the LightGBM model configuration for training. It includes:

- Specifying the model ID, version, and training instance type
- Retrieving necessary URIs for the training image, source code, and model artifacts
- Setting hyperparameters for the LightGBM model, including learning rate, max depth, and other key parameters

In [None]:
# Configure model
train_model_id = "lightgbm-regression-model"
train_model_version = "*"
train_scope = "training"
training_instance_type = "ml.m5.xlarge"

# Retrieve URIs
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type
)

train_source_uri = script_uris.retrieve(
    model_id=train_model_id, 
    model_version=train_model_version, 
    script_scope=train_scope
)

train_model_uri = model_uris.retrieve(
    model_id=train_model_id, 
    model_version=train_model_version, 
    model_scope=train_scope
)

# Set hyperparameters
hyperparameters = {
    "num_boost_round": "500",
    "learning_rate": "0.1",
    "max_depth": "6",
    "objective": "regression",
    "metric": "rmse",
    "early_stopping_rounds": "30",
    "feature_fraction": "0.74",
    "bagging_fraction": "0.53",
    "bagging_freq": "5",
    "num_leaves": "67"
}
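
Two of these hyperparameters interact: a binary tree limited to depth `d` can hold at most `2**d` leaves, so `max_depth` caps how many of the requested `num_leaves` a tree can reach. The sketch below checks the values used above; the helper names are hypothetical, and how LightGBM behaves when both limits are set should be confirmed against its parameter documentation.

```python
def max_leaves_for_depth(max_depth):
    """Upper bound on leaves in a binary tree of the given depth."""
    return 2 ** max_depth

def effective_num_leaves(num_leaves, max_depth):
    """Leaves a tree can actually reach when both limits apply."""
    return min(num_leaves, max_leaves_for_depth(max_depth))

# Values copied from the hyperparameters dict above (stored as strings).
example_params = {"max_depth": "6", "num_leaves": "67"}
depth = int(example_params["max_depth"])
leaves = int(example_params["num_leaves"])

cap = max_leaves_for_depth(depth)
print(f"max_depth={depth} allows at most {cap} leaves")
print(f"num_leaves={leaves} -> effective limit {effective_num_leaves(leaves, depth)}")
```

With `max_depth` of 6 the cap is 64, so the requested 67 leaves cannot all be used; the depth limit is the binding constraint for this model.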

## Train LightGBM Model with Amazon SageMaker AI's Built-in Algorithm

This section launches the training job. It includes:

- Creating a SageMaker Estimator with the specified configuration
- Setting up the training job with a unique name
- Launching the model training process using the prepared data

In [None]:
from sagemaker.estimator import Estimator

timestamp = datetime.now().strftime('%Y%m%d-%H%M')
training_job_name = f"tx-train-{timestamp}"[:63]

# Create SageMaker Estimator
tabular_estimator = Estimator(
    role=role,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",
    instance_count=1,
    instance_type=training_instance_type,
    max_run=360000,
    hyperparameters=hyperparameters,
    output_path=f's3://{bucket}/{prefix}/output'
)

# Train the model
print("Starting model training...")
tabular_estimator.fit({"training": train_s3_path}, logs=True, job_name=training_job_name, wait=True)
print("Model training completed.")
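
The job name above uses a minute-resolution timestamp, so rerunning the cell within the same minute would reuse a name that already exists. The sketch below is one way to build and validate names; `make_job_name` is a hypothetical helper, and the pattern and 63-character limit reflect the constraints documented for SageMaker training job names (worth confirming in the API reference).

```python
import re
from datetime import datetime, timezone

# Alphanumerics separated by hyphens, at most 63 characters.
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")

def make_job_name(prefix, now=None):
    """Build a timestamped name and validate it (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    # Seconds are included so two runs in the same minute do not collide.
    name = f"{prefix}-{now.strftime('%Y%m%d-%H%M%S')}"[:63].rstrip("-")
    if not NAME_PATTERN.match(name):
        raise ValueError(f"Invalid SageMaker resource name: {name!r}")
    return name

print(make_job_name("tx-train"))
```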

## Deploy Trained Model to SageMaker Endpoint

This section handles the deployment of the trained LightGBM model to a SageMaker endpoint for real-time inference. The process includes:

1. Retrieving the model artifact from the training job
2. Creating a SageMaker model using the trained artifact
3. Setting up an endpoint configuration
4. Creating and deploying the endpoint

The deployment uses an ml.m5.xlarge instance for inference.

In [None]:
# Get the model artifact from training
training_job = boto3.client('sagemaker').describe_training_job(
    TrainingJobName=training_job_name
)
model_artifact = training_job['ModelArtifacts']['S3ModelArtifacts']

# Create shorter unique names for deployment resources
model_name = f"tx-model-{timestamp}"[:63]
endpoint_config_name = f"tx-config-{timestamp}"[:63]
endpoint_name = f"tx-endpoint-{timestamp}"[:63]

# Get inference image and source
inference_instance_type = "ml.m5.xlarge"
deploy_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    image_scope="inference",
    model_id=train_model_id,
    model_version=train_model_version,
    instance_type=inference_instance_type,
)

deploy_source_uri = script_uris.retrieve(
    model_id=train_model_id, 
    model_version=train_model_version, 
    script_scope="inference"
)

# Create model
print(f"Creating model: {model_name}")
create_model_response = boto3.client('sagemaker').create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        'Image': deploy_image_uri,
        'ModelDataUrl': model_artifact,
        'Environment': {
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_SUBMIT_DIRECTORY': deploy_source_uri,
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_REGION': aws_region
        }
    }
)

# Create endpoint configuration
print(f"Creating endpoint configuration: {endpoint_config_name}")
create_endpoint_config_response = boto3.client('sagemaker').create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        'InstanceType': inference_instance_type,
        'InitialInstanceCount': 1,
        'ModelName': model_name,
        'VariantName': 'AllTraffic'
    }]
)

# Create endpoint
print(f"Creating endpoint: {endpoint_name}")
create_endpoint_response = boto3.client('sagemaker').create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)

# Wait for endpoint to be ready
print("Waiting for endpoint to be ready...")
waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')
waiter.wait(EndpointName=endpoint_name)
print("Endpoint is ready!")
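
The `endpoint_in_service` waiter used above repeatedly checks the endpoint status until it becomes `InService`. The sketch below shows the same polling idea in plain Python so it can run without AWS access; `wait_for_status`, its default delay and attempt count, and the fake status sequence are all illustrative assumptions rather than boto3's actual settings.

```python
import time

def wait_for_status(get_status, target="InService", failed=("Failed",),
                    delay_seconds=30, max_attempts=120, sleep=time.sleep):
    """Poll get_status() until it returns target (hypothetical helper)."""
    for attempt in range(1, max_attempts + 1):
        status = get_status()
        if status == target:
            return attempt
        if status in failed:
            raise RuntimeError(f"Resource entered terminal state: {status}")
        sleep(delay_seconds)
    raise TimeoutError(f"Gave up after {max_attempts} attempts")

# Stand-in for describe_endpoint(...)["EndpointStatus"]: two polls in
# "Creating", then "InService". Sleeping is disabled for the demo.
fake_statuses = iter(["Creating", "Creating", "InService"])
attempts = wait_for_status(lambda: next(fake_statuses), sleep=lambda s: None)
print(f"Endpoint ready after {attempts} polls")
```

Raising on a `Failed` status lets a notebook stop early instead of waiting for the full timeout.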

## Real-time Inference with SageMaker Endpoint

This section sets up the infrastructure for making real-time predictions using the deployed SageMaker endpoint. It includes:

1. Creating a Predictor object for the endpoint
2. Implementing a mock feature store for demonstration purposes
3. Defining functions to retrieve features and make predictions
4. Demonstrating the prediction process with example calls

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
import random

# Create a predictor
predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer(), deserializer=JSONDeserializer())

# Mock feature store
mock_feature_store = {
    '123': {
        'model_v1': [0.5, -0.3, 1.2, 0.8, -0.1],
        'model_v2': [0.6, -0.2, 1.1, 0.9, -0.2]
    },
    '456': {
        'model_v1': [-0.1, 0.7, -0.5, 1.3, 0.2],
        'model_v2': [-0.2, 0.8, -0.4, 1.2, 0.3]
    }
}

# Function to retrieve features from online feature store (sample implementation)
def get_features_from_feature_store(id_identifier, id_model):
    # Check if the id_identifier exists in our mock feature store
    if id_identifier in mock_feature_store:
        # Check if the id_model exists for this identifier
        if id_model in mock_feature_store[id_identifier]:
            return mock_feature_store[id_identifier][id_model]
    return [random.uniform(-1, 1) for _ in range(5)]

# Function to make predictions
def predict(id_identifier, id_model):
    # Retrieve features from feature store
    features = get_features_from_feature_store(id_identifier, id_model)
    
    # Scale the features
    features_scaled = scaler.transform(np.array(features).reshape(1, -1))
    
    # Prepare payload
    payload = ','.join(map(str, features_scaled.flatten()))
    
    # Make prediction
    response = predictor.predict(payload)
    
    return {
        'id_identifier': id_identifier,
        'id_model': id_model,
        'features': features,
        'prediction': response
    }

# Example usage
result1 = predict('123', 'model_v1')
print("Result 1:", result1)

result2 = predict('456', 'model_v2')
print("Result 2:", result2)

result3 = predict('789', 'model_v3')  # This will use random features
print("Result 3:", result3)
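
Inside `predict`, the features are standardized with the scaler fitted on the training data and then joined into one comma-separated line. The sketch below reproduces those two steps by hand on made-up numbers, assuming the population standard deviation that `StandardScaler` uses; `fit_standardizer` and `to_csv_payload` are hypothetical helpers.

```python
import math

def fit_standardizer(rows):
    """Return per-column means and population standard deviations."""
    n = len(rows)
    columns = list(zip(*rows))
    means = [sum(col) / n for col in columns]
    stds = [math.sqrt(sum((v - m) ** 2 for v in col) / n)
            for col, m in zip(columns, means)]
    return means, stds

def to_csv_payload(features, means, stds):
    """Standardize one feature row and join it into a CSV line."""
    scaled = [(x - m) / s for x, m, s in zip(features, means, stds)]
    return ",".join(str(v) for v in scaled)

# Illustrative training rows (two features) and one request.
training_rows = [[1.0, 10.0], [3.0, 30.0]]
means, stds = fit_standardizer(training_rows)
payload = to_csv_payload([3.0, 10.0], means, stds)
print(payload)  # prints 1.0,-1.0
```

The key point is that request features must be scaled with statistics from the training set, never refitted on the request itself.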

## Simulate Inference Requests

This section demonstrates how to simulate a large number of inference requests to the deployed SageMaker endpoint. Key features include:

- A function to generate and process multiple random prediction requests
- Timing the overall process and calculating average request time
- Simulating 5000 requests to stress-test the endpoint

In [None]:
import time
import random

def simulate_requests(num_requests=1000):
    start_time = time.time()
    results = []
    
    for _ in range(num_requests):
        id_identifier = str(random.randint(100, 999))
        id_model = f"model_v{random.randint(1, 2)}"
        result = predict(id_identifier, id_model)
        results.append(result)
    
    end_time = time.time()
    total_time = end_time - start_time
    
    print(f"Completed {num_requests} requests in {total_time:.2f} seconds")
    print(f"Average time per request: {(total_time / num_requests) * 1000:.2f} ms")
    
    return results

# Simulate requests
simulation_results = simulate_requests(5000)  # Simulate 5000 requests
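
The average reported above can hide slow outliers, so per-request percentiles are often more informative for a latency check. The sketch below records each call's latency and summarizes it with the standard library; `time_calls` and `summarize_latencies` are hypothetical helpers, and the lambda is a stand-in for a call to `predict(...)`.

```python
import statistics
import time

def time_calls(fn, num_requests):
    """Call fn() repeatedly and return each call's latency in ms."""
    latencies_ms = []
    for _ in range(num_requests):
        start = time.perf_counter()
        fn()
        latencies_ms.append((time.perf_counter() - start) * 1000)
    return latencies_ms

def summarize_latencies(latencies_ms):
    """Return mean, p50, p95, and p99 for a list of latencies."""
    cuts = statistics.quantiles(latencies_ms, n=100)  # 99 cut points
    return {
        "mean": statistics.fmean(latencies_ms),
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
    }

# Stand-in workload; in the notebook this would be a call to predict(...).
latencies = time_calls(lambda: sum(range(1000)), num_requests=200)
summary = summarize_latencies(latencies)
print({name: round(value, 4) for name, value in summary.items()})
```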

## Train and Create Second LightGBM Model

This section demonstrates the process of training and creating a second LightGBM model with different hyperparameters. Key steps include:

1. Defining new hyperparameters for the second model
2. Creating a new SageMaker Estimator with these hyperparameters
3. Training the second model using the same training data
4. Retrieving the model artifact from the new training job
5. Creating a new SageMaker model using the trained artifact

In [None]:
# Define hyperparameters for the second model
hyperparameters_v2 = {
    "num_boost_round": "1000",
    "learning_rate": "0.05",
    "max_depth": "8",
    "objective": "regression",
    "metric": "rmse",
    "early_stopping_rounds": "50",
    "feature_fraction": "0.8",
    "bagging_fraction": "0.7",
    "bagging_freq": "3",
    "num_leaves": "100"
}

timestamp = datetime.now().strftime('%Y%m%d-%H%M')
training_job_name_v2 = f"tx-train-v2-{timestamp}"[:63]

# Create SageMaker Estimator for the second model
tabular_estimator_v2 = Estimator(
    role=role,
    image_uri=train_image_uri,
    source_dir=train_source_uri,
    model_uri=train_model_uri,
    entry_point="transfer_learning.py",
    instance_count=1,
    instance_type=training_instance_type,
    max_run=360000,
    hyperparameters=hyperparameters_v2,
    output_path=f's3://{bucket}/{prefix}/output'
)

# Train the second model
print("Starting model v2 training...")
tabular_estimator_v2.fit({"training": train_s3_path}, logs=True, job_name=training_job_name_v2)
print("Model v2 training completed.")

# Get the model artifact for the second model
training_job_v2 = boto3.client('sagemaker').describe_training_job(
    TrainingJobName=training_job_name_v2
)
model_artifact_v2 = training_job_v2['ModelArtifacts']['S3ModelArtifacts']

# Create the second model
model_name_v2 = f"tx-model-v2-{timestamp}"[:63]

create_model_response_v2 = boto3.client('sagemaker').create_model(
    ModelName=model_name_v2,
    ExecutionRoleArn=role,
    PrimaryContainer={
        'Image': deploy_image_uri,
        'ModelDataUrl': model_artifact_v2,
        'Environment': {
            'SAGEMAKER_PROGRAM': 'inference.py',
            'SAGEMAKER_SUBMIT_DIRECTORY': deploy_source_uri,
            'SAGEMAKER_CONTAINER_LOG_LEVEL': '20',
            'SAGEMAKER_REGION': aws_region
        }
    }
)

print(f"Created second model: {model_name_v2}")
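
When comparing two model versions, it helps to record exactly which hyperparameters differ. The sketch below diffs the two dictionaries defined in this notebook; `diff_hyperparameters` is a hypothetical helper and the values are copied from the cells above.

```python
def diff_hyperparameters(old, new):
    """Map each changed key to its (old, new) pair (hypothetical helper)."""
    keys = sorted(set(old) | set(new))
    return {k: (old.get(k), new.get(k)) for k in keys if old.get(k) != new.get(k)}

# Values copied from the two hyperparameter dicts in this notebook.
v1 = {
    "num_boost_round": "500", "learning_rate": "0.1", "max_depth": "6",
    "objective": "regression", "metric": "rmse",
    "early_stopping_rounds": "30", "feature_fraction": "0.74",
    "bagging_fraction": "0.53", "bagging_freq": "5", "num_leaves": "67",
}
v2 = {
    "num_boost_round": "1000", "learning_rate": "0.05", "max_depth": "8",
    "objective": "regression", "metric": "rmse",
    "early_stopping_rounds": "50", "feature_fraction": "0.8",
    "bagging_fraction": "0.7", "bagging_freq": "3", "num_leaves": "100",
}

changes = diff_hyperparameters(v1, v2)
for name, (before, after) in changes.items():
    print(f"{name}: {before} -> {after}")
```

Only `objective` and `metric` stay the same, so any performance difference between the two models cannot be attributed to a single setting.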

## Shadow Test Experiment Setup

This section creates a shadow test to compare two model versions. It sets up a one-hour inference experiment in which 5% of production requests are copied to the shadow variant (the second model) for evaluation. Callers still receive responses only from the production variant; the shadow variant's responses are captured for comparison and are not returned to the caller. Both variants run on ml.m5.xlarge instances, and the captured results are stored in S3 for later analysis.

In [None]:
from datetime import timedelta, timezone

# Create shadow test experiment
timestamp = datetime.now().strftime('%Y%m%d-%H%M-%S')
experiment_name = f"tx-shadow-test-{timestamp}"[:63]

# Set up S3 destination for shadow test results
shadow_test_results_path = f"s3://{bucket}/{prefix}/shadow-test-results"

# Set start time and end time (1 hour). Use timezone-aware UTC datetimes:
# botocore interprets naive datetimes as UTC, so a naive local time from
# datetime.now() could shift the schedule by the local UTC offset.
start_time = datetime.now(timezone.utc)
end_time = start_time + timedelta(hours=1)

inference_instance_type = "ml.m5.xlarge"

response = boto3.client('sagemaker').create_inference_experiment(
    Name=experiment_name,
    Type='ShadowMode',
    RoleArn=role,
    EndpointName=endpoint_name,
    ModelVariants=[
        {
            'ModelName': model_name,
            'VariantName': 'Production',
            'InfrastructureConfig': {
                'InfrastructureType': 'RealTimeInference',
                'RealTimeInferenceConfig': {
                    'InstanceType': inference_instance_type,
                    'InstanceCount': 1
                }
            }
        },
        {
            'ModelName': model_name_v2,
            'VariantName': 'Shadow',
            'InfrastructureConfig': {
                'InfrastructureType': 'RealTimeInference',
                'RealTimeInferenceConfig': {
                    'InstanceType': inference_instance_type,
                    'InstanceCount': 1
                }
            }
        }
    ],
    Schedule={
        'StartTime': start_time,
        'EndTime': end_time
    },
    DataStorageConfig={
        'Destination': shadow_test_results_path,
        'ContentType': {
            'CsvContentTypes': ['text/csv'],
            'JsonContentTypes': ['application/json']
        }
    },
    ShadowModeConfig={
        'SourceModelVariantName': 'Production',
        'ShadowModelVariants': [
            {
                'ShadowModelVariantName': 'Shadow',
                'SamplingPercentage': 5
            }
        ]
    }
)

print(f"Created inference experiment: {experiment_name}")
print(f"Experiment ARN: {response.get('ExperimentArn', 'N/A')}")
print(f"Start time: {start_time}")
print(f"End time: {end_time}")
print(f"Duration: 1 hour")
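
The `SamplingPercentage` of 5 configured above determines roughly how many requests the shadow variant sees during the experiment. The sketch below is a toy simulation of that sampling, assuming each request is sampled independently; it does not reproduce SageMaker's actual sampling mechanism, and `simulate_shadow_sampling` is a hypothetical helper.

```python
import random

def simulate_shadow_sampling(num_requests, sampling_percentage, seed=0):
    """Count how many requests would be mirrored to the shadow variant."""
    rng = random.Random(seed)
    production_served = 0
    shadow_copies = 0
    for _ in range(num_requests):
        production_served += 1            # production answers every request
        if rng.random() * 100 < sampling_percentage:
            shadow_copies += 1            # a copy also goes to the shadow
    return production_served, shadow_copies

served, mirrored = simulate_shadow_sampling(5000, sampling_percentage=5)
print(f"Production served {served}; shadow received about {mirrored} copies")
```

At 5%, a run of 5000 requests yields only about 250 shadow predictions, which is worth keeping in mind when judging how much evidence a one-hour test provides.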

## Canary Deployment

This section updates the SageMaker endpoint with a blue/green deployment that uses canary traffic shifting. SageMaker provisions a new fleet running the second model, shifts 20% of capacity (the canary) to it, and waits 5 minutes. After that wait, the remaining traffic shifts to the new fleet in a single step, and the old fleet is terminated 10 minutes later. Exposing only part of the traffic first limits the impact of a bad release. This configuration defines no auto-rollback alarms, so monitoring during the wait is manual.

In [None]:
# Canary deployment
import boto3
from datetime import datetime
import time

# Get the current endpoint configuration
sagemaker_client = boto3.client('sagemaker')
endpoint_description = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
current_endpoint_config_name = endpoint_description['EndpointConfigName']

# Get the current endpoint configuration details
current_endpoint_config = sagemaker_client.describe_endpoint_config(
    EndpointConfigName=current_endpoint_config_name
)
current_variant_name = current_endpoint_config['ProductionVariants'][0]['VariantName']
print(f"Current variant name: {current_variant_name}")

# Create a new endpoint configuration for the canary deployment
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
canary_config_name = f"tx-config-canary-{timestamp}"[:63]

create_endpoint_config_response = sagemaker_client.create_endpoint_config(
    EndpointConfigName=canary_config_name,
    ProductionVariants=[
        {
            'VariantName': current_variant_name,  
            'ModelName': model_name_v2,  
            'InstanceType': 'ml.m5.xlarge',
            'InitialInstanceCount': 2,
            'InitialVariantWeight': 1  
        }
    ]
)

# Define the canary deployment configuration
canary_config = {
    "BlueGreenUpdatePolicy": {
        "TrafficRoutingConfiguration": {
            "Type": "CANARY",
            "CanarySize": {
                "Type": "CAPACITY_PERCENT",
                "Value": 20  # Shift 20% of capacity to the new fleet first
            },
            "WaitIntervalInSeconds": 300  # Wait 5 minutes, then shift the remaining traffic
        },
        "TerminationWaitInSeconds": 600,  # Wait 10 minutes before terminating old instances
        "MaximumExecutionTimeoutInSeconds": 3600  # 60 minutes maximum for the entire deployment
    }
}

# Start canary deployment
print("Starting canary deployment...")
update_response = sagemaker_client.update_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=canary_config_name,
    DeploymentConfig=canary_config
)
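
The numeric settings in `canary_config` interact: the overall timeout has to leave room for both waiting periods, and the canary size is limited to at most half of the capacity. The sketch below checks a policy against those limits before it is submitted; `validate_canary_policy` is a hypothetical helper, and the limits are taken from the SageMaker API reference as assumptions to verify.

```python
def validate_canary_policy(policy):
    """Check a BlueGreenUpdatePolicy dict against documented limits."""
    routing = policy["TrafficRoutingConfiguration"]
    problems = []

    size = routing["CanarySize"]
    if size["Type"] == "CAPACITY_PERCENT" and not 1 <= size["Value"] <= 50:
        problems.append("CanarySize must be between 1 and 50 percent")

    waiting = (routing["WaitIntervalInSeconds"]
               + policy.get("TerminationWaitInSeconds", 0))
    timeout = policy.get("MaximumExecutionTimeoutInSeconds")
    if timeout is not None and timeout <= waiting:
        problems.append(
            "MaximumExecutionTimeoutInSeconds must exceed total waiting time"
        )
    return problems

# Same values as the canary_config defined in this notebook.
policy = {
    "TrafficRoutingConfiguration": {
        "Type": "CANARY",
        "CanarySize": {"Type": "CAPACITY_PERCENT", "Value": 20},
        "WaitIntervalInSeconds": 300,
    },
    "TerminationWaitInSeconds": 600,
    "MaximumExecutionTimeoutInSeconds": 3600,
}
print(validate_canary_policy(policy) or "Policy looks consistent")
```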

## Resource Cleanup

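Delete the endpoint when you are done to avoid ongoing charges. The helper in the next cell also removes the original model and endpoint configuration, which the endpoint no longer uses once the canary deployment has completed.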

In [None]:
# After successful deployment, you may want to clean up the old model and endpoint configuration
def cleanup_old_resources():
    try:
        sagemaker_client.delete_model(ModelName=model_name)
        sagemaker_client.delete_endpoint_config(EndpointConfigName=current_endpoint_config_name)
        print("Cleaned up old model and endpoint configuration.")
    except Exception as e:
        print(f"Error during cleanup: {str(e)}")

# Uncomment the following lines when you're ready to clean up
#cleanup_old_resources()
#sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
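
This notebook creates more resources than the cell above removes, including the second model and the canary endpoint configuration. The sketch below builds an ordered deletion plan that removes the endpoint first, since an endpoint configuration should not be deleted while a live endpoint uses it; `build_cleanup_plan` is a hypothetical helper and the resource names are placeholders.

```python
def build_cleanup_plan(endpoint, endpoint_configs, models):
    """Return (operation, kwargs) pairs in a safe deletion order."""
    plan = [("delete_endpoint", {"EndpointName": endpoint})]
    plan += [("delete_endpoint_config", {"EndpointConfigName": name})
             for name in endpoint_configs]
    plan += [("delete_model", {"ModelName": name}) for name in models]
    return plan

# Placeholder names; in the notebook these would be endpoint_name,
# endpoint_config_name, canary_config_name, model_name, and model_name_v2.
plan = build_cleanup_plan(
    endpoint="tx-endpoint-example",
    endpoint_configs=["tx-config-example", "tx-config-canary-example"],
    models=["tx-model-example", "tx-model-v2-example"],
)
for operation, kwargs in plan:
    print(operation, kwargs)
    # With a real client: getattr(sagemaker_client, operation)(**kwargs)
```

Printing the plan before executing it gives a chance to confirm that no shared resource is listed by mistake.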