1,658 changes: 1,656 additions & 2 deletions asio/include/asio/basic_datagram_socket.hpp

Large diffs are not rendered by default.

1,662 changes: 1,656 additions & 6 deletions asio/include/asio/basic_raw_socket.hpp

Large diffs are not rendered by default.

671 changes: 671 additions & 0 deletions asio/include/asio/basic_seq_packet_socket.hpp

Large diffs are not rendered by default.

838 changes: 838 additions & 0 deletions asio/include/asio/basic_stream_socket.hpp

Large diffs are not rendered by default.

87 changes: 87 additions & 0 deletions asio/include/asio/detail/config.hpp
Expand Up @@ -118,6 +118,11 @@
# include <android/api-level.h>
#endif // defined(__ANDROID__)

// Support static_assert
#if defined(__cpp_static_assert)
# define ASIO_HAS_STATIC_ASSERT 1
#endif // defined(__cpp_static_assert)

// Support move construction and assignment on compilers known to allow it.
#if !defined(ASIO_HAS_MOVE)
# if !defined(ASIO_DISABLE_MOVE)
Expand Down Expand Up @@ -1700,6 +1705,84 @@
# endif // !defined(ASIO_HAS_EPOLL) && defined(ASIO_HAS_IO_URING)
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)

//
// ASIO support for multiple buffer sequences using native syscalls
// sendmmsg/recvmmsg, sendmsg_x/recvmsg_x ... supporting the operating
// systems listed below
//
// - Linux (with GNUlibc >= 2.12, using epoll, io_uring does not support it)
// - FreeBSD 11
// - NetBSD 7
// - OpenBSD 7.2
// - AIX 7.2
// - QNX 7.0
// - NacOS
//
#if !defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
# if !defined(ASIO_DISABLE_MULTIPLE_BUFFER_SEQUENCE_IO)
# if (defined(__MACH__) && defined(__APPLE__))
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# include <limits.h>
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO IOV_MAX
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // (defined(__MACH__) && defined(__APPLE__))
# if defined(__linux__)
# if (__GLIBC__ > 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12)
# if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO UIO_MAXIOV
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
# endif // (__GLIBC__ > 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12)
# endif // defined(__linux__)
# if defined(__FreeBSD__)
# include <sys/param.h>
# if defined(__FreeBSD_version) && (__FreeBSD_version >= 1100000)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO 1024
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // defined(__FreeBSD_version) && (__FreeBSD_version >= 1100000)
# endif // defined(__FreeBSD__)
# if defined(__NetBSD__)
# include <sys/param.h>
# if defined(__NetBSD_Version__) && (__NetBSD_Version__ >= 700000000)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO 1024
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // defined(__NetBSD_Version__) && (__NetBSD_Version__ >= 700000000)
# endif // defined(__NetBSD__)
# if defined(__OpenBSD__)
# include <sys/param.h>
# if defined(OpenBSD7_2)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO 1024
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // defined(OpenBSD7_2)
# endif // defined(__OpenBSD__)
# if defined(_AIX)
# if defined(_AIX72)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO 1024
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // defined(_AIX72)
# endif // defined(_AIX)
# if defined(__QNXNTO__)
# if (__QNXNTO__ >= 700)
# define ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO 1
# if !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO 1024
# endif // !defined(ASIO_MULTIPLE_BUFFER_SEQUENCE_MAXIMUM_OPERATIONS_PER_IO)
# endif // (__QNXNTO__ >= 700)
# endif // defined(__QNXNTO__)
# endif // !defined(ASIO_DISABLE_MULTIPLE_BUFFER_SEQUENCE_IO)
#endif // !defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Mac OS X, FreeBSD, NetBSD, OpenBSD: kqueue.
#if (defined(__MACH__) && defined(__APPLE__)) \
|| defined(__FreeBSD__) \
Expand Down Expand Up @@ -2105,6 +2188,10 @@
# define ASIO_UNUSED_VARIABLE
#endif // !defined(ASIO_UNUSED_VARIABLE)

#if !defined(ASIO_UNUSED_PARAMETER)
# define ASIO_UNUSED_PARAMETER(x) (void)x
#endif // !defined(ASIO_UNUSED_PARAMETER)

// Support the co_await keyword on compilers known to allow it.
#if !defined(ASIO_HAS_CO_AWAIT)
# if !defined(ASIO_DISABLE_CO_AWAIT)
Expand Down
105 changes: 105 additions & 0 deletions asio/include/asio/detail/handler_type_requirements.hpp
Expand Up @@ -101,6 +101,53 @@ auto two_arg_move_handler_test(Handler h, Arg1* a1, Arg2* a2)
template <typename Handler>
char (&two_arg_move_handler_test(Handler, ...))[2];

template <typename Handler, typename Arg1, typename Arg2, typename Arg3>
auto three_arg_handler_test(Handler h, Arg1* a1, Arg2* a2, Arg3* a3)
-> decltype(
sizeof(Handler(ASIO_MOVE_CAST(Handler)(h))),
(ASIO_MOVE_OR_LVALUE(Handler)(h)(*a1, *a2, *a3)),
char(0));

template <typename Handler>
char (&three_arg_handler_test(Handler, ...))[2];

template <typename Handler, typename Arg1, typename Arg2, typename Arg3>
auto three_arg_move_handler_test(Handler h, Arg1* a1, Arg2* a2, Arg3* a3)
-> decltype(
sizeof(Handler(ASIO_MOVE_CAST(Handler)(h))),
(ASIO_MOVE_OR_LVALUE(Handler)(h)(
*a1, ASIO_MOVE_CAST(Arg2)(*a2), ASIO_MOVE_CAST(Arg3)(*a3))),
char(0));

template <typename Handler>
char (&three_arg_move_handler_test(Handler, ...))[2];

template <typename Handler, typename Arg1, typename Arg2, typename Arg3,
typename Arg4>
auto four_arg_handler_test(Handler h, Arg1* a1, Arg2* a2, Arg3* a3,
Arg4* a4)
-> decltype(
sizeof(Handler(ASIO_MOVE_CAST(Handler)(h))),
(ASIO_MOVE_OR_LVALUE(Handler)(h)(*a1, *a2, *a3, *a4)),
char(0));

template <typename Handler>
char (&four_arg_handler_test(Handler, ...))[2];

template <typename Handler, typename Arg1, typename Arg2, typename Arg3,
typename Arg4>
auto four_arg_move_handler_test(Handler h, Arg1* a1, Arg2* a2, Arg3* a3,
Arg4* a4)
-> decltype(
sizeof(Handler(ASIO_MOVE_CAST(Handler)(h))),
(ASIO_MOVE_OR_LVALUE(Handler)(h)(
*a1, ASIO_MOVE_CAST(Arg2)(*a2), ASIO_MOVE_CAST(Arg3)(*a3),
ASIO_MOVE_CAST(Arg4)(*a4))),
char(0));

template <typename Handler>
char (&four_arg_move_handler_test(Handler, ...))[2];

# define ASIO_HANDLER_TYPE_REQUIREMENTS_ASSERT(expr, msg) \
static_assert(expr, msg);

Expand Down Expand Up @@ -179,6 +226,35 @@ struct handler_type_requirements
asio::detail::lvref<const std::size_t>()), \
char(0))> ASIO_UNUSED_TYPEDEF

