Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockwatch: Separate blockwatch dependency on meshdb #355

Merged
merged 8 commits into from Sep 5, 2019
@@ -14,6 +14,7 @@ import (

"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/ethereum/dbstack"
"github.com/0xProject/0x-mesh/ethereum/blockwatch"
"github.com/0xProject/0x-mesh/expirationwatch"
"github.com/0xProject/0x-mesh/keys"
@@ -166,14 +167,14 @@ func New(config Config) (*App, error) {
return nil, err
}
topics := orderwatch.GetRelevantTopics()
stack := dbstack.New(meshDB, blockWatcherRetentionLimit)
blockWatcherConfig := blockwatch.Config{
MeshDB: meshDB,
PollingInterval: config.BlockPollingInterval,
StartBlockDepth: ethrpc.LatestBlockNumber,
BlockRetentionLimit: blockWatcherRetentionLimit,
WithLogs: true,
Topics: topics,
Client: blockWatcherClient,
Stack: stack,
PollingInterval: config.BlockPollingInterval,
StartBlockDepth: ethrpc.LatestBlockNumber,
WithLogs: true,
Topics: topics,
Client: blockWatcherClient,
}
blockWatcher := blockwatch.New(blockWatcherConfig)

@@ -8,7 +8,7 @@ import (
"sync"
"time"

"github.com/0xProject/0x-mesh/meshdb"
"github.com/0xProject/0x-mesh/ethereum/miniheader"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,50 +37,54 @@ const (
// Event describes a block event emitted by a Watcher
type Event struct {
Type EventType
BlockHeader *meshdb.MiniHeader
BlockHeader *miniheader.MiniHeader
}

// Stack defines the interface a stack must implement in order to be used by
// OrderWatcher for block header storage
type Stack interface {
Pop() (*miniheader.MiniHeader, error)
Push(*miniheader.MiniHeader) error
Peek() (*miniheader.MiniHeader, error)
PeekAll() ([]*miniheader.MiniHeader, error)
}

// Config holds some configuration options for an instance of BlockWatcher.
type Config struct {
MeshDB *meshdb.MeshDB
PollingInterval time.Duration
StartBlockDepth rpc.BlockNumber
BlockRetentionLimit int
WithLogs bool
Topics []common.Hash
Client Client
Stack Stack
PollingInterval time.Duration
StartBlockDepth rpc.BlockNumber
WithLogs bool
Topics []common.Hash
Client Client
}

// Watcher maintains a consistent representation of the latest `blockRetentionLimit` blocks,
// handling block re-orgs and network disruptions gracefully. It can be started from any arbitrary
// block height, and will emit both block added and removed events.
// Watcher maintains a consistent representation of the latest X blocks (where X is enforced by the
// supplied stack) handling block re-orgs and network disruptions gracefully. It can be started from
// any arbitrary block height, and will emit both block added and removed events.
type Watcher struct {
blockRetentionLimit int
startBlockDepth rpc.BlockNumber
stack *Stack
client Client
blockFeed event.Feed
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
wasStartedOnce bool // Whether the block watcher has previously been started
pollingInterval time.Duration
ticker *time.Ticker
withLogs bool
topics []common.Hash
mu sync.RWMutex
startBlockDepth rpc.BlockNumber
stack Stack
client Client
blockFeed event.Feed
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
wasStartedOnce bool // Whether the block watcher has previously been started
pollingInterval time.Duration
ticker *time.Ticker
withLogs bool
topics []common.Hash
mu sync.RWMutex
}

// New creates a new Watcher instance.
func New(config Config) *Watcher {
stack := NewStack(config.MeshDB, config.BlockRetentionLimit)

bs := &Watcher{
pollingInterval: config.PollingInterval,
blockRetentionLimit: config.BlockRetentionLimit,
startBlockDepth: config.StartBlockDepth,
stack: stack,
client: config.Client,
withLogs: config.WithLogs,
topics: config.Topics,
pollingInterval: config.PollingInterval,
startBlockDepth: config.StartBlockDepth,
stack: config.Stack,
client: config.Client,
withLogs: config.WithLogs,
topics: config.Topics,
}
return bs
}
@@ -141,14 +145,13 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
}

// GetLatestBlock returns the latest block processed
func (w *Watcher) GetLatestBlock() (*meshdb.MiniHeader, error) {
func (w *Watcher) GetLatestBlock() (*miniheader.MiniHeader, error) {
return w.stack.Peek()
}

// InspectRetainedBlocks returns the blocks retained in-memory by the Watcher instance. It is not
// particularly performant and therefore should only be used for debugging and testing purposes.
func (w *Watcher) InspectRetainedBlocks() ([]*meshdb.MiniHeader, error) {
return w.stack.Inspect()
// GetAllRetainedBlocks returns the blocks retained in-memory by the Watcher.
func (w *Watcher) GetAllRetainedBlocks() ([]*miniheader.MiniHeader, error) {
return w.stack.PeekAll()
}

// pollNextBlock polls for the next block header to be added to the block stack.
@@ -193,7 +196,7 @@ func (w *Watcher) pollNextBlock() error {
return nil
}

func (w *Watcher) buildCanonicalChain(nextHeader *meshdb.MiniHeader, events []*Event) ([]*Event, error) {
func (w *Watcher) buildCanonicalChain(nextHeader *miniheader.MiniHeader, events []*Event) ([]*Event, error) {
latestHeader, err := w.stack.Peek()
if err != nil {
return nil, err
@@ -276,7 +279,7 @@ func (w *Watcher) buildCanonicalChain(nextHeader *meshdb.MiniHeader, events []*E
return events, nil
}

func (w *Watcher) addLogs(header *meshdb.MiniHeader) (*meshdb.MiniHeader, error) {
func (w *Watcher) addLogs(header *miniheader.MiniHeader) (*miniheader.MiniHeader, error) {
if !w.withLogs {
return header, nil
}
@@ -323,7 +326,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
// If we have processed blocks further then the latestRetainedBlock in the DB, we
// want to remove all blocks from the DB and insert the furthestBlockProcessed
// Doing so will cause the BlockWatcher to start from that furthestBlockProcessed.
headers, err := w.InspectRetainedBlocks()
headers, err := w.GetAllRetainedBlocks()
if err != nil {
return events, err
}
@@ -350,14 +353,14 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro

// Create the block events from all the logs found by grouping
// them into blockHeaders
hashToBlockHeader := map[common.Hash]*meshdb.MiniHeader{}
hashToBlockHeader := map[common.Hash]*miniheader.MiniHeader{}
for _, log := range logs {
blockHeader, ok := hashToBlockHeader[log.BlockHash]
if !ok {
// TODO(fabio): Find a way to include the parent hash for the block as well.
// It's currently not an issue to omit it since we don't use the parent hash
// when processing block events in OrderWatcher.
blockHeader = &meshdb.MiniHeader{
blockHeader = &miniheader.MiniHeader{
Hash: log.BlockHash,
Number: big.NewInt(0).SetUint64(log.BlockNumber),
Logs: []types.Log{},
@@ -10,34 +10,33 @@ import (
"testing"
"time"

"github.com/0xProject/0x-mesh/meshdb"
"github.com/0xProject/0x-mesh/ethereum/miniheader"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var config = Config{
PollingInterval: 1 * time.Second,
BlockRetentionLimit: 10,
StartBlockDepth: rpc.LatestBlockNumber,
WithLogs: false,
Topics: []common.Hash{},
PollingInterval: 1 * time.Second,
StartBlockDepth: rpc.LatestBlockNumber,
WithLogs: false,
Topics: []common.Hash{},
}

var basicFakeClientFixture = "testdata/fake_client_basic_fixture.json"
var (
basicFakeClientFixture = "testdata/fake_client_basic_fixture.json"
blockRetentionLimit = 10
)

func TestWatcher(t *testing.T) {
fakeClient, err := newFakeClient("testdata/fake_client_block_poller_fixtures.json")
require.NoError(t, err)

// Polling interval unused because we hijack the ticker for this test
meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
config.MeshDB = meshDB
config.Stack = NewSimpleStack(blockRetentionLimit)
config.Client = fakeClient
watcher := New(config)

@@ -51,7 +50,7 @@ func TestWatcher(t *testing.T) {
err := watcher.pollNextBlock()
require.NoError(t, err)

retainedBlocks, err := watcher.InspectRetainedBlocks()
retainedBlocks, err := watcher.GetAllRetainedBlocks()
require.NoError(t, err)
expectedRetainedBlocks := fakeClient.ExpectedRetainedBlocks()
assert.Equal(t, expectedRetainedBlocks, retainedBlocks, scenarioLabel)
@@ -79,10 +78,8 @@ func TestWatcherStartStop(t *testing.T) {
fakeClient, err := newFakeClient(basicFakeClientFixture)
require.NoError(t, err)

meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
config.MeshDB = meshDB
config.Stack = NewSimpleStack(blockRetentionLimit)
config.Client = fakeClient
watcher := New(config)

@@ -182,10 +179,8 @@ func TestGetSubBlockRanges(t *testing.T) {

fakeClient, err := newFakeClient(basicFakeClientFixture)
require.NoError(t, err)
meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
config.MeshDB = meshDB
config.Stack = NewSimpleStack(blockRetentionLimit)
config.Client = fakeClient
watcher := New(config)

@@ -200,19 +195,19 @@ func TestGetMissedEventsToBackfillSomeMissed(t *testing.T) {
fakeClient, err := newFakeClient("testdata/fake_client_fast_sync_fixture.json")
require.NoError(t, err)

meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
// Add block number 5 as the last block seen by BlockWatcher
lastBlockSeen := &meshdb.MiniHeader{
lastBlockSeen := &miniheader.MiniHeader{
Number: big.NewInt(5),
Hash: common.HexToHash("0x293b9ea024055a3e9eddbf9b9383dc7731744111894af6aa038594dc1b61f87f"),
Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"),
}
err = meshDB.MiniHeaders.Insert(lastBlockSeen)

config.Stack = NewSimpleStack(blockRetentionLimit)

err = config.Stack.Push(lastBlockSeen)
require.NoError(t, err)

config.MeshDB = meshDB
config.Client = fakeClient
watcher := New(config)

@@ -223,7 +218,7 @@ func TestGetMissedEventsToBackfillSomeMissed(t *testing.T) {
assert.Len(t, events, 1)

// Check that block 30 is now in the DB, and block 5 was removed.
headers, err := meshDB.FindAllMiniHeadersSortedByNumber()
headers, err := config.Stack.PeekAll()
require.NoError(t, err)
require.Len(t, headers, 1)
assert.Equal(t, big.NewInt(30), headers[0].Number)
@@ -234,19 +229,19 @@ func TestGetMissedEventsToBackfillNoneMissed(t *testing.T) {
fakeClient, err := newFakeClient("testdata/fake_client_basic_fixture.json")
require.NoError(t, err)

meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
// Add block number 5 as the last block seen by BlockWatcher
lastBlockSeen := &meshdb.MiniHeader{
lastBlockSeen := &miniheader.MiniHeader{
Number: big.NewInt(5),
Hash: common.HexToHash("0x293b9ea024055a3e9eddbf9b9383dc7731744111894af6aa038594dc1b61f87f"),
Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"),
}
err = meshDB.MiniHeaders.Insert(lastBlockSeen)

config.Stack = NewSimpleStack(blockRetentionLimit)

err = config.Stack.Push(lastBlockSeen)
require.NoError(t, err)

config.MeshDB = meshDB
config.Client = fakeClient
watcher := New(config)

@@ -257,7 +252,7 @@ func TestGetMissedEventsToBackfillNoneMissed(t *testing.T) {
assert.Len(t, events, 0)

// Check that block 5 is still in the DB
headers, err := meshDB.FindAllMiniHeadersSortedByNumber()
headers, err := config.Stack.PeekAll()
require.NoError(t, err)
require.Len(t, headers, 1)
assert.Equal(t, big.NewInt(5), headers[0].Number)
@@ -377,10 +372,7 @@ func TestFilterLogsRecursively(t *testing.T) {
},
}

meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
config.MeshDB = meshDB
config.Stack = NewSimpleStack(blockRetentionLimit)

for _, testCase := range testCases {
fakeLogClient, err := newFakeLogClient(testCase.rangeToFilterLogsResponse)
@@ -483,10 +475,7 @@ func TestGetLogsInBlockRange(t *testing.T) {
},
}

meshDB, err := meshdb.New("/tmp/leveldb_testing/" + uuid.New().String())
require.NoError(t, err)
defer meshDB.Close()
config.MeshDB = meshDB
config.Stack = NewSimpleStack(blockRetentionLimit)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.