Skip to content

Commit

Permalink
dird: split device reservation from sd job start
Browse files Browse the repository at this point in the history
  • Loading branch information
alaaeddineelamri authored and pstorz committed Nov 30, 2022
1 parent 9fb9bf2 commit b5fd7df
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 153 deletions.
4 changes: 2 additions & 2 deletions core/src/dird/backup.cc
Expand Up @@ -427,8 +427,8 @@ bool DoNativeBackup(JobControlRecord* jcr)
return false;
}

if (!StartStorageDaemonJob(jcr, nullptr,
jcr->dir_impl->res.write_storage_list)) {
if (!StartStorageDaemonJob(jcr)) { return false; }
if (!ReserveWriteDevice(jcr, jcr->dir_impl->res.write_storage_list)) {
return false;
}

Expand Down
22 changes: 10 additions & 12 deletions core/src/dird/migrate.cc
Expand Up @@ -696,9 +696,7 @@ static bool regex_find_jobids(JobControlRecord* jcr,

bail_out:
Dmsg2(dbglevel, "Count=%d Jobids=%s\n", ids->count, ids->list);
foreach_dlist (item, item_chain) {
free(item->item);
}
foreach_dlist (item, item_chain) { free(item->item); }
delete item_chain;
return ok;
}
Expand Down Expand Up @@ -1338,18 +1336,17 @@ static inline bool DoActualMigration(JobControlRecord* jcr)
}

// Now start a job with the Reading Storage daemon
if (!StartStorageDaemonJob(jcr, jcr->dir_impl->res.read_storage_list, NULL,
/* send_bsr */ true)) {
if (!StartStorageDaemonJob(jcr, true)) { goto bail_out; }
if (!ReserveReadDevice(jcr, jcr->dir_impl->res.read_storage_list)) {
goto bail_out;
}

Dmsg0(150, "Reading Storage daemon connection OK\n");

// Now start a job with the Writing Storage daemon

if (!StartStorageDaemonJob(mig_jcr, NULL,
mig_jcr->dir_impl->res.write_storage_list,
/* send_bsr */ false)) {
if (!StartStorageDaemonJob(mig_jcr, false)) { goto bail_out; }
if (!ReserveWriteDevice(mig_jcr,
mig_jcr->dir_impl->res.write_storage_list)) {
goto bail_out;
}

Expand All @@ -1369,9 +1366,10 @@ static inline bool DoActualMigration(JobControlRecord* jcr)
}

// Now start a job with the Storage daemon
if (!StartStorageDaemonJob(jcr, jcr->dir_impl->res.read_storage_list,
jcr->dir_impl->res.write_storage_list,
/* send_bsr */ true)) {
if (!StartStorageDaemonJob(jcr, true)
|| !ReserveReadAndWriteDevices(jcr,
jcr->dir_impl->res.read_storage_list,
jcr->dir_impl->res.write_storage_list)) {
FreePairedStorage(jcr);
return false;
}
Expand Down
299 changes: 176 additions & 123 deletions core/src/dird/msgchan.cc
Expand Up @@ -117,12 +117,183 @@ static inline bool SendBootstrapFileToSd(JobControlRecord* jcr,
return true;
}

bool ReserveReadDevice(JobControlRecord* jcr,
alist<StorageResource*>* read_storage)
{
BareosSocket* sd_socket = jcr->store_bsock;
StorageResource* storage = nullptr;
PoolMem device_name;
std::string StoreName{};
std::string pool_name{};
std::string pool_type{};
std::string media_type{};
bool ok = true;
int copy = 0;
int stripe = 0;
/*
* We have two loops here. The first comes from the
* Storage = associated with the Job, and we need
* to attach to each one.
* The inner loop loops over all the alternative devices
* associated with each Storage. It selects the first
* available one.
*
*/
// Do read side of storage daemon
if (read_storage) {
// For the moment, only migrate, copy and vbackup have rpool
if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)
|| (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
pool_type = jcr->dir_impl->res.rpool->pool_type;
pool_name = jcr->dir_impl->res.rpool->resource_name_;
} else {
pool_type = jcr->dir_impl->res.pool->pool_type;
pool_name = jcr->dir_impl->res.pool->resource_name_;
}
BashSpaces(pool_type);
BashSpaces(pool_name);
foreach_alist (storage, read_storage) {
Dmsg1(100, "Rstore=%s\n", storage->resource_name_);
StoreName = storage->resource_name_;
BashSpaces(StoreName);
media_type = storage->media_type;
BashSpaces(media_type);
sd_socket->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
Dmsg1(100, "read_storage >stored: %s", sd_socket->msg);
DeviceResource* dev = nullptr;
/* Loop over alternative storage Devices until one is OK */
foreach_alist (dev, storage->device) {
PmStrcpy(device_name, dev->resource_name_);
BashSpaces(device_name);
sd_socket->fsend(use_device, device_name.c_str());
Dmsg1(100, ">stored: %s", sd_socket->msg);
}
sd_socket->signal(BNET_EOD); // end of Device
}
sd_socket->signal(BNET_EOD); // end of Storages
if (BgetDirmsg(sd_socket) > 0) {
Dmsg1(100, "<stored: %s", sd_socket->msg);
// ****FIXME**** save actual device name
ok = sscanf(sd_socket->msg, OK_device, device_name.c_str()) == 1;
} else {
ok = false;
}
if (ok) {
Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"),
device_name.c_str());
}
}

if (!ok) {
if (jcr->store_bsock->msg[0]) {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" because:\n "
"%s\n"),
device_name.c_str(), jcr->store_bsock->msg);
} else {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" command.\n"),
device_name.c_str());
}
}

