Permalink
Browse files

add retransmission, see the docs/how_to.md for how to use

  • Loading branch information...
1 parent 9cee45a commit 80c24c46ffef28181ba4743b0e4ad1ce45f3fe2b @mli mli committed Apr 10, 2016
Showing with 193 additions and 25 deletions.
  1. +17 −1 docs/how_to.md
  2. +21 −7 include/ps/internal/message.h
  3. +8 −0 include/ps/internal/van.h
  4. +1 −0 src/meta.proto
  5. +1 −0 src/postoffice.cc
  6. +106 −8 src/resender.h
  7. +36 −7 src/van.cc
  8. +1 −0 src/zmq_van.h
  9. +2 −2 tests/local.sh
View
@@ -27,7 +27,7 @@ Possible outputs are
```
where `H`, `S` and `W` stand for scheduler, server, and worker respectively.
-### Use a Particular Network Interface ###
+## Use a Particular Network Interface
By default, PS-Lite automatically chooses an available network interface. But for
machines that have multiple interfaces, we can specify the network interface to use
@@ -56,3 +56,19 @@ environment variables.
- `DMLC_ROLE` : the role of the current node, can be `worker`, `server`, or `scheduler`
- `DMLC_PS_ROOT_URI` : the ip or hostname of the scheduler node
- `DMLC_PS_ROOT_PORT` : the port that the scheduler node is listening on
+
+## Retransmission for Unreliable Network
+
+It's not uncommon for a message to disappear when it is sent from one node to another
+node. The program hangs when a critical message is not delivered
+successfully. In that case, we can let PS-Lite send an additional ACK for each
+message, and resend that message if the ACK is not received within a given
+time. To enable this feature, we can set the following environment variables:
+
+- `PS_RESEND` : whether to enable retransmission (any nonzero value enables it). Default is 0.
+- `PS_RESEND_TIMEOUT` : timeout in milliseconds to wait for an ACK message. If the ACK is not
 received within this time, PS-Lite will resend that message. Default is 1000.
+
+For testing, we can set `PS_DROP_MSG`, the probability (in percent) of dropping a received
+message. For example, `PS_DROP_MSG=10` will let a node drop a
+received message with 10% probability.
@@ -71,7 +71,7 @@ struct Node {
std::string DebugString() const {
std::stringstream ss;
ss << "role=" << (role == SERVER ? "server" : (role == WORKER ? "worker" : "scheduler"))
- << (id != kEmpty ? "id=" + std::to_string(id) : "")
+ << (id != kEmpty ? ", id=" + std::to_string(id) : "")
<< ", ip=" << hostname << ", port=" << port;
return ss.str();
@@ -102,7 +102,8 @@ struct Control {
/** \brief get debug string */
std::string DebugString() const {
if (empty()) return "";
- std::vector<std::string> cmds = {"EMPTY", "TERMINATE", "ADD_NODE", "BARRIER" };
+ std::vector<std::string> cmds = {
+ "EMPTY", "TERMINATE", "ADD_NODE", "BARRIER", "ACK"};
std::stringstream ss;
ss << "cmd=" << cmds[cmd];
if (node.size()) {
@@ -111,16 +112,19 @@ struct Control {
ss << " }";
}
if (cmd == BARRIER) ss << ", barrier_group=" << barrier_group;
+ if (cmd == ACK) ss << ", msg_sig=" << msg_sig;
return ss.str();
}
/** \brief all commands */
- enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER };
+ enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK };
/** \brief the command */
Command cmd;
/** \brief node infos */
std::vector<Node> node;
/** \brief the node group for a barrier, such as kWorkerGroup */
int barrier_group;
+ /** message signature */
+ uint64_t msg_sig;
};
/**
* \brief meta info of a message
@@ -134,11 +138,21 @@ struct Meta {
request(false), simple_app(false) {}
std::string DebugString() const {
std::stringstream ss;
- ss << "Meta: request=" << request << ", push=" << push
- << ", simple_app=" << simple_app;
- if (customer_id != kEmpty) ss << ", customer_id=" << customer_id;
+ if (sender == Node::kEmpty) {
+ ss << "?";
+ } else {
+ ss << sender;
+ }
+ ss << " => " << recver;
+ ss << ". Meta: request=" << request;
if (timestamp != kEmpty) ss << ", timestamp=" << timestamp;
- if (!control.empty()) ss << ", control={ " << control.DebugString() << " }";
+ if (!control.empty()) {
+ ss << ", control={ " << control.DebugString() << " }";
+ } else {
+ ss << ", customer_id=" << customer_id
+ << ", simple_app=" << simple_app
+ << ", push=" << push;
+ }
if (head != kEmpty) ss << ", head=" << head;
if (body.size()) ss << ", body=" << body;
if (data_type.size()) {
@@ -60,6 +60,12 @@ class Van {
*/
virtual void Stop();
+ /**
+ * \brief get next available timestamp. thread safe
+ *
+ * \return the current value of the timestamp counter; the counter is then
+ * advanced by one. The counter is a std::atomic<int>, so the
+ * read-and-increment is a single atomic operation.
+ */
+ int GetTimestamp() {
+ return timestamp_ ++;
+ }
protected:
/**
* \brief connect to a node
@@ -110,6 +116,8 @@ class Van {
/** msg resender */
Resender* resender_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(Van);
+ int drop_rate_ = 0;
+ std::atomic<int> timestamp_{0};
};
} // namespace ps
#endif // PS_INTERNAL_VAN_H_
View
@@ -20,6 +20,7 @@ message PBControl {
required int32 cmd = 1;
repeated PBNode node = 2;
optional int32 barrier_group = 3;
+ optional uint64 msg_sig = 4;
}
// meta information about a message
View
@@ -119,6 +119,7 @@ void Postoffice::Barrier(int node_group) {
req.meta.request = true;
req.meta.control.cmd = Control::BARRIER;
req.meta.control.barrier_group = node_group;
+ req.meta.timestamp = van_->GetTimestamp();
CHECK_GT(van_->Send(req), 0);
barrier_cond_.wait(ulk, [this] {
View
@@ -1,39 +1,137 @@
#ifndef PS_RESENDER_H_
#define PS_RESENDER_H_
-
+#include <chrono>
+#include <unordered_set>
+#include <unordered_map>
namespace ps {
/**
* \brief resend a message if no ack is received within a given time
*
* Outgoing messages are buffered by AddOutgoing() until the matching ACK is
* seen by AddIncomming(); a background thread running Monitoring() resends the
* buffered messages whose ACK is overdue.
*/
class Resender {
-
public:
/**
* \brief store the settings and start the monitor thread
* \param timeout timeout in millisecond
* \param max_num_retry bound on the number of resends per message; checked
* with CHECK_LT in Monitoring()
* \param van the van used to send ACKs and to resend messages
*/
- Resender(int timeout, int num_retry, Van* van);
-
+ Resender(int timeout, int max_num_retry, Van* van) {
+ timeout_ = timeout;
+ max_num_retry_ = max_num_retry;
+ van_ = van;
+ monitor_ = new std::thread(&Resender::Monitoring, this);
+ }
~Resender() {
-
+ exit_ = true;  // ask Monitoring() to leave its loop
+ monitor_->join();  // Monitoring() only re-checks exit_ after sleeping timeout_ ms, so this can block
+ delete monitor_;
}
-
/**
* \brief add an outgoing message
*
* ACK messages are ignored. Any other message must carry a timestamp; a copy
* of it is buffered under its key (see GetKey) unless that key is already
* buffered.
*/
void AddOutgoing(const Message& msg) {
+ if (msg.meta.control.cmd == Control::ACK) return;
+ CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
+ auto key = GetKey(msg);
+ std::lock_guard<std::mutex> lk(mu_);
+ // already buffered, which is often due to Send being called by the monitor thread
+ if (send_buff_.find(key) != send_buff_.end()) return;
+ auto& ent = send_buff_[key];
+ ent.msg = msg;
+ ent.send = Now();
+ ent.num_retry = 0;
}
/**
* \brief add an incoming message
- * \brief return true if msg has been added before
+ * \brief return true if msg has been added before or is an ACK message
*
* TERMINATE messages are never tracked (returns false). An ACK removes the
* acknowledged message from the send buffer. For any other message an ACK is
* sent back to its sender, even when the message is a duplicate.
*/
bool AddIncomming(const Message& msg) {
- return false;
+ // a message can be received multiple times
+ if (msg.meta.control.cmd == Control::TERMINATE) {
+ return false;
+ } else if (msg.meta.control.cmd == Control::ACK) {
+ mu_.lock();
+ auto key = msg.meta.control.msg_sig;  // signature carried by the ACK, used as the send_buff_ key
+ auto it = send_buff_.find(key);
+ if (it != send_buff_.end()) send_buff_.erase(it);  // stop resending the acknowledged message
+ mu_.unlock();
+ return true;
+ } else {
+ mu_.lock();
+ auto key = GetKey(msg);
+ auto it = acked_.find(key);
+ bool duplicated = it != acked_.end();
+ if (!duplicated) acked_.insert(key);  // NOTE(review): nothing in this class ever removes entries from acked_
+ mu_.unlock();
+ // send back ack message (even if it is duplicated)
+ Message ack;
+ ack.meta.recver = msg.meta.sender;  // the ACK travels in the opposite direction
+ ack.meta.sender = msg.meta.recver;
+ ack.meta.control.cmd = Control::ACK;
+ ack.meta.control.msg_sig = key;
+ van_->Send(ack);
+ // warning
+ if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
+ return duplicated;
+ }
+ }
+
+ private:
+ using Time = std::chrono::milliseconds;
+ // the buffer entry
+ struct Entry {
+ Message msg;  // copy of the buffered message
+ Time send;  // time at which the message was buffered by AddOutgoing
+ int num_retry = 0;  // number of resends so far
+ };
+ std::unordered_map<uint64_t, Entry> send_buff_;  // sent but not yet ACKed messages, by key
+
+ uint64_t GetKey(const Message& msg) {  // builds the 64-bit key used in send_buff_ and acked_
+ CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
+ uint16_t id = msg.meta.customer_id;  // narrowed to 16 bits
+ uint8_t sender = msg.meta.sender == Node::kEmpty ?  // narrowed to 8 bits
+ van_->my_node().id : msg.meta.sender;  // an unset sender means this node
+ uint8_t recver = msg.meta.recver;  // narrowed to 8 bits. NOTE(review): ids above 255 would collide — confirm the id range
+ return (static_cast<uint64_t>(id) << 48) |  // bits 48-63: customer id
+ (static_cast<uint64_t>(sender) << 40) |  // bits 40-47: sender
+ (static_cast<uint64_t>(recver) << 32) |  // bits 32-39: recver
+ (msg.meta.timestamp << 1) | msg.meta.request;  // low bits: timestamp, then the request flag. NOTE(review): the shift is done in timestamp's own type (not visible here); if that is a signed int, large values could overflow or sign-extend into the upper fields — confirm
+ }
+ Time Now() {  // current time in milliseconds since the clock's epoch
+ return std::chrono::duration_cast<Time>(
+ std::chrono::high_resolution_clock::now().time_since_epoch());
+ }
+
+ void Monitoring() {  // body of the monitor thread started by the constructor
+ while (!exit_) {
+ std::this_thread::sleep_for(Time(timeout_));  // scan the buffer once per timeout period
+ std::vector<Message> resend;
+ Time now = Now();
+ mu_.lock();
+ for (auto& it : send_buff_) {
+ if (it.second.send + Time(timeout_) * (1+it.second.num_retry) < now) {  // overdue: more than (1 + num_retry) timeouts since it was buffered
+ resend.push_back(it.second.msg);
+ ++it.second.num_retry;
+ LOG(WARNING) << van_->my_node().ShortDebugString()
+ << ": Timeout to get the ACK message. Resend (retry="
+ << it.second.num_retry << ") " << it.second.msg.DebugString();
+ CHECK_LT(it.second.num_retry, max_num_retry_);  // NOTE(review): presumably aborts once the retry bound is reached, while mu_ is still held — confirm CHECK_LT semantics
+ }
+ }
+ mu_.unlock();
+
+ for (const auto& msg : resend) van_->Send(msg);  // sent after mu_ is released; AddOutgoing locks mu_ itself
+ }
}
+ std::thread* monitor_;  // thread running Monitoring(); joined and deleted in the destructor
+ std::unordered_set<uint64_t> acked_;  // keys of the non-ACK messages received so far
+ std::atomic<bool> exit_{false};  // set by the destructor to stop Monitoring()
+ std::mutex mu_;  // guards send_buff_ and acked_
+ int timeout_;  // resend timeout in milliseconds
+ int max_num_retry_;  // retry bound checked in Monitoring()
+ Van* van_;  // used to send messages; not deleted by this class
};
} // namespace ps
#endif // RESENDER_H_
View
@@ -68,6 +68,10 @@ void Van::Start() {
// connect to the scheduler
Connect(scheduler_);
+ // for debug use
+ if (getenv("PS_DROP_MSG")) {
+ drop_rate_ = atoi(getenv("PS_DROP_MSG"));
+ }
// start receiver
receiver_thread_ = std::unique_ptr<std::thread>(
new std::thread(&Van::Receiving, this));
@@ -78,12 +82,20 @@ void Van::Start() {
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::ADD_NODE;
msg.meta.control.node.push_back(my_node_);
+ msg.meta.timestamp = timestamp_++;
Send(msg);
}
// wait until ready
while (!ready_) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
+
+ // resender
+ if (getenv("PS_RESEND") && atoi(getenv("PS_RESEND")) != 0) {
+ int timeout = 1000;
+ if (getenv("PS_RESEND_TIMEOUT")) timeout = atoi(getenv("PS_RESEND_TIMEOUT"));
+ resender_ = new Resender(timeout, 10, this);
+ }
}
void Van::Stop() {
@@ -93,6 +105,7 @@ void Van::Stop() {
exit.meta.recver = my_node_.id;
SendMsg(exit);
receiver_thread_->join();
+ delete resender_;
}
int Van::Send(const Message& msg) {
@@ -101,8 +114,7 @@ int Van::Send(const Message& msg) {
send_bytes_ += send_bytes_;
if (resender_) resender_->AddOutgoing(msg);
if (Postoffice::Get()->verbose() >= 2) {
- PS_VLOG(2) << my_node_.ShortDebugString() << " => " << msg.meta.recver << ": "
- << msg.DebugString();
+ PS_VLOG(2) << msg.DebugString();
}
return send_bytes;
}
@@ -112,11 +124,20 @@ void Van::Receiving() {
while (true) {
Message msg;
int recv_bytes = RecvMsg(&msg);
+
+ // For debug, drop received message
+ if (ready_ && drop_rate_ > 0) {
+ unsigned seed = time(NULL) + my_node_.id;
+ if (rand_r(&seed) % 100 < drop_rate_) {
+ LOG(WARNING) << "Drop message " << msg.DebugString();
+ continue;
+ }
+ }
+
CHECK_NE(recv_bytes, -1);
recv_bytes_ += recv_bytes;
if (Postoffice::Get()->verbose() >= 2) {
- PS_VLOG(2) << my_node_.ShortDebugString() << " <= " << msg.meta.sender << ": "
- << msg.DebugString();
+ PS_VLOG(2) << msg.DebugString();
}
// duplicated message
if (resender_ && resender_->AddIncomming(msg)) continue;
@@ -176,7 +197,9 @@ void Van::Receiving() {
Message back; back.meta = nodes;
for (int r : Postoffice::Get()->GetNodeIDs(
kWorkerGroup + kServerGroup)) {
- back.meta.recver = r; SendMsg(back);
+ back.meta.recver = r;
+ back.meta.timestamp = timestamp_++;
+ Send(back);
}
PS_VLOG(1) << "the scheduler is connected to "
<< num_workers_ << " workers and " << num_servers_ << " servers";
@@ -207,7 +230,8 @@ void Van::Receiving() {
res.meta.control.cmd = Control::BARRIER;
for (int r : Postoffice::Get()->GetNodeIDs(group)) {
res.meta.recver = r;
- CHECK_GT(SendMsg(res), 0);
+ res.meta.timestamp = timestamp_++;
+ CHECK_GT(Send(res), 0);
}
}
} else {
@@ -240,7 +264,11 @@ void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
if (!meta.control.empty()) {
auto ctrl = pb.mutable_control();
ctrl->set_cmd(meta.control.cmd);
- ctrl->set_barrier_group(meta.control.barrier_group);
+ if (meta.control.cmd == Control::BARRIER) {
+ ctrl->set_barrier_group(meta.control.barrier_group);
+ } else if (meta.control.cmd == Control::ACK) {
+ ctrl->set_msg_sig(meta.control.msg_sig);
+ }
for (const auto& n : meta.control.node) {
auto p = ctrl->add_node();
p->set_id(n.id);
@@ -279,6 +307,7 @@ void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
const auto& ctrl = pb.control();
meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
meta->control.barrier_group = ctrl.barrier_group();
+ meta->control.msg_sig = ctrl.msg_sig();
for (int i = 0; i < ctrl.node_size(); ++i) {
const auto& p = ctrl.node(i);
Node n;
Oops, something went wrong.

0 comments on commit 80c24c4

Please sign in to comment.