Skip to content
Permalink
Browse files

workq: refactor code or remove unneeded code for worker threads

  fixes #385: bareos daemon stop restart hang if bareos-tray-monitor is connected

  The functionality was overengineered and did not do what it
  probably should do.

  The names workq or workqueue did not explain what the code
  actually did and it was not reliable.

  Therefore refactored the complete code:

- ThreadList is now a class that contains the calls to create
  worker threads that call a predefined handler routine
- ThreadListPrivate contains the implementation and is hidden from
  the public interface
- using ThreadList with RAII in the daemon code, accordingly
- the new functionality consists mainly of a list where new threads
  are added to, therefore the name of all files and variables
  are changed into names containing "thread list" or similar
- used std::thread instead of pthread
- used std::set instead of a linked list
- worker threads start detached and clean up their resources themselves

- removed or renamed variables
- removed unused code in the thread start and at the end
- previous return type of int was not used as intended,
  therefore a simple bool is enough
- upon shutdown threads are not joined but the parent thread will
  wait until the thread list is empty or a timeout occurs
- added a user callback for each worker-thread that is called from
  the parent thread before it enters the waiting state

- ua_server: close user_agent_socket only in FreeUaContext
- synchronize shutdown of worker threads on daemon shutdown
- updated code that calls the new functions accordingly

- fd and dir: added a worker-thread shutdown handler to each
  socket server
- sd: do not clean up jcr memory of a console connection in the
  terminate handler
- dird: StopSocketServer is now called before StopWatchdog so
  socket operations still can time out
- set the timeout for the second call of SSL_shutdown from
  120 seconds to 2 seconds as this makes director shutdown
  faster

- added a unit test
  • Loading branch information...
franku committed Aug 27, 2019
1 parent 7cd5065 commit d594cd8ac0699ba61172fcdc1609e4eda4c93763
@@ -304,9 +304,7 @@ int main(int argc, char* argv[])
break;

case 'z': /* switch network debugging on */
if (!BnetDump::EvaluateCommandLineArgs(optarg)) {
exit(1);
}
if (!BnetDump::EvaluateCommandLineArgs(optarg)) { exit(1); }
break;

case '?':
@@ -484,6 +482,7 @@ static
debug_level = 0; /* turn off debug */

DestroyConfigureUsageString();
StopSocketServer();
StopStatisticsThread();
StopWatchdog();
DbSqlPoolDestroy();
@@ -506,7 +505,6 @@ static
my_config = NULL;
}

StopSocketServer();
TermMsg(); /* Terminate message handler */
CleanupCrypto();
CloseMemoryPool(); /* release free memory in pool */
@@ -20,14 +20,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
/*
* Kern Sibbald, July MMIII
*
*
* This code was adapted from the Bareos workq, which was
* adapted from "Programming with POSIX Threads", by
* David R. Butenhof
*/
/**
* BAREOS job queue routines.
*
@@ -312,8 +304,6 @@ int JobqAdd(jobq_t* jq, JobControlRecord* jcr)

/**
* Remove a job from the job queue. Used only by CancelJob().
* jq is a queue that was created with jobq_init
* work_item is an element of work
*
* Note, it is "removed" from the job queue.
* If you want to cancel it, you need to provide some external means
@@ -958,10 +948,12 @@ void DecReadStore(JobControlRecord* jcr)
Dmsg2(50, "Dec Rstore=%s rncj=%d\n", jcr->res.read_storage->resource_name_,
jcr->res.read_storage->runtime_storage_status->NumConcurrentJobs);

if (jcr->res.read_storage->runtime_storage_status->NumConcurrentReadJobs < 0) {
Jmsg(jcr, M_FATAL, 0, _("NumConcurrentReadJobs Dec Rstore=%s rncj=%d\n"),
jcr->res.read_storage->resource_name_,
jcr->res.read_storage->runtime_storage_status->NumConcurrentReadJobs);
if (jcr->res.read_storage->runtime_storage_status->NumConcurrentReadJobs <
0) {
Jmsg(
jcr, M_FATAL, 0, _("NumConcurrentReadJobs Dec Rstore=%s rncj=%d\n"),
jcr->res.read_storage->resource_name_,
jcr->res.read_storage->runtime_storage_status->NumConcurrentReadJobs);
}

if (jcr->res.read_storage->runtime_storage_status->NumConcurrentJobs < 0) {
@@ -35,6 +35,7 @@
#include "dird/ua_server.h"
#include "lib/berrno.h"
#include "lib/bnet_server_tcp.h"
#include "lib/thread_list.h"
#include "lib/try_tls_handshake_as_a_server.h"

#include <atomic>
@@ -47,7 +48,7 @@ static char hello_client_with_version[] =
static char hello_client[] = "Hello Client %127s calling";

/* Global variables */
static workq_t socket_workq;
static ThreadList thread_list;
static alist* sock_fds = NULL;
static pthread_t tcp_server_tid;
static ConnectionPool* client_connections = NULL;
@@ -125,6 +126,15 @@ static void* HandleConnectionRequest(ConfigurationParser* config, void* arg)
return HandleUserAgentClientRequest(bs);
}

