Skip to content

Commit

Permalink
Disable speculative operations after a short read or write.
Browse files Browse the repository at this point in the history
When using epoll, interpret a short read or write on a stream-oriented
socket as an indication that we should wait for a readiness event. After
this condition, speculative operations are disabled until we receive the
event notification from epoll.
  • Loading branch information
chriskohlhoff committed Sep 10, 2016
1 parent c9e33ce commit 6cec69e
Show file tree
Hide file tree
Showing 21 changed files with 123 additions and 51 deletions.
35 changes: 35 additions & 0 deletions asio/include/asio/detail/buffer_sequence_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class buffer_sequence_adapter
return count_;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -223,6 +228,11 @@ class buffer_sequence_adapter<Buffer, asio::mutable_buffer>
return 1;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -270,6 +280,11 @@ class buffer_sequence_adapter<Buffer, asio::const_buffer>
return 1;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -319,6 +334,11 @@ class buffer_sequence_adapter<Buffer, asio::mutable_buffers_1>
return 1;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -366,6 +386,11 @@ class buffer_sequence_adapter<Buffer, asio::const_buffers_1>
return 1;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -416,6 +441,11 @@ class buffer_sequence_adapter<Buffer, boost::array<Elem, 2> >
return 2;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down Expand Up @@ -468,6 +498,11 @@ class buffer_sequence_adapter<Buffer, std::array<Elem, 2> >
return 2;
}

std::size_t total_size() const
{
return total_buffer_size_;
}

bool all_empty() const
{
return total_buffer_size_ == 0;
Expand Down
7 changes: 4 additions & 3 deletions asio/include/asio/detail/descriptor_read_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ class descriptor_read_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
descriptor_read_op_base* o(static_cast<descriptor_read_op_base*>(base));

buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);

bool result = descriptor_ops::non_blocking_read(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_);
status result = descriptor_ops::non_blocking_read(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_)
? done : not_done;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_read",
o->ec_, o->bytes_transferred_));
Expand Down
7 changes: 4 additions & 3 deletions asio/include/asio/detail/descriptor_write_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ class descriptor_write_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
descriptor_write_op_base* o(static_cast<descriptor_write_op_base*>(base));

buffer_sequence_adapter<asio::const_buffer,
ConstBufferSequence> bufs(o->buffers_);

bool result = descriptor_ops::non_blocking_write(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_);
status result = descriptor_ops::non_blocking_write(o->descriptor_,
bufs.buffers(), bufs.count(), o->ec_, o->bytes_transferred_)
? done : not_done;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_write",
o->ec_, o->bytes_transferred_));
Expand Down
1 change: 1 addition & 0 deletions asio/include/asio/detail/epoll_reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class epoll_reactor
int descriptor_;
uint32_t registered_events_;
op_queue<reactor_op> op_queue_[max_ops];
bool try_speculative_[max_ops];
bool shutdown_;

ASIO_DECL descriptor_state(bool locking);
Expand Down
2 changes: 1 addition & 1 deletion asio/include/asio/detail/impl/epoll_reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void epoll_reactor::move_timer(timer_queue<Time_Traits>& queue,
typename timer_queue<Time_Traits>::per_timer_data& target,
typename timer_queue<Time_Traits>::per_timer_data& source)
{
asio::detail::mutex::scoped_lock lock(mutex_);
mutex::scoped_lock lock(mutex_);
op_queue<operation> ops;
queue.cancel_timer(target, ops);
queue.move_timer(target, source);
Expand Down
26 changes: 21 additions & 5 deletions asio/include/asio/detail/impl/epoll_reactor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ int epoll_reactor::register_descriptor(socket_type descriptor,
descriptor_data->reactor_ = this;
descriptor_data->descriptor_ = descriptor;
descriptor_data->shutdown_ = false;
for (int i = 0; i < max_ops; ++i)
descriptor_data->try_speculative_[i] = true;
}

epoll_event ev = { 0, { 0 } };
Expand Down Expand Up @@ -203,6 +205,8 @@ int epoll_reactor::register_internal_descriptor(
descriptor_data->descriptor_ = descriptor;
descriptor_data->shutdown_ = false;
descriptor_data->op_queue_[op_type].push(op);
for (int i = 0; i < max_ops; ++i)
descriptor_data->try_speculative_[i] = true;
}

epoll_event ev = { 0, { 0 } };
Expand Down Expand Up @@ -249,11 +253,17 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
&& (op_type != read_op
|| descriptor_data->op_queue_[except_op].empty()))
{
if (op->perform())
if (descriptor_data->try_speculative_[op_type])
{
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
return;
if (reactor_op::status status = op->perform())
{
if (status == reactor_op::done_and_exhausted)
if (descriptor_data->registered_events_ != 0)
descriptor_data->try_speculative_[op_type] = false;
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
return;
}
}

if (descriptor_data->registered_events_ == 0)
Expand Down Expand Up @@ -695,12 +705,18 @@ operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
if (events & (flag[j] | EPOLLERR | EPOLLHUP))
{
try_speculative_[j] = true;
while (reactor_op* op = op_queue_[j].front())
{
if (op->perform())
if (reactor_op::status status = op->perform())
{
op_queue_[j].pop();
io_cleanup.ops_.push(op);
if (status == reactor_op::done_and_exhausted)
{
try_speculative_[j] = false;
break;
}
}
else
break;
Expand Down
4 changes: 2 additions & 2 deletions asio/include/asio/detail/impl/signal_set_service.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public:
{
}

static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
signal_state* state = get_signal_state();

Expand All @@ -103,7 +103,7 @@ public:
if (signal_number >= 0 && signal_number < max_signal_number)
signal_set_service::deliver_signal(signal_number);

return false;
return not_done;
}

static void do_complete(void* /*owner*/, operation* base,
Expand Down
4 changes: 2 additions & 2 deletions asio/include/asio/detail/reactive_null_buffers_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class reactive_null_buffers_op : public reactor_op
handler_work<Handler>::start(handler_);
}

static bool do_perform(reactor_op*)
static status do_perform(reactor_op*)
{
return true;
return done;
}

static void do_complete(void* owner, operation* base,
Expand Down
8 changes: 4 additions & 4 deletions asio/include/asio/detail/reactive_socket_accept_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ class reactive_socket_accept_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_accept_op_base* o(
static_cast<reactive_socket_accept_op_base*>(base));

std::size_t addrlen = o->peer_endpoint_ ? o->peer_endpoint_->capacity() : 0;
socket_type new_socket = invalid_socket;
bool result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket);
status result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &addrlen : 0, o->ec_, new_socket) ? done : not_done;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_accept", o->ec_));

Expand Down
5 changes: 3 additions & 2 deletions asio/include/asio/detail/reactive_socket_connect_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ class reactive_socket_connect_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_connect_op_base* o(
static_cast<reactive_socket_connect_op_base*>(base));

bool result = socket_ops::non_blocking_connect(o->socket_, o->ec_);
status result = socket_ops::non_blocking_connect(
o->socket_, o->ec_) ? done : not_done;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_connect", o->ec_));

Expand Down
11 changes: 8 additions & 3 deletions asio/include/asio/detail/reactive_socket_recv_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ class reactive_socket_recv_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recv_op_base* o(
static_cast<reactive_socket_recv_op_base*>(base));

buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);

