Skip to content

Commit

Permalink
BnetThreadServer: reworked shutdown
Browse files Browse the repository at this point in the history
- if bind fails retry 3 times every 5 seconds
- when workq is not initialized upon shutdown just send error message,
  not abort the process
  • Loading branch information
franku committed Nov 19, 2018
1 parent 1d0595c commit ff319e0
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 67 deletions.
4 changes: 3 additions & 1 deletion core/src/dird/dird.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,9 @@ int main (int argc, char *argv[])
InitConsoleMsg(working_directory);

Dmsg0(200, "Start UA server\n");
StartSocketServer(me->DIRaddrs);
if (!StartSocketServer(me->DIRaddrs)) {
TerminateDird(0);
}

StartWatchdog(); /* start network watchdog thread */

Expand Down
45 changes: 31 additions & 14 deletions core/src/dird/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "lib/bnet_server_tcp.h"
#include "lib/try_tls_handshake_as_a_server.h"

#include <atomic>

namespace directordaemon {

static char hello_client_with_version[] = "Hello Client %127s FdProtocolVersion=%d calling";
Expand All @@ -47,7 +49,8 @@ static workq_t socket_workq;
static alist *sock_fds = NULL;
static pthread_t tcp_server_tid;
static ConnectionPool *client_connections = NULL;
static bool tcp_server_ready = false;

static std::atomic<BnetServerState> server_state(BnetServerState::kUndefined);

struct s_addr_port {
char *addr;
Expand Down Expand Up @@ -126,43 +129,57 @@ extern "C" void *connect_thread(void *arg)
*/
sock_fds = New(alist(10, not_owned_by_alist));
BnetThreadServerTcp((dlist *)arg, me->MaxConnections, sock_fds, &socket_workq, me->nokeepalive,
HandleConnectionRequest, my_config, &tcp_server_ready);
HandleConnectionRequest, my_config, &server_state);

return NULL;
}

#include <errno.h>
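/**
* Called here by Director daemon to start UA (user agent)
* command thread. This routine creates the thread, then waits
* until the server reports that it has started (or the wait
* runs out of tries). Returns true when the server is started,
* false otherwise.
*/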
void StartSocketServer(dlist *addrs)
bool StartSocketServer(dlist *addrs)
{
int status;
static dlist *myaddrs = addrs;
tcp_server_ready = false;

if (client_connections == NULL) { client_connections = New(ConnectionPool()); }
if ((status = pthread_create(&tcp_server_tid, NULL, connect_thread, (void *)myaddrs)) != 0) {
if (client_connections == nullptr) { client_connections = New(ConnectionPool()); }

if ((status = pthread_create(&tcp_server_tid, nullptr, connect_thread, (void *)myaddrs)) != 0) {
BErrNo be;
Emsg1(M_ABORT, 0, _("Cannot create UA thread: %s\n"), be.bstrerror(status));
}

int timeout_ctr = 1000; /* 1000 * 10000µs = 10 sec */
while (!tcp_server_ready && timeout_ctr--) {
Bmicrosleep(0, 10000);
int tries = 200; /* consider bind() tries in BnetThreadServerTcp */
int wait_ms = 100;
do {
Bmicrosleep(0, wait_ms * 1000);
if (server_state.load() == BnetServerState::kStarted) {
break;
}
} while (--tries);

if (server_state != BnetServerState::kStarted) {
if (client_connections) {
delete (client_connections);
client_connections = nullptr;
}
return false;
}

return;
return true;
}

/**
 * Stop the socket server and free the module-level resources.
 *
 * If the socket list exists, the server thread is asked to stop and is
 * waited for before the list is deleted. The connection pool is deleted
 * afterwards. Both pointers are reset to nullptr so that a second call
 * is a no-op.
 */
void StopSocketServer()
{
  if (sock_fds) {
    BnetStopAndWaitForThreadServerTcp(tcp_server_tid);
    delete sock_fds;
    sock_fds = nullptr;
  }
  if (client_connections) {
    delete client_connections;
    client_connections = nullptr;
  }
}
} /* namespace directordaemon */
2 changes: 1 addition & 1 deletion core/src/dird/socket_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace directordaemon {

void StartSocketServer(dlist *addrs);
bool StartSocketServer(dlist *addrs);
void StopSocketServer();

} /* namespace directordaemon */
Expand Down
110 changes: 62 additions & 48 deletions core/src/lib/bnet_server_tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#include <atomic>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

Expand All @@ -72,11 +73,13 @@ struct s_sockfd {
*/
void BnetStopAndWaitForThreadServerTcp(pthread_t tid)
{
Dmsg0(100, "Stop BnetThreadServer and wait until finished\n");
Dmsg0(100, "BnetThreadServer: Request Stop\n");
/* Request shutdown; the server's connection loop runs while (!quit). */
quit = true;
/* Only signal and join when called from another thread than the server thread. */
if (!pthread_equal(tid, pthread_self())) {
/* NOTE(review): presumably the signal interrupts a blocking select/poll so
 * that the loop re-checks quit (EINTR is handled with continue) - confirm. */
pthread_kill(tid, TIMEOUT_SIGNAL);
Dmsg0(100, "BnetThreadServer: Wait until finished\n");
/* Block until the server thread has returned; its return value is ignored. */
pthread_join(tid, nullptr);
Dmsg0(100, "BnetThreadServer: finished\n");
}
}

Expand All @@ -86,37 +89,46 @@ void BnetStopAndWaitForThreadServerTcp(pthread_t tid)
*/
static void CleanupBnetThreadServerTcp(alist *sockfds, workq_t *client_wq)
{
int status;
s_sockfd *fd_ptr = NULL;

Dmsg0(100, "CleanupBnetThreadServerTcp: start\n");

if (!sockfds->empty()) {
/*
* Cleanup open files and pointers to them
*/
fd_ptr = (s_sockfd *)sockfds->first();
if (sockfds && !sockfds->empty()) {
s_sockfd *fd_ptr = (s_sockfd *)sockfds->first();
while (fd_ptr) {
close(fd_ptr->fd);
fd_ptr = (s_sockfd *)sockfds->next();
}

sockfds->destroy();
}

/*
* Stop work queue thread
*/
if ((status = WorkqDestroy(client_wq)) != 0) {
BErrNo be;
be.SetErrno(status);
Emsg1(M_FATAL, 0, _("Could not destroy client queue: ERR=%s\n"),
be.bstrerror());
if (client_wq) {
int status = WorkqDestroy(client_wq);
if (status != 0) {
BErrNo be;
be.SetErrno(status);
Emsg1(M_ERROR, 0, _("Could not destroy client queue: ERR=%s\n"),
be.bstrerror());
}
}

Dmsg0(100, "CleanupBnetThreadServerTcp: finish\n");
}

/**
 * Scope guard that calls CleanupBnetThreadServerTcp() on the given socket
 * list and work queue when it is destroyed, so that every path that leaves
 * the enclosing scope performs the same cleanup.
 *
 * The guard only stores the two pointers and forwards them to the cleanup
 * function. Copying and moving are disabled: a second instance would run
 * the cleanup a second time on the same sockets and work queue.
 */
class BNetThreadServerCleanupObject
{
 public:
  BNetThreadServerCleanupObject(alist *sockfds,
                                workq_t *client_wq)
    : sockfds_(sockfds)
    , client_wq_(client_wq)
  {}

  ~BNetThreadServerCleanupObject() {
    CleanupBnetThreadServerTcp(sockfds_, client_wq_);
  }

  BNetThreadServerCleanupObject(const BNetThreadServerCleanupObject &) = delete;
  BNetThreadServerCleanupObject &operator=(const BNetThreadServerCleanupObject &) = delete;
  BNetThreadServerCleanupObject(BNetThreadServerCleanupObject &&) = delete;
  BNetThreadServerCleanupObject &operator=(BNetThreadServerCleanupObject &&) = delete;

 private:
  alist *sockfds_;      /* socket list handed to the cleanup function */
  workq_t *client_wq_;  /* work queue handed to the cleanup function */
};

/**
* Become Threaded Network Server
*
Expand All @@ -134,12 +146,12 @@ void BnetThreadServerTcp(dlist *addr_list,
void *HandleConnectionRequest(ConfigurationParser *config,
void *bsock),
ConfigurationParser *config,
bool *const tcp_server_ready)
std::atomic<BnetServerState> *const server_state)
{
int newsockfd, status;
socklen_t clilen;
struct sockaddr cli_addr; /* client's address */
int tlog, tmax;
int tlog;
int value;
#ifdef HAVE_LIBWRAP
struct request_info request;
Expand All @@ -155,6 +167,10 @@ void BnetThreadServerTcp(dlist *addr_list,

char allbuf[256 * 10];

BNetThreadServerCleanupObject cleanup_object(sockfds, client_wq);

if (server_state) { server_state->store(BnetServerState::kStarting); }

/*
* Remove any duplicate addresses.
*/
Expand Down Expand Up @@ -212,28 +228,34 @@ void BnetThreadServerTcp(dlist *addr_list,
Bmicrosleep(10, 0);
}

/*
* Reuse old sockets
*/
if (setsockopt(fd_ptr->fd, SOL_SOCKET, SO_REUSEADDR, (sockopt_val_t)&value, sizeof(value)) < 0) {
BErrNo be;
Emsg1(M_WARNING, 0, _("Cannot set SO_REUSEADDR on socket: %s\n"),
be.bstrerror());
}

tmax = 30 * (60 / 5); /* wait 30 minutes max */
for (tlog = 0; bind(fd_ptr->fd, ipaddr->get_sockaddr(), ipaddr->GetSockaddrLen()) < 0; tlog -= 5) {
BErrNo be;
if (tlog <= 0) {
tlog = 2 * 60; /* Complain every 2 minutes */
Emsg2(M_WARNING, 0, _("Cannot bind port %d: ERR=%s: Retrying ...\n"),
ntohs(fd_ptr->port), be.bstrerror());
}
Bmicrosleep(5, 0);
if (--tmax <= 0) {
Emsg2(M_ABORT, 0, _("Cannot bind port %d: ERR=%s.\n"), ntohs(fd_ptr->port),
be.bstrerror());
int tries = 3;
int wait_seconds = 5;
bool ok = false;

do {
int ret = bind(fd_ptr->fd, ipaddr->get_sockaddr(), ipaddr->GetSockaddrLen());
if (ret < 0) {
BErrNo be;
Emsg2(M_WARNING, 0, _("Cannot bind port %d: ERR=%s: Retrying ...\n"),
ntohs(fd_ptr->port), be.bstrerror());
Bmicrosleep(wait_seconds, 0);
} else {
ok = true;
}
} while (!ok && --tries);

if (!ok) {
BErrNo be;
Emsg2(M_ERROR, 0, _("Cannot bind port %d: ERR=%s.\n"), ntohs(fd_ptr->port),
be.bstrerror());
if (server_state) { server_state->store(BnetServerState::kError); }
return;
}

listen(fd_ptr->fd, 50); /* tell system we are ready */
Expand Down Expand Up @@ -279,13 +301,8 @@ void BnetThreadServerTcp(dlist *addr_list,
}
#endif

if (tcp_server_ready) {
*tcp_server_ready = true;
}
if (server_state) { server_state->store(BnetServerState::kStarted); }

/*
* Wait for a connection from the client process.
*/
while (!quit) {
#ifndef HAVE_POLL
unsigned int maxfd = 0;
Expand All @@ -305,6 +322,7 @@ void BnetThreadServerTcp(dlist *addr_list,
if (errno == EINTR) {
continue;
}
if(server_state) { server_state->store(BnetServerState::kError); }
Emsg1(M_FATAL, 0, _("Error in select: %s\n"), be.bstrerror());
break;
}
Expand All @@ -321,6 +339,7 @@ void BnetThreadServerTcp(dlist *addr_list,
continue;
}
Emsg1(M_FATAL, 0, _("Error in poll: %s\n"), be.bstrerror());

break;
}

Expand Down Expand Up @@ -394,10 +413,5 @@ void BnetThreadServerTcp(dlist *addr_list,
}
}
}

CleanupBnetThreadServerTcp(sockfds, client_wq);

if (tcp_server_ready) {
*tcp_server_ready = false;
}
if(server_state) { server_state->store(BnetServerState::kEnded); }
}
12 changes: 11 additions & 1 deletion core/src/lib/bnet_server_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@
#ifndef BAREOS_LIB_BNET_SEVER_TCP_H_
#define BAREOS_LIB_BNET_SEVER_TCP_H_

#include <atomic>

class ConfigurationParser;

/**
 * State reported by BnetThreadServerTcp through its optional
 * server_state argument, so that the thread which started the
 * server can observe its progress.
 */
enum class BnetServerState {
/* Nothing reported yet; callers initialize their state variable to this. */
kUndefined = 0,
/* Stored when BnetThreadServerTcp begins its setup. */
kStarting,
/* Stored when binding a port finally fails or select() reports an error. */
kError,
/* Stored right before the connection loop is entered. */
kStarted,
/* Stored as the last step, after the connection loop has been left. */
kEnded
};

void BnetThreadServerTcp(dlist *addr_list,
int max_clients,
alist *sockfds,
Expand All @@ -31,7 +41,7 @@ void BnetThreadServerTcp(dlist *addr_list,
void *HandleConnectionRequest(ConfigurationParser *config,
void *bsock),
ConfigurationParser *config,
bool * const tcp_server_ready = nullptr);
std::atomic<BnetServerState> * const server_state = nullptr);
void BnetStopAndWaitForThreadServerTcp(pthread_t tid);

#endif // BAREOS_LIB_BNET_SEVER_TCP_H_
2 changes: 1 addition & 1 deletion core/src/lib/workq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ int WorkqDestroy(workq_t *wq)
int status, status1, status2;

if (wq->valid != WORKQ_VALID) {
return EINVAL;
return 0; /* nothing to do */
}
P(wq->mutex);
wq->valid = 0; /* prevent any more operations */
Expand Down
4 changes: 3 additions & 1 deletion core/src/tests/bsock_test_connection_setup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ TEST(bsock, console_director_connection_test)
Dmsg0(200, "Start UA server\n");
directordaemon::me = (directordaemon::DirectorResource *)
director_config_parser->GetNextRes(directordaemon::R_DIRECTOR, nullptr);
directordaemon::StartSocketServer(directordaemon::me->DIRaddrs);
ok = directordaemon::StartSocketServer(directordaemon::me->DIRaddrs);
EXPECT_TRUE(ok) << "Could not start StartSocketServer";
if (!ok) { return; }

JobControlRecord jcr;
memset(&jcr, 0, sizeof(jcr));
Expand Down

0 comments on commit ff319e0

Please sign in to comment.