From 8490ffdfb4996b5fd08548aae93d2d25005a5f26 Mon Sep 17 00:00:00 2001
From: Ryan Connell
Date: Wed, 14 Mar 2018 00:18:44 +0000
Subject: [PATCH] Current progress

---
 master/src/common/task.rs             |   3 +
 master/src/scheduling/scheduler.rs    |   4 +
 master/src/scheduling/state.rs        |  14 ++-
 master/src/worker_management/state.rs | 121 +++++++++++++++++++++-----
 proto/worker.proto                    |   2 +
 worker/src/master_interface.rs        |   9 +-
 worker/src/operations/mod.rs          |   2 +-
 worker/src/operations/reduce.rs       |  13 ++-
 worker/src/worker_interface.rs        |  23 ++---
 9 files changed, 145 insertions(+), 46 deletions(-)

diff --git a/master/src/common/task.rs b/master/src/common/task.rs
index 8686b591..05a72a79 100644
--- a/master/src/common/task.rs
+++ b/master/src/common/task.rs
@@ -42,6 +42,7 @@ pub struct Task {
     pub id: String,
     pub job_priority: u32,
+    pub has_completed_before: bool,

     // This will only exist if TaskType is Map.
     pub map_request: Option<pb::PerformMapRequest>,
@@ -88,6 +89,7 @@ impl Task {
             id: id,
             job_priority: job_priority,
+            has_completed_before: false,

             map_request: Some(map_request),
             reduce_request: None,
@@ -170,6 +172,7 @@ impl Task {
             id: id,
             job_priority: job_priority,
+            has_completed_before: false,

             map_request: None,
             reduce_request: Some(reduce_request),
diff --git a/master/src/scheduling/scheduler.rs b/master/src/scheduling/scheduler.rs
index dad15532..7a82b99a 100644
--- a/master/src/scheduling/scheduler.rs
+++ b/master/src/scheduling/scheduler.rs
@@ -54,6 +54,10 @@ impl Scheduler {
         let reduce_tasks_required = state.reduce_tasks_required(&task.job_id).chain_err(
             || "Error processing completed task result",
         )?;
+        println!(
+            "[][][][] Reduce Tasks Required? => {}",
+            reduce_tasks_required
+        );

         if reduce_tasks_required {
             info!("Creating reduce tasks for job {}", task.job_id);
diff --git a/master/src/scheduling/state.rs b/master/src/scheduling/state.rs
index daab111b..c1796bfe 100644
--- a/master/src/scheduling/state.rs
+++ b/master/src/scheduling/state.rs
@@ -234,6 +234,9 @@ impl State {
             None => return Err(format!("Job with ID {} is not found.", &job_id).into()),
         };

+        println!("completed ({}) == total ({})", scheduled_job.job.map_tasks_completed,
+                 scheduled_job.job.map_tasks_total);
+        println!("reduce_tasks_total = {}", scheduled_job.job.reduce_tasks_total);
         if scheduled_job.job.map_tasks_completed == scheduled_job.job.map_tasks_total {
             return Ok(scheduled_job.job.reduce_tasks_total == 0);
         }
@@ -254,11 +257,19 @@ impl State {
             None => return Err(format!("Job with ID {} is not found.", &task.job_id).into()),
         };

+        println!("\t\tBEFORE => {}", scheduled_job.job.map_tasks_completed);
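+        // This map task has already completed at least once, so any reduce
+        // tasks built from its previous output are stale: reset the reduce
+        // counters so new reduce tasks are created when it completes again.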
+        if task.has_completed_before && task.task_type == TaskType::Map {
+            scheduled_job.job.reduce_tasks_total = 0;
+            scheduled_job.job.reduce_tasks_completed = 0;
+        }
+
         scheduled_job.job.cpu_time += task.cpu_time;

         if task.status == TaskStatus::Complete {
             match task.task_type {
                 TaskType::Map => {
-                    scheduled_job.job.map_tasks_completed += 1;
+                    if !task.has_completed_before {
+                        scheduled_job.job.map_tasks_completed += 1;
+                    }
                 }
                 TaskType::Reduce => {
                     scheduled_job.job.reduce_tasks_completed += 1;
                 }
@@ -277,6 +288,7 @@ impl State {
             }
         }

+        println!("\t\tAFTER => {}", scheduled_job.job.map_tasks_completed);
         scheduled_job.tasks.insert(task.id.to_owned(), task);

         Ok(())
diff --git a/master/src/worker_management/state.rs b/master/src/worker_management/state.rs
index 951a6fcb..89d18e21 100644
--- a/master/src/worker_management/state.rs
+++ b/master/src/worker_management/state.rs
@@ -5,7 +5,7 @@ use chrono::prelude::*;
 use serde_json;

 use cerberus_proto::worker as pb;
-use common::{PriorityTask, Task, TaskStatus, Worker};
+use common::{PriorityTask, Task, TaskType, TaskStatus, Worker};
 use errors::*;
 use state;

@@ -464,41 +464,116 @@ impl State {
             format!("Unable to get queued tasks for job {}", current_task.job_id)
         })?;

-        /*
-         * When a worker fails we want to reschedule the Map task that it requires and cancel
-         * all current reduce tasks that depend on that map task.
-         *
-         * Do we want to consider it as a failure when a worker can't get intermediate
-         * data from another worker?
-         */
-
-        for task_id in queued_tasks {
-            let task = self.tasks.get(&task_id).chain_err(|| {
-                format!("Task with ID {} not found.", task_id)
-            })?;
-            println!("Task ID: {} | Task Type: {:?}", task_id, task.task_type);
+        // 1. Find all tasks that require this map task.
+        // 2. Remove them from the current queue.
+        // 3. Reschedule the map task.
+        let mut remove_tasks: Vec<String> = Vec::new();
+        {
+            let task_ids = self.completed_tasks.keys().clone();
+
+            for task_id in task_ids {
+                println!("\t\t\t-: checking {}", task_id);
+                let map_task = self.completed_tasks.get(task_id).chain_err(|| {
+                    format!("Unable to get map task with ID {}", task_id)
+                })?;
+
+                if !self.path_from_map_task(&map_task, request.path.clone()) {
+                    continue;
+                }
+
+                println!("*\n*\n*Task {} depends on {}", current_task.id, map_task.id);
+
+                // Remove all tasks that require this map task from the queue.
+                for queued_task_id in queued_tasks.clone() {
+                    let queued_task = self.tasks.get(&queued_task_id).chain_err(|| {
+                        format!("Unable to get task {} from task queue", queued_task_id)
+                    })?;
+
+                    if self.reduce_from_map_task(&map_task, &queued_task) {
+                        remove_tasks.push(queued_task.id.clone());
+                        println!("Task {} | remove!", queued_task_id);
+                    } else {
+                        println!("Task {} |", queued_task_id);
+                    }
+                    println!("\t-> {:?}", queued_task.reduce_request);
+                }

+                // Reschedule the map task.
+                let mut new_map_task = map_task.clone();
+                new_map_task.status = TaskStatus::Queued;
+                new_map_task.has_completed_before = true;
+                self.tasks.insert(task_id.clone(), new_map_task.clone());
+                self.priority_task_queue.push(PriorityTask::new(
+                    task_id.to_owned(),
+                    new_map_task.job_priority * FAILED_TASK_PRIORITY,
+                ));
+                println!("\n\n\n\tRescheduled map task: {:?}\n\n\n", new_map_task);
+
+            }
         }
+        self.remove_tasks_from_queue(remove_tasks).chain_err(
+            || "Unable to remove task from queue",
+        )?;

         Ok(())
     }

-    pub fn reschedule_map_task(&mut self, task_id: &str) -> Result<()> {
-        // We need to remove all reduce tasks that rely on the results of this map task.
+    // Returns true if the path was created by the given map task.
+    pub fn path_from_map_task(&self, map_task: &Task, path: String) -> bool {
+        for partition_key in map_task.map_output_files.keys() {
+            let partition = map_task.map_output_files.get(&partition_key).unwrap();
+            if partition.ends_with(&path) {
+                return true;
+            }
+        }
+        false
+    }
+
+    // Returns true if a reduce task stems from the given map task.
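+    // A reduce task stems from a map task when one of its input file paths
+    // matches the output file the map task produced for that partition.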
+    pub fn reduce_from_map_task(&self, map_task: &Task, reduce_task: &Task) -> bool {
+        if let Some(ref req) = reduce_task.reduce_request {
+            if !map_task.map_output_files.contains_key(&req.partition) {
+                return false;
+            }
+
+            let partition = map_task.map_output_files.get(&req.partition).unwrap();
+
+            for file_path in req.get_input_file_paths() {
+                if partition == file_path {
+                    return true;
+                }
+            }
+        }
+        false
+    }
+
+    pub fn remove_tasks_from_queue(&mut self, task_ids: Vec<String>) -> Result<()> {
+        println!("-> Removing tasks from queue: {:?}", task_ids);
         let mut new_priority_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();
+
         for priority_task in self.priority_task_queue.drain() {
             if let Some(task) = self.tasks.get(&priority_task.id.clone()) {
-
-                println!("");
+                let mut can_push = true;
+                for task_id in task_ids.clone() {
+                    if task.id == task_id {
+                        can_push = false;
+                        break;
+                    }
+                }
+                if can_push {
+                    new_priority_queue.push(priority_task);
+                }
             }
         }
-        self.priority_task_queue = new_priority_queue;

-        // We then need to reschedule the map task.
-        // TODO(rhino): Handle this better.
-        let map_task = PriorityTask::new(task_id.to_owned(), 1000);
-        self.priority_task_queue.push(map_task);
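+        // Drain into a second heap purely so the surviving queue entries can
+        // be printed while debugging; the queue contents are unchanged.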
+        let mut new_new_priority_task_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();
+        for t in new_priority_queue.drain() {
+            println!("=> {:?}", t.id.clone());
+            new_new_priority_task_queue.push(t);
+        }
+        self.priority_task_queue = new_new_priority_task_queue;

         Ok(())
     }
 }
diff --git a/proto/worker.proto b/proto/worker.proto
index 5c2d8c87..8684e9fe 100644
--- a/proto/worker.proto
+++ b/proto/worker.proto
@@ -94,6 +94,8 @@ message ReportWorkerRequest {
     string worker_id = 1;
     // Address of the worker being reported.
     string report_address = 2;
+    // Path to the intermediate file.
+    string path = 3;
 }

 // ScheduleOperationService is used by the master to schedule Map and Reduce operations on the
diff --git a/worker/src/master_interface.rs b/worker/src/master_interface.rs
index 1d4409bd..2f245344 100644
--- a/worker/src/master_interface.rs
+++ b/worker/src/master_interface.rs
@@ -87,10 +87,15 @@ impl MasterInterface {
         Ok(())
     }

-    pub fn report_worker(&self, worker_address: String) -> Result<()> {
+    pub fn report_worker(
+        &self,
+        worker_address: String,
+        intermediate_data_path: String,
+    ) -> Result<()> {
         let mut report_request = pb::ReportWorkerRequest::new();
         report_request.set_worker_id(self.worker_id.read().unwrap().clone());
-        report_request.set_report_address(worker_address.into());
+        report_request.set_report_address(worker_address.clone().into());
+        report_request.set_path(intermediate_data_path);

         self.client
             .report_worker(RequestOptions::new(), report_request)
             .wait()
diff --git a/worker/src/operations/mod.rs b/worker/src/operations/mod.rs
index 7e329d45..6f44033b 100644
--- a/worker/src/operations/mod.rs
+++ b/worker/src/operations/mod.rs
@@ -3,7 +3,7 @@ pub mod io;
 mod map;
 mod reduce;
 mod state;
-mod operation_handler;
+pub mod operation_handler;

 pub use self::operation_handler::OperationHandler;
 pub use self::operation_handler::OperationResources;
diff --git a/worker/src/operations/reduce.rs b/worker/src/operations/reduce.rs
index ea25b575..423273d2 100644
--- a/worker/src/operations/reduce.rs
+++ b/worker/src/operations/reduce.rs
@@ -182,15 +182,14 @@ fn send_reduce_result(
 fn create_reduce_operations(
     reduce_request: &pb::PerformReduceRequest,
     output_uuid: &str,
-    master_interface: &Arc<MasterInterface>,
+    resources: &OperationResources,
 ) -> Result<Vec<ReduceOperation>> {
     let mut reduce_map: HashMap<String, Vec<serde_json::Value>> = HashMap::new();

     for reduce_input_file in reduce_request.get_input_file_paths() {
         // TODO: Run these operations in parallel as networks can be slow
-        let reduce_input =
-            WorkerInterface::get_data(reduce_input_file, output_uuid, master_interface)
-                .chain_err(|| "Couldn't read reduce input file")?;
+        let reduce_input = WorkerInterface::get_data(reduce_input_file, output_uuid, resources)
+            .chain_err(|| "Couldn't read reduce input file")?;

         let parsed_value: serde_json::Value = serde_json::from_str(&reduce_input).chain_err(
             || "Error parsing map response.",
@@ -255,7 +254,6 @@ pub fn perform_reduce(
     let result = internal_perform_reduce(reduce_request, resources, output_uuid);
     if let Err(err) = result {
         log_reduce_operation_err(err, &resources.operation_state);
-        operation_handler::set_failed_status(&resources.operation_state);
         return Err("Error starting reduce operation.".into());
     }
@@ -268,9 +266,8 @@ fn internal_perform_reduce(
     resources: &OperationResources,
     output_uuid: &str,
 ) -> Result<()> {
-    let reduce_operations =
-        create_reduce_operations(reduce_request, output_uuid, &resources.master_interface)
-            .chain_err(|| "Error creating reduce operations from input.")?;
+    let reduce_operations = create_reduce_operations(reduce_request, output_uuid, resources)
+        .chain_err(|| "Error creating reduce operations from input.")?;

     let reduce_options = ReduceOptions {
         reducer_file_path: reduce_request.get_reducer_file_path().to_owned(),
diff --git a/worker/src/worker_interface.rs b/worker/src/worker_interface.rs
index 149461c8..635d829a 100644
--- a/worker/src/worker_interface.rs
+++ b/worker/src/worker_interface.rs
@@ -1,14 +1,14 @@
 use std::net::SocketAddr;
 use std::str::FromStr;
 use std::path::Path;
-use std::sync::Arc;

 use grpc::RequestOptions;

 use errors::*;
 use operations::io;
-use master_interface::MasterInterface;
+use operations::OperationResources;
+use operations::operation_handler;
 use cerberus_proto::worker as pb;
 use cerberus_proto::worker_grpc as grpc_pb;
 use cerberus_proto::worker_grpc::IntermediateDataService; // For pub functions only
@@ -21,7 +21,7 @@ impl WorkerInterface {
     pub fn get_data<P: AsRef<Path>>(
         path: P,
         output_dir_uuid: &str,
-        master_interface: &Arc<MasterInterface>,
+        resources: &OperationResources,
     ) -> Result<String> {
         let path_str = path.as_ref().to_string_lossy();
         let split_path: Vec<&str> = path_str.splitn(2, '/').collect();
@@ -39,7 +39,7 @@
         let mut req = pb::IntermediateDataRequest::new();
         req.set_path(file.clone());

-        let res = WorkerInterface::request_data(worker_addr, req, master_interface)
+        let res = WorkerInterface::request_data(worker_addr, req, resources)
             .chain_err(|| format!("Failed to get {} from {}", file, worker_addr))?;

         String::from_utf8(res.get_data().to_vec()).chain_err(
@@ -50,7 +50,7 @@
     pub fn request_data(
         worker_addr: SocketAddr,
         req: pb::IntermediateDataRequest,
-        master_interface: &Arc<MasterInterface>,
+        resources: &OperationResources,
     ) -> Result<pb::IntermediateData> {
         // TODO: Add client store so we don't need to create a new client every time.
         let client = grpc_pb::IntermediateDataServiceClient::new_plain(
@@ -74,14 +74,15 @@

         // At this point we have failed to contact the worker 3 times and should report this
         // to the master.
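+        // Include the path of the intermediate data we failed to fetch so the
+        // master can identify and reschedule the map task that produced it.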
-        master_interface
-            .report_worker(format!(
-                "{}:{}",
-                worker_addr.ip().to_string(),
-                worker_addr.port()
-            ))
+        resources
+            .master_interface
+            .report_worker(
+                format!("{}:{}", worker_addr.ip().to_string(), worker_addr.port()),
+                req.path,
+            )
             .chain_err(|| "Unable to report worker")?;

+        operation_handler::set_cancelled_status(&resources.operation_state);
         Err("Unable to get intermediate data after 3 attempts".into())
     }
 }