#define ASIO_READ_MULTIPLE_HANDLER_CHECK( \
handler_type, handler) \
\
typedef ASIO_HANDLER_TYPE(handler_type, \
void(asio::error_code, std::size_t, std::size_t, std::size_t)) \
asio_true_handler_type; \
\
ASIO_HANDLER_TYPE_REQUIREMENTS_ASSERT( \
sizeof(asio::detail::three_arg_handler_test( \
asio::detail::rvref< \
asio_true_handler_type>(), \
static_cast<const asio::error_code*>(0), \
static_cast<const std::size_t*>(0), \
static_cast<const std::size_t*>(0))) == 1, \
"ReadMultipleHandler type requirements not met") \
\
typedef asio::detail::handler_type_requirements< \
sizeof( \
asio::detail::argbyv( \
asio::detail::rvref< \
asio_true_handler_type>())) + \
sizeof( \
asio::detail::rorlvref< \
asio_true_handler_type>()( \
asio::detail::lvref<const asio::error_code>(), \
asio::detail::lvref<const std::size_t>(), \
asio::detail::lvref<const std::size_t>()), \
char(0))> ASIO_UNUSED_TYPEDEF

#define ASIO_WRITE_HANDLER_CHECK( \
handler_type, handler) \
\
Expand All @@ -205,6 +281,35 @@ struct handler_type_requirements
asio::detail::lvref<const asio::error_code>(), \
asio::detail::lvref<const std::size_t>()), \
char(0))> ASIO_UNUSED_TYPEDEF

#define ASIO_WRITE_MULTIPLE_HANDLER_CHECK( \
handler_type, handler) \
\
typedef ASIO_HANDLER_TYPE(handler_type, \
void(asio::error_code, std::size_t, std::size_t, std::size_t)) \
asio_true_handler_type; \
\
ASIO_HANDLER_TYPE_REQUIREMENTS_ASSERT( \
sizeof(asio::detail::three_arg_handler_test( \
asio::detail::rvref< \
asio_true_handler_type>(), \
static_cast<const asio::error_code*>(0), \
static_cast<const std::size_t*>(0), \
static_cast<const std::size_t*>(0))) == 1, \
"WriteMultipleHandler type requirements not met") \
\
typedef asio::detail::handler_type_requirements< \
sizeof( \
asio::detail::argbyv( \
asio::detail::rvref< \
asio_true_handler_type>())) + \
sizeof( \
asio::detail::rorlvref< \
asio_true_handler_type>()( \
asio::detail::lvref<const asio::error_code>(), \
asio::detail::lvref<const std::size_t>(), \
asio::detail::lvref<const std::size_t>()), \
char(0))> ASIO_UNUSED_TYPEDEF

#define ASIO_ACCEPT_HANDLER_CHECK( \
handler_type, handler) \
Expand Down
233 changes: 233 additions & 0 deletions asio/include/asio/detail/impl/socket_ops.ipp
Expand Up @@ -41,6 +41,12 @@
#endif // defined(ASIO_WINDOWS) || defined(__CYGWIN__)
// || defined(__MACH__) && defined(__APPLE__)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
# if (defined(__MACH__) && defined(__APPLE__))
# include <sys/syscall.h>
# endif // (defined(__MACH__) && defined(__APPLE__))
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#include "asio/detail/push_options.hpp"

