Skip to content

Commit

Permalink
Tear down of daemons is flawed.
Browse files Browse the repository at this point in the history
When one of the servers is torn down, in most daemons the thread
spawned for handling the bnet_thread_server (or the main thread)
never closes all sockets or releases the workq, because most of the
time the thread is just shot in flight and things only get cleaned up
as a side effect. When the process exits the resources are released,
but it's not very clean.

Fixes #53: Tear down of daemons is flawed.
  • Loading branch information
Marco van Wieringen committed Feb 17, 2015
1 parent 2825a9e commit fc9ba4c
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/dird/dird.c
Expand Up @@ -53,8 +53,8 @@ extern void invalidate_schedules();
extern bool parse_dir_config(CONFIG *config, const char *configfile, int exit_code);

/* Imported subroutines */
void term_ua_server();
void start_UA_server(dlist *addrs);
void stop_UA_server(void);
void init_job_server(int max_workers);
void term_job_server();
void store_jobtype(LEX *lc, RES_ITEM *item, int index, int pass);
Expand Down Expand Up @@ -406,7 +406,7 @@ void terminate_dird(int sig)
free(config);
config = NULL;
}
term_ua_server();
stop_UA_server();
term_msg(); /* terminate message handler */
cleanup_crypto();
close_memory_pool(); /* release free memory in pool */
Expand Down
1 change: 0 additions & 1 deletion src/dird/job.c
Expand Up @@ -38,7 +38,6 @@ static bool job_check_maxruntime(JCR *jcr);
static bool job_check_maxrunschedtime(JCR *jcr);

/* Imported subroutines */
extern void term_ua_server();

/* Imported variables */

Expand Down
36 changes: 20 additions & 16 deletions src/dird/ua_server.c
Expand Up @@ -31,15 +31,15 @@

/* Imported variables */


/* Forward referenced functions */
extern "C" void *connect_thread(void *arg);
static void *handle_UA_client_request(void *arg);


/* Global variables */
static int started = FALSE;
static workq_t ua_workq;
static alist *sock_fds;
static pthread_t server_tid;

struct s_addr_port {
char *addr;
Expand All @@ -52,26 +52,41 @@ struct s_addr_port {
*/
void start_UA_server(dlist *addrs)
{
pthread_t thid;
int status;
static dlist *myaddrs = addrs;

if ((status=pthread_create(&thid, NULL, connect_thread, (void *)myaddrs)) != 0) {
if ((status = pthread_create(&server_tid, NULL, connect_thread, (void *)myaddrs)) != 0) {
berrno be;
Emsg1(M_ABORT, 0, _("Cannot create UA thread: %s\n"), be.bstrerror(status));
}
started = TRUE;

return;
}

void stop_UA_server()
{
if (!started) {
return;
}

bnet_stop_thread_server(server_tid);

cleanup_bnet_thread_server(sock_fds, &ua_workq);
delete sock_fds;
sock_fds = NULL;
}

extern "C"
void *connect_thread(void *arg)
{
pthread_detach(pthread_self());
set_jcr_in_tsd(INVALID_JCR);

/* Permit MaxConsoleConnect console connections */
bnet_thread_server((dlist*)arg, director->MaxConsoleConnect, &ua_workq, handle_UA_client_request);
sock_fds = New(alist(10, not_owned_by_alist));
bnet_thread_server((dlist*)arg, director->MaxConsoleConnect, sock_fds, &ua_workq, handle_UA_client_request);

return NULL;
}

Expand Down Expand Up @@ -213,14 +228,3 @@ void free_ua_context(UAContext *ua)
}
free(ua);
}


/*
* Called from main BAREOS thread
*/
void term_ua_server()
{
if (!started) {
return;
}
}
11 changes: 10 additions & 1 deletion src/filed/filed.c
Expand Up @@ -49,6 +49,7 @@ void *start_heap;
char *configfile = NULL;
static bool foreground = false;
static workq_t dir_workq; /* queue of work from Director */
static alist *sock_fds;
static pthread_t server_tid;
static CONFIG *config;

Expand Down Expand Up @@ -258,9 +259,12 @@ int main (int argc, char *argv[])
foreach_dlist(p, me->FDaddrs) {
Dmsg1(10, "filed: listening on port %d\n", p->get_port_host_order());
}
bnet_thread_server(me->FDaddrs, me->MaxConcurrentJobs, &dir_workq, handle_client_request);

sock_fds = New(alist(10, not_owned_by_alist));
bnet_thread_server(me->FDaddrs, me->MaxConcurrentJobs, sock_fds, &dir_workq, handle_client_request);

terminate_filed(0);

exit(0); /* should never get here */
}

