Skip to content

Commit

Permalink
Add ocworker watchdog capability
Browse files Browse the repository at this point in the history
watchdog threads now restarts failed worker automatically and
restart their previous job.
  • Loading branch information
Saremox committed Aug 22, 2016
1 parent 7aa6792 commit d4a12de
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions src/ocworker.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <stdlib.h>
#include <time.h>
#include <wait.h>
#include "ocworker.h"
#include "debug.h"
#include "ocsched/ocsched.h"
Expand Down Expand Up @@ -129,6 +130,10 @@ void* ocworker_worker_watchdog_loop(void* data)
ocworkerWatchdog* wdctx = (ocworkerWatchdog*) data;
ocworker* worker = wdctx->myworker;

worker->ctx =
ocsched_fork_process( ocworker_worker_process_loop,"worker",
wdctx->ctx->memfd);

char* name_fmt = "Watchdog[%d]";
size_t namesize = snprintf(NULL,0,name_fmt,worker->ctx->pid)+1;
char* name = malloc(namesize);
Expand All @@ -138,9 +143,26 @@ void* ocworker_worker_watchdog_loop(void* data)

pthread_setname_np(pthread_self(),name);
while (true) {
struct timespec sleeptimer = {0,25000000};
nanosleep(&sleeptimer,NULL);
if (worker->next_job != NULL && worker->cur_job == NULL &&
// Watchdog
int child_status = 0;
waitpid(worker->ctx->pid,&child_status,WNOHANG);

if(WIFSIGNALED(child_status))
{
log_warn("Watchdog[%d] CHILD HUNG: %d CODEC: %s JOBID: %d",
worker->ctx->pid,
WTERMSIG(child_status),
worker->cur_job->result->comp_id->codec_id->name,
worker->cur_job->jobid);
// Try to reanimate child
worker->next_job = worker->cur_job;
worker->cur_job = NULL;
worker->ctx =
ocsched_fork_process( ocworker_worker_process_loop,"worker",
wdctx->ctx->memfd);
}
// Job transfer
if (worker->cur_job == NULL && worker->next_job != NULL &&
wdctx->ctx->loadedfile == worker->next_job->result->file_id) {
debug("%s: Next Job is File \"%s\" with \"%s\" as codec",
name,
Expand All @@ -153,8 +175,7 @@ void* ocworker_worker_watchdog_loop(void* data)
debug("Send \"%s\" to %d",sendbuf,worker->ctx->pid);
free(sendbuf);
}

if(worker->cur_job != NULL)
else if(worker->cur_job != NULL)
{
memset(recvBuf, 0, 4096);
if(ocsched_recvfrom(worker->ctx, recvBuf, 4096) > 0)
Expand All @@ -164,10 +185,21 @@ void* ocworker_worker_watchdog_loop(void* data)
worker->last_job = worker->cur_job;
worker->cur_job = NULL;
}
else
{
struct timespec sleeptimer = {0,25000000};
nanosleep(&sleeptimer,NULL);
}
}
else if(worker->next_job == NULL)
{
struct timespec sleeptimer = {0,25000000};
nanosleep(&sleeptimer,NULL);
}
}
}


void* ocworker_schedule_worker(void* data)
{
ocworkerContext* ctx = (ocworkerContext*) data;\
Expand Down

0 comments on commit d4a12de

Please sign in to comment.