Skip to content

Commit

Permalink
Implement WakeupSelect() to allow preliminary wakeup after message push
Browse files Browse the repository at this point in the history
This adds the reading side of a pipe to the read-set when calling select().
Writing to the writing side of the pipe then causes select() to wake up
immediately. Otherwise it would wait for the timeout of 50ms, even if there
is data that could possibly be sent.

This is useful when many messages need are pushed with optimistic send being
disabled. After all messages have been pushed, WakeSelect() can then wakeup
the select() thread and force a re-check for pending data to send.

This is currently only implemented for POSIX compliant systems as we assume
that heavy-load daemons (like masternodes) are usually run on Linux.
  • Loading branch information
codablock committed Feb 15, 2019
1 parent cf29320 commit acb8789
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,18 @@ void CConnman::ThreadSocketHandler()
SOCKET hSocketMax = 0;
bool have_fds = false;

#ifndef WIN32
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is available for sending and at the same time optimistic sending was disabled
// when pushing the data.
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
FD_SET(wakeupPipe[0], &fdsetRecv);
hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]);
have_fds = true;
#endif

BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
Expand Down Expand Up @@ -1276,6 +1288,20 @@ void CConnman::ThreadSocketHandler()
return;
}

#ifndef WIN32
// drain the wakeup pipe
if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) {
LogPrint("net", "woke up select()\n");
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif

//
// Accept new connections
//
Expand Down Expand Up @@ -1426,6 +1452,21 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one();
}

void CConnman::WakeSelect()
{
#ifndef WIN32
if (wakeupPipe[1] == -1) {
return;
}

LogPrint("net", "waking up select()\n");

char buf[1];
if (write(wakeupPipe[1], buf, 1) != 1) {
LogPrint("net", "write to wakeupPipe failed\n");
}
#endif
}



Expand Down Expand Up @@ -2387,6 +2428,12 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
fMsgProcWake = false;
}

#ifndef WIN32
if (pipe2(wakeupPipe, O_NONBLOCK) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
}
#endif

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

Expand Down Expand Up @@ -2512,6 +2559,12 @@ void CConnman::Stop()
semAddnode = NULL;
delete semMasternodeOutbound;
semMasternodeOutbound = NULL;

#ifndef WIN32
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
}

void CConnman::DeleteNode(CNode* pnode)
Expand Down
7 changes: 7 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ class CConnman
unsigned int GetReceiveFloodSize() const;

void WakeMessageHandler();
void WakeSelect();

private:
struct ListenSocket {
SOCKET socket;
Expand Down Expand Up @@ -525,6 +527,11 @@ class CConnman

CThreadInterrupt interruptNet;

#ifndef WIN32
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1,-1};
#endif

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
Expand Down

0 comments on commit acb8789

Please sign in to comment.