Skip to content

Commit

Permalink
Chappjc and Buck review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Feb 16, 2023
1 parent 73f725e commit efae8cb
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 55 deletions.
18 changes: 8 additions & 10 deletions server/asset/eth/eth.go
Expand Up @@ -164,7 +164,6 @@ type ethFetcher interface {
blockNumber(ctx context.Context) (uint64, error)
headerByHeight(ctx context.Context, height uint64) (*types.Header, error)
connect(ctx context.Context) error
shutdown()
suggestGasTipCap(ctx context.Context) (*big.Int, error)
syncProgress(ctx context.Context) (*ethereum.SyncProgress, error)
transaction(ctx context.Context, hash common.Hash) (tx *types.Transaction, isMempool bool, err error)
Expand Down Expand Up @@ -285,7 +284,7 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.Trim(scanner.Text(), " ")
if line == "" || strings.HasPrefix(line, "#") || endpointsMap[line] {
if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") || endpointsMap[line] {
continue
}
endpointsMap[line] = true
Expand Down Expand Up @@ -317,21 +316,22 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend
return eth, nil
}

// shutdown closes the backend's node RPC client connections by delegating to
// the underlying ethFetcher's shutdown method.
func (eth *baseBackend) shutdown() {
	eth.node.shutdown()
}

// Connect connects to the node RPC server and initializes some variables.
func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
eth.baseBackend.ctx = ctx

