Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/eth: Monitor RPC provider health #2125

Merged
merged 6 commits into from Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion server/asset/eth/eth.go
Expand Up @@ -301,7 +301,17 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend
if err != nil {
return nil, err
}
eth.node = newRPCClient(eth.net, endpoints, log.SubLogger("RPC"))

netAddrs, found := dexeth.ContractAddresses[ethContractVersion]
if !found {
return nil, fmt.Errorf("no contract address for eth version %d", ethContractVersion)
}
ethContractAddr, found := netAddrs[eth.net]
if !found {
return nil, fmt.Errorf("no contract address for eth version %d on %s", ethContractVersion, eth.net)
}

eth.node = newRPCClient(eth.net, endpoints, ethContractAddr, log.SubLogger("RPC"))
return eth, nil
}

Expand Down
221 changes: 137 additions & 84 deletions server/asset/eth/rpcclient.go
Expand Up @@ -32,6 +32,9 @@ var (
bigZero = new(big.Int)
headerExpirationTime = time.Minute
monitorConnectionsInterval = 30 * time.Second
// failingEndpointsCheckFreq is the number of health checks between connection
// attempts to endpoints that have never been connected, i.e. they are retried
// every (monitorConnectionsInterval * failingEndpointsCheckFreq).
failingEndpointsCheckFreq = 4
)

