# DeepFM TensorFlow Parameter Server on SageMaker Sample

### This sample demonstrates how to run DeepFM sample code with a TensorFlow parameter server on SageMaker

Notes:

1. Dataset format is TFRecord

2. We train this model on **CPU** instances: in our experience, running the DeepFM script with TF parameter servers on CPU is more efficient and more cost-effective.

3. This sample uses the [SageMaker Python SDK 2.x](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html).
4. The TensorFlow version is 1.14 or 1.15.2 (the file-mode example below uses 1.15.2, the pipe-mode example uses 1.14).

In [None]:
import sagemaker

# Show which SageMaker Python SDK is installed; the cells below use the 2.x argument names
# (instance_type, instance_count, distribution, ...).
installed_sdk_version = sagemaker.__version__
print(installed_sdk_version)

In [None]:
# `datetime` is needed below for the timestamped model_dir; previously it was only
# imported in a later cell, so running the notebook top to bottom raised NameError here.
from datetime import datetime

sess = sagemaker.session.Session()

# Default SageMaker bucket used to store data and model artifacts; change this to use another bucket.
bucket = sess.default_bucket()

# S3 prefix for checkpoints saved during training (only used if `checkpoint_s3_uri`
# is uncommented in the estimator definitions below). Change to your own path if needed.
checkpoint_s3_uri = 's3://{}/deepfm-checkpoint'.format(bucket)
checkpoint_local_path = '/opt/ml/checkpoints'

