Skip to content

Commit

Permalink
sd reserve device: created separate classes for director messages
Browse files Browse the repository at this point in the history
- refactored UseDeviceCmd
- added: UseDeviceMessage, StorageDefinitionMessage
- removed some PoolMem variables
  • Loading branch information
franku committed Jul 3, 2018
1 parent 874785c commit e7bc565
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 46 deletions.
131 changes: 87 additions & 44 deletions core/src/stored/reserve.cc
Expand Up @@ -54,13 +54,6 @@ static bool UseDeviceCmd(JobControlRecord *jcr);
static void QueueReserveMessage(JobControlRecord *jcr);
static void PopReserveMessages(JobControlRecord *jcr);

/* Requests from the Director daemon */
static char use_storage[] =
"use storage=%127s media_type=%127s "
"pool_name=%127s pool_type=%127s append=%d copy=%d stripe=%d\n";
static char use_device[] =
"use device=%127s\n";

/* Responses sent to Director daemon */
static char OK_device[] =
"3000 OK use device device=%s\n";
Expand Down Expand Up @@ -166,7 +159,7 @@ void DeviceControlRecord::UnreserveDevice()
dev->Unlock();
}

static bool wiffle(JobControlRecord *jcr, int32_t append)
static bool wiffle(JobControlRecord *jcr, int32_t append, std::string dev_name)
{
ReserveContext reserve_context;
memset(&reserve_context, 0, sizeof(ReserveContext));
Expand Down Expand Up @@ -269,10 +262,78 @@ static bool wiffle(JobControlRecord *jcr, int32_t append)
LockReservations();
jcr->dir_bsock->signal(BNET_HEARTBEAT); /* Inform Dir that we are alive */
}

UnlockReservations();
return ok;

if (!ok) {
/*
* If we get here, there are no suitable devices available, which
* means nothing configured. If a device is suitable but busy
* with another Volume, we will not come here.
*/
UnbashSpaces(jcr->dir_bsock->msg);
PmStrcpy(jcr->errmsg, jcr->dir_bsock->msg);
Jmsg(jcr, M_FATAL, 0, _("Device reservation failed for JobId=%d: %s\n"),
jcr->JobId, jcr->errmsg);
jcr->dir_bsock->fsend(NO_device, dev_name.c_str());

Dmsg1(debuglevel, ">dird: %s", jcr->dir_bsock->msg);
return false;
}
return true;
}

StorageDefinitionMessage::StorageDefinitionMessage()
: is_valid(false)
, regex("^use storage=(.{1,127}) media_type=(.{1,127}) "
"pool_name=(.{1,127}) pool_type=(.{1,127})"
"append=([0-9]) copy=([0-9]) stripe=([0-9])\n")
{
return;
}

bool StorageDefinitionMessage::ParseMessage(std::string msg)
{
is_valid = false;
std::smatch sm;
std::string input(msg);
if (std::regex_match(input, sm, regex)) {
if (sm.size() == 8) {
StoreName = sm[1];
media_type = sm[2];
pool_name = sm[3];
pool_type = sm[4];
append = std::stoi(sm[5]);
Copy = std::stoi(sm[6]);
Stripe = std::stoi(sm[7]);
is_valid = true;
}
}
return is_valid;
}

UseDeviceMessage::UseDeviceMessage()
: is_valid(false)
, regex("use device=(.{1,127})\n")
{
return;
}

bool UseDeviceMessage::ParseMessage(std::string msg)
{
is_valid = false;
std::smatch sm;
std::string input(msg);
if (std::regex_match(input, sm, regex)) {
if (sm.size() == 2) {
dev_name = sm[1];
is_valid = true;
}
}
return is_valid;
}


