# SageMaker Pipelines: Preload Data onto FSx for Lustre and Train

*(This notebook was tested with the "Python 3 (PyTorch 1.8 CPU Optimized)" kernel.)*

Amazon SageMaker Pipelines lets ML developers and MLOps engineers orchestrate SageMaker jobs in a configurable way. A pipeline definition can be exported as a JSON object that represents a directed acyclic graph (DAG). SageMaker training can also integrate with Amazon FSx for Lustre to speed up training jobs.

An FSx for Lustre file system can be linked to S3, and it then loads files on demand. A file is copied from S3 the first time it is accessed. It is copied again only if it is not yet on FSx or if the S3 source has changed since the last copy. As a result, the first training epoch pays the cost of copying the data, and subsequent epochs should be faster (assuming one epoch sweeps through the entire training dataset).

After training finishes, the files remain on FSx; they are not removed automatically. As long as the files in S3 have not changed, subsequent runs do not copy them from S3 to FSx again.

One way to reduce cost is to preload the files onto FSx from a cheaper instance before the training job starts. This keeps the more expensive training instance from spending its first epoch waiting for data to arrive from S3. For details, see [Preloading files into your file system](https://docs.aws.amazon.com/fsx/latest/LustreGuide/preload-file-contents-hsm-dra.html).
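The lazy-loading behavior described above can be made concrete with a toy, pure-Python model. This is not an AWS API. The dictionary `s3` stands in for the bucket, and its version strings are an assumed stand-in for an object's ETag or last-modified time. `preload()` plays the role of the preloader job, and `run_epoch` counts how many S3 copies one pass over the data triggers.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class LinkedFileSystem:
    """Toy model of an FSx for Lustre file system linked to S3 (illustration only).

    s3 maps object keys to a version tag (an assumed stand-in for ETag/mtime);
    cache records which version of each file has already been copied to FSx.
    """
    s3: Dict[str, str]
    cache: Dict[str, str] = field(default_factory=dict)
    copies: List[str] = field(default_factory=list)

    def _ensure_local(self, key: str) -> None:
        # Copy only if the file is missing on FSx or the S3 source has changed.
        if self.cache.get(key) != self.s3[key]:
            self.cache[key] = self.s3[key]
            self.copies.append(key)

    def read(self, key: str) -> str:
        # Lazy load: the first access pays for the copy from S3.
        self._ensure_local(key)
        return self.cache[key]

    def preload(self) -> None:
        # Preload: bring every file in up front, before training starts.
        for key in self.s3:
            self._ensure_local(key)


def run_epoch(fs: LinkedFileSystem) -> int:
    """Read every file once and return how many S3 copies this epoch triggered."""
    before = len(fs.copies)
    for key in fs.s3:
        fs.read(key)
    return len(fs.copies) - before


fs = LinkedFileSystem(s3={"a.png": "v1", "b.png": "v1", "c.png": "v1"})
print(run_epoch(fs))   # 3: the first epoch copies every file
print(run_epoch(fs))   # 0: later epochs read from FSx only
fs.s3["b.png"] = "v2"  # one source object changes in S3
print(run_epoch(fs))   # 1: only the changed file is copied again

preloaded = LinkedFileSystem(s3={"a.png": "v1", "b.png": "v1"})
preloaded.preload()
print(run_epoch(preloaded))  # 0: training starts with nothing left to copy
```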

This notebook shows how to use SageMaker Pipelines to preload files onto FSx and then start SageMaker training with FSx as the input storage. We use SageMaker to train a convolutional neural network with PyTorch on the CIFAR-10 dataset. We then run SageMaker Batch Transform on the trained model.

The steps in this pipeline include:
* Preload data into the FSx file system
* Train a PyTorch model on the preloaded files
* Persist the trained model
* Run Batch Transform using the trained model

<img src="images/SM-Pipelines-FSx-Preload.png" alt="SageMaker pipeline with FSx preload step" style="width: 750px;"/>

## Prerequisites setup

Complete the following steps before you proceed with the rest of the notebook.

1) Create an S3 bucket to store the training data and the Batch Transform input and output.
2) Create a VPC, subnet, and security group, or use existing ones.
3) Create an FSx for Lustre file system linked to the S3 bucket. The file system is hosted in the VPC and subnet, and the security group is associated with the file system's network interface.
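You can also script the S3 link from step 3 instead of configuring it in the console. The sketch below builds keyword arguments for boto3's FSx `create_data_repository_association`. It assumes your file system's deployment type supports data repository associations; check the FSx for Lustre documentation, because some deployment types link S3 through an import path at creation time instead. The IDs and bucket name are hypothetical.

```python
from typing import Any, Dict


def build_dra_request(file_system_id: str, file_system_path: str, s3_uri: str) -> Dict[str, Any]:
    """Build kwargs for boto3 FSx create_data_repository_association (sketch).

    Assumes the file system supports data repository associations (DRAs).
    """
    if not file_system_id.startswith("fs-"):
        raise ValueError("file_system_id should look like 'fs-...'")
    if not file_system_path.startswith("/"):
        raise ValueError("file_system_path must be absolute, e.g. '/fsx'")
    if not s3_uri.startswith("s3://"):
        raise ValueError("s3_uri must start with 's3://'")
    return {
        "FileSystemId": file_system_id,
        "FileSystemPath": file_system_path,
        "DataRepositoryPath": s3_uri,
        # Import file metadata from S3 when the association is created.
        "BatchImportMetaDataOnCreate": True,
        # Keep FSx metadata in sync as objects are added, changed, or deleted in S3.
        "S3": {"AutoImportPolicy": {"Events": ["NEW", "CHANGED", "DELETED"]}},
    }


request = build_dra_request("fs-0123456789abcdef0", "/fsx", "s3://my-training-bucket")
print(request["FileSystemPath"], "<->", request["DataRepositoryPath"])
```

With AWS credentials configured, you would pass the result to `boto3.client("fsx").create_data_repository_association(**request)`.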


<img src="images/fsx-select.png" alt="FSx Selection configuration" style="width: 500px;"/>
<br>
<img src="images/fsx-network.png" alt="FSx Network configuration" style="width: 500px;"/>
<br>
<img src="images/fsx-datarepo.png" alt="FSx Data repo configuration" style="width: 500px;"/>


Once the file system is created, note the file system ID and mount name on its details page. You use them to attach the file system to the preloader and training jobs.


<img src="images/fsx-details.png" alt="FSx Details" style="width: 500px;"/>

## Setup

After completing the prerequisites, specify the following items:

- VPC Subnets
- Security Group Id
- An Amazon S3 bucket and prefix for training and model data. This should be in the same region used for SageMaker Studio, training, and hosting.

In the cell below, enter the network and FSx configuration details and the S3 bucket name.


In [None]:
# Add network configuration details here (replace the placeholder values)
security_group_ids = ['sg-aaa', 'sg-bbb']  # your security group IDs
subnets = ['subnet-']  # your subnet IDs

# Specify the FSx for Lustre file system ID, mount name, and path.
file_system_id = 'fs-'  # File system ID from the FSx details page
file_system_mount_name = 'aabbcc'  # Mount name from the FSx details page
file_system_path = '/fsx'  # path on the file system


# Specify the directory path for input data on the file system.
# You need to provide a normalized, absolute path.
file_system_directory_path = f'/{file_system_mount_name}{file_system_path}/data'
print(f'FSx file-system data input path: {file_system_directory_path}')

s3_bucket = 's3_bucket_name'  # bucket name only, without the s3:// prefix
print(f'S3 bucket {s3_bucket}')
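Because the directory path must be normalized and absolute, a small helper can build it defensively. The helper also fails fast on placeholders that were never replaced. This is a sketch; `fsx_directory_path` and `check_ids` are hypothetical helpers, not part of the SageMaker SDK. The prefix check is only a shallow sanity test of the ID format.

```python
import posixpath
from typing import Iterable


def fsx_directory_path(mount_name: str, file_system_path: str, subdir: str) -> str:
    """Join /<mount_name>/<file_system_path>/<subdir> into a normalized absolute path.

    Same result as the f-string in the cell above, but tolerant of missing or
    doubled slashes in the inputs.
    """
    parts = [p.strip("/") for p in (mount_name, file_system_path, subdir)]
    return posixpath.normpath("/" + "/".join(p for p in parts if p))


def check_ids(prefix: str, values: Iterable[str]) -> None:
    """Raise if a value lacks the expected prefix or is still just the bare prefix."""
    for value in values:
        if not value.startswith(prefix) or value == prefix:
            raise ValueError(f"{value!r} does not look like a {prefix}... ID")


print(fsx_directory_path("aabbcc", "/fsx", "data"))  # /aabbcc/fsx/data
check_ids("sg-", ["sg-0a1b2c3d"])  # passes silently
```

For example, `check_ids("subnet-", subnets)` raises while `subnets` still holds the `'subnet-'` placeholder.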

In [None]:
! pip install --upgrade sagemaker

In [None]:
import os
import time
import boto3
import sagemaker
import sys
import numpy as np
import torch
import torchvision

from sagemaker import get_execution_role
from sagemaker import Model
from sagemaker.tensorflow.estimator import TensorFlow
from sagemaker.inputs import FileSystemInput, TrainingInput, TransformInput
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
)

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, CacheConfig, TransformStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.s3 import S3Uploader
from sagemaker.pytorch import PyTorch, PyTorchModel

from cifar_utils import classes, show_img, train_data_loader, test_data_loader

### Session
Let's start by specifying:
- SageMaker session, Pipeline session, region and account Id
- An IAM role that SageMaker uses to access your training and model data. If you wish to use a different role than the one set up for SageMaker Studio, replace `sagemaker.get_execution_role()` with the appropriate IAM role or ARN. For more about using IAM roles with SageMaker, see [the AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).

In [None]:
# Create Session objects, region and account ID

sagemaker_session = sagemaker.Session()
pipeline_session = PipelineSession()
sm_client = sagemaker_session.sagemaker_client
region = sagemaker_session.boto_region_name
account_id = sagemaker_session.account_id()

role = sagemaker.get_execution_role()
print(f"SageMaker Execution Role:{role}")

### Create a custom Docker image
The following section builds a custom Docker image. The preloader step runs this image as its training image, and the image's entrypoint preloads the files into FSx. We use the SageMaker Studio Image Build CLI (`sm-docker`) to build the image and push it to Amazon ECR.

NOTE: If you have already built the image and pushed it to ECR, you can skip this step.

In [None]:
!pip install -q sagemaker-studio-image-build

Create the preloader script. It is used as the entrypoint of the custom Docker image.

In [None]:
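%%writefile container/preload-fsx.sh
#!/bin/bash

# The train channel (the FSx file system) is mounted at /opt/ml/input/data/train.
ls /opt/ml/input/data/train/
echo "Starting preload with hsm_restore...."
# Request an HSM restore (copy from the linked S3 bucket) for every regular file.
nohup find /opt/ml/input/data/train/ -type f -print0 | xargs -0 -n 1 lfs hsm_restore
# hsm_restore queues restore requests; it does not wait for the copies to finish.
echo "Preload requests submitted for all files"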
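The shell pipeline above can be mirrored in Python, which makes the moving parts easier to test. The first part walks the channel directory like `find -type f`. The second runs `lfs hsm_restore` once per file like `xargs -n 1`, with an optional degree of parallelism. This is a sketch rather than a replacement for the script; the `runner` parameter is a hypothetical hook so the logic can be exercised without a Lustre client.

```python
import os
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def list_files(root: str) -> List[str]:
    """Equivalent of `find root -type f`: regular files only, symlinks skipped."""
    paths = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if os.path.isfile(path) and not os.path.islink(path):
                paths.append(path)
    return sorted(paths)


def run_lfs(args: Sequence[str]) -> int:
    """Run an lfs command; works only where the Lustre client is installed."""
    return subprocess.run(["lfs", *args], check=False).returncode


def preload(root: str, runner: Callable[[Sequence[str]], int] = run_lfs,
            workers: int = 8) -> List[str]:
    """Request an HSM restore for every file under root, `workers` at a time.

    Returns the paths whose restore request exited non-zero.
    """
    files = list_files(root)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        codes = list(pool.map(lambda p: runner(["hsm_restore", p]), files))
    return [path for path, code in zip(files, codes) if code != 0]


# Dry run against a temporary directory with a fake runner that records calls.
calls = []
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "batch1"))
    for name in ("batch1/a.bin", "b.bin"):
        open(os.path.join(tmp, name), "w").close()
    failed = preload(tmp, runner=lambda args: calls.append(args) or 0)
print(len(calls), failed)  # 2 []
```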

Create the Dockerfile.

In [None]:
%%writefile container/Dockerfile

FROM amazonlinux:2

LABEL maintainer="Amazon AI <sage-learner@amazon.com>"

# Install the Lustre client, which provides the lfs command used by the script
RUN amazon-linux-extras install -y lustre

# Set up the entrypoint
COPY preload-fsx.sh /opt/preload-fsx.sh

ENTRYPOINT /opt/preload-fsx.sh

Trigger the Docker build with the SageMaker Studio `sm-docker` utility.

In [None]:
%%sh

cd container

chmod +x preload-fsx.sh

sm-docker build .  --repository fsx-demo-preload:latest

In [None]:
preloader_image = '{}.dkr.ecr.{}.amazonaws.com/fsx-demo-preload:latest'.format(account_id, region)
image_dir = "data/images"
inference_prefix = "batch_transform"
inference_inputs = f"s3://{s3_bucket}/{inference_prefix}"
inreference_output = f"s3://{s3_bucket}/BatchTransformOutput"

### Prepare training data

In this section we will import training data images and upload to S3. We will use CIFAR-10 dataset to train a CNN model. The [CIFAR-10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html) is a subset of the [80 million tiny images dataset](https://people.csail.mit.edu/torralba/tinyimages). It consists of 60,000 32x32 color images in 10 classes, with 6,000 images per class.

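The following cell downloads the CIFAR-10 images and uploads them to S3. Make sure the S3 location you upload to is linked through the FSx data repository to `file_system_directory_path`. Otherwise, the preloader and training jobs will not find the files on FSx.

NOTE: If you have already downloaded the images and uploaded them to S3, you can skip this step.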

In [None]:
train_loader = train_data_loader()
test_loader = test_data_loader()

# get some random training images
dataiter = iter(train_loader)
images, labels = next(dataiter)

# show images
show_img(torchvision.utils.make_grid(images))

# print labels
print(" ".join("%9s" % classes[labels[j]] for j in range(4)))

prefix = "pytorch-cnn-cifar10-example"
inputs = S3Uploader.upload("data", "s3://{}/{}/data".format(s3_bucket, prefix))

# save test images locally as PNG files for batch inference
os.makedirs(image_dir, exist_ok=True)
dataiter = iter(test_loader)
images, labels = next(dataiter)

for i in range(100):
    images, labels = next(dataiter)
    for j in range(len(images)):
        torchvision.utils.save_image(tensor=images[j], fp=f"{image_dir}/{i}-{j}.png", format="png")

inference_inputs = sagemaker_session.upload_data(
    path=image_dir, bucket=s3_bucket, key_prefix=inference_prefix
)
print("Input S3 path for batch inference: {}".format(inference_inputs))
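To confirm that the uploaded data ends up at `file_system_directory_path`, you can work out where an S3 object appears on FSx. With a data repository association, an object's key below the linked S3 prefix becomes its path below the linked file system path, under the mount name. The sketch below encodes that mapping; the bucket, prefix, and key are hypothetical. In the second example, the bucket root is linked to `/fsx`, so the upload prefix lands at `/aabbcc/fsx/pytorch-cnn-cifar10-example/data`, not at `/aabbcc/fsx/data`. That is exactly the mismatch to avoid.

```python
def s3_to_fsx_path(s3_uri: str, dra_s3_prefix: str, dra_fs_path: str, mount_name: str) -> str:
    """Map an S3 object URI to the FSx directory path SageMaker would use for it.

    dra_s3_prefix and dra_fs_path are the two ends of the data repository
    association; mount_name is the file system's mount name.
    """
    prefix = dra_s3_prefix.rstrip("/") + "/"
    if not s3_uri.startswith(prefix):
        raise ValueError(f"{s3_uri} is outside the linked prefix {prefix}")
    relative = s3_uri[len(prefix):]
    fs_path = dra_fs_path.strip("/")
    parts = [mount_name.strip("/")] + ([fs_path] if fs_path else []) + [relative]
    return "/" + "/".join(parts)


key = "s3://my-bucket/pytorch-cnn-cifar10-example/data/example.bin"
# Prefix linked directly to /fsx: the data lands where the training channel expects it.
print(s3_to_fsx_path(key, "s3://my-bucket/pytorch-cnn-cifar10-example", "/fsx", "aabbcc"))
# /aabbcc/fsx/data/example.bin
# Whole bucket linked to /fsx: the upload prefix becomes part of the FSx path.
print(s3_to_fsx_path(key, "s3://my-bucket", "/fsx", "aabbcc"))
# /aabbcc/fsx/pytorch-cnn-cifar10-example/data/example.bin
```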

## Set up SageMaker Pipeline parameters and file system inputs

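In this section, we create a `FileSystemInput` that points at the FSx directory and register it as the `train` channel. The same channel dictionary is passed to the `fit` method of both the preloader and the training estimators.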

In [None]:
file_system_access_mode = 'ro'
file_system_type = 'FSxLustre'

train = FileSystemInput(
    file_system_id=file_system_id,
    file_system_type=file_system_type,
    directory_path=file_system_directory_path,
    file_system_access_mode=file_system_access_mode,
)

data_channels = {'train': train}
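For intuition about what the channel amounts to, here is a sketch of a `FileSystemDataSource` channel entry as it appears in the CreateTrainingJob API's `InputDataConfig`. This approximates what `FileSystemInput` contributes; the SDK builds the real request and may add fields. Inside the container, SageMaker exposes the channel at `/opt/ml/input/data/<channel>`. Both jobs receive the same `train` channel, so the preload script walks the same directory the training job later reads. The file system ID is hypothetical.

```python
from typing import Dict


def file_system_channel(channel: str, file_system_id: str, directory_path: str,
                        file_system_type: str = "FSxLustre", access_mode: str = "ro") -> Dict:
    """Build one InputDataConfig entry for a file-system-backed training channel."""
    if file_system_type not in ("FSxLustre", "EFS"):
        raise ValueError("file_system_type must be 'FSxLustre' or 'EFS'")
    if access_mode not in ("ro", "rw"):
        raise ValueError("access_mode must be 'ro' or 'rw'")
    return {
        "ChannelName": channel,
        "DataSource": {
            "FileSystemDataSource": {
                "FileSystemId": file_system_id,
                "FileSystemType": file_system_type,
                "DirectoryPath": directory_path,
                "FileSystemAccessMode": access_mode,
            }
        },
    }


def container_path(channel: str) -> str:
    """Where SageMaker mounts a channel inside the training container."""
    return f"/opt/ml/input/data/{channel}"


entry = file_system_channel("train", "fs-0123456789abcdef0", "/aabbcc/fsx/data")
print(entry["DataSource"]["FileSystemDataSource"]["FileSystemType"], container_path("train"))
```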

In the cell below, you can configure the instance type and count for:
* the preloader
* training
* batch inference

You can choose a different instance type for each step. For example, a small, inexpensive instance is enough for the preloader.

In [None]:
# Cache Pipeline steps to reduce execution time on subsequent executions
cache_config = CacheConfig(enable_caching=True, expire_after="30d")
preloader_instance_count = ParameterInteger(name="PreLoaderInstanceCount", default_value=1)
preloader_instance_type = ParameterString(name="PreLoaderInstanceType", default_value="ml.m5.large")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.xlarge")
transform_instance_count = ParameterInteger(name="TransformInstanceCount", default_value=1)
transform_instance_type = ParameterString(name="TransformInstanceType", default_value="ml.c5.xlarge")
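These defaults apply to every execution unless you override them by parameter name when starting the pipeline, for example `pipeline.start(parameters={"TrainingInstanceType": "ml.c5.2xlarge"})`. The pure-Python sketch below mimics that merge with a hypothetical `resolve_parameters` helper. It rejects unknown names and mismatched types locally; SageMaker performs its own validation, which this does not reproduce.

```python
from typing import Dict, Union

Value = Union[int, str]

# Defaults declared in the cell above (parameter name -> default value).
DEFAULTS: Dict[str, Value] = {
    "PreLoaderInstanceCount": 1,
    "PreLoaderInstanceType": "ml.m5.large",
    "TrainingInstanceCount": 1,
    "TrainingInstanceType": "ml.c5.xlarge",
    "TransformInstanceCount": 1,
    "TransformInstanceType": "ml.c5.xlarge",
}


def resolve_parameters(overrides: Dict[str, Value],
                       defaults: Dict[str, Value] = DEFAULTS) -> Dict[str, Value]:
    """Merge per-execution overrides into the defaults, checking names and types."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        if type(value) is not type(defaults[name]):
            raise TypeError(f"{name} expects {type(defaults[name]).__name__}")
    return {**defaults, **overrides}


print(resolve_parameters({"TrainingInstanceType": "ml.c5.2xlarge"})["TrainingInstanceType"])
```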

### Configure Preloader step
Here we create the preloader step. We use a generic SageMaker `Estimator` with the custom Docker image built above. The instance parameters created above are passed to the estimator.


In [None]:
preloader_job_name = f"fsx-demo-preloader-{int(time.time())}"
preloader = sagemaker.estimator.Estimator(
    image_uri=preloader_image,
    role=role, 
    instance_type=preloader_instance_type,
    instance_count=preloader_instance_count,
    volume_size = 100,
    sagemaker_session=pipeline_session, 
    subnets=subnets,
    security_group_ids=security_group_ids
)

preloader_args = preloader.fit(inputs=data_channels, job_name=preloader_job_name, logs='All', wait=True)
step_preloader = TrainingStep(
    name='PreLoad-FSX',
    step_args=preloader_args,
    cache_config=cache_config,
)

### Configure Training Step
Here we create a PyTorch estimator that runs the `cifar10.py` entry point on the FSx-backed `train` channel, and wrap it in a training step.


In [None]:
estimator_job_name = f"fsx-demo-training-{int(time.time())}"
estimator = PyTorch(
    entry_point="cifar10.py",
    role=role,
    framework_version="1.8.0",
    py_version="py3",
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    sagemaker_session=pipeline_session, 
    subnets=subnets,
    security_group_ids=security_group_ids

)

train_args = estimator.fit(inputs=data_channels, job_name=estimator_job_name, logs='All', wait=True)

step_train = TrainingStep(
    name='TrainCNNModel',
    step_args=train_args,
    cache_config=cache_config,
)
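`cifar10.py` is not shown in this notebook. The sketch below shows how a SageMaker training script typically locates its inputs, assuming the script follows SageMaker's environment-variable conventions. SageMaker sets `SM_CHANNEL_<NAME>` for each input channel, so the FSx-backed `train` channel appears as `SM_CHANNEL_TRAIN`, and `SM_MODEL_DIR` is the model output directory. Whatever the script saves to the model directory is packaged into the `ModelArtifacts.S3ModelArtifacts` that the model step uses. The argument names and the `--epochs` default are hypothetical.

```python
import argparse
import os


def parse_args(argv=None):
    """Hypothetical argument parsing for a SageMaker training script such as cifar10.py."""
    parser = argparse.ArgumentParser()
    # SageMaker exposes each channel via SM_CHANNEL_<NAME>; 'train' -> SM_CHANNEL_TRAIN.
    parser.add_argument("--data-dir",
                        default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    # Files written here are uploaded as the job's model artifacts.
    parser.add_argument("--model-dir",
                        default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--epochs", type=int, default=2)
    return parser.parse_args(argv)


args = parse_args([])
print(args.data_dir, args.model_dir, args.epochs)
```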

### Create Model Step
This step creates a SageMaker model from the trained PyTorch model artifacts using a `ModelStep`.

In [None]:
model_name = "fsx-demo-CNNModel"
pytorch_model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    entry_point="cifar10.py",
    name=model_name,
    framework_version="1.8.0",
    py_version="py3",

)

step_model_create = ModelStep(
    name="ModelCreationStep",
    step_args=pytorch_model.create(instance_type="ml.c5.xlarge"),
    display_name=model_name, 
    description="Model to predict cifar data"
)

### Create Transformer Step

In [None]:
from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name=step_model_create.properties.ModelName,
    instance_type=transform_instance_type,
    instance_count=transform_instance_count,
    output_path=inreference_output,
    sagemaker_session=pipeline_session,
)

transform_args = transformer.transform(
    data=inference_inputs,
    data_type="S3Prefix",
    content_type="application/x-image",
)

step_transform = TransformStep(
    name="Transform", step_args=transform_args
)
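Batch Transform writes one result object per input object. Each result is named after its input with `.out` appended, under `output_path`, and keeps the input's path relative to the input prefix. The sketch below predicts those locations so you can find the predictions for a given image after the run. Treat the naming rule as an assumption to verify against your actual output; the bucket name is hypothetical, and the file names follow the `{i}-{j}.png` pattern used when the test images were saved.

```python
import posixpath
from typing import List


def transform_output_uris(input_keys: List[str], input_prefix: str, output_path: str) -> List[str]:
    """Predict where batch transform writes results: <output_path>/<relative key>.out."""
    prefix = input_prefix.rstrip("/") + "/"
    outputs = []
    for key in input_keys:
        if not key.startswith(prefix):
            raise ValueError(f"{key} is not under {prefix}")
        relative = key[len(prefix):]
        outputs.append(posixpath.join(output_path.rstrip("/"), relative + ".out"))
    return outputs


print(transform_output_uris(
    ["s3://my-bucket/batch_transform/0-0.png", "s3://my-bucket/batch_transform/0-1.png"],
    "s3://my-bucket/batch_transform",
    "s3://my-bucket/BatchTransformOutput",
))
```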

### Configure SM Pipeline

Next, we add dependencies between the steps, add the parameters to the pipeline, and create the pipeline definition.

In [None]:
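# Training must wait for the preloader; there is no data dependency between them,
# so the ordering has to be declared explicitly.
step_train.add_depends_on([step_preloader])
# Not needed: the model step already depends on step_train through
# step_train.properties.ModelArtifacts.S3ModelArtifacts.
# step_model_create.add_depends_on([step_train])
step_transform.add_depends_on([step_model_create])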
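Together, the explicit `add_depends_on` calls and the implicit dependencies created by referencing another step's `properties` form the pipeline's DAG. Steps run only after all their parents finish. The sketch below uses Kahn's algorithm to derive a valid execution order from step names as given in the code above. It illustrates the ordering idea only; it is not how SageMaker schedules steps internally, and the names in the exported definition may differ.

```python
from collections import deque
from typing import Dict, List, Set


def topological_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Kahn's algorithm: repeatedly release steps whose dependencies have all finished."""
    unknown = {p for parents in deps.values() for p in parents} - set(deps)
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")
    remaining = {step: set(parents) for step, parents in deps.items()}
    ready = deque(sorted(s for s, parents in remaining.items() if not parents))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for other in sorted(remaining):
            if step in remaining[other]:
                remaining[other].discard(step)
                if not remaining[other]:
                    ready.append(other)
    if len(order) != len(deps):
        raise ValueError("cycle detected: a pipeline must be a directed acyclic graph")
    return order


dependencies = {
    "PreLoad-FSX": set(),
    "TrainCNNModel": {"PreLoad-FSX"},        # explicit add_depends_on
    "ModelCreationStep": {"TrainCNNModel"},  # implicit, via step_train.properties
    "Transform": {"ModelCreationStep"},      # explicit and implicit
}
order = topological_order(dependencies)
print(order)
```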

In [None]:
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
pipeline_name = "SageMaker-FSx-Preload-pipeline-" + current_time

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        preloader_instance_count,
        preloader_instance_type,
        training_instance_count,
        training_instance_type,
        transform_instance_count,
        transform_instance_type
    ],
    steps=[step_preloader,step_train,step_model_create,step_transform],
    sagemaker_session=pipeline_session,
)

In [None]:
import json

definition = json.loads(pipeline.definition())
definition
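The full definition is long. A small helper can summarize it as the parameter defaults plus each step's name, type, and explicit `DependsOn` list. The sample dictionary below is abbreviated, hand-written, and hypothetical; its key names reflect the general shape of `pipeline.definition()` output as we understand it, so check them against your printed `definition`. With the real output, call `summarize_definition(definition)`.

```python
from typing import Any, Dict, List, Tuple

StepSummary = Tuple[str, str, List[str]]


def summarize_definition(definition: Dict[str, Any]) -> Tuple[Dict[str, Any], List[StepSummary]]:
    """Return parameter defaults and (name, type, explicit DependsOn) for each step."""
    params = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    steps = [(s["Name"], s["Type"], s.get("DependsOn", [])) for s in definition.get("Steps", [])]
    return params, steps


# Abbreviated, hypothetical example shaped like pipeline.definition() output.
sample = {
    "Parameters": [{"Name": "TrainingInstanceCount", "Type": "Integer", "DefaultValue": 1}],
    "Steps": [
        {"Name": "PreLoad-FSX", "Type": "Training", "Arguments": {}},
        {"Name": "TrainCNNModel", "Type": "Training", "Arguments": {}, "DependsOn": ["PreLoad-FSX"]},
    ],
}
params, steps = summarize_definition(sample)
print(params)
for name, step_type, depends_on in steps:
    print(f"{name:15} {step_type:10} depends on: {depends_on or '-'}")
```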

### Create and run the pipeline

We create (or update) the pipeline from the definition above and start a pipeline execution.

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.wait()

In [None]:
execution.list_steps()
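`execution.list_steps()` returns one dictionary per step, with keys such as `StepName`, `StepStatus`, `StartTime`, `EndTime`, `CacheHitResult` and `FailureReason`. The sketch below condenses that into one line per step. It shows durations, whether a step was served from the pipeline cache (relevant because `cache_config` is enabled), and any failure reason. The sample records are hypothetical; with a real execution, call `summarize_steps(execution.list_steps())`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def summarize_steps(steps: List[Dict[str, Any]]) -> List[str]:
    """Format one line per step: name, status, duration, cache hit, failure reason."""
    lines = []
    for step in steps:
        start, end = step.get("StartTime"), step.get("EndTime")
        duration = f"{(end - start).total_seconds():.0f}s" if start and end else "-"
        parts = [f"{step['StepName']}:", step["StepStatus"], duration]
        if step.get("CacheHitResult"):
            parts.append("(cache hit)")
        if step.get("FailureReason"):
            parts.append(step["FailureReason"])
        lines.append(" ".join(parts))
    return lines


# Hypothetical records shaped like execution.list_steps() output.
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
sample_steps = [
    {"StepName": "Transform", "StepStatus": "Executing", "StartTime": t0 + timedelta(minutes=20)},
    {"StepName": "TrainCNNModel", "StepStatus": "Succeeded",
     "StartTime": t0 + timedelta(minutes=5), "EndTime": t0 + timedelta(minutes=15)},
    {"StepName": "PreLoad-FSX", "StepStatus": "Succeeded", "StartTime": t0,
     "EndTime": t0 + timedelta(minutes=4),
     "CacheHitResult": {"SourcePipelineExecutionArn": "hypothetical-arn"}},
]
print("\n".join(summarize_steps(sample_steps)))
```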