Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/eth: Monitor RPC provider health #2125

Merged
merged 6 commits into from Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions dex/testing/dcrdex/harness.sh
Expand Up @@ -217,16 +217,17 @@ EOF
fi

if [ $ETH_ON -eq 0 ]; then

ETH_CONFIG_PATH=${TEST_ROOT}/eth.conf
ETH_IPC_FILE=${TEST_ROOT}/eth/alpha/node/geth.ipc
cat << EOF >> $ETH_CONFIG_PATH

cat > $ETH_CONFIG_PATH <<EOF
ws://localhost:38557
# comments are respected
# http://localhost:38556
${ETH_IPC_FILE}
EOF
cat << EOF >> "./markets.json"

cat << EOF >> "./markets.json"
},
"ETH_simnet": {
"bip44symbol": "eth",
Expand Down
32 changes: 21 additions & 11 deletions server/asset/eth/eth.go
Expand Up @@ -164,7 +164,6 @@ type ethFetcher interface {
blockNumber(ctx context.Context) (uint64, error)
headerByHeight(ctx context.Context, height uint64) (*types.Header, error)
connect(ctx context.Context) error
shutdown()
suggestGasTipCap(ctx context.Context) (*big.Int, error)
syncProgress(ctx context.Context) (*ethereum.SyncProgress, error)
transaction(ctx context.Context, hash common.Hash) (tx *types.Transaction, isMempool bool, err error)
Expand Down Expand Up @@ -281,12 +280,14 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend
defer file.Close()

var endpoints []string
endpointsMap := make(map[string]bool) // to avoid duplicates
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.Trim(scanner.Text(), " ")
if line == "" || strings.HasPrefix(line, "#") {
if line == "" || strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") || endpointsMap[line] {
continue
}
endpointsMap[line] = true
endpoints = append(endpoints, line)
}
if err := scanner.Err(); err != nil {
Expand All @@ -301,25 +302,36 @@ func NewBackend(configPath string, log dex.Logger, net dex.Network) (*ETHBackend
if err != nil {
return nil, err
}
eth.node = newRPCClient(eth.net, endpoints, log.SubLogger("RPC"))
return eth, nil
}

func (eth *baseBackend) shutdown() {
eth.node.shutdown()
netAddrs, found := dexeth.ContractAddresses[ethContractVersion]
if !found {
return nil, fmt.Errorf("no contract address for eth version %d", ethContractVersion)
}
ethContractAddr, found := netAddrs[eth.net]
if !found {
return nil, fmt.Errorf("no contract address for eth version %d on %s", ethContractVersion, eth.net)
}

eth.node = newRPCClient(eth.net, endpoints, ethContractAddr, log.SubLogger("RPC"))
return eth, nil
}

// Connect connects to the node RPC server and initializes some variables.
func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
eth.baseBackend.ctx = ctx

if err := eth.node.connect(ctx); err != nil {
// Create a separate context for the node so that it will only be cancelled
// after the ETHBackend's run method has returned.
nodeContext, cancelNodeContext := context.WithCancel(context.Background())
if err := eth.node.connect(nodeContext); err != nil {
cancelNodeContext()
return nil, err
}

// Prime the best block hash and height.
bn, err := eth.node.blockNumber(ctx)
if err != nil {
cancelNodeContext()
return nil, fmt.Errorf("error getting best block header from geth: %w", err)
}
eth.baseBackend.bestHeight = bn
Expand All @@ -328,6 +340,7 @@ func (eth *ETHBackend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
wg.Add(1)
go func() {
eth.run(ctx)
cancelNodeContext()
wg.Done()
}()
return &wg, nil
Expand Down Expand Up @@ -716,9 +729,6 @@ func (eth *ETHBackend) poll(ctx context.Context) {

// run processes the queue and monitors the application context.
func (eth *ETHBackend) run(ctx context.Context) {
// Shut down the RPC client on ctx.Done().
defer eth.shutdown()

blockPoll := time.NewTicker(blockPollInterval)
defer blockPoll.Stop()

Expand Down