return ok;
}

bool ReserveWriteDevice(JobControlRecord* jcr,
alist<StorageResource*>* write_storage)
{
PoolMem device_name;
std::string StoreName{};
std::string pool_name{};
std::string pool_type{};
std::string media_type{};
bool ok = true;
int copy = 0;
int stripe = 0;
/*
* We have two loops here. The first comes from the
* Storage = associated with the Job, and we need
* to attach to each one.
* The inner loop loops over all the alternative devices
* associated with each Storage. It selects the first
* available one.
*
*/
// Do write side of storage daemon
if (write_storage) {
pool_type = jcr->dir_impl->res.pool->pool_type;
pool_name = jcr->dir_impl->res.pool->resource_name_;
BashSpaces(pool_type);
BashSpaces(pool_name);
StorageResource* storage = nullptr;
foreach_alist (storage, write_storage) {
StoreName = storage->resource_name_;
BashSpaces(StoreName);
media_type = storage->media_type;
BashSpaces(media_type);
jcr->store_bsock->fsend(use_storage, StoreName.c_str(),
media_type.c_str(), pool_name.c_str(),
pool_type.c_str(), 1, copy, stripe);

Dmsg1(100, "write_storage >stored: %s", jcr->store_bsock->msg);
DeviceResource* dev = nullptr;
// Loop over alternative storage Devices until one is OK
foreach_alist (dev, storage->device) {
PmStrcpy(device_name, dev->resource_name_);
BashSpaces(device_name);
jcr->store_bsock->fsend(use_device, device_name.c_str());
Dmsg1(100, ">stored: %s", jcr->store_bsock->msg);
}
jcr->store_bsock->signal(BNET_EOD); // end of Devices
}
jcr->store_bsock->signal(BNET_EOD); // end of Storages
if (BgetDirmsg(jcr->store_bsock) > 0) {
Dmsg1(100, "<stored: %s", jcr->store_bsock->msg);
// ****FIXME**** save actual device name
ok = sscanf(jcr->store_bsock->msg, OK_device, device_name.c_str()) == 1;
} else {
ok = false;
}
if (ok) {
Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"),
device_name.c_str());
}
}
if (!ok) {
if (jcr->store_bsock->msg[0]) {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" because:\n "
"%s\n"),
device_name.c_str(), jcr->store_bsock->msg);
} else {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" command.\n"),
device_name.c_str());
}
}

return ok;
}

bool ReserveReadAndWriteDevices(JobControlRecord* jcr,
alist<StorageResource*>* read_storage,
alist<StorageResource*>* write_storage)
{
if (!ReserveReadDevice(jcr, read_storage)) { return false; }
if (!ReserveWriteDevice(jcr, write_storage)) { return false; }

return true;
}

