Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add NotImplementedError for multi optimizers #22181

Merged
merged 3 commits into from
Jan 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions python/paddle/fluid/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ def _prune_feed_ops(program):
program.global_block()._remove_op(index)


def _has_optimize_op(block):
    """Return True if ``block`` contains at least one optimizer op.

    An op is treated as an optimizer op when it carries the OpRoleVar
    attribute and its OpRole attribute equals ``OpRole.Optimize``.

    Args:
        block: A framework Block whose ops are scanned.

    Returns:
        bool: Whether any op in the block is an optimizer op.
    """
    # Hoist loop-invariant lookups out of the loop: the attr-name
    # strings and the Optimize role value never change per op.
    op_maker = core.op_proto_and_checker_maker
    role_attr_name = op_maker.kOpRoleAttrName()
    role_var_attr_name = op_maker.kOpRoleVarAttrName()
    optimize_role = int(op_maker.OpRole.Optimize)
    for op in block.ops:
        if role_var_attr_name in op.attr_names and \
                int(op.all_attrs()[role_attr_name]) == optimize_role:
            return True
    return False


def _has_optimizer_in_control_flow(program):
    """Return True if an optimizer op lives inside control flow of ``program``.

    Checks every ``conditional_block_grad`` op's sub-block for optimizer
    ops. Falls back to the default main program when ``program`` is falsy.
    """
    if not program:
        program = framework.default_main_program()
    global_ops = program.global_block().ops
    cond_grad_ops = (op for op in global_ops
                     if op.type == "conditional_block_grad")
    return any(
        _has_optimize_op(program.block(op._block_attr_id("sub_block")))
        for op in cond_grad_ops)


class CompiledProgram(object):
"""
The CompiledProgram is used to transform a program or graph for
Expand Down Expand Up @@ -386,6 +408,16 @@ def _compile(self, scope, place):
self._places = self._get_places(self._place, self._places)
else:
self._places = [self._place]

# TODO(liym27): If an optimizer is used in control flow,
# training on multiple places is not supported now; it will
# be supported later.
if len(self._places) > 1 and \
_has_optimizer_in_control_flow(self._program):
raise NotImplementedError(
"If optimizer is used in control flow, "
"training on multi-places is not supported now.")

self._executor = self._compile_data_parallel(
use_cuda=isinstance(self._place, core.CUDAPlace),
scope=self._scope,
Expand Down
1 change: 1 addition & 0 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,5 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
test_parallel_executor_feed_persistable_var
test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
test_data_norm_op test_imperative_using_non_zero_gpu test_fuse_bn_act_pass
test_optimizer_in_control_flow
test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import paddle.fluid.optimizer as optimizer
from paddle.fluid.framework import Program, program_guard
import paddle.fluid.core as core
import paddle.fluid.compiler as compiler
import os

BATCH_SIZE = 1
INPUT_SIZE = 784
Expand Down Expand Up @@ -104,20 +106,20 @@ def fn_2(opt, avg_loss=None, pred=None, label=None):
avg_loss = layers.case([(mod_two, lambda: fn_1(adam, avg_loss_1))],
lambda: fn_2(sgd, avg_loss_2))

place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for epoch in range(EPOCH_NUM):
feed_image, feed_label = train_data[epoch]
fetch_list = [hidden, prediction, avg_loss]
feed = {
'image': feed_image,
'label': feed_label,
'id': np.array([epoch]).astype('int32')
}
out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
out_hidden, out_pred, loss = out
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)

for epoch in range(EPOCH_NUM):
feed_image, feed_label = train_data[epoch]
fetch_list = [hidden, prediction, avg_loss]
feed = {
'image': feed_image,
'label': feed_label,
'id': np.array([epoch]).astype('int32')
}
out = exe.run(main_program, feed=feed, fetch_list=fetch_list)
out_hidden, out_pred, loss = out

return out_hidden, out_pred, loss

Expand Down Expand Up @@ -225,5 +227,58 @@ def test_optimzier_in_switch(self):
loss_2))


class TestMultiOptimizersMultiCardsError(unittest.TestCase):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add this unittest here https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/fluid/tests/unittests/CMakeLists.txt#L327 so that it can run in 2 gpu cards in CI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Done

def test_error(self):
    """Multi-device training with an optimizer inside control flow
    must raise NotImplementedError; single-device runs succeed."""
    startup_program = Program()
    main_program = Program()
    use_cuda = core.is_compiled_with_cuda()

    with program_guard(main_program, startup_program):

        def fn_1(opt, avg_loss):
            opt.minimize(avg_loss)

        def fn_2(opt, avg_loss):
            opt.minimize(avg_loss)

        x = fluid.layers.data("X", [10], 'float32')
        hidden = layers.fc(x, 5)
        avg_loss = layers.mean(hidden)

        adam = optimizer.Adam(learning_rate=LR)
        sgd = optimizer.SGD(learning_rate=LR)

        # Condition is constant-true, but minimizing inside layers.case
        # still places the optimizer ops in a conditional sub-block.
        cond = layers.fill_constant([1], 'bool', True)

        layers.case([(cond, lambda: fn_1(adam, avg_loss))],
                    lambda: fn_2(sgd, avg_loss))

    # Exercise both a CPU place and (when available) a CUDA place.
    places = [
        fluid.CPUPlace(),
        fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace(),
    ]

    for run_place in places:
        executor = fluid.Executor(run_place)
        executor.run(startup_program)

        np.random.seed(SEED)
        # Force two CPU devices so the multi-place path is taken.
        os.environ['CPU_NUM'] = str(2)
        parallel_exe = fluid.ParallelExecutor(
            use_cuda=use_cuda,
            main_program=main_program,
            loss_name=avg_loss.name)
        device_num = parallel_exe.device_count

        def run_parallel():
            parallel_exe.run(
                feed={
                    'X': np.random.random(size=[64, 10]).astype('float32'),
                },
                fetch_list=[avg_loss.name])

        if device_num > 1:
            self.assertRaises(NotImplementedError, run_parallel)
        else:
            run_parallel()


# Entry point: run every unittest.TestCase defined in this file.
if __name__ == '__main__':
    unittest.main()