Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #291 from btovar/wq_deadlocks_2
Modified protocol for the transfer of task stdout from worker to prevent deadlock in the master-foreman interactions when both are streaming data:
1. Workers do not send results immediately, now they send a message "available_results".
2. When master sees "available_results", it pushes the worker into a list.
3. Eventually, the master goes through that list, sending "send_results n" to the respective workers, and waits for the workers (one by one) to respond. n is the maximum number of results to send. n == -1 means send all.
4. workers respond with at most n result messages, and a final "end" message.
  • Loading branch information
dpandiar committed Nov 6, 2013
2 parents 01ca257 + 0193f01 commit 35d199e
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 17 deletions.
71 changes: 66 additions & 5 deletions work_queue/src/work_queue.c
Expand Up @@ -118,6 +118,8 @@ struct work_queue {
struct hash_table *worker_table;
struct itable *worker_task_map;

struct list *workers_with_available_results;

int workers_in_state[WORKER_STATE_MAX];

INT64_T total_tasks_submitted;
Expand Down Expand Up @@ -204,7 +206,8 @@ static int start_task_on_worker(struct work_queue *q, struct work_queue_worker *
static void add_task_report(struct work_queue *q, struct work_queue_task *t );

static int process_workqueue(struct work_queue *q, struct work_queue_worker *w, const char *line);
static int process_result(struct work_queue *q, struct work_queue_worker *w, const char *line, time_t stoptime);
static int process_result(struct work_queue *q, struct work_queue_worker *w, const char *line);
static int process_available_results(struct work_queue *q, struct work_queue_worker *w, int max_count);
static int process_queue_status(struct work_queue *q, struct work_queue_worker *w, const char *line, time_t stoptime);
static int process_resource(struct work_queue *q, struct work_queue_worker *w, const char *line);

Expand Down Expand Up @@ -351,10 +354,11 @@ static int recv_worker_msg(struct work_queue *q, struct work_queue_worker *w, ch
result = 0;
} else if(string_prefix_is(line, "workqueue")) {
result = process_workqueue(q, w, line);
} else if (string_prefix_is(line,"result")) {
result = process_result(q, w, line, stoptime);
} else if (string_prefix_is(line,"queue_status") || string_prefix_is(line, "worker_status") || string_prefix_is(line, "task_status")) {
result = process_queue_status(q, w, line, stoptime);
} else if (string_prefix_is(line, "available_results")) {
list_push_tail(q->workers_with_available_results, w);
result = 0;
} else if (string_prefix_is(line, "resource")) {
result = process_resource(q, w, line);
} else if (string_prefix_is(line, "auth")) {
Expand Down Expand Up @@ -1021,7 +1025,7 @@ static int process_workqueue(struct work_queue *q, struct work_queue_worker *w,
return 0;
}

static int process_result(struct work_queue *q, struct work_queue_worker *w, const char *line, time_t stoptime) {
static int process_result(struct work_queue *q, struct work_queue_worker *w, const char *line) {

if(!q || !w || !line) return -1;

Expand All @@ -1034,6 +1038,8 @@ static int process_result(struct work_queue *q, struct work_queue_worker *w, con
timestamp_t observed_execution_time;
timestamp_t effective_stoptime = 0;

time_t stoptime;

//Format: result, output length, execution time, taskid
char items[3][WORK_QUEUE_PROTOCOL_FIELD_MAX];
int n = sscanf(line, "result %s %s %s %" SCNd64, items[0], items[1], items[2], &taskid);
Expand Down Expand Up @@ -1115,6 +1121,46 @@ static int process_result(struct work_queue *q, struct work_queue_worker *w, con
return 0;
}

static int process_available_results(struct work_queue *q, struct work_queue_worker *w, int max_count)
{
//max_count == -1, tells the worker to send all available results.

send_worker_msg(q, w, "send_results %d\n", max_count);

char line[WORK_QUEUE_LINE_MAX];

debug(D_WQ, "Reading result(s) from %s (%s)", w->hostname, w->addrport);

int i = 0;
while(1) {
int result = recv_worker_msg_retry(q, w, line, sizeof(line));
if(result < 0)
return result;

if(string_prefix_is(line,"result")) {
result = process_result(q, w, line);
if(result < 0)
return result;
i++;
} else if(!strcmp(line,"end")) {
//Only return success if last message is end.
break;
} else {
debug(D_WQ, "%s (%s): sent invalid response to send_results: %s",w->hostname,w->addrport,line);
return -1;
}

}

if(max_count > 0 && i > max_count)
{
debug(D_WQ, "%s (%s): sent %d results. At most %d were expected.",w->hostname,w->addrport, i, max_count);
return -1;
}

return 0;
}

/*
queue_to_nvpair examines the overall queue status and creates
an nvair which can be sent to the catalog or directly to the
Expand Down Expand Up @@ -2708,6 +2754,8 @@ struct work_queue *work_queue_create(int port)
q->worker_table = hash_table_create(0, 0);
q->worker_task_map = itable_create(0);

q->workers_with_available_results = list_create();

// The poll table is initially null, and will be created
// (and resized) as needed by build_poll_table.
q->poll_table_size = 8;
Expand Down Expand Up @@ -2916,6 +2964,8 @@ void work_queue_delete(struct work_queue *q)
itable_delete(q->running_tasks);
itable_delete(q->finished_tasks);
list_delete(q->complete_list);

list_delete(q->workers_with_available_results);

list_free(q->task_reports);
list_delete(q->task_reports);
Expand Down Expand Up @@ -3099,10 +3149,16 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
handle_worker(q, q->poll_table[i].link);
}
}

while(list_size(q->workers_with_available_results) > 0)
{
struct work_queue_worker *w = list_pop_head(q->workers_with_available_results);
process_available_results(q, w, -1);
}

// Start tasks on ready workers
start_tasks(q);

// If any worker has sent a results message, retrieve the output files.
if(itable_size(q->finished_tasks)) {
struct work_queue_worker *w;
Expand Down Expand Up @@ -3261,6 +3317,11 @@ struct list * work_queue_cancel_all_tasks(struct work_queue *q) {
list_push_tail(l, t);
}

while(list_size(q->workers_with_available_results))
{
list_pop_head(q->workers_with_available_results);
}

hash_table_firstkey(q->worker_table);
while(hash_table_nextkey(q->worker_table, &key, (void**)&w)) {

Expand Down

0 comments on commit 35d199e

Please sign in to comment.