Skip to content

Commit

Permalink
dird: statistics thread crash fixed
Browse files Browse the repository at this point in the history
 - stop the statistics thread before reloading the config and restart it afterwards
 - added debug message when old resources table is destroyed within callback
 - cleaned up variable names and removed obvious comments

 Fixes #695: director crashes some time after a reload if Collect Statistic is enabled
  • Loading branch information
franku committed May 7, 2018
1 parent a5d39b8 commit 8f42a3a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 130 deletions.
109 changes: 34 additions & 75 deletions core/src/dird/dird.cc
Expand Up @@ -138,12 +138,15 @@ static void ReloadJobEndCb(JobControlRecord *jcr, void *ctx)

foreach_alist_index(i, table, reload_table) {
if (table == (resource_table_reference *)ctx) {
if (--table->JobCount <= 0) {
Dmsg0(100, "Last reference to old configuration, removing saved configuration\n");
FreeSavedResources(table);
reload_table->remove(i);
free(table);
break;
if (table->JobCount) {
table->JobCount--;
if (table->JobCount == 0) {
Dmsg1(100, "Last reference to old configuration table: %#010x\n", table);
FreeSavedResources(table);
reload_table->remove(i);
free(table);
break;
}
}
}
}
Expand Down Expand Up @@ -565,91 +568,59 @@ static bool init_sighandler_sighup()
return retval;
}

/**
* The algorithm used is as follows: we count how many jobs are
* running and mark the running jobs to make a callback on
* exiting. The old config is saved with the reload table
* id in a reload table. The new configuration is read. Now, as
* each job exits, it calls back to the ReloadJobEndCb(), which
* decrements the count of open jobs for the given reload table.
* When the count goes to zero, we release those resources.
* This allows us to have pointers into the resource table (from
* jobs), and once they exit and all the pointers are released, we
* release the old table. Note, if no new jobs are running since the
* last reload, then the old resources will be released immediately.
* A console is considered a job because it may have pointers to
* resources, but a SYSTEM job is not since it *should* not have any
* permanent pointers to resources.
*/
bool do_reload_config()
{
static bool already_here = false;
bool ok = false;
static bool is_reloading = false;
bool reloaded = false;
JobControlRecord *jcr;
int njobs = 0; /* Number of running jobs */
resource_table_reference prev_config;

if (already_here) {
if (is_reloading) {
/*
* Note: don't use Jmsg here, as it could produce a race condition
* on multiple parallel reloads
*/
Qmsg(NULL, M_ERROR, 0, _("Already reloading. Request ignored.\n"));
return false;
}
already_here = true;
is_reloading = true;

StopStatisticsThread();

LockJobs();
LockRes();

/*
* Flush the sql connection pools.
*/
DbSqlPoolFlush();

/*
* Save the previous config so we can restore it.
*/
resource_table_reference prev_config;
prev_config.res_table = my_config->save_resources();
prev_config.JobCount = 0;

/*
* Start parsing the new config.
*/
Dmsg0(100, "Reloading config file\n");
ok = ParseDirConfig(my_config, configfile, M_ERROR);
bool ok = ParseDirConfig(my_config, configfile, M_ERROR);

if (!ok || !check_resources() || !CheckCatalog(UPDATE_CATALOG) || !InitializeSqlPooling()) {
int num;
resource_table_reference failed_config;

Jmsg(NULL, M_ERROR, 0, _("Please correct the configuration in %s\n"), my_config->get_base_config_path());
Jmsg(NULL, M_ERROR, 0, _("Resetting to previous configuration.\n"));

/*
* Save the config we were not able to load.
*/
failed_config.res_table = my_config->save_resources();
resource_table_reference temp_config;
temp_config.res_table = my_config->save_resources();

/*
* Now restore old resource values,
*/
num = my_config->r_last_ - my_config->r_first_ + 1;
for (int i = 0; i < num; i++) {
int num_rcodes = my_config->r_last_ - my_config->r_first_ + 1;
for (int i = 0; i < num_rcodes; i++) {
// restore original config
my_config->res_head_[i] = prev_config.res_table[i];
}

/*
* Reset director resource to old config as check_resources() changed it
*/
// me is changed above by check_resources()
me = (DirectorResource *)GetNextRes(R_DIRECTOR, NULL);

/*
* Destroy the content of the failed config load.
*/
FreeSavedResources(&failed_config);
FreeSavedResources(&temp_config);
goto bail_out;
} else {

} else { // parse config ok

JobControlRecord *jcr;
int num_running_jobs = 0;
resource_table_reference *new_table = NULL;

InvalidateSchedules();
Expand All @@ -661,42 +632,30 @@ bool do_reload_config()
}
new_table->JobCount++;
RegisterJobEndCallback(jcr, ReloadJobEndCb, (void *)new_table);
njobs++;
num_running_jobs++;
}
}
endeach_jcr(jcr);
reloaded = true;

/*
* Reset globals
*/
SetWorkingDirectory(me->working_directory);
Dmsg0(10, "Director's configuration file reread.\n");

if (njobs > 0) {
/*
* See if we already initialized the alist.
*/
if (num_running_jobs > 0) {
if (!reload_table) {
reload_table = New(alist(10, not_owned_by_alist));
}

/*
* Push the saved resource info onto the alist.
*/
reload_table->push(new_table);
} else {
/*
* There are no running Jobs so we don't need to keep the old config around.
*/
} else { // no jobs running
FreeSavedResources(&prev_config);
}
StartStatisticsThread();
}

bail_out:
UnlockRes();
UnlockJobs();
already_here = false;
is_reloading = false;
return reloaded;
}

