Permalink
Browse files

add back monitors, terminate all nodes if the scheduler is killed

  • Loading branch information...
1 parent f2ab107 commit 6dae00dea6b9f370915777bb64f4e1d9fdee54b3 @mli mli committed Mar 23, 2016
Showing with 22 additions and 19 deletions.
  1. +1 −1 Makefile
  2. +2 −2 make/deps.mk
  3. +19 −16 src/van.cc
View
@@ -20,7 +20,7 @@ endif
INCPATH = -I./src -I./include -I$(DEPS_PATH)/include
CFLAGS = -std=c++11 -msse2 -fPIC -O3 -ggdb -Wall -finline-functions $(INCPATH) $(ADD_CFLAGS)
-all: ps
+all: ps test
include make/deps.mk
View
@@ -19,8 +19,8 @@ ${PROTOBUF}:
ZMQ = ${DEPS_PATH}/include/zmq.h
${ZMQ}:
- $(eval FILE=zeromq-4.1.2.tar.gz)
- $(eval DIR=zeromq-4.1.2)
+ $(eval FILE=zeromq-4.1.4.tar.gz)
+ $(eval DIR=zeromq-4.1.4)
rm -rf $(FILE) $(DIR)
$(WGET) $(URL)/$(FILE) && tar -zxf $(FILE)
cd $(DIR) && export CFLAGS=-fPIC && export CXXFLAGS=-fPIC && ./configure -prefix=$(DEPS_PATH) --with-libsodium=no --with-libgssapi_krb5=no && $(MAKE) && $(MAKE) install
View
@@ -78,20 +78,17 @@ void Van::Start() {
// connect to the scheduler
Connect(scheduler_);
- // start monitor, TODO(mli)
- // if (is_scheduler_) {
- // CHECK(!zmq_socket_monitor(receiver_, "inproc://monitor", ZMQ_EVENT_ALL));
- // } else {
- // CHECK(!zmq_socket_monitor(
- // senders_[kScheduler], "inproc://monitor", ZMQ_EVENT_ALL));
- // }
- // monitor_thread_ = std::unique_ptr<std::thread>(
- // new std::thread(&Van::Monitoring, this));
-
// start receiver
receiver_thread_ = std::unique_ptr<std::thread>(
new std::thread(&Van::Receiving, this));
+ // aliveness monitor
+ CHECK(!zmq_socket_monitor(
+ senders_[kScheduler], "inproc://monitor", ZMQ_EVENT_ALL));
+ monitor_thread_ = std::unique_ptr<std::thread>(
+ new std::thread(&Van::Monitoring, this));
+ monitor_thread_->detach();
+
if (!is_scheduler_) {
// let the scheduler know about myself
Message msg;
@@ -115,9 +112,10 @@ void Van::Stop() {
receiver_thread_->join();
// close sockets
- for (auto& it : senders_) zmq_close(it.second);
- zmq_close(receiver_);
- zmq_ctx_destroy(context_);
+ // commented out due to the enabled monitor...
+ // zmq_close(receiver_);
+ // for (auto& it : senders_) zmq_close(it.second);
+ // zmq_ctx_destroy(context_);
}
void Van::Connect(const Node& node) {
@@ -127,7 +125,7 @@ void Van::Connect(const Node& node) {
int id = node.id;
if (senders_.find(id) != senders_.end()) {
- zmq_close(senders_[id]);
+ return;
}
// worker doesn't need to connect to the other workers. same for server
@@ -419,9 +417,14 @@ void Van::Monitoring() {
// address. no help
if (event == ZMQ_EVENT_DISCONNECTED) {
- // huh...
+ if (!is_scheduler_) {
+ PS_VLOG(1) << my_node_.ShortDebugString() << ": scheduler is dead. exit.";
+ exit(-1);
+ }
+ }
+ if (event == ZMQ_EVENT_MONITOR_STOPPED) {
+ break;
}
- if (event == ZMQ_EVENT_MONITOR_STOPPED) break;
}
zmq_close(s);
}

0 comments on commit 6dae00d

Please sign in to comment.