Skip to content

Commit

Permalink
Merge pull request #10090 from yuyuyu101/wip-remove-async-lock
Browse files Browse the repository at this point in the history
msg/async: remove file event lock

Reviewed-by: Kefu Chai <kefu@redhat.com>
  • Loading branch information
yuyuyu101 committed Jul 12, 2016
2 parents fe98b0b + 96943ee commit e489cd4
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 136 deletions.
156 changes: 100 additions & 56 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -82,7 +82,7 @@ class C_clean_handler : public EventCallback {
public:
explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
void do_request(int id) {
conn->cleanup_handler();
conn->cleanup();
delete this;
}
};
Expand Down Expand Up @@ -362,14 +362,21 @@ ssize_t AsyncConnection::_try_send(bool more)
<< " remaining bytes " << outcoming_bl.length() << dendl;

if (!open_write && is_queued()) {
center->create_file_event(sd, EVENT_WRITABLE, write_handler);
open_write = true;
if (center->in_thread()) {
center->create_file_event(sd, EVENT_WRITABLE, write_handler);
open_write = true;
} else {
center->dispatch_event_external(write_handler);
}
}

if (open_write && !is_queued()) {
center->delete_file_event(sd, EVENT_WRITABLE);
open_write = false;

if (center->in_thread()) {
center->delete_file_event(sd, EVENT_WRITABLE);
open_write = false;
} else {
center->dispatch_event_external(write_handler);
}
if (state_after_send != STATE_NONE)
center->dispatch_event_external(read_handler);
}
Expand Down Expand Up @@ -941,10 +948,14 @@ void AsyncConnection::process()
break;
}

case STATE_NONE:
{
ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
break;
}

case STATE_CLOSED:
{
if (sd >= 0)
center->delete_file_event(sd, EVENT_READABLE);
ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
break;
}
Expand Down Expand Up @@ -1310,7 +1321,8 @@ ssize_t AsyncConnection::_process_connection()
async_msgr->ms_deliver_handle_fast_connect(this);

// make sure no pending tick timer
center->delete_time_event(last_tick_id);
if (last_tick_id)
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

// messages may have been queued between the last _try_send and the connection becoming ready
Expand All @@ -1331,8 +1343,9 @@ ssize_t AsyncConnection::_process_connection()
if (net.set_nonblock(sd) < 0)
goto fail;

net.set_socket_options(sd);
net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
net.set_priority(sd, async_msgr->get_socket_priority());
center->create_file_event(sd, EVENT_READABLE, read_handler);

bl.append(CEPH_BANNER, strlen(CEPH_BANNER));

Expand Down Expand Up @@ -1484,7 +1497,8 @@ ssize_t AsyncConnection::_process_connection()
if (delay_state)
assert(delay_state->ready());
// make sure no pending tick timer
center->delete_time_event(last_tick_id);
if (last_tick_id)
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

write_lock.Lock();
Expand Down Expand Up @@ -1796,15 +1810,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
existing->is_reset_from_peer = true;
}

// Now existing connection will be alive and the current connection will
// exchange socket with existing connection because we want to maintain
// original "connection_state"
if (existing->sd >= 0)
existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);

reply.global_seq = existing->peer_global_seq;

// Clean up output buffer
existing->outcoming_bl.clear();
Expand All @@ -1815,36 +1821,86 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
existing->requeue_sent();
existing->reset_recv_state();

swap(existing->sd, sd);
existing->can_write = WriteStatus::NOWRITE;
int new_fd = sd;
EventCenter *new_center = center;
Worker *new_worker = worker;
// keep _stop from shutting down the replacing socket
sd = -1;
// queue a reset on the new connection, which we're dumping for the old
_stop();
dispatch_queue->queue_reset(this);
ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
existing->can_write = WriteStatus::REPLACING;
existing->open_write = false;
existing->replacing = true;
existing->state_offset = 0;
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
// prevent the previous thread from modifying events
existing->state = STATE_NONE;
// Discard existing prefetch buffer in `recv_buf`
existing->recv_start = existing->recv_end = 0;
// there shouldn't exist any buffer
assert(recv_start == recv_end);

existing->write_lock.Unlock();

ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
_stop();
// queue a reset on the new connection, which we're dumping for the old
dispatch_queue->queue_reset(this);
int new_fd = existing->sd;
center->submit_to(existing->center->get_id(), [existing, new_fd, connect, reply, authorizer_reply]() mutable {
Mutex::Locker l(existing->lock);
if (new_fd != existing->sd)
return ;

if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
// The new sd is not registered for any event yet, while the original
// events have been deleted.
// The previous existing->sd is still open, so events will continue to
// notify the previous existing->center for now.
// From now on, no one will dispatch events to `existing`.
// Note: we must dispatch asynchronously instead of executing this inline,
// even if existing->center == center, because we must ensure the event
// below is executed after all pending external events such as
// "dispatch_state->queue"
existing->center->submit_to(
existing->center->get_id(),
[existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
// we need to delete time event in original thread
{
Mutex::Locker l(existing->lock);
if (existing->state == STATE_NONE) {
existing->shutdown_socket();
existing->sd = new_fd;
existing->worker->references--;
new_worker->references++;
existing->logger = new_worker->get_perf_counter();
existing->worker = new_worker;
existing->center = new_center;
if (existing->delay_state)
existing->delay_state->set_center(new_center);
} else if (existing->state == STATE_CLOSED) {
::close(new_fd);
return ;
} else {
assert(0);
}
}

// Before existing->center is changed, there may already be some events in existing->center's queue.
// If we then mark down `existing`, that will execute in another thread and clean up the connection.
// The previously queued events would then result in a segmentation fault.
auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
Mutex::Locker l(existing->lock);
if (existing->state == STATE_CLOSED)
return ;
assert(new_fd == existing->sd);
assert(existing->state == STATE_NONE);

existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
reply.global_seq = existing->peer_global_seq;
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
}
};
if (existing->center->in_thread())
transfer_existing();
else
existing->center->submit_to(
existing->center->get_id(), std::move(transfer_existing), true);
}, true);
existing->lock.Unlock();

