# DeepFM TensorFlow Parameter Server on SageMaker Sample

### This sample demonstrates how to train a DeepFM model with TensorFlow parameter-server distributed training on SageMaker

Notice:

1. Dataset format is TFRecord

2. Training runs on **CPU** instances: in our experience, running the DeepFM script with TensorFlow parameter servers on CPU is more efficient and more cost-effective.

3. Using [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html)

In [None]:
import time

# Current Unix time truncated to whole seconds. The estimator cells below use the
# same expression to give model_dir a distinct suffix on every run.
int(time.time())

## File mode

In [8]:
# File mode: distributed parameter-server training across multiple spot instances.
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
import time
import os

# Single place to switch every S3 path below to your own bucket.
bucket = 'sagemaker-us-west-2-169088282855'

checkpoint_s3_uri = 's3://{}/deepfm-checkpoint'.format(bucket)  # Change to your own path if you want to save ckpt during training
checkpoint_local_path = '/opt/ml/checkpoints'
# A fresh timestamp suffix keeps each run's model_dir separate from earlier runs.
model_dir = 's3://{}/deepfm-ps-ckpt/{}'.format(bucket, int(time.time()))
output_path = 's3://{}/deepfm-2021'.format(bucket)

training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.c5.18xlarge'
train_instance_count = 2

train_use_spot_instances = True
enable_s3_shard = True

train_max_run = 36000 * 2  # seconds
# SageMaker requires max_wait >= max_run for spot training, so derive it from
# max_run instead of repeating the literal; the two can no longer drift apart.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

# Channel directories are built from the channel names so that renaming a channel
# above keeps the paths handed to the script in sync.
input_data_root = '/opt/ml/input/data'

