Skip to content

Commit

Permalink
clean up message queues better in mp.h
Browse files: browse the repository at this point in the history
  • Loading branch information
Austin Matthews committed Apr 4, 2018
1 parent 20fd674 commit 0c23251
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions dynet/mp.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -154,7 +154,7 @@ namespace dynet {
void run_parent(const std::vector<D>& train_data, const std::vector<D>& dev_data, ILearner<D, S>* learner,
std::vector<Workload>& workloads, unsigned num_iterations, unsigned dev_frequency, unsigned report_frequency) {
const unsigned num_children = workloads.size();
boost::interprocess::message_queue mq(boost::interprocess::open_or_create, queue_name.c_str(), 10000, sizeof(unsigned));
boost::interprocess::message_queue mq(boost::interprocess::create_only, queue_name.c_str(), 10000, sizeof(unsigned));
std::vector<unsigned> train_indices(train_data.size());
std::iota(train_indices.begin(), train_indices.end(), 0);

Expand All @@ -171,7 +171,7 @@ namespace dynet {

std::vector<unsigned>::iterator begin = train_indices.begin();
while (begin != train_indices.end()) {
std::vector<unsigned>::iterator end = begin + dev_frequency;
std::vector<unsigned>::iterator end = (dev_frequency > 0) ? begin + dev_frequency : train_indices.end();
if (end > train_indices.end()) {
end = train_indices.end();
}
Expand All @@ -189,16 +189,21 @@ namespace dynet {
break;
}

S dev_loss = run_data_set<S>(dev_indices.begin(), dev_indices.end(), workloads, mq, {true, false, report_frequency});
bool new_best = (first_dev_run || dev_loss < best_dev_loss);
first_dev_run = false;
std::cerr << fractional_iter << "\t" << "dev loss = " << dev_loss << (new_best ? " (New best!)" : "") << std::endl;
if (stop_requested) {
break;
if (dev_data.size() > 0) {
S dev_loss = run_data_set<S>(dev_indices.begin(), dev_indices.end(), workloads, mq, {true, false, report_frequency});
bool new_best = (first_dev_run || dev_loss < best_dev_loss);
first_dev_run = false;
std::cerr << fractional_iter << "\t" << "dev loss = " << dev_loss << (new_best ? " (New best!)" : "") << std::endl;
if (stop_requested) {
break;
}
if (new_best) {
learner->SaveModel();
best_dev_loss = dev_loss;
}
}
if (new_best) {
else {
learner->SaveModel();
best_dev_loss = dev_loss;
}

begin = end;
Expand All @@ -211,6 +216,7 @@ namespace dynet {
write_data(workloads[cid].p2c[1], cont);
wait(NULL);
}
boost::interprocess::message_queue::remove(queue_name.c_str());
}

template <class D, class S>
Expand All @@ -221,7 +227,19 @@ namespace dynet {
unsigned i;
unsigned priority;
boost::interprocess::message_queue::size_type recvd_size;
boost::interprocess::message_queue mq(boost::interprocess::open_or_create, queue_name.c_str(), 10000, sizeof(unsigned));
boost::interprocess::message_queue* mq = nullptr;
while (true) {
try {
mq = new boost::interprocess::message_queue (boost::interprocess::open_only, queue_name.c_str());
break;
}
catch (boost::interprocess::interprocess_exception e) {
if (mq != nullptr) {
delete mq;
mq = nullptr;
}
}
}
while (true) {
// Check if the parent wants us to exit
bool cont = read_data<bool>(workloads[cid].p2c[0]);
Expand All @@ -237,7 +255,7 @@ namespace dynet {
S batch_loss = S();
unsigned batch_counter = 0;
while (true) {
mq.receive(&i, sizeof(unsigned), recvd_size, priority);
mq->receive(&i, sizeof(unsigned), recvd_size, priority);
if (i == -1U) {
break;
}
Expand Down

0 comments on commit 0c23251

Please sign in to comment.