Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(chainsyncer): minor improvements #2619

Merged
merged 2 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions pkg/chainsyncer/chainsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,22 @@ import (
)

const (
defaultFlagTimeout = 5 * time.Minute
defaultPollEvery = 1 * time.Minute
blocksToRemember = 1000
defaultFlagTimeout = 10 * time.Minute
defaultPollEvery = 1 * time.Minute
defaultBlockerPollTime = 10 * time.Second
blockDuration = 24 * time.Hour

blockDuration = 24 * time.Hour
pollTime = 1 * time.Second
blocksToRemember = 1000
)

type prover interface {
Prove(context.Context, swarm.Address, uint64) ([]byte, error)
}

type Options struct {
FlagTimeout time.Duration
PollEvery time.Duration
FlagTimeout time.Duration
PollEvery time.Duration
BlockerPollTime time.Duration
}

type ChainSyncer struct {
Expand All @@ -63,8 +64,9 @@ func New(backend transaction.Backend, p prover, peerIterator topology.EachPeerer
}
if o == nil {
o = &Options{
FlagTimeout: defaultFlagTimeout,
PollEvery: defaultPollEvery,
FlagTimeout: defaultFlagTimeout,
PollEvery: defaultPollEvery,
BlockerPollTime: defaultBlockerPollTime,
}
}

Expand All @@ -84,7 +86,7 @@ func New(backend transaction.Backend, p prover, peerIterator topology.EachPeerer
c.logger.Warningf("chainsyncer: peer %s is unsynced and will be temporarily blocklisted", a.String())
c.metrics.UnsyncedPeers.Inc()
}
c.blocker = blocker.New(disconnecter, o.FlagTimeout, blockDuration, pollTime, cb, logger)
c.blocker = blocker.New(disconnecter, o.FlagTimeout, blockDuration, o.BlockerPollTime, cb, logger)

c.wg.Add(1)
go c.manage()
Expand All @@ -93,29 +95,35 @@ func New(backend transaction.Backend, p prover, peerIterator topology.EachPeerer

func (c *ChainSyncer) manage() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
var (
ctx, cancel = context.WithCancel(context.Background())
wg sync.WaitGroup
o sync.Once
positives int32
items int
timer = time.NewTimer(0)
)
go func() {
<-c.quit
cancel()
_ = timer.Stop()
}()
var (
wg sync.WaitGroup
positives int32
items int
)

for {
select {
case <-c.quit:
return
case <-time.After(c.pollEvery):
case <-timer.C:
o.Do(func() {
timer.Reset(c.pollEvery)
})
}
// go through every peer we are connected to
// try to ask about a recent block height.
// if they answer, we unflag them
// if not, they get flagged with time.Now() (in case they weren't
// flagged before).
// once subsequent checks continue failing we eventually
// kick them away.
// when subsequent checks continue failing we eventually blocklist.
blockHeight, blockHash, err := c.getBlockHeight(ctx)
if err != nil {
c.logger.Warningf("chainsyncer: failed getting block height for challenge: %v", err)
Expand All @@ -138,7 +146,7 @@ func (c *ChainSyncer) manage() {
return
}
if !bytes.Equal(blockHash, hash) {
c.logger.Infof("chainsync: peer %s failed to prove block %d: want block hash %x got %x", p.String(), blockHash, hash)
c.logger.Infof("chainsync: peer %s failed to prove block %d: want block hash %x got %x", p.String(), blockHeight, blockHash, hash)
c.metrics.InvalidProofs.Inc()
c.blocker.Flag(p)
return
Expand Down Expand Up @@ -215,6 +223,7 @@ func (c *ChainSyncer) Close() error {
}()

close(c.quit)
_ = c.blocker.Close()
select {
case <-done:
case <-time.After(5 * time.Second):
Expand Down
19 changes: 10 additions & 9 deletions pkg/chainsyncer/chainsyncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package chainsyncer_test
import (
"context"
"errors"
"io/ioutil"
"math/big"
"os"
"testing"
"time"

Expand All @@ -24,7 +24,7 @@ import (
func TestChainsyncer(t *testing.T) {
var (
expBlockHash = common.HexToHash("0x9de2787d1d80a6164f4bc6359d9017131cbc14402ee0704bff0c6d691701c1dc").Bytes()
logger = logging.New(os.Stdout, 0)
logger = logging.New(ioutil.Discard, 0)
trxBlock = common.HexToHash("0x2")
blockC = make(chan struct{})
nextBlockHeader = &types.Header{ParentHash: trxBlock}
Expand All @@ -48,40 +48,41 @@ func TestChainsyncer(t *testing.T) {
}}
)

newChainSyncerTest := func(e error, blockHash []byte, cb func()) func(*testing.T) {
newChainSyncerTest := func(e error, blockHash []byte, cb func(*testing.T)) func(*testing.T) {
proofError = e
proofBlockHash = blockHash
return func(t *testing.T) {
cs, err := chainsyncer.New(backend, p, topology, d, logger, &chainsyncer.Options{
FlagTimeout: 100 * time.Millisecond,
PollEvery: 50 * time.Millisecond,
FlagTimeout: 500 * time.Millisecond,
PollEvery: 100 * time.Millisecond,
BlockerPollTime: 100 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}

defer cs.Close()
cb()
cb(t)
}
}

t.Run("prover error", newChainSyncerTest(proofError, proofBlockHash, func() {
t.Run("prover error", newChainSyncerTest(proofError, proofBlockHash, func(t *testing.T) {
select {
case <-blockC:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for blocklisting")
}
}))

t.Run("blockhash mismatch", newChainSyncerTest(nil, proofBlockHash, func() {
t.Run("blockhash mismatch", newChainSyncerTest(nil, proofBlockHash, func(t *testing.T) {
select {
case <-blockC:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for blocklisting")
}
}))

t.Run("all good", newChainSyncerTest(nil, expBlockHash, func() {
t.Run("all good", newChainSyncerTest(nil, expBlockHash, func(t *testing.T) {
select {
case <-blockC:
t.Fatal("blocklisting occurred but should not have")
Expand Down