Skip to content
9 changes: 3 additions & 6 deletions tests/data/chainer_mnist/distributed_mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __call__(self, x):


def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format):
images = raw["x"]
images = raw["x"][-100:]
if ndim == 2:
images = images.reshape(-1, 28, 28)
elif ndim == 3:
Expand All @@ -59,7 +59,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
images *= scale / 255.0

if withlabel:
labels = raw["y"].astype(label_dtype)
labels = raw["y"][-100:].astype(label_dtype)
return tuple_dataset.TupleDataset(images, labels)
return images

Expand Down Expand Up @@ -111,9 +111,6 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
optimizer = chainermn.create_multi_node_optimizer(chainer.optimizers.Adam(), comm)
optimizer.setup(model)

train_file = np.load(os.path.join(args.train, "train.npz"))
test_file = np.load(os.path.join(args.test, "test.npz"))

preprocess_mnist_options = {
"withlabel": True,
"ndim": 1,
Expand Down Expand Up @@ -173,7 +170,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
trainer.run()

# only save the model in the master node
if args.host == "algo-1":
if args.host == env.hosts[0]:
serializers.save_npz(os.path.join(env.model_dir, "model.npz"), model)


Expand Down
4 changes: 2 additions & 2 deletions tests/data/chainer_mnist/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __call__(self, x):


def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format):
images = raw["x"]
images = raw["x"][-100:]
if ndim == 2:
images = images.reshape(-1, 28, 28)
elif ndim == 3:
Expand All @@ -55,7 +55,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
images *= scale / 255.0

if withlabel:
labels = raw["y"].astype(label_dtype)
labels = raw["y"][-100:].astype(label_dtype)
return tuple_dataset.TupleDataset(images, labels)
else:
return images
Expand Down
174 changes: 80 additions & 94 deletions tests/integ/test_chainer_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,152 +13,138 @@
from __future__ import absolute_import

import os
import time

import pytest
import numpy
import pytest

from sagemaker.chainer.defaults import CHAINER_VERSION
from sagemaker.chainer.estimator import Chainer
from sagemaker.chainer.model import ChainerModel
from sagemaker.utils import unique_name_from_base
import tests.integ
from tests.integ import DATA_DIR, PYTHON_VERSION, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name


@pytest.fixture(scope="module")
def chainer_local_training_job(sagemaker_local_session, chainer_full_version):
    """Module-scoped fixture: run one single-instance MNIST job in local mode.

    Yields to tests whatever ``_run_mnist_training_job`` returns for a single
    ``"local"`` instance, so the training cost is paid once per module.
    """
    return _run_mnist_training_job(sagemaker_local_session, "local", 1, chainer_full_version)


@pytest.mark.local_mode
def test_distributed_cpu_training(sagemaker_local_session, chainer_full_version):
    """Run the MNIST training helper across two local-mode instances."""
    _run_mnist_training_job(sagemaker_local_session, "local", 2, chainer_full_version)
@pytest.mark.local_mode
def test_training_with_additional_hyperparameters(sagemaker_local_session, chainer_full_version):
    """Fit ``mnist.py`` in local mode with the MPI-related estimator options set.

    The estimator is built with ``use_mpi``, ``num_processes``,
    ``process_slots_per_host`` and ``additional_mpi_options``; the test passes
    if ``fit`` completes without raising.
    """
    script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
    data_path = os.path.join(DATA_DIR, "chainer_mnist")

    chainer = Chainer(
        entry_point=script_path,
        role="SageMakerRole",
        train_instance_count=1,
        train_instance_type="local",
        framework_version=chainer_full_version,
        py_version=PYTHON_VERSION,
        sagemaker_session=sagemaker_local_session,
        hyperparameters={"epochs": 1},
        use_mpi=True,
        num_processes=2,
        process_slots_per_host=2,
        additional_mpi_options="-x NCCL_DEBUG=INFO",
    )

    # The channels point straight at the checked-in data directories via
    # file:// URIs; nothing is uploaded.
    train_input = "file://" + os.path.join(data_path, "train")
    test_input = "file://" + os.path.join(data_path, "test")

    chainer.fit({"train": train_input, "test": test_input})

