Skip to content

Commit

Permalink
lib: cleanup bnet thread server
Browse files Browse the repository at this point in the history
- rename or remove confusing variables
- move code into separate functions
- set SO_REUSEADDR always on
  • Loading branch information
franku committed Aug 14, 2020
1 parent 51d6dcb commit 39ddee3
Showing 1 changed file with 121 additions and 107 deletions.
228 changes: 121 additions & 107 deletions core/src/lib/bnet_server_tcp.cc
Expand Up @@ -56,6 +56,7 @@
#include <sys/poll.h>
#endif
#include <atomic>
#include <array>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static std::atomic<bool> quit{false};
Expand Down Expand Up @@ -120,6 +121,93 @@ class BNetThreadServerCleanupObject {
ThreadList& thread_list_;
};

static void RemoveDuplicateAddresses(dlist* addr_list)
{
IPADDR* ipaddr = nullptr;
IPADDR* next = nullptr;
IPADDR* to_free = nullptr;

for (ipaddr = (IPADDR*)addr_list->first(); ipaddr;
ipaddr = (IPADDR*)addr_list->next(ipaddr)) {
next = (IPADDR*)addr_list->next(ipaddr);
while (next) {
/*
* See if the addresses match.
*/
if (ipaddr->GetSockaddrLen() == next->GetSockaddrLen() &&
memcmp(ipaddr->get_sockaddr(), next->get_sockaddr(),
ipaddr->GetSockaddrLen()) == 0) {
to_free = next;
next = (IPADDR*)addr_list->next(next);
addr_list->remove(to_free);
delete to_free;
} else {
next = (IPADDR*)addr_list->next(next);
}
}
}
}

static void LogAllAddresses(dlist* addr_list)
{
std::array<char, 256 * 10> allbuf;

Dmsg1(100, "Addresses %s\n",
BuildAddressesString(addr_list, allbuf.data(), allbuf.size()));
}

static int OpenSocketAndBind(IPADDR* ipaddr,
dlist* addr_list,
uint16_t port_number)
{
int fd = -1;
int tries = 0;

do {
++tries;
if ((fd = socket(ipaddr->GetFamily(), SOCK_STREAM, 0)) < 0) {
Bmicrosleep(10, 0);
}
} while (fd < 0 && tries < 6);

if (fd < 0) {
BErrNo be;
std::array<char, 256> buf1;
std::array<char, 256 * 10> buf2;
Emsg3(M_ABORT, 0,
_("Cannot open stream socket. ERR=%s. Current %s All %s\n"),
be.bstrerror(), ipaddr->build_address_str(buf1.data(), buf1.size()),
BuildAddressesString(addr_list, buf2.data(), buf2.size()));
return -1;
}

int reuseaddress = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (sockopt_val_t)&reuseaddress,
sizeof(reuseaddress)) < 0) {
BErrNo be;
Emsg1(M_WARNING, 0, _("Cannot set SO_REUSEADDR on socket: %s\n"),
be.bstrerror());
return -2;
}

tries = 0;

do {
++tries;
if (bind(fd, ipaddr->get_sockaddr(), ipaddr->GetSockaddrLen()) < 0) {
BErrNo be;
Emsg2(M_WARNING, 0, _("Cannot bind port %d: ERR=%s: Retrying ...\n"),
ntohs(port_number), be.bstrerror());
Bmicrosleep(5, 0);
} else {
// success
return fd;
}
} while (tries < 3);

return -3;
}

/**
* Become Threaded Network Server
*
Expand All @@ -142,111 +230,29 @@ void BnetThreadServerTcp(
std::function<void*(void* bsock)> UserAgentShutdownCallback,
std::function<void()> CustomCallback)
{
int newsockfd;
socklen_t clilen;
struct sockaddr_storage cli_addr; /* client's address */
int tlog;
int value;
IPADDR *ipaddr, *next, *to_free;
s_sockfd* fd_ptr = NULL;
char buf[128];
#ifdef HAVE_POLL
nfds_t nfds;
int events;
struct pollfd* pfds;
#endif

char allbuf[256 * 10];

BNetThreadServerCleanupObject cleanup_object(sockfds, thread_list);

quit = false;
quit = false; // allow other threads to set this true during initialization
if (server_state) { server_state->store(BnetServerState::kStarting); }

/*
* Remove any duplicate addresses.
*/
for (ipaddr = (IPADDR*)addr_list->first(); ipaddr;
ipaddr = (IPADDR*)addr_list->next(ipaddr)) {
next = (IPADDR*)addr_list->next(ipaddr);
while (next) {
/*
* See if the addresses match.
*/
if (ipaddr->GetSockaddrLen() == next->GetSockaddrLen() &&
memcmp(ipaddr->get_sockaddr(), next->get_sockaddr(),
ipaddr->GetSockaddrLen()) == 0) {
to_free = next;
next = (IPADDR*)addr_list->next(next);
addr_list->remove(to_free);
delete to_free;
} else {
next = (IPADDR*)addr_list->next(next);
}
}
}

Dmsg1(100, "Addresses %s\n",
BuildAddressesString(addr_list, allbuf, sizeof(allbuf)));
RemoveDuplicateAddresses(addr_list);
LogAllAddresses(addr_list);

if (nokeepalive) {
value = 0;
} else {
value = 1;
}
int keepalive = nokeepalive ? 0 : 1;

