diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index ed07afa44ffea..80756a0e27825 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -123,8 +123,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), - dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), - open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), + dispatch_queue(q), can_write(WriteStatus::NOWRITE), + open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), @@ -480,7 +480,7 @@ void AsyncConnection::process() ssize_t r = 0; int prev_state = state; bool already_dispatch_writer = false; - Mutex::Locker l(lock); + std::lock_guard l(lock); last_active = ceph::coarse_mono_clock::now(); do { ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl; @@ -532,9 +532,9 @@ void AsyncConnection::process() ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; t = (ceph_timespec*)state_buffer; utime_t kp_t = utime_t(*t); - write_lock.Lock(); + write_lock.lock(); _send_keepalive_or_ack(true, &kp_t); - write_lock.Unlock(); + write_lock.unlock(); ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; set_last_keepalive(ceph_clock_now(NULL)); state = STATE_OPEN; @@ -924,9 +924,9 @@ void AsyncConnection::process() } delay_state->queue(delay_period, release, message); } else if (async_msgr->ms_can_fast_dispatch(message)) { - lock.Unlock(); + lock.unlock(); dispatch_queue->fast_dispatch(message); - lock.Lock(); + lock.lock(); } else { dispatch_queue->enqueue(message, message->get_priority(), conn_id); } @@ -988,7 +988,7 @@ ssize_t AsyncConnection::_process_connection() switch(state) { case STATE_WAIT_SEND: { - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (!outcoming_bl.length()) { assert(state_after_send); state = state_after_send; @@ -1107,7 +1107,7 @@ ssize_t AsyncConnection::_process_connection() } ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl; - lock.Unlock(); + lock.unlock(); async_msgr->learned_addr(peer_addr_for_me); if (async_msgr->cct->_conf->ms_inject_internal_delays) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { @@ -1119,7 +1119,7 @@ ssize_t AsyncConnection::_process_connection() } } - lock.Lock(); + lock.lock(); if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) { ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or " << " replacing must be happened just now" << dendl; @@ -1328,11 +1328,11 @@ ssize_t AsyncConnection::_process_connection() // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again - write_lock.Lock(); + write_lock.lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); - write_lock.Unlock(); + write_lock.unlock(); maybe_start_delay_thread(); break; } @@ -1504,11 +1504,11 @@ ssize_t AsyncConnection::_process_connection() center->delete_time_event(last_tick_id); last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); - write_lock.Lock(); + write_lock.lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); - write_lock.Unlock(); + write_lock.unlock(); maybe_start_delay_thread(); break; } @@ -1645,12 +1645,12 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply); } - lock.Unlock(); + lock.unlock(); bool authorizer_valid; if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { - lock.Lock(); + lock.lock(); ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; session_security.reset(); return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply); @@ -1664,7 +1664,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis inject_delay(); - lock.Lock(); + lock.lock(); if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; assert(state == STATE_CLOSED); @@ -1676,14 +1676,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (existing) { // There is no possible that existing connection will acquire this // connection's lock - existing->lock.Lock(true); // skip lockdep check (we are locking a second AsyncConnection here) + existing->lock.lock(); // skip lockdep check (we are locking a second AsyncConnection here) if (existing->replacing || existing->state == STATE_CLOSED) { ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing." << " existing_state=" << get_state_name(existing->state) << dendl; reply.global_seq = existing->peer_global_seq; r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); - existing->lock.Unlock(); + existing->lock.unlock(); if (r < 0) goto fail; return 0; @@ -1694,7 +1694,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.global_seq = existing->peer_global_seq; // so we can send it below.. - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply); } else { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1728,7 +1728,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1743,7 +1743,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply); } @@ -1760,7 +1760,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; assert(peer_addr > async_msgr->get_myaddr()); - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); } } @@ -1772,7 +1772,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; - existing->lock.Unlock(); + existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } @@ -1805,7 +1805,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->dispatch_queue->queue_reset(existing.get()); } else { assert(can_write == WriteStatus::NOWRITE); - existing->write_lock.Lock(true); + existing->write_lock.lock(); // reset the in_seq if this is a hard reset from peer, // otherwise we respect our original connection's value @@ -1844,7 +1844,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // there shouldn't exist any buffer assert(recv_start == recv_end); - existing->write_lock.Unlock(); + existing->write_lock.unlock(); // new sd now isn't registered any event while origin events // have been deleted. // previous existing->sd now is still open, event will continue to @@ -1859,7 +1859,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable { // we need to delete time event in original thread { - Mutex::Locker l(existing->lock); + std::lock_guard l(existing->lock); if (existing->state == STATE_NONE) { existing->shutdown_socket(); existing->sd = new_fd; @@ -1882,7 +1882,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Then if we mark down `existing`, it will execute in another thread and clean up connection. // Previous event will result in segment fault auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable { - Mutex::Locker l(existing->lock); + std::lock_guard l(existing->lock); if (existing->state == STATE_CLOSED) return ; assert(new_fd == existing->sd); @@ -1903,10 +1903,10 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->center->get_id(), std::move(transfer_existing), true); }, true); - existing->lock.Unlock(); + existing->lock.unlock(); return 0; } - existing->lock.Unlock(); + existing->lock.unlock(); open: connect_seq = connect.connect_seq + 1; @@ -1954,14 +1954,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis reply_bl.append((char*)&s, sizeof(s)); } - lock.Unlock(); + lock.unlock(); // Because "replacing" will prevent other connections preempt this addr, // it's safe that here we don't acquire Connection's lock r = async_msgr->accept_conn(this); inject_delay(); - lock.Lock(); + lock.lock(); replacing = false; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr @@ -2017,7 +2017,7 @@ void AsyncConnection::accept(int incoming) ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; assert(sd < 0); - Mutex::Locker l(lock); + std::lock_guard l(lock); sd = incoming; state = STATE_ACCEPTING; // rescheduler connection in order to avoid lock dep @@ -2037,7 +2037,7 @@ int AsyncConnection::send_message(Message *m) if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (can_write != WriteStatus::CLOSED) { dispatch_queue->local_delivery(m, m->get_priority()); } else { @@ -2061,7 +2061,7 @@ int AsyncConnection::send_message(Message *m) if (can_fast_prepare) prepare_send_message(f, m, bl); - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); // "features" changes will change the payload encoding if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) { // ensure the correctness of message encoding @@ -2094,7 +2094,6 @@ int AsyncConnection::send_message(Message *m) void AsyncConnection::requeue_sent() { - assert(write_lock.is_locked()); if (sent.empty()) return; @@ -2112,7 +2111,7 @@ void AsyncConnection::requeue_sent() void AsyncConnection::discard_requeued_up_to(uint64_t seq) { ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) return; list >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; @@ -2137,7 +2136,6 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq) void AsyncConnection::discard_out_queue() { ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; - assert(write_lock.is_locked()); for (list::iterator p = sent.begin(); p != sent.end(); ++p) { ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; @@ -2185,7 +2183,7 @@ void AsyncConnection::fault() return ; } - write_lock.Lock(); + write_lock.lock(); shutdown_socket(); can_write = WriteStatus::NOWRITE; open_write = false; @@ -2204,7 +2202,7 @@ void AsyncConnection::fault() state >=STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); _stop(); dispatch_queue->queue_reset(this); return ; @@ -2213,11 +2211,11 @@ void AsyncConnection::fault() if (policy.standby && !is_queued() && state != STATE_WAIT) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; state = STATE_STANDBY; - write_lock.Unlock(); + write_lock.unlock(); return; } - write_lock.Unlock(); + write_lock.unlock(); if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) && state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_* // policy maybe empty when state is in accept @@ -2253,8 +2251,7 @@ void AsyncConnection::fault() void AsyncConnection::was_session_reset() { ldout(async_msgr->cct,10) << __func__ << " started" << dendl; - assert(lock.is_locked()); - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (delay_state) delay_state->discard(); dispatch_queue->discard_queue(conn_id); @@ -2276,7 +2273,6 @@ void AsyncConnection::was_session_reset() void AsyncConnection::_stop() { - assert(lock.is_locked()); if (state == STATE_CLOSED) return ; @@ -2284,7 +2280,7 @@ void AsyncConnection::_stop() delay_state->flush(); ldout(async_msgr->cct, 1) << __func__ << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); reset_recv_state(); dispatch_queue->discard_queue(conn_id); @@ -2465,7 +2461,7 @@ void AsyncConnection::handle_ack(uint64_t seq) { ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; // trim sent list - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); while (!sent.empty() && sent.front()->get_seq() <= seq) { Message* m = sent.front(); sent.pop_front(); @@ -2480,7 +2476,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id) { Message *m = nullptr; { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); register_time_events.erase(id); if (stop_dispatch) return ; @@ -2508,7 +2504,7 @@ void AsyncConnection::DelayedDelivery::flush() { stop_dispatch = true; center->submit_to( center->get_id(), [this] () mutable { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; if (msgr->ms_can_fast_dispatch(m)) { @@ -2528,7 +2524,7 @@ void AsyncConnection::DelayedDelivery::flush() { void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); if (can_write != WriteStatus::CLOSED) { keepalive = true; center->dispatch_event_external(write_handler); @@ -2538,14 +2534,12 @@ void AsyncConnection::send_keepalive() void AsyncConnection::mark_down() { ldout(async_msgr->cct, 1) << __func__ << " started." << dendl; - Mutex::Locker l(lock); + std::lock_guard l(lock); _stop(); } void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { - assert(write_lock.is_locked()); - if (ack) { assert(tp); struct ceph_timespec ts; @@ -2570,7 +2564,7 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; ssize_t r = 0; - write_lock.Lock(); + write_lock.lock(); if (can_write == WriteStatus::CANWRITE) { if (keepalive) { _send_keepalive_or_ack(); @@ -2590,7 +2584,7 @@ void AsyncConnection::handle_write() r = write_message(m, data, _has_next_outgoing()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); goto fail; } else if (r > 0) { break; @@ -2611,15 +2605,15 @@ void AsyncConnection::handle_write() r = _try_send(); } - write_lock.Unlock(); + write_lock.unlock(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; } } else { - write_lock.Unlock(); - lock.Lock(); - write_lock.Lock(); + write_lock.unlock(); + lock.lock(); + write_lock.lock(); if (state == STATE_STANDBY && !policy.server && is_queued()) { ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl; _connect(); @@ -2627,29 +2621,29 @@ void AsyncConnection::handle_write() r = _try_send(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; - write_lock.Unlock(); + write_lock.unlock(); fault(); - lock.Unlock(); + lock.unlock(); return ; } } - write_lock.Unlock(); - lock.Unlock(); + write_lock.unlock(); + lock.unlock(); } return ; fail: - lock.Lock(); + lock.lock(); fault(); - lock.Unlock(); + lock.unlock(); } void AsyncConnection::wakeup_from(uint64_t id) { - lock.Lock(); + lock.lock(); register_time_events.erase(id); - lock.Unlock(); + lock.unlock(); process(); } @@ -2659,7 +2653,7 @@ void AsyncConnection::tick(uint64_t id) ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id << " last_active" << last_active << dendl; assert(last_tick_id == id); - Mutex::Locker l(lock); + std::lock_guard l(lock); last_tick_id = 0; auto idle_period = std::chrono::duration_cast(now - last_active).count(); if (inactive_timeout_us < (uint64_t)idle_period) { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 2073d5fbdd991..3988bf9a7447b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -22,12 +22,12 @@ #include #include #include +#include #include using namespace std; #include "auth/AuthSessionHandler.h" #include "common/ceph_time.h" -#include "common/Mutex.h" #include "common/perf_counters.h" #include "include/buffer.h" #include "msg/Connection.h" @@ -55,7 +55,7 @@ class AsyncConnection : public Connection { void restore_sigpipe(); ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); ssize_t try_send(bufferlist &bl, bool more=false) { - Mutex::Locker l(write_lock); + std::lock_guard l(write_lock); outcoming_bl.claim_append(bl); return _try_send(more); } @@ -100,7 +100,6 @@ class AsyncConnection : public Connection { return 0; } bool is_queued() { - assert(write_lock.is_locked()); return !out_q.empty() || outcoming_bl.length(); } void shutdown_socket() { @@ -119,7 +118,6 @@ class AsyncConnection : public Connection { } } Message *_get_next_outgoing(bufferlist *bl) { - assert(write_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map > >::reverse_iterator it = out_q.rbegin(); @@ -136,7 +134,6 @@ class AsyncConnection : public Connection { return m; } bool _has_next_outgoing() { - assert(write_lock.is_locked()); return !out_q.empty(); } void reset_recv_state(); @@ -150,7 +147,7 @@ class AsyncConnection : public Connection { class DelayedDelivery : public EventCallback { std::set register_time_events; // need to delete it if stop std::deque > delay_queue; - Mutex delay_lock; + std::mutex delay_lock; AsyncMessenger *msgr; EventCenter *center; DispatchQueue *dispatch_queue; @@ -160,8 +157,7 @@ class AsyncConnection : public Connection { public: explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, DispatchQueue *q, uint64_t cid) - : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), - msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), + : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), stop_dispatch(false) { } ~DelayedDelivery() { assert(register_time_events.empty()); @@ -170,14 +166,14 @@ class AsyncConnection : public Connection { void set_center(EventCenter *c) { center = c; } void do_request(int id) override; void queue(double delay_period, utime_t release, Message *m) { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); delay_queue.push_back(std::make_pair(release, m)); register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard() { stop_dispatch = true; center->submit_to(center->get_id(), [this] () mutable { - Mutex::Locker l(delay_lock); + std::lock_guard l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); @@ -219,7 +215,7 @@ class AsyncConnection : public Connection { void send_keepalive() override; void mark_down() override; void mark_disposable() override { - Mutex::Locker l(lock); + std::lock_guard l(lock); policy.lossy = true; } @@ -313,7 +309,7 @@ class AsyncConnection : public Connection { DispatchQueue *dispatch_queue; - Mutex write_lock; + std::mutex write_lock; enum class WriteStatus { NOWRITE, REPLACING, @@ -327,7 +323,7 @@ class AsyncConnection : public Connection { bufferlist outcoming_bl; bool keepalive; - Mutex lock; + std::mutex lock; utime_t backoff; // backoff time EventCallbackRef read_handler; EventCallbackRef write_handler; @@ -395,10 +391,10 @@ class AsyncConnection : public Connection { void tick(uint64_t id); void local_deliver(); void stop(bool queue_reset) { - lock.Lock(); + lock.lock(); bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; _stop(); - lock.Unlock(); + lock.unlock(); if (need_queue_reset) dispatch_queue->queue_reset(this); }