# Hyperparameters handed to the entry-point script.
# NOTE(review): 'pipe_mode': 0 presumably tells the script to read files from the
# channel directories rather than a pipe -- confirm against the script.
hyperparameters = {
    'servable_model_dir': '/opt/ml/model',
    'training_data_dir': '{}/{}/'.format(input_data_root, training_channel_name),
    'val_data_dir': '{}/{}/'.format(input_data_root, evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': 0,
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
}

estimator = TensorFlow(
    #source_dir='./',
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    model_dir=model_dir,
    #checkpoint_s3_uri = checkpoint_s3_uri,
    #checkpoint_local_path = checkpoint_local_path,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size = 500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.14',
    py_version='py3',
    script_mode=True,
    #input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    # Debugger hook and profiler are switched off for this job.
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
# Launch the File-mode training job with the estimator configured above.
from sagemaker.inputs import TrainingInput

train_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/training/'
validate_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/val/'

# Only the training channel is sharded by S3 key; when sharding is off no
# distribution argument is passed at all, exactly as in the unsharded call.
shard_kwargs = {'distribution': 'ShardedByS3Key'} if enable_s3_shard else {}
train_input = TrainingInput(train_s3_uri, **shard_kwargs)

# The evaluation channel is built the same way whether or not sharding is on.
val_input = TrainingInput(validate_s3_uri)

inputs = {
    training_channel_name: train_input,
    evaluation_channel_name: val_input,
}

estimator.fit(inputs)

2021-02-20 14:41:52 Starting - Starting the training job...
2021-02-20 14:41:54 Starting - Launching requested ML instances......
2021-02-20 14:43:01 Starting - Preparing the instances for training......
2021-02-20 14:44:12 Downloading - Downloading input data...
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])[0m
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])[0m
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])[0m
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])[0m
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])[0m
  np_resource = np.dtype([("resource", np.ubyte, 1)])[0m
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])[0m
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])[0m
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])[0m
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])[0m
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])[0m
  np_resource = np.dtype([("resource", np.ubyte, 1)])[0m
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])[0m
  _np_quint8 = np.

[35mW0220 14:45:02.122279 139920681113344 deprecation.py:323] From /usr/local/lib/python3.6/site-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.[0m
[35mInstructions for updating:[0m
[35mUse Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.[0m
[35mW0220 14:45:02.159732 139920681113344 deprecation_wrapper.py:119] From DeepFM-dist-ps-for-multipleCPU-multiInstance.py:81: The name tf.parse_example is deprecated. Please use tf.io.parse_example instead.
[0m
[35mW0220 14:45:02.159915 139920681113344 deprecation_wrapper.py:119] From DeepFM-dist-ps-for-multipleCPU-multiInstance.py:83: The name tf.FixedLenFeature is deprecated. Please use tf.io.FixedLenFeature instead.
[0m
[35mW0220 14:45:02.174629 139920681113344 deprecation.py:323] From DeepFM-dist-ps-for-multipleCPU-multiInstance.py:13

[34mI0220 14:46:07.325784 139959645521664 basic_session_run_hooks.py:260] loss = 0.09591542, step = 9167 (12.621 sec)[0m
[35mI0220 14:46:09.460313 139920681113344 basic_session_run_hooks.py:260] loss = 0.102742985, step = 9217 (6.938 sec)[0m
[35mI0220 14:46:09.740459 139920681113344 basic_session_run_hooks.py:692] global_step/sec: 21.9877[0m
[35mI0220 14:46:14.143340 139920681113344 basic_session_run_hooks.py:692] global_step/sec: 22.7128[0m
[35mI0220 14:46:16.251148 139920681113344 basic_session_run_hooks.py:260] loss = 0.10439545, step = 9370 (6.791 sec)[0m
[35mI0220 14:46:18.606248 139920681113344 basic_session_run_hooks.py:692] global_step/sec: 22.6311[0m
[34mI0220 14:46:20.013451 139959645521664 basic_session_run_hooks.py:260] loss = 0.11969514, step = 9454 (12.688 sec)[0m
[35mI0220 14:46:23.051071 139920681113344 basic_session_run_hooks.py:260] loss = 0.100048274, step = 9524 (6.800 sec)[0m
[35mI0220 14:46:23.053619 139920681113344 basic_session_run_hooks.py:692]

## Pipe mode

In [9]:
# Pipe mode: distributed parameter-server training across multiple spot instances.
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
import time
import os

# Single place to switch every S3 path below to your own bucket.
bucket = 'sagemaker-us-west-2-169088282855'

checkpoint_s3_uri = 's3://{}/deepfm-checkpoint'.format(bucket)  # Change to your own path if you want to save ckpt during training
checkpoint_local_path = '/opt/ml/checkpoints'
# A fresh timestamp suffix keeps each run's model_dir separate from earlier runs.
model_dir = 's3://{}/deepfm-ps-ckpt/{}'.format(bucket, int(time.time()))
output_path = 's3://{}/deepfm-2021'.format(bucket)

training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.c5.18xlarge'
train_instance_count = 2

train_use_spot_instances = True
enable_s3_shard = True

train_max_run = 36000 * 2  # seconds
# SageMaker requires max_wait >= max_run for spot training, so derive it from
# max_run instead of repeating the literal; the two can no longer drift apart.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name = 'tf-scriptmode-deepfm'

# Channel directories are built from the channel names so that renaming a channel
# above keeps the paths handed to the script in sync.
input_data_root = '/opt/ml/input/data'

# Hyperparameters handed to the entry-point script.
# NOTE(review): 'pipe_mode': 1 presumably switches the script to its pipe-input
# code path, matching input_mode='Pipe' below -- confirm against the script.
hyperparameters = {
    'servable_model_dir': '/opt/ml/model',
    'training_data_dir': '{}/{}/'.format(input_data_root, training_channel_name),
    'val_data_dir': '{}/{}/'.format(input_data_root, evaluation_channel_name),
    'log_steps': 10,
    'num_epochs': 10,
    'field_size': 39,
    'feature_size': feature_size,
    'deep_layers': deep_layer,
    'perform_shuffle': 0,
    'batch_size': batch_size,
    'pipe_mode': 1,
    'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name,
    'evaluation_channel_name': evaluation_channel_name,
}

estimator = TensorFlow(
    #source_dir='./',
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    model_dir=model_dir,
    #checkpoint_s3_uri = checkpoint_s3_uri,
    #checkpoint_local_path = checkpoint_local_path,
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size = 500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.15',
    py_version='py3',
    script_mode=True,
    # Stream the input channels to the containers instead of downloading them first.
    input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    # Debugger hook and profiler are switched off for this job.
    debugger_hook_config=False,
    disable_profiler=True,
)

In [None]:
# Launch the Pipe-mode training job with the estimator configured above.
from sagemaker.inputs import TrainingInput

train_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/training/'
validate_s3_uri = 's3://sagemaker-us-west-2-169088282855/tf-SM-deepctr-deepfm-sample/data-tfrecord/val/'

# Only the training channel is sharded by S3 key; when sharding is off no
# distribution argument is passed at all, exactly as in the unsharded call.
shard_kwargs = {'distribution': 'ShardedByS3Key'} if enable_s3_shard else {}
train_input = TrainingInput(train_s3_uri, **shard_kwargs)

# The evaluation channel is built the same way whether or not sharding is on.
val_input = TrainingInput(validate_s3_uri)

inputs = {
    training_channel_name: train_input,
    evaluation_channel_name: val_input,
}

estimator.fit(inputs)

2021-02-20 10:18:12 Starting - Starting the training job...
2021-02-20 10:18:14 Starting - Launching requested ML instances......
2021-02-20 10:19:24 Starting - Preparing the instances for training.........
2021-02-20 10:21:05 Downloading - Downloading input data
2021-02-20 10:21:05 Training - Downloading the training image...
[0m
[35m2021-02-20 10:21:25,098 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training[0m
[35m2021-02-20 10:21:25,108 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[35m2021-02-20 10:21:25,449 sagemaker_tensorflow_container.training INFO     Running distributed training job with parameter servers[0m
[35m2021-02-20 10:21:25,449 sagemaker_tensorflow_container.training INFO     Launching parameter server process[0m
[35m2021-02-20 10:21:25,449 sagemaker_tensorflow_container.training INFO     Running distributed training job with parameter servers[0m
[0m
[0m
[0m
[0m
[35m2021

[34m2021-02-20 10:23:44.708785: W tensorflow/core/distributed_runtime/rpc/grpc_worker_service.cc:510] RecvTensor cancelled for 17810557842545804[0m
[0m
[0m
[0m
[0m
[0m
[35mW0220 10:21:29.063510 140180574773504 module_wrapper.py:139] From DeepFM-dist-ps-for-multipleCPU-multiInstance.py:400: The name tf.ConfigProto is deprecated. Please use tf.compat.v1.ConfigProto instead.
[0m
[35mINFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'master': ['algo-1:2222'], 'ps': ['algo-1:2223', 'algo-2:2223'], 'worker': ['algo-2:2222']}, 'environment': 'cloud', 'task': {'index': 0, 'type': 'worker'}}[0m
[35mI0220 10:21:29.063748 140180574773504 run_config.py:535] TF_CONFIG environment variable: {'cluster': {'master': ['algo-1:2222'], 'ps': ['algo-1:2223', 'algo-2:2223'], 'worker': ['algo-2:2222']}, 'environment': 'cloud', 'task': {'index': 0, 'type': 'worker'}}[0m
[35mINFO:tensorflow:Using config: {'_model_dir': 's3://sagemaker-us-west-2-169088282855/deepfm-ps-ckpt', '_tf_random

[35m2021-02-20 10:23:55,868 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:05,825 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:15,877 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:25,929 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:35,974 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:46,027 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:30:56,080 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:31:06,124 sagemaker_tensorflow_container.training INFO   

[35m2021-02-20 11:40:28,972 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:40:39,024 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:40:49,076 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:40:59,130 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:41:09,182 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:41:19,231 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:41:29,284 sagemaker_tensorflow_container.training INFO     master algo-1 is still up, waiting for it to exit[0m
[35m2021-02-20 11:41:39,336 sagemaker_tensorflow_container.training INFO   

KeyboardInterrupt: 