Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat][Autodiscover Kubernetes] Fix multiple instances reporting same metrics #38471

Merged
merged 23 commits into from Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0f1019e
Fix event id
constanca-m Mar 20, 2024
d72bb25
Update changelog
constanca-m Mar 20, 2024
1a5222f
Update libbeat/autodiscover/providers/kubernetes/kubernetes.go
constanca-m Mar 20, 2024
f839eed
Update libbeat/autodiscover/providers/kubernetes/kubernetes.go
constanca-m Mar 20, 2024
1656831
add space to log line
constanca-m Mar 20, 2024
4525d78
change log.debug order
constanca-m Mar 20, 2024
9bd259e
Merge branch 'main' into leader-election-issue
constanca-m Mar 20, 2024
d5b5872
- run leader elector until context is cancelled
constanca-m Mar 22, 2024
8f86db2
Merge remote-tracking branch 'origin/leader-election-issue' into lead…
constanca-m Mar 22, 2024
7dcc5d9
fix lint errors
constanca-m Mar 22, 2024
d1cd700
mage check
constanca-m Mar 22, 2024
e621934
use assert instead of require
constanca-m Mar 22, 2024
0f52db0
Merge branch 'main' into leader-election-issue
constanca-m Mar 23, 2024
a45812f
Update changelog
constanca-m Mar 28, 2024
2de31ca
Update changelog
constanca-m Mar 28, 2024
75b7776
Add test comments
constanca-m Mar 28, 2024
f7c3ddc
Update docs
constanca-m Apr 2, 2024
346737b
Merge branch 'main' into leader-election-issue
constanca-m Apr 2, 2024
9af1aa5
Merge branch 'main' into leader-election-issue
constanca-m Apr 3, 2024
e675cae
Merge branch 'main' into leader-election-issue
constanca-m Apr 3, 2024
b8eff25
Merge branch 'main' into leader-election-issue
constanca-m Apr 4, 2024
55052d4
Merge branch 'main' into leader-election-issue
constanca-m Apr 5, 2024
1af4973
Merge branch 'main' into leader-election-issue
constanca-m Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Expand Up @@ -69,6 +69,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.

==== Bugfixes

- Fix multiple metricbeat instances reporting same metrics when using autodiscover with provider kubernetes, and ensure leader elector is always running in autodiscover mode.{pull}38471[38471]
- Fix how Prometheus histograms are calculated when percentiles are provide.{pull}36537[36537]
- Stop using `mage:import` in community beats. This was ignoring the vendorized beats directory for some mage targets, using the code available in GOPATH, this causes inconsistencies and compilation problems if the version of the code in the GOPATH is different to the vendored one. Use of `mage:import` will continue to be unsupported in custom beats till beats is migrated to go modules, or mage supports vendored dependencies. {issue}13998[13998] {pull}14162[14162]
- Metricbeat module builders call host parser only once when instantiating light modules. {pull}20149[20149]
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -230,6 +230,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d

*Metricbeat*