Expand Down
69 changes: 14 additions & 55 deletions core/src/dird/stats.cc
Expand Up @@ -45,7 +45,6 @@ static char TapeAlerts[] =
static char JobStats[] =
"Jobstats [%lld]: JobId=%ld, JobFiles=%lu, JobBytes=%llu, DevName=%s";

/* Static globals */
static bool quit = false;
static bool statistics_initialized = false;
static bool need_flush = true;
Expand All @@ -68,18 +67,12 @@ static inline bool LookupDevice(JobControlRecord *jcr, const char *device_name,

memset(&dr, 0, sizeof(dr));

/*
* See if we can use the cached DeviceId.
*/
if (cached_device.StorageId == StorageId &&
bstrcmp(cached_device.device_name, device_name)) {
*DeviceId = cached_device.DeviceId;
return true;
}

/*
* Find or create device record
*/
dr.StorageId = StorageId;
bstrncpy(dr.Name, device_name, sizeof(dr.Name));
if (!jcr->db->CreateDeviceRecord(jcr, &dr)){
Expand All @@ -89,9 +82,6 @@ static inline bool LookupDevice(JobControlRecord *jcr, const char *device_name,

Dmsg3(200, "Deviceid of \"%s\" on StorageId %d is %d\n", dr.Name, dr.StorageId, dr.DeviceId);

/*
* Cache the result.
*/
bstrncpy(cached_device.device_name, device_name, sizeof(cached_device.device_name));
cached_device.StorageId = StorageId;
cached_device.DeviceId = dr.DeviceId;
Expand All @@ -103,19 +93,12 @@ static inline bool LookupDevice(JobControlRecord *jcr, const char *device_name,
return false;
}

/**
* Wait for the next run.
*/
static inline void wait_for_next_run()
{
struct timeval tv;
struct timezone tz;
struct timespec timeout;

/*
* Wait for a next run. Normally this waits exactly me->stats_collect_interval seconds.
* It can be interrupted when signaled by the StopStatisticsThread() function.
*/
gettimeofday(&tv, &tz);
timeout.tv_nsec = tv.tv_usec * 1000;
timeout.tv_sec = tv.tv_sec + me->stats_collect_interval;
Expand All @@ -125,27 +108,20 @@ static inline void wait_for_next_run()
V(mutex);
}

/**
* Entry point for a separate statistics thread.
*/
extern "C"
void *statistics_thread_runner(void *arg)
void *statistics_thread(void *arg)
{
JobControlRecord *jcr;
utime_t now;
PoolMem current_store(PM_NAME);

Dmsg0(200, "Starting statistics thread\n");

memset(&cached_device, 0, sizeof(struct cached_device));
PmStrcpy(current_store, "");

/*
* Create a dummy JobControlRecord for the statistics thread.
*/
jcr = new_control_jcr("*StatisticsCollector*", JT_SYSTEM);

/*
* Open a connection to the database for storing long term statistics.
*/
jcr->res.catalog = (CatalogResource *)GetNextRes(R_CATALOG, NULL);
jcr->db = db_sql_get_pooled_connection(jcr,
jcr->res.catalog->db_driver,
Expand All @@ -164,33 +140,21 @@ void *statistics_thread_runner(void *arg)
goto bail_out;
}

/*
* Do our work as long as we are not signaled to quit.
*/
while (!quit) {
now = (utime_t)time(NULL);

Dmsg1(200, "statistics_thread_runner: Doing work at %ld\n", now);

/*
* Do nothing if no job is running currently.
*/
if (JobCount() == 0) {
if (!need_flush) {
Dmsg0(200, "statistics_thread_runner: do nothing as no jobs are running\n");
wait_for_next_run();
continue;
} else {
/*
* Flush any pending statistics data one more time and then sleep until new jobs start running.
*/
Dmsg0(200, "statistics_thread_runner: flushing pending statistics\n");
need_flush = false;
}
} else {
/*
* We have running jobs so on a next run we still need to flush any collected data.
*/
need_flush = true;
}

Expand Down Expand Up @@ -227,12 +191,9 @@ void *statistics_thread_runner(void *arg)
continue;
}

/*
* Try connecting 2 times with a max time to wait of 1 second.
* We don't want to lock the resources too long. And as the stored
* will cache the stats anyway we can always try collecting things
* in the next run.
*/
constexpr int retries = 2;
constexpr int timeout_secs= 1;

jcr->res.rstore = store;
if (!ConnectToStorageDaemon(jcr, 2, 1, false)) {
UnlockRes();
Expand Down Expand Up @@ -323,22 +284,21 @@ void *statistics_thread_runner(void *arg)
}
}

/*
* Disconnect.
*/
jcr->store_bsock->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;
}

} // while (1)

wait_for_next_run();
}

DbSqlClosePooledConnection(jcr, jcr->db);
} // while(!quit)

bail_out:
FreeJcr(jcr);

Dmsg0(200, "Finished statistics thread\n");

return NULL;
}

Expand All @@ -350,7 +310,9 @@ int StartStatisticsThread(void)
return 0;
}

if ((status = pthread_create(&statistics_tid, NULL, statistics_thread_runner, NULL)) != 0) {
quit = false;

if ((status = pthread_create(&statistics_tid, NULL, statistics_thread, NULL)) != 0) {
return status;
}

Expand All @@ -374,9 +336,6 @@ void StopStatisticsThread()

void stats_job_started()
{
/*
* A new Job was started so we need to flush any pending statistics the next run.
*/
if (statistics_initialized) {
need_flush = true;
}
Expand Down

0 comments on commit 8f42a3a

Please sign in to comment.