
Commit

Current progress
RyanConnell committed Mar 14, 2018
1 parent 500e69a commit 8490ffd
Showing 9 changed files with 145 additions and 46 deletions.
3 changes: 3 additions & 0 deletions master/src/common/task.rs
@@ -42,6 +42,7 @@ pub struct Task {
pub id: String,

pub job_priority: u32,
pub has_completed_before: bool,

// This will only exist if TaskType is Map.
pub map_request: Option<pb::PerformMapRequest>,
@@ -88,6 +89,7 @@ impl Task {
id: id,

job_priority: job_priority,
has_completed_before: false,

map_request: Some(map_request),
reduce_request: None,
@@ -170,6 +172,7 @@ impl Task {
id: id,

job_priority: job_priority,
has_completed_before: false,

map_request: None,
reduce_request: Some(reduce_request),
4 changes: 4 additions & 0 deletions master/src/scheduling/scheduler.rs
@@ -54,6 +54,10 @@ impl Scheduler {
let reduce_tasks_required = state.reduce_tasks_required(&task.job_id).chain_err(
|| "Error processing completed task result",
)?;
println!(
"[][][][] Reduce Tasks Required? => {}",
reduce_tasks_required
);

if reduce_tasks_required {
info!("Creating reduce tasks for job {}", task.job_id);
14 changes: 13 additions & 1 deletion master/src/scheduling/state.rs
@@ -234,6 +234,9 @@ impl State {
None => return Err(format!("Job with ID {} is not found.", &job_id).into()),
};

println!("completed ({}) == total ({})", scheduled_job.job.map_tasks_completed,
scheduled_job.job.map_tasks_total);
println!("reduce_tasks_total = {}", scheduled_job.job.reduce_tasks_total);
if scheduled_job.job.map_tasks_completed == scheduled_job.job.map_tasks_total {
return Ok(scheduled_job.job.reduce_tasks_total == 0);
}
@@ -254,11 +257,19 @@
None => return Err(format!("Job with ID {} is not found.", &task.job_id).into()),
};

println!("\t\tBEFORE => {}", scheduled_job.job.map_tasks_completed);
if task.has_completed_before && task.task_type == TaskType::Map {
scheduled_job.job.reduce_tasks_total = 0;
scheduled_job.job.reduce_tasks_completed = 0;
}

scheduled_job.job.cpu_time += task.cpu_time;
if task.status == TaskStatus::Complete {
match task.task_type {
TaskType::Map => {
scheduled_job.job.map_tasks_completed += 1;
if !task.has_completed_before {
scheduled_job.job.map_tasks_completed += 1;
}
}
TaskType::Reduce => {
scheduled_job.job.reduce_tasks_completed += 1;
@@ -277,6 +288,7 @@
}
}

println!("\t\tAFTER => {}", scheduled_job.job.map_tasks_completed);
scheduled_job.tasks.insert(task.id.to_owned(), task);

Ok(())
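
The hunks above change how a completed map task is counted: a map task that is re-run after a worker failure (`has_completed_before` is set) must not bump `map_tasks_completed` a second time, and it clears the reduce counters so that `reduce_tasks_required` reports true again once every map task is complete. A minimal standalone sketch of that accounting, using a stand-in `JobCounters` struct rather than the repository's `Job` type:

// Sketch only: `JobCounters` stands in for the fields on `scheduled_job.job`.
struct JobCounters {
    map_tasks_completed: u32,
    map_tasks_total: u32,
    reduce_tasks_total: u32,
    reduce_tasks_completed: u32,
}

// Accounting rule applied when a map task reports successful completion.
fn record_completed_map_task(job: &mut JobCounters, has_completed_before: bool) {
    if has_completed_before {
        // The first run was already counted; instead of counting it again,
        // clear the reduce bookkeeping so the reduce phase is rebuilt from
        // the regenerated map output.
        job.reduce_tasks_total = 0;
        job.reduce_tasks_completed = 0;
    } else {
        job.map_tasks_completed += 1;
    }
}

// Reduce tasks are (re)created once all map tasks are done and no reduce
// tasks are currently tracked for the job.
fn reduce_tasks_required(job: &JobCounters) -> bool {
    job.map_tasks_completed == job.map_tasks_total && job.reduce_tasks_total == 0
}

With the counters cleared, the scheduler path in scheduler.rs above sees `reduce_tasks_required` return true and recreates the reduce tasks from the rescheduled map task's fresh output.
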
121 changes: 98 additions & 23 deletions master/src/worker_management/state.rs
@@ -5,7 +5,7 @@ use chrono::prelude::*;
use serde_json;

use cerberus_proto::worker as pb;
use common::{PriorityTask, Task, TaskStatus, Worker};
use common::{PriorityTask, Task, TaskType, TaskStatus, Worker};
use errors::*;
use state;

@@ -464,41 +464,116 @@ impl State {
format!("Unable to get queued tasks for job {}", current_task.job_id)
})?;

/*
* When a worker fails we want to reschedule the Map task that it requires and cancel
* all current reduce tasks that depend on that map task.
*
* Do we want to consider it as a failure when a worker can't get intermediate
* data from another worker?
*/

for task_id in queued_tasks {
let task = self.tasks.get(&task_id).chain_err(|| {
format!("Task with ID {} not found.", task_id)
})?;
println!("Task ID: {} | Task Type: {:?}", task_id, task.task_type);
// 1. Find all tasks that require this map task.
// 2. Remove them from the current queue.
// 3. Reschedule the map task.
let mut remove_tasks: Vec<String> = Vec::new();
{
let task_ids = self.completed_tasks.keys().clone();

for task_id in task_ids {
println!("\t\t\t-: checking {}", task_id);
let map_task = self.completed_tasks.get(task_id).chain_err(|| {
format!("Unable to get map task with ID {}", task_id)
})?;

if !self.path_from_map_task(&map_task, request.path.clone()) {
continue;
}

println!("*\n*\n*Task {} depends on {}", current_task.id, map_task.id);

// Remove all tasks that require this map task from the queue.
for queued_task_id in queued_tasks.clone() {
let queued_task = self.tasks.get(&queued_task_id).chain_err(|| {
format!("Unable to get task {} from task queue", queued_task_id)
})?;

if self.reduce_from_map_task(&map_task, &queued_task) {
remove_tasks.push(queued_task.id.clone());
println!("Task {} | remove!", queued_task_id);
} else {
println!("Task {} |", queued_task_id);
}
println!("\t-> {:?}", queued_task.reduce_request);
}

// Reschedule the map task.
let mut new_map_task = map_task.clone();
new_map_task.status = TaskStatus::Queued;
new_map_task.has_completed_before = true;
self.tasks.insert(task_id.clone(), new_map_task.clone());
self.priority_task_queue.push(PriorityTask::new(
task_id.to_owned(),
new_map_task.job_priority * FAILED_TASK_PRIORITY,
));
println!("\n\n\n\tRescheduled map task: {:?}\n\n\n", new_map_task);

}
}

self.remove_tasks_from_queue(remove_tasks).chain_err(
|| "Unable to remove task from queue",
)?;

Ok(())
}

pub fn reschedule_map_task(&mut self, task_id: &str) -> Result<()> {
// We need to remove all reduce tasks that rely on the results of this map task.
// Returns true if the path was created by the given map task.
pub fn path_from_map_task(&self, map_task: &Task, path: String) -> bool {
for partition_key in map_task.map_output_files.keys() {
let partition = map_task.map_output_files.get(&partition_key).unwrap();
if partition.ends_with(&path) {
return true;
}
}
false
}

// Returns true if a reduce task stems from the given map task.
pub fn reduce_from_map_task(&self, map_task: &Task, reduce_task: &Task) -> bool {
if let Some(ref req) = reduce_task.reduce_request {
if !map_task.map_output_files.contains_key(&req.partition) {
return false;
}

let partition = map_task.map_output_files.get(&req.partition).unwrap();

for file_path in req.get_input_file_paths() {
if partition == file_path {
return true;
}
}
}
false
}

pub fn remove_tasks_from_queue(&mut self, task_ids: Vec<String>) -> Result<()> {
println!("-> Removing tasks from queue: {:?}", task_ids);
let mut new_priority_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();

for priority_task in self.priority_task_queue.drain() {
if let Some(task) = self.tasks.get(&priority_task.id.clone()) {

println!("");
let mut can_push = true;
for task_id in task_ids.clone() {
if task.id == task_id {
can_push = false;
break;
}
}
if can_push {
new_priority_queue.push(priority_task);
}
}
}
self.priority_task_queue = new_priority_queue;

// We then need to reschedule the map task.
// TODO(rhino): Handle this better.
let map_task = PriorityTask::new(task_id.to_owned(), 1000);
self.priority_task_queue.push(map_task);
let mut new_new_priority_task_queue: BinaryHeap<PriorityTask> = BinaryHeap::new();
for t in new_priority_queue.drain() {
println!("=> {:?}", t.id.clone());
new_new_priority_task_queue.push(t);
}

self.priority_task_queue = new_new_priority_task_queue;
Ok(())
}
}
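
Taken together, the new code in this file handles a reported worker by locating the completed map task that produced the unreachable path (`path_from_map_task`), dropping every queued reduce task built from that map task's output (`reduce_from_map_task`), and requeueing the map task at boosted priority with `has_completed_before` set. The queue rebuild in `remove_tasks_from_queue` drains the priority queue and keeps only the surviving tasks; a self-contained sketch of that filtering pattern, using a hypothetical `PriorityTask` with derived ordering rather than the repository's type:

use std::collections::{BinaryHeap, HashSet};

// Hypothetical stand-in for the repository's PriorityTask.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
struct PriorityTask {
    priority: u32,
    id: String,
}

// Drain the heap and keep only the entries whose IDs are not being removed.
fn remove_tasks_from_queue(queue: &mut BinaryHeap<PriorityTask>, remove: &HashSet<String>) {
    let kept: BinaryHeap<PriorityTask> = queue
        .drain()
        .filter(|task| !remove.contains(&task.id))
        .collect();
    *queue = kept;
}

Collecting straight from `drain()` yields the same queue as pushing the survivors into intermediate heaps by hand, without the extra pass.
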
2 changes: 2 additions & 0 deletions proto/worker.proto
@@ -94,6 +94,8 @@ message ReportWorkerRequest {
string worker_id = 1;
// Address of the worker being reported.
string report_address = 2;
// Path to the intermediate file.
string path = 3;
}

// ScheduleOperationService is used by the master to schedule Map and Reduce operations on the
9 changes: 7 additions & 2 deletions worker/src/master_interface.rs
@@ -87,10 +87,15 @@ impl MasterInterface {
Ok(())
}

pub fn report_worker(&self, worker_address: String) -> Result<()> {
pub fn report_worker(
&self,
worker_address: String,
intermediate_data_path: String,
) -> Result<()> {
let mut report_request = pb::ReportWorkerRequest::new();
report_request.set_worker_id(self.worker_id.read().unwrap().clone());
report_request.set_report_address(worker_address.into());
report_request.set_report_address(worker_address.clone().into());
report_request.set_path(intermediate_data_path);
self.client
.report_worker(RequestOptions::new(), report_request)
.wait()
2 changes: 1 addition & 1 deletion worker/src/operations/mod.rs
@@ -3,7 +3,7 @@ pub mod io;
mod map;
mod reduce;
mod state;
mod operation_handler;
pub mod operation_handler;

pub use self::operation_handler::OperationHandler;
pub use self::operation_handler::OperationResources;
13 changes: 5 additions & 8 deletions worker/src/operations/reduce.rs
@@ -182,15 +182,14 @@ fn send_reduce_result(
fn create_reduce_operations(
reduce_request: &pb::PerformReduceRequest,
output_uuid: &str,
master_interface: &Arc<MasterInterface>,
resources: &OperationResources,
) -> Result<Vec<ReduceOperation>> {
let mut reduce_map: HashMap<String, Vec<serde_json::Value>> = HashMap::new();

for reduce_input_file in reduce_request.get_input_file_paths() {
// TODO: Run these operations in parallel as networks can be slow
let reduce_input =
WorkerInterface::get_data(reduce_input_file, output_uuid, master_interface)
.chain_err(|| "Couldn't read reduce input file")?;
let reduce_input = WorkerInterface::get_data(reduce_input_file, output_uuid, resources)
.chain_err(|| "Couldn't read reduce input file")?;

let parsed_value: serde_json::Value = serde_json::from_str(&reduce_input).chain_err(
|| "Error parsing map response.",
@@ -255,7 +254,6 @@ pub fn perform_reduce(
let result = internal_perform_reduce(reduce_request, resources, output_uuid);
if let Err(err) = result {
log_reduce_operation_err(err, &resources.operation_state);
operation_handler::set_failed_status(&resources.operation_state);
return Err("Error starting reduce operation.".into());
}

@@ -268,9 +266,8 @@ fn internal_perform_reduce(
resources: &OperationResources,
output_uuid: &str,
) -> Result<()> {
let reduce_operations =
create_reduce_operations(reduce_request, output_uuid, &resources.master_interface)
.chain_err(|| "Error creating reduce operations from input.")?;
let reduce_operations = create_reduce_operations(reduce_request, output_uuid, resources)
.chain_err(|| "Error creating reduce operations from input.")?;

let reduce_options = ReduceOptions {
reducer_file_path: reduce_request.get_reducer_file_path().to_owned(),
23 changes: 12 additions & 11 deletions worker/src/worker_interface.rs
@@ -1,14 +1,14 @@
use std::net::SocketAddr;
use std::str::FromStr;
use std::path::Path;
use std::sync::Arc;

use grpc::RequestOptions;

use errors::*;
use operations::io;

use master_interface::MasterInterface;
use operations::OperationResources;
use operations::operation_handler;
use cerberus_proto::worker as pb;
use cerberus_proto::worker_grpc as grpc_pb;
use cerberus_proto::worker_grpc::IntermediateDataService; // For pub functions only
@@ -21,7 +21,7 @@ impl WorkerInterface {
pub fn get_data<P: AsRef<Path>>(
path: P,
output_dir_uuid: &str,
master_interface: &Arc<MasterInterface>,
resources: &OperationResources,
) -> Result<String> {
let path_str = path.as_ref().to_string_lossy();
let split_path: Vec<&str> = path_str.splitn(2, '/').collect();
@@ -39,7 +39,7 @@ impl WorkerInterface {
let mut req = pb::IntermediateDataRequest::new();
req.set_path(file.clone());

let res = WorkerInterface::request_data(worker_addr, req, master_interface)
let res = WorkerInterface::request_data(worker_addr, req, resources)
.chain_err(|| format!("Failed to get {} from {}", file, worker_addr))?;

String::from_utf8(res.get_data().to_vec()).chain_err(
@@ -50,7 +50,7 @@ impl WorkerInterface {
pub fn request_data(
worker_addr: SocketAddr,
req: pb::IntermediateDataRequest,
master_interface: &Arc<MasterInterface>,
resources: &OperationResources,
) -> Result<pb::IntermediateData> {
// TODO: Add client store so we don't need to create a new client every time.
let client = grpc_pb::IntermediateDataServiceClient::new_plain(
@@ -74,14 +74,15 @@
// At this point we have failed to contact the worker 3 times and should report this
// to the master.

master_interface
.report_worker(format!(
"{}:{}",
worker_addr.ip().to_string(),
worker_addr.port()
))
resources
.master_interface
.report_worker(
format!("{}:{}", worker_addr.ip().to_string(), worker_addr.port()),
req.path,
)
.chain_err(|| "Unable to report worker")?;

operation_handler::set_cancelled_status(&resources.operation_state);
Err("Unable to get intermediate data after 3 attempts".into())
}
}
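
`request_data` retries the fetch (three attempts, per the comment above) and, once they are exhausted, now reports both the unreachable worker's address and the requested path to the master so the master can reschedule the map task that produced the file, then marks the local operation as cancelled. A dependency-free sketch of that retry-then-report shape, with the generic error type assumed rather than taken from the repository:

// Sketch only: a generic retry helper in the spirit of request_data's loop.
fn fetch_with_retries<T, E, F>(mut fetch: F, attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut last_err = None;
    for _ in 0..attempts {
        match fetch() {
            Ok(value) => return Ok(value),
            Err(err) => last_err = Some(err),
        }
    }
    // All attempts failed; the caller is expected to report the worker (and
    // the requested path) to the master and cancel the local operation.
    Err(last_err.expect("attempts must be greater than zero"))
}

Carrying the path in the report (the new `path` field on `ReportWorkerRequest`) is what lets the master trace the failure back to the map task whose output is now unreachable.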
