Skip to content

Commit

Permalink
Allow cancel by JobId on storage daemon.
Browse files Browse the repository at this point in the history
Sometimes the Director already removed the Job from its running queue
but the Storage daemon still thinks it doing a backup (or other Job)
and you cannot cancel the Job from within a console anymore. This piece
of code allows you to cancel a Storage Daemon Job by JobId. E.g. do
a status sd on the Storage Daemon make sure what Job you want to cancel
and issue a cancel storage=<Storage Daemon> Jobid=<JobId> This way you
can also remove a Job that blocks any other Jobs from running without
the need to restart the whole Storage Daemon.

Fixes #13: Allow cancel by JobId on storage daemon.
  • Loading branch information
mvwieringen authored and Marco van Wieringen committed May 5, 2013
1 parent fb2fda7 commit d72cca4
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 65 deletions.
1 change: 1 addition & 0 deletions src/dird/protos.h
Expand Up @@ -208,6 +208,7 @@ dlist *get_vol_list_from_SD(UAContext *ua, STORERES *store, bool listall, bool s
void free_vol_list(dlist *vol_list);
int get_num_slots_from_SD(UAContext *ua);
int get_num_drives_from_SD(UAContext *ua);
bool cancel_storage_daemon_job(UAContext *ua, STORERES *store, char *JobId);
bool cancel_storage_daemon_job(UAContext *ua, JCR *jcr, bool silent = false);
void cancel_storage_daemon_job(JCR *jcr);
void do_native_storage_status(UAContext *ua, STORERES *store, char *cmd);
Expand Down
66 changes: 53 additions & 13 deletions src/dird/sd_cmds.c
Expand Up @@ -59,13 +59,12 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
}

/*
* Open message channel with the Storage daemon
* Open message channel with the Storage daemon
*/
Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
store->SDport);
Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address, store->SDport);
sd->set_source_address(director->DIRsrc_addr);
if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
store->address, NULL, store->SDport, verbose)) {
store->address, NULL, store->SDport, verbose)) {
sd->destroy();
sd = NULL;
}
Expand All @@ -81,6 +80,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
jcr->store_bsock = NULL;
return false;
}

return true;
}

Expand All @@ -93,7 +93,7 @@ BSOCK *open_sd_bsock(UAContext *ua)

if (!ua->jcr->store_bsock) {
ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
store->name(), store->address, store->SDport);
store->name(), store->address, store->SDport);
if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
ua->error_msg(_("Failed to connect to Storage daemon.\n"));
return NULL;
Expand Down Expand Up @@ -561,6 +561,39 @@ int get_num_drives_from_SD(UAContext *ua)
return drives;
}

/*
* Cancel a running job on a storage daemon. Used by the interactive cancel
* command to cancel a JobId on a Storage Daemon this can be used when the
* Director already removed the Job and thinks it finished but the Storage
* Daemon still thinks its active.
*/
bool cancel_storage_daemon_job(UAContext *ua, STORERES *store, char *JobId)
{
BSOCK *sd;
JCR *control_jcr;

control_jcr = new_control_jcr("*JobCancel*", JT_SYSTEM);

control_jcr->res.wstore = store;
if (!connect_to_storage_daemon(control_jcr, 10, SDConnectTimeout, 1)) {
ua->error_msg(_("Failed to connect to Storage daemon.\n"));
}

Dmsg0(200, "Connected to storage daemon\n");
sd = control_jcr->store_bsock;
sd->fsend("cancel Job=%s\n", JobId);
while (sd->recv() >= 0) {
ua->send_msg("%s", sd->msg);
}
sd->signal(BNET_TERMINATE);
sd->close();
control_jcr->store_bsock = NULL;

free_jcr(control_jcr);

return true;
}

