Skip to content

Commit

Permalink
server/asset/eth: Fix run.
Browse files Browse the repository at this point in the history
Only cache the best hash and use it to determine if we see a reorg. Move
polling and notification logic to hashCache.
  • Loading branch information
JoeGruffins committed Dec 6, 2021
1 parent 4ee4c47 commit 8f6a091
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 341 deletions.
2 changes: 1 addition & 1 deletion dex/testing/eth/create-node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ if [ "${CHAIN_ADDRESS}" != "_" ]; then
sleep 1
AFTER=\$("${NODES_ROOT}/harness-ctl/${NAME}" attach --exec 'eth.blockNumber')
DIFF=\$((AFTER-BEFORE))
echo "Mined \$DIFF blocks on ${NAME}. Their headers:"
echo "Mined \$DIFF blocks on ${NAME}. Their hashes:"
for i in \$(seq \$((BEFORE+1)) \$AFTER)
do
echo \$i
Expand Down
4 changes: 2 additions & 2 deletions dex/testing/eth/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ done
sleep 1
# Uncomment to see the effect of reorgs on transactions.
# "./alpha" "attach --exec personal.sendTransaction({from:eth.accounts[0],to:eth.accounts[1],value:1,gasPrice:82000000000},\\"abc\\")"
# "./beta" "attach --exec personal.sendTransaction({from:eth.accounts[0],to:eth.accounts[1],value:1,gasPrice:82000000000},\\"abc\\")"
# "./alpha" "attach --exec personal.sendTransaction({from:eth.accounts[0],to:eth.accounts[1],value:1,gasPrice:82000000000},\\"${PASSWORD}\\")"
# "./beta" "attach --exec personal.sendTransaction({from:eth.accounts[0],to:eth.accounts[1],value:1,gasPrice:82000000000},\\"${PASSWORD}\\")"
"./mine-\$VALID_NODE" \$((REORG_DEPTH + 2))
"./mine-\$REORG_NODE" \$REORG_DEPTH
Expand Down
216 changes: 113 additions & 103 deletions server/asset/eth/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,130 +7,140 @@
package eth