#ifdef HAVE_POLL
nfds = 0;
nfds_t number_of_filedescriptors = 0;
#endif
foreach_dlist (ipaddr, addr_list) {
/*
* Allocate on stack from -- no need to free
*/
fd_ptr = (s_sockfd*)alloca(sizeof(s_sockfd));
fd_ptr->port = ipaddr->GetPortNetOrder();

/*
* Open a TCP socket
*/
for (tlog = 60;
(fd_ptr->fd = socket(ipaddr->GetFamily(), SOCK_STREAM, 0)) < 0;
tlog -= 10) {
if (tlog <= 0) {
BErrNo be;
char curbuf[256];
Emsg3(M_ABORT, 0,
_("Cannot open stream socket. ERR=%s. Current %s All %s\n"),
be.bstrerror(), ipaddr->build_address_str(curbuf, sizeof(curbuf)),
BuildAddressesString(addr_list, allbuf, sizeof(allbuf)));
}
Bmicrosleep(10, 0);
}
IPADDR* ipaddr = nullptr;

if (setsockopt(fd_ptr->fd, SOL_SOCKET, SO_REUSEADDR, (sockopt_val_t)&value,
sizeof(value)) < 0) {
BErrNo be;
Emsg1(M_WARNING, 0, _("Cannot set SO_REUSEADDR on socket: %s\n"),
be.bstrerror());
}
foreach_dlist (ipaddr, addr_list) {
s_sockfd* fd_ptr = (s_sockfd*)alloca(sizeof(s_sockfd));

int tries = 3;
int wait_seconds = 5;
bool ok = false;

do {
int ret =
bind(fd_ptr->fd, ipaddr->get_sockaddr(), ipaddr->GetSockaddrLen());
if (ret < 0) {
BErrNo be;
Emsg2(M_WARNING, 0, _("Cannot bind port %d: ERR=%s: Retrying ...\n"),
ntohs(fd_ptr->port), be.bstrerror());
Bmicrosleep(wait_seconds, 0);
} else {
ok = true;
}
} while (!ok && --tries);
fd_ptr->port = ipaddr->GetPortNetOrder();
fd_ptr->fd = OpenSocketAndBind(ipaddr, addr_list, fd_ptr->port);

if (!ok) {
if (fd_ptr->fd < 0) {
BErrNo be;
Emsg2(M_ERROR, 0, _("Cannot bind port %d: ERR=%s.\n"),
ntohs(fd_ptr->port), be.bstrerror());
Expand All @@ -258,22 +264,19 @@ void BnetThreadServerTcp(
sockfds->append(fd_ptr);

#ifdef HAVE_POLL
nfds++;
number_of_filedescriptors++;
#endif
}

thread_list.Init(max_clients, HandleConnectionRequest,
UserAgentShutdownCallback);

#ifdef HAVE_POLL
/*
* Allocate on stack from -- no need to free
*/
pfds = (struct pollfd*)alloca(sizeof(struct pollfd) * nfds);
memset(pfds, 0, sizeof(struct pollfd) * nfds);

nfds = 0;
events = POLLIN;
struct pollfd* pfds =
(struct pollfd*)alloca(sizeof(struct pollfd) * number_of_filedescriptors);
memset(pfds, 0, sizeof(struct pollfd) * number_of_filedescriptors);

int events = POLLIN;
#if defined(POLLRDNORM)
events |= POLLRDNORM;
#endif
Expand All @@ -284,10 +287,13 @@ void BnetThreadServerTcp(
events |= POLLPRI;
#endif

s_sockfd* fd_ptr = nullptr;
int i = 0;

foreach_alist (fd_ptr, sockfds) {
pfds[nfds].fd = fd_ptr->fd;
pfds[nfds].events = events;
nfds++;
pfds[i].fd = fd_ptr->fd;
pfds[i].events = events;
i++;
}
#endif

Expand All @@ -300,6 +306,7 @@ void BnetThreadServerTcp(
fd_set sockset;
FD_ZERO(&sockset);

s_sockfd* fd_ptr = nullptr;
foreach_alist (fd_ptr, sockfds) {
FD_SET((unsigned)fd_ptr->fd, &sockset);
if ((unsigned)fd_ptr->fd > maxfd) { maxfd = fd_ptr->fd; }
Expand Down Expand Up @@ -328,7 +335,7 @@ void BnetThreadServerTcp(
static constexpr int timeout_ms{1000};

errno = 0;
int status = poll(pfds, nfds, timeout_ms);
int status = poll(pfds, number_of_filedescriptors, timeout_ms);

if (status == 0) {
continue; // timeout: check if thread should quit
Expand All @@ -344,6 +351,11 @@ void BnetThreadServerTcp(
foreach_alist (fd_ptr, sockfds) {
if (pfds[cnt++].revents & events) {
#endif

int newsockfd = -1;
socklen_t clilen;
struct sockaddr_storage cli_addr; /* client's address */

/*
* Got a connection, now accept it.
*/
Expand All @@ -358,7 +370,7 @@ void BnetThreadServerTcp(
* Receive notification when connection dies.
*/
if (setsockopt(newsockfd, SOL_SOCKET, SO_KEEPALIVE,
(sockopt_val_t)&value, sizeof(value)) < 0) {
(sockopt_val_t)&keepalive, sizeof(keepalive)) < 0) {
BErrNo be;
Emsg1(M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
be.bstrerror());
Expand All @@ -367,6 +379,8 @@ void BnetThreadServerTcp(
/*
* See who client is. i.e. who connected to us.
*/
char buf[128];

P(mutex);
SockaddrToAscii(reinterpret_cast<sockaddr*>(&cli_addr), buf,
sizeof(buf));
Expand Down

0 comments on commit 39ddee3

Please sign in to comment.