Skip to content

Commit

Permalink
stored: fix jcr->authenticate data race
Browse files Browse the repository at this point in the history
The condition variable is not used correctly:

// reader
1|  while (!unprotected) {
2|        wait(cond_var)
    }
// writer

3|  unprotected = true;
4|  signal(cond_var)

The execution order 1->3->4->2 will cause a deadlock.  This is why the
wait command takes a mutex:  Everything that might change the
condition to be true needs to lock the mutex,  this way we can ensure
that we either see the updated value or the wait sees the signal.

Since jcr->authenticate is used all over the place in a lot of
different situations, this problem could not be easily fixed by just
protecting that variable (we do not want weird deadlocks to happen
after all).

We just do not rely on jcr->authenticate anymore when it comes to
waiting on job start.  Instead we have a single, properly protected
bool `client_available` that we can wait on.
This bool obviously needs to be set by whoever authenticates the FD/SD
connection, otherwise the job will deadlock.   But at least that is
easily fixable.
  • Loading branch information
sebsura authored and BareosBot committed Mar 18, 2024
1 parent 689cc22 commit 20682c9
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 93 deletions.
16 changes: 3 additions & 13 deletions core/src/stored/dir_cmd.cc
Expand Up @@ -254,16 +254,6 @@ void* HandleDirectorConnection(BareosSocket* dir)
jcr->dir_bsock = dir; /* save Director bsock */
jcr->dir_bsock->SetJcr(jcr);

// Initialize Start Job condition variable
errstat = pthread_cond_init(&jcr->sd_impl->job_start_wait, NULL);
if (errstat != 0) {
BErrNo be;
Jmsg1(jcr, M_FATAL, 0,
T_("Unable to init job start cond variable: ERR=%s\n"),
be.bstrerror(errstat));
goto bail_out;
}

// Initialize End Job condition variable
errstat = pthread_cond_init(&jcr->sd_impl->job_end_wait, NULL);
if (errstat != 0) {
Expand Down Expand Up @@ -495,8 +485,7 @@ static bool CancelCmd(JobControlRecord* cjcr)
Dmsg2(800, "Cancel JobId=%d %p\n", jcr->JobId, jcr);
if (!jcr->authenticated
&& (oldStatus == JS_WaitFD || oldStatus == JS_WaitSD)) {
pthread_cond_signal(
&jcr->sd_impl->job_start_wait); /* wake waiting thread */
jcr->sd_impl->job_start_wait.notify_one(); /* wake waiting thread */
}

if (jcr->file_bsock) {
Expand All @@ -506,7 +495,7 @@ static bool CancelCmd(JobControlRecord* cjcr)
} else {
if (oldStatus != JS_WaitSD) {
// Still waiting for FD to connect, release it
pthread_cond_signal(&jcr->sd_impl->job_start_wait); /* wake waiting job */
jcr->sd_impl->job_start_wait.notify_one(); /* wake waiting job */
Dmsg2(800, "Signal FD connect jid=%d %p\n", jcr->JobId, jcr);
}
}
Expand Down Expand Up @@ -1765,6 +1754,7 @@ static bool PassiveCmd(JobControlRecord* jcr)
utime_t now;

Dmsg0(110, "Authenticated with FD.\n");
*jcr->sd_impl->client_available.lock() = true;

// Update the initial Job Statistics.
now = (utime_t)time(NULL);
Expand Down
4 changes: 2 additions & 2 deletions core/src/stored/fd_cmds.cc
Expand Up @@ -137,7 +137,7 @@ void* HandleFiledConnection(BareosSocket* fd, char* job_name)
} else {
utime_t now;

jcr->authenticated = true;
*jcr->sd_impl->client_available.lock() = true;
Dmsg2(50, "OK Authentication jid=%u Job %s\n", (uint32_t)jcr->JobId,
jcr->Job);

Expand All @@ -146,7 +146,7 @@ void* HandleFiledConnection(BareosSocket* fd, char* job_name)
UpdateJobStatistics(jcr, now);
}

pthread_cond_signal(&jcr->sd_impl->job_start_wait); /* wake waiting job */
jcr->sd_impl->job_start_wait.notify_one(); /* wake waiting job */
FreeJcr(jcr);

