Skip to content

Commit

Permalink
Fix occasional close() system call hang on MacOS.
Browse files Browse the repository at this point in the history
Repeated re-registration of kqueue event filters seems to behave as
though there is some kind of "leak" on MacOS, culminating in a suspended
close() system call and an unkillable process. To avoid this, we will
register a descriptor's kqueue event filters once only i.e. when the
descriptor is first created.
  • Loading branch information
chriskohlhoff committed May 3, 2014
1 parent 92a53bd commit 660e9fe
Showing 1 changed file with 56 additions and 118 deletions.
174 changes: 56 additions & 118 deletions include/boost/asio/detail/impl/kqueue_reactor.ipp
Expand Up @@ -47,10 +47,15 @@ kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service)
interrupter_(),
shutdown_(false)
{
// The interrupter is put into a permanently readable state. Whenever we want
// to interrupt the blocked kevent call we register a read operation against
// the descriptor.
interrupter_.interrupt();
struct kevent event;
BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(),
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_);
if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
{
boost::system::error_code error(errno,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(error);
}
}

kqueue_reactor::~kqueue_reactor()
Expand Down Expand Up @@ -89,26 +94,27 @@ void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)

interrupter_.recreate();

struct kevent event;
BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(),
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_);
if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
{
boost::system::error_code error(errno,
boost::asio::error::get_system_category());
boost::asio::detail::throw_error(error);
}

// Re-register all descriptors with kqueue.
mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
for (descriptor_state* state = registered_descriptors_.first();
state != 0; state = state->next_)
{
struct kevent events[2];
int num_events = 0;

if (!state->op_queue_[read_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
else if (!state->op_queue_[except_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
EVFILT_READ, EV_ADD | EV_CLEAR, EV_OOBAND, 0, state);

if (!state->op_queue_[write_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_,
EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);

if (num_events && ::kevent(kqueue_fd_, events, num_events, 0, 0, 0) == -1)
BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1)
{
boost::system::error_code error(errno,
boost::asio::error::get_system_category());
Expand All @@ -133,6 +139,14 @@ int kqueue_reactor::register_descriptor(socket_type descriptor,
descriptor_data->descriptor_ = descriptor;
descriptor_data->shutdown_ = false;

struct kevent events[2];
BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1)
return errno;

return 0;
}

Expand All @@ -148,23 +162,13 @@ int kqueue_reactor::register_internal_descriptor(
descriptor_data->shutdown_ = false;
descriptor_data->op_queue_[op_type].push(op);

struct kevent event;
switch (op_type)
{
case read_op:
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
break;
case write_op:
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
break;
case except_op:
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
break;
}
::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
struct kevent events[2];
BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
if (::kevent(kqueue_fd_, events, 2, 0, 0, 0) == -1)
return errno;

return 0;
}
Expand Down Expand Up @@ -199,52 +203,30 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
bool first = descriptor_data->op_queue_[op_type].empty();
if (first)
{
if (allow_speculative)
if (allow_speculative
&& (op_type != read_op
|| descriptor_data->op_queue_[except_op].empty()))
{
if (op_type != read_op || descriptor_data->op_queue_[except_op].empty())
if (op->perform())
{
if (op->perform())
{
descriptor_lock.unlock();
io_service_.post_immediate_completion(op, is_continuation);
return;
}
descriptor_lock.unlock();
io_service_.post_immediate_completion(op, is_continuation);
return;
}
}
}

descriptor_data->op_queue_[op_type].push(op);
io_service_.work_started();

if (first)
{
struct kevent event;
switch (op_type)
else
{
case read_op:
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
struct kevent events[2];
BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
break;
case write_op:
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
break;
case except_op:
if (!descriptor_data->op_queue_[read_op].empty())
return; // Already registered for read events.
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
break;
}

if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
{
op->ec_ = boost::system::error_code(errno,
boost::asio::error::get_system_category());
descriptor_data->op_queue_[op_type].pop();
io_service_.post_deferred_completion(op);
::kevent(kqueue_fd_, events, 2, 0, 0, 0);
}
}

descriptor_data->op_queue_[op_type].push(op);
io_service_.work_started();
}

void kqueue_reactor::cancel_ops(socket_type,
Expand Down Expand Up @@ -367,12 +349,10 @@ void kqueue_reactor::run(bool block, op_queue<operation>& ops)
// Dispatch the waiting events.
for (int i = 0; i < num_events; ++i)
{
int descriptor = static_cast<int>(events[i].ident);
void* ptr = reinterpret_cast<void*>(events[i].udata);
if (ptr == &interrupter_)
{
// No need to reset the interrupter since we're leaving the descriptor
// in a ready-to-read state and relying on edge-triggered notifications.
interrupter_.reset();
}
else
{
Expand Down Expand Up @@ -414,45 +394,6 @@ void kqueue_reactor::run(bool block, op_queue<operation>& ops)
}
}
}

// Renew registration for event notifications.
struct kevent event;
switch (events[i].filter)
{
case EVFILT_READ:
if (!descriptor_data->op_queue_[read_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
else if (!descriptor_data->op_queue_[except_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ,
EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data);
else
continue;
break;
case EVFILT_WRITE:
if (!descriptor_data->op_queue_[write_op].empty())
BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE,
EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
else
continue;
break;
default:
break;
}
if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
{
boost::system::error_code error(errno,
boost::asio::error::get_system_category());
for (int j = 0; j < max_ops; ++j)
{
while (reactor_op* op = descriptor_data->op_queue_[j].front())
{
op->ec_ = error;
descriptor_data->op_queue_[j].pop();
ops.push(op);
}
}
}
}
}

Expand All @@ -462,10 +403,7 @@ void kqueue_reactor::run(bool block, op_queue<operation>& ops)

void kqueue_reactor::interrupt()
{
struct kevent event;
BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(),
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_);
::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
interrupter_.interrupt();
}

int kqueue_reactor::do_kqueue_create()
Expand Down

0 comments on commit 660e9fe

Please sign in to comment.