server/eth: Monitor RPC provider health #2125
Conversation
If you don't mind me reviewing this.
Previously, the ETH backend would not start if any of the providers could not connect or was outdated. Now, the backend will start if at least one of the providers is able to connect and its header is recent. After connecting, a goroutine starts that periodically checks the health of the RPC providers and sorts the list, putting the non-outdated ones first, outdated ones after, and the ones that fail to respond last. Requests then attempt to use the RPC providers in this order.
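A minimal sketch of that ordering, with illustrative stand-in types (ethConn and connectionStatus here are not the actual server/asset/eth definitions):

package eth

type ethConn struct {
	endpoint string
}

type connectionStatus int

const (
	healthy connectionStatus = iota
	outdated
	failing
)

// sortConnectionsByHealth rebuilds the list so healthy providers are tried
// first, outdated ones next, and providers that failed to respond last.
func sortConnectionsByHealth(conns []*ethConn, check func(*ethConn) connectionStatus) []*ethConn {
	var healthyConns, outdatedConns, failingConns []*ethConn
	for _, ec := range conns {
		switch check(ec) {
		case healthy:
			healthyConns = append(healthyConns, ec)
		case outdated:
			outdatedConns = append(outdatedConns, ec)
		default: // failed to respond at all
			failingConns = append(failingConns, ec)
		}
	}
	ordered := append(healthyConns, outdatedConns...)
	return append(ordered, failingConns...)
}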
Force-pushed from f3fbec7 to 23c4710.
Small thing that is loosely related: the simnet harness continually appends to the file at $ETH_CONFIG_PATH in the dcrdex harness. If it could just write a new file every time that would be good. >> to >, I believe: cat > $ETH_CONFIG_PATH <<EOF
dcrdex/dex/testing/dcrdex/harness.sh
Lines 223 to 228 in ea8cb4d

cat << EOF >> $ETH_CONFIG_PATH
ws://localhost:38557
# comments are respected
# http://localhost:38556
${ETH_IPC_FILE}
EOF
server/asset/eth/rpcclient.go
Outdated
	continue
}

healthyConnections = append(healthyConnections, ec)
The client makes an effort to randomize providers, I presume to keep down requests per provider. Do you think the server should also worry about this? Or what if an operator has a preferred provider, for example they have an eth node set up and just want a fallback for emergencies? Doesn't have to be done in this PR, just for conversation.
As a client I'd probably sign up for the free tier of multiple providers and want them randomized, but as a server operator, like you said, I'd probably want to pay for a provider I trust, have it generally serve everything, and then have a backup. I think it would be useful if the providers file allowed users to specify a priority along with each provider.
Maybe we could check that endpoints are not the same on initial startup? The harness copies the same address over and over, so I noticed 9 connections to the same node. It could just be two.
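Just to illustrate the priority idea, a purely hypothetical sketch of a priority-annotated providers file and a parser for it — dcrdex does not define this format, and every name below is made up:

// Hypothetical format, one provider per line: "endpoint [priority]", where a
// lower priority number is tried first and the default priority is 0, e.g.
//
//   https://my-trusted-node.example:8545 0
//   wss://fallback-provider.example/ws   1
package eth

import (
	"bufio"
	"io"
	"sort"
	"strconv"
	"strings"
)

type provider struct {
	endpoint string
	priority int
}

func parseProviders(r io.Reader) ([]provider, error) {
	var providers []provider
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fields := strings.Fields(line)
		p := provider{endpoint: fields[0]}
		if len(fields) > 1 {
			prio, err := strconv.Atoi(fields[1])
			if err != nil {
				return nil, err
			}
			p.priority = prio
		}
		providers = append(providers, p)
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	// Preferred providers (lowest priority value) first; ties keep file order.
	sort.SliceStable(providers, func(i, j int) bool {
		return providers[i].priority < providers[j].priority
	})
	return providers, nil
}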
Working well.
server/asset/eth/rpcclient.go
Outdated
case <-ctx.Done():
	return
case <-ticker.C:
	c.sortConnectionsByHealth(ctx)
Log a warning if false?
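For example, assuming sortConnectionsByHealth keeps its boolean "any healthy connection" return value, the ticker case could become (a sketch against the snippet above, not final code):

case <-ticker.C:
	if !c.sortConnectionsByHealth(ctx) {
		c.log.Warnf("no healthy RPC connections after health check")
	}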
c.log.Errorf("Unpropagated error from %q: %v", c.endpoints[idx], err) | ||
// Try the next client. | ||
c.idxMtx.Lock() | ||
// Only advance it if another thread hasn't. | ||
if c.endpointIdx == idx && len(c.endpoints) > 0 { | ||
c.endpointIdx = (c.endpointIdx + 1) % len(c.endpoints) | ||
c.log.Infof("Switching RPC endpoint to %q", c.endpoints[c.endpointIdx]) | ||
} | ||
c.idxMtx.Unlock() | ||
|
||
c.log.Errorf("Unpropagated error from %q: %v", ec.endpoint, err) |
Not "failing" the client in any way means we'll continue to try this client until the next sortConnectionsByHealth
catches it.
True, I'll move the provider to the end if there's an error.
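Something like this would do it, reusing the illustrative ethConn stand-in from the sketch near the top (not the actual implementation; it assumes the failing connection is present in the list):

// moveToEnd returns the client list with the failing connection moved to the
// back so it is tried last on subsequent requests.
func moveToEnd(clients []*ethConn, failing *ethConn) []*ethConn {
	reordered := make([]*ethConn, 0, len(clients))
	for _, ec := range clients {
		if ec != failing {
			reordered = append(reordered, ec)
		}
	}
	return append(reordered, failing)
}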
server/asset/eth/rpcclient.go
Outdated
if err != nil {
	return err
}
return nil
You can just return err.
server/asset/eth/eth.go
Outdated
scanner := bufio.NewScanner(file)
for scanner.Scan() {
	line := strings.Trim(scanner.Text(), " ")
	if line == "" || strings.HasPrefix(line, "#") || endpointsMap[line] {
In testing I commented one with a ; like a lot of .conf files allow. Would you pls add that HasPrefix check?
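Presumably just one more prefix check in the condition from the diff above, e.g. (a sketch, same assumptions as that snippet):

if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") || endpointsMap[line] {
	continue
}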
server/asset/eth/rpcclient.go
Outdated
	cl.tokens[assetID] = tkn
}
return nil
}

func (c *rpcclient) withTokener(assetID uint32, f func(*tokener) error) error {
func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tokener) error) error {
Looks like ctx is unused. swap gives it to the closure directly.
getEndpoints := func(clients []*ethConn) []string {
	endpoints := make([]string, 0, len(clients))
	for _, c := range clients {
		endpoints = append(endpoints, c.endpoint)
	}
	return endpoints
}

ethClient.idxMtx.RLock()
idx = ethClient.endpointIdx
ethClient.idxMtx.RUnlock()
if idx == 0 {
	t.Fatalf("endpoint index not advanced")

fmt.Println("Original clients:", getEndpoints(originalClients))
fmt.Println("Updated clients:", getEndpoints(updatedClients))

if originalClients[0].endpoint != updatedClients[len(updatedClients)-1].endpoint {
	t.Fatalf("failing client was not moved to the end. got %s, expected %s", updatedClients[len(updatedClients)-1].endpoint, originalClients[0].endpoint)
I think if you make ethConn into a Stringer then it'll print a slice of pointers to them as intended. Also, it would be good to print the failing and outdated slices with higher severity.
c.log.Debugf("healthy connections: %v", healthyConnections)
if len(outdatedConnections) > 0 {
c.log.Warnf("outdated connections: %v", outdatedConnections)
}
if len(failingConnections) > 0 {
c.log.Warnf("failing connections: %v", failingConnections)
}
with
func (ec *ethConn) String() string {
return ec.endpoint
}
server/asset/eth/rpcclient.go
Outdated
if c.headerIsOutdated(hdr) {
	c.log.Warnf("header fetched from %q appears to be outdated (time %s). If you continue to see this message, you might need to check your system clock",
		conn.endpoint, time.Unix(int64(hdr.Time), 0))
	return outdated
}
Minor tweak to show age could be nice:
if c.headerIsOutdated(hdr) {
hdrTime := time.Unix(int64(hdr.Time), 0)
c.log.Warnf("header fetched from %q appears to be outdated (time %s is %v old). "+
"If you continue to see this message, you might need to check your system clock",
conn.endpoint, hdrTime, time.Since(hdrTime))
return outdated
}
server/asset/eth/rpcclient.go
Outdated
c.idxMtx.RLock()
idx := c.endpointIdx
ec := c.clients[idx]
c.idxMtx.RUnlock()
clients := c.clientsCopy()

for _, ec := range clients {
nbd, but for _, ec := range c.clientsCopy() { would be fine.
server/asset/eth/rpcclient.go
Outdated
	return len(healthyConnections) > 0
}

// monitorConnectionsHealth starts a goroutine that checks the health of all connections
Can re-wrap.
}
ec.swapContract = &swapSourceV0{es}
ec.caller = client
go c.monitorConnectionsHealth(ctx)
I think it's fine to have this unsupervised based on the caller's pattern of connect's context cancellation followed by calling the shutdown() method, but it's a little confusing. Does it seem like anything could go wrong?
Fundamentally I don't quite get why there's both a context for connect, but also a shutdown method. It's like the context on connect was sort of intended to just apply to the initial connection action (e.g. a timeout), but this use of it makes it apply to longer-running processes, which is compatible with how (*ETHBackend).Connect works...
Does this all seem fine or are things a bit mixed up?
I got rid of the shutdown method. I guess the reason was to make sure that the ETH backend was shut down before closing the client connections, but that can be handled with a new context as well.
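A rough sketch of that pattern, with illustrative names only (ec.Close in particular is an assumption) and no claim about the final code: both the health monitor and the connection cleanup hang off the context passed to connect, so no separate shutdown method is needed.

func (c *rpcclient) connect(ctx context.Context) error {
	// ... dial providers, set up swap contracts ...

	// Periodic health checks run until the backend's context is canceled.
	go c.monitorConnectionsHealth(ctx)

	// Close the RPC connections only after the backend is done with them.
	go func() {
		<-ctx.Done()
		for _, ec := range c.clientsCopy() {
			ec.Close()
		}
	}()
	return nil
}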
server/asset/eth/rpcclient.go
Outdated
// never been successfully connection will be checked. True is returned if
// there is at least one healthy connection.
func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
	c.log.Tracef("sorting connections by health counter = %d", c.healthCheckCounter)
Let's remove this trace now that we're getting closer to a final iteration.
server/asset/eth/rpcclient.go
Outdated
}

if c.healthCheckCounter == 0 && len(c.neverConnectedEndpoints) > 0 {
	c.log.Tracef("number of never connected endpoints: %d", len(c.neverConnectedEndpoints))
This can be an Infof or even a Warnf.
Actually, it can be removed since it's redundant with the two outcome logs below, but the "successfully connected" log can be an Info because it's nice to know something recovered.
Force-pushed from df377bb to efae8cb.
Closes #2122