diff --git a/server/asset/eth/cache.go b/server/asset/eth/cache.go deleted file mode 100644 index 99737f1637..0000000000 --- a/server/asset/eth/cache.go +++ /dev/null @@ -1,113 +0,0 @@ -// This code is available on the terms of the project LICENSE.md file, -// also available online at https://blueoakcouncil.org/license/1.0.0. - -//go:build lgpl -// +build lgpl - -package eth - -import ( - "context" - "fmt" - "sync" - - "decred.org/dcrdex/dex" - "decred.org/dcrdex/server/asset" - "github.com/ethereum/go-ethereum/common" -) - -type hashN struct { - height uint64 - hash common.Hash -} - -// The hashCache caches block information to detect reorgs and notify -// listeners of new blocks. All methods are concurrent safe unless specified -// otherwise. -type hashCache struct { - // signalMtx locks the blockChans array. - signalMtx *sync.RWMutex - blockChans map[chan *asset.BlockUpdate]struct{} - - mtx sync.RWMutex - best hashN - - log dex.Logger - node ethFetcher -} - -// poll pulls the best hash from an eth node and compares that to a stored -// hash. If the same does nothing. If different, updates the stored hash and -// notifies listeners on block chans. -func (hc *hashCache) poll(ctx context.Context) { - send := func(reorg bool, err error) { - if err != nil { - hc.log.Error(err) - } - hc.signalMtx.Lock() - for c := range hc.blockChans { - select { - case c <- &asset.BlockUpdate{ - Reorg: reorg, - Err: err, - }: - default: - hc.log.Error("failed to send block update on blocking channel") - } - } - hc.signalMtx.Unlock() - } - hc.mtx.Lock() - defer hc.mtx.Unlock() - bhdr, err := hc.node.bestHeader(ctx) - if err != nil { - send(false, fmt.Errorf("error getting best block header from geth: %w", err)) - return - } - if bhdr.Hash() == hc.best.hash { - // Same hash, nothing to do. - return - } - update := func(reorg bool, fastBlocks bool) { - hash := bhdr.Hash() - height := bhdr.Number.Uint64() - str := fmt.Sprintf("Tip change from %s (%d) to %s (%d).", - hc.best.hash, hc.best.height, hash, height) - switch { - case reorg: - str += " Detected reorg." - case fastBlocks: - str += " Fast blocks." - } - hc.log.Debug(str) - hc.best.hash = hash - hc.best.height = height - } - if bhdr.ParentHash == hc.best.hash { - // Sequential hash, report a block update. - update(false, false) - send(false, nil) - return - } - // Either a block was skipped or a reorg happened. We can only detect - // the reorg if our last best header's hash has changed. Otherwise, - // assume no reorg and report the new block change. - // - // headerByHeight will only return mainchain headers. - hdr, err := hc.node.headerByHeight(ctx, hc.best.height) - if err != nil { - send(false, fmt.Errorf("error getting block header from geth: %w", err)) - return - } - if hdr.Hash() == hc.best.hash { - // Our recorded hash is still on main chain so there is no reorg - // that we know of. The chain has advanced more than one block. - update(false, true) - send(false, nil) - return - } - // The block for our recorded hash was forked off and the chain had a - // reorganization. - update(true, false) - send(true, nil) -} diff --git a/server/asset/eth/cache_test.go b/server/asset/eth/cache_test.go deleted file mode 100644 index 3027b43afa..0000000000 --- a/server/asset/eth/cache_test.go +++ /dev/null @@ -1,113 +0,0 @@ -//go:build !harness && lgpl -// +build !harness,lgpl - -// These tests will not be run if the harness build tag is set. - -package eth - -import ( - "errors" - "math/big" - "sync" - "testing" - "time" - - "decred.org/dcrdex/server/asset" - "github.com/ethereum/go-ethereum/core/types" -) - -func TestPoll(t *testing.T) { - blkHdr := &types.Header{Number: big.NewInt(0)} - tests := []struct { - name string - bestHdr, hdrByHeight *types.Header - bestHdrErr, hdrByHeightErr error - wantErr, preventSend bool - }{{ - name: "ok nothing to do", - bestHdr: blkHdr, - }, { - name: "ok sequential", - bestHdr: &types.Header{ - Number: big.NewInt(1), - ParentHash: blkHdr.Hash(), - }, - }, { - name: "ok fast blocks", - bestHdr: &types.Header{ - Number: big.NewInt(1), - }, - hdrByHeight: blkHdr, - }, { - name: "ok reorg", - bestHdr: &types.Header{ - Number: big.NewInt(1), - }, - }, { - name: "ok but cannot send", - bestHdr: &types.Header{ - Number: big.NewInt(1), - ParentHash: blkHdr.Hash(), - }, - preventSend: true, - }, { - name: "best header error", - bestHdrErr: errors.New(""), - wantErr: true, - }, { - name: "header by height error", - bestHdr: &types.Header{ - Number: big.NewInt(1), - }, - hdrByHeightErr: errors.New(""), - wantErr: true, - }} - - for _, test := range tests { - node := &testNode{ - bestHdr: test.bestHdr, - bestHdrErr: test.bestHdrErr, - hdrByHeight: test.hdrByHeight, - hdrByHeightErr: test.hdrByHeightErr, - } - hc := &hashCache{ - log: tLogger, - signalMtx: new(sync.RWMutex), - blockChans: make(map[chan *asset.BlockUpdate]struct{}), - node: node, - best: hashN{ - hash: blkHdr.Hash(), - }, - } - chSize := 1 - if test.preventSend { - chSize = 0 - } - ch := make(chan *asset.BlockUpdate, chSize) - hc.blockChans[ch] = struct{}{} - bu := new(asset.BlockUpdate) - wait := make(chan struct{}) - go func() { - if test.preventSend { - close(wait) - return - } - select { - case bu = <-ch: - case <-time.After(time.Second * 2): - } - close(wait) - }() - hc.poll(nil) - <-wait - if test.wantErr { - if bu.Err == nil { - t.Fatalf("expected error for test %q", test.name) - } - continue - } - if bu.Err != nil { - t.Fatalf("unexpected error for test %q: %v", test.name, bu.Err) - } - } -} diff --git a/server/asset/eth/eth.go b/server/asset/eth/eth.go index e97c6fb6ab..6a527ec0e4 100644 --- a/server/asset/eth/eth.go +++ b/server/asset/eth/eth.go @@ -93,6 +93,11 @@ type ethFetcher interface { accountBalance(ctx context.Context, addr common.Address) (*big.Int, error) } +type hashN struct { + height uint64 + hash common.Hash +} + // Backend is an asset backend for Ethereum. It has methods for fetching output // information and subscribing to block updates. It maintains a cache of block // data for quick lookups. Backend implements asset.Backend, so provides @@ -104,12 +109,17 @@ type Backend struct { cancelRPCs context.CancelFunc cfg *config node ethFetcher + + // bestHash caches the last know best block hash and height and is used + // to detect reorgs. Only accessed in Connect and poll which is + // syncronous so no locking is needed presently. + bestHash hashN + // The backend provides block notification channels through the BlockChannel - // method. signalMtx locks the blockChans array. - signalMtx sync.RWMutex - blockChans map[chan *asset.BlockUpdate]struct{} - // The hash cache stores just enough info to detect reorgs. - hashCache *hashCache + // method. + blockChansMtx sync.RWMutex + blockChans map[chan *asset.BlockUpdate]struct{} + // A logger will be provided by the DEX. All logging should use the provided // logger. log dex.Logger @@ -176,20 +186,14 @@ func (eth *Backend) Connect(ctx context.Context) (*sync.WaitGroup, error) { } eth.node = &c - // Prime the cache with the best block hash. + // Prime the best block hash and height. hdr, err := c.bestHeader(ctx) if err != nil { return nil, fmt.Errorf("error getting best block header from geth: %w", err) } - eth.hashCache = &hashCache{ - log: eth.log.SubLogger("hashcache"), - node: &c, - signalMtx: ð.signalMtx, - blockChans: eth.blockChans, - best: hashN{ - height: hdr.Number.Uint64(), - hash: hdr.Hash(), - }, + eth.bestHash = hashN{ + height: hdr.Number.Uint64(), + hash: hdr.Hash(), } var wg sync.WaitGroup @@ -243,8 +247,8 @@ func (eth *Backend) FeeRate(ctx context.Context) (uint64, error) { // logged from the eth package. Part of the asset.Backend interface. func (eth *Backend) BlockChannel(size int) <-chan *asset.BlockUpdate { c := make(chan *asset.BlockUpdate, size) - eth.signalMtx.Lock() - defer eth.signalMtx.Unlock() + eth.blockChansMtx.Lock() + defer eth.blockChansMtx.Unlock() eth.blockChans[c] = struct{}{} return c } @@ -360,6 +364,81 @@ func (eth *Backend) AccountBalance(addrStr string) (uint64, error) { return dexeth.ToGwei(bigBal) } +// poll pulls the best hash from an eth node and compares that to a stored +// hash. If the same does nothing. If different, updates the stored hash and +// notifies listeners on block chans. +func (eth *Backend) poll(ctx context.Context) { + best := ð.bestHash + send := func(reorg bool, err error) { + if err != nil { + eth.log.Error(err) + } + eth.blockChansMtx.RLock() + for c := range eth.blockChans { + select { + case c <- &asset.BlockUpdate{ + Reorg: reorg, + Err: err, + }: + default: + eth.log.Error("failed to send block update on blocking channel") + } + } + eth.blockChansMtx.RUnlock() + } + bhdr, err := eth.node.bestHeader(ctx) + if err != nil { + send(false, fmt.Errorf("error getting best block header from geth: %w", err)) + return + } + if bhdr.Hash() == best.hash { + // Same hash, nothing to do. + return + } + update := func(reorg bool, fastBlocks bool) { + hash := bhdr.Hash() + height := bhdr.Number.Uint64() + str := fmt.Sprintf("Tip change from %s (%d) to %s (%d).", + best.hash, best.height, hash, height) + switch { + case reorg: + str += " Detected reorg." + case fastBlocks: + str += " Fast blocks." + } + eth.log.Debug(str) + best.hash = hash + best.height = height + } + if bhdr.ParentHash == best.hash { + // Sequential hash, report a block update. + update(false, false) + send(false, nil) + return + } + // Either a block was skipped or a reorg happened. We can only detect + // the reorg if our last best header's hash has changed. Otherwise, + // assume no reorg and report the new block change. + // + // headerByHeight will only return mainchain headers. + hdr, err := eth.node.headerByHeight(ctx, best.height) + if err != nil { + send(false, fmt.Errorf("error getting block header from geth: %w", err)) + return + } + if hdr.Hash() == best.hash { + // Our recorded hash is still on main chain so there is no reorg + // that we know of. The chain has advanced more than one block. + update(false, true) + send(false, nil) + return + } + // The block for our recorded hash was forked off and the chain had a + // reorganization. + update(true, false) + send(true, nil) +} + // run processes the queue and monitors the application context. func (eth *Backend) run(ctx context.Context) { done := ctx.Done() @@ -379,7 +458,7 @@ out: for { select { case <-blockPoll.C: - eth.hashCache.poll(ctx) + eth.poll(ctx) case <-done: break out } diff --git a/server/asset/eth/eth_test.go b/server/asset/eth/eth_test.go index 7a4a94c2fa..10aeb18592 100644 --- a/server/asset/eth/eth_test.go +++ b/server/asset/eth/eth_test.go @@ -21,6 +21,7 @@ import ( "decred.org/dcrdex/dex/encode" dexeth "decred.org/dcrdex/dex/networks/eth" swapv0 "decred.org/dcrdex/dex/networks/eth/contracts/v0" + "decred.org/dcrdex/server/asset" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -200,7 +201,7 @@ func TestLoad(t *testing.T) { func TestDecodeCoinID(t *testing.T) { drv := &Driver{} txid := "0x1b86600b740d58ecc06eda8eba1c941c7ba3d285c78be89b56678da146ed53d1" - txHashB := mustDecodeHex("1b86600b740d58ecc06eda8eba1c941c7ba3d285c78be89b56678da146ed53d1") + txHashB := mustParseHex("1b86600b740d58ecc06eda8eba1c941c7ba3d285c78be89b56678da146ed53d1") type test struct { name string @@ -247,12 +248,6 @@ func TestRun(t *testing.T) { backend.node = &testNode{ bestHdr: &types.Header{Number: big.NewInt(1)}, } - backend.hashCache = &hashCache{ - log: tLogger, - signalMtx: &backend.signalMtx, - blockChans: backend.blockChans, - node: backend.node, - } ch := backend.BlockChannel(1) go func() { select { @@ -724,10 +719,97 @@ func TestAccountBalance(t *testing.T) { } } -func mustDecodeHex(s string) []byte { - b, err := hex.DecodeString(s) - if err != nil { - panic("mustDecodeHex: " + err.Error()) +func TestPoll(t *testing.T) { + blkHdr := &types.Header{Number: big.NewInt(0)} + tests := []struct { + name string + bestHdr, hdrByHeight *types.Header + bestHdrErr, hdrByHeightErr error + wantErr, preventSend bool + }{{ + name: "ok nothing to do", + bestHdr: blkHdr, + }, { + name: "ok sequential", + bestHdr: &types.Header{ + Number: big.NewInt(1), + ParentHash: blkHdr.Hash(), + }, + }, { + name: "ok fast blocks", + bestHdr: &types.Header{ + Number: big.NewInt(1), + }, + hdrByHeight: blkHdr, + }, { + name: "ok reorg", + bestHdr: &types.Header{ + Number: big.NewInt(1), + }, + }, { + name: "ok but cannot send", + bestHdr: &types.Header{ + Number: big.NewInt(1), + ParentHash: blkHdr.Hash(), + }, + preventSend: true, + }, { + name: "best header error", + bestHdrErr: errors.New(""), + wantErr: true, + }, { + name: "header by height error", + bestHdr: &types.Header{ + Number: big.NewInt(1), + }, + hdrByHeightErr: errors.New(""), + wantErr: true, + }} + + for _, test := range tests { + node := &testNode{ + bestHdr: test.bestHdr, + bestHdrErr: test.bestHdrErr, + hdrByHeight: test.hdrByHeight, + hdrByHeightErr: test.hdrByHeightErr, + } + eth := &Backend{ + log: tLogger, + blockChans: make(map[chan *asset.BlockUpdate]struct{}), + node: node, + bestHash: hashN{ + hash: blkHdr.Hash(), + }, + } + chSize := 1 + if test.preventSend { + chSize = 0 + } + ch := make(chan *asset.BlockUpdate, chSize) + eth.blockChans[ch] = struct{}{} + bu := new(asset.BlockUpdate) + wait := make(chan struct{}) + go func() { + if test.preventSend { + close(wait) + return + } + select { + case bu = <-ch: + case <-time.After(time.Second * 2): + } + close(wait) + }() + eth.poll(nil) + <-wait + if test.wantErr { + if bu.Err == nil { + t.Fatalf("expected error for test %q", test.name) + } + continue + } + if bu.Err != nil { + t.Fatalf("unexpected error for test %q: %v", test.name, bu.Err) + } } - return b }