Expand All @@ -277,6 +281,11 @@ void terminate_filed(int sig)
stop_watchdog();

bnet_stop_thread_server(server_tid);

cleanup_bnet_thread_server(sock_fds, &dir_workq);
delete sock_fds;
sock_fds = NULL;

unload_fd_plugins();
flush_mntent_cache();
write_state_file(me->working_directory, "bareos-fd", get_first_port_host_order(me->FDaddrs));
Expand Down
79 changes: 51 additions & 28 deletions src/lib/bnet_server.c
Expand Up @@ -54,6 +54,15 @@ int deny_severity = LOG_WARNING;

static bool quit = false;

/*
 * One entry in the list of sockets the threaded network server listens on.
 * Entries are appended to the sockfds alist by bnet_thread_server() and
 * their descriptors are closed by cleanup_bnet_thread_server().
 */
struct s_sockfd {
int fd; /* socket descriptor that is listen()ed on and later close()d */
int port; /* NOTE(review): presumably the port this socket is bound to - confirm against bnet_thread_server() */
};

/*
* Stop the Threaded Network Server if it's really running in a separate thread.
* I.e. set the quit flag and wait for the other thread to exit cleanly.
*/
void bnet_stop_thread_server(pthread_t tid)
{
quit = true;
Expand All @@ -62,6 +71,39 @@ void bnet_stop_thread_server(pthread_t tid)
}
}

/*
 * Perform a cleanup for the Threaded Network Server.
 *
 * sockfds   - list of s_sockfd entries for the listening sockets; may be
 *             NULL when the server never got as far as allocating it.
 * client_wq - work queue that serves the client connections.
 *
 * Closes every socket in the list, calls destroy() on the list and then
 * destroys the work queue. When the list is NULL or empty there is either
 * nothing to clean up or the cleanup already took place, so the function
 * returns without touching the work queue. This makes repeated calls
 * harmless.
 */
void cleanup_bnet_thread_server(alist *sockfds, workq_t *client_wq)
{
   int status;
   s_sockfd *fd_ptr = NULL;

   /*
    * Callers pass in a file-scope pointer that may not have been
    * allocated yet, so check for NULL before dereferencing it.
    */
   if (!sockfds || sockfds->empty()) {
      return;
   }

   /*
    * Cleanup open files and pointers to them
    */
   fd_ptr = (s_sockfd *)sockfds->first();
   while (fd_ptr) {
      close(fd_ptr->fd);
      fd_ptr = (s_sockfd *)sockfds->next();
   }

   sockfds->destroy();

   /*
    * Stop work queue thread
    */
   if ((status = workq_destroy(client_wq)) != 0) {
      berrno be;
      be.set_errno(status);
      Emsg1(M_FATAL, 0, _("Could not destroy client queue: ERR=%s\n"),
            be.bstrerror());
   }
}

/*
* Become Threaded Network Server
*
Expand All @@ -71,8 +113,8 @@ void bnet_stop_thread_server(pthread_t tid)
*
* At the moment it is impossible to bind to different ports.
*/
void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
void *handle_client_request(void *bsock))
void bnet_thread_server(dlist *addr_list, int max_clients, alist *sockfds,
workq_t *client_wq, void *handle_client_request(void *bsock))
{
int newsockfd, status;
socklen_t clilen;
Expand All @@ -83,12 +125,8 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
struct request_info request;
#endif
IPADDR *ipaddr, *next;
struct s_sockfd {
int fd;
int port;
} *fd_ptr = NULL;
s_sockfd *fd_ptr = NULL;
char buf[128];
alist sockfds(10, not_owned_by_alist);
#ifdef HAVE_POLL
nfds_t nfds;
struct pollfd *pfds;
Expand Down Expand Up @@ -164,7 +202,7 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
}

listen(fd_ptr->fd, 50); /* tell system we are ready */
sockfds.append(fd_ptr);
sockfds->append(fd_ptr);

#ifdef HAVE_POLL
nfds++;
Expand All @@ -188,7 +226,7 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
memset(pfds, 0, sizeof(struct pollfd) * nfds);