type ContextCaller interface {
Expand Down Expand Up @@ -59,19 +62,25 @@ type rpcclient struct {
// to attempt to connect. If we were unable to connect to some of the
// endpoints, they will not be included in the clients slice.
endpoints []string
// neverConnectedEndpoints failed to connect since the initial connect call,
// so an ethConn has not been created for them.
neverConnectedEndpoints []string
healthCheckCounter int
tokensLoaded map[uint32]bool
ethContractAddr common.Address

// the order of clients will change based on the health of the connections.
clientsMtx sync.RWMutex
clients []*ethConn

healthCheckWaiter sync.WaitGroup
}

func newRPCClient(net dex.Network, endpoints []string, log dex.Logger) *rpcclient {
func newRPCClient(net dex.Network, endpoints []string, ethContractAddr common.Address, log dex.Logger) *rpcclient {
return &rpcclient{
net: net,
endpoints: endpoints,
log: log,
net: net,
endpoints: endpoints,
log: log,
ethContractAddr: ethContractAddr,
tokensLoaded: make(map[uint32]bool),
}
}

Expand All @@ -84,52 +93,144 @@ func (c *rpcclient) clientsCopy() []*ethConn {
return clients
}

// checkIfConnectionOutdated checks if the connection is outdated.
func (c *rpcclient) checkIfConnectionOutdated(ctx context.Context, conn *ethConn) (bool, error) {
// connectToEndpoint dials the RPC provider at endpoint and builds a fully
// initialized ethConn for it: the ETH swap contract bound to c.ethContractAddr
// and a tokener for every asset ID recorded in c.tokensLoaded. If the provider
// does not pass the "eth"/"txpool" module check, the connection is still
// returned, but with txPoolSupported set to false.
//
// An error is returned if dialing fails or if the swap contract or any tokener
// cannot be constructed. In the latter cases the dialed RPC client is closed
// before returning, so the caller never has to clean up after a failure.
//
// NOTE(review): c.tokensLoaded is read here without holding a lock, and
// loadToken writes to it. If loadToken can run while the health monitor is
// reconnecting endpoints, this needs synchronization — confirm with callers.
func (c *rpcclient) connectToEndpoint(ctx context.Context, endpoint string) (*ethConn, error) {
	client, err := rpc.DialContext(ctx, endpoint)
	if err != nil {
		return nil, err
	}

	// Once the dial has succeeded, the only remaining failure modes are the
	// ETHSwap and tokener constructors below. Make sure the underlying RPC
	// client is not leaked if either of them fails.
	var success bool
	defer func() {
		if !success {
			client.Close()
		}
	}()

	ec := &ethConn{
		Client:   ethclient.NewClient(client),
		endpoint: endpoint,
		tokens:   make(map[uint32]*tokener),
		caller:   client,
	}

	// A failed module check is not fatal. It only disables accounting for
	// pending transactions on this connection.
	reqModules := []string{"eth", "txpool"}
	if err := dexeth.CheckAPIModules(client, endpoint, c.log, reqModules); err != nil {
		c.log.Warnf("Error checking required modules at %q: %v", endpoint, err)
		c.log.Warnf("Will not account for pending transactions in balance calculations at %q", endpoint)
		ec.txPoolSupported = false
	} else {
		ec.txPoolSupported = true
	}

	es, err := swapv0.NewETHSwap(c.ethContractAddr, ec.Client)
	if err != nil {
		// Wrap with %w, consistent with the tokener error below, so that
		// callers can inspect the underlying error.
		return nil, fmt.Errorf("unable to initialize eth contract for %q: %w", endpoint, err)
	}
	ec.swapContract = &swapSourceV0{es}

	// Give the new connection a tokener for every token that has already been
	// loaded, so it is usable for the same assets as the existing connections.
	for assetID := range c.tokensLoaded {
		tkn, err := newTokener(ctx, assetID, c.net, ec.Client)
		if err != nil {
			return nil, fmt.Errorf("error constructing ERC20Swap: %w", err)
		}
		ec.tokens[assetID] = tkn
	}
	success = true

	return ec, nil
}

// connectionStatus classifies the health of a single provider connection, as
// reported by checkConnectionStatus.
type connectionStatus int

const (
// failed means the best header could not be fetched from the provider.
failed connectionStatus = iota
// outdated means the provider responded, but headerIsOutdated reported its
// best header as outdated.
outdated
// connected means the provider responded with a best header that is not
// outdated.
connected
)

func (c *rpcclient) checkConnectionStatus(ctx context.Context, conn *ethConn) connectionStatus {
hdr, err := conn.HeaderByNumber(ctx, nil)
if err != nil {
return false, fmt.Errorf("Failed to get header from %q: %v", conn.endpoint, err)
return failed
}
martonp marked this conversation as resolved.
Show resolved Hide resolved

return c.headerIsOutdated(hdr), nil
if c.headerIsOutdated(hdr) {
return outdated
}

return connected
}

// checkConnections checks the health of the connections and reorders them
// sortConnectionsByHealth checks the health of the connections and sorts them
// based on their health. It does a best header call to each connection and
// connections with non outdated headers are placed first, ones with outdated
// headers are placed in the middle, and ones that error are placed last.
func (c *rpcclient) checkConnectionsHealth(ctx context.Context) {
// Every failingEndpointsCheckFreq health checks, the endpoints that have
// never been successfully connected to will be retried. True is returned if
// there is at least one healthy connection.
func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
c.log.Tracef("sorting connections by health counter = %d", c.healthCheckCounter)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this trace now that we're getting closer to a final iteration.


clients := c.clientsCopy()

healthyConnections := make([]*ethConn, 0, len(clients))
outdatedConnections := make([]*ethConn, 0, len(clients))
failingConnections := make([]*ethConn, 0, len(clients))

for _, ec := range clients {
outdated, err := c.checkIfConnectionOutdated(ctx, ec)
if err != nil {
c.log.Errorf("Error checking if connection to %q outdated: %v", ec.endpoint, err)
failingConnections = append(failingConnections, ec)
continue
categorizeConnection := func(conn *ethConn) {
status := c.checkConnectionStatus(ctx, conn)
switch status {
case connected:
healthyConnections = append(healthyConnections, conn)
case outdated:
outdatedConnections = append(outdatedConnections, conn)
case failed:
failingConnections = append(failingConnections, conn)
}
}

if outdated {
c.log.Warnf("Connection to %q is outdated. Check your system clock if you see this repeatedly.", ec.endpoint)
outdatedConnections = append(outdatedConnections, ec)
continue
for _, ec := range clients {
categorizeConnection(ec)
}

if c.healthCheckCounter == 0 {
martonp marked this conversation as resolved.
Show resolved Hide resolved
c.log.Tracef("number of never connected endpoints: %d", len(c.neverConnectedEndpoints))
Copy link
Member

@chappjc chappjc Feb 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be an Infof or even a Warnf
Actually, can remove since it's redundant with the two outcome logs below, but the "successfully connected" log can be an Info because it's nice to know something recovered.

stillUnconnectedEndpoints := make([]string, 0, len(c.neverConnectedEndpoints))

for _, endpoint := range c.neverConnectedEndpoints {
ec, err := c.connectToEndpoint(ctx, endpoint)
if err != nil {
c.log.Errorf("Error connecting to %q: %v", endpoint, err)
stillUnconnectedEndpoints = append(stillUnconnectedEndpoints, endpoint)
continue
}

c.log.Debugf("successfully connected to %q", endpoint)

categorizeConnection(ec)
}

healthyConnections = append(healthyConnections, ec)
c.neverConnectedEndpoints = stillUnconnectedEndpoints
}

clientsUpdatedOrder := make([]*ethConn, 0, len(clients))
clientsUpdatedOrder = append(clientsUpdatedOrder, healthyConnections...)
clientsUpdatedOrder = append(clientsUpdatedOrder, outdatedConnections...)
clientsUpdatedOrder = append(clientsUpdatedOrder, failingConnections...)

c.log.Tracef("healthy connections: %v", healthyConnections)
c.log.Tracef("outdated connections: %v", outdatedConnections)
c.log.Tracef("failing connections: %v", failingConnections)
martonp marked this conversation as resolved.
Show resolved Hide resolved

c.clientsMtx.Lock()
defer c.clientsMtx.Unlock()
c.clients = clientsUpdatedOrder
c.healthCheckCounter = (c.healthCheckCounter + 1) % failingEndpointsCheckFreq

return len(healthyConnections) > 0
}

// monitorConnectionsHealth starts a goroutine that checks the health of all connections
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can re-wrap.

Expand All @@ -143,7 +244,7 @@ func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
c.checkConnectionsHealth(ctx)
c.sortConnectionsByHealth(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log a warning if false?

}
}
}
Expand Down Expand Up @@ -176,82 +277,33 @@ func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool
//
// Failed connections will not be included in the clients slice.
func (c *rpcclient) connect(ctx context.Context) (err error) {
netAddrs, found := dexeth.ContractAddresses[ethContractVersion]
if !found {
return fmt.Errorf("no contract address for eth version %d", ethContractVersion)
}
contractAddr, found := netAddrs[c.net]
if !found {
return fmt.Errorf("no contract address for eth version %d on %s", ethContractVersion, c.net)
}

var success bool

c.clients = make([]*ethConn, 0, len(c.endpoints))
c.neverConnectedEndpoints = make([]string, 0, len(c.endpoints))

for _, endpoint := range c.endpoints {
client, err := rpc.DialContext(ctx, endpoint)
ec, err := c.connectToEndpoint(ctx, endpoint)
if err != nil {
c.log.Errorf("Ethereum RPC client failed to connect to %q: %v", endpoint, err)
c.log.Errorf("Error connecting to %q: %v", endpoint, err)
c.neverConnectedEndpoints = append(c.neverConnectedEndpoints, endpoint)
continue
}

defer func() {
// If all connections are outdated, we will not start, so close any open connections.
if !success {
client.Close()
ec.Close()
}
}()

ethClient := ethclient.NewClient(client)
ec := &ethConn{
Client: ethclient.NewClient(client),
endpoint: endpoint,
tokens: make(map[uint32]*tokener),
}

reqModules := []string{"eth", "txpool"}
if err := dexeth.CheckAPIModules(client, endpoint, c.log, reqModules); err != nil {
c.log.Warnf("Error checking required modules at %q: %v", endpoint, err)
c.log.Warnf("Will not account for pending transactions in balance calculations at %q", endpoint)
ec.txPoolSupported = false
} else {
ec.txPoolSupported = true
}

hdr, err := ec.HeaderByNumber(ctx, nil)
if err != nil {
c.log.Errorf("Failed to get header from %q: %v", endpoint, err)
continue
}

outdated := c.headerIsOutdated(hdr)
if outdated {
c.log.Warnf("Best header from %q is outdated.", endpoint)
} else {
success = true
}

// This only returns an error if the abi fails to parse, so if it fails
// for one provider, it will fail for all.
es, err := swapv0.NewETHSwap(contractAddr, ethClient)
if err != nil {
return fmt.Errorf("unable to initialize eth contract for %q: %v", endpoint, err)
}

ec.swapContract = &swapSourceV0{es}
ec.caller = client

// Put outdated clients at the end of the list.
if outdated {
c.clients = append(c.clients, ec)
} else {
c.clients = append([]*ethConn{ec}, c.clients...)
}
c.clients = append(c.clients, ec)
}

c.log.Infof("number of connected ETH providers: %d", len(c.clients))
success = c.sortConnectionsByHealth(ctx)

if !success {
return fmt.Errorf("no connection to an up to date ETH provider available")
return fmt.Errorf("failed to connect to an up-to-date ethereum node")
}

go c.monitorConnectionsHealth(ctx)
Copy link
Member

@chappjc chappjc Feb 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to have this unsupervised based on the caller's pattern of connect's context cancellation followed by calling the shutdown() method, but it's a little confusing. Does it seem like anything could go wrong?

Fundamentally I don't quite get why there's both a context for connect, but also a shutdown method. It's like the context on connect was sort of intended to just apply to the initial connection action (e.g. timeout), but this use of it makes it apply to longer running processes, which is compatible with how (*ETHBackend).Connect works...

Does this all seem fine or are things a bit mixed up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got rid of the shutdown method. I guess the reason was to make sure that the ETH backend was shut down before closing the client connections, but that can be handled with a new context as well.

Expand All @@ -271,12 +323,13 @@ func (c *rpcclient) shutdown() {
}

func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error {
c.tokensLoaded[assetID] = true

for _, cl := range c.clientsCopy() {
tkn, err := newTokener(ctx, assetID, c.net, cl.Client)
if err != nil {
return fmt.Errorf("error constructing ERC20Swap: %w", err)
}

cl.tokens[assetID] = tkn
}
return nil
Expand Down
12 changes: 11 additions & 1 deletion server/asset/eth/rpcclient_harness_test.go
Expand Up @@ -46,7 +46,17 @@ func TestMain(m *testing.M) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
log := dex.StdOutLogger("T", dex.LevelTrace)
ethClient = newRPCClient(dex.Simnet, []string{wsEndpoint, alphaIPCFile}, log)

netAddrs, found := dexeth.ContractAddresses[ethContractVersion]
if !found {
return 1, fmt.Errorf("no contract address for eth version %d", ethContractVersion)
}
ethContractAddr, found := netAddrs[dex.Simnet]
if !found {
return 1, fmt.Errorf("no contract address for eth version %d on %s", ethContractVersion, dex.Simnet)
}

ethClient = newRPCClient(dex.Simnet, []string{wsEndpoint, alphaIPCFile}, ethContractAddr, log)
defer func() {
cancel()
ethClient.shutdown()
Expand Down