Skip to content

Commit

Permalink
Merge pull request #9336 from typhoonzero/fix_dist_compile
Browse files Browse the repository at this point in the history
fix dist compile
  • Loading branch information
typhoonzero committed Mar 23, 2018
2 parents 76ae540 + bf66ce0 commit c83dd9b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 21 deletions.
3 changes: 2 additions & 1 deletion paddle/fluid/operators/detail/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ limitations under the License. */
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/detail/grpc_service.h"
#include "paddle/fluid/operators/detail/grpc_service.h"
#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
#include "paddle/fluid/operators/detail/simple_block_queue.h"

namespace paddle {
namespace operators {
Expand Down
10 changes: 5 additions & 5 deletions paddle/fluid/operators/detail/test_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ TEST(LodTensor, Run) {
RunTestLodTensor(place);
RunTestLodTensor(place, 1);
#ifdef PADDLE_WITH_CUDA
platform::CUDAPlace place;
RunTestLodTensor(place);
RunTestLodTensor(place, 1);
platform::CUDAPlace gpu(0);
RunTestLodTensor(gpu);
RunTestLodTensor(gpu, 1);
#endif
}

Expand All @@ -210,7 +210,7 @@ TEST(SelectedRows, Run) {
RunSerdeTestSelectedRows(place);

#ifdef PADDLE_WITH_CUDA
platform::CUDAPlace place;
RunSerdeTestSelectedRows(place);
platform::CUDAPlace gpu;
RunSerdeTestSelectedRows(gpu);
#endif
}
22 changes: 7 additions & 15 deletions paddle/fluid/operators/listen_and_serv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ class ListenAndServOp : public framework::OperatorBase {
"server program should have at least 2 blocks");

framework::Executor executor(dev_place);
std::vector<framework::ExecutorPrepareContext *> blk_ctx_list;
blk_ctx_list.push_back(nullptr); // block0 is not used.
for (int blkid = 1; blkid < num_blocks; ++blkid) {
auto *exe_ctx = executor.Prepare(*program, blkid);
blk_ctx_list.push_back(exe_ctx);
}

// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
Expand Down Expand Up @@ -149,12 +143,11 @@ class ListenAndServOp : public framework::OperatorBase {
std::vector<std::future<void>> fs;
// block0 contains only listen_and_serv op, start run from block1.
for (int blkid = 1; blkid < num_blocks - 1; ++blkid) {
fs.push_back(framework::Async(
[&executor, &program, &recv_scope, &blk_ctx_list, blkid]() {
fs.push_back(
framework::Async([&executor, &program, &recv_scope, blkid]() {
int run_block = blkid; // thread local
try {
executor.RunPreparedContext(blk_ctx_list[run_block],
&recv_scope, false, false);
executor.Run(*program, &recv_scope, run_block, false, false);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
Expand All @@ -164,8 +157,7 @@ class ListenAndServOp : public framework::OperatorBase {
// Run global block at final step, or block1 if there are only 2 blocks
if (num_blocks >= 2) {
try {
executor.RunPreparedContext(blk_ctx_list[num_blocks - 1], &recv_scope,
false, false);
executor.Run(*program, &recv_scope, num_blocks - 1, false, false);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
Expand All @@ -185,9 +177,9 @@ class ListenAndServOp : public framework::OperatorBase {
sparse_vars.clear();
} // while(true)

for (int i = 0; i < num_blocks; ++i) {
delete blk_ctx_list[i];
}
// for (int i = 0; i < num_blocks; ++i) {
// delete blk_ctx_list[i];
// }
}

protected:
Expand Down

0 comments on commit c83dd9b

Please sign in to comment.