namespace asio {
Expand Down Expand Up @@ -1365,6 +1371,129 @@ bool non_blocking_recvmsg(socket_type s,

#endif // defined(ASIO_HAS_IOCP)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#if (defined(__MACH__) && defined(__APPLE__))
# include <sys/syscall.h>
#endif

ASIO_DECL signed_size_type recvmmsg(socket_type s, mbufs* bufs,
size_t count, int flags, asio::error_code& ec)
{
#if defined(ASIO_HAS_MSG_NOSIGNAL)
flags |= MSG_NOSIGNAL;
#endif // defined(ASIO_HAS_MSG_NOSIGNAL)
#if (defined(__MACH__) && defined(__APPLE__))
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
signed_size_type result = ::syscall(SYS_recvmsg_x, s, bufs, count, flags);
#pragma clang diagnostic pop
#else // (defined(__MACH__) && defined(__APPLE__))
flags |= MSG_WAITFORONE;
signed_size_type result = ::recvmmsg(s, bufs, count, flags, nullptr);
#endif // (defined(__MACH__) && defined(__APPLE__))
get_last_error(ec, result < 0);
return result;
}

ASIO_DECL size_t sync_recvmmsg(socket_type s, state_type state, mbufs* bufs,
size_t count, int flags, bool all_empty, asio::error_code& ec)
{
if (s == invalid_socket)
{
ec = asio::error::bad_descriptor;
return 0;
}

// A request to read 0 bytes on a stream is a no-op.
if (all_empty && (state & stream_oriented))
{
asio::error::clear(ec);
return 0;
}

// Read some data.
for (;;)
{
// Try to complete the operation without blocking.
signed_size_type operations = socket_ops::recvmmsg(
s, bufs, count, flags, ec);

// Check for EOF.
if ((state & stream_oriented) && operations == 0)
{
ec = asio::error::eof;
return 0;
}

// Check if operation succeeded.
if (operations >= 0)
return operations;

// Operation failed.
if ((state & user_set_non_blocking)
|| (ec != asio::error::would_block
&& ec != asio::error::try_again))
return 0;

// Wait for socket to become ready.
if (socket_ops::poll_read(s, 0, -1, ec) < 0)
return 0;
}
}

#if !defined(ASIO_HAS_IOCP)

ASIO_DECL bool non_blocking_recvmmsg(socket_type s, mbufs* bufs, size_t count,
int flags, bool is_stream, asio::error_code& ec, size_t& bytes_transferred,
size_t& operations_executed)
{
for (;;)
{
// Read some data.
signed_size_type operations = socket_ops::recvmmsg(
s, bufs, count, flags, ec);

// Check for end of stream.
if (is_stream && operations == 0)
{
ec = asio::error::eof;
return true;
}

// Check if operation succeeded.
if (operations >= 0)
{
bytes_transferred = 0;
for (signed_size_type i = 0; i < operations; ++i)
{
bytes_transferred +=
ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(bufs[i]);
}
operations_executed = operations;
return true;
}

// Retry operation if interrupted by signal.
if (ec == asio::error::interrupted)
continue;

// Check if we need to run the operation again.
if (ec == asio::error::would_block
|| ec == asio::error::try_again)
return false;

// Operation failed.
bytes_transferred = 0;
operations = 0;
return true;
}
}

#endif // !defined(ASIO_HAS_IOCP)

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

signed_size_type send(socket_type s, const buf* bufs, size_t count,
int flags, asio::error_code& ec)
{
Expand Down Expand Up @@ -1805,6 +1934,110 @@ bool non_blocking_sendto1(socket_type s,

#endif // !defined(ASIO_HAS_IOCP)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL signed_size_type sendmmsg(socket_type s,
mbufs* bufs, size_t count, int flags, asio::error_code& ec)
{
#if defined(ASIO_HAS_MSG_NOSIGNAL)
flags |= MSG_NOSIGNAL;
#endif // defined(ASIO_HAS_MSG_NOSIGNAL)
#if (defined(__MACH__) && defined(__APPLE__))
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
signed_size_type result = ::syscall(SYS_sendmsg_x, s, bufs, count, flags);
#pragma clang diagnostic pop
#else // (defined(__MACH__) && defined(__APPLE__))
signed_size_type result = ::sendmmsg(s, bufs, count, flags);
#endif // (defined(__MACH__) && defined(__APPLE__))
get_last_error(ec, result < 0);
return result;
}

ASIO_DECL size_t sync_sendmmsg(socket_type s, state_type state,
mbufs* bufs, size_t count, int flags, bool all_empty, asio::error_code& ec)
{
if (s == invalid_socket)
{
ec = asio::error::bad_descriptor;
return 0;
}

// A request to write 0 bytes to a stream is a no-op.
if (all_empty && (state & stream_oriented))
{
asio::error::clear(ec);
return 0;
}

// Read some data.
for (;;)
{
// Try to complete the operation without blocking.
signed_size_type operations = socket_ops::sendmmsg(
s, bufs, count, flags, ec);

// Check if operation succeeded.
if (operations >= 0)
return operations;

// Operation failed.
if ((state & user_set_non_blocking)
|| (ec != asio::error::would_block
&& ec != asio::error::try_again))
return 0;

// Wait for socket to become ready.
if (socket_ops::poll_write(s, 0, -1, ec) < 0)
return 0;
}
}

#if !defined(ASIO_HAS_IOCP)

ASIO_DECL bool non_blocking_sendmmsg(socket_type s, mbufs* bufs,
size_t count, int flags, asio::error_code& ec, size_t& bytes_transferred,
size_t& operations_executed)
{
for (;;)
{
// Try to complete the operation without blocking.
signed_size_type operations = socket_ops::sendmmsg(
s, bufs, count, flags, ec);

// Check if operation succeeded.
if (operations >= 0)
{
bytes_transferred = 0;
for (signed_size_type i = 0; i < operations; ++i)
{
bytes_transferred +=
ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(bufs[i]);
}
operations_executed = operations;
return true;
}

// Retry operation if interrupted by signal.
if (ec == asio::error::interrupted)
continue;

// Check if we need to run the operation again.
if (ec == asio::error::would_block
|| ec == asio::error::try_again)
return false;

// Operation failed.
bytes_transferred = 0;
operations_executed = 0;
return true;
}
}

#endif // !defined(ASIO_HAS_IOCP)

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

socket_type socket(int af, int type, int protocol,
asio::error_code& ec)
{
Expand Down
249 changes: 249 additions & 0 deletions asio/include/asio/detail/multiple_buffer_sequence_adapter.hpp
@@ -0,0 +1,249 @@
//
// detail/multiple_buffer_sequence_adapter.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// ( TODO-MBS: update header with copyright of asio C++ library )
//
// Support for multiple datagram buffers code patches on Linux operating system
// Copyright (c) 2023 virgilio A. Fornazin (virgiliofornazin at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_MULTIPLE_BUFFER_SEQUENCE_ADAPTER_HPP
#define ASIO_DETAIL_MULTIPLE_BUFFER_SEQUENCE_ADAPTER_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#include "asio/multiple_buffer_sequence.hpp"
#include "asio/detail/socket_types.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

// Base helper class to translate buffers into the native multiple buffer
// representation.
class base_multiple_buffer_sequence_adapter
{
protected:
typedef ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT native_multiple_buffer_type;

template <typename MultipleBufferSequence>
void do_prepare_op(MultipleBufferSequence& source,
native_multiple_buffer_type& destination)
{
typename MultipleBufferSequence::buffer_sequence_adapter_type&
buffer_sequence_adapter = source.buffer_sequence_adapter();
const typename MultipleBufferSequence::endpoint_type& endpoint =
source.endpoint();
auto& hdr = ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_HDR_PTR(destination);
auto& len = ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(destination);
socket_ops::init_msghdr_msg_name(hdr.msg_name, endpoint.data());
hdr.msg_namelen = static_cast<int>(endpoint.size());
hdr.msg_iov = buffer_sequence_adapter.buffers();
hdr.msg_iovlen = buffer_sequence_adapter.count();
hdr.msg_control = NULL;
hdr.msg_controllen = 0;
hdr.msg_flags = 0;
len = 0;
}

template <typename MultipleBufferSequence>
void do_complete_op(native_multiple_buffer_type& source,
MultipleBufferSequence& destination, const asio::error_code& ec)
{
typename MultipleBufferSequence::endpoint_type& endpoint =
destination.endpoint();
auto const& hdr = ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_HDR_PTR(source);
auto const& len = ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(source);
if (!ec)
{
endpoint.resize(hdr.msg_namelen);
}
destination.do_complete(socket_base::message_flags(hdr.msg_flags),
static_cast<std::size_t>(len), ec);
}
};

// Helper class to translate buffers into the native multiple buffer
// representation.
template <typename MultipleBufferSequence>
class multiple_buffer_sequence_adapter
: public base_multiple_buffer_sequence_adapter
{
public:
typedef typename
base_multiple_buffer_sequence_adapter::native_multiple_buffer_type
native_multiple_buffer_type;
typedef native_multiple_buffer_type& native_reference;

typedef MultipleBufferSequence multiple_buffer_sequence_type;

#if defined(ASIO_STANDALONE)
typedef typename std::remove_const<typename std::remove_reference<
MultipleBufferSequence>::type>::type raw_multiple_buffer_sequence_type;
#else // defined(ASIO_STANDALONE)
typedef typename boost::remove_const<typename boost::remove_reference<
MultipleBufferSequence>::type>::type raw_multiple_buffer_sequence_type;
#endif // defined(ASIO_STANDALONE)

typedef typename raw_multiple_buffer_sequence_type::reference reference;

private:
// TODO-MBS: specialize native_multiple_buffer_type in asio::detail::array
// for fixed size multiple buffer sequence object
typedef std::vector<native_multiple_buffer_type>
native_multiple_buffer_type_container_type;

multiple_buffer_sequence_type& multiple_buffer_sequence_;

native_multiple_buffer_type_container_type
native_multiple_buffer_type_container_;

public:
explicit multiple_buffer_sequence_adapter(
multiple_buffer_sequence_type& _multiple_buffer_sequence)
: multiple_buffer_sequence_(_multiple_buffer_sequence)
{
do_prepare_at(offset());
}

native_multiple_buffer_type* native_buffers()
{
return native_multiple_buffer_type_container_.data();
}

std::size_t offset() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.offset();
}

std::size_t count() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.count();
}

std::size_t native_buffer_size() const ASIO_NOEXCEPT
{
return native_multiple_buffer_type_container_.size();
}

std::size_t size() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.size();
}

std::size_t total_size() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.total_size();
}

bool all_empty() const
{
return multiple_buffer_sequence_.all_empty();
}

bool full() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.full();
}

std::size_t operations_executed() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.operations_executed();
}

std::size_t bytes_transferred() const ASIO_NOEXCEPT
{
return multiple_buffer_sequence_.bytes_transferred();
}

void do_prepare_at(std::size_t offset)
{
if (offset >= multiple_buffer_sequence_.size())
{
throw std::out_of_range("offset not less than operations count");
}
std::size_t count_op = multiple_buffer_sequence_.size() - offset;
native_multiple_buffer_type_container_.resize(count_op);
for (std::size_t i = 0; i < count_op; ++i)
{
reference asio_multiple_buffer_sequence =
multiple_buffer_sequence_.at(i + offset);
native_reference native_multiple_buffer_sequence =
native_multiple_buffer_type_container_.at(i);
this->do_prepare_op(asio_multiple_buffer_sequence,
native_multiple_buffer_sequence);
}
multiple_buffer_sequence_.set_operations_executed(0);
multiple_buffer_sequence_.set_bytes_transferred(0);
}

