# Scaling training and serving thousands of models with Amazon SageMaker

As machine learning becomes prevalent across a wide range of industries, organizations increasingly need to train and serve large numbers of machine learning models to meet the diverse needs of their customers. For software-as-a-service (SaaS) providers in particular, the ability to train and serve thousands of models efficiently and cost-effectively is crucial to staying competitive in a rapidly evolving market.

Training and serving thousands of models requires robust, scalable infrastructure, and this is where [Amazon SageMaker](https://aws.amazon.com/sagemaker) can help. Amazon SageMaker is a fully managed platform that enables developers and data scientists to build, train, and deploy machine learning models quickly, while benefiting from the pay-as-you-go pricing of AWS infrastructure.

In this blog post, we explore how SageMaker features, including SageMaker Processing, SageMaker training jobs, and SageMaker multi-model endpoints, can be used to train and serve thousands of models cost-effectively.

```python
%pip install -qr dev-requirements.txt
```

## Generate the dataset

For this blog post, we play the role of an independent software vendor (ISV) that helps its customers become more sustainable by tracking their energy consumption and providing forecasts. Our company has 1,000 customers who want to better understand their energy usage and make informed decisions about how to reduce their environmental impact. To serve them, we use a synthetic dataset and train one [Prophet](https://facebook.github.io/prophet/) forecasting model per customer. Amazon SageMaker lets us train and serve these 1,000 models efficiently, giving each customer actionable insights into their energy usage.

The generated dataset has three columns:

* `customer_id`: This is an integer identifier for each customer, ranging from 0 to 999.
* `timestamp`: This is a date-time value that indicates the time at which the energy consumption was measured. The timestamps are randomly generated between the start and end dates specified in the code.
* `consumption`: This is a float value that indicates the energy consumption, measured in some arbitrary unit. The consumption values are randomly generated between 0 and 1000 with sinusoidal seasonality.
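The `data_generator.py` module isn't reproduced in this post. As a rough illustration of the kind of data described above, the sketch below draws random timestamps between a start and end date and produces consumption values in [0, 1000] that follow a daily sinusoidal pattern plus noise. The function name `generate_consumption`, the number of readings per customer, and the exact shape of the seasonality are assumptions for illustration, not the notebook's actual generator.

```python
import numpy as np
import pandas as pd


def generate_consumption(start, end, n_customers, readings_per_customer=100, seed=0):
    """Hypothetical generator: random timestamps, daily sinusoidal seasonality, values in [0, 1000]."""
    rng = np.random.default_rng(seed)
    start_ts, end_ts = pd.Timestamp(start), pd.Timestamp(end)
    span_seconds = (end_ts - start_ts).total_seconds()
    frames = []
    for customer_id in range(n_customers):
        # Random offsets (in seconds) between start and end, sorted chronologically
        offsets = np.sort(rng.uniform(0, span_seconds, readings_per_customer))
        timestamps = start_ts + pd.to_timedelta(offsets, unit="s")
        # Daily seasonality: a sine wave over the time of day, plus uniform noise
        hours = np.asarray(timestamps.hour + timestamps.minute / 60)
        seasonal = 500 + 300 * np.sin(2 * np.pi * hours / 24)
        noise = rng.uniform(-200, 200, readings_per_customer)
        consumption = np.clip(seasonal + noise, 0, 1000)
        frames.append(pd.DataFrame({
            "customer_id": customer_id,
            "timestamp": timestamps,
            "consumption": consumption,
        }))
    return pd.concat(frames, ignore_index=True)


# Usage: a small sample with 3 customers
sample = generate_consumption("2022-01-01 00:00:00", "2022-12-31 23:59:59", n_customers=3)
print(sample.head())
```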

If you already have a dataset, you can use it instead. Make sure its schema matches the one above, `(customer_id, timestamp, consumption)`, or adapt the preprocessing script [data_splitter.py](./source/data_splitter.py) accordingly.
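Before swapping in your own data, a quick check like the following can confirm that a CSV has the expected columns and types. `check_schema` is a hypothetical helper, not part of the notebook's source; it assumes the three column names listed above, integer customer IDs, and timestamps that pandas can parse.

```python
import io

import pandas as pd

EXPECTED_COLUMNS = ["customer_id", "timestamp", "consumption"]


def check_schema(csv_source):
    """Hypothetical helper: load a CSV and verify it matches (customer_id, timestamp, consumption)."""
    df = pd.read_csv(csv_source)
    missing = [c for c in EXPECTED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    df = df[EXPECTED_COLUMNS].copy()
    if not pd.api.types.is_integer_dtype(df["customer_id"]):
        raise ValueError("customer_id must contain integers")
    # Raise if any timestamp or consumption value cannot be parsed
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="raise")
    df["consumption"] = pd.to_numeric(df["consumption"], errors="raise").astype(float)
    return df


# Usage with an in-memory CSV; pass a file path for a real dataset
sample = io.StringIO("customer_id,timestamp,consumption\n0,2022-01-01 00:00:00,512.3\n")
print(check_schema(sample).dtypes)
```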


```python
import source.data_generator as datagen

!mkdir -p data
output_path = datagen.main("2022-01-01 00:00:00", "2022-12-31 23:59:59", 1000, "energy_consumption.csv", "./data")
output_path
```

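```python
import pandas as pd

# Parse the timestamp column as datetimes (parse_dates=True would only try to parse the index)
df = pd.read_csv(output_path, parse_dates=["timestamp"])
df.head()
```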

```python
# Plot the first 100 readings for customer 0 in chronological order
df[df["customer_id"] == 0].sort_values("timestamp")[:100].plot(
    x="timestamp", y="consumption", style="o-", legend=False, figsize=(20, 6)
)
```

```python
# Upload data to Amazon S3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.utils import name_from_base

session = sagemaker.Session()
execution_role = get_execution_role()

bucket = session.default_bucket()
prefix = "sagemaker/" + name_from_base("scaling-thousand-models")
raw_data = session.upload_data(output_path, bucket, f"{prefix}/data")
print(f"Data uploaded to {raw_data}")
```

## Data processing

To create a thousand models with a single training job, we use the data distribution capability of SageMaker training. When we launch a training job with 10 instances and configure the training data to be split across them (with the `distribution` parameter of the [`TrainingInput`](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput) class in the SageMaker Python SDK), SageMaker distributes the S3 objects under the input prefix across the instances, so each of the n instances receives roughly 1/n of the files rather than the full dataset. Because the split is done on whole S3 objects, we pre-process the data into one CSV file per customer and store those files in S3. This guarantees that all of a customer's data lands on the same training instance.

```python
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn import SKLearnProcessor

processing_output = f"s3://{bucket}/{prefix}/data/processed"

skp = SKLearnProcessor(
    framework_version="1.0-1",
    role=execution_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    base_job_name="scaling-thousand-dataprep",
    sagemaker_session=session,
)

# Run data_splitter.py, which writes one CSV per customer to processing_output
skp.run(
    code="source/data_splitter.py",
    inputs=[
        ProcessingInput(source=raw_data, input_name="raw_data", destination="/opt/ml/processing/input/")
    ],
    outputs=[
        ProcessingOutput(
            output_name="customer_data",
            source="/opt/ml/processing/output/customer_data/",
            destination=processing_output,
        )
    ],
)
```


```python
!aws s3 ls $processing_output/ --recursive
```

The listing shows the folder structure of the processed data: each customer's data is stored in its own file, and these files are distributed across the training instances.
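`data_splitter.py` isn't shown in this post. A minimal local sketch of the same idea, grouping rows by `customer_id` and writing one CSV per customer, might look like the following. The function name and the `<customer_id>.csv` file naming are assumptions; the real script runs inside the processing container and writes under `/opt/ml/processing/output/customer_data/`.

```python
import os

import pandas as pd


def split_by_customer(df, output_dir):
    """Hypothetical splitter: write one CSV per customer so each file can be sharded independently."""
    os.makedirs(output_dir, exist_ok=True)
    paths = []
    for customer_id, customer_df in df.groupby("customer_id"):
        path = os.path.join(output_dir, f"{customer_id}.csv")
        # Sort chronologically so each file is ready for time-series training
        customer_df.sort_values("timestamp").to_csv(path, index=False)
        paths.append(path)
    return paths


# Usage (locally): split_by_customer(df, "./data/customer_data")
```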

## Model training

We distribute training across multiple instances with the [`TrainingInput`](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput) class in the SageMaker Python SDK. Its `distribution` parameter specifies how the input data is distributed across instances: with `ShardedByS3Key`, the data is sharded by S3 object key, so each training instance receives a unique subset of the files and no file is duplicated. The default, `FullyReplicated`, would instead copy the full dataset to every instance.

```python
from sagemaker.inputs import TrainingInput

processed_data_input = TrainingInput(
    s3_data=processing_output,
    # Shard whole S3 objects (one CSV per customer) across the training instances
    distribution="ShardedByS3Key",
    content_type="text/csv",
    s3_data_type="S3Prefix",
    input_mode="File",
)
```
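SageMaker doesn't document which instance receives which object, only that each of the n instances gets roughly 1/n of them. The toy simulation below assumes a simple round-robin assignment over sorted keys to illustrate the property this post relies on: every customer file lands on exactly one instance. `shard_keys` is hypothetical and is not how SageMaker implements `ShardedByS3Key`.

```python
def shard_keys(keys, n_instances):
    """Toy model of ShardedByS3Key: assign whole S3 objects to instances round-robin."""
    shards = {i: [] for i in range(n_instances)}
    for index, key in enumerate(sorted(keys)):
        shards[index % n_instances].append(key)
    return shards


keys = [f"data/processed/{customer_id}.csv" for customer_id in range(1000)]
shards = shard_keys(keys, n_instances=10)
# Every instance receives 100 files, and no file is assigned twice
print({instance: len(files) for instance, files in shards.items()})
```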

Now we can train the models:

```python
from sagemaker.mxnet import MXNet

n_instances = 10

checkpoint_local_path = "/opt/ml/checkpoints"
training_output = f"s3://{bucket}/{prefix}/models/"

estimator = MXNet(
    entry_point="training.py",
    source_dir="source",
    role=execution_role,
    framework_version="1.9.0",
    py_version="py38",
    instance_count=n_instances,
    instance_type="ml.c5.xlarge",
    base_job_name="scaling-thousand-training",
    # Files written to checkpoint_local_path on each instance are synced to this S3 URI
    checkpoint_s3_uri=training_output,
    checkpoint_local_path=checkpoint_local_path,
)
estimator.fit(processed_data_input)
```

Once training completes, we can list the trained models under the `training_output` S3 prefix:

```python
!aws s3 ls $training_output
```
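The contents of `training.py` aren't shown in this post. As a hedged sketch of the pattern, a per-instance script could loop over whichever customer files SageMaker placed in its training channel, fit one model per file, and package each as `<customer_id>.tar.gz` in the checkpoint directory, which SageMaker syncs to `checkpoint_s3_uri`. The function `train_all`, the `fit_fn` callback (standing in for fitting a Prophet model), and pickling the fitted object into `model.pkl` are all assumptions for illustration.

```python
import os
import pickle
import tarfile
import tempfile

import pandas as pd


def train_all(input_dir, checkpoint_dir, fit_fn):
    """Hypothetical training loop: one model archive per customer CSV found on this instance."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    archives = []
    for file_name in sorted(os.listdir(input_dir)):
        if not file_name.endswith(".csv"):
            continue
        customer_id = os.path.splitext(file_name)[0]
        df = pd.read_csv(os.path.join(input_dir, file_name), parse_dates=["timestamp"])
        model = fit_fn(df)  # placeholder for fitting a Prophet model on this customer's data
        with tempfile.TemporaryDirectory() as tmp:
            model_path = os.path.join(tmp, "model.pkl")
            with open(model_path, "wb") as f:
                pickle.dump(model, f)
            # Name the archive after the customer so it can be used as a target_model later
            archive = os.path.join(checkpoint_dir, f"{customer_id}.tar.gz")
            with tarfile.open(archive, "w:gz") as tar:
                tar.add(model_path, arcname="model.pkl")
        archives.append(archive)
    return archives
```

Inside the training container you would call it with `input_dir=os.environ["SM_CHANNEL_TRAINING"]` (the SDK names the channel `training` when `fit` receives a single `TrainingInput`) and `checkpoint_dir="/opt/ml/checkpoints"`.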

## Model serving with a multi-model endpoint

Next, we define the [model object](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) that we will later deploy to an endpoint.

```python
from sagemaker.sklearn import SKLearnModel

model = SKLearnModel(
    # Artifact for customer 1; all models on the endpoint share this container and inference.py.
    # An f-string avoids os.path.join, which would insert backslashes into the S3 URI on Windows.
    model_data=f"{training_output}1.tar.gz",
    role=execution_role,
    entry_point="inference.py",
    source_dir="source",
    framework_version="1.0-1",
    py_version="py3",
    name=name_from_base("scaling-thousand-sklearn-model"),
    sagemaker_session=session,
)
```

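From this model we create a multi-model endpoint. Rather than listing every model in the endpoint configuration, a multi-model endpoint points at an S3 prefix (`model_data_prefix`) and loads each model artifact from it on demand, the first time that model is invoked, then keeps it in memory for subsequent requests. All models share the same serving container and inference code, so new customer models can be added by uploading their artifacts under the prefix, without redeploying the endpoint.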

```python
from sagemaker.multidatamodel import MultiDataModel

# Serve every artifact under training_output from one endpoint, reusing model's container
multimodel = MultiDataModel(
    name=name_from_base("customer-models"),
    model_data_prefix=training_output,
    model=model,
    sagemaker_session=session,
)
```
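To build intuition for the on-demand behavior, here is a toy simulation, not SageMaker's implementation: a model is loaded the first time it is requested and then kept in memory, and once a fixed capacity is exceeded the least recently used model is evicted. SageMaker does unload unused models when instance memory runs low, but the fixed `capacity`, the LRU policy, and the `load_fn` callback here are assumptions for illustration.

```python
from collections import OrderedDict


class ToyModelCache:
    """Toy LRU cache mimicking on-demand model loading on a multi-model endpoint."""

    def __init__(self, capacity, load_fn):
        self.capacity = capacity
        self.load_fn = load_fn  # hypothetical: fetches and deserializes an artifact by name
        self.models = OrderedDict()
        self.loads = 0

    def get(self, target_model):
        if target_model in self.models:
            # Cache hit: mark the model as most recently used
            self.models.move_to_end(target_model)
            return self.models[target_model]
        # Cache miss: the slower "cold" path (download and load the artifact)
        self.loads += 1
        model = self.load_fn(target_model)
        self.models[target_model] = model
        if len(self.models) > self.capacity:
            # Evict the least recently used model to free memory
            self.models.popitem(last=False)
        return model


cache = ToyModelCache(capacity=2, load_fn=lambda name: f"model loaded from {name}")
for name in ["8.tar.gz", "735.tar.gz", "8.tar.gz", "1.tar.gz", "735.tar.gz"]:
    cache.get(name)
print(cache.loads, list(cache.models))  # prints: 4 ['1.tar.gz', '735.tar.gz']
```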

Finally, we can deploy the models:

```python
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import JSONSerializer

predictor = multimodel.deploy(
    initial_instance_count=1,
    instance_type="ml.c5.xlarge",
    endpoint_name=name_from_base("scaling-models-sklearn"),
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)
```

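Once the endpoint is in service, we can use it for inference. We select the model that serves each request with the `target_model` parameter, a path relative to `model_data_prefix` (for example, model `8.tar.gz`). The first request to a given model may take longer, because SageMaker loads its artifact from S3 on demand.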

```python
response = predictor.predict(data='{"period": 7}', target_model="8.tar.gz")
```

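```python
from io import StringIO

# Wrap the JSON string in StringIO: passing literal JSON to read_json is deprecated in recent pandas
pd.read_json(StringIO(response))[["ds", "yhat", "yhat_lower", "yhat_upper"]]
```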

Or, using model `735.tar.gz`:

```python
response = predictor.predict(data='{"period": 7}', target_model="735.tar.gz")
```

```python
from io import StringIO

pd.read_json(StringIO(response))[["ds", "yhat", "yhat_lower", "yhat_upper"]]
```
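If you call the endpoint from an application without the SageMaker Python SDK, the equivalent low-level call is the SageMaker Runtime `InvokeEndpoint` API, which takes the model to use in its `TargetModel` parameter. The sketch below uses boto3's `invoke_endpoint`; the helper name `invoke_customer_model` is hypothetical. Its request body deliberately mirrors what `JSONSerializer` sends for the string used in this notebook (it JSON-encodes that string again), on the assumption that `inference.py` expects exactly that payload.

```python
import json


def invoke_customer_model(runtime_client, endpoint_name, customer_id, period=7):
    """Hypothetical helper: invoke one customer's model on a multi-model endpoint."""
    # JSONSerializer applies json.dumps to the string passed to predictor.predict, so encode twice
    body = json.dumps(json.dumps({"period": period}))
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Accept="application/json",
        Body=body,
        TargetModel=f"{customer_id}.tar.gz",  # path relative to model_data_prefix
    )
    # Mirror JSONDeserializer: decode the JSON response body
    return json.loads(response["Body"].read().decode("utf-8"))


# Usage (requires AWS credentials and the endpoint deployed above):
# import boto3
# runtime = boto3.client("sagemaker-runtime")
# forecast_json = invoke_customer_model(runtime, predictor.endpoint_name, customer_id=8)
```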

## Clean up
To avoid ongoing charges, delete the endpoint (together with its endpoint configuration) and the model created in this post.

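```python
# Deletes the endpoint and, by default, its endpoint configuration
predictor.delete_endpoint()
# Deletes the SageMaker model backing the multi-model endpoint
multimodel.delete_model()
```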
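The calls above don't remove the raw data, processed data, or model artifacts stored in S3. If you also want to delete everything under this post's `prefix`, a sketch using boto3's `list_objects_v2` paginator and `delete_objects` could look like this. The helper name `delete_prefix` is hypothetical; double-check the prefix before running it, because deletion on an unversioned bucket cannot be undone.

```python
def delete_prefix(s3_client, bucket, prefix):
    """Hypothetical helper: delete every object under an S3 prefix, one page (up to 1,000 keys) at a time."""
    paginator = s3_client.get_paginator("list_objects_v2")
    deleted = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if objects:
            # delete_objects accepts at most 1,000 keys, matching the page size of list_objects_v2
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": objects})
            deleted += len(objects)
    return deleted


# Usage (requires AWS credentials):
# import boto3
# delete_prefix(boto3.client("s3"), bucket, prefix)
```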