/** Start a job with the Storage daemon
*/
bool StartStorageDaemonJob(JobControlRecord* jcr,
alist<StorageResource*>* read_storage,
alist<StorageResource*>* write_storage,
bool send_bsr)
bool StartStorageDaemonJob(JobControlRecord* jcr, bool send_bsr)
{
BareosSocket* sd_socket = jcr->store_bsock;

Expand Down Expand Up @@ -235,125 +406,7 @@ bool StartStorageDaemonJob(JobControlRecord* jcr,
if (!SendSecureEraseReqToSd(jcr)) {
Dmsg1(400, "Unexpected %s Secure Erase Reply\n", "SD");
}


StorageResource* storage = nullptr;
PoolMem device_name;
std::string StoreName;
std::string pool_name;
std::string pool_type;
std::string media_type;
bool ok = true;
int copy = 0;
int stripe = 0;
/*
* We have two loops here. The first comes from the
* Storage = associated with the Job, and we need
* to attach to each one.
* The inner loop loops over all the alternative devices
* associated with each Storage. It selects the first
* available one.
*
*/
// Do read side of storage daemon
if (read_storage) {
// For the moment, only migrate, copy and vbackup have rpool
if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY)
|| (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
pool_type = jcr->dir_impl->res.rpool->pool_type;
pool_name = jcr->dir_impl->res.rpool->resource_name_;
} else {
pool_type = jcr->dir_impl->res.pool->pool_type;
pool_name = jcr->dir_impl->res.pool->resource_name_;
}
BashSpaces(pool_type);
BashSpaces(pool_name);
foreach_alist (storage, read_storage) {
Dmsg1(100, "Rstore=%s\n", storage->resource_name_);
StoreName = storage->resource_name_;
BashSpaces(StoreName);
media_type = storage->media_type;
BashSpaces(media_type);
sd_socket->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
Dmsg1(100, "read_storage >stored: %s", sd_socket->msg);
DeviceResource* dev = nullptr;
/* Loop over alternative storage Devices until one is OK */
foreach_alist (dev, storage->device) {
PmStrcpy(device_name, dev->resource_name_);
BashSpaces(device_name);
sd_socket->fsend(use_device, device_name.c_str());
Dmsg1(100, ">stored: %s", sd_socket->msg);
}
sd_socket->signal(BNET_EOD); // end of Device
}
sd_socket->signal(BNET_EOD); // end of Storages
if (BgetDirmsg(sd_socket) > 0) {
Dmsg1(100, "<stored: %s", sd_socket->msg);
// ****FIXME**** save actual device name
ok = sscanf(sd_socket->msg, OK_device, device_name.c_str()) == 1;
} else {
ok = false;
}
if (ok) {
Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"),
device_name.c_str());
}
}

// Do write side of storage daemon
if (ok && write_storage) {
pool_type = jcr->dir_impl->res.pool->pool_type;
pool_name = jcr->dir_impl->res.pool->resource_name_;
BashSpaces(pool_type);
BashSpaces(pool_name);
foreach_alist (storage, write_storage) {
StoreName = storage->resource_name_;
BashSpaces(StoreName);
media_type = storage->media_type;
BashSpaces(media_type);
sd_socket->fsend(use_storage, StoreName.c_str(), media_type.c_str(),
pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);

Dmsg1(100, "write_storage >stored: %s", sd_socket->msg);
DeviceResource* dev = nullptr;
// Loop over alternative storage Devices until one is OK
foreach_alist (dev, storage->device) {
PmStrcpy(device_name, dev->resource_name_);
BashSpaces(device_name);
sd_socket->fsend(use_device, device_name.c_str());
Dmsg1(100, ">stored: %s", sd_socket->msg);
}
sd_socket->signal(BNET_EOD); // end of Devices
}
sd_socket->signal(BNET_EOD); // end of Storages
if (BgetDirmsg(sd_socket) > 0) {
Dmsg1(100, "<stored: %s", sd_socket->msg);
// ****FIXME**** save actual device name
ok = sscanf(sd_socket->msg, OK_device, device_name.c_str()) == 1;
} else {
ok = false;
}
if (ok) {
Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"),
device_name.c_str());
}
}
if (!ok) {
if (sd_socket->msg[0]) {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" because:\n "
"%s\n"),
device_name.c_str(), sd_socket->msg);
} else {
Jmsg(jcr, M_FATAL, 0,
_("\n"
" Storage daemon didn't accept Device \"%s\" command.\n"),
device_name.c_str());
}
}
return ok;
return true;
}

/** Start a thread to handle Storage daemon messages and
Expand Down
12 changes: 8 additions & 4 deletions core/src/dird/msgchan.h
Expand Up @@ -26,10 +26,14 @@ template <typename T> class alist;

namespace directordaemon {

bool StartStorageDaemonJob(JobControlRecord* jcr,
alist<StorageResource*>* read_storage,
alist<StorageResource*>* write_storage,
bool send_bsr = false);
bool StartStorageDaemonJob(JobControlRecord* jcr, bool send_bsr = false);
bool ReserveReadDevice(JobControlRecord* jcr,
alist<StorageResource*>* read_storage);
bool ReserveWriteDevice(JobControlRecord* jcr,
alist<StorageResource*>* write_storage);
bool ReserveReadAndWriteDevices(JobControlRecord* jcr,
alist<StorageResource*>* read_storage,
alist<StorageResource*>* write_storage);
bool StartStorageDaemonMessageThread(JobControlRecord* jcr);
int BgetDirmsg(BareosSocket* bs, bool allow_any_msg = false);
void WaitForStorageDaemonTermination(JobControlRecord* jcr);
Expand Down

0 comments on commit b5fd7df

Please sign in to comment.