if err := eth.node.connect(ctx); err != nil {
// Create a separate context for the node so that it will only be cancelled
// after the ETHBackend's run method has returned.
nodeContext, cancelNodeContext := context.WithCancel(context.Background())
if err := eth.node.connect(nodeContext); err != nil {
cancelNodeContext()
return nil, err
}

// Prime the best block hash and height.
bn, err := eth.node.blockNumber(ctx)
if err != nil {
cancelNodeContext()
return nil, fmt.Errorf("error getting best block header from geth: %w", err)
}
eth.baseBackend.bestHeight = bn
Expand All @@ -340,6 +340,7 @@ func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
wg.Add(1)
go func() {
eth.run(ctx)
cancelNodeContext()
wg.Done()
}()
return &wg, nil
Expand Down Expand Up @@ -728,9 +729,6 @@ func (eth *ETHBackend) poll(ctx context.Context) {

// run processes the queue and monitors the application context.
func (eth *ETHBackend) run(ctx context.Context) {
// Shut down the RPC client on ctx.Done().
defer eth.shutdown()

blockPoll := time.NewTicker(blockPollInterval)
defer blockPoll.Stop()

Expand Down
90 changes: 56 additions & 34 deletions server/asset/eth/rpcclient.go
Expand Up @@ -55,6 +55,10 @@ type ethConn struct {
txPoolSupported bool
}

// String returns the connection's endpoint URL. It satisfies fmt.Stringer so
// that slices of *ethConn print as their endpoints in log messages.
func (ec *ethConn) String() string {
	return ec.endpoint
}

type rpcclient struct {
net dex.Network
log dex.Logger
Expand Down Expand Up @@ -159,8 +163,10 @@ func (c *rpcclient) checkConnectionStatus(ctx context.Context, conn *ethConn) co
}

if c.headerIsOutdated(hdr) {
c.log.Warnf("header fetched from %q appears to be outdated (time %s). If you continue to see this message, you might need to check your system clock",
conn.endpoint, time.Unix(int64(hdr.Time), 0))
hdrTime := time.Unix(int64(hdr.Time), 0)
c.log.Warnf("header fetched from %q appears to be outdated (time %s is %v old). "+
"If you continue to see this message, you might need to check your system clock",
conn.endpoint, hdrTime, time.Since(hdrTime))
return outdated
}

Expand All @@ -175,8 +181,6 @@ func (c *rpcclient) checkConnectionStatus(ctx context.Context, conn *ethConn) co
never been successfully connected will be checked. True is returned if
there is at least one healthy connection.
func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
c.log.Tracef("sorting connections by health counter = %d", c.healthCheckCounter)

clients := c.clientsCopy()

healthyConnections := make([]*ethConn, 0, len(clients))
Expand All @@ -200,7 +204,6 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
}

if c.healthCheckCounter == 0 && len(c.neverConnectedEndpoints) > 0 {
c.log.Tracef("number of never connected endpoints: %d", len(c.neverConnectedEndpoints))
stillUnconnectedEndpoints := make([]string, 0, len(c.neverConnectedEndpoints))

for _, endpoint := range c.neverConnectedEndpoints {
Expand All @@ -211,7 +214,7 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
continue
}

c.log.Debugf("successfully connected to %q", endpoint)
c.log.Infof("Successfully connected to %q", endpoint)

categorizeConnection(ec)
}
Expand All @@ -224,16 +227,13 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
clientsUpdatedOrder = append(clientsUpdatedOrder, outdatedConnections...)
clientsUpdatedOrder = append(clientsUpdatedOrder, failingConnections...)

getEndpoints := func(clients []*ethConn) []string {
endpoints := make([]string, 0, len(clients))
for _, c := range clients {
endpoints = append(endpoints, c.endpoint)
}
return endpoints
c.log.Tracef("Healthy connections: %v", healthyConnections)
if len(outdatedConnections) > 0 {
c.log.Warnf("Outdated connections: %v", outdatedConnections)
}
if len(failingConnections) > 0 {
c.log.Warnf("Failing connections: %v", failingConnections)
}
c.log.Tracef("healthy connections: %v", getEndpoints(healthyConnections))
c.log.Tracef("outdated connections: %v", getEndpoints(outdatedConnections))
c.log.Tracef("failing connections: %v", getEndpoints(failingConnections))

c.clientsMtx.Lock()
defer c.clientsMtx.Unlock()
Expand All @@ -243,9 +243,40 @@ func (c *rpcclient) sortConnectionsByHealth(ctx context.Context) bool {
return len(healthyConnections) > 0
}

// monitorConnectionsHealth starts a goroutine that checks the health of all connections
// every 30 seconds.
// markConnectionAsFailed moves a connection to the end of the client list so
// that healthier providers are tried first on subsequent requests. If no
// client with the given endpoint is found, an error is logged and the list is
// left unchanged.
func (c *rpcclient) markConnectionAsFailed(endpoint string) {
	c.clientsMtx.Lock()
	defer c.clientsMtx.Unlock()

	index := -1
	for i, ec := range c.clients {
		if ec.endpoint == endpoint {
			index = i
			break
		}
	}
	if index == -1 {
		c.log.Errorf("Failed to mark client as failed: %q not found", endpoint)
		return
	}

	// Rebuild into a fresh slice rather than shifting in place so that any
	// snapshots previously handed out (e.g. by clientsCopy) are unaffected.
	updatedClients := make([]*ethConn, 0, len(c.clients))
	updatedClients = append(updatedClients, c.clients[:index]...)
	updatedClients = append(updatedClients, c.clients[index+1:]...)
	updatedClients = append(updatedClients, c.clients[index])

	c.clients = updatedClients
}

// monitorConnectionsHealth starts a goroutine that checks the health of all
// connections every 30 seconds.
func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) {
defer func() {
for _, ec := range c.clientsCopy() {
ec.Close()
}
}()

ticker := time.NewTicker(monitorConnectionsInterval)
defer ticker.Stop()

Expand All @@ -254,15 +285,15 @@ func (c *rpcclient) monitorConnectionsHealth(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
c.sortConnectionsByHealth(ctx)
if !c.sortConnectionsByHealth(ctx) {
c.log.Warnf("No healthy ETH RPC connections")
}
}
}
}

func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool) (err error) {
clients := c.clientsCopy()

for _, ec := range clients {
for _, ec := range c.clientsCopy() {
err = f(ec)
if err == nil {
return nil
Expand All @@ -272,6 +303,7 @@ func (c *rpcclient) withClient(f func(ec *ethConn) error, haltOnNotFound ...bool
}

c.log.Errorf("Unpropagated error from %q: %v", ec.endpoint, err)
c.markConnectionAsFailed(ec.endpoint)
}

return fmt.Errorf("all providers failed. last error: %w", err)
Expand Down Expand Up @@ -325,13 +357,6 @@ func (c *rpcclient) headerIsOutdated(hdr *types.Header) bool {
return c.net != dex.Simnet && hdr.Time < uint64(time.Now().Add(-headerExpirationTime).Unix())
}

// shutdown shuts down the client, closing every RPC connection in the current
// client list. It iterates a copy of the list so no lock is held while the
// connections are being closed.
func (c *rpcclient) shutdown() {
	for _, ec := range c.clientsCopy() {
		ec.Close()
	}
}

func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error {
c.tokensLoaded[assetID] = true

Expand All @@ -345,7 +370,7 @@ func (c *rpcclient) loadToken(ctx context.Context, assetID uint32) error {
return nil
}

func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tokener) error) error {
func (c *rpcclient) withTokener(assetID uint32, f func(*tokener) error) error {
return c.withClient(func(ec *ethConn) error {
tkn, found := ec.tokens[assetID]
if !found {
Expand All @@ -360,10 +385,7 @@ func (c *rpcclient) withTokener(ctx context.Context, assetID uint32, f func(*tok
func (c *rpcclient) bestHeader(ctx context.Context) (hdr *types.Header, err error) {
return hdr, c.withClient(func(ec *ethConn) error {
hdr, err = ec.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
return nil
return err
})
}

Expand Down Expand Up @@ -408,7 +430,7 @@ func (c *rpcclient) swap(ctx context.Context, assetID uint32, secretHash [32]byt
return err
})
}
return state, c.withTokener(ctx, assetID, func(tkn *tokener) error {
return state, c.withTokener(assetID, func(tkn *tokener) error {
state, err = tkn.Swap(ctx, secretHash)
return err
})
Expand Down
13 changes: 2 additions & 11 deletions server/asset/eth/rpcclient_harness_test.go
Expand Up @@ -59,7 +59,6 @@ func TestMain(m *testing.M) {
ethClient = newRPCClient(dex.Simnet, []string{wsEndpoint, alphaIPCFile}, ethContractAddr, log)
defer func() {
cancel()
ethClient.shutdown()
}()

dexeth.ContractAddresses[0][dex.Simnet] = getContractAddrFromFile(contractAddrFile)
Expand Down Expand Up @@ -199,16 +198,8 @@ func TestMonitorHealth(t *testing.T) {

updatedClients := ethClient.clientsCopy()

getEndpoints := func(clients []*ethConn) []string {
endpoints := make([]string, 0, len(clients))
for _, c := range clients {
endpoints = append(endpoints, c.endpoint)
}
return endpoints
}

fmt.Println("Original clients:", getEndpoints(originalClients))
fmt.Println("Updated clients:", getEndpoints(updatedClients))
fmt.Println("Original clients:", originalClients)
fmt.Println("Updated clients:", updatedClients)

if originalClients[0].endpoint != updatedClients[len(updatedClients)-1].endpoint {
t.Fatalf("failing client was not moved to the end. got %s, expected %s", updatedClients[len(updatedClients)-1].endpoint, originalClients[0].endpoint)
Expand Down

0 comments on commit efae8cb

Please sign in to comment.