Skip to content

Commit

Permalink
AsyncConnection: fix delay state using dispatch_queue
Browse files Browse the repository at this point in the history
Signed-off-by: Haomai Wang <haomai@xsky.com>
(cherry picked from commit e5109d5)
  • Loading branch information
yuyuyu101 authored and ddiss committed Jan 31, 2017
1 parent f2b2836 commit dfc5299
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
19 changes: 12 additions & 7 deletions src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void AsyncConnection::maybe_start_delay_thread()
if (!delay_state &&
async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
delay_state = new DelayedDelivery(async_msgr, center);
delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
}
}

Expand Down Expand Up @@ -2405,24 +2405,28 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
delay_queue.pop_front();
}
if (msgr->ms_can_fast_dispatch(m)) {
msgr->ms_fast_dispatch(m);
dispatch_queue->fast_dispatch(m);
} else {
msgr->ms_deliver_dispatch(m);
dispatch_queue->enqueue(m, m->get_priority(), conn_id);
}
}

class C_flush_messages : public EventCallback {
std::deque<std::pair<utime_t, Message*> > delay_queue;
AsyncMessenger *msgr;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
public:
C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {}
C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m,
DispatchQueue *dq, uint64_t cid)
: delay_queue(std::move(q)), msgr(m), dispatch_queue(dq), conn_id(cid) {}
void do_request(int id) {
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
msgr->ms_fast_dispatch(m);
dispatch_queue->fast_dispatch(m);
} else {
msgr->ms_deliver_dispatch(m);
dispatch_queue->enqueue(m, m->get_priority(), conn_id);
}
delay_queue.pop_front();
}
Expand All @@ -2432,7 +2436,8 @@ class C_flush_messages : public EventCallback {

void AsyncConnection::DelayedDelivery::flush() {
Mutex::Locker l(delay_lock);
center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr));
center->dispatch_event_external(
new C_flush_messages(std::move(delay_queue), msgr, dispatch_queue, conn_id));
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
Expand Down
5 changes: 3 additions & 2 deletions src/msg/async/AsyncConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ class AsyncConnection : public Connection {
AsyncMessenger *msgr;
EventCenter *center;
DispatchQueue *dispatch_queue;
uint64_t conn_id;

public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q)
DispatchQueue *q, uint64_t cid)
: delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
msgr(omsgr), center(c), dispatch_queue(q) { }
msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());
Expand Down

0 comments on commit dfc5299

Please sign in to comment.