Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/asyncpp/io/detail/io_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ namespace asyncpp::io::detail {
virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0;
virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0;
virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0;
virtual void socket_multicast_join(socket_handle_t socket, address group, address iface) = 0;
virtual void socket_multicast_drop(socket_handle_t socket, address group, address iface) = 0;
virtual void socket_multicast_set_send_interface(socket_handle_t socket, address iface) = 0;
virtual void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) = 0;
virtual void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) = 0;
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;
Expand Down
9 changes: 8 additions & 1 deletion include/asyncpp/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ namespace asyncpp::io {
void listen(std::uint32_t backlog = 0);

void allow_broadcast(bool enable);
void multicast_join(address group, address iface);
void multicast_join(address group);
void multicast_drop(address group, address iface);
void multicast_drop(address group);
void multicast_set_send_interface(address iface);
void multicast_set_ttl(size_t ttl);
void multicast_set_loopback(bool enabled);

[[nodiscard]] detail::io_engine::socket_handle_t native_handle() const noexcept { return m_fd; }
[[nodiscard]] detail::io_engine::socket_handle_t release() noexcept {
Expand Down Expand Up @@ -620,7 +627,7 @@ namespace asyncpp::io {
if (that->result)
that->real_cb(that->result);
else
that->real_cb(socket::from_fd(that->service(), that->result_handle));
that->real_cb(socket::from_fd(that->service, that->result_handle));

delete that;
};
Expand Down
110 changes: 110 additions & 0 deletions src/io_engine_generic_unix.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#ifndef _WIN32
#include "io_engine_generic_unix.h"

#ifdef __APPLE__
#define _DARWIN_C_SOURCE
#endif

#include <cstring>

#include <fcntl.h>
Expand Down Expand Up @@ -137,6 +141,99 @@ namespace asyncpp::io::detail {
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
}

void io_engine_generic_unix::socket_multicast_join(socket_handle_t socket, address group, address iface) {
if (group.type() != iface.type())
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
"group and interface need to be of the same type");
if (group.is_ipv4()) {
struct ip_mreq mc_req {};
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mc_req, sizeof(mc_req));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else if (group.is_ipv6()) {
struct ipv6_mreq mc_req {};
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mc_req, sizeof(mc_req));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_generic_unix::socket_multicast_drop(socket_handle_t socket, address group, address iface) {
if (group.type() != iface.type())
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
"group and interface need to be of the same type");
if (group.is_ipv4()) {
struct ip_mreq mc_req {};
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mc_req, sizeof(mc_req));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else if (group.is_ipv6()) {
struct ipv6_mreq mc_req {};
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mc_req, sizeof(mc_req));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_generic_unix::socket_multicast_set_send_interface(socket_handle_t socket, address iface) {
if (iface.is_ipv4()) {
auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<char*>(&addr), sizeof(addr));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else if (iface.is_ipv6()) {
auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res =
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<char*>(&scope), sizeof(scope));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_generic_unix::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
auto type = get_handle_type(socket);
if (ttl > std::numeric_limits<int>::max()) throw std::invalid_argument("ttl value out of range");
int ittl = ttl;
if (type == address_type::ipv4) {
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<char*>(&ittl), sizeof(ittl));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else if (type == address_type::ipv6) {
auto res =
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<char*>(&ittl), sizeof(ittl));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_generic_unix::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
auto type = get_handle_type(socket);
int val = enabled ? 1 : 0;
if (type == address_type::ipv4) {
auto res = setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else if (type == address_type::ipv6) {
auto res =
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_generic_unix::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
int mode = 0;
if (receive && send)
Expand All @@ -151,6 +248,19 @@ namespace asyncpp::io::detail {
if (res < 0 && errno != ENOTCONN) throw std::system_error(errno, std::system_category(), "shutdown failed");
}

address_type io_engine_generic_unix::get_handle_type(socket_handle_t socket) {
int type = -1;
socklen_t length = sizeof(int);
auto res = getsockopt(socket, SOL_SOCKET, SO_TYPE, &type, &length);
if (res < 0) throw std::system_error(errno, std::system_category(), "getsockopt failed");
switch (type) {
case AF_INET: return address_type::ipv4;
case AF_INET6: return address_type::ipv6;
case AF_UNIX: return address_type::uds;
default: throw std::logic_error("unknown socket type");
}
}

io_engine::file_handle_t io_engine_generic_unix::file_open(const char* filename, std::ios_base::openmode mode) {
if ((mode & std::ios_base::ate) == std::ios_base::ate) throw std::logic_error("unsupported flag");
int m = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/io_engine_generic_unix.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ namespace asyncpp::io::detail {
endpoint socket_local_endpoint(socket_handle_t socket) override;
endpoint socket_remote_endpoint(socket_handle_t socket) override;
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
void socket_multicast_join(socket_handle_t socket, address group, address iface) override;
void socket_multicast_drop(socket_handle_t socket, address group, address iface) override;
void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override;
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;

file_handle_t file_open(const char* filename, std::ios_base::openmode mode) override;
void file_close(file_handle_t fd) override;
uint64_t file_size(file_handle_t fd) override;

protected:
address_type get_handle_type(socket_handle_t socket);

private:
};

Expand Down
119 changes: 119 additions & 0 deletions src/io_engine_iocp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ namespace asyncpp::io::detail {
endpoint socket_local_endpoint(socket_handle_t socket) override;
endpoint socket_remote_endpoint(socket_handle_t socket) override;
void socket_enable_broadcast(socket_handle_t socket, bool enable) override;
void socket_multicast_join(socket_handle_t socket, address group, address iface) override;
void socket_multicast_drop(socket_handle_t socket, address group, address iface) override;
void socket_multicast_set_send_interface(socket_handle_t socket, address iface) override;
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) override;
bool enqueue_accept(socket_handle_t socket, completion_data* cd) override;
Expand All @@ -109,6 +114,8 @@ namespace asyncpp::io::detail {
private:
HANDLE m_completion_port = INVALID_HANDLE_VALUE;
std::atomic<size_t> m_inflight_count{};

address_type get_handle_type(socket_handle_t socket);
};

std::unique_ptr<io_engine> create_io_engine_iocp() { return std::make_unique<io_engine_iocp>(); }
Expand Down Expand Up @@ -345,6 +352,106 @@ namespace asyncpp::io::detail {
if (res == SOCKET_ERROR) throw std::system_error(WSAGetLastError(), std::system_category());
}

void io_engine_iocp::socket_multicast_join(socket_handle_t socket, address group, address iface) {
if (group.type() != iface.type())
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
"group and interface need to be of the same type");
if (group.is_ipv4()) {
struct ip_mreq mc_req {};
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
auto res = setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
sizeof(mc_req));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else if (group.is_ipv6()) {
struct ipv6_mreq mc_req {};
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
sizeof(mc_req));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_iocp::socket_multicast_drop(socket_handle_t socket, address group, address iface) {
if (group.type() != iface.type())
throw std::system_error(std::make_error_code(std::errc::invalid_argument),
"group and interface need to be of the same type");
if (group.is_ipv4()) {
struct ip_mreq mc_req {};
mc_req.imr_multiaddr = group.ipv4().to_sockaddr_in().first.sin_addr;
mc_req.imr_interface = iface.ipv4().to_sockaddr_in().first.sin_addr;
auto res = setsockopt(socket, IPPROTO_IP, IP_DROP_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
sizeof(mc_req));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else if (group.is_ipv6()) {
struct ipv6_mreq mc_req {};
mc_req.ipv6mr_multiaddr = group.ipv6().to_sockaddr_in6().first.sin6_addr;
mc_req.ipv6mr_interface = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, reinterpret_cast<const char*>(&mc_req),
sizeof(mc_req));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_iocp::socket_multicast_set_send_interface(socket_handle_t socket, address iface) {
if (iface.is_ipv4()) {
auto addr = iface.ipv4().to_sockaddr_in().first.sin_addr.s_addr;
auto res =
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_IF, reinterpret_cast<const char*>(&addr), sizeof(addr));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else if (iface.is_ipv6()) {
auto scope = iface.ipv6().to_sockaddr_in6().first.sin6_scope_id;
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_IF, reinterpret_cast<const char*>(&scope),
sizeof(scope));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_iocp::socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) {
auto type = get_handle_type(socket);
if (ttl > (std::numeric_limits<int>::max)()) throw std::invalid_argument("ttl value out of range");
int ittl = ttl;
if (type == address_type::ipv4) {
auto res =
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast<const char*>(&ittl), sizeof(ittl));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else if (type == address_type::ipv6) {
auto res = setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, reinterpret_cast<const char*>(&ittl),
sizeof(ittl));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_iocp::socket_multicast_set_loopback(socket_handle_t socket, bool enabled) {
auto type = get_handle_type(socket);
int val = enabled ? 1 : 0;
if (type == address_type::ipv4) {
auto res =
setsockopt(socket, IPPROTO_IP, IP_MULTICAST_LOOP, reinterpret_cast<const char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else if (type == address_type::ipv6) {
auto res =
setsockopt(socket, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, reinterpret_cast<const char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
} else {
throw std::system_error(std::make_error_code(std::errc::not_supported),
"multicast is only supported on IPv4/IPv6");
}
}

void io_engine_iocp::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
int mode = 0;
if (receive && send)
Expand Down Expand Up @@ -532,6 +639,18 @@ namespace asyncpp::io::detail {
}
}

address_type io_engine_iocp::get_handle_type(socket_handle_t socket) {
WSAPROTOCOL_INFO info{};
socklen_t length = sizeof(info);
auto res = getsockopt(socket, SOL_SOCKET, SO_PROTOCOL_INFO, reinterpret_cast<char*>(&info), &length);
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "getsockopt failed");
switch (info.iAddressFamily) {
case AF_INET: return address_type::ipv4;
case AF_INET6: return address_type::ipv6;
default: throw std::logic_error("unknown socket type");
}
}

io_engine::file_handle_t io_engine_iocp::file_open(const char* filename, std::ios_base::openmode mode) {
DWORD access_mode = 0;
if ((mode & std::ios_base::in) == std::ios_base::in) access_mode |= GENERIC_READ;
Expand Down
8 changes: 8 additions & 0 deletions src/io_engine_uring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ namespace asyncpp::io::detail {
#include <sys/time.h>
#include <unistd.h>

#ifndef __NR_io_uring_register
#ifdef __alpha__
#define __NR_io_uring_register 537
#else
#define __NR_io_uring_register 427
#endif
#endif

namespace asyncpp::io::detail {

class io_engine_uring : public io_engine_generic_unix {
Expand Down
37 changes: 37 additions & 0 deletions src/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,43 @@ namespace asyncpp::io {
m_io->engine()->socket_enable_broadcast(m_fd, enable);
}

void socket::multicast_join(address group, address interface) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
m_io->engine()->socket_multicast_join(m_fd, group, interface);
}

void socket::multicast_join(address group) {
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
return multicast_join(group, iface);
}

void socket::multicast_drop(address group, address interface) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
if (group.type() != interface.type()) throw std::logic_error("group and interface need to be of the same type");
m_io->engine()->socket_multicast_drop(m_fd, group, interface);
}

void socket::multicast_drop(address group) {
auto iface = group.type() == address_type::ipv4 ? address{ipv4_address::any()} : address{ipv6_address::any()};
return multicast_drop(group, iface);
}

void socket::multicast_set_send_interface(address interface) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_multicast_set_send_interface(m_fd, interface);
}

void socket::multicast_set_ttl(size_t ttl) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_multicast_set_ttl(m_fd, ttl);
}

void socket::multicast_set_loopback(bool enabled) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_multicast_set_loopback(m_fd, enabled);
}

void socket::close_send() {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_shutdown(m_fd, false, true);
Expand Down