/**
* We get the following type of information:
*
Expand All @@ -283,12 +344,10 @@ static bool wiffle(JobControlRecord *jcr, int32_t append)
* use storage=xxx media_type=yyy pool_name=xxx pool_type=yyy append=0 copy=0 strip=0
* use device=bbb
*/
#include <sstream>
static bool UseDeviceCmd(JobControlRecord *jcr)
{
PoolMem StoreName, dev_name, media_type, pool_name, pool_type;
int32_t append;
bool ok;
int32_t Copy, Stripe;
DirectorStorage *store;
alist *dirstore;

Expand All @@ -298,51 +357,49 @@ static bool UseDeviceCmd(JobControlRecord *jcr)
*/
dirstore = New(alist(10, not_owned_by_alist));
jcr->reserve_msgs = New(alist(10, not_owned_by_alist));
StorageDefinitionMessage storage_definition_message;
UseDeviceMessage use_device_message;
do {
Dmsg1(debuglevel, "<dird: %s", jcr->dir_bsock->msg);
ok = sscanf(jcr->dir_bsock->msg, use_storage, StoreName.c_str(),
media_type.c_str(), pool_name.c_str(),
pool_type.c_str(), &append, &Copy, &Stripe) == 7;
UnbashSpaces(jcr->dir_bsock->msg);

ok = storage_definition_message.ParseMessage(std::string(jcr->dir_bsock->msg));
if (!ok) {
break;
}
if (append) {
if (storage_definition_message.append) {
jcr->write_store = dirstore;
} else {
jcr->read_store = dirstore;
}
UnbashSpaces(StoreName);
UnbashSpaces(media_type);
UnbashSpaces(pool_name);
UnbashSpaces(pool_type);
store = new DirectorStorage;
dirstore->append(store);
memset(store, 0, sizeof(DirectorStorage));
store->device = New(alist(10));
bstrncpy(store->name, StoreName, sizeof(store->name));
bstrncpy(store->media_type, media_type, sizeof(store->media_type));
bstrncpy(store->pool_name, pool_name, sizeof(store->pool_name));
bstrncpy(store->pool_type, pool_type, sizeof(store->pool_type));
store->append = append;
bstrncpy(store->name, storage_definition_message.StoreName.c_str(), sizeof(store->name));
bstrncpy(store->media_type, storage_definition_message.media_type.c_str(), sizeof(store->media_type));
bstrncpy(store->pool_name, storage_definition_message.pool_name.c_str(), sizeof(store->pool_name));
bstrncpy(store->pool_type, storage_definition_message.pool_type.c_str(), sizeof(store->pool_type));
store->append = storage_definition_message.append;

/*
* Now get all devices
*/
while (jcr->dir_bsock->recv() >= 0) {
Dmsg1(debuglevel, "<dird device: %s", jcr->dir_bsock->msg);
ok = sscanf(jcr->dir_bsock->msg, use_device, dev_name.c_str()) == 1;
UnbashSpaces(jcr->dir_bsock->msg);
ok = use_device_message.ParseMessage(jcr->dir_bsock->msg);
if (!ok) {
break;
}
UnbashSpaces(dev_name);
store->device->append(bstrdup(dev_name.c_str()));
store->device->append(bstrdup(use_device_message.dev_name.c_str()));
}
} while (ok && jcr->dir_bsock->recv() >= 0);

InitJcrDeviceWaitTimers(jcr);
jcr->dcr = New(StorageDaemonDeviceControlRecord);
SetupNewDcrDevice(jcr, jcr->dcr, NULL, NULL);
if (append) {
if (storage_definition_message.append) {
jcr->dcr->SetWillWrite();
}

Expand All @@ -363,22 +420,8 @@ static bool UseDeviceCmd(JobControlRecord *jcr)
*/
if (ok) {

ok = wiffle(jcr, append);

if (!ok) {
/*
* If we get here, there are no suitable devices available, which
* means nothing configured. If a device is suitable but busy
* with another Volume, we will not come here.
*/
UnbashSpaces(jcr->dir_bsock->msg);
PmStrcpy(jcr->errmsg, jcr->dir_bsock->msg);
Jmsg(jcr, M_FATAL, 0, _("Device reservation failed for JobId=%d: %s\n"),
jcr->JobId, jcr->errmsg);
jcr->dir_bsock->fsend(NO_device, dev_name.c_str());
ok = wiffle(jcr, storage_definition_message.append, use_device_message.dev_name);

Dmsg1(debuglevel, ">dird: %s", jcr->dir_bsock->msg);
}
} else {
UnbashSpaces(jcr->dir_bsock->msg);
PmStrcpy(jcr->errmsg, jcr->dir_bsock->msg);
Expand Down
30 changes: 28 additions & 2 deletions core/src/stored/reserve.h
Expand Up @@ -36,8 +36,7 @@
* the media and pool info in the JobControlRecord. This class is used
* only temporarily in this file.
*/
class DirectorStorage {
public:
struct DirectorStorage {
alist *device;
bool append;
char name[MAX_NAME_LENGTH];
Expand Down Expand Up @@ -65,6 +64,33 @@ struct ReserveContext {
char VolumeName[MAX_NAME_LENGTH]; /**< Vol name suggested by DIR */
};

#include <regex>
class StorageDefinitionMessage
{
public:
StorageDefinitionMessage();
bool ParseMessage(std::string str);

std::string StoreName, media_type, pool_name, pool_type;
uint32_t append, Copy, Stripe;
bool is_valid;

private:
const std::regex regex;
};

class UseDeviceMessage
{
public:
UseDeviceMessage();
bool ParseMessage(std::string str);

std::string dev_name;
bool is_valid;

private:
const std::regex regex;
};

DLL_IMP_EXP void InitReservationsLock();
DLL_IMP_EXP void TermReservationsLock();
Expand Down

0 comments on commit e7bc565

Please sign in to comment.