void do_prepare()
{
do_prepare_at(offset());
}

void do_complete_at(std::size_t offset, std::size_t operations_executed,
const asio::error_code& ec)
{
if (offset >= multiple_buffer_sequence_.size())
{
throw std::out_of_range("offset not less than operations count");
}
std::size_t count_op = multiple_buffer_sequence_.size() - offset;
std::size_t bytes_transferred = 0;
for (std::size_t i = 0; i < count_op; ++i)
{
reference asio_multiple_buffer_sequence =
multiple_buffer_sequence_.at(i + offset);
native_reference native_multiple_buffer_sequence =
native_multiple_buffer_type_container_.at(i);
this->do_complete_op(native_multiple_buffer_sequence,
asio_multiple_buffer_sequence, ec);
bytes_transferred += asio_multiple_buffer_sequence.bytes_transferred();
}
multiple_buffer_sequence_.set_operations_executed(operations_executed);
multiple_buffer_sequence_.set_bytes_transferred(bytes_transferred);
}

void do_complete(std::size_t operations_executed,
const asio::error_code& ec)
{
do_complete_at(offset(), operations_executed, ec);
}

void do_complete_at(std::size_t offset, std::size_t operations_executed,
std::size_t bytes_transferred, const asio::error_code& ec)
{
do_complete_at(offset, operations_executed, ec);
if (multiple_buffer_sequence_.bytes_transferred() != bytes_transferred)
{
throw std::logic_error("bytes_transferred mismatch");
}
}

void do_complete(std::size_t operations_executed,
std::size_t bytes_transferred, const asio::error_code& ec)
{
do_complete_at(offset(), operations_executed, bytes_transferred, ec);
}
};

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#endif // ASIO_DETAIL_MULTIPLE_BUFFER_SEQUENCE_ADAPTER_HPP
307 changes: 307 additions & 0 deletions asio/include/asio/detail/multiple_buffer_sequence_op.hpp
@@ -0,0 +1,307 @@
//
// multiple_buffer_sequence_op.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// ( TODO-MBS: update header with copyright of asio C++ library )
//
// Support for multiple datagram buffers code patches on Linux operating system
// Copyright (c) 2023 virgilio A. Fornazin (virgiliofornazin at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_MULTIPLE_BUFFER_SEQUENCE_OP_HPP
#define ASIO_MULTIPLE_BUFFER_SEQUENCE_OP_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"
#include "asio/detail/buffer_sequence_adapter.hpp"
#include "asio/error_code.hpp"
#include "asio/socket_base.hpp"
#include <cstddef>
#include <vector>

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

// a class that envelop standard asio const/mutable buffers sequences for
// send/receive multiple datagrams within a single system call in supported
// operating systems (Linux only at this time)
/**
* The @c buffer_sequence class envelop standard asio const/mutable
* buffers sequences for send/receive multiple datagrams within a single system
* call in supported operating systems (Linux only at this time)
*/
template <typename BufferSequence, typename EndpointType>
class multiple_buffer_sequence_op
{
public:
typedef BufferSequence buffer_sequence_type;
typedef EndpointType endpoint_type;
typedef asio::detail::buffer_sequence_adapter<buffer_sequence_type,
buffer_sequence_type> buffer_sequence_adapter_type;

void fixup_buffer_sequence_adapter()
{
buffer_sequence_adapter_ = buffer_sequence_adapter_type(buffer_sequence_);
}

public:
multiple_buffer_sequence_op() ASIO_NOEXCEPT
: buffer_sequence_(), buffer_sequence_adapter_(buffer_sequence_),
endpoint_(), completed_(false), flags_(0), bytes_transferred_(0),
error_code_()
{
fixup_buffer_sequence_adapter();
}

explicit multiple_buffer_sequence_op(
const buffer_sequence_type& _buffer_sequence)
ASIO_NOEXCEPT
: buffer_sequence_(_buffer_sequence),
buffer_sequence_adapter_(buffer_sequence_), endpoint_(),
completed_(false), flags_(0), bytes_transferred_(0),
error_code_()
{
fixup_buffer_sequence_adapter();
}

explicit multiple_buffer_sequence_op(
const buffer_sequence_type& _buffer_sequence,
const endpoint_type& _endpoint) ASIO_NOEXCEPT
: buffer_sequence_(_buffer_sequence),
buffer_sequence_adapter_(buffer_sequence_), endpoint_(_endpoint),
completed_(false), flags_(0), bytes_transferred_(0),
error_code_()
{
fixup_buffer_sequence_adapter();
}

#if defined(ASIO_HAS_MOVE)
multiple_buffer_sequence_op(
multiple_buffer_sequence_op const&& other)
: buffer_sequence_(std::move(other.buffer_sequence_)),
buffer_sequence_adapter_(buffer_sequence_),
endpoint_(std::move(other.endpoint_)),
completed_(std::move(other.completed_)),
flags_(std::move(other.flags_)),
bytes_transferred_(std::move(other.bytes_transferred_)),
error_code_(std::move(other.error_code_))
{
fixup_buffer_sequence_adapter();
}

multiple_buffer_sequence_op & operator = (
multiple_buffer_sequence_op const&& other)
{
buffer_sequence_ = std::move(other.buffer_sequence_);

fixup_buffer_sequence_adapter();

endpoint_ = std::move(other.endpoint_);
completed_ = std::move(other.completed_);
flags_ = std::move(other.flags_);
bytes_transferred_ = std::move(other.bytes_transferred_);
error_code_ = std::move(other.error_code_);

return (*this);
}
#else // defined(ASIO_HAS_MOVE)
multiple_buffer_sequence_op(
multiple_buffer_sequence_op const& other)
: buffer_sequence_(other.buffer_sequence_),
buffer_sequence_adapter_(buffer_sequence_), endpoint_(other.endpoint_),
completed_(other.completed_), flags_(other.flags_),
bytes_transferred_(other.bytes_transferred_),
error_code_(other.error_code_)
{
fixup_buffer_sequence_adapter();
}

multiple_buffer_sequence_op & operator = (
multiple_buffer_sequence_op const& other)
{
buffer_sequence_ = other.buffer_sequence_;

fixup_buffer_sequence_adapter();

endpoint_ = other.endpoint_;
completed_ = other.completed_;
flags_ = other.flags_;
bytes_transferred_ = other;bytes_transferred_;
error_code_ = other.error_code_;

return (*this);
}
#endif // defined(ASIO_HAS_MOVE)

public:
std::size_t count() const
{
return buffer_sequence_adapter_.count();
}

std::size_t total_size() const
{
return buffer_sequence_adapter_.total_size();
}

registered_buffer_id registered_id() const
{
return buffer_sequence_adapter_.registered_id();
}

bool all_empty() const
{
return buffer_sequence_adapter_.all_empty();
}

void reset() ASIO_NOEXCEPT
{
buffer_sequence_ = buffer_sequence_type();

fixup_buffer_sequence_adapter();

endpoint_ = endpoint_type();
completed_ = false;
flags_ = 0;
bytes_transferred_ = 0;
error_code_ = asio::error_code();
}

void reset(const buffer_sequence_type& _buffer_sequence) ASIO_NOEXCEPT
{
buffer_sequence_ = _buffer_sequence;

fixup_buffer_sequence_adapter();

endpoint_ = endpoint_type();
completed_ = false;
flags_ = 0;
bytes_transferred_ = 0;
error_code_ = asio::error_code();
}

void reset(const buffer_sequence_type& _buffer_sequence,
const endpoint_type& _endpoint) ASIO_NOEXCEPT
{
buffer_sequence_ = _buffer_sequence;

fixup_buffer_sequence_adapter();

endpoint_ = _endpoint;
completed_ = false;
flags_ = 0;
bytes_transferred_ = 0;
error_code_ = asio::error_code();
}

bool empty() const
{
return all_empty();
}

buffer_sequence_type& buffer_sequence() ASIO_NOEXCEPT
{
return buffer_sequence_;
}

const buffer_sequence_type& buffer_sequence() const
ASIO_NOEXCEPT
{
return buffer_sequence_;
}

buffer_sequence_adapter_type& buffer_sequence_adapter() ASIO_NOEXCEPT
{
return buffer_sequence_adapter_;
}

const buffer_sequence_adapter_type& buffer_sequence_adapter() const
ASIO_NOEXCEPT
{
return buffer_sequence_adapter_;
}

void set_buffer_sequence(const buffer_sequence_type& _buffer_sequence)
ASIO_NOEXCEPT
{
buffer_sequence_ = _buffer_sequence;

fixup_buffer_sequence_adapter();
}

endpoint_type& endpoint() ASIO_NOEXCEPT
{
return endpoint_;
}

const endpoint_type& endpoint() const ASIO_NOEXCEPT
{
return endpoint_;
}

void set_endpoint(const endpoint_type& _endpoint)
{
endpoint_ = _endpoint;
}

bool completed() const ASIO_NOEXCEPT
{
return completed_;
}

const socket_base::message_flags& flags() const ASIO_NOEXCEPT
{
return flags_;
}

std::size_t bytes_transferred() const ASIO_NOEXCEPT
{
return bytes_transferred_;
}

const asio::error_code& error_code() const ASIO_NOEXCEPT
{
return error_code_;
}

void do_complete(socket_base::message_flags _flags,
std::size_t _bytes_transferred, const asio::error_code& _error_code)
{
completed_ = true;
flags_ = _flags;
bytes_transferred_ = _bytes_transferred;
error_code_ = _error_code;
}

void do_complete(std::size_t _bytes_transferred,
const asio::error_code& _error_code)
{
completed_ = true;
bytes_transferred_ = _bytes_transferred;
error_code_ = _error_code;
}

private:
buffer_sequence_type buffer_sequence_;
buffer_sequence_adapter_type buffer_sequence_adapter_;
endpoint_type endpoint_;
bool completed_;
socket_base::message_flags flags_;
std::size_t bytes_transferred_;
asio::error_code error_code_;
};

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // ASIO_MULTIPLE_BUFFER_SEQUENCE_OP_HPP
172 changes: 172 additions & 0 deletions asio/include/asio/detail/reactive_socket_recvmmsg_op.hpp
@@ -0,0 +1,172 @@
//
// detail/reactive_socket_recvmmsg_op.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// ( TODO-MBS: update header with copyright of asio C++ library )
//
// Support for multiple datagram buffers code patches on Linux operating system
// Copyright (c) 2023 virgilio A. Fornazin (virgiliofornazin at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_REACTIVE_SOCKET_RECVMMSG_OP_HPP
#define ASIO_DETAIL_REACTIVE_SOCKET_RECVMMSG_OP_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#include "asio/detail/bind_handler.hpp"
#include "asio/detail/multiple_buffer_sequence_adapter.hpp"
#include "asio/detail/fenced_block.hpp"
#include "asio/detail/handler_alloc_helpers.hpp"
#include "asio/detail/handler_invoke_helpers.hpp"
#include "asio/detail/handler_work.hpp"
#include "asio/detail/memory.hpp"
#include "asio/detail/reactor_op.hpp"
#include "asio/detail/socket_ops.hpp"
#include "asio/socket_base.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

