Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for elasticsearch re-connection after network error & allow graceful shutdown #40794

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

belimawr
Copy link
Contributor

@belimawr belimawr commented Sep 12, 2024

Proposed commit message

When the Elasticsearch client fails to publish events, it ends up calling Close in the connection (that is reused). To cancel the in-flight requests, the context is cancelled and a new one is created to used in future requests.

The callback to check the version holds a reference to the connection via a closure, now the Elasticsearch client holds a pointer to that connection, so whenever Close is called, the callback can create a request with the new, not cancelled, context.

An integration test is added to ensure the
ES output can always recover from network errors.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Disruptive User Impact

It's a bug fix, there is no disruptive user impact

## Author's Checklist

How to test this PR locally

Related issues

## Use cases
## Screenshots
## Logs

@belimawr belimawr added the skip-ci Skip the build in the CI but linting label Sep 12, 2024
@belimawr belimawr self-assigned this Sep 12, 2024
@belimawr belimawr requested review from a team as code owners September 12, 2024 17:07
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Sep 12, 2024
Copy link
Contributor

mergify bot commented Sep 12, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b fix-es-connection-issue upstream/fix-es-connection-issue
git merge upstream/main
git push upstream fix-es-connection-issue

Copy link
Contributor

mergify bot commented Sep 12, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @belimawr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit

Copy link
Contributor

mergify bot commented Sep 12, 2024

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it please use backport-skip label and remove the backport-8.x label.

@mergify mergify bot added the backport-8.x Automated backport to the 8.x branch with mergify label Sep 12, 2024
@belimawr belimawr added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Sep 12, 2024
@elasticmachine
Copy link
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Sep 12, 2024
@belimawr belimawr added needs_team Indicates that the issue/PR needs a Team:* label and removed skip-ci Skip the build in the CI but linting labels Sep 12, 2024
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Sep 12, 2024
@belimawr belimawr added the backport-8.15 Automated backport to the 8.15 branch with mergify label Sep 12, 2024
Copy link
Member

@AndersonQ AndersonQ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but I have a question. I'll approve once it's answered

libbeat/tests/integration/elasticsearch_test.go Outdated Show resolved Hide resolved
Comment on lines 187 to 189
// There are some cases where the connection is created but Connect
// is not called before it's used, so we populate reqsContext and cancelReqs
// here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just find the places where we don't call connect and fix them? This code is only called within Beats and we can search for uses of esleg.

Also using context.Background() is a smell, the parent context should be an argument, which will have the compiler find all NewConnection uses for you so you can audit them to see if they use close inappropriately.

The connection here also don't really represent a network connection at the network level, it looks like it is a convenience wrapper around an HTTP client. From that perspective closing or having a connection level context doesn't make much sense, it is just a wrapper for closing idle connections.

Arguably the contexts should be set on a per request basis in all the places that call execRequest and then the cancellation should propagate through those individual calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just find the places where we don't call connect and fix them? This code is only called within Beats and we can search for uses of esleg.

I went with this approach to be on the safe side of not introducing the possibility of a panic for calling a nil function. The panic I got when running the tests seems to be coming form a Connection that is not used but it's closed. That is triggered by a test testing a failure scenario where the ES host is not reachable.

Anyways, I've been looking into that.

Arguably the contexts should be set on a per request basis in all the places that call execRequest and then the cancellation should propagate through those individual calls.

I'm not sure that would achieve the same effect we currently have. Currently reqsContext is used to cancel in-flight requests when the Connection needs to be shutdown, that is done by the Close method that is called by a different gorotine than the one waiting for the in-flight request(s) to finish.

The issues that are fixed by this new behaviour:

An the PR fixing them:

Honestly, I rather have this PR merged as is, so the issues listed above and #40705 are correctly fixed on main. Then we can create a follow up issue to refactor the code ensuring that:

  • Methods creating requests accept a context
  • In-flight requests can be cancelled when the Connection is closed by a different goroutine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I finally figured out the reason why Filebeat would panic when the connection was closed. It's an interesting corner case.
The root cause is if the publishing pipeline never tries to publish an event and Filebeat is shutdown, in that case the connection to ES was never used, however it is closed during the shutdown process, leading to a panic if cancelReqs is nil.

