Skip to content

Commit

Permalink
heartbeat: change api
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura authored and BareosBot committed Feb 21, 2024
1 parent 09316dc commit 80f2f65
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 179 deletions.
4 changes: 2 additions & 2 deletions core/src/filed/backup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ bool BlastDataToStorageDaemon(JobControlRecord* jcr, crypto_cipher_t cipher)
AccurateCheckFile);
}

StartHeartbeatMonitor(jcr);
auto hb_send = MakeHeartbeatMonitor(jcr);

if (have_acl) {
jcr->fd_impl->acl_data = std::make_unique<AclData>();
Expand Down Expand Up @@ -190,7 +190,7 @@ bool BlastDataToStorageDaemon(JobControlRecord* jcr, crypto_cipher_t cipher)

AccurateFinish(jcr); /* send deleted or base file list to SD */

StopHeartbeatMonitor(jcr);
hb_send.reset();

sd->signal(BNET_EOD); /* end of sending data */

Expand Down
52 changes: 27 additions & 25 deletions core/src/filed/dir_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2011,11 +2011,11 @@ static bool VerifyCmd(JobControlRecord* jcr)
case L_VERIFY_CATALOG:
DoVerify(jcr);
break;
case L_VERIFY_VOLUME_TO_CATALOG:
case L_VERIFY_VOLUME_TO_CATALOG: {
if (!OpenSdReadSession(jcr)) { return false; }
StartDirHeartbeat(jcr);
auto send_hb = MakeDirHeartbeat(jcr);
DoVerifyVolume(jcr);
StopDirHeartbeat(jcr);
send_hb.reset();
// Send Close session command to Storage daemon
sd->fsend(read_close, jcr->fd_impl->Ticket);
Dmsg1(130, "filed>stored: %s", sd->msg);
Expand All @@ -2025,8 +2025,7 @@ static bool VerifyCmd(JobControlRecord* jcr)

/* Inform Storage daemon that we are done */
sd->signal(BNET_TERMINATE);

break;
} break;
case L_VERIFY_DISK_TO_CATALOG:
DoVerify(jcr);
break;
Expand Down Expand Up @@ -2193,32 +2192,35 @@ static bool RestoreCmd(JobControlRecord* jcr)
jcr->setJobStatusWithPriorityCheck(JS_Running);

// Do restore of files and data
StartDirHeartbeat(jcr);
GeneratePluginEvent(jcr, bEventStartRestoreJob);
{
auto hb_send = MakeDirHeartbeat(jcr);

GeneratePluginEvent(jcr, bEventStartRestoreJob);

#if defined(WIN32_VSS)
// START VSS ON WIN32
if (jcr->fd_impl->pVSSClient) {
if (!jcr->fd_impl->pVSSClient->InitializeForRestore(jcr)) {
BErrNo be;
Jmsg(jcr, M_WARNING, 0,
T_("VSS was not initialized properly. VSS support is disabled. "
"ERR=%s\n"),
be.bstrerror());
}
// START VSS ON WIN32
if (jcr->fd_impl->pVSSClient) {
if (!jcr->fd_impl->pVSSClient->InitializeForRestore(jcr)) {
BErrNo be;
Jmsg(jcr, M_WARNING, 0,
T_("VSS was not initialized properly. VSS support is disabled. "
"ERR=%s\n"),
be.bstrerror());
}

GeneratePluginEvent(jcr, bEventVssRestoreLoadComponentMetadata);
GeneratePluginEvent(jcr, bEventVssRestoreLoadComponentMetadata);

RunScripts(
jcr, jcr->fd_impl->RunScripts, "ClientAfterVSS",
(jcr->fd_impl->director && jcr->fd_impl->director->allowed_script_dirs)
? jcr->fd_impl->director->allowed_script_dirs
: me->allowed_script_dirs);
}
RunScripts(jcr, jcr->fd_impl->RunScripts, "ClientAfterVSS",
(jcr->fd_impl->director
&& jcr->fd_impl->director->allowed_script_dirs)
? jcr->fd_impl->director->allowed_script_dirs
: me->allowed_script_dirs);
}
#endif

DoRestore(jcr);
StopDirHeartbeat(jcr);
DoRestore(jcr);
hb_send.reset();
}

if (jcr->JobWarnings) {
jcr->setJobStatusWithPriorityCheck(JS_Warnings);
Expand Down
6 changes: 1 addition & 5 deletions core/src/filed/filed_jcr_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Copyright (C) 2000-2012 Free Software Foundation Europe e.V.
Copyright (C) 2011-2012 Planets Communications B.V.
Copyright (C) 2013-2023 Bareos GmbH & Co. KG
Copyright (C) 2013-2024 Bareos GmbH & Co. KG
This program is Free Software; you can redistribute it and/or
modify it under the terms of version three of the GNU Affero General Public
Expand Down Expand Up @@ -72,10 +72,6 @@ struct FiledJcrImpl {
uint32_t StartBlock{};
uint32_t EndBlock{};
pthread_t heartbeat_id{}; /**< Id of heartbeat thread */
std::atomic<bool> hb_initialized_once{}; /**< Heartbeat initialized */
std::atomic<bool> hb_running{}; /**< Heartbeat running */
std::shared_ptr<BareosSocket> hb_bsock; /**< Duped SD socket */
std::shared_ptr<BareosSocket> hb_dir_bsock; /**< Duped DIR socket */
alist<RunScript*>* RunScripts{}; /**< Commands to run before and after job */
CryptoContext crypto; /**< Crypto ctx */
filedaemon::DirectorResource* director{}; /**< Director resource */
Expand Down
221 changes: 79 additions & 142 deletions core/src/filed/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,199 +35,136 @@
#include "lib/bnet.h"
#include "lib/bsock.h"
#include "lib/watchdog.h"
#include "filed/heartbeat.h"

#include <functional>

namespace filedaemon {

#define WAIT_INTERVAL 5

extern "C" void* sd_heartbeat_thread(void* arg);
extern "C" void* dir_heartbeat_thread(void* arg);
extern bool no_signals;

/**
* Listen on the SD socket for heartbeat signals.
* Send heartbeats to the Director every HB_TIME
* seconds.
*/
extern "C" void* sd_heartbeat_thread(void* arg)
heartbeat_dir::heartbeat_dir(BareosSocket* sock, time_t interval)
: thread{std::mem_fn(&heartbeat_dir::send_heartbeat), this, sock, interval}
{
int32_t n;
JobControlRecord* jcr = (JobControlRecord*)arg;
std::shared_ptr<BareosSocket> sd, dir;
}

void heartbeat_dir::send_heartbeat(BareosSocket* sock, time_t interval)
{
sock->suppress_error_msgs_ = true;
time_t last_heartbeat = time(NULL);
time_t now;

pthread_detach(pthread_self());
while (!sock->IsStop() && !stop_requested) {
time_t now = time(NULL);
time_t since_last = now - last_heartbeat;
if (since_last >= interval) {
sock->signal(BNET_HEARTBEAT);
if (sock->IsStop() || stop_requested) { break; }
last_heartbeat = now;
Bmicrosleep(interval, 0);
} else {
Bmicrosleep(interval - since_last, 0);
}
}
sock->close();

delete sock;
}

// Get our own local copy
sd.reset(jcr->store_bsock->clone());
dir.reset(jcr->dir_bsock->clone());
heartbeat_dir::~heartbeat_dir()
{
stop_requested = true;
pthread_kill(thread.native_handle(), TIMEOUT_SIGNAL);
thread.join();
}

jcr->fd_impl->hb_bsock = sd;
jcr->fd_impl->hb_running = true;
jcr->fd_impl->hb_dir_bsock = dir;
heartbeat_sd_dir::heartbeat_sd_dir(BareosSocket* sd,
BareosSocket* dir,
time_t interval)
: thread{std::mem_fn(&heartbeat_sd_dir::send_heartbeat), this, sd, dir,
interval}
{
}

void heartbeat_sd_dir::send_heartbeat(BareosSocket* sd,
BareosSocket* dir,
time_t interval)
{
dir->suppress_error_msgs_ = true;
sd->suppress_error_msgs_ = true;
jcr->fd_impl->hb_initialized_once
= true; // initialize last to avoid race condition

time_t last_heartbeat = time(NULL);

/* Hang reading the socket to the SD, and every time we get
* a heartbeat or we get a wait timeout (5 seconds), we
* check to see if we need to send a heartbeat to the
* Director.
*/
while (!sd->IsStop()) {
n = BnetWaitDataIntr(sd.get(), WAIT_INTERVAL);
if (n < 0 || sd->IsStop()) { break; }
if (me->heartbeat_interval) {
now = time(NULL);
if (now - last_heartbeat >= me->heartbeat_interval) {
while (!sd->IsStop() && !stop_requested) {
std::int32_t n = BnetWaitDataIntr(sd, WAIT_INTERVAL);
if (n < 0 || sd->IsStop() || stop_requested) { break; }
if (interval) {
time_t now = time(NULL);
if (now - last_heartbeat >= interval) {
dir->signal(BNET_HEARTBEAT);
if (dir->IsStop()) { break; }
if (dir->IsStop() || stop_requested) { break; }
last_heartbeat = now;
}
}
if (n == 1) { /* input waiting */
sd->recv(); /* read it -- probably heartbeat from sd */
if (sd->IsStop()) { break; }
if (sd->IsStop() || stop_requested) { break; }
if (sd->message_length <= 0) {
Dmsg1(100, "Got BNET_SIG %d from SD\n", sd->message_length);
} else {
Dmsg2(100, "Got %d bytes from SD. MSG=%s\n", sd->message_length,
sd->msg);
}
}
Dmsg2(200, "wait_intr=%d stop=%d\n", n, IsBnetStop(sd.get()));
Dmsg2(200, "wait_intr=%d stop=%d\n", n, IsBnetStop(sd));
}

sd->close();
dir->close();
jcr->fd_impl->hb_bsock.reset();
jcr->fd_impl->hb_running = false;
jcr->fd_impl->hb_dir_bsock = NULL;

return NULL;
delete sd;
delete dir;
}

/* Startup the heartbeat thread -- see above */
void StartHeartbeatMonitor(JobControlRecord* jcr)
heartbeat_sd_dir::~heartbeat_sd_dir()
{
/* If no signals are set, do not start the heartbeat because
* it gives a constant stream of TIMEOUT_SIGNAL signals that
* make debugging impossible. */
if (!no_signals) {
jcr->fd_impl->hb_bsock = NULL;
jcr->fd_impl->hb_running = false;
jcr->fd_impl->hb_initialized_once = false;
jcr->fd_impl->hb_dir_bsock = NULL;
jcr->dir_bsock->SetLocking();
pthread_create(&jcr->fd_impl->heartbeat_id, NULL, sd_heartbeat_thread,
(void*)jcr);
}
stop_requested = true;
pthread_kill(thread.native_handle(), TIMEOUT_SIGNAL);
thread.join();
}

/* Terminate the heartbeat thread. Used for both SD and DIR */
void StopHeartbeatMonitor(JobControlRecord* jcr)
std::optional<heartbeat_sd_dir> MakeHeartbeatMonitor(JobControlRecord* jcr)
{
int cnt = 0;
if (no_signals) { return; }

/* Wait max 10 secs for heartbeat thread to start */
while (!jcr->fd_impl->hb_initialized_once && cnt++ < 200) {
Bmicrosleep(0, 50000); /* wait for start */
}

if (jcr->fd_impl->hb_running) {
jcr->fd_impl->hb_bsock->SetTimedOut(); /* set timed_out to Terminate read */
jcr->fd_impl->hb_bsock->SetTerminated(); /* set to Terminate read */
}

if (jcr->fd_impl->hb_dir_bsock) {
jcr->fd_impl->hb_dir_bsock
->SetTimedOut(); /* set timed_out to Terminate read */
jcr->fd_impl->hb_dir_bsock->SetTerminated(); /* set to Terminate read */
}

if (jcr->fd_impl->hb_running) {
Dmsg0(100, "Send kill to heartbeat id\n");
pthread_kill(jcr->fd_impl->heartbeat_id,
TIMEOUT_SIGNAL); /* make heartbeat thread go away */
Bmicrosleep(0, 50000);
}
cnt = 0;
if (no_signals) { return std::nullopt; }

// Wait max 100 secs for heartbeat thread to stop
while (jcr->fd_impl->hb_running && cnt++ < 200) {
pthread_kill(jcr->fd_impl->heartbeat_id,
TIMEOUT_SIGNAL); /* make heartbeat thread go away */
Bmicrosleep(0, 500000);
}
// both the heartbeat thread and normal jmsgs will want to write to the
// dir_bsock socket. To prevent data races we have to enable locking on that
// socket otherwise it will lead to bad problems.
jcr->dir_bsock->SetLocking();

if (jcr->fd_impl->hb_bsock) {
// delete jcr->fd_impl_->hb_bsock;
jcr->fd_impl->hb_bsock.reset();
}

if (jcr->fd_impl->hb_dir_bsock) {
// delete jcr->fd_impl_->hb_dir_bsock;
jcr->fd_impl->hb_dir_bsock.reset();
}

jcr->fd_impl->hb_initialized_once = false;
return std::optional<heartbeat_sd_dir>{
std::in_place, jcr->store_bsock->clone(), jcr->dir_bsock->clone(),
me->heartbeat_interval};
}

/**
* Thread for sending heartbeats to the Director when there
* is no SD monitoring needed -- e.g. restore and verify Vol
* both do their own read() on the SD socket.
*/
extern "C" void* dir_heartbeat_thread(void* arg)
std::optional<heartbeat_dir> MakeDirHeartbeat(JobControlRecord* jcr)
{
JobControlRecord* jcr = (JobControlRecord*)arg;
BareosSocket* dir;
time_t last_heartbeat = time(NULL);

pthread_detach(pthread_self());
auto interval = me->heartbeat_interval;
if (no_signals || !interval) { return std::nullopt; }

// Get our own local copy
dir = jcr->dir_bsock->clone();
// both the heartbeat thread and normal jmsgs will want to write to the
// dir_bsock socket. To prevent data races we have to enable locking on that
// socket otherwise it will lead to bad problems.
jcr->dir_bsock->SetLocking();

jcr->fd_impl->hb_bsock.reset(dir);
jcr->fd_impl->hb_running = true;
dir->suppress_error_msgs_ = true;
jcr->fd_impl->hb_initialized_once
= true; // initialize last to avoid race condition

while (!dir->IsStop()) {
time_t now, next;

now = time(NULL);
next = now - last_heartbeat;
if (next >= me->heartbeat_interval) {
dir->signal(BNET_HEARTBEAT);
if (dir->IsStop()) { break; }
last_heartbeat = now;
}
Bmicrosleep(next, 0);
}
dir->close();
jcr->fd_impl->hb_bsock.reset();
jcr->fd_impl->hb_running = false;
return NULL;
return std::optional<heartbeat_dir>{std::in_place, jcr->dir_bsock->clone(),
interval};
}

// Same as above but we don't listen to the SD
void StartDirHeartbeat(JobControlRecord* jcr)
{
if (me->heartbeat_interval) {
jcr->dir_bsock->SetLocking();
pthread_create(&jcr->fd_impl->heartbeat_id, NULL, dir_heartbeat_thread,
(void*)jcr);
}
}

void StopDirHeartbeat(JobControlRecord* jcr)
{
if (me->heartbeat_interval) { StopHeartbeatMonitor(jcr); }
}
} /* namespace filedaemon */

0 comments on commit 80f2f65

Please sign in to comment.