# Run TensorFlow 2 Parameter Server on Amazon SageMaker

### This sample shows how to run TensorFlow 2 parameter server training code on SageMaker

Notes:

1. The dataset format is TFRecord.

2. Model training runs on **CPU** instances.

3. The sample uses [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).

4. The TensorFlow version is 2.14.

In [1]:
%pip install sagemaker -U

Collecting sagemaker
  Using cached sagemaker-2.231.0-py3-none-any.whl.metadata (16 kB)
Collecting sagemaker-core<2.0.0,>=1.0.0 (from sagemaker)
  Using cached sagemaker_core-1.0.1-py3-none-any.whl.metadata (4.6 kB)
Collecting platformdirs (from sagemaker)
  Using cached platformdirs-4.2.2-py3-none-any.whl.metadata (11 kB)
Collecting mock<5.0,>4.0 (from sagemaker-core<2.0.0,>=1.0.0->sagemaker)
  Using cached mock-4.0.3-py3-none-any.whl.metadata (2.8 kB)
Using cached sagemaker-2.231.0-py3-none-any.whl (1.6 MB)
Using cached sagemaker_core-1.0.1-py3-none-any.whl (375 kB)
Using cached platformdirs-4.2.2-py3-none-any.whl (18 kB)
Using cached mock-4.0.3-py3-none-any.whl (28 kB)
Installing collected packages: platformdirs, mock, sagemaker-core, sagemaker
  Attempting uninstall: platformdirs
    Found existing installation: platformdirs 3.11.0
    Uninstalling platformdirs-3.11.0:
      Successfully uninstalled platformdirs-3.11.0
  Attempting uninstall: mock
    Found existing installation: m

In [2]:
from datetime import datetime
import sagemaker
import os
print(sagemaker.__version__)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml
2.231.0


In [3]:
sess = sagemaker.session.Session()

bucket = sess.default_bucket()  # Default bucket for data and model artifacts; replace with another bucket if needed
model_dir = 's3://{}/tf2-ps-ckpt/{}'.format(bucket, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))  # Timestamped so each run gets its own checkpoint prefix
output_path = 's3://{}/model-output-tf2'.format(bucket)

## Prepare training, validation data

### Generate a sample training dataset

In [4]:
import tensorflow as tf
import numpy as np


def _float_feature(value):
    """Returns a float_list Feature from a single float or a list of floats."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value] if isinstance(value, float) else value))


def create_example(x, y):
    feature = {
        'x': _float_feature(x.flatten().tolist()),  # Convert to Python list
        'y': _float_feature(float(y))  # Convert to Python float
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


def write_tfrecord(filename, num_examples):
    with tf.io.TFRecordWriter(filename) as writer:
        for _ in range(num_examples):
            x = np.random.uniform(size=(10, 10)).astype(np.float32)
            y = np.random.uniform()  # This returns a scalar
            tf_example = create_example(x, y)
            writer.write(tf_example.SerializeToString())
    print(f"TFRecord file '{filename}' has been created with {num_examples} examples.")


# Create train.tfrecord with 64*100 examples
write_tfrecord('train.tfrecord', 64*100)

2024-09-03 08:02:11.509853: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-03 08:02:11.509906: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-03 08:02:11.509930: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


TFRecord file 'train.tfrecord' has been created with 6400 examples.
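
To sanity-check the file, and to show what the reading side might look like, the following sketch parses the records back with `tf.data`. The feature spec mirrors `create_example` above (100 floats for `x`, one float for `y`). The helper names `parse_example` and `load_dataset` and the batch size of 64 are illustrative assumptions; they are not taken from the sample's training script.

```python
import tensorflow as tf

# Feature layout written by create_example: x is a flattened 10x10 array, y is one float.
FEATURE_SPEC = {
    "x": tf.io.FixedLenFeature([100], tf.float32),
    "y": tf.io.FixedLenFeature([1], tf.float32),
}


def parse_example(serialized):
    """Decode one serialized tf.train.Example into an (x, y) pair."""
    parsed = tf.io.parse_single_example(serialized, FEATURE_SPEC)
    x = tf.reshape(parsed["x"], (10, 10))  # restore the original 10x10 shape
    y = tf.squeeze(parsed["y"], axis=0)    # shape [1] -> scalar
    return x, y


def load_dataset(path, batch_size=64):
    """Build a batched dataset from one TFRecord file (batch size is an assumption)."""
    return tf.data.TFRecordDataset(path).map(parse_example).batch(batch_size)
```

Calling `next(iter(load_dataset('train.tfrecord')))` should yield a batch of `x` with shape `(64, 10, 10)` and a batch of `y` with shape `(64,)`.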


In [5]:
training_data = f's3://{bucket}/data/train/'

os.system(f'aws s3 cp train.tfrecord {training_data}train_1.tfrecord')
os.system(f'aws s3 cp train.tfrecord {training_data}train_2.tfrecord')

upload: ./train.tfrecord to s3://sagemaker-us-east-1-022346938362/data/train/train_1.tfrecord
upload: ./train.tfrecord to s3://sagemaker-us-east-1-022346938362/data/train/train_2.tfrecord


0
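
The trailing `0` is the exit status returned by the last `os.system` call; a non-zero status from the first call would go unnoticed. A minimal sketch of a stricter variant follows, assuming the AWS CLI is on the `PATH`. The helper names `build_upload_commands` and `upload_copies` are hypothetical and not part of the sample.

```python
import subprocess


def build_upload_commands(local_file, s3_prefix, num_copies=2):
    """Return one `aws s3 cp` argument list per copy (train_1.tfrecord, train_2.tfrecord, ...)."""
    return [
        ["aws", "s3", "cp", local_file, f"{s3_prefix}train_{i}.tfrecord"]
        for i in range(1, num_copies + 1)
    ]


def upload_copies(local_file, s3_prefix, num_copies=2):
    """Run each upload; subprocess raises CalledProcessError if a copy fails."""
    for cmd in build_upload_commands(local_file, s3_prefix, num_copies):
        subprocess.run(cmd, check=True)
```

With the variables from the cell above, `upload_copies('train.tfrecord', training_data)` would perform the same two uploads and stop at the first failure.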

## Use an EFS file system to save the model

In [6]:

efs_fs = sagemaker.inputs.FileSystemInput(
    file_system_id='fs-7df3f288',
    file_system_type='EFS',  # or 'FSxLustre'
    directory_path='/',  # Absolute or normalized path to the root directory (mount point) in the file system
    file_system_access_mode='rw',  # defaults to 'ro'
)


In [7]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

train_instance_type = 'ml.m5.4xlarge'
train_instance_count = 2

train_use_spot_instances = False

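train_max_run = 36000*2  # Maximum training time in seconds (72,000 s = 20 hours)
train_max_wait = 72000 if train_use_spot_instances else None  # Spot only; must be greater than or equal to max_run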

distributions = {'parameter_server': {'enabled': True}}

base_job_name='tf2-ps'

estimator = TensorFlow(
    source_dir='./code',
    entry_point='sm-tf2-train.py',

    # S3 location where the checkpoint data and models can be exported to during training. 
    # It will be passed in the training script as one of the command line arguments. 
    model_dir=model_dir,

    # S3 location for saving the training result (model artifacts and output files). 
    # If not specified, results are stored to a default bucket.
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size = 500,
    hyperparameters={},
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='2.14',
    py_version='py310',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,

    # VPC settings: the training instances need network access to the EFS file system.
    subnets=[
        'subnet-011da1e5fcad41e1c',
    ],
    security_group_ids=[
        'sg-0cc9803aec026eeb1',
    ],

    debugger_hook_config=False, # Configuration for how debugging information is emitted with SageMaker Debugger. 
                                # If not specified, a default one is created using the estimator’s output_path, unless the region does not support SageMaker Debugger. 
                                # To disable SageMaker Debugger, set this parameter to False.
    disable_profiler=True,  # Specifies whether Debugger monitoring and profiling will be disabled
    keep_alive_period_in_seconds=600,
    enable_remote_debug=True,
)
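
The entry point `sm-tf2-train.py` is not shown in this notebook. As a rough sketch of how such a script might pick up its configuration, the snippet below reads the `--model_dir` argument and the environment variables that the SageMaker training toolkit sets for each container (`SM_CHANNEL_<NAME>`, `SM_HOSTS`, `SM_CURRENT_HOST`). The function name `parse_job_config` and the returned dictionary keys are assumptions for illustration; the channel names `training` and `model_output` are the ones passed to `fit` later in this notebook.

```python
import argparse
import json
import os


def parse_job_config(argv=None, environ=None):
    """Collect the settings SageMaker passes to a training script (illustrative helper)."""
    environ = os.environ if environ is None else environ

    parser = argparse.ArgumentParser()
    # The estimator's model_dir is forwarded to the script as --model_dir.
    parser.add_argument("--model_dir", type=str, default=None)
    # Ignore any other hyperparameters instead of failing on them.
    args, _unknown = parser.parse_known_args(argv)

    return {
        "model_dir": args.model_dir,
        # Local directory for the "training" channel.
        "train_dir": environ.get("SM_CHANNEL_TRAINING"),
        # Local mount point for the "model_output" (EFS) channel.
        "efs_dir": environ.get("SM_CHANNEL_MODEL_OUTPUT"),
        # SM_HOSTS is a JSON list of all hosts in the job, e.g. ["algo-1", "algo-2"].
        "hosts": json.loads(environ.get("SM_HOSTS", "[]")),
        "current_host": environ.get("SM_CURRENT_HOST"),
    }
```

Inside the container, calling `parse_job_config()` with no arguments would read `sys.argv` and `os.environ`; passing explicit values makes the helper easy to try locally.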

In [8]:
from sagemaker.inputs import TrainingInput

train_input = TrainingInput(training_data)

inputs = {"training": train_input, "model_output": efs_fs}

estimator.fit(inputs)

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker:Creating training-job with name: tf2-ps-2024-09-03-08-02-14-837


2024-09-03 08:02:16 Starting - Starting the training job...
2024-09-03 08:02:32 Starting - Preparing the instances for training...
2024-09-03 08:03:08 Downloading - Downloading input data...
2024-09-03 08:03:33 Downloading - Downloading the training image...
  "cipher": algorithms.TripleDES,
  "class": algorithms.TripleDES,
2024-09-03 08:04:30.377228: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX512F, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-09-03 08:04:32,631 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2024-09-03 08:04:32,632 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2024-09-03 08:04:32,633 sagemaker-training-toolkit INFO     No Neurons detected (normal