Addressed some review comments
RyanConnell committed Mar 31, 2018
1 parent 9e4f3e8 commit ac15d14
Showing 3 changed files with 49 additions and 54 deletions.
86 changes: 37 additions & 49 deletions master/src/worker_management/state.rs
@@ -5,7 +5,7 @@ use chrono::prelude::*;
use serde_json;

use cerberus_proto::worker as pb;
-use common::{PriorityTask, Task, TaskStatus, Worker};
+use common::{PriorityTask, Task, TaskStatus, TaskType, Worker};
use errors::*;
use util::state::StateHandling;

@@ -52,7 +52,7 @@ impl State {
Ok(())
}

-    pub fn get_tasks_for_job(&self, job_id: &str) -> Result<Vec<String>> {
+    fn get_in_progress_tasks_for_job(&self, job_id: &str) -> Result<Vec<String>> {
let mut tasks: Vec<String> = Vec::new();

for task_id in self.tasks.keys() {
@@ -475,17 +475,26 @@ impl State {
.chain_err(|| format!("Unable to get map task with ID {}", task_id))?
.clone();

-        let queued_tasks = self.get_tasks_for_job(&map_task.job_id).chain_err(|| {
-            format!("Unable to get queued tasks for job {}", map_task.job_id)
-        })?;
+        let queued_tasks = self.get_in_progress_tasks_for_job(&map_task.job_id)
+            .chain_err(|| {
+                format!("Unable to get queued tasks for job {}", map_task.job_id)
+            })?;

let mut remove_tasks: Vec<String> = Vec::new();
for queued_task_id in queued_tasks.clone() {
let queued_task = self.tasks.get(&queued_task_id).chain_err(|| {
format!("Unable to get task {} from task queue", queued_task_id)
})?;

-            if self.reduce_from_map_task(&map_task, &queued_task) {
+            if queued_task.task_type != TaskType::Reduce {
+                continue;
+            }
+
+            let from_map_task =
+                self.reduce_from_map_task(&map_task, &queued_task)
+                    .chain_err(|| "Unable to determine if reduce stems from map task")?;
+
+            if from_map_task {
remove_tasks.push(queued_task.id.clone());
}
}
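
A minimal, self-contained sketch of the filtering idea in the hunk above: only queued reduce tasks are considered, and a reduce task is cancelled when it consumes output from the failed map task. `QueuedTask`, `TaskKind`, and the `partition` field are hypothetical stand-ins for the project's `Task` type.

use std::collections::HashMap;

#[derive(PartialEq)]
enum TaskKind {
    Map,
    Reduce,
}

// Hypothetical stand-in for the project's `Task`.
struct QueuedTask {
    id: String,
    kind: TaskKind,
    // For reduce tasks: the map-output partition they consume.
    partition: Option<u64>,
}

// Collect the IDs of queued reduce tasks that consume a partition produced
// by the failed map task, so they can be removed and rescheduled later.
fn reduces_stemming_from(map_partitions: &[u64], queued: &HashMap<String, QueuedTask>) -> Vec<String> {
    let mut remove_tasks = Vec::new();
    for task in queued.values() {
        // Only reduce tasks can stem from a map task's output.
        if task.kind != TaskKind::Reduce {
            continue;
        }
        if let Some(partition) = task.partition {
            if map_partitions.contains(&partition) {
                remove_tasks.push(task.id.clone());
            }
        }
    }
    remove_tasks
}

fn main() {
    let mut queued = HashMap::new();
    queued.insert(
        "reduce-1".to_string(),
        QueuedTask { id: "reduce-1".to_string(), kind: TaskKind::Reduce, partition: Some(2) },
    );
    queued.insert(
        "map-2".to_string(),
        QueuedTask { id: "map-2".to_string(), kind: TaskKind::Map, partition: None },
    );

    // The failed map task produced partitions 1 and 2, so "reduce-1" is cancelled.
    assert_eq!(reduces_stemming_from(&[1, 2], &queued), vec!["reduce-1".to_string()]);
}
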
Expand Down Expand Up @@ -521,7 +530,7 @@ impl State {
self.tasks.contains_key(&worker.current_task_id)
};

-        let mut reschedule_tasks: Vec<String> = Vec::new();
+        let mut reschedule_task = String::new();
if reschedule {
let task_ids = self.completed_tasks.keys().clone();

@@ -530,17 +539,17 @@
format!("Unable to get map task with ID {}", task_id)
})?;

-                if !self.path_from_map_task(&map_task, request.path.clone()) {
-                    continue;
+                if self.path_from_map_task(&map_task, request.path.clone()) {
+                    reschedule_task = map_task.id.clone();
+                    break;
}

-                reschedule_tasks.push(map_task.id.clone());
}
}

-        for task in reschedule_tasks {
-            self.reschedule_map_task(&task).chain_err(|| {
-                format!("Unable to reschedule map task with ID {}", task)
+        if !reschedule_task.is_empty() {
+            self.reschedule_map_task(&reschedule_task).chain_err(|| {
+                format!("Unable to reschedule map task with ID {}", reschedule_task)
})?;
}

@@ -551,8 +560,7 @@

// Returns true if the path was created by the given map task.
pub fn path_from_map_task(&self, map_task: &Task, path: String) -> bool {
-        for partition_key in map_task.map_output_files.keys() {
-            let partition = map_task.map_output_files.get(&partition_key).unwrap();
+        for partition in map_task.map_output_files.values() {
if partition.ends_with(&path) {
return true;
}
@@ -561,52 +569,32 @@
}

// Returns true if a reduce task stems from the given map task.
-    pub fn reduce_from_map_task(&self, map_task: &Task, reduce_task: &Task) -> bool {
-        if let Some(ref req) = reduce_task.reduce_request {
-            if !map_task.map_output_files.contains_key(&req.partition) {
-                return false;
-            }
-
-            let partition = map_task.map_output_files.get(&req.partition).unwrap();
-
-            for file_path in req.get_input_file_paths() {
-                if partition == file_path {
-                    return true;
-                }
-            }
+    pub fn reduce_from_map_task(&self, map_task: &Task, reduce_task: &Task) -> Result<bool> {
+        match reduce_task.reduce_request {
+            Some(ref req) => Ok(map_task.map_output_files.contains_key(&req.partition)),
+            None => Err(
+                format!(
+                    "Unable to get reduce request for task with ID {}",
+                    map_task.id
+                ).into(),
+            ),
+        }
-        false
}

pub fn remove_tasks_from_queue(&mut self, task_ids: Vec<String>) -> Result<()> {
let mut new_priority_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();

-        for task_id in task_ids.clone() {
-            self.tasks.remove(&task_id);
+        for task_id in &task_ids {
+            self.tasks.remove(task_id);
}

for priority_task in self.priority_task_queue.drain() {
-            if let Some(task) = self.tasks.get_mut(&priority_task.id.clone()) {
-                let mut can_push = true;
-                for task_id in task_ids.clone() {
-                    if task.id == task_id {
-                        task.status = TaskStatus::Cancelled;
-                        can_push = false;
-                        break;
-                    }
-                }
-                if can_push {
-                    new_priority_queue.push(priority_task);
-                }
+            if self.tasks.contains_key(&priority_task.id) {
+                new_priority_queue.push(priority_task);
+            }
}

-        let mut new_new_priority_task_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();
-        for t in new_priority_queue.drain() {
-            new_new_priority_task_queue.push(t);
-        }
-
-        self.priority_task_queue = new_new_priority_task_queue;
+        self.priority_task_queue = new_priority_queue;
Ok(())
}
}
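
The simplified `remove_tasks_from_queue` above drops the cancelled tasks from the task map and then rebuilds the priority queue in one pass, keeping only entries whose task is still known; assigning the rebuilt heap directly also avoids the extra `new_new_priority_task_queue` copy the previous version made. A standalone sketch of that pattern, with `QueueEntry` as a hypothetical stand-in for `PriorityTask`:

use std::collections::{BinaryHeap, HashMap};

// Hypothetical stand-in for `PriorityTask`; ordered by `priority` first.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct QueueEntry {
    priority: u32,
    id: String,
}

// Remove the given task IDs, then keep only queue entries that still
// resolve to a known task.
fn remove_tasks_from_queue(
    queue: &mut BinaryHeap<QueueEntry>,
    tasks: &mut HashMap<String, String>,
    remove_ids: &[String],
) {
    for id in remove_ids {
        tasks.remove(id);
    }
    let kept: BinaryHeap<QueueEntry> = queue
        .drain()
        .filter(|entry| tasks.contains_key(&entry.id))
        .collect();
    *queue = kept;
}

fn main() {
    let mut tasks = HashMap::new();
    tasks.insert("map-1".to_string(), "pending".to_string());
    tasks.insert("reduce-1".to_string(), "pending".to_string());

    let mut queue = BinaryHeap::new();
    queue.push(QueueEntry { priority: 10, id: "map-1".to_string() });
    queue.push(QueueEntry { priority: 5, id: "reduce-1".to_string() });

    remove_tasks_from_queue(&mut queue, &mut tasks, &["reduce-1".to_string()]);
    assert_eq!(queue.len(), 1);
}
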
4 changes: 2 additions & 2 deletions master/src/worker_management/worker_manager.rs
@@ -406,8 +406,8 @@ impl WorkerManager {
}

pub fn handle_worker_report(&self, request: &pb::ReportWorkerRequest) -> Result<()> {
-        println!(
-            "Worker on '{}' was reported by '{}'",
+        info!(
+            "Worker on '{}' failed to provide map output data to '{}'",
request.report_address,
request.worker_id,
);
13 changes: 10 additions & 3 deletions worker/src/worker_interface.rs
@@ -13,6 +13,8 @@ use cerberus_proto::worker as pb;
use cerberus_proto::worker_grpc as grpc_pb;
use cerberus_proto::worker_grpc::IntermediateDataService; // For pub functions only

+const INTERMEDIATE_DATA_RETRIES: u8 = 3;

/// `WorkerInterface` is used to load data from other workers which have completed
/// their map tasks.
pub struct WorkerInterface;
@@ -61,7 +63,7 @@ impl WorkerInterface {
format!("Error building client for worker {}", worker_addr)
})?;

-        for _ in 0..3 {
+        for _ in 0..INTERMEDIATE_DATA_RETRIES {
let response = client
.get_intermediate_data(RequestOptions::new(), req.clone())
.wait();
@@ -71,7 +73,7 @@
};
}

-        // At this point we have failed to contact the worker 3 times and should report this
+        // At this point we have failed to contact the worker multiple times and should report this
// to the master.

resources
@@ -83,6 +85,11 @@
.chain_err(|| "Unable to report worker")?;

operation_handler::set_cancelled_status(&resources.operation_state);
Err("Unable to get intermediate data after 3 attempts".into())
Err(
format!(
"Unable to get intermediate data after {} attempts",
INTERMEDIATE_DATA_RETRIES
).into(),
)
}
}
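
The retry count above is now a named constant and the final error message is derived from it. A minimal sketch of the same retry-then-report shape, with `fetch_once` standing in for the gRPC call to the remote worker (the real code also reports the failure to the master before returning the error).

const INTERMEDIATE_DATA_RETRIES: u8 = 3;

// Try the fetch a fixed number of times, returning on the first success.
fn fetch_with_retries<F>(mut fetch_once: F) -> Result<Vec<u8>, String>
where
    F: FnMut() -> Result<Vec<u8>, String>,
{
    for attempt in 0..INTERMEDIATE_DATA_RETRIES {
        match fetch_once() {
            Ok(data) => return Ok(data),
            Err(err) => eprintln!("attempt {} failed: {}", attempt + 1, err),
        }
    }
    // All attempts failed; the caller would report the worker to the master here.
    Err(format!(
        "Unable to get intermediate data after {} attempts",
        INTERMEDIATE_DATA_RETRIES
    ))
}

fn main() {
    let mut calls = 0;
    let result = fetch_with_retries(|| {
        calls += 1;
        if calls < 3 {
            Err("connection refused".to_string())
        } else {
            Ok(vec![1, 2, 3])
        }
    });
    assert_eq!(result, Ok(vec![1, 2, 3]));
}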
