Skip to content

Commit

Permalink
server/eth: Monitor RPC provider health
Browse files Browse the repository at this point in the history
Previously, the ETH backend would not start if any of the providers
could not connect or was outdated. Now, if at least one of the
providers is able to connect and the header is recent, the backend
will start. After connecting, a goroutine will start that will
periodically check the health of the RPC providers, sort the list
putting the non-outdated ones first, outdated ones after, and the
ones that fail to respond last. Requests will then attempt to use the
RPC providers in this order.
  • Loading branch information
martonp committed Feb 12, 2023
1 parent ea8cb4d commit 23c4710
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 68 deletions.
188 changes: 139 additions & 49 deletions server/asset/eth/rpcclient.go
Expand Up @@ -29,8 +29,9 @@ import (
var (
_ ethFetcher = (*rpcclient)(nil)

bigZero = new(big.Int)
headerExpirationTime = time.Minute
bigZero = new(big.Int)
headerExpirationTime = time.Minute
monitorConnectionsInterval = 30 * time.Second
)

type ContextCaller interface {
Expand All @@ -52,13 +53,18 @@ type ethConn struct {
}

type rpcclient struct {
net dex.Network
log dex.Logger
net dex.Network
log dex.Logger
// endpoints should only be used during connect to know which endpoints
// to attempt to connect. If we were unable to connect to some of the
// endpoints, they will not be included in the clients slice.
endpoints []string
clients []*ethConn

idxMtx sync.RWMutex
endpointIdx int
// the order of clients will change based on the health of the connections.
clientsMtx sync.RWMutex
clients []*ethConn

healthCheckWaiter sync.WaitGroup
}

func newRPCClient(net dex.Network, endpoints []string, log dex.Logger) *rpcclient {
Expand All @@ -69,35 +75,106 @@ func newRPCClient(net dex.Network, endpoints []string, log dex.Logger) *rpcclien
}
}

// clientsCopy returns a snapshot of the clients slice, taken under the read
// lock, so that callers may iterate it without holding clientsMtx.
func (c *rpcclient) clientsCopy() []*ethConn {
	c.clientsMtx.RLock()
	defer c.clientsMtx.RUnlock()

	snapshot := append(make([]*ethConn, 0, len(c.clients)), c.clients...)
	return snapshot
}

// checkIfConnectionOutdated does a best-header call on the connection and
// reports whether that header is outdated per headerIsOutdated. A non-nil
// error means the provider could not be reached or failed the request.
func (c *rpcclient) checkIfConnectionOutdated(ctx context.Context, conn *ethConn) (bool, error) {
	hdr, err := conn.HeaderByNumber(ctx, nil)
	if err != nil {
		// Error strings are lowercase per Go convention, and the cause is
		// wrapped with %w so callers can inspect it with errors.Is/As.
		return false, fmt.Errorf("failed to get header from %q: %w", conn.endpoint, err)
	}

	return c.headerIsOutdated(hdr), nil
}

// checkConnectionsHealth checks the health of the connections and reorders
// them based on their health. It does a best header call to each connection:
// connections with non-outdated headers are placed first, ones with outdated
// headers are placed in the middle, and ones that error are placed last.
func (c *rpcclient) checkConnectionsHealth(ctx context.Context) {
	clients := c.clientsCopy()

	healthyConnections := make([]*ethConn, 0, len(clients))
	outdatedConnections := make([]*ethConn, 0, len(clients))
	failingConnections := make([]*ethConn, 0, len(clients))

	for _, ec := range clients {
		outdated, err := c.checkIfConnectionOutdated(ctx, ec)
		if err != nil {
			c.log.Errorf("Error checking if connection to %q outdated: %v", ec.endpoint, err)
			failingConnections = append(failingConnections, ec)
			continue
		}

		if outdated {
			c.log.Warnf("Connection to %q is outdated. Check your system clock if you see this repeatedly.", ec.endpoint)
			outdatedConnections = append(outdatedConnections, ec)
			continue
		}

		healthyConnections = append(healthyConnections, ec)
	}

	// Healthy first, then outdated, then failing, so that withClient tries
	// the most reliable providers first.
	clientsUpdatedOrder := make([]*ethConn, 0, len(clients))
	clientsUpdatedOrder = append(clientsUpdatedOrder, healthyConnections...)
	clientsUpdatedOrder = append(clientsUpdatedOrder, outdatedConnections...)
	clientsUpdatedOrder = append(clientsUpdatedOrder, failingConnections...)

	c.clientsMtx.Lock()
	defer c.clientsMtx.Unlock()
	c.clients = clientsUpdatedOrder
}

// monitorConnectionsHealth periodically checks the health of all connections
// and reorders c.clients accordingly. The period is monitorConnectionsInterval
// (30 seconds by default; tests may shorten it). It blocks until ctx is
// canceled, so it is run as a goroutine from connect.
func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) {
	ticker := time.NewTicker(monitorConnectionsInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.checkConnectionsHealth(ctx)
		}
	}
}