I could add some checks to ensure cancelReqs and reqsContext are not used if nil, however it feels cleaner to keep the previous behaviour of NewConnection returning a Connection that is safe to use without any change of behaviour on its methods.

Comment on lines 53 to 54
// that is passed to this client is also used in a closure, we need
// to ensure both hold a reference to the same instance of the connection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That you have to account for future usage at all here feels wrong too. We need to fix this in a way that does not require knowing future uses of the code. Make it impossible to have this bug again.

Why do we even need a closure, was it just there because it was convenient? Can we just not have a closure anymore? It looks like if we saved the captured onConnect it could be a method on the client to avoid also capturing the connection.

conn.OnConnectCallback = func() error {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()
for _, callback := range globalCallbackRegistry.callbacks {
err := callback(conn)
if err != nil {
return err
}
}
if onConnect != nil {
onConnect.mutex.Lock()
defer onConnect.mutex.Unlock()
for _, callback := range onConnect.callbacks {
err := callback(conn)
if err != nil {
return err
}
}
}
return nil
}

@cmacknz
Copy link
Member

cmacknz commented Sep 16, 2024

I want to make sure I follow the lifetime of the connection properly. The Connect and Publish calls come from here:

// Try to (re)connect so we can publish batch
if !connected {
// Return batch to other output workers while we try to (re)connect
batch.Cancelled()
if reconnectAttempts == 0 {
w.logger.Infof("Connecting to %v", w.client)
} else {
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
}
err := w.client.Connect()
connected = err == nil
if connected {
w.logger.Infof("Connection to %v established", w.client)
reconnectAttempts = 0
} else {
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
reconnectAttempts++
}
continue
}
if err := w.publishBatch(batch); err != nil {
connected = false
}
}

The close comes from here:

func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error {
err := b.client.Publish(ctx, batch)
if err != nil {
b.client.Close()
}

Can you make Connect accept a context.Context so that the context is actually tied to the lifetime of the connection the way it is supposed to be? Then the client worker run() function would be responsible for creating+cancelling the context. That you can't see the close actually happen in that loop since it is dependent on the client type implementation is also annoying but one thing at a time.

This would require touching the interface of every output but it looks like the correct place for the lifetime of the context to be managed.

It looks like we only have one other use of eslegclient for monitoring that is not a test.

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out[rand.Intn(len(r.out))]
err := client.Connect()

@marc-gr
Copy link
Contributor

marc-gr commented Sep 17, 2024

Can you make Connect accept a context.Context so that the context is actually tied to the lifetime of the connection the way it is supposed to be? Then the client worker run() function would be responsible for creating+cancelling the context. That you can't see the close actually happen in that loop since it is dependent on the client type implementation is also annoying but one thing at a time.

This would require touching the interface of every output but it looks like the correct place for the lifetime of the context to be managed.

👍 , and in addition if this is done it would require the rest of outputs to honor that new context, too. IIRC each uses different cancellation mechanisms atm.

@belimawr belimawr marked this pull request as draft September 18, 2024 16:37
When the Elasticsearch client fails to publish events, it ends up
calling `Close` in the connection (that is reused). To cancel the
in-flight requests, the context is cancelled and a new one is created
to used in future requests.

The callback to check the version holds a reference to the connection
via a closure, now the Elasticsearch client holds a pointer to that
connection, so whenever Close is called, the callback can create a
request with the new, not cancelled, context.

An integration test is added to ensure the
ES output can always recover from network errors.
This commit moves the creation of the request context to the connect
method.
There are some cases where the Connection will be used without calling
Connect, so we initialise reqsContext and cancelReqs in the
NewConnection function to avoid panics.
Connection.Connect now accepts a context to control the life cycle of
its requests.
Add a context to outputs.Connectable.Connect to correctly manage the
life cycle of the connection and it's requests.
@cmacknz cmacknz changed the title Fix elasticsearch re-connection after network error Add test for elasticsearch re-connection after network error & allow graceful shutdown Sep 20, 2024
Copy link
Contributor

mergify bot commented Sep 23, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b fix-es-connection-issue upstream/fix-es-connection-issue
git merge upstream/main
git push upstream fix-es-connection-issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-8.x Automated backport to the 8.x branch with mergify backport-8.15 Automated backport to the 8.15 branch with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
Projects
None yet
6 participants