# model_dir gets a timestamp suffix, so each execution of this cell yields a distinct prefix.
model_dir = 's3://{}/deepfm-ps-ckpt/{}'.format(bucket, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

# S3 location for the training job's model artifacts and output files.
output_path = 's3://{}/deepfm-2021'.format(bucket)

## Prepare training and validation data

In [None]:
# subprocess replaces os.system: `os` was not imported yet at this point in the notebook,
# and os.system's exit status was ignored, so a failed upload went unnoticed.
import subprocess

# S3 prefix for the training channel; every object under it becomes training input.
training_data = f's3://{bucket}/data/train/'

# Upload the same local TFRecord file under two keys. Presumably this gives
# ShardedByS3Key (used in the fit cells) one object per training instance — confirm.
for shard_name in ('train_1.tfrecords', 'train_2.tfrecords'):
    subprocess.run(
        ['aws', 's3', 'cp', '../data/train.tfrecords', f'{training_data}{shard_name}'],
        check=True,  # raise CalledProcessError instead of silently continuing on failure
    )

In [None]:
import subprocess

# S3 prefix for the evaluation channel.
validation_data = f's3://{bucket}/data/validation/'

# Upload under validation_data (not training_data), so that:
#  - the evaluation channel is populated, and
#  - validation records stay out of the training channel.
# check=True makes a failed upload raise instead of passing silently.
subprocess.run(
    ['aws', 's3', 'cp', '../data/val.tfrecords', f'{validation_data}val_1.tfrecords'],
    check=True,
)

## File mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# Channel names. They are passed to the training script as hyperparameters and match the
# /opt/ml/input/data/<channel>/ directories in `training_data_dir` / `val_data_dir` below.
training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.c5.4xlarge'
train_instance_count = 2

train_use_spot_instances = True
enable_s3_shard = True  # shard the training channel across instances by S3 key (see the fit cell)

# Maximum training time in seconds (72000 s = 20 hours).
train_max_run = 36000*2
# Spot training requires max_wait >= max_run. Deriving max_wait from max_run keeps
# that true if max_run is edited (previously both were hard-coded to 72000).
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name='tf-scriptmode-deepfm'

# pipe_mode=0 tells the training script to read files from the channel directories (File mode).
hyperparameters = {
    'servable_model_dir': '/opt/ml/model', 'training_data_dir': '/opt/ml/input/data/training/',
    'val_data_dir': '/opt/ml/input/data/evaluation/', 'log_steps': 10, 'num_epochs': 10, 
    'field_size': 39, 'feature_size': feature_size, 'deep_layers': deep_layer,
    'perform_shuffle': 0, 'batch_size': batch_size, 'pipe_mode': 0, 'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name, 'evaluation_channel_name': evaluation_channel_name
}

estimator = TensorFlow(
    #source_dir='./',
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    
    # S3 location where the checkpoint data and models can be exported to during training. 
    # It will be passed in the training script as one of the command line arguments. 
    model_dir=model_dir,
    
    # The S3 URI in which to persist checkpoints that the algorithm persists (if any) during training.
    #checkpoint_s3_uri = checkpoint_s3_uri,

    # The local path that the algorithm writes its checkpoints to. 
    # SageMaker will persist all files under this path to checkpoint_s3_uri continually during training. 
    # On job startup the reverse happens - data from the s3 location is downloaded to this path before the algorithm is started. 
    # If the path is unset then SageMaker assumes the checkpoints will be provided under /opt/ml/checkpoints/
    #checkpoint_local_path = checkpoint_local_path,

    # S3 location for saving the training result (model artifacts and output files). 
    # If not specified, results are stored to a default bucket.
    output_path=output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size = 500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.15.2',
    py_version='py3',
    #input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config=False, # Configuration for how debugging information is emitted with SageMaker Debugger. 
                                # If not specified, a default one is created using the estimator’s output_path, unless the region does not support SageMaker Debugger. 
                                # To disable SageMaker Debugger, set this parameter to False.
    disable_profiler=True,  # Specifies whether Debugger monitoring and profiling will be disabled
)

In [None]:
# File mode: launch training with the training and evaluation channels.
from sagemaker.inputs import TrainingInput

train_s3_uri = training_data  # Path to training data
validate_s3_uri = validation_data  # Path to validation data

# When sharding is on, the training channel is split across instances by S3 key.
# The validation channel is always created with the SDK's default distribution.
shard_options = {'distribution': 'ShardedByS3Key'} if enable_s3_shard else {}
train_input = TrainingInput(train_s3_uri, **shard_options)
val_input = TrainingInput(validate_s3_uri)

inputs = {
    training_channel_name: train_input,
    evaluation_channel_name: val_input,
}

estimator.fit(inputs)

## Pipe mode

In [None]:
import sagemaker
from sagemaker.tensorflow.estimator import TensorFlow
from datetime import datetime
import os

# Channel names, passed to the training script as hyperparameters.
training_channel_name = 'training'
evaluation_channel_name = 'evaluation'

train_instance_type = 'ml.c5.18xlarge'
train_instance_count = 2

train_use_spot_instances = True
enable_s3_shard = True  # shard the training channel across instances by S3 key (see the fit cell)

# Maximum training time in seconds (72000 s = 20 hours).
train_max_run = 36000*2
# Spot training requires max_wait >= max_run; derive it so editing max_run cannot break that.
train_max_wait = train_max_run if train_use_spot_instances else None

distributions = {'parameter_server': {'enabled': True}}

deep_layer = '128,64,32'

batch_size = 1024
feature_size = 117581

base_job_name='tf-scriptmode-deepfm'

# pipe_mode=1 tells the training script to read the channels as pipes,
# matching input_mode='Pipe' on the estimator below.
hyperparameters = {
    'servable_model_dir': '/opt/ml/model', 'training_data_dir': '/opt/ml/input/data/training/',
    'val_data_dir': '/opt/ml/input/data/evaluation/', 'log_steps': 10, 'num_epochs': 10, 
    'field_size': 39, 'feature_size': feature_size, 'deep_layers': deep_layer,
    'perform_shuffle': 0, 'batch_size': batch_size, 'pipe_mode': 1, 'enable_s3_shard': enable_s3_shard,
    'training_channel_name': training_channel_name, 'evaluation_channel_name': evaluation_channel_name
}

estimator = TensorFlow(
    #source_dir='./',
    entry_point='DeepFM-dist-ps-for-multipleCPU-multiInstance.py',
    model_dir=model_dir,
    #checkpoint_s3_uri = checkpoint_s3_uri,
    #checkpoint_local_path = checkpoint_local_path,
    output_path= output_path,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    #volume_size = 500,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=base_job_name,
    framework_version='1.14',
    py_version='py3',
    input_mode='Pipe',
    distribution=distributions,
    use_spot_instances=train_use_spot_instances,
    max_wait=train_max_wait,
    max_run=train_max_run,
    debugger_hook_config =False,  # disable SageMaker Debugger
    disable_profiler=True  # disable Debugger monitoring and profiling
)

In [None]:
# Pipe mode: launch the estimator defined with input_mode='Pipe'.
from sagemaker.inputs import TrainingInput

# Reuse the prefixes populated in the data-preparation cells. An empty string here gives
# TrainingInput no S3 location, and the job cannot start. Point these at other S3
# prefixes to train on different data.
train_s3_uri = training_data  # Path to training data
validate_s3_uri = validation_data  # Path to validation data

# When sharding is on, the training channel is split across instances by S3 key.
# The validation channel keeps the SDK's default distribution.
shard_options = {'distribution': 'ShardedByS3Key'} if enable_s3_shard else {}
train_input = TrainingInput(train_s3_uri, **shard_options)
val_input = TrainingInput(validate_s3_uri)

inputs = {training_channel_name: train_input, evaluation_channel_name: val_input}

estimator.fit(inputs)