/**
 * Shutdown callback that is handed to BnetThreadServerTcp() together with
 * the connection handler.
 *
 * Treats the argument as a BareosSocket and marks it as terminated.
 * A null pointer is ignored.
 *
 * @param bsock opaque pointer that is interpreted as a BareosSocket*
 * @return always nullptr
 */
static void* UserAgentShutdownCallback(void* bsock)
{
  if (bsock) {
    /* void* -> object pointer is exactly what static_cast is meant for */
    BareosSocket* b = static_cast<BareosSocket*>(bsock);
    b->SetTerminated();
  }
  return nullptr;
}

extern "C" void* connect_thread(void* arg)
{
SetJcrInTsd(INVALID_JCR);
@@ -133,9 +143,9 @@ extern "C" void* connect_thread(void* arg)
* Permit MaxConnections connections.
*/
sock_fds = new alist(10, not_owned_by_alist);
BnetThreadServerTcp((dlist*)arg, me->MaxConnections, sock_fds, &socket_workq,
BnetThreadServerTcp((dlist*)arg, me->MaxConnections, sock_fds, &thread_list,
me->nokeepalive, HandleConnectionRequest, my_config,
&server_state);
&server_state, UserAgentShutdownCallback);

return NULL;
}
@@ -134,7 +134,6 @@ void* HandleUserAgentClientRequest(BareosSocket* user_agent_socket)
CloseDb(ua);
FreeUaContext(ua);
FreeJcr(jcr);
user_agent_socket->close();
delete user_agent_socket;

return NULL;
@@ -35,12 +35,13 @@
#include "filed/sd_cmds.h"
#include "lib/bsock.h"
#include "lib/bnet_server_tcp.h"
#include "lib/thread_list.h"
#include "lib/try_tls_handshake_as_a_server.h"

namespace filedaemon {

/* Global variables */
static workq_t socket_workq;
static ThreadList thread_list;
static pthread_t tcp_server_tid;
static alist* sock_fds = NULL;

@@ -105,6 +106,15 @@ static void* HandleConnectionRequest(ConfigurationParser* config, void* arg)
return nullptr;
}

/**
 * Shutdown callback that is handed to BnetThreadServerTcp() together with
 * the connection handler.
 *
 * Treats the argument as a BareosSocket and marks it as terminated.
 * A null pointer is ignored.
 *
 * @param bsock opaque pointer that is interpreted as a BareosSocket*
 * @return always nullptr
 */
static void* UserAgentShutdownCallback(void* bsock)
{
  if (bsock) {
    /* void* -> object pointer is exactly what static_cast is meant for */
    BareosSocket* b = static_cast<BareosSocket*>(bsock);
    b->SetTerminated();
  }
  return nullptr;
}

void StartSocketServer(dlist* addrs)
{
IPADDR* p;
@@ -122,8 +132,9 @@ void StartSocketServer(dlist* addrs)
* Permit MaxConnections connections.
*/
sock_fds = new alist(10, not_owned_by_alist);
BnetThreadServerTcp(addrs, me->MaxConnections, sock_fds, &socket_workq,
me->nokeepalive, HandleConnectionRequest, my_config);
BnetThreadServerTcp(addrs, me->MaxConnections, sock_fds, &thread_list,
me->nokeepalive, HandleConnectionRequest, my_config,
nullptr, UserAgentShutdownCallback);
}