template <typename MultipleBufferSequence>
class reactive_socket_recvmmsg_op_base : public reactor_op
{
public:
reactive_socket_recvmmsg_op_base(const asio::error_code& success_ec,
socket_type socket, socket_ops::state_type state,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, func_type complete_func)
: reactor_op(success_ec,
&reactive_socket_recvmmsg_op_base::do_perform, complete_func),
socket_(socket),
state_(state),
multiple_buffer_sequence_(multiple_buffer_sequence),
flags_(flags)
{
}

static status do_perform(reactor_op* base)
{
reactive_socket_recvmmsg_op_base* o(
static_cast<reactive_socket_recvmmsg_op_base*>(base));

multiple_buffer_sequence_adapter<MultipleBufferSequence>
mbufs(o->multiple_buffer_sequence_);

status result = socket_ops::non_blocking_recvmmsg(o->socket_,
mbufs.native_buffers(), mbufs.native_buffer_size(), o->flags_,
(o->state_ & socket_ops::stream_oriented) != 0, o->ec_,
o->bytes_transferred_, o->operations_executed_)
? done : not_done;

mbufs.do_complete_at(mbufs.offset(), o->operations_executed_,
o->bytes_transferred_, o->ec_);

if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ == 0)
result = done_and_exhausted;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_recvmmsg",
o->ec_, o->bytes_transferred_));

return result;
}

MultipleBufferSequence& multiple_buffer_sequence() ASIO_NOEXCEPT
{
return multiple_buffer_sequence_;
}

private:
socket_type socket_;
socket_ops::state_type state_;
MultipleBufferSequence& multiple_buffer_sequence_;
socket_base::message_flags flags_;
};

template <typename MultipleBufferSequence, typename Handler,
typename IoExecutor>
class reactive_socket_recvmmsg_op :
public reactive_socket_recvmmsg_op_base<MultipleBufferSequence>
{
public:
ASIO_DEFINE_HANDLER_PTR(reactive_socket_recvmmsg_op);

reactive_socket_recvmmsg_op(const asio::error_code& success_ec,
socket_type socket, socket_ops::state_type state,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
: reactive_socket_recvmmsg_op_base<MultipleBufferSequence>(
success_ec, socket, state, multiple_buffer_sequence, flags,
&reactive_socket_recvmmsg_op::do_complete),
handler_(ASIO_MOVE_CAST(Handler)(handler)),
work_(handler_, io_ex)
{
}

static void do_complete(void* owner, operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
// Take ownership of the handler object.
reactive_socket_recvmmsg_op* o(
static_cast<reactive_socket_recvmmsg_op*>(base));
ptr p = { asio::detail::addressof(o->handler_), o, o };

ASIO_HANDLER_COMPLETION((*o));

// Take ownership of the operation's outstanding work.
handler_work<Handler, IoExecutor> w(
ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));

ASIO_ERROR_LOCATION(o->ec_);

// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder3<Handler, asio::error_code, std::size_t, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_,
o->operations_executed_);
p.h = asio::detail::addressof(handler.handler_);
p.reset();

// Make the upcall if required.
if (owner)
{
fenced_block b(fenced_block::half);
ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_,
handler.arg3_, handler.arg4_));
w.complete(handler, handler.handler_);
ASIO_HANDLER_INVOCATION_END;
}
}

