Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Wait for block events to backfill in core.App.Start #318

Merged
merged 1 commit into from Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 58 additions & 51 deletions core/core.go
Expand Up @@ -278,24 +278,9 @@ func (app *App) Start(ctx context.Context) error {
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()

// Initialize the p2p node.
nodeConfig := p2p.Config{
Topic: getPubSubTopic(app.config.EthereumNetworkID),
ListenPort: app.config.P2PListenPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumNetworkID),
UseBootstrapList: app.config.UseBootstrapList,
}
var err error
app.node, err = p2p.New(innerCtx, nodeConfig)
if err != nil {
return err
}

// Start several independent goroutines. Use separate channels to communicate
// errors and a waitgroup to wait for all goroutines to exit.
// Below, we will start several independent goroutines. We use separate
// channels to communicate errors and a waitgroup to wait for all goroutines
// to exit.
wg := &sync.WaitGroup{}

// Close the database when the context is canceled.
Expand All @@ -306,41 +291,32 @@ func (app *App) Start(ctx context.Context) error {
app.db.Close()
}()

// Start the p2p node.
p2pErrChan := make(chan error, 1)
// Set up and start the snapshot expiration watcher.
wg.Add(1)
go func() {
defer wg.Done()
addrs := app.node.Multiaddrs()
log.WithFields(map[string]interface{}{
"addresses": addrs,
}).Info("starting p2p node")

wg.Add(1)
go func() {
defer wg.Done()
app.periodicallyCheckForNewAddrs(innerCtx, addrs)
}()

p2pErrChan <- app.node.Start()
for expiredSnapshots := range app.snapshotExpirationWatcher.ExpiredItems() {
for _, expiredSnapshot := range expiredSnapshots {
app.muIdToSnapshotInfo.Lock()
delete(app.idToSnapshotInfo, expiredSnapshot.ID)
app.muIdToSnapshotInfo.Unlock()
}
}
}()

// Start the order watcher.
orderWatcherErrChan := make(chan error, 1)
snapshotExpirationWatcherErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
log.Info("starting order watcher")
orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx)
snapshotExpirationWatcherErrChan <- app.snapshotExpirationWatcher.Watch(innerCtx, expirationPollingInterval)
}()

// Start the block watcher.
blockWatcherErrChan := make(chan error, 1)
// Start the order watcher.
orderWatcherErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
log.Info("starting block watcher")
blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx)
log.Info("starting order watcher")
orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx)
}()

// Start the ETH balance watcher.
Expand All @@ -354,23 +330,54 @@ func (app *App) Start(ctx context.Context) error {
ethWatcherErrChan <- app.ethWatcher.Watch(innerCtx)
}()

// Set up and start the snapshot expiration watcher.
// Backfill block events if needed. This is a blocking call, so we won't
// continue setup until it's finished.
if err := app.blockWatcher.BackfillEventsIfNeeded(innerCtx); err != nil {
return err
}

// Start the block watcher.
blockWatcherErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
for expiredSnapshots := range app.snapshotExpirationWatcher.ExpiredItems() {
for _, expiredSnapshot := range expiredSnapshots {
app.muIdToSnapshotInfo.Lock()
delete(app.idToSnapshotInfo, expiredSnapshot.ID)
app.muIdToSnapshotInfo.Unlock()
}
}
log.Info("starting block watcher")
blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx)
}()
snapshotExpirationWatcherErrChan := make(chan error, 1)

// Initialize the p2p node.
nodeConfig := p2p.Config{
Topic: getPubSubTopic(app.config.EthereumNetworkID),
ListenPort: app.config.P2PListenPort,
Insecure: false,
PrivateKey: app.privKey,
MessageHandler: app,
RendezvousString: getRendezvous(app.config.EthereumNetworkID),
UseBootstrapList: app.config.UseBootstrapList,
}
var err error
app.node, err = p2p.New(innerCtx, nodeConfig)
if err != nil {
return err
}

// Start the p2p node.
p2pErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
snapshotExpirationWatcherErrChan <- app.snapshotExpirationWatcher.Watch(innerCtx, expirationPollingInterval)
addrs := app.node.Multiaddrs()
log.WithFields(map[string]interface{}{
"addresses": addrs,
}).Info("starting p2p node")

wg.Add(1)
go func() {
defer wg.Done()
app.periodicallyCheckForNewAddrs(innerCtx, addrs)
}()

p2pErrChan <- app.node.Start()
}()

// If any error channel returns a non-nil error, we cancel the inner context
Expand Down
25 changes: 16 additions & 9 deletions ethereum/blockwatch/block_watcher.go
Expand Up @@ -84,6 +84,20 @@ func New(config Config) *Watcher {
return bs
}

// BackfillEventsIfNeeded finds missed events that might have occurred while
// the Mesh node was offline and sends them to event subscribers. It blocks
// until backfilling is done or the given context is canceled.
func (w *Watcher) BackfillEventsIfNeeded(ctx context.Context) error {
	missedEvents, err := w.getMissedEventsToBackfill(ctx)
	if err != nil {
		return err
	}
	// Only notify subscribers when there is actually something to deliver.
	if len(missedEvents) == 0 {
		return nil
	}
	w.blockFeed.Send(missedEvents)
	return nil
}

// Watch starts the Watcher. It will continuously look for new blocks and blocks
// until there is a critical error or the given context is canceled. Typically,
// you want to call Watch inside a goroutine. For non-critical errors, callers
Expand All @@ -97,14 +111,6 @@ func (w *Watcher) Watch(ctx context.Context) error {
w.wasStartedOnce = true
w.mu.Unlock()

events, err := w.getMissedEventsToBackfill(ctx)
if err != nil {
return err
}
if len(events) > 0 {
w.blockFeed.Send(events)
}

ticker := time.NewTicker(w.pollingInterval)
for {
select {
Expand Down Expand Up @@ -297,7 +303,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
return events, nil
}

log.WithField("blocksElapsed", blocksElapsed.Int64()).Info("Some blocks have elapsed since last boot. Backfilling events")
log.WithField("blocksElapsed", blocksElapsed.Int64()).Info("Some blocks have elapsed since last boot. Backfilling block events (this can take a while)...")
startBlockNum := int(latestRetainedBlock.Number.Int64() + 1)
endBlockNum := int(latestRetainedBlock.Number.Int64() + blocksElapsed.Int64())
logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, endBlockNum)
Expand Down Expand Up @@ -354,6 +360,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
BlockHeader: blockHeader,
})
}
log.Info("Done backfilling block events")
return events, nil
}
return events, nil
Expand Down