void StopSocketServer(bool wait)
@@ -497,7 +497,6 @@ class JobControlRecord {
pthread_cond_t start_wait; /**< Wait for FD to start Job */
pthread_cond_t term_wait; /**< Wait for job termination */
pthread_cond_t nextrun_ready; /**< Wait for job next run to become ready */
workq_ele_t* work_item; /**< Work queue item if scheduled */
BareosSocket* ua; /**< User agent */
Resources res; /**< Resources assigned */
TREE_ROOT* restore_tree_root; /**< Selected files to restore (some protocols
@@ -30,8 +30,8 @@ set(INCLUDE_FILES ../include/baconfig.h ../include/bareos.h
mntent_cache.h parse_conf.h
plugins.h qualified_resource_name_type_converter.h queue.h rblist.h
runscript.h rwlock.h scsi_crypto.h scsi_lli.h scsi_tapealert.h
serial.h sha1.h status.h tls.h tls_conf.h tree.h try_tls_handshake_as_a_server.h
var.h watchdog.h workq.h)
serial.h sha1.h status.h thread_list.h tls.h tls_conf.h tree.h try_tls_handshake_as_a_server.h
var.h watchdog.h)

INSTALL(FILES ${INCLUDE_FILES} DESTINATION ${includedir})
ENDIF()
@@ -58,10 +58,10 @@ set (BAREOS_SRCS address_conf.cc alist.cc attr.cc attribs.cc backtrace.cc base6
mntent_cache.cc output_formatter.cc passphrase.cc path_list.cc plugins.cc
bpoll.cc priv.cc
queue.cc rblist.cc runscript.cc rwlock.cc scan.cc scsi_crypto.cc scsi_lli.cc
serial.cc sha1.cc signal.cc tls.cc
serial.cc sha1.cc signal.cc thread_list.cc tls.cc
tls_conf.cc tls_openssl.cc
tls_openssl_crl.cc tls_openssl_private.cc tree.cc try_tls_handshake_as_a_server.cc
compression.cc util.cc var.cc watchdog.cc workq.cc)
compression.cc util.cc var.cc watchdog.cc)

IF(HAVE_WIN32)
LIST(APPEND BAREOS_SRCS
@@ -35,6 +35,7 @@
#include "lib/bnet_server_tcp.h"
#include "lib/bsock_tcp.h"
#include "lib/bsys.h"
#include "lib/thread_list.h"
#include "lib/watchdog.h"

#include <netinet/in.h>
@@ -91,7 +92,7 @@ void BnetStopAndWaitForThreadServerTcp(pthread_t tid)
* Perform a cleanup for the Threaded Network Server: check whether there is
* still something to do or whether the cleanup already took place.
*/
static void CleanupBnetThreadServerTcp(alist* sockfds, workq_t* client_wq)
static void CleanupBnetThreadServerTcp(alist* sockfds, ThreadList* thread_list)
{
Dmsg0(100, "CleanupBnetThreadServerTcp: start\n");

@@ -104,33 +105,29 @@ static void CleanupBnetThreadServerTcp(alist* sockfds, workq_t* client_wq)
sockfds->destroy();
}

if (client_wq) {
int status = WorkqDestroy(client_wq);
if (status != 0) {
BErrNo be;
be.SetErrno(status);
Emsg1(M_ERROR, 0, _("Could not destroy client queue: ERR=%s\n"),
be.bstrerror());
if (thread_list) {
if (!thread_list->WaitUntilThreadListIsEmpty()) {
Emsg1(M_ERROR, 0, _("Could not destroy client queue.\n"));
}
}
Dmsg0(100, "CleanupBnetThreadServerTcp: finish\n");
}

class BNetThreadServerCleanupObject {
public:
BNetThreadServerCleanupObject(alist* sockfds, workq_t* client_wq)
: sockfds_(sockfds), client_wq_(client_wq)
BNetThreadServerCleanupObject(alist* sockfds, ThreadList* thread_list)
: sockfds_(sockfds), thread_list_(thread_list)
{
}

~BNetThreadServerCleanupObject()
{
CleanupBnetThreadServerTcp(sockfds_, client_wq_);
CleanupBnetThreadServerTcp(sockfds_, thread_list_);
}

private:
alist* sockfds_;
workq_t* client_wq_;
ThreadList* thread_list_;
};

/**
@@ -146,11 +143,12 @@ void BnetThreadServerTcp(
dlist* addr_list,
int max_clients,
alist* sockfds,
workq_t* client_wq,
ThreadList* thread_list,
bool nokeepalive,
void* HandleConnectionRequest(ConfigurationParser* config, void* bsock),
ConfigurationParser* config,
std::atomic<BnetServerState>* const server_state)
std::atomic<BnetServerState>* const server_state,
void* UserAgentShutdownCallback(void* bsock))
{
int newsockfd, status;
socklen_t clilen;
@@ -171,7 +169,7 @@ void BnetThreadServerTcp(

char allbuf[256 * 10];

BNetThreadServerCleanupObject cleanup_object(sockfds, client_wq);
BNetThreadServerCleanupObject cleanup_object(sockfds, thread_list);

quit = false;
if (server_state) { server_state->store(BnetServerState::kStarting); }
@@ -275,16 +273,8 @@ void BnetThreadServerTcp(
#endif
}

/*
* Start work queue thread
*/
if ((status = WorkqInit(client_wq, max_clients, HandleConnectionRequest)) !=
0) {
BErrNo be;
be.SetErrno(status);
Emsg1(M_ABORT, 0, _("Could not init client queue: ERR=%s\n"),
be.bstrerror());
}
thread_list->Init(max_clients, HandleConnectionRequest,
UserAgentShutdownCallback);

#ifdef HAVE_POLL
/*
@@ -357,7 +347,8 @@ void BnetThreadServerTcp(
*/
do {
clilen = sizeof(cli_addr);
newsockfd = accept(fd_ptr->fd, reinterpret_cast<sockaddr*>(&cli_addr), &clilen);
newsockfd = accept(fd_ptr->fd, reinterpret_cast<sockaddr*>(&cli_addr),
&clilen);
} while (newsockfd < 0 && errno == EINTR);
if (newsockfd < 0) { continue; }
#ifdef HAVE_LIBWRAP
@@ -368,7 +359,8 @@ void BnetThreadServerTcp(
V(mutex);
Jmsg2(NULL, M_SECURITY, 0,
_("Connection from %s:%d refused by hosts.access\n"),
SockaddrToAscii(reinterpret_cast<sockaddr*>(&cli_addr), buf, sizeof(buf)),
SockaddrToAscii(reinterpret_cast<sockaddr*>(&cli_addr), buf,
sizeof(buf)),
SockaddrGetPort(reinterpret_cast<sockaddr*>(&cli_addr)));
close(newsockfd);
continue;
@@ -390,7 +382,8 @@ void BnetThreadServerTcp(
* See who client is. i.e. who connected to us.
*/
P(mutex);
SockaddrToAscii(reinterpret_cast<sockaddr*>(&cli_addr), buf, sizeof(buf));
SockaddrToAscii(reinterpret_cast<sockaddr*>(&cli_addr), buf,
sizeof(buf));
V(mutex);

BareosSocket* bs;
@@ -404,15 +397,8 @@ void BnetThreadServerTcp(
memset(&bs->peer_addr, 0, sizeof(bs->peer_addr));
memcpy(&bs->client_addr, &cli_addr, sizeof(bs->client_addr));

/*
* Queue client to be served
*/
if ((status = WorkqAdd(client_wq, config, (void*)bs, NULL)) != 0) {
BErrNo be;
be.SetErrno(status);
Jmsg1(NULL, M_ABORT, 0,
_("Could not add job to client queue: ERR=%s\n"),
be.bstrerror());
if (!thread_list->CreateAndAddNewThread(config, (void*)bs)) {
Jmsg1(NULL, M_ABORT, 0, _("Could not add job to client queue.\n"));
}
}
}
@@ -24,6 +24,7 @@
#include <atomic>

class ConfigurationParser;
class ThreadList;

enum class BnetServerState
{
@@ -41,11 +42,12 @@ void BnetThreadServerTcp(
dlist* addr_list,
int max_clients,
alist* sockfds,
workq_t* client_wq,
ThreadList* thread_list,
bool nokeepalive,
void* HandleConnectionRequest(ConfigurationParser* config, void* bsock),
ConfigurationParser* config,
std::atomic<BnetServerState>* const server_state = nullptr);
std::atomic<BnetServerState>* const server_state = nullptr,
void* UserAgentShutdownCallback(void* bsock) = nullptr);
void BnetStopAndWaitForThreadServerTcp(pthread_t tid);

#endif // BAREOS_LIB_BNET_SEVER_TCP_H_
@@ -36,7 +36,6 @@
#include "queue.h"
#include "serial.h"
#include "lex.h"
#include "workq.h"
#ifndef BAREOS_LIB_LIB_H_
#include "fnmatch.h"
#endif

0 comments on commit d594cd8

Please sign in to comment.
You can’t perform that action at this time.