Redo Map Tasks when Workers are unable to get Intermediate Data #404
Conversation
8490ffd to 9e4f3e8 (compare)
@@ -50,6 +52,20 @@ impl State {
        Ok(())
    }

    pub fn get_tasks_for_job(&self, job_id: &str) -> Result<Vec<String>> {
This function doesn't need to be public. Its name should also make clear that it returns in-progress tasks.
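A minimal sketch of the suggested change, using hypothetical simplified stand-ins for the real `State` and task types (the actual structs live in the master's state module):

```rust
use std::collections::HashMap;

#[derive(PartialEq)]
enum TaskStatus { InProgress, Complete }

// Hypothetical simplified task: just a job id and a status.
struct Task { job_id: String, status: TaskStatus }

struct State { tasks: HashMap<String, Task> }

impl State {
    // Private, and named so it is obvious only in-progress tasks come back.
    fn get_in_progress_tasks_for_job(&self, job_id: &str) -> Vec<String> {
        self.tasks
            .iter()
            .filter(|(_, t)| t.job_id == job_id && t.status == TaskStatus::InProgress)
            .map(|(id, _)| id.clone())
            .collect()
    }
}

fn main() {
    let mut tasks = HashMap::new();
    tasks.insert("t1".to_owned(), Task { job_id: "j1".to_owned(), status: TaskStatus::InProgress });
    tasks.insert("t2".to_owned(), Task { job_id: "j1".to_owned(), status: TaskStatus::Complete });
    let state = State { tasks };
    assert_eq!(state.get_in_progress_tasks_for_job("j1"), vec!["t1".to_owned()]);
}
```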
@@ -287,6 +308,19 @@ impl State {
        Ok(())
    }

    pub fn set_worker_operation_cancelled(&mut self, worker_id: &str) -> Result<()> {
What is the logic behind this? Changing the worker's status only on the master seems likely to cause trouble when it assigns a busy worker a task and then kicks it from the cluster.
The state is also changed in the worker (when we call set_cancelled_state). This function works similarly to the 'set_worker_operation_failed' and 'set_worker_operation_completed' functions in the same file, both of which are called after we process worker map/reduce results.
Ah, okay sounds good.
    }

    pub fn handle_worker_report(&mut self, request: &pb::ReportWorkerRequest) -> Result<()> {
        let worker_id = request.worker_id.clone();
I don't think the worker id is necessary here. Instead, a worker should send its current task id, and we should simply check whether that task still needs a response. This is the same reason workers send their current task id in other places: the data the master has might be behind if it crashed and then reloaded an old version.
I kept the worker_id, as it's used in other places (such as setting the status to cancelled), but I have also added the task_id.
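A hedged sketch of the task-id check being discussed, with hypothetical simplified types (the real request is the protobuf `ReportWorkerRequest`): the master only acts on a report whose task id it still tracks, so stale reports are ignored.

```rust
use std::collections::HashSet;

// Hypothetical simplified report carrying both ids; only task_id is
// used to decide whether the report is still relevant.
#[allow(dead_code)]
struct ReportWorkerRequest { worker_id: String, task_id: String }

struct Master { live_task_ids: HashSet<String> }

impl Master {
    fn should_act_on_report(&self, request: &ReportWorkerRequest) -> bool {
        // Only act if the reported task still needs a response; a report
        // about a task the master no longer tracks is dropped.
        self.live_task_ids.contains(&request.task_id)
    }
}

fn main() {
    let mut live = HashSet::new();
    live.insert("task-1".to_owned());
    let master = Master { live_task_ids: live };
    let fresh = ReportWorkerRequest { worker_id: "w1".to_owned(), task_id: "task-1".to_owned() };
    let stale = ReportWorkerRequest { worker_id: "w1".to_owned(), task_id: "task-9".to_owned() };
    assert!(master.should_act_on_report(&fresh));
    assert!(!master.should_act_on_report(&stale));
}
```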
            self.tasks.contains_key(&worker.current_task_id)
        };

        let mut reschedule_tasks: Vec<String> = Vec::new();
Can this actually ever be more than a single map task?
Nope
for partition_key in map_task.map_output_files.keys() {
    let partition = map_task.map_output_files.get(&partition_key).unwrap();
    if partition.ends_with(&path) {
        return true;
Can just iterate by value:

for partition in map_task.map_output_files.values() {
    if partition.ends_with(&path) {
        etc.
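A self-contained version of this suggestion, assuming the partition map is a plain `HashMap<String, String>` (a simplification of the real `map_output_files` field):

```rust
use std::collections::HashMap;

// Iterate over values() directly instead of fetching each value by key
// and unwrapping; behavior is the same, without the extra lookup.
fn map_output_lost(map_output_files: &HashMap<String, String>, path: &str) -> bool {
    for partition in map_output_files.values() {
        if partition.ends_with(path) {
            return true;
        }
    }
    false
}

fn main() {
    let mut files = HashMap::new();
    files.insert("0".to_owned(), "/tmp/intermediate/part-0".to_owned());
    assert!(map_output_lost(&files, "part-0"));
    assert!(!map_output_lost(&files, "part-9"));
}
```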
worker/src/operations/reduce.rs
Outdated
// If the task was cancelled, we shouldn't return an error here.
if status == pb::OperationStatus::CANCELLED {
    return Ok(());
}
This seems like a weird special case caused by setting cancelled unnecessarily in the worker interface. It seems better to just leave the error chaining as normal.
    format!("Unable to get task {} from task queue", queued_task_id)
})?;

if self.reduce_from_map_task(&map_task, &queued_task) {
Should check the task type before calling this, since the function only applies to reduce tasks.
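A minimal sketch of the guard being suggested, with hypothetical simplified types and a stubbed-in check (the real `reduce_from_map_task` takes the map task as well):

```rust
#[derive(PartialEq)]
enum TaskType { Map, Reduce }

struct Task { task_type: TaskType }

// Stand-in for the real check; assumed to only be meaningful for
// reduce tasks, which is why the caller must guard on the type.
fn reduce_from_map_task(_queued_task: &Task) -> bool {
    true
}

fn should_reschedule(queued_task: &Task) -> bool {
    // Guard on the task type first, so the reduce-only check is never
    // applied to a map task. Short-circuiting skips the call entirely.
    queued_task.task_type == TaskType::Reduce && reduce_from_map_task(queued_task)
}

fn main() {
    assert!(!should_reschedule(&Task { task_type: TaskType::Map }));
    assert!(should_reschedule(&Task { task_type: TaskType::Reduce }));
}
```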
pub fn remove_tasks_from_queue(&mut self, task_ids: Vec<String>) -> Result<()> {
    let mut new_priority_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();

    for task_id in task_ids.clone() {
Seems like this clone is unnecessary.
Can just do:
for task_id in &task_ids {
self.tasks.remove(task_id);
}
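A runnable sketch of the borrow-instead-of-clone pattern, using a hypothetical simplified task map (`HashMap<String, u32>` stands in for the real task storage):

```rust
use std::collections::HashMap;

// Taking &[String] borrows the ids: the caller keeps ownership and no
// clone of the whole Vec is needed just to iterate it.
fn remove_tasks(tasks: &mut HashMap<String, u32>, task_ids: &[String]) {
    for task_id in task_ids {
        tasks.remove(task_id);
    }
}

fn main() {
    let mut tasks = HashMap::new();
    tasks.insert("a".to_owned(), 1);
    tasks.insert("b".to_owned(), 2);
    let ids = vec!["a".to_owned()];
    remove_tasks(&mut tasks, &ids);
    assert!(!tasks.contains_key("a"));
    assert!(tasks.contains_key("b"));
    // ids is still usable here because it was only borrowed.
    assert_eq!(ids.len(), 1);
}
```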
if task.id == task_id {
    task.status = TaskStatus::Cancelled;
    can_push = false;
    break;
Is this ever possible? If a task is in task_ids, it will not be in self.tasks anymore.
let mut new_new_priority_task_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();
for t in new_priority_queue.drain() {
    new_new_priority_task_queue.push(t);
}
Why is this necessary? Can't the first new_priority_queue be used?
It's not necessary; it was part of some leftover debug code. Removed.
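For illustration, a single rebuilt heap is enough here; draining it into a second heap would only copy the same elements again. A minimal sketch with `i32` standing in for `PriorityTask`:

```rust
use std::collections::BinaryHeap;

// Filter the old queue straight into one new heap; BinaryHeap
// implements FromIterator, so collect() rebuilds the heap directly.
fn remove_cancelled(queue: BinaryHeap<i32>, cancelled: &[i32]) -> BinaryHeap<i32> {
    queue.into_iter().filter(|t| !cancelled.contains(t)).collect()
}

fn main() {
    let queue: BinaryHeap<i32> = vec![5, 3, 8].into_iter().collect();
    let kept = remove_cancelled(queue, &[3]);
    // into_sorted_vec() yields ascending order, handy for checking.
    assert_eq!(kept.into_sorted_vec(), vec![5, 8]);
}
```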
ac15d14 to dd5db14 (compare)
fn get_in_progress_tasks_for_job(&self, job_id: &str) -> Result<Vec<String>> {
    let mut tasks: Vec<String> = Vec::new();

    for task_id in self.tasks.keys() {
Can just iterate over values here.
    new_map_task.job_priority * FAILED_TASK_PRIORITY,
));

println!("Rescheduled map task with ID {}", new_map_task.id);
I think info! would be better here.
if self.tasks.contains_key(&request.task_id) {
    let task_ids = self.completed_tasks.keys().clone();

    for task_id in task_ids {
Can just iterate over values of completed_tasks here.
03eb406 to b52f9ee (compare)