existing->lock.Unlock();
return 0;
}
existing->lock.Unlock();
Expand Down Expand Up @@ -1961,7 +2017,6 @@ void AsyncConnection::accept(int incoming)
Mutex::Locker l(lock);
sd = incoming;
state = STATE_ACCEPTING;
center->create_file_event(sd, EVENT_READABLE, read_handler);
// reschedule the connection in order to avoid lock dep
center->dispatch_event_external(read_handler);
}
Expand Down Expand Up @@ -2028,7 +2083,8 @@ int AsyncConnection::send_message(Message *m)
} else {
out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
center->dispatch_event_external(write_handler);
if (can_write != WriteStatus::REPLACING)
center->dispatch_event_external(write_handler);
}
return 0;
}
Expand Down Expand Up @@ -2114,7 +2170,7 @@ int AsyncConnection::randomize_out_seq()

void AsyncConnection::fault()
{
if (state == STATE_CLOSED) {
if (state == STATE_CLOSED || state == STATE_NONE) {
ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
return ;
}
Expand All @@ -2127,12 +2183,7 @@ void AsyncConnection::fault()
}

write_lock.Lock();
if (sd >= 0) {
shutdown_socket();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
::close(sd);
sd = -1;
}
shutdown_socket();
can_write = WriteStatus::NOWRITE;
open_write = false;

Expand All @@ -2156,7 +2207,7 @@ void AsyncConnection::fault()
return ;
}
reset_recv_state();
if (policy.standby && !is_queued()) {
if (policy.standby && !is_queued() && state != STATE_WAIT) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
write_lock.Unlock();
Expand Down Expand Up @@ -2231,8 +2282,6 @@ void AsyncConnection::_stop()

ldout(async_msgr->cct, 1) << __func__ << dendl;
Mutex::Locker l(write_lock);
if (sd >= 0)
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);

reset_recv_state();
dispatch_queue->discard_queue(conn_id);
Expand All @@ -2244,14 +2293,8 @@ void AsyncConnection::_stop()
open_write = false;
can_write = WriteStatus::CLOSED;
state_offset = 0;
if (sd >= 0) {
shutdown_socket();
::close(sd);
}
sd = -1;
// Make sure in-queue events will be processed
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));

}

void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
Expand Down Expand Up @@ -2577,7 +2620,7 @@ void AsyncConnection::handle_write()
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
} else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
r = _try_send();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
Expand Down Expand Up @@ -2614,6 +2657,7 @@ void AsyncConnection::tick(uint64_t id)
<< " last_active" << last_active << dendl;
assert(last_tick_id == id);
Mutex::Locker l(lock);
last_tick_id = 0;
auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
if (inactive_timeout_us < (uint64_t)idle_period) {
ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
Expand Down
24 changes: 17 additions & 7 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -104,8 +104,19 @@ class AsyncConnection : public Connection {
return !out_q.empty() || outcoming_bl.length();
}
void shutdown_socket() {
if (sd >= 0)
for (auto &&t : register_time_events)
center->delete_time_event(t);
register_time_events.clear();
if (last_tick_id) {
center->delete_time_event(last_tick_id);
last_tick_id = 0;
}
if (sd >= 0) {
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
::shutdown(sd, SHUT_RDWR);
::close(sd);
sd = -1;
}
}
Message *_get_next_outgoing(bufferlist *bl) {
assert(write_lock.is_locked());
Expand Down Expand Up @@ -156,6 +167,7 @@ class AsyncConnection : public Connection {
assert(register_time_events.empty());
assert(delay_queue.empty());
}
// Store `c` as this object's event center.
// NOTE(review): presumably the method invoked as
// `delay_state->set_center(new_center)` when a connection is moved to a new
// center during replacement -- confirm against the enclosing class.
void set_center(EventCenter *c) {
  center = c;
}
void do_request(int id) override;
void queue(double delay_period, utime_t release, Message *m) {
Mutex::Locker l(delay_lock);
Expand Down Expand Up @@ -304,6 +316,7 @@ class AsyncConnection : public Connection {
Mutex write_lock;
enum class WriteStatus {
NOWRITE,
REPLACING,
CANWRITE,
CLOSED
};
Expand Down Expand Up @@ -383,16 +396,13 @@ class AsyncConnection : public Connection {
void stop(bool queue_reset) {
lock.Lock();
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
_stop();
lock.Unlock();
mark_down();
if (need_queue_reset)
dispatch_queue->queue_reset(this);
}
void cleanup_handler() {
for (auto &&t : register_time_events)
center->delete_time_event(t);
register_time_events.clear();
center->delete_time_event(last_tick_id);
void cleanup() {
shutdown_socket();
delete read_handler;
delete write_handler;
delete wakeup_handler;
Expand Down

0 comments on commit e489cd4

Please sign in to comment.