-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
stats.go
102 lines (77 loc) · 2.49 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package sealing
import (
"context"
"sync"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
)
type statSectorState int
const (
sstStaging statSectorState = iota
sstSealing
sstFailed
sstProving
nsst
)
type SectorStats struct {
lk sync.Mutex
bySector map[abi.SectorID]SectorState
byState map[SectorState]int64
totals [nsst]uint64
}
func (ss *SectorStats) updateSector(ctx context.Context, cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
ss.lk.Lock()
defer ss.lk.Unlock()
preSealing := ss.curSealingLocked()
preStaging := ss.curStagingLocked()
// update totals
oldst, found := ss.bySector[id]
if found {
ss.totals[toStatState(oldst, cfg.FinalizeEarly)]--
ss.byState[oldst]--
mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(oldst)))
stats.Record(mctx, metrics.SectorStates.M(ss.byState[oldst]))
}
sst := toStatState(st, cfg.FinalizeEarly)
ss.bySector[id] = st
ss.totals[sst]++
ss.byState[st]++
mctx, _ := tag.New(ctx, tag.Upsert(metrics.SectorState, string(st)))
stats.Record(mctx, metrics.SectorStates.M(ss.byState[st]))
// check if we may need be able to process more deals
sealing := ss.curSealingLocked()
staging := ss.curStagingLocked()
log.Debugw("sector stats", "sealing", sealing, "staging", staging)
if cfg.MaxSealingSectorsForDeals > 0 && // max sealing deal sector limit set
preSealing >= cfg.MaxSealingSectorsForDeals && // we were over limit
sealing < cfg.MaxSealingSectorsForDeals { // and we're below the limit now
updateInput = true
}
if cfg.MaxWaitDealsSectors > 0 && // max waiting deal sector limit set
preStaging >= cfg.MaxWaitDealsSectors && // we were over limit
staging < cfg.MaxWaitDealsSectors { // and we're below the limit now
updateInput = true
}
return updateInput
}
func (ss *SectorStats) curSealingLocked() uint64 {
return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
}
func (ss *SectorStats) curStagingLocked() uint64 {
return ss.totals[sstStaging]
}
// return the number of sectors currently in the sealing pipeline
func (ss *SectorStats) curSealing() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()
return ss.curSealingLocked()
}
// return the number of sectors waiting to enter the sealing pipeline
func (ss *SectorStats) curStaging() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()
return ss.curStagingLocked()
}