-
Notifications
You must be signed in to change notification settings - Fork 923
/
checkpoint.go
56 lines (48 loc) · 1.42 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package das
import (
"fmt"
)
// checkpoint is the JSON-serializable snapshot of sampling state that
// newCheckpoint builds from SamplingStats.
type checkpoint struct {
// SampleFrom is set by newCheckpoint to stats.CatchupHead + 1
// (presumably the next height that still has to be sampled — confirm with the consumer).
SampleFrom uint64 `json:"sample_from"`
// NetworkHead is copied verbatim from stats.NetworkHead.
NetworkHead uint64 `json:"network_head"`
// Failed heights will be retried. The map is keyed by height; the int value
// is presumably a failure/retry counter — TODO confirm against SamplingStats.
// Omitted from the JSON output when empty.
Failed map[uint64]int `json:"failed,omitempty"`
// Workers will resume on restart from previous state. newCheckpoint stores
// only catchup jobs here. Omitted from the JSON output when empty.
Workers []workerCheckpoint `json:"workers,omitempty"`
}
// workerCheckpoint will be used to resume worker on restart.
// One entry is created by newCheckpoint for every catchup job found in the stats.
type workerCheckpoint struct {
// From is filled by newCheckpoint with the worker's Curr value, so a resumed
// job starts at the position the worker had reached rather than at its beginning.
From uint64 `json:"from"`
// To is copied from the worker's To value.
// NOTE(review): whether this bound is inclusive is not visible here — confirm.
To uint64 `json:"to"`
// JobType is copied from the worker; newCheckpoint only persists catchupJob entries.
JobType jobType `json:"job_type"`
}
// newCheckpoint converts the given sampling stats into a checkpoint.
//
// Only workers running a catchup job are carried over: recent jobs do not need
// to be resumed after a restart, and retry jobs are re-created from the failed
// heights map, so catchup jobs are the only ones worth storing. Each stored
// worker resumes from its current position (Curr) up to its To bound.
//
// The returned checkpoint references stats.Failed directly (no copy is made).
func newCheckpoint(stats SamplingStats) checkpoint {
	resumable := make([]workerCheckpoint, 0, len(stats.Workers))
	for _, job := range stats.Workers {
		if job.JobType != catchupJob {
			// recent and retry jobs are intentionally not persisted
			continue
		}
		entry := workerCheckpoint{JobType: job.JobType}
		entry.From = job.Curr
		entry.To = job.To
		resumable = append(resumable, entry)
	}

	var cp checkpoint
	cp.SampleFrom = stats.CatchupHead + 1
	cp.NetworkHead = stats.NetworkHead
	cp.Failed = stats.Failed
	cp.Workers = resumable
	return cp
}
// String renders the checkpoint in a compact human-readable form.
//
// The sample-from height and network head are always printed. The number of
// stored workers is appended only when there is at least one, and the failed
// heights map is appended on its own line only when it is non-empty.
func (c checkpoint) String() string {
	summary := fmt.Sprintf("SampleFrom: %v, NetworkHead: %v", c.SampleFrom, c.NetworkHead)

	var workersPart string
	if count := len(c.Workers); count != 0 {
		workersPart = fmt.Sprintf(", Workers: %v", count)
	}

	var failedPart string
	if len(c.Failed) != 0 {
		// failed heights go on a separate line
		failedPart = fmt.Sprintf("\nFailed: %v", c.Failed)
	}

	return summary + workersPart + failedPart
}