Skip to content

Commit

Permalink
refactor: introduce EdgeTriggeredEvents, move {epoll, kqueue} fd there
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed May 14, 2024
1 parent 3b11ef9 commit 212df06
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 51 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ BITCOIN_CORE_H = \
util/bip32.h \
util/bytevectorhash.h \
util/check.h \
util/edge.h \
util/enumerate.h \
util/epochguard.h \
util/error.h \
Expand Down Expand Up @@ -776,6 +777,7 @@ libbitcoin_util_a_SOURCES = \
util/bip32.cpp \
util/bytevectorhash.cpp \
util/check.cpp \
util/edge.cpp \
util/error.cpp \
util/fees.cpp \
util/hasher.cpp \
Expand Down
75 changes: 31 additions & 44 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,7 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;

wakeupSelectNeeded = true;
int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout);
int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);
wakeupSelectNeeded = false;
if (n == -1) {
LogPrintf("kevent wait error\n");
Expand Down Expand Up @@ -1622,7 +1622,7 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
epoll_event events[maxEvents];

wakeupSelectNeeded = true;
int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
for (int i = 0; i < n; i++) {
auto& e = events[i];
Expand Down Expand Up @@ -3136,8 +3136,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
if (kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueue fd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
Expand All @@ -3149,8 +3149,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
epoll_event event;
event.data.fd = sock->Get();
event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) {
strError = strprintf(_("Error: failed to add socket to epollfd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError()));
if (epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) {
strError = strprintf(_("Error: failed to add socket to epoll fd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
Expand Down Expand Up @@ -3301,25 +3301,14 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
AssertLockNotHeld(m_total_bytes_sent_mutex);
Init(connOptions);

#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue) {
kqueuefd = kqueue();
if (kqueuefd == -1) {
LogPrintf("kqueue failed\n");
return false;
}
}
#endif

#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
epollfd = epoll_create1(0);
if (epollfd == -1) {
LogPrintf("epoll_create1 failed\n");
if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) {
m_edge_trig_events = std::make_unique<EdgeTriggeredEvents>(socketEventsMode);
if (!m_edge_trig_events->IsValid()) {
LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n");
m_edge_trig_events.reset();
return false;
}
}
#endif

if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) {
if (clientInterface) {
Expand Down Expand Up @@ -3408,10 +3397,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
Expand All @@ -3421,10 +3410,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
epoll_event event;
event.events = EPOLLIN;
event.data.fd = wakeupPipe[0];
int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, wakeupPipe[0], &event);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], &event);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
Expand Down Expand Up @@ -3570,12 +3559,12 @@ void CConnman::StopNodes()
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr);
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
}
#endif
if (!CloseSocket(hListenSocket.socket))
Expand Down Expand Up @@ -3606,24 +3595,22 @@ void CConnman::StopNodes()
semAddnode.reset();

#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue && kqueuefd != -1) {
if (socketEventsMode == SocketEventsMode::KQueue && Assert(m_edge_trig_events)->m_fd != -1) {
#ifdef USE_WAKEUP_PIPE
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
kevent(m_edge_trig_events->m_fd, &event, 1, nullptr, 0, nullptr);
#endif
close(kqueuefd);
m_edge_trig_events.reset();
}
kqueuefd = -1;
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll && epollfd != -1) {
if (socketEventsMode == SocketEventsMode::EPoll && Assert(m_edge_trig_events)->m_fd != -1) {
#ifdef USE_WAKEUP_PIPE
epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr);
epoll_ctl(m_edge_trig_events->m_fd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr);
#endif
close(epollfd);
m_edge_trig_events.reset();
}
epollfd = -1;
#endif

#ifdef USE_WAKEUP_PIPE
Expand Down Expand Up @@ -4245,10 +4232,10 @@ void CConnman::RegisterEvents(CNode *pnode)
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);

int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
Expand All @@ -4262,10 +4249,10 @@ void CConnman::RegisterEvents(CNode *pnode)
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = pnode->hSocket;

int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pnode->hSocket, &e);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, pnode->hSocket, &e);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
Expand All @@ -4284,10 +4271,10 @@ void CConnman::UnregisterEvents(CNode *pnode)
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);

int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
Expand All @@ -4298,10 +4285,10 @@ void CConnman::UnregisterEvents(CNode *pnode)
return;
}

int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
Expand Down
10 changes: 3 additions & 7 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
#include <sync.h>
#include <threadinterrupt.h>
#include <uint256.h>
#include <util/check.h>
#include <util/edge.h>
#include <util/system.h>
#include <consensus/params.h>
#include <util/check.h>

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -1514,12 +1515,7 @@ friend class CNode;
std::atomic<bool> wakeupSelectNeeded{false};

SocketEventsMode socketEventsMode;
#ifdef USE_KQUEUE
int kqueuefd{-1};
#endif
#ifdef USE_EPOLL
int epollfd{-1};
#endif
std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};

Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
Expand Down
60 changes: 60 additions & 0 deletions src/util/edge.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <util/edge.h>

#include <logging.h>
#include <util/sock.h>

#include <assert.h>

#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif

#ifdef USE_KQUEUE
#include <sys/event.h>
#endif

EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode)
: m_mode(events_mode)
{
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
m_fd = epoll_create1(0);
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1\n");
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for epoll without support compiled in!\n");
return;
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
m_fd = kqueue();
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1\n");
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for kqueue without support compiled in!\n");
return;
#endif /* USE_KQUEUE */
} else {
assert(false);
}
m_valid = true;
}

EdgeTriggeredEvents::~EdgeTriggeredEvents()
{
if (m_valid) {
#if defined(USE_KQUEUE) || defined(USE_EPOLL)
close(m_fd);
#else
assert(false);
#endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */
}
}
35 changes: 35 additions & 0 deletions src/util/edge.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_UTIL_EDGE_H
#define BITCOIN_UTIL_EDGE_H

#include <cstdint>

enum class SocketEventsMode : int8_t;

/**
* A manager for abstracting logic surrounding edge-triggered socket events
* modes like kqueue and epoll.
*/
class EdgeTriggeredEvents
{
public:
explicit EdgeTriggeredEvents(SocketEventsMode events_mode);
~EdgeTriggeredEvents();

bool IsValid() const { return m_valid; }

public:
/* File descriptor used to interact with events mode */
int m_fd{-1};

private:
/* Instance validity flag set during construction */
bool m_valid{false};
/* Flag for storing selected socket events mode */
SocketEventsMode m_mode;
};

#endif /* BITCOIN_UTIL_EDGE_H */

0 comments on commit 212df06

Please sign in to comment.