diff --git a/include/asyncpp/io/detail/io_engine.h b/include/asyncpp/io/detail/io_engine.h index eec102c..d984868 100644 --- a/include/asyncpp/io/detail/io_engine.h +++ b/include/asyncpp/io/detail/io_engine.h @@ -79,6 +79,11 @@ namespace asyncpp::io::detail { virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0; virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0; virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0; + virtual void socket_multicast_join(socket_handle_t socket, address group, address iface) = 0; + virtual void socket_multicast_drop(socket_handle_t socket, address group, address iface) = 0; + virtual void socket_multicast_set_send_interface(socket_handle_t socket, address iface) = 0; + virtual void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) = 0; + virtual void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) = 0; virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0; virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0; virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0; diff --git a/include/asyncpp/io/socket.h b/include/asyncpp/io/socket.h index da2034c..8115a88 100644 --- a/include/asyncpp/io/socket.h +++ b/include/asyncpp/io/socket.h @@ -97,6 +97,13 @@ namespace asyncpp::io { void listen(std::uint32_t backlog = 0); void allow_broadcast(bool enable); + void multicast_join(address group, address iface); + void multicast_join(address group); + void multicast_drop(address group, address iface); + void multicast_drop(address group); + void multicast_set_send_interface(address iface); + void multicast_set_ttl(size_t ttl); + void multicast_set_loopback(bool enabled); [[nodiscard]] detail::io_engine::socket_handle_t native_handle() const noexcept { return m_fd; } [[nodiscard]] detail::io_engine::socket_handle_t release() noexcept { @@ -620,7 +627,7 @@ namespace asyncpp::io { if (that->result) that->real_cb(that->result); else - that->real_cb(socket::from_fd(that->service(), that->result_handle)); + that->real_cb(socket::from_fd(that->service, that->result_handle)); delete that; }; diff --git a/src/io_engine_generic_unix.cpp b/src/io_engine_generic_unix.cpp index 2886aee..61be121 100644 --- a/src/io_engine_generic_unix.cpp +++ b/src/io_engine_generic_unix.cpp @@ -1,6 +1,10 @@ #ifndef _WIN32 #include "io_engine_generic_unix.h" +#ifdef __APPLE__ +#define _DARWIN_C_SOURCE +#endif + #include #include @@ -137,6 +141,99 @@ namespace asyncpp::io::detail { if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); } + void io_engine_generic_unix::socket_multicast_join(socket_handle_t socket, address group, address iface) { + if (group.type() != iface.type()) + throw std::system_error(std::make_error_code(std::errc::invalid_argument), + "group and interface need to be of the same type"); + if (group.is_ipv4()) { + struct ip_mreq mc_req {}; + mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr; + mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr; + auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else if (group.is_ipv6()) { + struct ipv6_mreq mc_req {}; + mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr; + mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mc_req, sizeof(mc_req)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_generic_unix::socket_multicast_drop(socket_handle_t socket, address group, address iface) { + if (group.type() != iface.type()) + throw std::system_error(std::make_error_code(std::errc::invalid_argument), + "group and interface need to be of the same type"); + if (group.is_ipv4()) { + struct ip_mreq mc_req {}; + mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr; + mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr; + auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else if (group.is_ipv6()) { + struct ipv6_mreq mc_req {}; + mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr; + mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mc_req, sizeof(mc_req)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_generic_unix::socket_multicast_set_send_interface(socket_handle_t socket, address iface) { + if (iface.is_ipv4()) { + auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr; + auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast(&addr), sizeof(addr)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else if (iface.is_ipv6()) { + auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = + setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast(&scope), sizeof(scope)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_generic_unix::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) { + auto type = get_handle_type(socket); + if (ttl > std::numeric_limits::max()) throw std::invalid_argument("ttl value out of range"); + int ittl = ttl; + if (type == address_type::ipv4) { + auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast(&ittl), sizeof(ittl)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else if (type == address_type::ipv6) { + auto res = + setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast(&ittl), sizeof(ittl)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_generic_unix::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) { + auto type = get_handle_type(socket); + int val = enabled ? 1 : 0; + if (type == address_type::ipv4) { + auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast(&val), sizeof(val)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else if (type == address_type::ipv6) { + auto res = + setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast(&val), sizeof(val)); + if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + void io_engine_generic_unix::socket_shutdown(socket_handle_t socket, bool receive, bool send) { int mode = 0; if (receive && send) @@ -151,6 +248,19 @@ namespace asyncpp::io::detail { if (res < 0 && errno != ENOTCONN) throw std::system_error(errno, std::system_category(), "shutdown failed"); } + address_type io_engine_generic_unix::get_handle_type(socket_handle_t socket) { + int type = -1; + socklen_t length = sizeof(int); + auto res = getsockopt(socket, SOL_SOCKET, SO_TYPE, &type, &length); + if (res < 0) throw std::system_error(errno, std::system_category(), "getsockopt failed"); + switch (type) { + case AF_INET: return address_type::ipv4; + case AF_INET6: return address_type::ipv6; + case AF_UNIX: return address_type::uds; + default: throw std::logic_error("unknown socket type"); + } + } + io_engine::file_handle_t io_engine_generic_unix::file_open(const char* filename, std::ios_base::openmode mode) { if ((mode & std::ios_base::ate) == std::ios_base::ate) throw std::logic_error("unsupported flag"); int m = 0; diff --git a/src/io_engine_generic_unix.h b/src/io_engine_generic_unix.h index 3d4cb28..cadbcaf 100644 --- a/src/io_engine_generic_unix.h +++ b/src/io_engine_generic_unix.h @@ -14,12 +14,20 @@ namespace asyncpp::io::detail { endpoint socket_local_endpoint(socket_handle_t socket) override; endpoint socket_remote_endpoint(socket_handle_t socket) override; void socket_enable_broadcast(socket_handle_t socket, bool enable) override; + void socket_multicast_join(socket_handle_t socket, address group, address iface) override; + void socket_multicast_drop(socket_handle_t socket, address group, address iface) override; + void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override; + void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override; + void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override; void socket_shutdown(socket_handle_t socket, bool receive, bool send) override; file_handle_t file_open(const char* filename, std::ios_base::openmode mode) override; void file_close(file_handle_t fd) override; uint64_t file_size(file_handle_t fd) override; + protected: + address_type get_handle_type(socket_handle_t socket); + private: }; diff --git a/src/io_engine_iocp.cpp b/src/io_engine_iocp.cpp index c709f9b..8b31707 100644 --- a/src/io_engine_iocp.cpp +++ b/src/io_engine_iocp.cpp @@ -84,6 +84,11 @@ namespace asyncpp::io::detail { endpoint socket_local_endpoint(socket_handle_t socket) override; endpoint socket_remote_endpoint(socket_handle_t socket) override; void socket_enable_broadcast(socket_handle_t socket, bool enable) override; + void socket_multicast_join(socket_handle_t socket, address group, address iface) override; + void socket_multicast_drop(socket_handle_t socket, address group, address iface) override; + void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override; + void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override; + void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override; void socket_shutdown(socket_handle_t socket, bool receive, bool send) override; bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) override; bool enqueue_accept(socket_handle_t socket, completion_data* cd) override; @@ -109,6 +114,8 @@ namespace asyncpp::io::detail { private: HANDLE m_completion_port = INVALID_HANDLE_VALUE; std::atomic m_inflight_count{}; + + address_type get_handle_type(socket_handle_t socket); }; std::unique_ptr create_io_engine_iocp() { return std::make_unique(); } @@ -345,6 +352,106 @@ namespace asyncpp::io::detail { if (res == SOCKET_ERROR) throw std::system_error(WSAGetLastError(), std::system_category()); } + void io_engine_iocp::socket_multicast_join(socket_handle_t socket, address group, address iface) { + if (group.type() != iface.type()) + throw std::system_error(std::make_error_code(std::errc::invalid_argument), + "group and interface need to be of the same type"); + if (group.is_ipv4()) { + struct ip_mreq mc_req {}; + mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr; + mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr; + auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast(&mc_req), + sizeof(mc_req)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else if (group.is_ipv6()) { + struct ipv6_mreq mc_req {}; + mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr; + mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, reinterpret_cast(&mc_req), + sizeof(mc_req)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_iocp::socket_multicast_drop(socket_handle_t socket, address group, address iface) { + if (group.type() != iface.type()) + throw std::system_error(std::make_error_code(std::errc::invalid_argument), + "group and interface need to be of the same type"); + if (group.is_ipv4()) { + struct ip_mreq mc_req {}; + mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr; + mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr; + auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, reinterpret_cast(&mc_req), + sizeof(mc_req)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else if (group.is_ipv6()) { + struct ipv6_mreq mc_req {}; + mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr; + mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, reinterpret_cast(&mc_req), + sizeof(mc_req)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_iocp::socket_multicast_set_send_interface(socket_handle_t socket, address iface) { + if (iface.is_ipv4()) { + auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr; + auto res = + setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast(&addr), sizeof(addr)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else if (iface.is_ipv6()) { + auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id; + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast(&scope), + sizeof(scope)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_iocp::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) { + auto type = get_handle_type(socket); + if (ttl > (std::numeric_limits::max)()) throw std::invalid_argument("ttl value out of range"); + int ittl = ttl; + if (type == address_type::ipv4) { + auto res = + setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast(&ittl), sizeof(ittl)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else if (type == address_type::ipv6) { + auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast(&ittl), + sizeof(ittl)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + + void io_engine_iocp::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) { + auto type = get_handle_type(socket); + int val = enabled ? 1 : 0; + if (type == address_type::ipv4) { + auto res = + setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast(&val), sizeof(val)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else if (type == address_type::ipv6) { + auto res = + setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast(&val), sizeof(val)); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed"); + } else { + throw std::system_error(std::make_error_code(std::errc::not_supported), + "multicast is only supported on IPv4/IPv6"); + } + } + void io_engine_iocp::socket_shutdown(socket_handle_t socket, bool receive, bool send) { int mode = 0; if (receive && send) @@ -532,6 +639,18 @@ namespace asyncpp::io::detail { } } + address_type io_engine_iocp::get_handle_type(socket_handle_t socket) { + WSAPROTOCOL_INFO info{}; + socklen_t length = sizeof(info); + auto res = getsockopt(socket, SOL_SOCKET, SO_PROTOCOL_INFO, reinterpret_cast(&info), &length); + if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "getsockopt failed"); + switch (info.iAddressFamily) { + case AF_INET: return address_type::ipv4; + case AF_INET6: return address_type::ipv6; + default: throw std::logic_error("unknown socket type"); + } + } + io_engine::file_handle_t io_engine_iocp::file_open(const char* filename, std::ios_base::openmode mode) { DWORD access_mode = 0; if ((mode & std::ios_base::in) == std::ios_base::in) access_mode |= GENERIC_READ; diff --git a/src/io_engine_uring.cpp b/src/io_engine_uring.cpp index 1dce978..fc204a7 100644 --- a/src/io_engine_uring.cpp +++ b/src/io_engine_uring.cpp @@ -17,6 +17,14 @@ namespace asyncpp::io::detail { #include #include +#ifndef __NR_io_uring_register +#ifdef __alpha__ +#define __NR_io_uring_register 537 +#else +#define __NR_io_uring_register 427 +#endif +#endif + namespace asyncpp::io::detail { class io_engine_uring : public io_engine_generic_unix { diff --git a/src/socket.cpp b/src/socket.cpp index 3ec52eb..184b7a7 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -110,6 +110,43 @@ namespace asyncpp::io { m_io->engine()->socket_enable_broadcast(m_fd, enable); } + void socket::multicast_join(address group, address interface) { + if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); + if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type"); + m_io->engine()->socket_multicast_join(m_fd, group, interface); + } + + void socket::multicast_join(address group) { + auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()}; + return multicast_join(group, iface); + } + + void socket::multicast_drop(address group, address interface) { + if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); + if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type"); + m_io->engine()->socket_multicast_drop(m_fd, group, interface); + } + + void socket::multicast_drop(address group) { + auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()}; + return multicast_drop(group, iface); + } + + void socket::multicast_set_send_interface(address interface) { + if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); + m_io->engine()->socket_multicast_set_send_interface(m_fd, interface); + } + + void socket::multicast_set_ttl(size_t ttl) { + if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); + m_io->engine()->socket_multicast_set_ttl(m_fd, ttl); + } + + void socket::multicast_set_loopback(bool enabled) { + if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); + m_io->engine()->socket_multicast_set_loopback(m_fd, enabled); + } + void socket::close_send() { if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket"); m_io->engine()->socket_shutdown(m_fd, false, true);