From 2c5363c4e48cd37c9fd9d133d11c53ae5140ed09 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sat, 16 Jul 2016 13:38:09 +0800 Subject: [PATCH] msg/async/AsyncConnection: replace Mutex with std::mutex for peformance The 16714 issue is caused by when replacing process. Accept connection will try to acquire another connection's lock. But all connection's lock name are the same. So it will result in lockdep map make wrong judgement. Fixes: http://tracker.ceph.com/issues/16714 Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 130 +++++++++++++++---------------- src/msg/async/AsyncConnection.h | 26 +++---- 2 files changed, 73 insertions(+), 83 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index ed07afa44ffea..80756a0e27825 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -123,8 +123,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), - dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), - open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), + dispatch_queue(q), can_write(WriteStatus::NOWRITE), + open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), @@ -480,7 +480,7 @@ void AsyncConnection::process() ssize_t r = 0; int prev_state = state; bool already_dispatch_writer = false; - Mutex::Locker l(lock); + std::lock_guard l(lock); last_active = ceph::coarse_mono_clock::now(); do { ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl; @@ -532,9 +532,9 @@ void AsyncConnection::process() ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; t = (ceph_timespec*)state_buffer; utime_t kp_t = utime_t(*t); - write_lock.Lock(); + write_lock.lock(); _send_keepalive_or_ack(true, &kp_t); - write_lock.Unlock(); + write_lock.unlock(); ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; set_last_keepalive(ceph_clock_now(NULL)); state = STATE_OPEN; @@ -924,9 +924,9 @@ void AsyncConnection::process() } delay_state->queue(delay_period, release, message); } else if (async_msgr->ms_can_fast_dispatch(message)) { - lock.Unlock(); + lock.unlock(); dispatch_queue->fast_dispatch(message); - lock.Lock(); + lock.lock(); } else { dispatch_queue->enqueue(message, message->get_priority(), conn_id); } @@ -988,7 +988,7 @@ ssize_t AsyncConnection::_process_connection() switch(state) { case STATE_WAIT_SEND: { - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (!outcoming_bl.length()) { assert(state_after_send); state = state_after_send; @@ -1107,7 +1107,7 @@ ssize_t AsyncConnection::_process_connection() } ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; - lock.Unlock(); + lock.unlock(); async_msgr->learned_addr(peer_addr_for_me); if (async_msgr->cct->_conf->ms_inject_internal_delays) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { @@ -1119,7 +1119,7 @@ ssize_t AsyncConnection::_process_connection() } } - lock.Lock(); + lock.lock(); if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) { ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or " << " replacing must be happened just now" << dendl; @@ -1328,11 +1328,11 @@ ssize_t AsyncConnection::_process_connection() // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again - write_lock.Lock(); + write_lock.lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); - write_lock.Unlock(); + write_lock.unlock(); maybe_start_delay_thread(); break; } @@ -1504,11 +1504,11 @@ ssize_t AsyncConnection::_process_connection() center->delete_time_event(last_tick_id); last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); - write_lock.Lock(); + write_lock.lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); - write_lock.Unlock(); + write_lock.unlock(); maybe_start_delay_thread(); break; } @@ -1645,12 +1645,12 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply); } - lock.Unlock(); + lock.unlock(); bool authorizer_valid; if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { - lock.Lock(); + lock.lock(); ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; session_security.reset(); return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply); @@ -1664,7 +1664,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis inject_delay(); - lock.Lock(); + lock.lock(); if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; assert(state == STATE_CLOSED); @@ -1676,14 +1676,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (existing) { // There is no possible that existing connection will acquire this // connection's lock - existing->lock.Lock(true); // skip lockdep check (we are locking a second AsyncConnection here) + existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here) if (existing->replacing || existing->state == STATE_CLOSED) { ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." << " existing_state=" << get_state_name(existing->state) << dendl; reply.global_seq = existing->peer_global_seq; r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); - existing->lock.Unlock(); + existing->lock.unlock(); if (r < 0) goto fail; return 0; @@ -1694,7 +1694,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.global_seq = existing->peer_global_seq; // so we can send it below.. - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); } else { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1728,7 +1728,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1743,7 +1743,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1760,7 +1760,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; assert(peer_addr > async_msgr->get_myaddr()); - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); } } @@ -1772,7 +1772,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } @@ -1805,7 +1805,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->dispatch_queue->queue_reset(existing.get()); } else { assert(can_write == WriteStatus::NOWRITE); - existing->write_lock.Lock(true); + existing->write_lock.lock(); // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value @@ -1844,7 +1844,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // there shouldn't exist any buffer assert(recv_start == recv_end); - existing->write_lock.Unlock(); + existing->write_lock.unlock(); // new sd now isn't registered any event while origin events // have been deleted. // previous existing->sd now is still open, event will continue to @@ -1859,7 +1859,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable { // we need to delete time event in original thread { - Mutex::Locker l(existing->lock); + std::lock_guard l(existing->lock); if (existing->state == STATE_NONE) { existing->shutdown_socket(); existing->sd = new_fd; @@ -1882,7 +1882,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Then if we mark down `existing`, it will execute in another thread and clean up connection. // Previous event will result in segment fault auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable { - Mutex::Locker l(existing->lock); + std::lock_guard l(existing->lock); if (existing->state == STATE_CLOSED) return ; assert(new_fd == existing->sd); @@ -1903,10 +1903,10 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->center->get_id(), std::move(transfer_existing), true); }, true); - existing->lock.Unlock(); + existing->lock.unlock(); return 0; } - existing->lock.Unlock(); + existing->lock.unlock(); open: connect_seq = connect.connect_seq + 1; @@ -1954,14 +1954,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis reply_bl.append((char*)&s, sizeof(s)); } - lock.Unlock(); + lock.unlock(); // Because "replacing" will prevent other connections preempt this addr, // it's safe that here we don't acquire Connection's lock r = async_msgr->accept_conn(this); inject_delay(); - lock.Lock(); + lock.lock(); replacing = false; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr @@ -2017,7 +2017,7 @@ void AsyncConnection::accept(int incoming) ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; assert(sd < 0); - Mutex::Locker l(lock); + std::lock_guard l(lock); sd = incoming; state = STATE_ACCEPTING; // rescheduler connection in order to avoid lock dep @@ -2037,7 +2037,7 @@ int AsyncConnection::send_message(Message *m) if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (can_write != WriteStatus::CLOSED) { dispatch_queue->local_delivery(m, m->get_priority()); } else { @@ -2061,7 +2061,7 @@ int AsyncConnection::send_message(Message *m) if (can_fast_prepare) prepare_send_message(f, m, bl); - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); // "features" changes will change the payload encoding if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) { // ensure the correctness of message encoding @@ -2094,7 +2094,6 @@ int AsyncConnection::send_message(Message *m) void AsyncConnection::requeue_sent() { - assert(write_lock.is_locked()); if (sent.empty()) return; @@ -2112,7 +2111,7 @@ void AsyncConnection::requeue_sent() void AsyncConnection::discard_requeued_up_to(uint64_t seq) { ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) return; list >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; @@ -2137,7 +2136,6 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq) void AsyncConnection::discard_out_queue() { ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; - assert(write_lock.is_locked()); for (list::iterator p = sent.begin(); p != sent.end(); ++p) { ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; @@ -2185,7 +2183,7 @@ void AsyncConnection::fault() return ; } - write_lock.Lock(); + write_lock.lock(); shutdown_socket(); can_write = WriteStatus::NOWRITE; open_write = false; @@ -2204,7 +2202,7 @@ void AsyncConnection::fault() state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); _stop(); dispatch_queue->queue_reset(this); return ; @@ -2213,11 +2211,11 @@ void AsyncConnection::fault() if (policy.standby && !is_queued() && state != STATE_WAIT) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; state = STATE_STANDBY; - write_lock.Unlock(); + write_lock.unlock(); return; } - write_lock.Unlock(); + write_lock.unlock(); if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) && state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_* // policy maybe empty when state is in accept @@ -2253,8 +2251,7 @@ void AsyncConnection::fault() void AsyncConnection::was_session_reset() { ldout(async_msgr->cct,10) << __func__ << " started" << dendl; - assert(lock.is_locked()); - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (delay_state) delay_state->discard(); dispatch_queue->discard_queue(conn_id); @@ -2276,7 +2273,6 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { - assert(lock.is_locked()); if (state == STATE_CLOSED) return ; @@ -2284,7 +2280,7 @@ void AsyncConnection::_stop() delay_state->flush(); ldout(async_msgr->cct, 1) << __func__ << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); reset_recv_state(); dispatch_queue->discard_queue(conn_id); @@ -2465,7 +2461,7 @@ void AsyncConnection::handle_ack(uint64_t seq) { ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; // trim sent list - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); while (!sent.empty() && sent.front()->get_seq() <= seq) { Message* m = sent.front(); sent.pop_front(); @@ -2480,7 +2476,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id) { Message *m = nullptr; { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); register_time_events.erase(id); if (stop_dispatch) return ; @@ -2508,7 +2504,7 @@ void AsyncConnection::DelayedDelivery::flush() { stop_dispatch = true; center->submit_to( center->get_id(), [this] () mutable { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; if (msgr->ms_can_fast_dispatch(m)) { @@ -2528,7 +2524,7 @@ void AsyncConnection::DelayedDelivery::flush() { void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (can_write != WriteStatus::CLOSED) { keepalive = true; center->dispatch_event_external(write_handler); @@ -2538,14 +2534,12 @@ void AsyncConnection::send_keepalive() void AsyncConnection::mark_down() { ldout(async_msgr->cct, 1) << __func__ << " started." << dendl; - Mutex::Locker l(lock); + std::lock_guard l(lock); _stop(); } void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { - assert(write_lock.is_locked()); - if (ack) { assert(tp); struct ceph_timespec ts; @@ -2570,7 +2564,7 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; ssize_t r = 0; - write_lock.Lock(); + write_lock.lock(); if (can_write == WriteStatus::CANWRITE) { if (keepalive) { _send_keepalive_or_ack(); @@ -2590,7 +2584,7 @@ void AsyncConnection::handle_write() r = write_message(m, data, _has_next_outgoing()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); goto fail; } else if (r > 0) { break; @@ -2611,15 +2605,15 @@ void AsyncConnection::handle_write() r = _try_send(); } - write_lock.Unlock(); + write_lock.unlock(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; } } else { - write_lock.Unlock(); - lock.Lock(); - write_lock.Lock(); + write_lock.unlock(); + lock.lock(); + write_lock.lock(); if (state == STATE_STANDBY && !policy.server && is_queued()) { ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl; _connect(); @@ -2627,29 +2621,29 @@ void AsyncConnection::handle_write() r = _try_send(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); fault(); - lock.Unlock(); + lock.unlock(); return ; } } - write_lock.Unlock(); - lock.Unlock(); + write_lock.unlock(); + lock.unlock(); } return ; fail: - lock.Lock(); + lock.lock(); fault(); - lock.Unlock(); + lock.unlock(); } void AsyncConnection::wakeup_from(uint64_t id) { - lock.Lock(); + lock.lock(); register_time_events.erase(id); - lock.Unlock(); + lock.unlock(); process(); } @@ -2659,7 +2653,7 @@ void AsyncConnection::tick(uint64_t id) ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id << " last_active" << last_active << dendl; assert(last_tick_id == id); - Mutex::Locker l(lock); + std::lock_guard l(lock); last_tick_id = 0; auto idle_period = std::chrono::duration_cast(now - last_active).count(); if (inactive_timeout_us < (uint64_t)idle_period) { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 2073d5fbdd991..3988bf9a7447b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -22,12 +22,12 @@ #include #include #include +#include #include using namespace std; #include "auth/AuthSessionHandler.h" #include "common/ceph_time.h" -#include "common/Mutex.h" #include "common/perf_counters.h" #include "include/buffer.h" #include "msg/Connection.h" @@ -55,7 +55,7 @@ class AsyncConnection : public Connection { void restore_sigpipe(); ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); ssize_t try_send(bufferlist &bl, bool more=false) { - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); outcoming_bl.claim_append(bl); return _try_send(more); } @@ -100,7 +100,6 @@ class AsyncConnection : public Connection { return 0; } bool is_queued() { - assert(write_lock.is_locked()); return !out_q.empty() || outcoming_bl.length(); } void shutdown_socket() { @@ -119,7 +118,6 @@ class AsyncConnection : public Connection { } } Message *_get_next_outgoing(bufferlist *bl) { - assert(write_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map > >::reverse_iterator it = out_q.rbegin(); @@ -136,7 +134,6 @@ class AsyncConnection : public Connection { return m; } bool _has_next_outgoing() { - assert(write_lock.is_locked()); return !out_q.empty(); } void reset_recv_state(); @@ -150,7 +147,7 @@ class AsyncConnection : public Connection { class DelayedDelivery : public EventCallback { std::set register_time_events; // need to delete it if stop std::deque > delay_queue; - Mutex delay_lock; + std::mutex delay_lock; AsyncMessenger *msgr; EventCenter *center; DispatchQueue *dispatch_queue; @@ -160,8 +157,7 @@ class AsyncConnection : public Connection { public: explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, DispatchQueue *q, uint64_t cid) - : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), - msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), + : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), stop_dispatch(false) { } ~DelayedDelivery() { assert(register_time_events.empty()); @@ -170,14 +166,14 @@ class AsyncConnection : public Connection { void set_center(EventCenter *c) { center = c; } void do_request(int id) override; void queue(double delay_period, utime_t release, Message *m) { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); delay_queue.push_back(std::make_pair(release, m)); register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { stop_dispatch = true; center->submit_to(center->get_id(), [this] () mutable { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); @@ -219,7 +215,7 @@ class AsyncConnection : public Connection { void send_keepalive() override; void mark_down() override; void mark_disposable() override { - Mutex::Locker l(lock); + std::lock_guard l(lock); policy.lossy = true; } @@ -313,7 +309,7 @@ class AsyncConnection : public Connection { DispatchQueue *dispatch_queue; - Mutex write_lock; + std::mutex write_lock; enum class WriteStatus { NOWRITE, REPLACING, @@ -327,7 +323,7 @@ class AsyncConnection : public Connection { bufferlist outcoming_bl; bool keepalive; - Mutex lock; + std::mutex lock; utime_t backoff; // backoff time EventCallbackRef read_handler; EventCallbackRef write_handler; @@ -395,10 +391,10 @@ class AsyncConnection : public Connection { void tick(uint64_t id); void local_deliver(); void stop(bool queue_reset) { - lock.Lock(); + lock.lock(); bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; _stop(); - lock.Unlock(); + lock.unlock(); if (need_queue_reset) dispatch_queue->queue_reset(this); }