Use the SageMaker Python SDK for local MNIST integration tests (#75)
laurenyu committed Apr 9, 2019
1 parent 2aa9a08 commit d126049
Showing 7 changed files with 175 additions and 90 deletions.
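
In substance, this commit replaces the hand-rolled test.utils.local_mode helpers with the SageMaker Python SDK's Local Mode for the local MNIST integration tests. As orientation before the diff, here is a minimal sketch of the train-then-serve pattern the rewritten tests follow; the SDK calls mirror the ones in test/integration/local/test_mnist.py below, while the region, image name, data paths, and output path are placeholders rather than values from this commit.

import boto3
import numpy as np
from sagemaker import LocalSession
from sagemaker.chainer import Chainer

# Placeholder region and image tag -- the real tests get these from pytest fixtures.
session = LocalSession(boto_session=boto3.Session(region_name='us-west-2'))

estimator = Chainer(entry_point='single_machine_customer_script.py',
                    source_dir='test/resources/mnist',
                    role='SageMakerRole',
                    image_name='sagemaker-chainer:local-build',   # illustrative image name
                    train_instance_count=1,
                    train_instance_type='local',                  # 'local_gpu' for GPU images
                    sagemaker_session=session,
                    hyperparameters={'batch-size': 10000, 'epochs': 1},
                    output_path='file:///tmp/chainer-output')

# Training and serving both run in Docker containers on the test host.
estimator.fit({'train': 'file://test/resources/mnist/data/train',
               'test': 'file://test/resources/mnist/data/test'})

predictor = estimator.deploy(1, 'local')
try:
    response = predictor.predict(np.zeros((100, 784), dtype='float32'))
    assert len(response) == 100
finally:
    predictor.delete_endpoint()
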
1 change: 1 addition & 0 deletions .gitignore
@@ -27,3 +27,4 @@ venv/
.pytest_cache/
*.swp
*.whl
test/resources/local_mode_lock
3 changes: 2 additions & 1 deletion src/sagemaker_chainer_container/training.py
@@ -1,4 +1,4 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
@@ -203,6 +203,7 @@ def _get_mpi_command(env, hyperparameters):
value = json.dumps(env_dict)

if not name == 'SM_FRAMEWORK_PARAMS':
# TODO: preserve JSON encoding for environment variable values
mpi_command += ' -x {}="{}"'.format(name, value)

mpi_command += ' {} '.format(additional_mpi_options) + ' {}'.format(_MPI_SCRIPT)
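
For context on the TODO above: the value passed to -x is JSON produced by json.dumps, and wrapping it in double quotes inside the mpirun command string means the nested quotes collapse once the command is interpreted with shell quoting rules. A small self-contained illustration of the effect, using shlex as a stand-in for the shell:

import json
import shlex

value = json.dumps(['algo-1', 'algo-2'])     # '["algo-1", "algo-2"]'
flag = '-x SM_HOSTS="{}"'.format(value)      # built the same way as in _get_mpi_command

# POSIX quoting strips the nested quotes, leaving a string that is no longer
# valid JSON -- which is what the distributed test script further down works around.
print(shlex.split(flag))                     # ['-x', 'SM_HOSTS=[algo-1, algo-2]']
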
13 changes: 10 additions & 3 deletions test/conftest.py
@@ -20,7 +20,7 @@

import boto3
import pytest
from sagemaker import Session
from sagemaker import LocalSession, Session
from sagemaker.chainer import Chainer

from test.utils import local_mode
@@ -112,8 +112,10 @@ def fixture_aws_id(request):


@pytest.fixture(scope='session', name='instance_type')
def fixture_instance_type(request):
return request.config.getoption('--instance-type')
def fixture_instance_type(request, processor):
provided_instance_type = request.config.getoption('--instance-type')
default_instance_type = 'local' if processor == 'cpu' else 'local_gpu'
return provided_instance_type or default_instance_type


@pytest.fixture(scope='session', name='docker_registry')
@@ -131,6 +133,11 @@ def fixture_sagemaker_session(region):
return Session(boto_session=boto3.Session(region_name=region))


@pytest.fixture(scope='session', name='sagemaker_local_session')
def fixture_sagemaker_local_session(region):
return LocalSession(boto_session=boto3.Session(region_name=region))


@pytest.fixture(scope='session', autouse=True, name='build_base_image')
def fixture_build_base_image(request, framework_version, processor, tag, docker_base_name):
build_base_image = request.config.getoption('--build-base-image')
186 changes: 110 additions & 76 deletions test/integration/local/test_mnist.py
@@ -1,4 +1,4 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
@@ -16,109 +16,143 @@

import numpy as np
import pytest
from sagemaker.chainer import Chainer
from sagemaker.predictor import csv_deserializer, csv_serializer, json_deserializer, json_serializer

from test.utils import local_mode, test_utils
from test.utils import test_utils

path = os.path.dirname(os.path.realpath(__file__))
mnist_path = os.path.join(path, '..', '..', 'resources', 'mnist')
data_dir = os.path.join(mnist_path, 'data')


def test_chainer_mnist_single_machine(docker_image, opt_ml, use_gpu):

def test_chainer_mnist_single_machine(docker_image, sagemaker_local_session, instance_type, tmpdir):
customer_script = 'single_machine_customer_script.py'
hyperparameters = {'batch-size': 10000, 'epochs': 1}

local_mode.train(customer_script, data_dir, docker_image, opt_ml,
hyperparameters=hyperparameters, source_dir=mnist_path, use_gpu=use_gpu)

files = ['model/model.npz', 'output/success', 'output/data/accuracy.png',
'output/data/cg.dot', 'output/data/log', 'output/data/loss.png']

test_utils.files_exist(opt_ml, files)

assert not local_mode.file_exists(opt_ml, 'output/failure'), 'Failure happened'

script_path = os.path.join(mnist_path, customer_script)

with local_mode.serve(script_path, model_dir=None, image_name=docker_image,
opt_ml=opt_ml, use_gpu=use_gpu, source_dir=mnist_path):

test_arrays = [np.zeros((100, 784), dtype='float32'),
np.zeros((100, 1, 28, 28), dtype='float32'),
np.zeros((100, 28, 28), dtype='float32')]

request_data = np.zeros((100, 784), dtype='float32')

data_as_list = request_data.tolist()

test_utils.predict_and_assert_response_length(data_as_list, 'text/csv')

for array in test_arrays:
# JSON and NPY can take multidimensional (n > 2) arrays
data_as_list = array.tolist()
test_utils.predict_and_assert_response_length(data_as_list, 'application/json')
test_utils.predict_and_assert_response_length(request_data, 'application/x-npy')


def test_chainer_mnist_custom_loop(docker_image, opt_ml, use_gpu):

estimator = Chainer(entry_point=customer_script,
source_dir=mnist_path,
role='SageMakerRole',
image_name=docker_image,
train_instance_count=1,
train_instance_type=instance_type,
sagemaker_session=sagemaker_local_session,
hyperparameters=hyperparameters,
output_path='file://{}'.format(tmpdir))

estimator.fit({'train': 'file://{}'.format(os.path.join(data_dir, 'train')),
'test': 'file://{}'.format(os.path.join(data_dir, 'test'))})

success_files = {
'model': ['model.npz'],
'output': ['success', 'data/accuracy.png', 'data/cg.dot', 'data/log', 'data/loss.png'],
}
test_utils.files_exist(str(tmpdir), success_files)

request_data = np.zeros((100, 784), dtype='float32')

test_utils.predict_and_assert_response_length(estimator, request_data, instance_type)
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type,
csv_serializer, csv_deserializer, 'text/csv')

test_arrays = [np.zeros((100, 784), dtype='float32'),
np.zeros((100, 1, 28, 28), dtype='float32'),
np.zeros((100, 28, 28), dtype='float32')]

with test_utils.local_mode_lock():
try:
predictor = _json_predictor(estimator, instance_type)
for array in test_arrays:
response = predictor.predict(array)
assert len(response) == len(array)
finally:
predictor.delete_endpoint()


def test_chainer_mnist_custom_loop(docker_image, sagemaker_local_session, instance_type, tmpdir):
customer_script = 'single_machine_custom_loop.py'
hyperparameters = {'batch-size': 10000, 'epochs': 1}

local_mode.train(customer_script, data_dir, docker_image, opt_ml,
hyperparameters=hyperparameters, source_dir=mnist_path, use_gpu=use_gpu)

files = ['model/model.npz', 'output/success']

test_utils.files_exist(opt_ml, files)

assert not local_mode.file_exists(opt_ml, 'output/failure'), 'Failure happened'
estimator = Chainer(entry_point=customer_script,
source_dir=mnist_path,
role='SageMakerRole',
image_name=docker_image,
train_instance_count=1,
train_instance_type=instance_type,
sagemaker_session=sagemaker_local_session,
hyperparameters=hyperparameters,
output_path='file://{}'.format(tmpdir))

script_path = os.path.join(mnist_path, customer_script)
estimator.fit({'train': 'file://{}'.format(os.path.join(data_dir, 'train')),
'test': 'file://{}'.format(os.path.join(data_dir, 'test'))})

with local_mode.serve(script_path, model_dir=None, image_name=docker_image, opt_ml=opt_ml):
success_files = {
'model': ['model.npz'],
'output': ['success'],
}

request_data = np.zeros((100, 784), dtype='float32')
test_utils.files_exist(str(tmpdir), success_files)

data_as_list = request_data.tolist()
request_data = np.zeros((100, 784), dtype='float32')

test_utils.predict_and_assert_response_length(data_as_list, 'application/json')
test_utils.predict_and_assert_response_length(data_as_list, 'text/csv')
test_utils.predict_and_assert_response_length(request_data, 'application/x-npy')
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type)
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type,
json_serializer, json_deserializer,
'application/json')
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type,
csv_serializer, csv_deserializer, 'text/csv')


@pytest.mark.parametrize('customer_script',
['distributed_customer_script.py',
'distributed_customer_script_with_env_vars.py'])
def test_chainer_mnist_distributed(docker_image, opt_ml, use_gpu, customer_script):
def test_chainer_mnist_distributed(docker_image, sagemaker_local_session, instance_type,
customer_script, tmpdir):
if instance_type == 'local_gpu':
pytest.skip('Local Mode does not support distributed GPU training.')

cluster_size = 2
# pure_nccl communicator hangs when only one gpu is available.
cluster_size = 2
hyperparameters = {'sagemaker_process_slots_per_host': 1,
'sagemaker_num_processes': cluster_size,
'batch-size': 10000,
'epochs': 1,
'communicator': 'hierarchical'}

local_mode.train(customer_script, data_dir, docker_image, opt_ml,
hyperparameters=hyperparameters,
cluster_size=cluster_size, source_dir=mnist_path, use_gpu=use_gpu)

files = ['model/model.npz', 'output/success', 'output/data/accuracy.png',
'output/data/cg.dot', 'output/data/log', 'output/data/loss.png']

test_utils.files_exist(opt_ml, files)

assert not local_mode.file_exists(opt_ml, 'output/failure'), 'Failure happened'

with local_mode.serve(os.path.join(mnist_path, customer_script), model_dir=None,
image_name=docker_image, opt_ml=opt_ml):

request_data = np.zeros((100, 784), dtype='float32')

data_as_list = request_data.tolist()

test_utils.predict_and_assert_response_length(data_as_list, 'application/json')
test_utils.predict_and_assert_response_length(data_as_list, 'text/csv')
test_utils.predict_and_assert_response_length(request_data, 'application/x-npy')
estimator = Chainer(entry_point=customer_script,
source_dir=mnist_path,
role='SageMakerRole',
image_name=docker_image,
train_instance_count=cluster_size,
train_instance_type=instance_type,
sagemaker_session=sagemaker_local_session,
hyperparameters=hyperparameters,
output_path='file://{}'.format(tmpdir))

estimator.fit({'train': 'file://{}'.format(os.path.join(data_dir, 'train')),
'test': 'file://{}'.format(os.path.join(data_dir, 'test'))})

success_files = {
'model': ['model.npz'],
'output': ['success', 'data/accuracy.png', 'data/cg.dot', 'data/log', 'data/loss.png'],
}

test_utils.files_exist(str(tmpdir), success_files)

request_data = np.zeros((100, 784), dtype='float32')

test_utils.predict_and_assert_response_length(estimator, request_data, instance_type)
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type,
json_serializer, json_deserializer,
'application/json')
test_utils.predict_and_assert_response_length(estimator, request_data, instance_type,
csv_serializer, csv_deserializer, 'text/csv')


def _json_predictor(estimator, instance_type):
predictor = estimator.deploy(1, instance_type)
predictor.content_type = 'application/json'
predictor.serializer = json_serializer
predictor.accept = 'application/json'
predictor.deserializer = json_deserializer
return predictor
2 changes: 1 addition & 1 deletion test/resources/mnist/distributed_customer_script.py
@@ -166,7 +166,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
trainer.run()

# only save the model in the master node
if args.host == 'algo-1':
if args.host == str(env.hosts[0]):
serializers.save_npz(os.path.join(env.model_dir, 'model.npz'), model)


7 changes: 6 additions & 1 deletion test/resources/mnist/distributed_customer_script_with_env_vars.py
@@ -162,8 +162,13 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb

trainer.run()

# the MPI command makes the SM_HOSTS variable invalid JSON
hosts = os.environ['SM_HOSTS']
master_node = hosts[1:hosts.find(',')]

# only save the model in the master node
if args.host == 'algo-1':
if args.host == master_node:
print('saving model')
serializers.save_npz(os.path.join(os.environ['SM_MODEL_DIR'], 'model.npz'), model)


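A worked example of the manual parse above, assuming the mangled SM_HOSTS value looks like '[algo-1, algo-2]' once the quotes have been stripped:

# Worked example (not part of the commit): recover the master host name from a
# de-quoted SM_HOSTS value.
hosts = '[algo-1, algo-2]'
master_node = hosts[1:hosts.find(',')]   # skip '[' and stop before the first ','
print(master_node)                       # 'algo-1'

# With a single host ('[algo-1]'), find(',') returns -1 and the slice becomes
# hosts[1:-1], which still yields 'algo-1', so only the first host saves model.npz.
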
53 changes: 45 additions & 8 deletions test/utils/test_utils.py
@@ -1,4 +1,4 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
@@ -12,14 +12,51 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import test.utils.local_mode as localmode
from contextlib import contextmanager
import fcntl
import os
import tarfile
import time


def files_exist(opt_ml, files):
for f in files:
assert localmode.file_exists(opt_ml, f), 'file {} was not created'.format(f)
def predict_and_assert_response_length(estimator, data, instance_type, serializer=None,
deserializer=None, content_type='application/x-npy'):
with local_mode_lock():
try:
predictor = estimator.deploy(1, instance_type)
if content_type != 'application/x-npy':
predictor.content_type = content_type
predictor.serializer = serializer
predictor.accept = content_type
predictor.deserializer = deserializer

response = predictor.predict(data)
assert len(response) == len(data)
finally:
predictor.delete_endpoint()

def predict_and_assert_response_length(data, content_type):
predict_response = localmode.request(data, content_type=content_type)
assert len(predict_response) == len(data)

def files_exist(output_path, directory_file_map):
for directory, files in directory_file_map.items():
with tarfile.open(os.path.join(output_path, '{}.tar.gz'.format(directory))) as tar:
for f in files:
tar.getmember(f)


@contextmanager
def local_mode_lock():
base_path = os.path.dirname(os.path.realpath(__file__))
lock_path = os.path.join(base_path, '..', 'resources', 'local_mode_lock')

# Since Local Mode uses the same port for serving, we need a lock in order
# to allow concurrent test execution.
local_mode_lock_fd = open(lock_path, 'w')
local_mode_lock = local_mode_lock_fd.fileno()

fcntl.lockf(local_mode_lock, fcntl.LOCK_EX)

try:
yield
finally:
time.sleep(5)
fcntl.lockf(local_mode_lock, fcntl.LOCK_UN)
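
The lock above is an advisory fcntl file lock: any code that serializes on the same lock file waits its turn before grabbing the local serving port. A minimal, self-contained sketch of the same pattern (the lock-file path here is illustrative, not the test/resources/local_mode_lock file the tests use):

from contextlib import contextmanager
import fcntl
import os
import tempfile


@contextmanager
def exclusive_file_lock(path):
    fd = open(path, 'w')
    fcntl.lockf(fd.fileno(), fcntl.LOCK_EX)   # blocks until every other holder releases
    try:
        yield
    finally:
        fcntl.lockf(fd.fileno(), fcntl.LOCK_UN)
        fd.close()


if __name__ == '__main__':
    lock_path = os.path.join(tempfile.gettempdir(), 'demo.lock')
    with exclusive_file_lock(lock_path):
        print('only one process at a time runs this block')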