import (
"context"
"fmt"
"sync"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/server/asset"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// The ethBlock structure should hold a minimal amount of information about a
// block.
type ethBlock struct {
hash common.Hash
height uint64
orphaned bool
type hashN struct {
height uint64
hash common.Hash
}

// The blockCache caches block information to prevent repeated calls to
// rpcclient.GetblockVerbose.
type blockCache struct {
mtx sync.RWMutex
blocks map[common.Hash]*ethBlock
mainchain map[uint64]*ethBlock
best ethBlock
log dex.Logger
}

// Constructor for a blockCache.
func newBlockCache(logger dex.Logger) *blockCache {
return &blockCache{
blocks: make(map[common.Hash]*ethBlock),
mainchain: make(map[uint64]*ethBlock),
log: logger,
}
}
// The hashCache caches block information to detect reorgs and notify
// listeners of new blocks. All methods are concurrent safe unless specified
// otherwise.
type hashCache struct {
// signalMtx locks the blockChans array.
signalMtx sync.RWMutex
blockChans map[chan *asset.BlockUpdate]struct{}

// Getter for a block by it's hash.
func (cache *blockCache) block(h common.Hash) (*ethBlock, bool) {
cache.mtx.RLock()
defer cache.mtx.RUnlock()
blk, found := cache.blocks[h]
return blk, found
}
mtx sync.RWMutex
best hashN

// Getter for a mainchain block by its height. This method does not attempt
// to load the block from the blockchain if it is not found.
func (cache *blockCache) atHeight(height uint64) (*ethBlock, bool) {
cache.mtx.RLock()
defer cache.mtx.RUnlock()
blk, found := cache.mainchain[height]
return blk, found
log dex.Logger
node ethFetcher
}

// Add a block to the blockCache. This method will translate the RPC result
// to a ethBlock, returning the ethBlock. If the block is not orphaned, it will
// be added to the mainchain.
func (cache *blockCache) add(block *types.Block) (*ethBlock, error) {
cache.mtx.Lock()
defer cache.mtx.Unlock()
// TODO: Fix this.
orphaned := false
height, hash := block.NumberU64(), block.Hash()
blk := &ethBlock{
hash: hash,
height: height,
orphaned: orphaned,
}
cache.blocks[hash] = blk

if !orphaned {
cache.mainchain[height] = blk
if height > cache.best.height {
cache.best.height = height
cache.best.hash = hash
}
// Constructor for a hashCache. Prime before use.
func newHashCache(logger dex.Logger) *hashCache {
return &hashCache{
log: logger,
blockChans: make(map[chan *asset.BlockUpdate]struct{}),
}
return blk, nil
}

// Get the best known block height for the blockCache.
func (cache *blockCache) tipHeight() uint64 {
cache.mtx.Lock()
defer cache.mtx.Unlock()
return cache.best.height
}

// Get the best known block hash in the blockCache.
func (cache *blockCache) tipHash() common.Hash {
cache.mtx.RLock()
defer cache.mtx.RUnlock()
return cache.best.hash
// blockChannel returns a new block channel that will be sent block updates
// when a new block is added to the chain and noticed.
func (hc *hashCache) blockChannel(size int) <-chan *asset.BlockUpdate {
c := make(chan *asset.BlockUpdate, size)
hc.signalMtx.Lock()
defer hc.signalMtx.Unlock()
hc.blockChans[c] = struct{}{}
return c
}

// Get the best known block height in the blockCache.
func (cache *blockCache) tip() ethBlock {
cache.mtx.RLock()
defer cache.mtx.RUnlock()
return cache.best
// prime should be run once before use. Sets the best hash and node. Not
// concurrent safe.
func (hc *hashCache) prime(ctx context.Context, node ethFetcher) error {
hdr, err := node.bestHeader(ctx)
if err != nil {
return fmt.Errorf("error getting best block header from geth: %w", err)
}
hc.best = hashN{
height: hdr.Number.Uint64(),
hash: hdr.Hash(),
}
hc.node = node
return nil
}

// Trigger a reorg, setting any blocks at or above the provided height as
// orphaned and removing them from mainchain, but not the blocks map. reorg
// clears the best block, so should always be followed with the addition of a
// new mainchain block.
func (cache *blockCache) reorg(from uint64) {
if from < 0 {
// poll pulls the best hash from an eth node and compares that to a stored
// hash. If the same does nothing. If different, updates the stored hash and
// notifies listeners on block chans.
func (hc *hashCache) poll(ctx context.Context) {
send := func(reorg bool, err error) {
if err != nil {
hc.log.Error(err)
}
hc.signalMtx.Lock()
for c := range hc.blockChans {
select {
case c <- &asset.BlockUpdate{
Reorg: reorg,
Err: err,
}:
default:
hc.log.Error("failed to send block update on blocking channel")
}
}
hc.signalMtx.Unlock()
}
hc.mtx.Lock()
defer hc.mtx.Unlock()
bhdr, err := hc.node.bestHeader(ctx)
if err != nil {
send(false, fmt.Errorf("error getting best block header from geth: %w", err))
return
}
cache.mtx.Lock()
defer cache.mtx.Unlock()
for height := from; height <= cache.best.height; height++ {
block, found := cache.mainchain[height]
if !found {
cache.log.Errorf("reorg block not found on mainchain at height %d for a reorg from %d to %d", height, from, cache.best.height)
continue
}
// Delete the block from mainchain.
delete(cache.mainchain, block.height)
// Store an orphaned block in the blocks cache.
cache.blocks[block.hash] = &ethBlock{
hash: block.hash,
height: block.height,
orphaned: true,
if bhdr.Hash() == hc.best.hash {
// Same hash, nothing to do.
return
}
update := func(reorg bool, fastBlocks bool) {
hash := bhdr.Hash()
height := bhdr.Number.Uint64()
str := fmt.Sprintf("Tip change from %s (%d) to %s (%d).",
hc.best.hash, hc.best.height, hash, height)
switch {
case reorg:
str += " Detected reorg."
case fastBlocks:
str += " Fast blocks."
}
hc.log.Info(str)
hc.best.hash = hash
hc.best.height = height
}
if bhdr.ParentHash == hc.best.hash {
// Sequencial hash, report a block update.
update(false, false)
send(false, nil)
return
}
// Either a block was skipped or a reorg happened. We can only detect
// the reorg if our last best header's hash has changed. Otherwise,
// assume no reorg and report the new block change.
//
// headerByHeight will only return mainchain headers.
hdr, err := hc.node.headerByHeight(ctx, hc.best.height)
if err != nil {
send(false, fmt.Errorf("error getting block header from geth: %w", err))
return
}
if hdr.Hash() == hc.best.hash {
// Our recorded hash is still on main chain so there is no reorg
// that we know of. The chain has advanced more than one block.
update(false, true)
send(false, nil)
return
}
// Set this to a zero block so that the new block will replace it even if
// it is of the same height as the previous best block.
cache.best = ethBlock{}
// The block for our recorded hash was forked off and the chain had a
// reorganization.
update(true, false)
send(true, nil)
}

0 comments on commit 8f6a091

Please sign in to comment.