Skip to content

Commit

Permalink
commit lab1
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinlong-Chen committed Jun 9, 2022
1 parent 7e5eb65 commit a16950f
Show file tree
Hide file tree
Showing 3 changed files with 376 additions and 46 deletions.
182 changes: 167 additions & 15 deletions src/mr/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,46 @@
package mr

import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)

type TaskStatus int

const (
idle TaskStatus = iota
in_progress
completed
)

type Task struct {
tno int
filenames []string
status TaskStatus
startTime time.Time
}

type CoordinatorStatus int

const (
MAP_PHASE CoordinatorStatus = iota
REDUCE_PHASE
FINISH_PHASE
)

type Coordinator struct {
// Your definitions here.

tasks []Task
nReduce int
nMap int
status CoordinatorStatus
mu sync.Mutex
}

// Your code here -- RPC handlers for the worker to call.
Expand All @@ -19,11 +50,71 @@ type Coordinator struct {
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()

finish_flag := c.IsAllFinish()
if finish_flag {
c.NextPhase()
}
for i := 0; i < len(c.tasks); i++ {
if c.tasks[i].status == idle {
log.Printf("send task %d to worker\n", i)
reply.Err = SuccessCode
reply.Task_no = i
reply.Filenames = c.tasks[i].filenames
if c.status == MAP_PHASE {
reply.Type = MAP
reply.NReduce = c.nReduce
} else if c.status == REDUCE_PHASE {
reply.NReduce = 0
reply.Type = REDUCE
} else {
log.Fatal("unexpected status")
}
c.tasks[i].startTime = time.Now()
c.tasks[i].status = in_progress
return nil
} else if c.tasks[i].status == in_progress {
curr := time.Now()
if curr.Sub(c.tasks[i].startTime) > time.Second*10 {
log.Printf("resend task %d to worker\n", i)
reply.Err = SuccessCode
reply.Task_no = i
reply.Filenames = c.tasks[i].filenames
if c.status == MAP_PHASE {
reply.Type = MAP
reply.NReduce = c.nReduce
} else if c.status == REDUCE_PHASE {
reply.NReduce = 0
reply.Type = REDUCE
} else {
log.Fatal("unexpected status")
}
c.tasks[i].startTime = time.Now()
return nil
}
}
}
reply.Err = SuccessCode
reply.Type = WAIT
return nil
}

func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if args.Task_no >= len(c.tasks) || args.Task_no < 0 {
reply.Err = ParaErrCode
return nil
}
c.tasks[args.Task_no].status = completed
if c.IsAllFinish() {
c.NextPhase()
}
return nil
}

//
// start a thread that listens for RPCs from worker.go
Expand All @@ -41,17 +132,78 @@ func (c *Coordinator) server() {
go http.Serve(l, nil)
}

// coordinator init code
func (c *Coordinator) Init(files []string, nReduce int) {
c.mu.Lock()
defer c.mu.Unlock()
log.Println("init coordinator")

// make map tasks
log.Println("make map tasks")
tasks := make([]Task, len(files))
for i, file := range files {
tasks[i].tno = i
tasks[i].filenames = []string{file}
tasks[i].status = idle
}

// init coordinator
c.tasks = tasks
c.nReduce = nReduce
c.nMap = len(files)
c.status = MAP_PHASE
}

func (c *Coordinator) MakeReduceTasks() {
// make reduce tasks
log.Println("make reduce tasks")
tasks := make([]Task, c.nReduce)
for i := 0; i < c.nReduce; i++ {
tasks[i].tno = i
files := make([]string, c.nMap)
for j := 0; j < c.nMap; j++ {
filename := fmt.Sprintf("mr-%d-%d", j, i)
files[j] = filename
}
tasks[i].filenames = files
tasks[i].status = idle
}
c.tasks = tasks
}

func (c *Coordinator) IsAllFinish() bool {
for i := len(c.tasks) - 1; i >= 0; i-- {
if c.tasks[i].status != completed {
return false
}
}
return true
}

func (c *Coordinator) NextPhase() {
if c.status == MAP_PHASE {
log.Println("change to REDUCE_PHASE")
c.MakeReduceTasks()
c.status = REDUCE_PHASE
} else if c.status == REDUCE_PHASE {
log.Println("change to FINISH_PHASE")
c.status = FINISH_PHASE
} else {
log.Println("unexpected status change!")
}
}

//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
ret := false

// Your code here.


return ret
c.mu.Lock()
defer c.mu.Unlock()
if c.status == FINISH_PHASE {
return true
}
return false
}

//
Expand All @@ -63,7 +215,7 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}

// Your code here.

c.Init(files, nReduce)

c.server()
return &c
Expand Down
43 changes: 36 additions & 7 deletions src/mr/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,53 @@ package mr
// remember to capitalize all names.
//

import "os"
import "strconv"
import (
"os"
"strconv"
)

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type ExampleArgs struct {
X int
type Errno int

const (
SuccessCode Errno = iota
ServiceErrCode
ParaErrCode
)

type TaskType int

const (
MAP TaskType = iota
REDUCE
WAIT
STOP
)

// Add your RPC definitions here.
type GetTaskArgs struct {
}

type ExampleReply struct {
Y int
type GetTaskReply struct {
Type TaskType
Filenames []string
Task_no int
NReduce int
Err Errno
}

// Add your RPC definitions here.
type FinishTaskArgs struct {
Type TaskType
Task_no int
}

type FinishTaskReply struct {
Err Errno
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
Expand Down
Loading

0 comments on commit a16950f

Please sign in to comment.