Commit
Initiate block cache in Connect.
JoeGruffins committed Dec 6, 2021
1 parent 8f6a091 commit e2ee2b2
Showing 5 changed files with 62 additions and 97 deletions.
35 changes: 1 addition & 34 deletions server/asset/eth/cache.go
@@ -26,7 +26,7 @@ type hashN struct {
// otherwise.
type hashCache struct {
// signalMtx locks the blockChans array.
signalMtx sync.RWMutex
signalMtx *sync.RWMutex
blockChans map[chan *asset.BlockUpdate]struct{}

mtx sync.RWMutex
@@ -36,39 +36,6 @@ type hashCache struct {
node ethFetcher
}

// Constructor for a hashCache. Prime before use.
func newHashCache(logger dex.Logger) *hashCache {
return &hashCache{
log: logger,
blockChans: make(map[chan *asset.BlockUpdate]struct{}),
}
}

// blockChannel returns a new block channel that will be sent block updates
// when a new block is added to the chain and noticed.
func (hc *hashCache) blockChannel(size int) <-chan *asset.BlockUpdate {
c := make(chan *asset.BlockUpdate, size)
hc.signalMtx.Lock()
defer hc.signalMtx.Unlock()
hc.blockChans[c] = struct{}{}
return c
}

// prime should be run once before use. Sets the best hash and node. Not
// concurrent safe.
func (hc *hashCache) prime(ctx context.Context, node ethFetcher) error {
hdr, err := node.bestHeader(ctx)
if err != nil {
return fmt.Errorf("error getting best block header from geth: %w", err)
}
hc.best = hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
}
hc.node = node
return nil
}

// poll pulls the best hash from an eth node and compares that to a stored
// hash. If the same does nothing. If different, updates the stored hash and
// notifies listeners on block chans.
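
The body of poll falls outside this hunk. For orientation, here is a minimal sketch of the compare-and-notify pattern the comment describes, assuming only the hashCache fields shown above; the real method also does reorg detection (the tests exercise it via hdrByHeight) and reports errors through the update, both of which this sketch omits:

func (hc *hashCache) poll(ctx context.Context) {
	hdr, err := hc.node.bestHeader(ctx)
	if err != nil {
		hc.log.Errorf("error getting best header: %v", err)
		return
	}
	hc.mtx.Lock()
	if hdr.Hash() == hc.best.hash {
		// Tip unchanged. Nothing to do.
		hc.mtx.Unlock()
		return
	}
	hc.best = hashN{height: hdr.Number.Uint64(), hash: hdr.Hash()}
	hc.mtx.Unlock()

	// Fan out to every registered channel, dropping the send rather than
	// blocking when a listener's buffer is full.
	hc.signalMtx.RLock()
	defer hc.signalMtx.RUnlock()
	for c := range hc.blockChans {
		select {
		case c <- &asset.BlockUpdate{}:
		default:
			hc.log.Error("tried sending block update on a blocking channel")
		}
	}
}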
58 changes: 13 additions & 45 deletions server/asset/eth/cache_test.go
@@ -8,47 +8,14 @@ package eth
import (
"errors"
"math/big"
"sync"
"testing"
"time"

"decred.org/dcrdex/server/asset"
"github.com/ethereum/go-ethereum/core/types"
)

func TestPrime(t *testing.T) {
tests := []struct {
name string
bestHdr *types.Header
bestHdrErr error
wantErr bool
}{{
name: "ok",
bestHdr: &types.Header{Number: big.NewInt(0)},
}, {
name: "best header error",
bestHdrErr: errors.New(""),
wantErr: true,
}}

for _, test := range tests {
hc := newHashCache(tLogger)
node := &testNode{
bestHdr: test.bestHdr,
bestHdrErr: test.bestHdrErr,
}
err := hc.prime(nil, node)
if test.wantErr {
if err == nil {
t.Fatalf("expected error for test %q", test.name)
}
continue
}
if err != nil {
t.Fatalf("unexpected error for test %q: %v", test.name, err)
}
}
}

func TestPoll(t *testing.T) {
blkHdr := &types.Header{Number: big.NewInt(0)}
tests := []struct {
@@ -97,26 +64,27 @@ func TestPoll(t *testing.T) {
}}

for _, test := range tests {
hc := newHashCache(tLogger)
node := &testNode{
bestHdr: new(types.Header),
bestHdr: test.bestHdr,
bestHdrErr: test.bestHdrErr,
hdrByHeight: test.hdrByHeight,
hdrByHeightErr: test.hdrByHeightErr,
}
*node.bestHdr = *blkHdr
err := hc.prime(nil, node)
if err != nil {
t.Fatalf("unexpected error for test %q: %v", test.name, err)
}
if test.bestHdr != nil {
*node.bestHdr = *test.bestHdr
hc := &hashCache{
log: tLogger,
signalMtx: new(sync.RWMutex),
blockChans: make(map[chan *asset.BlockUpdate]struct{}),
node: node,
best: hashN{
hash: blkHdr.Hash(),
},
}
node.bestHdrErr = test.bestHdrErr
chSize := 1
if test.preventSend {
chSize = 0
}
ch := hc.blockChannel(chSize)
ch := make(chan *asset.BlockUpdate, chSize)
hc.blockChans[ch] = struct{}{}
bu := new(asset.BlockUpdate)
wait := make(chan struct{})
go func() {
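
A note on the preventSend cases above: chSize 0 makes the update channel unbuffered, so a non-blocking send finds no ready receiver and falls through to its default branch. A standalone sketch of that mechanic:

package main

import "fmt"

func main() {
	ch := make(chan int) // Unbuffered: a send needs a ready receiver.
	select {
	case ch <- 1:
		fmt.Println("sent") // Never reached without a receiver.
	default:
		fmt.Println("dropped") // This is the preventSend path.
	}
}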
28 changes: 24 additions & 4 deletions server/asset/eth/eth.go
@@ -103,6 +103,10 @@ type Backend struct {
cancelRPCs context.CancelFunc
cfg *config
node ethFetcher
// The backend provides block notification channels through the BlockChannel
// method. signalMtx locks the blockChans array.
signalMtx sync.RWMutex
blockChans map[chan *asset.BlockUpdate]struct{}
// The hash cache stores just enough info to detect reorgs.
hashCache *hashCache
// A logger will be provided by the DEX. All logging should use the provided
@@ -142,8 +146,8 @@ func unconnectedETH(logger dex.Logger, cfg *config) *Backend {
rpcCtx: ctx,
cancelRPCs: cancel,
cfg: cfg,
hashCache: newHashCache(logger),
log: logger,
blockChans: make(map[chan *asset.BlockUpdate]struct{}),
contractAddr: contractAddr,
initTxSize: uint32(dexeth.InitGas(1, version)),
}
@@ -172,9 +176,21 @@ func (eth *Backend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
eth.node = &c

// Prime the cache with the best block hash.
if err := eth.hashCache.prime(ctx, &c); err != nil {
eth.log.Errorf("error priming the best block hash: %v", err)
hdr, err := c.bestHeader(ctx)
if err != nil {
return nil, fmt.Errorf("error getting best block header from geth: %w", err)
}
hc := &hashCache{
log: eth.log.SubLogger("hashcache"),
node: &c,
signalMtx: &eth.signalMtx,
blockChans: eth.blockChans,
best: hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
},
}
eth.hashCache = hc

var wg sync.WaitGroup
wg.Add(1)
@@ -226,7 +242,11 @@ func (eth *Backend) FeeRate(ctx context.Context) (uint64, error) {
// updates. If the returned channel is ever blocking, there will be no error
// logged from the eth package. Part of the asset.Backend interface.
func (eth *Backend) BlockChannel(size int) <-chan *asset.BlockUpdate {
return eth.hashCache.blockChannel(size)
c := make(chan *asset.BlockUpdate, size)
eth.signalMtx.Lock()
defer eth.signalMtx.Unlock()
eth.blockChans[c] = struct{}{}
return c
}

// Contract is part of the asset.Backend interface.
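
Connect now threads the same signalMtx and blockChans map into both the Backend and the hashCache, so registration (BlockChannel) and notification (poll) operate on shared state. A hedged sketch of the consumer side, with backend standing in for a connected *Backend:

ch := backend.BlockChannel(16) // Buffered; per the comment above, a full channel is skipped without logging.
go func() {
	for range ch {
		// React to each new-block (or reorg) notification.
	}
}()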
35 changes: 21 additions & 14 deletions server/asset/eth/eth_test.go
@@ -214,25 +214,32 @@ func TestDecodeCoinID(t *testing.T) {
func TestRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
backend := unconnectedETH(tLogger, new(config))
ch := backend.BlockChannel(1)
node := &testNode{
bestHdr: &types.Header{Number: big.NewInt(0)},
backend.node = &testNode{
bestHdr: &types.Header{Number: big.NewInt(1)},
}
backend.node = node
backend.hashCache.prime(ctx, node)
*node = testNode{
bestHdr: &types.Header{
Number: big.NewInt(1),
},
backend.hashCache = &hashCache{
log: tLogger,
signalMtx: &backend.signalMtx,
blockChans: backend.blockChans,
node: backend.node,
}
ch := backend.BlockChannel(1)
go func() {
<-ch
cancel()
select {
case <-ch:
cancel()
case <-time.After(blockPollInterval * 2):
}
}()
backend.run(ctx)
// This timing out indicates failure. run should call a block update
// after the pollInterval and we have the listening function cancel the
// context above.
// Ok if ctx was canceled above. Linters complain about calling t.Fatal
// in the goroutine above.
select {
case <-ctx.Done():
return
default:
t.Fatal("test timeout")
}
}

func TestFeeRate(t *testing.T) {
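
The closing comment in TestRun explains why the goroutine cancels the context instead of calling t.Fatal: linters flag t.Fatal off the test goroutine. An alternative pattern, sketched here with the same hypothetical timeout, funnels the outcome back to the test goroutine instead:

errC := make(chan error, 1)
go func() {
	defer cancel()
	select {
	case <-ch:
		errC <- nil
	case <-time.After(blockPollInterval * 2):
		errC <- errors.New("timed out waiting for block update")
	}
}()
backend.run(ctx)
if err := <-errC; err != nil {
	t.Fatal(err) // Safe here: this is the test goroutine.
}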
3 changes: 3 additions & 0 deletions server/asset/eth/rpcclient.go
@@ -33,6 +33,8 @@ type rpcclient struct {
ec *ethclient.Client
// es is a wrapper for contract calls.
es *swapv0.ETHSwap
// c is a direct client for raw calls.
c *rpc.Client
}

// connect connects to an ipc socket. It then wraps ethclient's client and
@@ -47,6 +49,7 @@ func (c *rpcclient) connect(ctx context.Context, IPC string, contractAddr *common.Address) error {
if err != nil {
return fmt.Errorf("unable to find swap contract: %v", err)
}
c.c = client
return nil
}

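
The stored rpc.Client goes unused in this commit; keeping a handle to it allows raw JSON-RPC calls for endpoints that ethclient does not wrap. A speculative fragment using go-ethereum's rpc.Client.CallContext, assuming it runs inside an rpcclient method with a ctx in scope (eth_syncing is chosen only for illustration):

// Hypothetical raw call; result decoding is simplified to json.RawMessage.
var raw json.RawMessage
if err := c.c.CallContext(ctx, &raw, "eth_syncing"); err != nil {
	return fmt.Errorf("eth_syncing error: %w", err)
}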
