This is the first lab of MIT 6.824, Distributed Systems.
This lab implements MapReduce, a framework introduced by Google that automatically parallelizes programs written in a functional style and executes them on a cluster of commodity machines.
Under the hood, the framework consists of one master and multiple workers. Each worker acts as either a Map worker or a Reduce worker. The master assigns not-yet-started tasks to workers and keeps track of their progress.
Each worker runs one of two phases:
- Map: the user-defined map function receives an input file split. It takes an input key-value pair and produces a set of intermediate key-value pairs, `Map (k1,v1) -> list(k2,v2)`. The worker buffers these pairs and writes them to its local disk, partitioned into R partitions.
- Reduce: once the last map task has finished, the master tells the workers assigned reduce tasks where these partitions are. A reduce worker then does the following:
  - reads its partition of the buffered data remotely from the map workers' local disks;
  - sorts that data by intermediate key;
  - applies `reducef` to each key and its list of values, `Reduce (k2,list(v2)) -> list(v2)`;
  - appends the result to the final output file for its partition (one of R output files).
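The sketch below makes the two phases concrete with a minimal, single-process word count. It is not the lab's distributed code.

- `mapf`, `reducef`, and `ihash` follow the style of the lab's word-count plugin and hashing helper, but the bodies here are assumptions.
- `runSequential` is a hypothetical helper that runs every map and reduce in memory.

The sketch shows three steps: the partition rule `ihash(key) % R`, the sort by intermediate key, and grouping each key's values before calling `reducef`.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is one intermediate pair (k2, v2).
type KeyValue struct {
	Key   string
	Value string
}

// mapf: Map(k1, v1) -> list(k2, v2). k1 is a file name (unused here) and
// v1 is the file contents; it emits (word, "1") for every word.
func mapf(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// reducef: Reduce(k2, list(v2)) -> v2. For word count it returns the number of values.
func reducef(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// ihash maps a key to a non-negative int; the partition is ihash(key) % R.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// runSequential (hypothetical) runs one map per input and R reduce partitions
// in memory and returns the final key -> output map.
func runSequential(inputs map[string]string, R int) map[string]string {
	partitions := make([][]KeyValue, R)
	for name, contents := range inputs {
		for _, kv := range mapf(name, contents) {
			p := ihash(kv.Key) % R
			partitions[p] = append(partitions[p], kv)
		}
	}
	out := make(map[string]string)
	for _, kva := range partitions {
		// Sort by intermediate key so that equal keys become adjacent.
		sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
		for i := 0; i < len(kva); {
			// Collect all values of the key starting at position i.
			j := i
			var values []string
			for j < len(kva) && kva[j].Key == kva[i].Key {
				values = append(values, kva[j].Value)
				j++
			}
			out[kva[i].Key] = reducef(kva[i].Key, values)
			i = j
		}
	}
	return out
}

func main() {
	inputs := map[string]string{
		"a.txt": "the quick brown fox",
		"b.txt": "the lazy dog and the fox",
	}
	counts := runSequential(inputs, 3)
	fmt.Println(counts["the"], counts["fox"], counts["dog"]) // prints: 3 2 1
}
```

Every occurrence of a key hashes to the same partition, so the final result does not depend on R. R only controls how the reduce work is split.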
We need to implement three major components: the master, the worker, and the RPC definitions between them.
The master needs data structures that track the state (`Idle`, `Assigned`, or `Done`) and the type (map or reduce) of each task. For each finished map task, it also stores the locations of the R intermediate files produced by the map worker.
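Below is one possible shape for these data structures, plus the selection logic the master could run when a worker asks for work.

- Names taken from the code in this post: `Task`, `Master`, `Map`/`Reduce`/`NoTask`, `Idle`/`Assigned`/`Done`, `Mu`, `MapTasks`, `ReduceTasks`, `nMap`, and `nReduce`.
- Hypothetical additions for illustration: `Exit`, `Filename`, `Intermediate`, `newMaster`, and `nextTask`.

```go
package main

import (
	"fmt"
	"sync"
)

type TaskType int
type TaskStatus int

const (
	Map TaskType = iota
	Reduce
	NoTask // nothing assignable yet: the worker should sleep and ask again
	Exit   // hypothetical: every task is Done, the worker may exit
)

const (
	Idle TaskStatus = iota
	Assigned
	Done
)

type Task struct {
	Type     TaskType
	Status   TaskStatus
	Index    int
	WorkerId int    // -1 while unassigned
	Filename string // hypothetical: input split of a map task
}

type Master struct {
	Mu          sync.Mutex
	MapTasks    []Task
	ReduceTasks []Task
	nMap        int // map tasks not yet Done
	nReduce     int // reduce tasks not yet Done
	// Intermediate[r] is hypothetical: locations of partition r reported
	// by finished map tasks.
	Intermediate [][]string
}

// newMaster (hypothetical) creates one map task per input file and nReduce reduce tasks.
func newMaster(files []string, nReduce int) *Master {
	m := &Master{nMap: len(files), nReduce: nReduce, Intermediate: make([][]string, nReduce)}
	for i, f := range files {
		m.MapTasks = append(m.MapTasks, Task{Type: Map, Index: i, WorkerId: -1, Filename: f})
	}
	for r := 0; r < nReduce; r++ {
		m.ReduceTasks = append(m.ReduceTasks, Task{Type: Reduce, Index: r, WorkerId: -1})
	}
	return m
}

// nextTask (hypothetical) marks the first Idle task as Assigned to workerId and
// returns a copy of it. Reduce tasks are only considered once every map task is
// Done. The caller must hold m.Mu. A full RPC handler would also start
// `go m.waitForTask(&tasks[i])` at the point of assignment.
func (m *Master) nextTask(workerId int) Task {
	tasks := m.MapTasks
	if m.nMap == 0 {
		if m.nReduce == 0 {
			return Task{Type: Exit, WorkerId: -1}
		}
		tasks = m.ReduceTasks
	}
	for i := range tasks {
		if tasks[i].Status == Idle {
			tasks[i].Status = Assigned
			tasks[i].WorkerId = workerId
			return tasks[i]
		}
	}
	return Task{Type: NoTask, WorkerId: -1}
}

func main() {
	m := newMaster([]string{"pg-1.txt", "pg-2.txt"}, 1)
	m.Mu.Lock()
	defer m.Mu.Unlock()
	fmt.Println(m.nextTask(1).Index, m.nextTask(2).Index) // prints: 0 1
	fmt.Println(m.nextTask(3).Type == NoTask)             // prints: true (maps still running)
}
```

Reduce tasks are handed out only when `nMap` reaches zero. This guarantees that every intermediate partition exists before any reducer tries to read it.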
The master's responsibilities are:
- Assign each idle task to a worker. If the worker does not report the task as done within a timeout (10 s here), reset the task so that it can be reassigned to another worker.
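```go
// waitForTask blocks for TaskTimeout seconds, so it is meant to run in its own
// goroutine once a task is assigned. If the task is still Assigned when the
// timer fires, it is reset to Idle so another worker can pick it up.
func (m *Master) waitForTask(task *Task) {
	if task.Type != Map && task.Type != Reduce {
		return
	}
	<-time.After(TaskTimeout * time.Second)
	m.Mu.Lock()
	defer m.Mu.Unlock()
	if task.Status == Assigned {
		task.Status = Idle
		task.WorkerId = -1
		fmt.Println("Task timeout, reset task status:", *task)
	}
}
```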
- Monitor progress. Reduce tasks are assigned only after all map tasks have finished. Once all tasks are done, the master tells the workers to exit through the `CanExit` field of the `ReportTaskDone` reply.
```go
// ReportTaskDone is the Worker -> Master RPC handler for a finished task.
func (m *Master) ReportTaskDone(args *ReportTaskArgs, reply *ReportTaskReply) error {
	m.Mu.Lock()
	defer m.Mu.Unlock()
	taskType := args.TaskType
	var task *Task
	if taskType == Map {
		task = &m.MapTasks[args.TaskId]
	} else {
		task = &m.ReduceTasks[args.TaskId]
	}
	// Only accept the report from the worker currently holding the task. A late
	// report from a worker whose task timed out (and was reset or reassigned) is ignored.
	if task.WorkerId == args.WorkerId && task.Status == Assigned {
		task.Status = Done
		if taskType == Map && m.nMap > 0 {
			//fmt.Printf("Map Task %d finished! \n", args.TaskId)
			m.nMap--
		} else if taskType == Reduce && m.nReduce > 0 {
			//fmt.Printf("Reduce Task %d finished! \n", args.TaskId)
			m.nReduce--
		}
	}
	// Tell the worker whether the whole job has finished.
	reply.CanExit = m.nMap == 0 && m.nReduce == 0
	return nil
}
```
- Validate the output. Ensure that nobody observes a partially written file in the presence of crashes. To do this, write each output to a temporary file and rename it to its final name only once it is completely written.
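```go
// file is a temporary file that has already been fully written and closed
newPath := fmt.Sprintf("mr-out-%d", index)
// the rename publishes the complete file under its final name in one step
err = os.Rename(file.Name(), newPath)
```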
The RPC definitions cover two directions of data flow between worker and master:
- `Master -> Worker`: the master assigns an idle task to a worker.
- `Worker -> Master`: the worker reports the task's progress to the master.
```go
/*
`Worker -> Master`: workers report a task's progress to the master
*/
type ReportTaskArgs struct {
	WorkerId int
	TaskType TaskType
	TaskId   int
}

type ReportTaskReply struct {
	CanExit bool // true once every map and reduce task is done
}

/*
`Worker -> Master`: a map worker reports the location of an intermediate partition (input to a reduce task) to the master
*/
type BufferArgs struct {
	TaskId   int
	Location string
}

/*
`Master -> Worker`: the master assigns an idle task to a worker
*/
type TaskArgs struct {
	WorkerId int
}

type TaskReply struct {
	Task Task
}
```
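The worker is essentially single-threaded. It repeatedly requests a new task, processes it with either `mapf` or `reducef`, and reports the result to the master. It exits when the master signals that all work is done or when the master can no longer be reached.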
```go
// Worker main loop: request a task, run it, report it, repeat.
for {
	reply, succ := CallForTask()
	if !succ {
		fmt.Println("Failed to contact master, worker exiting.")
		return
	}
	exit, succ := false, true
	if reply.Task.Type == Map {
		MapWorker(reply.Task, mapf)
		exit, succ = ReportTaskDone(Map, reply.Task.Index)
	} else if reply.Task.Type == Reduce {
		ReduceWorker(reply.Task, reducef)
		exit, succ = ReportTaskDone(Reduce, reply.Task.Index)
	} else if reply.Task.Type == NoTask {
		// All map (or all) tasks are assigned but some are still running: wait and retry.
	} else {
		// All tasks have finished: exit.
		return
	}
	if exit || !succ {
		fmt.Println("Master exited or all tasks done, worker exiting.")
		return
	}
	time.Sleep(TaskInterval * time.Millisecond)
}
```