return NULL;
Expand Down
91 changes: 39 additions & 52 deletions core/src/stored/job.cc
Expand Up @@ -38,6 +38,7 @@
#include "lib/bsock.h"
#include "lib/edit.h"
#include "lib/parse_bsr.h"
#include "lib/parse_conf.h"
#include "lib/util.h"
#include "lib/compression.h"
#include "include/jcr.h"
Expand Down Expand Up @@ -170,39 +171,46 @@ bool job_cmd(JobControlRecord* jcr)
return true;
}

bool DoJobRun(JobControlRecord* jcr)
static void WaitClient(JobControlRecord* jcr, utime_t wait_time)
{
struct timeval tv;
struct timezone tz;
struct timespec timeout;
int errstat = 0;
auto timeout
= std::chrono::system_clock::now() + std::chrono::seconds(wait_time);
auto locked = jcr->sd_impl->client_available.lock();

locked.wait_until(jcr->sd_impl->job_start_wait, timeout, [jcr](bool started) {
return started || jcr->IsJobCanceled();
});
}

static void WaitFD(JobControlRecord* jcr)
{
jcr->sendJobStatus(JS_WaitFD); /* wait for FD to connect */

gettimeofday(&tv, &tz);
timeout.tv_nsec = tv.tv_usec * 1000;
timeout.tv_sec = tv.tv_sec + me->client_wait;
utime_t wait_time = [] {
ResLocker _{my_config};
return me->client_wait;
}();

if (wait_time == 0) {
Dmsg3(100, "Client Connect Wait was set to 0; Setting to 1800s instead.\n");
wait_time = 1800;
}

Dmsg3(50, "%s waiting %d sec for FD to contact SD key=%s\n", jcr->Job,
(int)(timeout.tv_sec - time(NULL)), jcr->sd_auth_key);
wait_time, jcr->sd_auth_key);
Dmsg2(800, "Wait FD for jid=%d %p\n", jcr->JobId, jcr);

/* Wait for the File daemon to contact us to start the Job,
* when he does, we will be released, unless the 30 minutes
* expires. */
lock_mutex(mutex);
while (!jcr->authenticated && !jcr->IsJobCanceled()) {
errstat = pthread_cond_timedwait(&jcr->sd_impl->job_start_wait, &mutex,
&timeout);
if (errstat == ETIMEDOUT || errstat == EINVAL || errstat == EPERM) {
break;
}
Dmsg1(800, "=== Auth cond errstat=%d\n", errstat);
}
Dmsg3(50, "Auth=%d canceled=%d errstat=%d\n", jcr->authenticated,
jcr->IsJobCanceled(), errstat);
unlock_mutex(mutex);
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
* when he does, we will be released, unless the me->client_wait seconds
* (default: 1800 seconds = 30 minutes) expires. */
WaitClient(jcr, wait_time);
}