private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;
};

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#endif // ASIO_DETAIL_REACTIVE_SOCKET_RECVMMSG_OP_HPP
171 changes: 171 additions & 0 deletions asio/include/asio/detail/reactive_socket_sendmmsg_op.hpp
@@ -0,0 +1,171 @@
//
// detail/reactive_socket_sendmmsg_op.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// ( TODO-MBS: update header with copyright of asio C++ library )
//
// Support for multiple datagram buffers code patches on Linux operating system
// Copyright (c) 2023 virgilio A. Fornazin (virgiliofornazin at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_REACTIVE_SOCKET_SENDMMSG_OP_HPP
#define ASIO_DETAIL_REACTIVE_SOCKET_SENDMMSG_OP_HPP

#include "asio/buffer.hpp"
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#include "asio/detail/bind_handler.hpp"
#include "asio/detail/multiple_buffer_sequence_adapter.hpp"
#include "asio/detail/fenced_block.hpp"
#include "asio/detail/handler_alloc_helpers.hpp"
#include "asio/detail/handler_invoke_helpers.hpp"
#include "asio/detail/handler_work.hpp"
#include "asio/detail/memory.hpp"
#include "asio/detail/reactor_op.hpp"
#include "asio/detail/socket_ops.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

template <typename MultipleBufferSequence>
class reactive_socket_sendmmsg_op_base : public reactor_op
{
public:
reactive_socket_sendmmsg_op_base(const asio::error_code& success_ec,
socket_type socket, socket_ops::state_type state,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, func_type complete_func)
: reactor_op(success_ec,
&reactive_socket_sendmmsg_op_base::do_perform, complete_func),
socket_(socket),
state_(state),
multiple_buffer_sequence_(multiple_buffer_sequence),
flags_(flags)
{
}

static status do_perform(reactor_op* base)
{
reactive_socket_sendmmsg_op_base* o(
static_cast<reactive_socket_sendmmsg_op_base*>(base));

multiple_buffer_sequence_adapter<MultipleBufferSequence>
mbufs(o->multiple_buffer_sequence_);

status result = socket_ops::non_blocking_sendmmsg(o->socket_,
mbufs.native_buffers(), mbufs.native_buffer_size(), o->flags_,
o->ec_, o->bytes_transferred_, o->operations_executed_)
? done : not_done;

mbufs.do_complete_at(mbufs.offset(), o->operations_executed_,
o->bytes_transferred_, o->ec_);

if (result == done)
if ((o->state_ & socket_ops::stream_oriented) != 0)
if (o->bytes_transferred_ < mbufs.total_size())
result = done_and_exhausted;

ASIO_HANDLER_REACTOR_OPERATION((*o, "non_blocking_sendmmsg",
o->ec_, o->bytes_transferred_));

return result;
}

MultipleBufferSequence& multiple_buffer_sequence() ASIO_NOEXCEPT
{
return multiple_buffer_sequence_;
}

private:
socket_type socket_;
socket_ops::state_type state_;
MultipleBufferSequence& multiple_buffer_sequence_;
socket_base::message_flags flags_;
};

template <typename MultipleBufferSequence, typename Handler,
typename IoExecutor>
class reactive_socket_sendmmsg_op :
public reactive_socket_sendmmsg_op_base<MultipleBufferSequence>
{
public:
ASIO_DEFINE_HANDLER_PTR(reactive_socket_sendmmsg_op);

reactive_socket_sendmmsg_op(const asio::error_code& success_ec,
socket_type socket, socket_ops::state_type state,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags,
Handler& handler, const IoExecutor& io_ex)
: reactive_socket_sendmmsg_op_base<MultipleBufferSequence>(
success_ec, socket, state, multiple_buffer_sequence, flags,
&reactive_socket_sendmmsg_op::do_complete),
handler_(ASIO_MOVE_CAST(Handler)(handler)),
work_(handler_, io_ex)
{
}

static void do_complete(void* owner, operation* base,
const asio::error_code& /*ec*/,
std::size_t /*bytes_transferred*/)
{
// Take ownership of the handler object.
reactive_socket_sendmmsg_op* o(
static_cast<reactive_socket_sendmmsg_op*>(base));
ptr p = { asio::detail::addressof(o->handler_), o, o };

ASIO_HANDLER_COMPLETION((*o));

// Take ownership of the operation's outstanding work.
handler_work<Handler, IoExecutor> w(
ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));

ASIO_ERROR_LOCATION(o->ec_);

// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder3<Handler, asio::error_code, std::size_t, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_,
o->operations_executed_);
p.h = asio::detail::addressof(handler.handler_);
p.reset();

// Make the upcall if required.
if (owner)
{
fenced_block b(fenced_block::half);
ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_,
handler.arg3_, handler.arg4_));
w.complete(handler, handler.handler_);
ASIO_HANDLER_INVOCATION_END;
}
}

