10 changes: 10 additions & 0 deletions kv/leader_routed_store.go
@@ -279,6 +279,16 @@ func (s *LeaderRoutedStore) LastCommitTS() uint64 {
return s.local.LastCommitTS()
}

// WriteConflictCountsByPrefix delegates to the local MVCC store. The
// leader-routed wrapper does not add cross-group conflict detection of
// its own, so the node-local view is the authoritative one.
func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64 {
if s == nil || s.local == nil {
return map[string]uint64{}
}
return s.local.WriteConflictCountsByPrefix()
}

const globalLastCommitTSTimeout = 200 * time.Millisecond

// GlobalLastCommitTS returns the most recently committed HLC timestamp from
17 changes: 17 additions & 0 deletions kv/shard_store.go
@@ -1174,6 +1174,23 @@ func (s *ShardStore) LastCommitTS() uint64 {
return max
}

// WriteConflictCountsByPrefix aggregates OCC conflict counts across
// every shard group owned by this ShardStore. Per-shard counts share
// the same "<kind>|<key_prefix>" label schema, so a simple sum gives
// the node-wide view. The result is always non-nil.
func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64 {
out := map[string]uint64{}
for _, g := range s.groups {
if g == nil || g.Store == nil {
continue
}
for label, count := range g.Store.WriteConflictCountsByPrefix() {
out[label] += count
}
}
return out
}

func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error {
for _, g := range s.groups {
if g == nil || g.Store == nil {
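For intuition on the aggregation above: per-group maps share the "<kind>|<key_prefix>" label encoding, so a label-wise sum yields the node-wide view. A minimal runnable sketch of that summation, with a hypothetical stubConflictSource standing in for the real group stores:

package main

import "fmt"

// stubConflictSource is a hypothetical stand-in for a per-group MVCC
// store; only WriteConflictCountsByPrefix matters here.
type stubConflictSource map[string]uint64

func (s stubConflictSource) WriteConflictCountsByPrefix() map[string]uint64 { return s }

func main() {
	groups := []stubConflictSource{
		{"write|txn_lock": 3, "read|zset": 1},
		{"write|txn_lock": 2},
	}
	// Same label-wise sum as ShardStore.WriteConflictCountsByPrefix.
	out := map[string]uint64{}
	for _, g := range groups {
		for label, count := range g.WriteConflictCountsByPrefix() {
			out[label] += count
		}
	}
	fmt.Println(out) // map[read|zset:1 write|txn_lock:5]
}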
51 changes: 44 additions & 7 deletions main.go
@@ -184,13 +184,7 @@ func run() error {
adapter.WithDistributionCoordinator(coordinate),
adapter.WithDistributionActiveTimestampTracker(readTracker),
)
metricsRegistry.RaftObserver().Start(runCtx, raftMonitorRuntimes(runtimes), raftMetricsObserveInterval)
if collector := metricsRegistry.DispatchCollector(); collector != nil {
collector.Start(runCtx, dispatchMonitorSources(runtimes), raftMetricsObserveInterval)
}
if collector := metricsRegistry.PebbleCollector(); collector != nil {
collector.Start(runCtx, pebbleMonitorSources(runtimes), raftMetricsObserveInterval)
}
startMonitoringCollectors(runCtx, metricsRegistry, runtimes)
compactor := kv.NewFSMCompactor(
fsmCompactionRuntimes(runtimes),
kv.WithFSMCompactorActiveTimestampTracker(readTracker),
@@ -499,6 +493,49 @@ func dispatchMonitorSources(runtimes []*raftGroupRuntime) []monitoring.DispatchSource {
return out
}

// startMonitoringCollectors wires up the per-tick Prometheus
// collectors (raft dispatch, Pebble LSM, store-layer OCC conflicts)
// on top of the running raft runtimes. Kept separate from run() so
// the latter stays under the cyclop complexity budget and so new
// collectors can be added without widening run() further.
func startMonitoringCollectors(ctx context.Context, reg *monitoring.Registry, runtimes []*raftGroupRuntime) {
reg.RaftObserver().Start(ctx, raftMonitorRuntimes(runtimes), raftMetricsObserveInterval)
if collector := reg.DispatchCollector(); collector != nil {
collector.Start(ctx, dispatchMonitorSources(runtimes), raftMetricsObserveInterval)
}
if collector := reg.PebbleCollector(); collector != nil {
collector.Start(ctx, pebbleMonitorSources(runtimes), raftMetricsObserveInterval)
}
if collector := reg.WriteConflictCollector(); collector != nil {
collector.Start(ctx, writeConflictMonitorSources(runtimes), raftMetricsObserveInterval)
}
}

// writeConflictMonitorSources extracts the MVCC stores that expose
// per-(kind, key_prefix) OCC conflict counters so monitoring can poll
// them for the elastickv_store_write_conflict_total metric. Every
// store.MVCCStore implements WriteConflictCountsByPrefix(); stores
// that do not track conflicts return an empty map and simply do not
// contribute series.
func writeConflictMonitorSources(runtimes []*raftGroupRuntime) []monitoring.WriteConflictSource {
out := make([]monitoring.WriteConflictSource, 0, len(runtimes))
for _, runtime := range runtimes {
if runtime == nil || runtime.store == nil {
continue
}
src, ok := runtime.store.(monitoring.WriteConflictCounterSource)
if !ok {
continue
}
out = append(out, monitoring.WriteConflictSource{
GroupID: runtime.spec.id,
GroupIDStr: strconv.FormatUint(runtime.spec.id, 10),
Source: src,
})
}
return out
}

func fsmCompactionRuntimes(runtimes []*raftGroupRuntime) []kv.FSMCompactRuntime {
out := make([]kv.FSMCompactRuntime, 0, len(runtimes))
for _, runtime := range runtimes {
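The source extraction in writeConflictMonitorSources relies on an ordinary interface assertion, so a store type that does not expose WriteConflictCountsByPrefix is skipped rather than failing startup. A small self-contained sketch of that filter, with hypothetical countingStore/legacyStore types in place of the real store implementations:

package main

import "fmt"

type writeConflictCounterSource interface {
	WriteConflictCountsByPrefix() map[string]uint64
}

type countingStore struct{}

func (countingStore) WriteConflictCountsByPrefix() map[string]uint64 {
	return map[string]uint64{"write|txn_lock": 1}
}

type legacyStore struct{} // exposes no conflict counters

func main() {
	for _, store := range []any{countingStore{}, legacyStore{}} {
		src, ok := store.(writeConflictCounterSource)
		if !ok {
			fmt.Printf("%T: skipped (no conflict counters)\n", store)
			continue
		}
		fmt.Printf("%T: %v\n", store, src.WriteConflictCountsByPrefix())
	}
}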
26 changes: 20 additions & 6 deletions monitoring/registry.go
@@ -14,12 +14,13 @@ type Registry struct {
registerer prometheus.Registerer
gatherer prometheus.Gatherer

dynamo *DynamoDBMetrics
redis *RedisMetrics
raft *RaftMetrics
lua *LuaMetrics
hotPath *HotPathMetrics
pebble *PebbleMetrics
dynamo *DynamoDBMetrics
redis *RedisMetrics
raft *RaftMetrics
lua *LuaMetrics
hotPath *HotPathMetrics
pebble *PebbleMetrics
writeConflict *WriteConflictMetrics
}

// NewRegistry builds a registry with constant labels that identify the local node.
@@ -41,6 +42,7 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry {
r.lua = newLuaMetrics(registerer)
r.hotPath = newHotPathMetrics(registerer)
r.pebble = newPebbleMetrics(registerer)
r.writeConflict = newWriteConflictMetrics(registerer)
return r
}

@@ -145,3 +147,15 @@ func (r *Registry) PebbleCollector() *PebbleCollector {
}
return newPebbleCollector(r.pebble)
}

// WriteConflictCollector returns a collector that polls each MVCC
// store's per-(kind, key_prefix) OCC conflict counters and mirrors
// them into the elastickv_store_write_conflict_total Prometheus
// counter vector. Start it with the node's MVCC sources after the
// stores have been opened.
func (r *Registry) WriteConflictCollector() *WriteConflictCollector {
if r == nil || r.writeConflict == nil {
return nil
}
return newWriteConflictCollector(r.writeConflict)
}
168 changes: 168 additions & 0 deletions monitoring/write_conflict.go
@@ -0,0 +1,168 @@
package monitoring

import (
"context"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// Write-conflict metrics expose the MVCC store layer's OCC conflict
// signal. Conflicts are detected inside the store's ApplyMutations —
// both write-write (mutation vs newer committed version) and
// read-write (read set vs newer committed version) — and attributed
// by a bounded key-prefix classification (txn_lock, redis_string,
// zset, ...). The metric is protocol-independent: whether the
// consumer is Redis, DynamoDB, or raw KV, a conflict landing here is
// the same underlying event.
//
// This complements the proxy-side view added in #585: the proxy
// counter tells you how many client requests observed a conflict,
// but a single client request can map to many Raft proposals (e.g.
// a DynamoDB TransactWriteItems across several items). The store
// counter is the authoritative per-proposal count and the right
// signal for capacity/alerting.

const defaultWriteConflictPollInterval = 5 * time.Second

// WriteConflictMetrics owns the per-(group, kind, key_prefix) counter
// vector used by the write-conflict dashboard. Registered once per
// Registry.
type WriteConflictMetrics struct {
writeConflictTotal *prometheus.CounterVec
}

func newWriteConflictMetrics(registerer prometheus.Registerer) *WriteConflictMetrics {
m := &WriteConflictMetrics{
writeConflictTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "elastickv_store_write_conflict_total",
Help: "OCC write conflicts detected at the MVCC store layer, bucketed by conflicting key-prefix class and conflict kind (read=read-write, write=write-write). Rate shows cluster-wide conflict pressure; sharp increases on specific prefixes point at hot keys or lock-resolver races (e.g. txn_rollback for the PR #581 incident pattern).",
},
[]string{"group", "kind", "key_prefix"},
),
}
registerer.MustRegister(m.writeConflictTotal)
return m
}

// WriteConflictCounterSource abstracts per-group access to the MVCC
// store's OCC conflict counters. The concrete store implementations
// (pebbleStore, mvccStore, ShardStore, LeaderRoutedStore) satisfy
// this via WriteConflictCountsByPrefix(); keys in the returned map
// follow the "<kind>|<key_prefix>" encoding from the store package.
type WriteConflictCounterSource interface {
WriteConflictCountsByPrefix() map[string]uint64
}

// WriteConflictSource binds a raft group ID to a counter source.
// Multiple groups can be polled by a single collector on a sharded
// node. GroupIDStr is the pre-formatted decimal form of GroupID used
// as the "group" Prometheus label; pre-computing it avoids a
// per-tick strconv allocation.
type WriteConflictSource struct {
GroupID uint64
GroupIDStr string
Source WriteConflictCounterSource
}

// WriteConflictCollector polls each registered store on a fixed
// interval and mirrors the snapshot into the Prometheus counter
// vector. Store-side counts are monotonic for the lifetime of a
// single store instance; counters advance by the positive delta
// against the last snapshot so a store reopen (Restore swap) does
// not produce negative values.
type WriteConflictCollector struct {
metrics *WriteConflictMetrics

mu sync.Mutex
previous map[uint64]map[string]uint64
}

func newWriteConflictCollector(metrics *WriteConflictMetrics) *WriteConflictCollector {
return &WriteConflictCollector{
metrics: metrics,
previous: map[uint64]map[string]uint64{},
}
}

// Start polls sources on the given interval until ctx is canceled.
// Passing interval <= 0 uses defaultWriteConflictPollInterval (5 s),
// matching the DispatchCollector / PebbleCollector cadence.
func (c *WriteConflictCollector) Start(ctx context.Context, sources []WriteConflictSource, interval time.Duration) {
if c == nil || c.metrics == nil || len(sources) == 0 {
return
}
if interval <= 0 {
interval = defaultWriteConflictPollInterval
}
c.observeOnce(sources)
ticker := time.NewTicker(interval)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.observeOnce(sources)
}
}
}()
}

// ObserveOnce is exposed for tests and single-shot callers.
func (c *WriteConflictCollector) ObserveOnce(sources []WriteConflictSource) {
c.observeOnce(sources)
}

func (c *WriteConflictCollector) observeOnce(sources []WriteConflictSource) {
if c == nil || c.metrics == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
for _, src := range sources {
if src.Source == nil {
continue
}
curr := src.Source.WriteConflictCountsByPrefix()
prev := c.previous[src.GroupID]
for label, count := range curr {
prevCount := prev[label]
if count <= prevCount {
// Counter reset (store reopen) or no change:
// do not emit. prev is replaced below, which
// rebases the delta baseline silently.
continue
}
kind, keyPrefix, ok := splitWriteConflictLabel(label)
if !ok {
continue
}
c.metrics.writeConflictTotal.
WithLabelValues(src.GroupIDStr, kind, keyPrefix).
Add(float64(count - prevCount))
}
// Copy curr into previous so future ticks use the latest
// snapshot as baseline even if the source happens to reset.
snap := make(map[string]uint64, len(curr))
for k, v := range curr {
snap[k] = v
}
c.previous[src.GroupID] = snap
}
}

// splitWriteConflictLabel mirrors store.DecodeWriteConflictLabel
// without importing the store package (which would pull pebble into
// monitoring). The encoding is stable: "<kind>|<key_prefix>".
func splitWriteConflictLabel(label string) (kind, keyPrefix string, ok bool) {
idx := strings.IndexByte(label, '|')
if idx < 0 {
return "", "", false
}
return label[:idx], label[idx+1:], true
}