bool DoJobRun(JobControlRecord* jcr)
{
WaitFD(jcr);

Dmsg2(50, "Auth=%d canceled=%d\n", jcr->authenticated, jcr->IsJobCanceled());

memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
switch (jcr->getJobProtocol()) {
Expand All @@ -219,6 +227,8 @@ bool DoJobRun(JobControlRecord* jcr)
lock_mutex(mutex);
pthread_cond_wait(&jcr->sd_impl->job_end_wait, &mutex);
unlock_mutex(mutex);
} else {
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
}
Dmsg2(800, "Done jid=%d %p\n", jcr->JobId, jcr);

Expand All @@ -230,6 +240,8 @@ bool DoJobRun(JobControlRecord* jcr)
if (jcr->authenticated && !jcr->IsJobCanceled()) {
Dmsg2(800, "Running jid=%d %p\n", jcr->JobId, jcr);
RunJob(jcr); /* Run the job */
} else {
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
}
Dmsg2(800, "Done jid=%d %p\n", jcr->JobId, jcr);

Expand All @@ -243,10 +255,6 @@ bool nextRunCmd(JobControlRecord* jcr)
{
char auth_key[MAX_NAME_LENGTH];
BareosSocket* dir = jcr->dir_bsock;
struct timeval tv;
struct timezone tz;
struct timespec timeout;
int errstat = 0;

switch (jcr->getJobProtocol()) {
case PT_NDMP_BAREOS:
Expand All @@ -266,29 +274,7 @@ bool nextRunCmd(JobControlRecord* jcr)
memset(auth_key, 0, sizeof(auth_key));
Dmsg2(50, ">dird jid=%u: %s", (uint32_t)jcr->JobId, dir->msg);

jcr->sendJobStatus(JS_WaitFD); /* wait for FD to connect */

gettimeofday(&tv, &tz);
timeout.tv_nsec = tv.tv_usec * 1000;
timeout.tv_sec = tv.tv_sec + me->client_wait;

Dmsg3(50, "%s waiting %d sec for FD to contact SD key=%s\n", jcr->Job,
(int)(timeout.tv_sec - time(NULL)), jcr->sd_auth_key);
Dmsg2(800, "Wait FD for jid=%d %p\n", jcr->JobId, jcr);

lock_mutex(mutex);
while (!jcr->authenticated && !jcr->IsJobCanceled()) {
errstat = pthread_cond_timedwait(&jcr->sd_impl->job_start_wait, &mutex,
&timeout);
if (errstat == ETIMEDOUT || errstat == EINVAL || errstat == EPERM) {
break;
}
Dmsg1(800, "=== Auth cond errstat=%d\n", errstat);
}
Dmsg3(50, "Auth=%d canceled=%d errstat=%d\n", jcr->authenticated,
jcr->IsJobCanceled(), errstat);
unlock_mutex(mutex);
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
WaitFD(jcr);

if (jcr->authenticated && !jcr->IsJobCanceled()) {
Dmsg2(800, "Running jid=%d %p\n", jcr->JobId, jcr);
Expand All @@ -302,6 +288,8 @@ bool nextRunCmd(JobControlRecord* jcr)
lock_mutex(mutex);
pthread_cond_wait(&jcr->sd_impl->job_end_wait, &mutex);
unlock_mutex(mutex);
} else {
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
}
Dmsg2(800, "Done jid=%d %p\n", jcr->JobId, jcr);

Expand Down Expand Up @@ -432,7 +420,6 @@ void StoredFreeJcr(JobControlRecord* jcr)
Emsg0(M_FATAL, 0, T_("In FreeJcr(), but still attached to device!!!!\n"));
}

pthread_cond_destroy(&jcr->sd_impl->job_start_wait);
pthread_cond_destroy(&jcr->sd_impl->job_end_wait);

// Avoid a double free
Expand Down
3 changes: 2 additions & 1 deletion core/src/stored/ndmp_tape.cc
Expand Up @@ -545,7 +545,8 @@ extern "C" ndmp9_error bndmp_tape_open(struct ndm_session* sess,
/* There is a native storage daemon session waiting for the FD to connect.
* In NDMP terms this is the same as a FD connecting so wake any waiting
* threads. */
pthread_cond_signal(&jcr->sd_impl->job_start_wait);
*jcr->sd_impl->client_available.lock() = true;
jcr->sd_impl->job_start_wait.notify_one();

/* Save the JobControlRecord to ndm_session binding so everything furher
* knows which JobControlRecord belongs to which NDMP session. We have
Expand Down
23 changes: 9 additions & 14 deletions core/src/stored/sd_cmds.cc
Expand Up @@ -2,7 +2,7 @@
BAREOS - Backup Archiving REcovery Open Sourced
Copyright (C) 2012 Planets Communications B.V.
Copyright (C) 2013-2023 Bareos GmbH & Co. KG
Copyright (C) 2013-2024 Bareos GmbH & Co. KG
This program is Free Software; you can redistribute it and/or
modify it under the terms of version three of the GNU Affero General Public
Expand Down Expand Up @@ -46,8 +46,6 @@

namespace storagedaemon {

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Imported variables */

/* Static variables */
Expand Down Expand Up @@ -127,7 +125,7 @@ void* handle_stored_connection(BareosSocket* sd, char* job_name)
Dmsg1(50, "Authentication failed Job %s\n", jcr->Job);
Jmsg(jcr, M_FATAL, 0, T_("Unable to authenticate Storage daemon\n"));
} else {
jcr->authenticated = true;
*jcr->sd_impl->client_available.lock() = true;
Dmsg2(50, "OK Authentication jid=%u Job %s\n", (uint32_t)jcr->JobId,
jcr->Job);
}
Expand All @@ -136,7 +134,7 @@ void* handle_stored_connection(BareosSocket* sd, char* job_name)
jcr->setJobStatusWithPriorityCheck(JS_ErrorTerminated);
}

pthread_cond_signal(&jcr->sd_impl->job_start_wait); /* wake waiting job */
jcr->sd_impl->job_start_wait.notify_one(); /* wake waiting job */
FreeJcr(jcr);

return NULL;
Expand Down Expand Up @@ -209,7 +207,6 @@ static void DoSdCommands(JobControlRecord* jcr)
bool DoListenRun(JobControlRecord* jcr)
{
char ec1[30];
int errstat = 0;
BareosSocket* dir = jcr->dir_bsock;

jcr->sendJobStatus(JS_WaitSD); /* wait for SD to connect */
Expand All @@ -220,15 +217,13 @@ bool DoListenRun(JobControlRecord* jcr)

/* Wait for the Storage daemon to contact us to start the Job, when he does,
* we will be released. */
lock_mutex(mutex);
while (!jcr->authenticated && !jcr->IsJobCanceled()) {
errstat = pthread_cond_wait(&jcr->sd_impl->job_start_wait, &mutex);
if (errstat == EINVAL || errstat == EPERM) { break; }
Dmsg1(800, "=== Auth cond errstat=%d\n", errstat);
{
auto locked = jcr->sd_impl->client_available.lock();
locked.wait(jcr->sd_impl->job_start_wait, [jcr](bool started) {
return started || jcr->IsJobCanceled();
});
}
Dmsg3(50, "Auth=%d canceled=%d errstat=%d\n", jcr->authenticated,
jcr->IsJobCanceled(), errstat);
unlock_mutex(mutex);
Dmsg3(50, "Auth=%d canceled=%d\n", jcr->authenticated, jcr->IsJobCanceled());

if (!jcr->authenticated || !jcr->store_bsock) {
Dmsg2(800, "Auth fail or cancel for jid=%d %p\n", jcr->JobId, jcr);
Expand Down
9 changes: 0 additions & 9 deletions core/src/stored/stored.cc
Expand Up @@ -500,15 +500,6 @@ extern "C" void* device_initialization(void*)
NewPlugins(jcr); /* instantiate plugins */
jcr->setJobType(JT_SYSTEM);

// Initialize job start condition variable
errstat = pthread_cond_init(&jcr->sd_impl->job_start_wait, nullptr);
if (errstat != 0) {
BErrNo be;
Jmsg1(jcr, M_ABORT, 0,
T_("Unable to init job start cond variable: ERR=%s\n"),
be.bstrerror(errstat));
}

// Initialize job end condition variable
errstat = pthread_cond_init(&jcr->sd_impl->job_end_wait, nullptr);
if (errstat != 0) {
Expand Down
6 changes: 4 additions & 2 deletions core/src/stored/stored_jcr_impl.h
Expand Up @@ -3,7 +3,7 @@
Copyright (C) 2000-2012 Free Software Foundation Europe e.V.
Copyright (C) 2011-2012 Planets Communications B.V.
Copyright (C) 2013-2022 Bareos GmbH & Co. KG
Copyright (C) 2013-2024 Bareos GmbH & Co. KG
This program is Free Software; you can redistribute it and/or
modify it under the terms of version three of the GNU Affero General Public
Expand All @@ -26,6 +26,7 @@

#include "stored/read_ctx.h"
#include "stored/stored_conf.h"
#include "lib/thread_util.h"

#define SD_APPEND 1
#define SD_READ 0
Expand Down Expand Up @@ -68,8 +69,9 @@ struct DeviceWaitTimes {
struct StoredJcrImpl {
JobControlRecord* next_dev{}; /**< Next JobControlRecord attached to device */
JobControlRecord* prev_dev{}; /**< Previous JobControlRecord attached to device */
pthread_cond_t job_start_wait = PTHREAD_COND_INITIALIZER; /**< Wait for FD to start Job */
pthread_cond_t job_end_wait = PTHREAD_COND_INITIALIZER; /**< Wait for Job to end */
synchronized<bool> client_available;
std::condition_variable job_start_wait; /**< Wait for Client (FD/SD) to start Job */
storagedaemon::DeviceControlRecord* read_dcr{}; /**< Device context for reading */
storagedaemon::DeviceControlRecord* dcr{}; /**< Device context record */
POOLMEM* job_name{}; /**< Base Job name (not unique) */
Expand Down

0 comments on commit 20682c9

Please sign in to comment.