From efae8cba43aab944d29b5060223a282ea750abeb Mon Sep 17 00:00:00 2001 From: martonp Date: Thu, 16 Feb 2023 14:10:28 -0500 Subject: [PATCH] Chappjc and Buck review fixes --- server/asset/eth/eth.go | 18 ++--- server/asset/eth/rpcclient.go | 90 ++++++++++++++-------- server/asset/eth/rpcclient_harness_test.go | 13 +--- 3 files changed, 66 insertions(+), 55 deletions(-) diff --git a/server/asset/eth/eth.go b/server/asset/eth/eth.go index 827cf5f7d0..8ca348a0c3 100644 --- a/server/asset/eth/eth.go +++ b/server/asset/eth/eth.go @@ -164,7 +164,6 @@ type ethFetcher interface { blockNumber(ctx context.Context) (uint64, error) headerByHeight(ctx context.Context, height uint64) (*types.Header, error) connect(ctx context.Context) error - shutdown() suggestGasTipCap(ctx context.Context) (*big.Int, error) syncProgress(ctx context.Context) (*ethereum.SyncProgress, error) transaction(ctx context.Context, hash common.Hash) (tx *types.Transaction, isMempool bool, err error) @@ -285,7 +284,7 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend scanner := bufio.NewScanner(file) for scanner.Scan() { line := strings.Trim(scanner.Text(), " ") - if line == "" || strings.HasPrefix(line, "#") || endpointsMap[line] { + if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") || endpointsMap[line] { continue } endpointsMap[line] = true @@ -317,21 +316,22 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend return eth, nil } -func (eth *baseBackend) shutdown() { - eth.node.shutdown() -} - // Connect connects to the node RPC server and initializes some variables. func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) { eth.baseBackend.ctx = ctx - if err := eth.node.connect(ctx); err != nil { + // Create a separate context for the node so that it will only be cancelled + // after the ETHBackend's run method has returned. + nodeContext, cancelNodeContext := context.WithCancel(context.Background()) + if err := eth.node.connect(nodeContext); err != nil { + cancelNodeContext() return nil, err } // Prime the best block hash and height. bn, err := eth.node.blockNumber(ctx) if err != nil { + cancelNodeContext() return nil, fmt.Errorf("error getting best block header from geth: %w", err) } eth.baseBackend.bestHeight = bn @@ -340,6 +340,7 @@ func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) { wg.Add(1) go func() { eth.run(ctx) + cancelNodeContext() wg.Done() }() return &wg, nil @@ -728,9 +729,6 @@ func (eth *ETHBackend) poll(ctx context.Context) { // run processes the queue and monitors the application context. func (eth *ETHBackend) run(ctx context.Context) { - // Shut down the RPC client on ctx.Done(). - defer eth.shutdown() - blockPoll := time.NewTicker(blockPollInterval) defer blockPoll.Stop() diff --git a/server/asset/eth/rpcclient.go b/server/asset/eth/rpcclient.go index 824b6d4a8a..418b5a709a 100644 --- a/server/asset/eth/rpcclient.go +++ b/server/asset/eth/rpcclient.go @@ -55,6 +55,10 @@ type ethConn struct { txPoolSupported bool } +func (ec *ethConn) String() string { + return ec.endpoint +} + type rpcclient struct { net dex.Network log dex.Logger @@ -159,8 +163,10 @@ func (c *rpcclient) checkConnectionStatus(ctx context.Context, conn *ethConn) co } if c.headerIsOutdated(hdr) { - c.log.Warnf("header fetched from %q appears to be outdated (time %s). If you continue to see this message, you might need to check your system clock", - conn.endpoint, time.Unix(int64(hdr.Time), 0)) + hdrTime := time.Unix(int64(hdr.Time), 0) + c.log.Warnf("header fetched from %q appears to be outdated (time %s is %v old). "+ + "If you continue to see this message, you might need to check your system clock", + conn.endpoint, hdrTime, time.Since(hdrTime)) return outdated } @@ -175,8 +181,6 @@ func (c *rpcclient) checkConnectionStatus(ctx context.Context, conn *ethConn) co // never been successfully connection will be checked. True is returned if // there is at least one healthy connection. func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool { - c.log.Tracef("sorting connections by health counter = %d", c.healthCheckCounter) - clients := c.clientsCopy() healthyConnections := make([]*ethConn, 0, len(clients)) @@ -200,7 +204,6 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool { } if c.healthCheckCounter == 0 && len(c.neverConnectedEndpoints) > 0 { - c.log.Tracef("number of never connected endpoints: %d", len(c.neverConnectedEndpoints)) stillUnconnectedEndpoints := make([]string, 0, len(c.neverConnectedEndpoints)) for _, endpoint := range c.neverConnectedEndpoints { @@ -211,7 +214,7 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool { continue } - c.log.Debugf("successfully connected to %q", endpoint) + c.log.Infof("Successfully connected to %q", endpoint) categorizeConnection(ec) } @@ -224,16 +227,13 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool { clientsUpdatedOrder = append(clientsUpdatedOrder, outdatedConnections...) clientsUpdatedOrder = append(clientsUpdatedOrder, failingConnections...) - getEndpoints := func(clients []*ethConn) []string { - endpoints := make([]string, 0, len(clients)) - for _, c := range clients { - endpoints = append(endpoints, c.endpoint) - } - return endpoints + c.log.Tracef("Healthy connections: %v", healthyConnections) + if len(outdatedConnections) > 0 { + c.log.Warnf("Outdated connections: %v", outdatedConnections) + } + if len(failingConnections) > 0 { + c.log.Warnf("Failing connections: %v", failingConnections) } - c.log.Tracef("healthy connections: %v", getEndpoints(healthyConnections)) - c.log.Tracef("outdated connections: %v", getEndpoints(outdatedConnections)) - c.log.Tracef("failing connections: %v", getEndpoints(failingConnections)) c.clientsMtx.Lock() defer c.clientsMtx.Unlock() @@ -243,9 +243,40 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool { return len(healthyConnections) > 0 } -// monitorConnectionsHealth starts a goroutine that checks the health of all connections -// every 30 seconds. +// markConnectionAsFailed moves an connection to the end of the client list. +func (c *rpcclient) markConnectionAsFailed(endpoint string) { + c.clientsMtx.Lock() + defer c.clientsMtx.Unlock() + + var index int = -1 + for i, ec := range c.clients { + if ec.endpoint == endpoint { + index = i + break + } + } + if index == -1 { + c.log.Errorf("Failed to mark client as failed: %q not found", endpoint) + return + } + + updatedClients := make([]*ethConn, 0, len(c.clients)) + updatedClients = append(updatedClients, c.clients[:index]...) + updatedClients = append(updatedClients, c.clients[index+1:]...) + updatedClients = append(updatedClients, c.clients[index]) + + c.clients = updatedClients +} + +// monitorConnectionsHealth starts a goroutine that checks the health of all +// connections every 30 seconds. func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) { + defer func() { + for _, ec := range c.clientsCopy() { + ec.Close() + } + }() + ticker := time.NewTicker(monitorConnectionsInterval) defer ticker.Stop() @@ -254,15 +285,15 @@ func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - c.sortConnectionsByHealth(ctx) + if !c.sortConnectionsByHealth(ctx) { + c.log.Warnf("No healthy ETH RPC connections") + } } } } func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool) (err error) { - clients := c.clientsCopy() - - for _, ec := range clients { + for _, ec := range c.clientsCopy() { err = f(ec) if err == nil { return nil @@ -272,6 +303,7 @@ func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool } c.log.Errorf("Unpropagated error from %q: %v", ec.endpoint, err) + c.markConnectionAsFailed(ec.endpoint) } return fmt.Errorf("all providers failed. last error: %w", err) @@ -325,13 +357,6 @@ func (c *rpcclient) headerIsOutdated(hdr *types.Header) bool { return c.net != dex.Simnet && hdr.Time < uint64(time.Now().Add(-headerExpirationTime).Unix()) } -// shutdown shuts down the client. -func (c *rpcclient) shutdown() { - for _, ec := range c.clientsCopy() { - ec.Close() - } -} - func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error { c.tokensLoaded[assetID] = true @@ -345,7 +370,7 @@ func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error { return nil } -func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tokener) error) error { +func (c *rpcclient) withTokener(assetID uint32, f func(*tokener) error) error { return c.withClient(func(ec *ethConn) error { tkn, found := ec.tokens[assetID] if !found { @@ -360,10 +385,7 @@ func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tok func (c *rpcclient) bestHeader(ctx context.Context) (hdr *types.Header, err error) { return hdr, c.withClient(func(ec *ethConn) error { hdr, err = ec.HeaderByNumber(ctx, nil) - if err != nil { - return err - } - return nil + return err }) } @@ -408,7 +430,7 @@ func (c *rpcclient) swap(ctx context.Context, assetID uint32, secretHash [32]byt return err }) } - return state, c.withTokener(ctx, assetID, func(tkn *tokener) error { + return state, c.withTokener(assetID, func(tkn *tokener) error { state, err = tkn.Swap(ctx, secretHash) return err }) diff --git a/server/asset/eth/rpcclient_harness_test.go b/server/asset/eth/rpcclient_harness_test.go index e4d0a7e10f..4ddf60446d 100644 --- a/server/asset/eth/rpcclient_harness_test.go +++ b/server/asset/eth/rpcclient_harness_test.go @@ -59,7 +59,6 @@ func TestMain(m *testing.M) { ethClient = newRPCClient(dex.Simnet, []string{wsEndpoint, alphaIPCFile}, ethContractAddr, log) defer func() { cancel() - ethClient.shutdown() }() dexeth.ContractAddresses[0][dex.Simnet] = getContractAddrFromFile(contractAddrFile) @@ -199,16 +198,8 @@ func TestMonitorHealth(t *testing.T) { updatedClients := ethClient.clientsCopy() - getEndpoints := func(clients []*ethConn) []string { - endpoints := make([]string, 0, len(clients)) - for _, c := range clients { - endpoints = append(endpoints, c.endpoint) - } - return endpoints - } - - fmt.Println("Original clients:", getEndpoints(originalClients)) - fmt.Println("Updated clients:", getEndpoints(updatedClients)) + fmt.Println("Original clients:", originalClients) + fmt.Println("Updated clients:", updatedClients) if originalClients[0].endpoint != updatedClients[len(updatedClients)-1].endpoint { t.Fatalf("failing client was not moved to the end. got %s, expected %s", updatedClients[len(updatedClients)-1].endpoint, originalClients[0].endpoint)