func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool) (err error) {
for range c.endpoints {
c.idxMtx.RLock()
idx := c.endpointIdx
ec := c.clients[idx]
c.idxMtx.RUnlock()
clients := c.clientsCopy()

for _, ec := range clients {
err = f(ec)
if err == nil {
return nil
}
if len(haltOnNotFound) > 0 && haltOnNotFound[0] && (errors.Is(err, ethereum.NotFound) || strings.Contains(err.Error(), "not found")) {
return err
}
c.log.Errorf("Unpropagated error from %q: %v", c.endpoints[idx], err)
// Try the next client.
c.idxMtx.Lock()
// Only advance it if another thread hasn't.
if c.endpointIdx == idx && len(c.endpoints) > 0 {
c.endpointIdx = (c.endpointIdx + 1) % len(c.endpoints)
c.log.Infof("Switching RPC endpoint to %q", c.endpoints[c.endpointIdx])
}
c.idxMtx.Unlock()

c.log.Errorf("Unpropagated error from %q: %v", ec.endpoint, err)
}

return fmt.Errorf("all providers failed. last error: %w", err)
}

// connect connects to an ipc socket. It then wraps ethclient's client and
// bundles commands in a form we can easily use.
// connect will attempt to connect to all the endpoints in the endpoints slice.
// If at least one of the connections is successful and is not outdated, the
// function will return without error.
//
// Connections with an outdated block will be marked as outdated, but included
// in the clients slice. If the up-to-date providers start to fail, the outdated
// ones will be checked to see if they are still outdated.
//
// Failed connections will not be included in the clients slice.
func (c *rpcclient) connect(ctx context.Context) (err error) {
netAddrs, found := dexeth.ContractAddresses[ethContractVersion]
if !found {
Expand All @@ -110,11 +187,12 @@ func (c *rpcclient) connect(ctx context.Context) (err error) {

var success bool

c.clients = make([]*ethConn, len(c.endpoints))
for i, endpoint := range c.endpoints {
c.clients = make([]*ethConn, 0, len(c.endpoints))
for _, endpoint := range c.endpoints {
client, err := rpc.DialContext(ctx, endpoint)
if err != nil {
return fmt.Errorf("unable to dial rpc to %q: %v", endpoint, err)
c.log.Errorf("Ethereum RPC client failed to connect to %q: %v", endpoint, err)
continue
}

defer func() {
Expand All @@ -123,6 +201,7 @@ func (c *rpcclient) connect(ctx context.Context) (err error) {
}
}()

ethClient := ethclient.NewClient(client)
ec := &ethConn{
Client: ethclient.NewClient(client),
endpoint: endpoint,
Expand All @@ -140,23 +219,43 @@ func (c *rpcclient) connect(ctx context.Context) (err error) {

hdr, err := ec.HeaderByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("error getting best header from %q: %v", endpoint, err)
c.log.Errorf("Failed to get header from %q: %v", endpoint, err)
continue
}
if c.headerIsOutdated(hdr) {
return fmt.Errorf("initial header fetched from %q appears to be outdated (time %s). If you continue to see this message, you might need to check your system clock",
endpoint, time.Unix(int64(hdr.Time), 0))

outdated := c.headerIsOutdated(hdr)
if outdated {
c.log.Warnf("Best header from %q is outdated.", endpoint)
} else {
success = true
}

es, err := swapv0.NewETHSwap(contractAddr, ec.Client)
// This only returns an error if the abi fails to parse, so if it fails
// for one provider, it will fail for all.
es, err := swapv0.NewETHSwap(contractAddr, ethClient)
if err != nil {
return fmt.Errorf("unable to initialize eth contract for %q: %v", endpoint, err)
}

ec.swapContract = &swapSourceV0{es}
ec.caller = client

c.clients[i] = ec
// Put outdated clients at the end of the list.
if outdated {
c.clients = append(c.clients, ec)
} else {
c.clients = append([]*ethConn{ec}, c.clients...)
}
}
success = true

c.log.Infof("number of connected ETH providers: %d", len(c.clients))

if !success {
return fmt.Errorf("no connection to an up to date ETH provider available")
}

go c.monitorConnectionsHealth(ctx)

return nil
}

Expand All @@ -166,13 +265,13 @@ func (c *rpcclient) headerIsOutdated(hdr *types.Header) bool {

// shutdown shuts down the client.
func (c *rpcclient) shutdown() {
for _, ec := range c.clients {
for _, ec := range c.clientsCopy() {
ec.Close()
}
}

func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error {
for _, cl := range c.clients {
for _, cl := range c.clientsCopy() {
tkn, err := newTokener(ctx, assetID, c.net, cl.Client)
if err != nil {
return fmt.Errorf("error constructing ERC20Swap: %w", err)
Expand All @@ -183,7 +282,7 @@ func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error {
return nil
}

func (c *rpcclient) withTokener(assetID uint32, f func(*tokener) error) error {
func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tokener) error) error {
return c.withClient(func(ec *ethConn) error {
tkn, found := ec.tokens[assetID]
if !found {
Expand All @@ -198,20 +297,11 @@ func (c *rpcclient) withTokener(assetID uint32, f func(*tokener) error) error {
func (c *rpcclient) bestHeader(ctx context.Context) (hdr *types.Header, err error) {
return hdr, c.withClient(func(ec *ethConn) error {
hdr, err = ec.HeaderByNumber(ctx, nil)
if err == nil && c.headerIsOutdated(hdr) {
c.log.Errorf("Best header from %q appears to be outdated (time %s). If you continue to see this message, you might need to check your system clock",
ec.endpoint, time.Unix(int64(hdr.Time), 0))
if len(c.endpoints) > 0 {
c.idxMtx.Lock()
c.endpointIdx = (c.endpointIdx + 1) % len(c.endpoints)
endpoint := c.endpoints[c.endpointIdx]
c.idxMtx.Unlock()
c.log.Infof("Switching RPC endpoint to %q", endpoint)
}
if err != nil {
return err
}
return err
return nil
})

}

// headerByHeight gets the best header at height.
Expand Down Expand Up @@ -255,7 +345,7 @@ func (c *rpcclient) swap(ctx context.Context, assetID uint32, secretHash [32]byt
return err
})
}
return state, c.withTokener(assetID, func(tkn *tokener) error {
return state, c.withTokener(ctx, assetID, func(tkn *tokener) error {
state, err = tkn.Swap(ctx, secretHash)
return err
})
Expand Down
41 changes: 22 additions & 19 deletions server/asset/eth/rpcclient_harness_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"time"

"context"
"testing"
Expand All @@ -38,6 +39,8 @@ var (
)

func TestMain(m *testing.M) {
monitorConnectionsInterval = 3 * time.Second

// Run in function so that defers happen before os.Exit is called.
run := func() (int, error) {
var cancel context.CancelFunc
Expand Down Expand Up @@ -169,14 +172,7 @@ func testAccountBalance(t *testing.T, assetID uint32) {
}
}

func TestRPCRotation(t *testing.T) {
ethClient.idxMtx.RLock()
idx := ethClient.endpointIdx
ethClient.idxMtx.RUnlock()
if idx != 0 {
t.Fatal("expected initial index to be zero")
}

func TestMonitorHealth(t *testing.T) {
// Requesting a non-existent transaction should propagate the error. Also
// check logs to ensure the endpoint index was not advanced.
_, _, err := ethClient.transaction(ctx, common.Hash{})
Expand All @@ -185,20 +181,27 @@ func TestRPCRotation(t *testing.T) {
}
ethClient.log.Info("Not found error successfully propagated")

// Shut down the zeroth client and ensure the endpoint index is advanced.
cl := ethClient.clients[idx]
cl.Close()
originalClients := ethClient.clientsCopy()
originalClients[0].Close()

_, err = ethClient.bestHeader(ctx)
if err != nil {
t.Fatalf("error getting best header with index advance: %v", err)
fmt.Println("Waiting for client health check...")
time.Sleep(5 * time.Second)

updatedClients := ethClient.clientsCopy()

getEndpoints := func(clients []*ethConn) []string {
endpoints := make([]string, 0, len(clients))
for _, c := range clients {
endpoints = append(endpoints, c.endpoint)
}
return endpoints
}

ethClient.idxMtx.RLock()
idx = ethClient.endpointIdx
ethClient.idxMtx.RUnlock()
if idx == 0 {
t.Fatalf("endpoint index not advanced")
fmt.Println("Original clients:", getEndpoints(originalClients))
fmt.Println("Updated clients:", getEndpoints(updatedClients))

if originalClients[0].endpoint != updatedClients[len(updatedClients)-1].endpoint {
t.Fatalf("failing client was not moved to the end. got %s, expected %s", updatedClients[len(updatedClients)-1].endpoint, originalClients[0].endpoint)
}
}

Expand Down

0 comments on commit 23c4710

Please sign in to comment.