etcdserver,mvcc: add more storage layer metrics #9761

Merged
merged 8 commits on May 23, 2018
14 changes: 14 additions & 0 deletions etcdserver/metrics.go
@@ -42,6 +42,18 @@ var (
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "heartbeat_send_failures_total",
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "slow_apply_total",
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
})
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -85,6 +97,8 @@ func init() {
prometheus.MustRegister(hasLeader)
prometheus.MustRegister(isLeader)
prometheus.MustRegister(leaderChanges)
prometheus.MustRegister(heartbeatSendFailures)
prometheus.MustRegister(slowApplies)
prometheus.MustRegister(proposalsCommitted)
prometheus.MustRegister(proposalsApplied)
prometheus.MustRegister(proposalsPending)
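For reference, both new counters follow the stock client_golang lifecycle: define once as a package-level var, register in init(), call Inc() at the event site, and everything registered shows up on the scrape endpoint. A minimal, self-contained sketch of that lifecycle, with a made-up metric name and an arbitrary port (this is not etcd's actual wiring):

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// exampleFailures stands in for counters like
// etcd_server_heartbeat_send_failures_total; the name is made up.
var exampleFailures = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "example_failures_total",
	Help:      "Illustrative counter; not a real etcd metric.",
})

func main() {
	prometheus.MustRegister(exampleFailures) // panics if registered twice

	exampleFailures.Inc() // increment at the event site

	// All registered metrics become visible on /metrics.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2381", nil))
}
```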
6 changes: 4 additions & 2 deletions etcdserver/raft.go
@@ -368,14 +368,16 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
// TODO: limit request rate.
if r.lg != nil {
r.lg.Warn(
"heartbeat took too long to send out; server is overloaded, likely from slow disk",
zap.Duration("exceeded", exceed),
"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
zap.Duration("heartbeat-interval", r.heartbeat),
zap.Duration("expected-duration", 2*r.heartbeat),
zap.Duration("exceeded-duration", exceed),
)
} else {
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
}
heartbeatSendFailures.Inc()
}
}
}
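The hunk above treats a heartbeat as late once sending takes more than twice the heartbeat interval (the logged expected-duration is 2*r.heartbeat), and exceed is the overshoot beyond that budget. A distilled sketch of that check, assuming the caller records the send start time; the real processMessages derives exceed from its own per-peer bookkeeping:

```go
package main

import (
	"log"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "heartbeat_send_failures_total",
	Help:      "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
})

// warnIfLate distills the check: a heartbeat is "late" once sending
// takes more than two heartbeat intervals, and every late heartbeat
// both warns and bumps the failure counter.
func warnIfLate(start time.Time, heartbeat time.Duration) {
	took := time.Since(start)
	if exceed := took - 2*heartbeat; exceed > 0 {
		log.Printf("heartbeat late: interval=%v expected<=%v exceeded-by=%v",
			heartbeat, 2*heartbeat, exceed)
		heartbeatSendFailures.Inc()
	}
}

func main() {
	start := time.Now().Add(-300 * time.Millisecond) // pretend the send took 300ms
	warnIfLate(start, 100*time.Millisecond)          // 100ms is etcd's default heartbeat interval
}
```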
3 changes: 2 additions & 1 deletion etcdserver/util.go
@@ -113,7 +113,7 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, stringer fmt.Stringer
if d > warnApplyDuration {
if lg != nil {
lg.Warn(
"request took too long",
"apply request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.String("prefix", prefix),
@@ -122,5 +122,6 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, stringer fmt.Stringer
} else {
plog.Warningf("%srequest %q took too long (%v) to execute", prefix, stringer.String(), d)
}
slowApplies.Inc()
}
}
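With this change, every apply slower than warnApplyDuration increments etcd_server_slow_apply_total in addition to emitting the warning log. A sketch of the helper's shape, with illustrative names and an illustrative threshold:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

var slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "slow_apply_total",
	Help:      "The total number of slow apply requests (likely overloaded from slow disk).",
})

// warnIfExpensive mirrors the shape of warnOfExpensiveGenericRequest:
// anything over the threshold is both logged and counted.
func warnIfExpensive(lg *zap.Logger, start time.Time, threshold time.Duration, prefix string) {
	d := time.Since(start)
	if d <= threshold {
		return
	}
	lg.Warn("apply request took too long",
		zap.Duration("took", d),
		zap.Duration("expected-duration", threshold),
		zap.String("prefix", prefix),
	)
	slowApplies.Inc()
}

func main() {
	lg, _ := zap.NewProduction()
	defer lg.Sync()
	start := time.Now().Add(-200 * time.Millisecond) // pretend the apply took 200ms
	warnIfExpensive(lg, start, 100*time.Millisecond, "read-only range ")
}
```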
5 changes: 4 additions & 1 deletion mvcc/backend/backend.go
@@ -415,6 +415,9 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

took := time.Since(now)
defragDurations.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
if b.lg != nil {
b.lg.Info(
@@ -426,7 +429,7 @@
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", time.Since(now)),
zap.Duration("took", took),
)
}
return nil
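Note that defrag now reads the clock once into took and reuses it for both the histogram observation and the log field; the old code called time.Since(now) a second time when logging, so the two values could drift slightly. The timing pattern, reduced to a self-contained sketch:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "etcd",
	Subsystem: "disk",
	Name:      "backend_defrag_duration_seconds",
	Help:      "The latency distribution of backend defragmentation.",
	Buckets:   prometheus.ExponentialBuckets(.1, 2, 13),
})

// timeIt captures the pattern in defrag: read the clock once, observe
// the elapsed seconds on the histogram, and hand the same value back
// for logging so the metric and the logged "took" field cannot disagree.
func timeIt(op func() error) (time.Duration, error) {
	now := time.Now()
	err := op()
	took := time.Since(now)
	defragDurations.Observe(took.Seconds())
	return took, err
}

func main() {
	took, err := timeIt(func() error {
		time.Sleep(50 * time.Millisecond) // stand-in for the real defrag work
		return nil
	})
	fmt.Println(took, err)
}
```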
22 changes: 9 additions & 13 deletions mvcc/backend/batch_tx.go
@@ -183,15 +183,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTx) Unlock() {
@@ -215,19 +215,18 @@ func (t *batchTx) commit(stop bool) {
}

start := time.Now()

// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}

commitDurations.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)

t.pending = 0
if err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to commit tx",
zap.Error(err),
)
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
} else {
plog.Fatalf("cannot commit tx (%s)", err)
}
@@ -269,31 +268,28 @@ func (t *batchTxBuffered) Unlock() {

func (t *batchTxBuffered) Commit() {
t.Lock()
defer t.Unlock()
t.commit(false)
t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
t.Lock()
defer t.Unlock()
t.commit(true)
t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
// all read txs must be closed to acquire boltdb commit rwlock
t.backend.readTx.mu.Lock()
defer t.backend.readTx.mu.Unlock()
t.unsafeCommit(stop)
t.backend.readTx.mu.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal(
"failed to rollback tx",
zap.Error(err),
)
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
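Alongside the new commitDurations observation and the atomic commit count, the Commit/CommitAndStop paths (plain and buffered) trade defer t.Unlock() for an explicit unlock after commit; the diff does not say why, but dropping defer is a common micro-optimization on hot paths. The instrumented commit, reduced to a self-contained sketch in which the closure stands in for the fsync-bound t.tx.Commit():

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var commitDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "etcd",
	Subsystem: "disk",
	Name:      "backend_commit_duration_seconds",
	Help:      "The latency distributions of commit called by backend.",
	Buckets:   prometheus.ExponentialBuckets(0.001, 2, 14),
})

var commits int64 // stands in for backend.commits, read atomically elsewhere

// commitAndMeasure distills batchTx.commit: time the boltdb commit,
// feed the histogram, and count the commit before error handling.
func commitAndMeasure(commit func() error) error {
	start := time.Now()
	err := commit()
	commitDurations.Observe(time.Since(start).Seconds())
	atomic.AddInt64(&commits, 1)
	return err
}

func main() {
	err := commitAndMeasure(func() error { return errors.New("disk full") })
	fmt.Println(err, atomic.LoadInt64(&commits))
}
```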
22 changes: 20 additions & 2 deletions mvcc/backend/metrics.go
@@ -22,20 +22,38 @@ var (
Subsystem: "disk",
Name: "backend_commit_duration_seconds",
Help: "The latency distributions of commit called by backend.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_defrag_duration_seconds",
Help: "The latency distribution of backend defragmentation.",

// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
})

snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "backend_snapshot_duration_seconds",
Help: "The latency distribution of backend snapshots.",
// 10 ms -> 655 seconds

// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
)

func init() {
prometheus.MustRegister(commitDurations)
prometheus.MustRegister(defragDurations)
prometheus.MustRegister(snapshotDurations)
}
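ExponentialBuckets(start, factor, count) returns count upper bounds, start * factor^0 through start * factor^(count-1), which is exactly what the comments above spell out: 0.001 * 2^13 == 8.192, 0.1 * 2^12 == 409.6, and 0.01 * 2^16 == 655.36. A quick check:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// commit: 14 bounds, 0.001s, 0.002s, ... 0.001*2^13 == 8.192s
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 14))
	// defrag: 13 bounds, 0.1s ... 0.1*2^12 == 409.6s
	fmt.Println(prometheus.ExponentialBuckets(.1, 2, 13))
	// snapshot: 17 bounds, 0.01s ... 0.01*2^16 == 655.36s
	fmt.Println(prometheus.ExponentialBuckets(.01, 2, 17))
}
```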
11 changes: 10 additions & 1 deletion mvcc/kvstore.go
@@ -163,12 +163,18 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
start := time.Now()

s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)

hashDurations.Observe(time.Since(start).Seconds())
return h, s.currentRev, err
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
start := time.Now()

s.mu.RLock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
@@ -213,7 +219,10 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
h.Write(v)
return nil
})
return h.Sum32(), currentRev, compactRev, err
hash = h.Sum32()

hashRevDurations.Observe(time.Since(start).Seconds())
return hash, currentRev, compactRev, err
}

func (s *store) Compact(rev int64) (<-chan struct{}, error) {
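Hash and HashByRev sample the clock on entry and observe right before returning, so the new histograms cover the forced commit plus the full key-space walk. A toy version of that hash-and-time shape; the CRC-32C choice is an assumption, since the hunk only shows h.Write and h.Sum32:

```go
package main

import (
	"fmt"
	"hash/crc32"
	"time"
)

// hashKVs sketches the HashByRev shape: fold every key/value pair into
// a 32-bit hash and observe how long the walk took. The real code walks
// the boltdb key bucket under locks and filters by revision. Pairs must
// arrive in a fixed order or the hash is not reproducible.
func hashKVs(pairs [][2][]byte, observe func(seconds float64)) uint32 {
	start := time.Now()
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	for _, p := range pairs {
		h.Write(p[0]) // key
		h.Write(p[1]) // value
	}
	observe(time.Since(start).Seconds())
	return h.Sum32()
}

func main() {
	sum := hashKVs([][2][]byte{
		{[]byte("foo"), []byte("bar")},
	}, func(s float64) { fmt.Printf("took %.6fs\n", s) })
	fmt.Printf("hash: %08x\n", sum)
}
```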
26 changes: 26 additions & 0 deletions mvcc/metrics.go
@@ -170,6 +170,30 @@ var (
// overridden by mvcc initialization
reportDbTotalSizeInUseInBytesMu sync.RWMutex
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }

hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_duration_seconds",
Help: "The latency distribution of storage hash operation.",

// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})

hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "hash_rev_duration_seconds",
Help: "The latency distribution of storage hash by revision operation.",

// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
})
)

func init() {
@@ -189,6 +213,8 @@ func init() {
prometheus.MustRegister(dbCompactionKeysCounter)
prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
prometheus.MustRegister(hashDurations)
prometheus.MustRegister(hashRevDurations)
}

// ReportEventReceived reports that an event is received.
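With registration done in init(), a test can poke the collectors directly. A sketch using client_golang's testutil package, which postdates the client_golang copy etcd vendored in 2018, so treat it as a present-day illustration rather than something this PR could have used:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "etcd",
	Subsystem: "mvcc",
	Name:      "hash_duration_seconds",
	Help:      "The latency distribution of storage hash operation.",
	Buckets:   prometheus.ExponentialBuckets(.01, 2, 15),
})

func main() {
	hashDurations.Observe(0.05)
	// CollectAndCount reports how many series the collector exposes;
	// a cheap way to assert a metric is actually wired up.
	fmt.Println(testutil.CollectAndCount(hashDurations))
}
```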