bool result = socket_ops::non_blocking_recv(o->socket_,
status result = socket_ops::non_blocking_recv(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
(o->state_ & socket_ops::stream_oriented) != 0,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;

if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ < bufs.total_size())
result = done_and_exhausted;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recv",
o->ec_, o->bytes_transferred_));
Expand Down
6 changes: 3 additions & 3 deletions asio/include/asio/detail/reactive_socket_recvfrom_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class reactive_socket_recvfrom_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recvfrom_op_base* o(
static_cast<reactive_socket_recvfrom_op_base*>(base));
Expand All @@ -53,10 +53,10 @@ class reactive_socket_recvfrom_op_base : public reactor_op
MutableBufferSequence> bufs(o->buffers_);

std::size_t addr_len = o->sender_endpoint_.capacity();
bool result = socket_ops::non_blocking_recvfrom(o->socket_,
status result = socket_ops::non_blocking_recvfrom(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
o->sender_endpoint_.data(), &addr_len,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;

if (result && !o->ec_)
o->sender_endpoint_.resize(addr_len);
Expand Down
6 changes: 3 additions & 3 deletions asio/include/asio/detail/reactive_socket_recvmsg_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ class reactive_socket_recvmsg_op_base : public reactor_op
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_recvmsg_op_base* o(
static_cast<reactive_socket_recvmsg_op_base*>(base));

buffer_sequence_adapter<asio::mutable_buffer,
MutableBufferSequence> bufs(o->buffers_);

bool result = socket_ops::non_blocking_recvmsg(o->socket_,
status result = socket_ops::non_blocking_recvmsg(o->socket_,
bufs.buffers(), bufs.count(),
o->in_flags_, o->out_flags_,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recvmsg",
o->ec_, o->bytes_transferred_));
Expand Down
19 changes: 13 additions & 6 deletions asio/include/asio/detail/reactive_socket_send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,32 @@ class reactive_socket_send_op_base : public reactor_op
{
public:
reactive_socket_send_op_base(socket_type socket,
const ConstBufferSequence& buffers,
socket_ops::state_type state, const ConstBufferSequence& buffers,
socket_base::message_flags flags, func_type complete_func)
: reactor_op(&reactive_socket_send_op_base::do_perform, complete_func),
socket_(socket),
state_(state),
buffers_(buffers),
flags_(flags)
{
}

static bool do_perform(reactor_op* base)
static status do_perform(reactor_op* base)
{
reactive_socket_send_op_base* o(
static_cast<reactive_socket_send_op_base*>(base));

buffer_sequence_adapter<asio::const_buffer,
ConstBufferSequence> bufs(o->buffers_);

bool result = socket_ops::non_blocking_send(o->socket_,
status result = socket_ops::non_blocking_send(o->socket_,
bufs.buffers(), bufs.count(), o->flags_,
o->ec_, o->bytes_transferred_);
o->ec_, o->bytes_transferred_) ? done : not_done;

if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ < bufs.total_size())
result = done_and_exhausted;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_send",
o->ec_, o->bytes_transferred_));
Expand All @@ -62,6 +68,7 @@ class reactive_socket_send_op_base : public reactor_op

private:
socket_type socket_;
socket_ops::state_type state_;
ConstBufferSequence buffers_;
socket_base::message_flags flags_;
};
Expand All @@ -74,10 +81,10 @@ class reactive_socket_send_op :
ASIO_DEFINE_HANDLER_PTR(reactive_socket_send_op);

reactive_socket_send_op(socket_type socket,
const ConstBufferSequence& buffers,
socket_ops::state_type state, const ConstBufferSequence& buffers,
socket_base::message_flags flags, Handler& handler)
: reactive_socket_send_op_base<ConstBufferSequence>(socket,
buffers, flags, &reactive_socket_send_op::do_complete),
state, buffers, flags, &reactive_socket_send_op::do_complete),
handler_(ASIO_MOVE_CAST(Handler)(handler))
{
handler_work<Handler>::start(handler_);
Expand Down

0 comments on commit 6cec69e

Please sign in to comment.