/*
* Cancel a running job on a storage daemon. The silent flag sets
* if we need to be silent or not e.g. when doing an interactive cancel
Expand Down Expand Up @@ -621,19 +654,23 @@ bool cancel_storage_daemon_job(UAContext *ua, JCR *jcr, bool silent)
*/
void cancel_storage_daemon_job(JCR *jcr)
{
UAContext *ua;
JCR *control_jcr;

if (jcr->sd_canceled) {
return; /* cancel only once */
}

UAContext *ua = new_ua_context(jcr);
JCR *control_jcr = new_control_jcr("*JobCancel*", JT_SYSTEM);
ua = new_ua_context(jcr);
control_jcr = new_control_jcr("*JobCancel*", JT_SYSTEM);

ua->jcr = control_jcr;
if (jcr->store_bsock) {
if (!cancel_storage_daemon_job(ua, jcr, true)) {
goto bail_out;
}
}

bail_out:
free_jcr(control_jcr);
free_ua_context(ua);
Expand All @@ -654,11 +691,14 @@ void do_native_storage_status(UAContext *ua, STORERES *store, char *cmd)
/*
* Try connecting for up to 15 seconds
*/
if (!ua->api) ua->send_msg(_("Connecting to Storage daemon %s at %s:%d\n"),
store->name(), store->address, store->SDport);
if (!connect_to_storage_daemon(ua->jcr, 1, 15, 0)) {
if (!ua->api) {
ua->send_msg(_("Connecting to Storage daemon %s at %s:%d\n"),
store->name(), store->address, store->SDport);
}

if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 0)) {
ua->send_msg(_("\nFailed to connect to Storage daemon %s.\n====\n"),
store->name());
store->name());
if (ua->jcr->store_bsock) {
ua->jcr->store_bsock->close();
ua->jcr->store_bsock = NULL;
Expand Down Expand Up @@ -687,9 +727,9 @@ void do_native_storage_status(UAContext *ua, STORERES *store, char *cmd)
*/
bool transfer_volume(UAContext *ua, STORERES *store, int src_slot, int dst_slot)
{
BSOCK *sd = NULL;
bool retval = true;
char dev_name[MAX_NAME_LENGTH];
BSOCK *sd = NULL;

if (!(sd = open_sd_bsock(ua))) {
return false;
Expand Down Expand Up @@ -733,9 +773,9 @@ bool transfer_volume(UAContext *ua, STORERES *store, int src_slot, int dst_slot)
bool do_autochanger_volume_operation(UAContext *ua, STORERES *store,
const char *operation, int drive, int slot)
{
BSOCK *sd = NULL;
bool retval = true;
char dev_name[MAX_NAME_LENGTH];
BSOCK *sd = NULL;

if (!(sd = open_sd_bsock(ua))) {
return false;
Expand Down
64 changes: 45 additions & 19 deletions src/dird/ua_cmds.c
Expand Up @@ -106,7 +106,7 @@ static struct cmdstruct commands[] = {
{ NT_("automount"), automount_cmd, _("Automount after label"),
NT_("on | off"), false },
{ NT_("cancel"), cancel_cmd, _("Cancel a job"),
NT_("jobid=<jobid> job=<job-name> ujobid=<unique-jobid>"), false },
NT_("storage=<storage-name> jobid=<jobid> job=<job-name> ujobid=<unique-jobid>"), false },
{ NT_("create"), create_cmd, _("Create DB Pool from resource"),
NT_("pool=<pool-name>"), false },
{ NT_("delete"), delete_cmd, _("Delete volume, pool or job"),
Expand Down Expand Up @@ -312,10 +312,9 @@ static int add_cmd(UAContext *ua, const char *cmd)
STORERES *store;
int Slot = 0, InChanger = 0;

ua->send_msg(_(
"You probably don't want to be using this command since it\n"
"creates database records without labeling the Volumes.\n"
"You probably want to use the \"label\" command.\n\n"));
ua->send_msg(_("You probably don't want to be using this command since it\n"
"creates database records without labeling the Volumes.\n"
"You probably want to use the \"label\" command.\n\n"));

if (!open_client_db(ua)) {
return 1;
Expand All @@ -339,7 +338,7 @@ static int add_cmd(UAContext *ua, const char *cmd)
}

/* Get media type */
if ((store = get_storage_resource(ua, false/*no default*/)) != NULL) {
if ((store = get_storage_resource(ua, false /* no default */)) != NULL) {
bstrncpy(mr.MediaType, store->media_type, sizeof(mr.MediaType));
} else if (!get_media_type(ua, mr.MediaType, sizeof(mr.MediaType))) {
return 1;
Expand Down Expand Up @@ -472,13 +471,42 @@ int automount_cmd(UAContext *ua, const char *cmd)
*/
static int cancel_cmd(UAContext *ua, const char *cmd)
{
JCR *jcr = select_running_job(ua, "cancel");
if (!jcr) {
return 1;
int i, ret;
JCR *jcr;
STORERES *store;

/*
* See if we need to explicitly cancel a storage daemon Job.
*/
i = find_arg_with_value(ua, "storage");
if (i >= 0) {
store = get_storage_resource(ua, false /* no default */);
if (store) {
/*
* See what JobId to cancel on the storage daemon.
*/
i = find_arg_with_value(ua, "jobid");
if (i >= 0) {
if (!is_a_number(ua->argv[i])) {
ua->warning_msg(_("JobId %s not a number\n"), ua->argv[i]);
}

cancel_storage_daemon_job(ua, store, ua->argv[i]);
} else {
ua->warning_msg(_("Missing jobid=JobId specification\n"));
}
}
} else {
jcr = select_running_job(ua, "cancel");
if (!jcr) {
return 1;
}
ret = cancel_job(ua, jcr);
free_jcr(jcr);
return ret;
}
int ret = cancel_job(ua, jcr);
free_jcr(jcr);
return ret;

return 1;
}

/*
Expand All @@ -490,7 +518,6 @@ static int cancel_cmd(UAContext *ua, const char *cmd)
*
* Caution : RecyclePoolId isn't setup in this function.
* You can use set_pooldbr_recyclepoolid();
*
*/
void set_pooldbr_from_poolres(POOL_DBR *pr, POOLRES *pool, e_pool_op op)
{
Expand Down Expand Up @@ -1004,7 +1031,6 @@ static int setdebug_cmd(UAContext *ua, const char *cmd)
hangup = atoi(ua->argv[i]);
}


/* General debug? */
for (i=1; i<ua->argc; i++) {
if (bstrcasecmp(ua->argk[i], "all")) {
Expand Down Expand Up @@ -1045,7 +1071,7 @@ static int setdebug_cmd(UAContext *ua, const char *cmd)
return 1;
}
}
store = get_storage_resource(ua, false/*no default*/);
store = get_storage_resource(ua, false /* no default */);
if (store) {
switch (store->Protocol) {
case APT_NDMPV2:
Expand All @@ -1062,9 +1088,9 @@ static int setdebug_cmd(UAContext *ua, const char *cmd)
}
}
}

/*
* We didn't find an appropriate keyword above, so
* prompt the user.
* We didn't find an appropriate keyword above, so prompt the user.
*/
start_prompt(ua, _("Available daemons are: \n"));
add_prompt(ua, _("Director"));
Expand All @@ -1077,7 +1103,7 @@ static int setdebug_cmd(UAContext *ua, const char *cmd)
set_trace(trace_flag);
break;
case 1:
store = get_storage_resource(ua, false/*no default*/);
store = get_storage_resource(ua, false /* no default */);
if (store) {
switch (store->Protocol) {
case APT_NDMPV2:
Expand Down Expand Up @@ -1648,7 +1674,7 @@ static void do_mount_cmd(UAContext *ua, const char *cmd)
}
Dmsg2(120, "%s: %s\n", cmd, ua->UA_sock->msg);

store.store = get_storage_resource(ua, true/*arg is storage*/);
store.store = get_storage_resource(ua, true /* arg is storage */);
if (!store.store) {
return;
}
Expand Down
79 changes: 46 additions & 33 deletions src/stored/dircmd.c
Expand Up @@ -323,6 +323,7 @@ static bool cancel_cmd(JCR *cjcr)
BSOCK *dir = cjcr->dir_bsock;
int oldStatus;
char Job[MAX_NAME_LENGTH];
JobId_t JobId;
JCR *jcr;
int status;
const char *reason;
Expand All @@ -335,48 +336,60 @@ static bool cancel_cmd(JCR *cjcr)
goto bail_out;
}

if (!(jcr=get_jcr_by_full_name(Job))) {
dir->fsend(_("3904 Job %s not found.\n"), Job);
/*
* See if the Jobname is a number only then its a JobId.
*/
if (is_a_number(Job)) {
JobId = str_to_int64(Job);
if (!(jcr = get_jcr_by_id(JobId))) {
dir->fsend(_("3904 Job %s not found.\n"), Job);
goto bail_out;
}
} else {
oldStatus = jcr->JobStatus;
jcr->setJobStatus(status);

Dmsg2(800, "Cancel JobId=%d %p\n", jcr->JobId, jcr);
if (!jcr->authenticated && oldStatus == JS_WaitFD) {
pthread_cond_signal(&jcr->job_start_wait); /* wake waiting thread */
if (!(jcr = get_jcr_by_full_name(Job))) {
dir->fsend(_("3904 Job %s not found.\n"), Job);
goto bail_out;
}
}

if (jcr->file_bsock) {
jcr->file_bsock->set_terminated();
jcr->file_bsock->set_timed_out();
Dmsg2(800, "Term bsock jid=%d %p\n", jcr->JobId, jcr);
} else {
/* Still waiting for FD to connect, release it */
pthread_cond_signal(&jcr->job_start_wait); /* wake waiting job */
Dmsg2(800, "Signal FD connect jid=%d %p\n", jcr->JobId, jcr);
}
oldStatus = jcr->JobStatus;
jcr->setJobStatus(status);

/*
* If thread waiting on mount, wake him
*/
if (jcr->dcr && jcr->dcr->dev && jcr->dcr->dev->waiting_for_mount()) {
pthread_cond_broadcast(&jcr->dcr->dev->wait_next_vol);
Dmsg1(100, "JobId=%u broadcast wait_device_release\n", (uint32_t)jcr->JobId);
pthread_cond_broadcast(&wait_device_release);
}
Dmsg2(800, "Cancel JobId=%d %p\n", jcr->JobId, jcr);
if (!jcr->authenticated && oldStatus == JS_WaitFD) {
pthread_cond_signal(&jcr->job_start_wait); /* wake waiting thread */
}

if (jcr->read_dcr && jcr->read_dcr->dev && jcr->read_dcr->dev->waiting_for_mount()) {
pthread_cond_broadcast(&jcr->read_dcr->dev->wait_next_vol);
Dmsg1(100, "JobId=%u broadcast wait_device_release\n", (uint32_t)jcr->JobId);
pthread_cond_broadcast(&wait_device_release);
}
if (jcr->file_bsock) {
jcr->file_bsock->set_terminated();
jcr->file_bsock->set_timed_out();
Dmsg2(800, "Term bsock jid=%d %p\n", jcr->JobId, jcr);
} else {
/* Still waiting for FD to connect, release it */
pthread_cond_signal(&jcr->job_start_wait); /* wake waiting job */
Dmsg2(800, "Signal FD connect jid=%d %p\n", jcr->JobId, jcr);
}

pthread_cond_signal(&jcr->job_end_wait); /* wake waiting job */
/*
* If thread waiting on mount, wake him
*/
if (jcr->dcr && jcr->dcr->dev && jcr->dcr->dev->waiting_for_mount()) {
pthread_cond_broadcast(&jcr->dcr->dev->wait_next_vol);
Dmsg1(100, "JobId=%u broadcast wait_device_release\n", (uint32_t)jcr->JobId);
pthread_cond_broadcast(&wait_device_release);
}

dir->fsend(_("3000 JobId=%ld Job=\"%s\" marked to be %s.\n"), jcr->JobId, jcr->Job, reason);
free_jcr(jcr);
if (jcr->read_dcr && jcr->read_dcr->dev && jcr->read_dcr->dev->waiting_for_mount()) {
pthread_cond_broadcast(&jcr->read_dcr->dev->wait_next_vol);
Dmsg1(100, "JobId=%u broadcast wait_device_release\n", (uint32_t)jcr->JobId);
pthread_cond_broadcast(&wait_device_release);
}

pthread_cond_signal(&jcr->job_end_wait); /* wake waiting job */

dir->fsend(_("3000 JobId=%ld Job=\"%s\" marked to be %s.\n"), jcr->JobId, jcr->Job, reason);
free_jcr(jcr);

bail_out:
dir->signal(BNET_EOD);
return 1;
Expand Down

0 comments on commit d72cca4

Please sign in to comment.