clusterchecks: simple dispatching logic (#2392)
* clusterchecks: simple dispatching logic
* compile in clustercheck build tag by default
xvello committed Nov 5, 2018
1 parent 27e6bb2 commit 0f77254
Showing 15 changed files with 346 additions and 75 deletions.
25 changes: 13 additions & 12 deletions cmd/cluster-agent/app/app.go
@@ -8,6 +8,7 @@
package app

import (
"context"
"fmt"
"os"
"os/signal"
@@ -108,6 +109,9 @@ func start(cmd *cobra.Command, args []string) error {
logFile = ""
}

mainCtx, mainCtxCancel := context.WithCancel(context.Background())
defer mainCtxCancel() // Calling cancel twice is safe

err = config.SetupLogger(
config.Datadog.GetString("log_level"),
logFile,
@@ -176,7 +180,7 @@ func start(cmd *cobra.Command, args []string) error {
common.StartAutoConfig()

// Start the cluster-check discovery if configured
clusterCheckHandler := setupClusterCheck()
clusterCheckHandler := setupClusterCheck(mainCtx)
// start the cmd HTTPS server
sc := clusteragent.ServerContext{
ClusterCheckHandler: clusterCheckHandler,
@@ -196,9 +200,10 @@ func start(cmd *cobra.Command, args []string) error {

// Block here until we receive the interrupt signal
<-signalCh
if clusterCheckHandler != nil {
clusterCheckHandler.StopDiscovery()
}

// Cancel the main context to stop components
mainCtxCancel()

if config.Datadog.GetBool("external_metrics_provider.enabled") {
custommetrics.StopServer()
}
@@ -210,23 +215,19 @@ func start(cmd *cobra.Command, args []string) error {
return nil
}

func setupClusterCheck() *clusterchecks.Handler {
func setupClusterCheck(ctx context.Context) *clusterchecks.Handler {
if !config.Datadog.GetBool("cluster_checks.enabled") {
log.Debug("Cluster check Autodiscovery disabled")
return nil
}

clusterCheckHandler, err := clusterchecks.SetupHandler(common.AC)
handler, err := clusterchecks.NewHandler(common.AC)
if err != nil {
log.Errorf("Could not setup the cluster-checks Autodiscovery: %s", err.Error())
return nil
}
err = clusterCheckHandler.StartDiscovery()
if err != nil {
log.Errorf("Could not start the cluster-checks Autodiscovery: %s", err.Error())
return nil
}
go handler.Run(ctx)

log.Info("Started cluster check Autodiscovery")
return clusterCheckHandler
return handler
}
6 changes: 3 additions & 3 deletions pkg/autodiscovery/providers/clusterchecks.go
@@ -23,7 +23,7 @@ const defaultGraceDuration = 60 * time.Second
type ClusterChecksConfigProvider struct {
dcaClient *clusteragent.DCAClient
graceDuration time.Duration
lastPing time.Time
heartbeat time.Time
lastChange int64
nodeName string
flushedConfigs bool
@@ -62,7 +62,7 @@ func (c *ClusterChecksConfigProvider) String() string {
}

func (c *ClusterChecksConfigProvider) withinGracePeriod() bool {
return c.lastPing.Add(c.graceDuration).After(time.Now())
return c.heartbeat.Add(c.graceDuration).After(time.Now())
}

// IsUpToDate queries the cluster-agent to update its status and
@@ -90,7 +90,7 @@ func (c *ClusterChecksConfigProvider) IsUpToDate() (bool, error) {
return false, err
}

c.lastPing = time.Now()
c.heartbeat = time.Now()
if reply.IsUpToDate {
log.Tracef("Up to date with change %d", c.lastChange)
} else {
7 changes: 7 additions & 0 deletions pkg/clusteragent/clusterchecks/README.md
@@ -64,3 +64,10 @@ atomic, the stores are designed with an external locking, held by the `dispatcher`
## Node-agent communication

The node-agent queries the cluster-agent through the autodiscovery `ClusterChecksConfigProvider`.
As nodes can be removed without notice, the cluster-agent has to detect when a node is no longer
connected and re-dispatch its configurations to other, active nodes.

This is handled by the `node_expiration_timeout` option (30 seconds by default) and the
`dispatcher.expireNodes` method. A node-agent's heartbeat is updated every time it POSTs to the
`status` URL (every 10 seconds in the default configuration). When that heartbeat timestamp becomes
too old, the node is deleted and its configurations are put back in the dangling map.
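
For illustration, here is a minimal, self-contained Go sketch of the expiration rule described above. The `node` type and `expired` helper are simplified stand-ins, not the actual dispatcher or store structures: a node whose last heartbeat is older than the timeout is considered gone and its configurations become dangling.

```go
package main

import (
	"fmt"
	"time"
)

// node is a simplified stand-in for the per-node store kept by the dispatcher.
type node struct {
	name      string
	heartbeat time.Time // refreshed on every POST to the status endpoint
}

// expired applies the node_expiration_timeout rule: no heartbeat for longer
// than the timeout means the node is considered gone.
func expired(n node, timeout time.Duration, now time.Time) bool {
	return now.Sub(n.heartbeat) > timeout
}

func main() {
	timeout := 30 * time.Second // default node_expiration_timeout
	now := time.Now()
	nodes := []node{
		{name: "node-a", heartbeat: now.Add(-10 * time.Second)}, // still reporting
		{name: "node-b", heartbeat: now.Add(-45 * time.Second)}, // stopped reporting
	}
	for _, n := range nodes {
		if expired(n, timeout, now) {
			fmt.Printf("%s expired: its configs go back to the dangling map\n", n.name)
		}
	}
}
```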
14 changes: 5 additions & 9 deletions pkg/clusteragent/clusterchecks/api_nocompile.go
@@ -8,6 +8,7 @@
package clusterchecks

import (
"context"
"errors"

"github.com/DataDog/datadog-agent/pkg/autodiscovery"
@@ -28,17 +29,12 @@ func (h *Handler) GetAllConfigs() ([]integration.Config, error) {
return nil, ErrNotCompiled
}

// SetupHandler not implemented
func SetupHandler(ac *autodiscovery.AutoConfig) (*Handler, error) {
// NewHandler not implemented
func NewHandler(_ *autodiscovery.AutoConfig) (*Handler, error) {
return nil, ErrNotCompiled
}

// StartDiscovery not implemented
func (h *Handler) StartDiscovery() error {
return ErrNotCompiled
}

// StopDiscovery not implemented
func (h *Handler) StopDiscovery() error {
// Run not implemented
func (h *Handler) Run(_ context.Context) error {
return ErrNotCompiled
}
28 changes: 28 additions & 0 deletions pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -27,9 +27,12 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
digest := config.Digest()
d.store.digestToConfig[digest] = config

// No target node specified: store in danglingConfigs
if targetNodeName == "" {
d.store.danglingConfigs[digest] = config
return
}

currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
targetNode := d.store.getOrCreateNodeStore(targetNodeName)

@@ -52,6 +55,7 @@ func (d *dispatcher) removeConfig(digest string) {
defer d.store.Unlock()

delete(d.store.digestToConfig, digest)
delete(d.store.danglingConfigs, digest)

// Remove from node configs if assigned
node, found := d.store.getNodeStore(d.store.digestToNode[digest])
@@ -61,3 +65,27 @@ func (d *dispatcher) removeConfig(digest string) {
node.Unlock()
}
}

// shouldDispatchDangling returns true if there are dangling configs
// and nodes registered and available for dispatching.
func (d *dispatcher) shouldDispatchDangling() bool {
d.store.RLock()
defer d.store.RUnlock()

if len(d.store.danglingConfigs) == 0 {
return false
}
if len(d.store.nodes) == 0 {
return false
}
return true
}

// retrieveAndClearDangling extracts dangling configs from the store
func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
d.store.Lock()
defer d.store.Unlock()
configs := makeConfigArray(d.store.danglingConfigs)
d.store.clearDangling()
return configs
}
49 changes: 41 additions & 8 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -8,20 +8,26 @@
package clusterchecks

import (
"context"
"time"

"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// dispatcher holds the management logic for cluster-checks
type dispatcher struct {
store *clusterStore
store *clusterStore
nodeExpirationSeconds int64
}

func newDispatcher() *dispatcher {
return &dispatcher{
d := &dispatcher{
store: newClusterStore(),
}
d.nodeExpirationSeconds = config.Datadog.GetInt64("cluster_checks.node_expiration_timeout")
return d
}

// Stop implements the scheduler.Scheduler interface
@@ -48,11 +54,16 @@ func (d *dispatcher) add(config integration.Config) {
if !config.ClusterCheck {
return // Ignore non cluster-check configs
}
log.Debugf("dispatching configuration %s:%s", config.Name, config.Digest())

// TODO: add dispatching logic
hostname, _ := util.GetHostname()
d.addConfig(config, hostname)
target := d.getLeastBusyNode()
if target == "" {
// No node is available; addConfig will store it in the danglingConfigs map to be retried later.
log.Warnf("No available node to dispatch %s:%s on, will retry later", config.Name, config.Digest())
} else {
log.Infof("Dispatching configuration %s:%s to node %s", config.Name, config.Digest(), target)
}

d.addConfig(config, target)
}

// remove deletes a given configuration
@@ -61,6 +72,28 @@ func (d *dispatcher) remove(config integration.Config) {
return // Ignore non cluster-check configs
}
digest := config.Digest()
log.Debugf("removing configuration %s:%s", config.Name, digest)
log.Debugf("Removing configuration %s:%s", config.Name, digest)
d.removeConfig(digest)
}

// cleanupLoop is the cleanup goroutine for the dispatcher.
// It has to be called in a goroutine with a cancellable context.
func (d *dispatcher) cleanupLoop(ctx context.Context) {
cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)
defer cleanupTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-cleanupTicker.C:
// Expire old nodes; their orphaned configs are moved to the dangling map
d.expireNodes()

// Re-dispatch dangling configs
if d.shouldDispatchDangling() {
d.Schedule(d.retrieveAndClearDangling())
}
}
}
}
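
As a usage illustration, the sketch below shows how a caller might drive such a cleanup loop with a cancellable context, mirroring the pattern above. `demoDispatcher`, its print statements, and the tick interval are placeholders for demonstration, not the real dispatcher types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// demoDispatcher is a placeholder for the real dispatcher type.
type demoDispatcher struct{}

// cleanupLoop ticks until the context is cancelled, mimicking the pattern above.
func (d *demoDispatcher) cleanupLoop(ctx context.Context) {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("cleanup loop stopped")
			return
		case <-ticker.C:
			fmt.Println("expire nodes, re-dispatch dangling configs")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &demoDispatcher{}
	go d.cleanupLoop(ctx) // started in a goroutine, as the real dispatcher expects

	time.Sleep(500 * time.Millisecond)
	cancel() // e.g. when the cluster-agent receives an interrupt signal
	time.Sleep(50 * time.Millisecond)
}
```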
50 changes: 49 additions & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -12,6 +12,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// getNodeConfigs returns configurations dispatched to a given node
@@ -39,7 +40,54 @@ func (d *dispatcher) processNodeStatus(nodeName string, status types.NodeStatus)
node.Lock()
defer node.Unlock()
node.lastStatus = status
node.lastPing = timestampNow()
node.heartbeat = timestampNow()

return (node.lastConfigChange == status.LastChange), nil
}

// getLeastBusyNode returns the name of the node that is assigned
// the lowest number of checks. In case of equality, one is chosen
// randomly, relying on Go's randomized map iteration order.
func (d *dispatcher) getLeastBusyNode() string {
var leastBusyNode string
minCheckCount := -1

d.store.RLock()
defer d.store.RUnlock()

for name, store := range d.store.nodes {
if name == "" {
continue
}
if minCheckCount == -1 || len(store.digestToConfig) < minCheckCount {
leastBusyNode = name
minCheckCount = len(store.digestToConfig)
}
}
return leastBusyNode
}

// expireNodes iterates over nodes and removes the ones that have not
// reported for more than the expiration duration. The configurations
// dispatched to these nodes will be moved to the danglingConfigs map.
func (d *dispatcher) expireNodes() {
cutoffTimestamp := timestampNow() - d.nodeExpirationSeconds

d.store.Lock()
defer d.store.Unlock()

for name, node := range d.store.nodes {
node.RLock()
if node.heartbeat < cutoffTimestamp {
if name != "" {
// Don't report on the dummy "" host for unscheduled configs
log.Infof("Expiring out node %s, last status report %d seconds ago", name, timestampNow()-node.heartbeat)
}
for digest, config := range node.digestToConfig {
d.store.danglingConfigs[digest] = config
}
delete(d.store.nodes, name)
}
node.RUnlock()
}
}
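
For reference, here is a tiny standalone sketch of the least-busy selection used above. A plain count map stands in for the node stores, and the `""` pseudo-node that holds unscheduled configs is skipped, as in `getLeastBusyNode`.

```go
package main

import "fmt"

// leastBusy returns the node with the fewest dispatched configs.
// Ties are broken arbitrarily by Go's randomized map iteration order.
func leastBusy(configsPerNode map[string]int) string {
	best := ""
	min := -1
	for name, count := range configsPerNode {
		if name == "" {
			continue // skip the "" pseudo-node holding unscheduled configs
		}
		if min == -1 || count < min {
			best, min = name, count
		}
	}
	return best
}

func main() {
	fmt.Println(leastBusy(map[string]int{"node-a": 3, "node-b": 1, "": 7})) // node-b
}
```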