- Add new fields to configure the lease duration, retry and renew when using leader elector with kubernetes autodiscover.{pull}38471[38471]
- Add per-thread metrics to system_summary {pull}33614[33614]
- Add GCP CloudSQL metadata {pull}33066[33066]
- Add GCP Carbon Footprint metricbeat data {pull}34820[34820]
Expand Down
10 changes: 9 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/config.go
Expand Up @@ -44,9 +44,14 @@ type Config struct {
// Scope can be either node or cluster.
Scope string `config:"scope"`
Resource string `config:"resource"`

// Unique identifies if this provider enables its templates only when it is elected as leader in a k8s cluster
Unique bool `config:"unique"`
LeaderLease string `config:"leader_lease"`
//Parameters to configure election process
LeaseDuration time.Duration `config:"leader_leaseduration"`
RenewDeadline time.Duration `config:"leader_renewdeadline"`
RetryPeriod time.Duration `config:"leader_retryperiod"`
constanca-m marked this conversation as resolved.
Show resolved Hide resolved

Prefix string `config:"prefix"`
Hints *config.C `config:"hints"`
Expand All @@ -57,7 +62,7 @@ type Config struct {
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
}

// Public variable, so specific beats (as Filebeat) can set a different cleanup timeout if they need it.
// DefaultCleanupTimeout Public variable, so specific beats (as Filebeat) can set a different cleanup timeout if they need it.
var DefaultCleanupTimeout time.Duration = 0

func defaultConfig() *Config {
Expand All @@ -68,6 +73,9 @@ func defaultConfig() *Config {
Prefix: "co.elastic",
Unique: false,
AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(),
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
}
}

Expand Down
52 changes: 50 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/config_test.go
Expand Up @@ -31,7 +31,8 @@ import (
)

func TestConfigWithCustomBuilders(t *testing.T) {
autodiscover.Registry.AddBuilder("mock", newMockBuilder)
err := autodiscover.Registry.AddBuilder("mock", newMockBuilder)
assert.NoError(t, err)

cfg := mapstr.M{
"hints.enabled": false,
Expand All @@ -44,13 +45,15 @@ func TestConfigWithCustomBuilders(t *testing.T) {

config := conf.MustNewConfigFrom(&cfg)
c := defaultConfig()
err := config.Unpack(&c)
err = config.Unpack(&c)
assert.NoError(t, err)

cfg1 := mapstr.M{
"hints.enabled": false,
}
config, err = conf.NewConfigFrom(&cfg1)
assert.NoError(t, err)

c = defaultConfig()
err = config.Unpack(&c)
assert.Error(t, err)
Expand All @@ -72,6 +75,51 @@ func TestConfigWithIncorrectScope(t *testing.T) {
assert.Equal(t, "cluster", c.Scope)
}

func TestConfigLeaseFields(t *testing.T) {
cfg := mapstr.M{
"scope": "cluster",
"unique": "true",
}

tests := []struct {
LeaseDuration string
RenewDeadline string
RetryPeriod string
message string
}{
{
LeaseDuration: "20seconds",
RenewDeadline: "15s",
RetryPeriod: "2s",
message: "incorrect lease duration, should be set to default",
},
{
LeaseDuration: "20s",
RenewDeadline: "15minutes",
RetryPeriod: "2s",
message: "incorrect renew deadline, should be set to default",
},
{
LeaseDuration: "20s",
RenewDeadline: "15s",
RetryPeriod: "2hrs",
message: "incorrect retry period, should be set to default",
},
}

for _, test := range tests {
cfg["leader_leaseduration"] = test.LeaseDuration
cfg["leader_renewdeadline"] = test.RenewDeadline
cfg["leader_retryperiod"] = test.RetryPeriod

config := conf.MustNewConfigFrom(&cfg)

c := defaultConfig()
err := config.Unpack(&c)
assert.Errorf(t, err, test.message)
}
}

type mockBuilder struct {
}

Expand Down
38 changes: 26 additions & 12 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Expand Up @@ -279,7 +279,9 @@ func NewLeaderElectionManager(
Name: cfg.LeaderLease,
Namespace: ns,
}
metaUID := lease.GetObjectMeta().GetUID()

var eventID string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility of data races with this? Presumably OnStoppedLeading and OnStartedLeading should never be called concurrently...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. The code for running the leader election is in these lines, and startLeading is running as a go routine. It also says:

// LeaderCallbacks are callbacks that are triggered during certain
// lifecycle events of the LeaderElector. These are invoked asynchronously.

So I will check for racing conditions and try to prevent it from happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do a unit test to check the racing condition:

  1. I could never find it, I think that is weird. Could it be possible that it never happens?
    Edit: Yes, it never happens. See comment [Metricbeat][Autodiscover Kubernetes] Fix multiple instances reporting same metrics #38471 (comment).
  2. To cause a lease renewal, I would have to add a timer to ensure a new instance has the lease. I believe a good time to wait could be (leaseDuration + retryPeriod) * 2, but this means our test will wait 34s at most for one iteration. I was testing with multiple, so it could take 2 minutes. Is it good idea to make a commit with such a long test since it would be triggered in other PRs? What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To cause a lease renewal, I would have to add a timer to ensure a new instance has the lease. I believe a good time to wait could be (leaseDuration + retryPeriod) * 2, but this means our test will wait 34s at most for one iteration. I was testing with multiple, so it could take 2 minutes. Is it good idea to make a commit with such a long test since it would be triggered in other PRs? What do you think?

I agree, we should avoid such long running tests. We could perhaps make the lease duration configurable. Another option would be to pass in a fake clock, like https://github.com/jonboulle/clockwork, and then we can control it in the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a unit test. It still takes a few seconds to complete, at most 30s. I could not reduce the lease duration fields anymore, because it was causing unexpected lease renewals (I am guessing for network issues).

leaseId := lease.Name + "-" + lease.Namespace
lem.leaderElection = leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: lease,
Expand All @@ -289,18 +291,17 @@ func NewLeaderElectionManager(
},
},
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
LeaseDuration: cfg.LeaseDuration,
RenewDeadline: cfg.RenewDeadline,
RetryPeriod: cfg.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
logger.Debugf("leader election lock GAINED, id %v", id)
eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano())
eventID = fmt.Sprintf("%v-%v", leaseId, time.Now().UnixNano())
constanca-m marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we include the timestamp in the event ID? I'm wondering if we could drop it, and just use the leaseId? That way we don't need to store/reference eventID, and that would eliminate the question of data races entirely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is risky and it could cause more problems. Example:

  1. metricbeat-1 is the leader.
  2. There happens some kind of lease renewal twice.
  3. metricbeat-1 is the new leader again, but:
    1. startLeading from this new leader election is called.
    2. stopLeading is called after, triggered from the loss of lease from step 2. In that case, since they are using the same eventId we end up entirely without a leader because we deleted the configs.

If we use event id with timestamp, we make sure this never happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... we use Run for leader election:

And it says this:

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease

This means we will never have the same leader twice! Because they stop running and they never are reelected!

So we have a problem. Example:

  1. We have two nodes, node-1 and node-2.
  2. node-1 is the first leader.
  3. node-1 loses the locker, so it stops running.
  4. node-2 gets elected.
  5. There happens some kind of lease renewal that fails with timeout (for example, rolebinding gets deleted). node-2 loses the lease. It stops running.
  6. Who's going to be leader now? There are no more instances running to report the metrics...

I tried this with a unit test, trying to renew around 20 times, and I could see the leader election had stopped like in the example above.

So I believe the implementation as of now (the official one, not the from this branch) has two problems:

  1. A metricbeat instance never stops from reporting metrics, even after losing the lease! It causes duplicated documents.
    • This PR could fix it, but it would cause the next problem.
  2. A metricbeat instance can never be reelected as the leader.
    • This doesn't necessarily cause a problem in the current implementation, since our previous leader instances never stop.

I think we need to consider other alternatives to leader election or find a way to make it run again, because like this we will be forcing users to delete pods so they can start again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it run indefinitely, we would have to put the code in this function in a for cycle:

func (p *leaderElectionManager) Start() {
ctx, cancel := context.WithCancel(context.TODO())
p.cancelLeaderElection = cancel
p.startLeaderElector(ctx, p.leaderElection)
}

Maybe not ideal... Would it be just easier to use a lease and discard the leader election all together?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two possible solutions:

  1. Make leader election run again once it stops, so we know the metricbeat instance is always a candidate to be a leader.

    • Would this be a good idea? Maybe it is too resource consumption to be running this all the time, as the instance is constantly trying to acquire the lock.
  2. Use a watcher that keeps track of the lease. This way, once the lease changes holder, we can start reporting metrics with this metricbeat instance.

    • Currently we don't have a watcher that tracks a single resource. This has been discussed before as a solution for the kubernetes secrets provider as well.
    • If we did this, we would be following the same logic for two providers, leader election and secrets.

Copy link
Member

@axw axw Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we will never have the same leader twice! Because they stop running and they never are reelected!

Yikes... another good find.

I'm not familiar enough with Kubernetes leadership election or resource watchers, so it's probably best to discuss with others - but my instinct is to go with this one:

Make leader election run again once it stops, so we know the metricbeat instance is always a candidate to be a leader.

I'm not sure if we can do better by using watchers, but this option means a relatively small change, so seems less risky to me. I don't think this would consume any more resources than having multiple Metricbeat instances that each attempt to acquire a lease?

In terms of code, I think we would need to change the go le.Run(ctx) to something like this:

go func() {
    for {
        le.Run(ctx)
        select {
        case <-ctx.Done():
            return
        default:
            // Run returned because the lease was lost,
            // try to reacquire in case the new lease holder
            // loses the lease.
        }
    }
}()

logger.Debugf("leader election lock GAINED, holder: %v, eventID: %v", id, eventID)
startLeading(uuid.String(), eventID)
},
OnStoppedLeading: func() {
logger.Debugf("leader election lock LOST, id %v", id)
eventID := fmt.Sprintf("%v-%v", metaUID, time.Now().UnixNano())
logger.Debugf("leader election lock LOST, holder: %v, eventID: %v", id, eventID)
stopLeading(uuid.String(), eventID)
},
},
Expand Down Expand Up @@ -329,7 +330,7 @@ func (p *eventerManager) GenerateHints(event bus.Event) bus.Event {
func (p *leaderElectionManager) Start() {
ctx, cancel := context.WithCancel(context.TODO())
p.cancelLeaderElection = cancel
p.startLeaderElector(ctx, p.leaderElection)
p.startLeaderElectorIndefinitely(ctx, p.leaderElection)
}

// Stop signals the stop channel to force the leader election loop routine to stop.
Expand All @@ -344,14 +345,27 @@ func (p *leaderElectionManager) GenerateHints(event bus.Event) bus.Event {
return event
}

// startLeaderElector starts a Leader Elector in the background with the provided config
func (p *leaderElectionManager) startLeaderElector(ctx context.Context, lec leaderelection.LeaderElectionConfig) {
// startLeaderElectorIndefinitely starts a Leader Elector in the background with the provided config.
// If this instance gets the lease lock and later loses it, we run the leader elector again.
func (p *leaderElectionManager) startLeaderElectorIndefinitely(ctx context.Context, lec leaderelection.LeaderElectionConfig) {
le, err := leaderelection.NewLeaderElector(lec)
if err != nil {
p.logger.Errorf("error while creating Leader Elector: %w", err)
}
p.logger.Debugf("Starting Leader Elector")
go le.Run(ctx)

go func() {
for {
le.Run(ctx)
select {
case <-ctx.Done():
return
default:
// Run returned because the lease was lost. Run the leader elector again, so this instance
// is still a candidate to get the lease.
}
}
}()
}

func ShouldPut(event mapstr.M, field string, value interface{}, logger *logp.Logger) {
Expand Down