@pytest.mark.canary_quick
@pytest.mark.regional_testing
def test_attach_deploy(sagemaker_session, chainer_full_version):
    """Start a training job without waiting, then attach to it and deploy.

    The job is launched with ``wait=False``; ``Chainer.attach`` is then used
    with the job name to obtain an estimator that is deployed to an endpoint
    and exercised through ``_predict_and_assert``.
    """
    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
        script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
        data_path = os.path.join(DATA_DIR, "chainer_mnist")

        chainer = Chainer(
            entry_point=script_path,
            role="SageMakerRole",
            framework_version=chainer_full_version,
            py_version=PYTHON_VERSION,
            train_instance_count=1,
            train_instance_type="ml.c4.xlarge",
            sagemaker_session=sagemaker_session,
            hyperparameters={"epochs": 1},
        )

        train_input = sagemaker_session.upload_data(
            path=os.path.join(data_path, "train"), key_prefix="integ-test-data/chainer_mnist/train"
        )

        test_input = sagemaker_session.upload_data(
            path=os.path.join(data_path, "test"), key_prefix="integ-test-data/chainer_mnist/test"
        )

        job_name = unique_name_from_base("test-chainer-training")
        chainer.fit({"train": train_input, "test": test_input}, wait=False, job_name=job_name)

    endpoint_name = unique_name_from_base("test-chainer-attach-deploy")

    # NOTE(review): judging by its name, this context manager is expected to
    # remove the endpoint on exit -- confirm in tests.integ.timeout.
    with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
        estimator = Chainer.attach(
            chainer.latest_training_job.name, sagemaker_session=sagemaker_session
        )
        predictor = estimator.deploy(1, "ml.m4.xlarge", endpoint_name=endpoint_name)
        _predict_and_assert(predictor)


@pytest.mark.local_mode
def test_deploy_model(chainer_local_training_job, sagemaker_local_session):
    """Deploy a ``ChainerModel`` built from the fixture's model data in local mode.

    The model is constructed from ``chainer_local_training_job.model_data``
    with ``mnist.py`` as the entry point, deployed to a ``"local"`` instance,
    and checked with ``_predict_and_assert``.
    """
    script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")

    model = ChainerModel(
        chainer_local_training_job.model_data,
        "SageMakerRole",
        entry_point=script_path,
        sagemaker_session=sagemaker_local_session,
    )

    predictor = model.deploy(1, "local")
    try:
        _predict_and_assert(predictor)
    finally:
        # Always tear the endpoint down, even when the assertions fail.
        predictor.delete_endpoint()


def _run_mnist_training_job(
    sagemaker_session, instance_type, instance_count, chainer_full_version, wait=True
):
    """Fit a Chainer estimator on the checked-in MNIST data and return it.

    Args:
        sagemaker_session: Session object handed to the ``Chainer`` estimator.
        instance_type: Value passed as ``train_instance_type`` (callers in this
            module pass ``"local"``).
        instance_count: Value passed as ``train_instance_count``; also decides
            which training script is used.
        chainer_full_version: Value passed as ``framework_version``.
        wait: Forwarded to ``Chainer.fit``.

    Returns:
        The ``Chainer`` estimator on which ``fit`` was called.
    """
    # Select the script by instance count. The earlier check compared
    # ``instance_type`` (a string such as "local") with the integer 1, which is
    # never true, so the distributed script was used for every job.
    script_name = "mnist.py" if instance_count == 1 else "distributed_mnist.py"

    data_path = os.path.join(DATA_DIR, "chainer_mnist")
    script_path = os.path.join(data_path, script_name)

    chainer = Chainer(
        entry_point=script_path,
        role="SageMakerRole",
        framework_version=chainer_full_version,
        py_version=PYTHON_VERSION,
        train_instance_count=instance_count,
        train_instance_type=instance_type,
        sagemaker_session=sagemaker_session,
        hyperparameters={"epochs": 1},
    )

    # Channels are file:// URIs into the checked-in data; nothing is uploaded.
    train_input = "file://" + os.path.join(data_path, "train")
    test_input = "file://" + os.path.join(data_path, "test")

    job_name = unique_name_from_base("test-chainer-training")
    chainer.fit({"train": train_input, "test": test_input}, wait=wait, job_name=job_name)
    return chainer


def _predict_and_assert(predictor):
Expand Down