nfds = 0;
foreach_alist(fd_ptr, &sockfds) {
foreach_alist(fd_ptr, sockfds) {
pfds[nfds].fd = fd_ptr->fd;
pfds[nfds].events |= POLL_IN;
nfds++;
Expand All @@ -204,7 +242,7 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
fd_set sockset;
FD_ZERO(&sockset);

foreach_alist(fd_ptr, &sockfds) {
foreach_alist(fd_ptr, sockfds) {
FD_SET((unsigned)fd_ptr->fd, &sockset);
if ((unsigned)fd_ptr->fd > maxfd) {
maxfd = fd_ptr->fd;
Expand All @@ -221,7 +259,7 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
break;
}

foreach_alist(fd_ptr, &sockfds) {
foreach_alist(fd_ptr, sockfds) {
if (FD_ISSET(fd_ptr->fd, &sockset)) {
#else
int cnt;
Expand All @@ -237,7 +275,7 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
}

cnt = 0;
foreach_alist(fd_ptr, &sockfds) {
foreach_alist(fd_ptr, sockfds) {
if (pfds[cnt++].revents & POLLIN) {
#endif
/*
Expand Down Expand Up @@ -301,20 +339,5 @@ void bnet_thread_server(dlist *addr_list, int max_clients, workq_t *client_wq,
}
}

/*
* Cleanup open files and pointers to them
*/
while ((fd_ptr = (s_sockfd *)sockfds.first())) {
close(fd_ptr->fd);
}

/*
* Stop work queue thread
*/
if ((status = workq_destroy(client_wq)) != 0) {
berrno be;
be.set_errno(status);
Emsg1(M_FATAL, 0, _("Could not destroy client queue: ERR=%s\n"),
be.bstrerror());
}
cleanup_bnet_thread_server(sockfds, client_wq);
}
12 changes: 6 additions & 6 deletions src/lib/protos.h
Expand Up @@ -83,16 +83,16 @@ dlist *bnet_host2ipaddrs(const char *host, int family, const char **errstr);
int bnet_set_blocking(BSOCK *sock);
int bnet_set_nonblocking(BSOCK *sock);
void bnet_restore_blocking(BSOCK *sock, int flags);

/* bnet_server.c */
void bnet_thread_server(dlist *addr_lis, int max_clients, workq_t *client_wq,
void *handle_client_request(void *bsock));
void bnet_stop_thread_server(pthread_t tid);
void bnet_server(int port, void handle_client_request(BSOCK *bsock));
int net_connect(int port);
BSOCK *bnet_bind(int port);
BSOCK *bnet_accept(BSOCK *bsock, char *who);

/* bnet_server.c */
void cleanup_bnet_thread_server(alist *sockfds, workq_t *client_wq);
void bnet_thread_server(dlist *addr_list, int max_clients, alist *sockfds,
workq_t *client_wq, void *handle_client_request(void *bsock));
void bnet_stop_thread_server(pthread_t tid);

/* bpipe.c */
BPIPE *open_bpipe(char *prog, int wait, const char *mode);
int close_wpipe(BPIPE *bpipe);
Expand Down
4 changes: 3 additions & 1 deletion src/stored/ndmp_tape.c
Expand Up @@ -1452,8 +1452,10 @@ extern "C" void *ndmp_thread_server(void *arg)
/*
* Cleanup open files.
*/
while ((fd_ptr = (s_sockfd *)sockfds.first())) {
fd_ptr = (s_sockfd *)sockfds->first();
while (fd_ptr) {
close(fd_ptr->fd);
fd_ptr = (s_sockfd *)sockfds->next();
}

/*
Expand Down
9 changes: 7 additions & 2 deletions src/stored/stored.c
Expand Up @@ -62,7 +62,6 @@ pthread_mutex_t device_release_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait_device_release = PTHREAD_COND_INITIALIZER;
void *start_heap;


static uint32_t VolSessionId = 0;
uint32_t VolSessionTime;
char *configfile = NULL;
Expand All @@ -75,9 +74,9 @@ static workq_t dird_workq; /* queue for processing connections */
#if HAVE_NDMP
static workq_t ndmp_workq; /* queue for processing NDMP connections */
#endif
static alist *sock_fds;
static CONFIG *config;


static void usage()
{
fprintf(stderr, _(
Expand Down Expand Up @@ -298,8 +297,10 @@ int main (int argc, char *argv[])
#endif

/* Single server used for Director and File daemon */
sock_fds = New(alist(10, not_owned_by_alist));
bnet_thread_server(me->sdaddrs,
me->max_concurrent_jobs * 2 + 1,
sock_fds,
&dird_workq,
handle_connection_request);
exit(1); /* to keep compiler quiet */
Expand Down Expand Up @@ -677,6 +678,10 @@ void terminate_stored(int sig)
#endif
stop_watchdog();

cleanup_bnet_thread_server(sock_fds, &dird_workq);
delete sock_fds;
sock_fds = NULL;

if (sig == SIGTERM) { /* normal shutdown request? */
/*
* This is a normal shutdown request. We wiffle through
Expand Down

0 comments on commit fc9ba4c

Please sign in to comment.