Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TaskFail interface #2719

Merged
merged 11 commits into from
Jul 11, 2017
5 changes: 5 additions & 0 deletions go/master/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (c *Client) taskFinished(taskID int) error {
return c.conn.Call("Service.TaskFinished", taskID, nil)
}

// TaskFailed tell the master server as task is failed.
func (c *Client) taskFailed(taskID TaskID) error {
return c.conn.Call("Service.TaskFinished", taskID, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be "Service.TaskFailed" :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

汗。Done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加了测试用例。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed so many mistakes when reviewing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typhoonzero No worries, that's why we have multiple developers reviewing, we make mistake :)

Copy link
Contributor Author

@gongweibao gongweibao Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

汗死。我应该加unit test。盲目的自信是不可以有的。也不符合我们的做事方法。惭愧。

}

// NextRecord returns next record in the dataset.
//
// NextRecord will block until the next record is available. It is
Expand Down
102 changes: 67 additions & 35 deletions go/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,28 @@ type Chunk struct {
// Task is the basic unit of data instances assigned to trainers.
type Task struct {
ID int
Epoch int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task分发给Trainer的一个要做的事情,Trainer貌似不需要知道这是第几个Epoch。
我大概明白为什么需要Epoch,为的是唯一标示一个Task,不然多个机器可能会有同一个Task的不同Epoch,伟宝考虑的周到!
这样看来的话,是不是考虑每个Task分发出去的时候就给一个新的ID就好?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我添加了一个ISSUE:#2752

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉用全局唯一的ID表示Task就可以了,最终可以统计这个task被执行过几次,哪些成功哪些失败了。Epoch用来对timeout计数可以放在taskEntry中就可以了。

任务结束时,就可以统计task的成功和失败的情况。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在ISSUE中讨论这个问题吧。已经回了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change like this is more consistent with func (s *Service) TaskFailed(taskID TaskID, dummy *int) error:

type TaskMeta struct {
  ID int
  Epoch int
}

type Task struct {
  TaskMeta Meta
  Chunks []Chunk
}

func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Chunks []Chunk
}

type taskEntry struct {
Epoch int
NumTimeout int
Task Task
Task Task
// A task fails if it's timeout or trainer reports it exits unnormally.
NumFailure int
}

type taskQueues struct {
Todo []taskEntry
Pending map[int]taskEntry // map from task ID to task entry
Done []taskEntry
Failed []Task
Failed []taskEntry
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task改成了taskEntry的原因是,我觉得Failed task应该保留进入错误队列时候的上下文状态。

}

// Service is the master server service.
type Service struct {
chunksPerTask int
timeoutDur time.Duration
timeoutMax int
failureMax int
ready chan struct{}
store Store

Expand Down Expand Up @@ -91,11 +92,11 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
}

// NewService creates a new service.
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) (*Service, error) {
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failureMax int) (*Service, error) {
s := &Service{}
s.chunksPerTask = chunksPerTask
s.timeoutDur = timeoutDur
s.timeoutMax = timeoutMax
s.failureMax = failureMax
s.taskQueues = taskQueues{}
s.taskQueues.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{})
Expand Down Expand Up @@ -257,6 +258,34 @@ func (s *Service) SetDataset(globPaths []string, dummy *int) error {
return nil
}

func (s *Service) procFailedTask(t taskEntry, epoch int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请问proc是什么意思?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if t.Task.Epoch != epoch {
// new epoch, task launched after the
// schedule of this timeout check or failed status report.
return
}

defer func() {
err := s.snapshot()
if err != nil {
log.Errorln(err)
}
}()

delete(s.taskQueues.Pending, t.Task.ID)

t.NumFailure++
if t.NumFailure > s.failureMax {
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
s.taskQueues.Failed = append(s.taskQueues.Failed, t)
return
}

log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
s.taskQueues.Todo = append(s.taskQueues.Todo, t)
return
}

func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
return func() {
s.mu.Lock()
Expand All @@ -267,30 +296,7 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
return
}

if t.Epoch != epoch {
// new epoch, task launched after the
// schedule of this timeout check.
return
}

defer func() {
err := s.snapshot()
if err != nil {
log.Errorln(err)
}
}()

delete(s.taskQueues.Pending, t.Task.ID)

t.NumTimeout++
if t.NumTimeout > s.timeoutMax {
log.Warningf("Task %v timed out %d times, discard.", t.Task, t.NumTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可能也会被failed调用,所以不一定都是time out,可以用泛化点的描述。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task)
return
}

log.Warningf("Task %v timed out %d times, retry.", t.Task, t.NumTimeout)
s.taskQueues.Todo = append(s.taskQueues.Todo, t)
s.procFailedTask(t, epoch)
}
}

Expand Down Expand Up @@ -339,7 +345,7 @@ func (s *Service) GetTask(dummy int, task *Task) error {
}

t := s.taskQueues.Todo[0]
t.Epoch++
t.Task.Epoch++
s.taskQueues.Todo = s.taskQueues.Todo[1:]
s.taskQueues.Pending[t.Task.ID] = t
err := s.snapshot()
Expand All @@ -348,9 +354,9 @@ func (s *Service) GetTask(dummy int, task *Task) error {
}

*task = t.Task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete unused *task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行是返回值,是有用的。:)

log.WithFields(s.logFields()).Infof("Task #%d dispatched.", task.ID)
log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A task contains []Chunks, maybe that is too much information to print.
Perhaps change to log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.ID, t.Epoch))
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.ID, t.Task.Epoch))
return nil
}

Expand All @@ -371,7 +377,7 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
}

// task finished, reset timeout
t.NumTimeout = 0
t.NumFailure = 0
s.taskQueues.Done = append(s.taskQueues.Done, t)
delete(s.taskQueues.Pending, taskID)

Expand All @@ -389,3 +395,29 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
}
return err
}

// TaskID is a struct which client uses for reports failure.
type TaskID struct {
ID int
Epoch int
}

// TaskFailed tells the service that a task is failed.
func (s *Service) TaskFailed(taskID TaskID, dummy *int) error {
select {
case <-s.ready:
}

s.mu.Lock()
defer s.mu.Unlock()

t, ok := s.taskQueues.Pending[taskID.ID]
if !ok {
err := errors.New("pending task not found")
log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", taskID)
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return error here? I think it's normal if that task is no longer pending (completed by other workers).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

s.procFailedTask(t, taskID.Epoch)
return nil
}