Skip to content

Commit

Permalink
Remove hashCache type.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Dec 14, 2021
1 parent 615cdd5 commit 6b4a2e8
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 256 deletions.
113 changes: 0 additions & 113 deletions server/asset/eth/cache.go

This file was deleted.

113 changes: 0 additions & 113 deletions server/asset/eth/cache_test.go

This file was deleted.

115 changes: 97 additions & 18 deletions server/asset/eth/eth.go
Expand Up @@ -93,6 +93,11 @@ type ethFetcher interface {
accountBalance(ctx context.Context, addr common.Address) (*big.Int, error)
}

type hashN struct {
height uint64
hash common.Hash
}

// Backend is an asset backend for Ethereum. It has methods for fetching output
// information and subscribing to block updates. It maintains a cache of block
// data for quick lookups. Backend implements asset.Backend, so provides
Expand All @@ -104,12 +109,17 @@ type Backend struct {
cancelRPCs context.CancelFunc
cfg *config
node ethFetcher

// bestHash caches the last know best block hash and height and is used
// to detect reorgs. Only accessed in Connect and poll which is
// syncronous so no locking is needed presently.
bestHash hashN

// The backend provides block notification channels through the BlockChannel
// method. signalMtx locks the blockChans array.
signalMtx sync.RWMutex
blockChans map[chan *asset.BlockUpdate]struct{}
// The hash cache stores just enough info to detect reorgs.
hashCache *hashCache
// method.
blockChansMtx sync.RWMutex
blockChans map[chan *asset.BlockUpdate]struct{}

// A logger will be provided by the DEX. All logging should use the provided
// logger.
log dex.Logger
Expand Down Expand Up @@ -176,20 +186,14 @@ func (eth *Backend) Connect(ctx context.Context) (*sync.WaitGroup, error) {
}
eth.node = &c

// Prime the cache with the best block hash.
// Prime the best block hash and height.
hdr, err := c.bestHeader(ctx)
if err != nil {
return nil, fmt.Errorf("error getting best block header from geth: %w", err)
}
eth.hashCache = &hashCache{
log: eth.log.SubLogger("hashcache"),
node: &c,
signalMtx: &eth.signalMtx,
blockChans: eth.blockChans,
best: hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
},
eth.bestHash = hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -243,8 +247,8 @@ func (eth *Backend) FeeRate(ctx context.Context) (uint64, error) {
// logged from the eth package. Part of the asset.Backend interface.
func (eth *Backend) BlockChannel(size int) <-chan *asset.BlockUpdate {
c := make(chan *asset.BlockUpdate, size)
eth.signalMtx.Lock()
defer eth.signalMtx.Unlock()
eth.blockChansMtx.Lock()
defer eth.blockChansMtx.Unlock()
eth.blockChans[c] = struct{}{}
return c
}
Expand Down Expand Up @@ -360,6 +364,81 @@ func (eth *Backend) AccountBalance(addrStr string) (uint64, error) {
return dexeth.ToGwei(bigBal)
}

// poll pulls the best hash from an eth node and compares that to a stored
// hash. If the same does nothing. If different, updates the stored hash and
// notifies listeners on block chans.
func (eth *Backend) poll(ctx context.Context) {
best := &eth.bestHash
send := func(reorg bool, err error) {
if err != nil {
eth.log.Error(err)
}
eth.blockChansMtx.RLock()
for c := range eth.blockChans {
select {
case c <- &asset.BlockUpdate{
Reorg: reorg,
Err: err,
}:
default:
eth.log.Error("failed to send block update on blocking channel")
}
}
eth.blockChansMtx.RUnlock()
}
bhdr, err := eth.node.bestHeader(ctx)
if err != nil {
send(false, fmt.Errorf("error getting best block header from geth: %w", err))
return
}
if bhdr.Hash() == best.hash {
// Same hash, nothing to do.
return
}
update := func(reorg bool, fastBlocks bool) {
hash := bhdr.Hash()
height := bhdr.Number.Uint64()
str := fmt.Sprintf("Tip change from %s (%d) to %s (%d).",
best.hash, best.height, hash, height)
switch {
case reorg:
str += " Detected reorg."
case fastBlocks:
str += " Fast blocks."
}
eth.log.Debug(str)
best.hash = hash
best.height = height
}
if bhdr.ParentHash == best.hash {
// Sequential hash, report a block update.
update(false, false)
send(false, nil)
return
}
// Either a block was skipped or a reorg happened. We can only detect
// the reorg if our last best header's hash has changed. Otherwise,
// assume no reorg and report the new block change.
//
// headerByHeight will only return mainchain headers.
hdr, err := eth.node.headerByHeight(ctx, best.height)
if err != nil {
send(false, fmt.Errorf("error getting block header from geth: %w", err))
return
}
if hdr.Hash() == best.hash {
// Our recorded hash is still on main chain so there is no reorg
// that we know of. The chain has advanced more than one block.
update(false, true)
send(false, nil)
return
}
// The block for our recorded hash was forked off and the chain had a
// reorganization.
update(true, false)
send(true, nil)
}

// run processes the queue and monitors the application context.
func (eth *Backend) run(ctx context.Context) {
done := ctx.Done()
Expand All @@ -379,7 +458,7 @@ out:
for {
select {
case <-blockPoll.C:
eth.hashCache.poll(ctx)
eth.poll(ctx)
case <-done:
break out
}
Expand Down

0 comments on commit 6b4a2e8

Please sign in to comment.