Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track node and service counts in the state store and emit them periodically as metrics #8603

Merged
merged 9 commits into from
Sep 2, 2020
3 changes: 3 additions & 0 deletions .changelog/8603.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
telemetry: track node and service counts and emit them as metrics
```
19 changes: 14 additions & 5 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ type Config struct {
// dead servers.
AutopilotInterval time.Duration

// MetricsReportingInterval is the frequency with which the server will
// report usage metrics to the configured go-metrics Sinks.
MetricsReportingInterval time.Duration

// ConnectEnabled is whether to enable Connect features such as the CA.
ConnectEnabled bool

Expand Down Expand Up @@ -589,11 +593,16 @@ func DefaultConfig() *Config {
},
},

ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,
EnterpriseConfig: DefaultEnterpriseConfig(),
// Stay under the 10 second aggregation interval of
// go-metrics. This ensures we always report the
// usage metrics in each cycle.
MetricsReportingInterval: 9 * time.Second,
ServerHealthInterval: 2 * time.Second,
AutopilotInterval: 10 * time.Second,
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,

EnterpriseConfig: DefaultEnterpriseConfig(),
}

// Increase our reap interval to 3 days instead of 24h.
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/fsm/snapshot_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, fedState2, fedStateLoaded2)

// Verify usage data is correctly updated
idx, nodeCount, err := fsm2.state.NodeCount()
require.NoError(t, err)
require.Equal(t, len(nodes), nodeCount)
require.NotZero(t, idx)

// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)
Expand Down
14 changes: 14 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
Expand Down Expand Up @@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
return nil, err
}

reporter, err := usagemetrics.NewUsageMetricsReporter(
new(usagemetrics.Config).
WithStateProvider(s.fsm).
WithLogger(s.logger).
WithDatacenter(s.config.Datacenter).
WithReportingInterval(s.config.MetricsReportingInterval),
)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err)
}
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
Expand Down
45 changes: 32 additions & 13 deletions agent/consul/state/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type ReadTxn interface {
Abort()
}

// WriteTxn is implemented by memdb.Txn to perform write operations.
// It embeds ReadTxn, so a WriteTxn can be passed anywhere a ReadTxn is
// accepted.
// NOTE(review): the txn wrapper in this package declares Commit() error;
// confirm that a bare memdb.Txn also satisfies this signature.
type WriteTxn interface {
// ReadTxn is embedded so that write transactions expose the read-side
// methods as well.
ReadTxn
// Insert writes obj into the named table and reports any failure.
// NOTE(review): exact insert/replace semantics are defined by the
// implementation — confirm against memdb.
Insert(table string, obj interface{}) error
// Commit finalizes the transaction and returns an error if it could not
// be applied.
Commit() error
}

// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
Expand All @@ -24,8 +31,9 @@ type Changes struct {
}

// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
// all write transactions. When the transaction is committed the changes are:
// 1. Used to update our internal usage tracking
// 2. Sent to the eventPublisher which will create and emit change events
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
Expand Down Expand Up @@ -89,17 +97,21 @@ func (c *changeTrackerDB) publish(changes Changes) error {
return nil
}

// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// written across many indexes.
// WriteTxnRestore returns a wrapped RW transaction that should only be used in
// Restore where we need to replace the entire contents of the Store.
// WriteTxnRestore uses a zero index since the whole restore doesn't really
// occur at one index - the effect is to write many values that were previously
// written across many indexes. WriteTxnRestore also does not publish any
// change events to subscribers.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
t := &txn{
Txn: c.db.Txn(true),
Index: 0,
}

// We enable change tracking so that usage data is correctly populated.
t.Txn.TrackChanges()
return t
}

// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
Expand All @@ -125,14 +137,21 @@ type txn struct {
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}

if len(changes.Changes) > 0 {
if err := updateUsage(tx, changes); err != nil {
return err
}
}

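// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// Read-only transactions should have no changes. WriteTxnRestore transactions
// track changes so that usage data is updated above, but those changes are
// intentionally not published to subscribers.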
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if err := tx.publish(changes); err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions agent/consul/state/operations_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,30 @@ import (
"github.com/hashicorp/go-memdb"
)

func firstWithTxn(tx *txn,
func firstWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) {

return tx.First(table, index, idxVal)
}

func firstWatchWithTxn(tx *txn,
func firstWatchWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {

return tx.FirstWatch(table, index, idxVal)
}

func firstWatchCompoundWithTxn(tx *txn,
func firstWatchCompoundWithTxn(tx ReadTxn,
table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVals...)
}

func getWithTxn(tx *txn,
func getWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

return tx.Get(table, index, idxVal)
}

func getCompoundWithTxn(tx *txn, table, index string,
func getCompoundWithTxn(tx ReadTxn, table, index string,
_ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) {

return tx.Get(table, index, idxVals...)
Expand Down
Loading