private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;
};

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#endif // ASIO_DETAIL_REACTIVE_SOCKET_SENDMMSG_OP_HPP
213 changes: 213 additions & 0 deletions asio/include/asio/detail/reactive_socket_service.hpp
Expand Up @@ -261,6 +261,19 @@ class reactive_socket_service :
return n;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Send a datagram to the specified endpoint. Returns the number of bytes
// sent.
template <typename MultipleBufferSequence>
size_t send_multiple_buffer_sequence_to(implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, asio::error_code& ec)
{
return send_multiple_buffer_sequence(impl, multiple_buffer_sequence,
flags, ec);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be sent without blocking.
size_t send_to(implementation_type& impl, const null_buffers&,
const endpoint_type&, socket_base::message_flags,
Expand All @@ -273,6 +286,21 @@ class reactive_socket_service :
return 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Send a datagram to the specified endpoint. Returns the number of bytes
// sent.
size_t send_multiple_buffer_sequence_to(implementation_type& impl,
const null_buffers&, socket_base::message_flags,
asio::error_code& ec)
{
// Wait for socket to become ready.
socket_ops::poll_write(impl.socket_, impl.state_, -1, ec);

ASIO_ERROR_LOCATION(ec);
return 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename ConstBufferSequence, typename Handler, typename IoExecutor>
Expand Down Expand Up @@ -310,6 +338,45 @@ class reactive_socket_service :
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename MultipleBufferSequence, typename Handler, typename IoExecutor>
void async_send_multiple_buffer_sequence_to(implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags,
Handler& handler, const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_sendmmsg_op<MultipleBufferSequence, Handler,
IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
multiple_buffer_sequence, flags, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::write_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send_multiple_buffer_sequence_to"));

start_op(impl, reactor::write_op, p.p, is_continuation, true, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous wait until data can be sent without blocking.
template <typename Handler, typename IoExecutor>
void async_send_to(implementation_type& impl, const null_buffers&,
Expand Down Expand Up @@ -343,6 +410,41 @@ class reactive_socket_service :
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous wait until data can be sent without blocking.
template <typename Handler, typename IoExecutor>
void async_send_multiple_buffer_sequence_to(implementation_type& impl,
const null_buffers&, const endpoint_type&, socket_base::message_flags,
Handler& handler, const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_null_buffers_op<Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::write_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send_multiple_buffer_sequence_to(null_buffers)"));

start_op(impl, reactor::write_op, p.p, is_continuation, false, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Receive a datagram with the endpoint of the sender. Returns the number of
// bytes received.
template <typename MutableBufferSequence>
Expand Down Expand Up @@ -376,6 +478,19 @@ class reactive_socket_service :
return n;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Receive a datagram with the endpoint of the sender. Returns the number of
// bytes received.
template <typename MultipleBufferSequence>
size_t receive_multiple_buffer_sequence_to_from(implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, asio::error_code& ec)
{
return receive_multiple_buffer_sequence(impl, multiple_buffer_sequence,
flags, ec);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
size_t receive_from(implementation_type& impl, const null_buffers&,
endpoint_type& sender_endpoint, socket_base::message_flags,
Expand All @@ -391,6 +506,21 @@ class reactive_socket_service :
return 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Receive a datagram with the endpoint of the sender. Returns the number of
// bytes received.
size_t receive_multiple_buffer_sequence_to_from(implementation_type& impl,
const null_buffers&, socket_base::message_flags,
asio::error_code& ec)
{
// Wait for socket to become ready.
socket_ops::poll_read(impl.socket_, impl.state_, -1, ec);

ASIO_ERROR_LOCATION(ec);
return 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous receive. The buffer for the data being received and
// the sender_endpoint object must both be valid for the lifetime of the
// asynchronous operation.
Expand Down Expand Up @@ -434,6 +564,50 @@ class reactive_socket_service :
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous receive. The buffer for the data being received and
// the sender_endpoint object must both be valid for the lifetime of the
// asynchronous operation.
template <typename MultipleBufferSequence,
typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence_from(implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_recvmmsg_op<MultipleBufferSequence,
Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
multiple_buffer_sequence, flags, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_receive_multiple_buffer_sequence_from"));

start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, true, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive_from(implementation_type& impl, const null_buffers&,
Expand Down Expand Up @@ -473,6 +647,45 @@ class reactive_socket_service :
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence_from(implementation_type& impl,
const null_buffers&, socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_null_buffers_op<Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_,
"async_receive_multiple_buffer_sequence_from(null_buffers)"));

start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, false, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Accept a new connection.
template <typename Socket>
asio::error_code accept(implementation_type& impl,
Expand Down
282 changes: 282 additions & 0 deletions asio/include/asio/detail/reactive_socket_service_base.hpp
Expand Up @@ -28,10 +28,13 @@
#include "asio/execution_context.hpp"
#include "asio/socket_base.hpp"
#include "asio/detail/buffer_sequence_adapter.hpp"
#include "asio/detail/multiple_buffer_sequence_adapter.hpp"
#include "asio/detail/memory.hpp"
#include "asio/detail/reactive_null_buffers_op.hpp"
#include "asio/detail/reactive_socket_recv_op.hpp"
#include "asio/detail/reactive_socket_recvmsg_op.hpp"
#include "asio/detail/reactive_socket_recvmmsg_op.hpp"
#include "asio/detail/reactive_socket_sendmmsg_op.hpp"
#include "asio/detail/reactive_socket_send_op.hpp"
#include "asio/detail/reactive_wait_op.hpp"
#include "asio/detail/reactor.hpp"
Expand Down Expand Up @@ -267,6 +270,26 @@ class reactive_socket_service_base
bufs.buffers(), bufs.count(), flags, bufs.all_empty(), ec);
}
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Send the given data buffer(s) to the peer(s).
template <typename MultipleBufferSequence>
size_t send_multiple_buffer_sequence(base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, asio::error_code& ec)
{
multiple_buffer_sequence_adapter<MultipleBufferSequence>
mbufs(multiple_buffer_sequence);

size_t result = socket_ops::sync_sendmmsg(impl.socket_, impl.state_,
mbufs.native_buffers(), mbufs.native_buffer_size(), flags,
mbufs.all_empty(), ec);

mbufs.do_complete_at(mbufs.offset(), result, ec);

return result;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be sent without blocking.
size_t send(base_implementation_type& impl, const null_buffers&,
Expand All @@ -278,6 +301,18 @@ class reactive_socket_service_base
return 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be sent without blocking.
size_t send_multiple_buffer_sequence(base_implementation_type& impl,
const null_buffers&, socket_base::message_flags, asio::error_code& ec)
{
// Wait for socket to become ready.
socket_ops::poll_write(impl.socket_, impl.state_, -1, ec);

return 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename ConstBufferSequence, typename Handler, typename IoExecutor>
Expand Down Expand Up @@ -316,6 +351,48 @@ class reactive_socket_service_base
ConstBufferSequence>::all_empty(buffers)));
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous send. The data being sent must be valid for the
// lifetime of the asynchronous operation.
template <typename MultipleBufferSequence, typename Handler,
typename IoExecutor>
void async_send_multiple_buffer_sequence(base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_sendmmsg_op<
MultipleBufferSequence, Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, impl.socket_,
impl.state_, multiple_buffer_sequence, flags, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::write_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send_multiple_buffer_sequence"));

start_op(impl, reactor::write_op, p.p, is_continuation, true,
((impl.state_ & socket_ops::stream_oriented)
&& multiple_buffer_sequence.all_empty()));
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous wait until data can be sent without blocking.
template <typename Handler, typename IoExecutor>
Expand Down Expand Up @@ -349,6 +426,42 @@ class reactive_socket_service_base
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous wait until data can be sent without blocking.
template <typename Handler, typename IoExecutor>
void async_send_multiple_buffer_sequence(base_implementation_type& impl,
const null_buffers&, socket_base::message_flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_null_buffers_op<Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::write_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_,
"async_send_multiple_buffer_sequence(null_buffers)"));

start_op(impl, reactor::write_op, p.p, is_continuation, false, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Receive some data from the peer. Returns the number of bytes received.
template <typename MutableBufferSequence>
size_t receive(base_implementation_type& impl,
Expand All @@ -372,6 +485,18 @@ class reactive_socket_service_base
}
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Receive some data from the peer. Returns the number of bytes received.
template <typename MultipleBufferSequence>
size_t receive_multiple_buffer_sequence(base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, asio::error_code& ec)
{
return receive_multiple_buffer_sequence_with_flags(impl,
multiple_buffer_sequence, flags, ec);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
size_t receive(base_implementation_type& impl, const null_buffers&,
socket_base::message_flags, asio::error_code& ec)
Expand All @@ -382,6 +507,15 @@ class reactive_socket_service_base
return 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be received without blocking.
size_t receive_multiple_buffer_sequence(base_implementation_type& impl,
const null_buffers& nb, socket_base::message_flags, asio::error_code& ec)
{
return receive_multiple_buffer_sequence_with_flags(impl, nb, 0, ec);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous receive. The buffer for the data being received
// must be valid for the lifetime of the asynchronous operation.
template <typename MutableBufferSequence,
Expand Down Expand Up @@ -426,6 +560,21 @@ class reactive_socket_service_base
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous receive. The buffer for the data being received
// must be valid for the lifetime of the asynchronous operation.
template <typename MultipleBufferSequence,
typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence(base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
return async_receive_multiple_buffer_sequence_with_flags(impl,
multiple_buffer_sequence, flags, handler, io_ex);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive(base_implementation_type& impl,
Expand Down Expand Up @@ -462,6 +611,18 @@ class reactive_socket_service_base
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence(base_implementation_type& impl,
const null_buffers& nb, socket_base::message_flags flags,
Handler& handler, const IoExecutor& io_ex)
{
return async_receive_multiple_buffer_sequence_with_flags(impl, nb, flags,
handler, io_ex);
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Receive some data with associated flags. Returns the number of bytes
// received.
template <typename MutableBufferSequence>
Expand All @@ -477,6 +638,28 @@ class reactive_socket_service_base
bufs.buffers(), bufs.count(), in_flags, out_flags, ec);
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Receive some data with associated flags. Returns the number of bytes
// received.
template <typename MultipleBufferSequence>
size_t receive_multiple_buffer_sequence_with_flags(
base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, asio::error_code& ec)
{
multiple_buffer_sequence_adapter<MultipleBufferSequence>
mbufs(multiple_buffer_sequence);

size_t result = socket_ops::sync_recvmmsg(impl.socket_, impl.state_,
mbufs.native_buffers(), mbufs.native_buffer_size(), flags,
ec);

mbufs.do_complete_at(mbufs.offset(), result, ec);

return result;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
size_t receive_with_flags(base_implementation_type& impl,
const null_buffers&, socket_base::message_flags,
Expand All @@ -492,6 +675,19 @@ class reactive_socket_service_base
return 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be received without blocking.
size_t receive_multiple_buffer_sequence_with_flags(
base_implementation_type& impl, const null_buffers&,
socket_base::message_flags, asio::error_code& ec)
{
// Wait for socket to become ready.
socket_ops::poll_read(impl.socket_, impl.state_, -1, ec);

return 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Start an asynchronous receive. The buffer for the data being received
// must be valid for the lifetime of the asynchronous operation.
template <typename MutableBufferSequence,
Expand Down Expand Up @@ -534,6 +730,52 @@ class reactive_socket_service_base
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Start an asynchronous receive. The buffer for the data being received
// must be valid for the lifetime of the asynchronous operation.
template <typename MultipleBufferSequence,
typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence_with_flags(
base_implementation_type& impl,
MultipleBufferSequence& multiple_buffer_sequence,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_recvmmsg_op<
MultipleBufferSequence, Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_,
multiple_buffer_sequence, flags, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_,
"async_receive_multiple_buffer_sequence_with_flags"));

start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation,
(flags & socket_base::message_out_of_band) == 0, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive_with_flags(base_implementation_type& impl,
Expand Down Expand Up @@ -575,6 +817,46 @@ class reactive_socket_service_base
p.v = p.p = 0;
}

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
// Wait until data can be received without blocking.
template <typename Handler, typename IoExecutor>
void async_receive_multiple_buffer_sequence_with_flags(
base_implementation_type& impl, const null_buffers&,
socket_base::message_flags flags, Handler& handler,
const IoExecutor& io_ex)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);

typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);

// Allocate and construct an operation to wrap the handler.
typedef reactive_null_buffers_op<Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(success_ec_, handler, io_ex);

// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}

ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_,
"async_receive_multiple_buffer_sequence_with_flags(null_buffers)"));

start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, false, false);
p.v = p.p = 0;
}
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

protected:
// Open a new socket implementation.
ASIO_DECL asio::error_code do_open(
Expand Down
4 changes: 4 additions & 0 deletions asio/include/asio/detail/reactor_op.hpp
Expand Up @@ -36,6 +36,9 @@ class reactor_op
// The number of bytes transferred, to be passed to the completion handler.
std::size_t bytes_transferred_;

// The number of operations completed, to be passed to the completion handler.
std::size_t operations_executed_;

// Status returned by perform function. May be used to decide whether it is
// worth performing more operations on the descriptor immediately.
enum status { not_done, done, done_and_exhausted };
Expand All @@ -55,6 +58,7 @@ class reactor_op
ec_(success_ec),
cancellation_key_(0),
bytes_transferred_(0),
operations_executed_(0),
perform_func_(perform_func)
{
}
Expand Down
40 changes: 40 additions & 0 deletions asio/include/asio/detail/socket_ops.hpp
Expand Up @@ -130,6 +130,10 @@ typedef WSABUF buf;
typedef iovec buf;
#endif // defined(ASIO_WINDOWS) || defined(__CYGWIN__)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
typedef ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT mbufs;
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL void init_buf(buf& b, void* data, size_t size);

ASIO_DECL void init_buf(buf& b, const void* data, size_t size);
Expand Down Expand Up @@ -220,6 +224,24 @@ ASIO_DECL bool non_blocking_recvmsg(socket_type s,

#endif // defined(ASIO_HAS_IOCP)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL signed_size_type recvmmsg(socket_type s, mbufs* bufs,
size_t count, int flags, asio::error_code& ec);

ASIO_DECL size_t sync_recvmmsg(socket_type s, state_type state, mbufs* bufs,
size_t count, int flags, bool all_empty, asio::error_code& ec);

#if !defined(ASIO_HAS_IOCP)

ASIO_DECL bool non_blocking_recvmmsg(socket_type s, mbufs* bufs, size_t count,
int flags, bool is_stream, asio::error_code& ec, size_t& bytes_transferred,
size_t& operations_executed);

#endif // !defined(ASIO_HAS_IOCP)

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL signed_size_type send(socket_type s, const buf* bufs,
size_t count, int flags, asio::error_code& ec);

Expand Down Expand Up @@ -279,6 +301,24 @@ ASIO_DECL bool non_blocking_sendto1(socket_type s, const void* data,

#endif // !defined(ASIO_HAS_IOCP)

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL signed_size_type sendmmsg(socket_type s,
mbufs* bufs, size_t count, int flags, asio::error_code& ec);

ASIO_DECL size_t sync_sendmmsg(socket_type s, state_type state,
mbufs* bufs, size_t count, int flags, bool all_empty, asio::error_code& ec);

#if !defined(ASIO_HAS_IOCP)

ASIO_DECL bool non_blocking_sendmmsg(socket_type s, mbufs* bufs,
size_t count, int flags, asio::error_code& ec, size_t& bytes_transferred,
size_t& operations_executed);

#endif // !defined(ASIO_HAS_IOCP)

#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

ASIO_DECL socket_type socket(int af, int type, int protocol,
asio::error_code& ec);

Expand Down
27 changes: 27 additions & 0 deletions asio/include/asio/detail/socket_types.hpp
Expand Up @@ -91,6 +91,33 @@
# endif
#endif

#if defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)
# if (defined(__MACH__) && defined(__APPLE__))
struct msghdr_x {
void *msg_name; /* optional address */
socklen_t msg_namelen; /* size of address */
struct iovec *msg_iov; /* scatter/gather array */
int msg_iovlen; /* # elements in msg_iov */
void *msg_control; /* ancillary data, see below */
socklen_t msg_controllen; /* ancillary data buffer len */
int msg_flags; /* flags on received message */
size_t msg_datalen; /* byte length of buffer in msg_iov */
};
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT struct msghdr_x
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_HDR_PTR(x) x
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(x) x.msg_datalen
# endif // (defined(__MACH__) && defined(__APPLE__))
# if defined(__linux__) || defined(__FreeBSD__) \
|| defined(__NetBSD__) || defined(__OpenBSD__) \
|| defined(_AIX) || defined(__QNXNTO__)
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT struct mmsghdr
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_HDR_PTR(x) x.msg_hdr
# define ASIO_MULTIPLE_BUFFER_SEQUENCE_STRUCT_LEN(x) x.msg_len
# endif // defined(__linux__) || defined(__FreeBSD__)
// || defined(__NetBSD__) || defined(__OpenBSD__)
// || defined(_AIX) || defined(__QNXNTO__)
#endif // defined(ASIO_HAS_MULTIPLE_BUFFER_SEQUENCE_IO)

#include "asio/detail/push_options.hpp"

namespace asio {
Expand Down
918 changes: 918 additions & 0 deletions asio/include/asio/multiple_buffer_sequence.hpp

Large diffs are not rendered by default.