From 0c23251d3e0c7a9304b68f2f05e989ee7dec0f13 Mon Sep 17 00:00:00 2001 From: Austin Matthews Date: Wed, 4 Apr 2018 13:12:18 -0400 Subject: [PATCH] clean up message queues better in mp.h --- dynet/mp.h | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/dynet/mp.h b/dynet/mp.h index 0a47f6371..50aab111e 100644 --- a/dynet/mp.h +++ b/dynet/mp.h @@ -154,7 +154,7 @@ namespace dynet { void run_parent(const std::vector& train_data, const std::vector& dev_data, ILearner* learner, std::vector& workloads, unsigned num_iterations, unsigned dev_frequency, unsigned report_frequency) { const unsigned num_children = workloads.size(); - boost::interprocess::message_queue mq(boost::interprocess::open_or_create, queue_name.c_str(), 10000, sizeof(unsigned)); + boost::interprocess::message_queue mq(boost::interprocess::create_only, queue_name.c_str(), 10000, sizeof(unsigned)); std::vector train_indices(train_data.size()); std::iota(train_indices.begin(), train_indices.end(), 0); @@ -171,7 +171,7 @@ namespace dynet { std::vector::iterator begin = train_indices.begin(); while (begin != train_indices.end()) { - std::vector::iterator end = begin + dev_frequency; + std::vector::iterator end = (dev_frequency > 0) ? begin + dev_frequency : train_indices.end(); if (end > train_indices.end()) { end = train_indices.end(); } @@ -189,16 +189,21 @@ namespace dynet { break; } - S dev_loss = run_data_set(dev_indices.begin(), dev_indices.end(), workloads, mq, {true, false, report_frequency}); - bool new_best = (first_dev_run || dev_loss < best_dev_loss); - first_dev_run = false; - std::cerr << fractional_iter << "\t" << "dev loss = " << dev_loss << (new_best ? " (New best!)" : "") << std::endl; - if (stop_requested) { - break; + if (dev_data.size() > 0) { + S dev_loss = run_data_set(dev_indices.begin(), dev_indices.end(), workloads, mq, {true, false, report_frequency}); + bool new_best = (first_dev_run || dev_loss < best_dev_loss); + first_dev_run = false; + std::cerr << fractional_iter << "\t" << "dev loss = " << dev_loss << (new_best ? " (New best!)" : "") << std::endl; + if (stop_requested) { + break; + } + if (new_best) { + learner->SaveModel(); + best_dev_loss = dev_loss; + } } - if (new_best) { + else { learner->SaveModel(); - best_dev_loss = dev_loss; } begin = end; @@ -211,6 +216,7 @@ namespace dynet { write_data(workloads[cid].p2c[1], cont); wait(NULL); } + boost::interprocess::message_queue::remove(queue_name.c_str()); } template @@ -221,7 +227,19 @@ namespace dynet { unsigned i; unsigned priority; boost::interprocess::message_queue::size_type recvd_size; - boost::interprocess::message_queue mq(boost::interprocess::open_or_create, queue_name.c_str(), 10000, sizeof(unsigned)); + boost::interprocess::message_queue* mq = nullptr; + while (true) { + try { + mq = new boost::interprocess::message_queue (boost::interprocess::open_only, queue_name.c_str()); + break; + } + catch (boost::interprocess::interprocess_exception e) { + if (mq != nullptr) { + delete mq; + mq = nullptr; + } + } + } while (true) { // Check if the parent wants us to exit bool cont = read_data(workloads[cid].p2c[0]); @@ -237,7 +255,7 @@ namespace dynet { S batch_loss = S(); unsigned batch_counter = 0; while (true) { - mq.receive(&i, sizeof(unsigned), recvd_size, priority); + mq->receive(&i, sizeof(unsigned), recvd_size, priority); if (i == -1U) { break; }