diff --git a/core/core.go b/core/core.go index 7f54ebbf1..8acd73afb 100644 --- a/core/core.go +++ b/core/core.go @@ -278,24 +278,9 @@ func (app *App) Start(ctx context.Context) error { innerCtx, cancel := context.WithCancel(ctx) defer cancel() - // Initialize the p2p node. - nodeConfig := p2p.Config{ - Topic: getPubSubTopic(app.config.EthereumNetworkID), - ListenPort: app.config.P2PListenPort, - Insecure: false, - PrivateKey: app.privKey, - MessageHandler: app, - RendezvousString: getRendezvous(app.config.EthereumNetworkID), - UseBootstrapList: app.config.UseBootstrapList, - } - var err error - app.node, err = p2p.New(innerCtx, nodeConfig) - if err != nil { - return err - } - - // Start several independent goroutines. Use separate channels to communicate - // errors and a waitgroup to wait for all goroutines to exit. + // Below, we will start several independent goroutines. We use separate + // channels to communicate errors and a waitgroup to wait for all goroutines + // to exit. wg := &sync.WaitGroup{} // Close the database when the context is canceled. @@ -306,41 +291,32 @@ func (app *App) Start(ctx context.Context) error { app.db.Close() }() - // Start the p2p node. - p2pErrChan := make(chan error, 1) + // Set up and start the snapshot expiration watcher. wg.Add(1) go func() { defer wg.Done() - addrs := app.node.Multiaddrs() - log.WithFields(map[string]interface{}{ - "addresses": addrs, - }).Info("starting p2p node") - - wg.Add(1) - go func() { - defer wg.Done() - app.periodicallyCheckForNewAddrs(innerCtx, addrs) - }() - - p2pErrChan <- app.node.Start() + for expiredSnapshots := range app.snapshotExpirationWatcher.ExpiredItems() { + for _, expiredSnapshot := range expiredSnapshots { + app.muIdToSnapshotInfo.Lock() + delete(app.idToSnapshotInfo, expiredSnapshot.ID) + app.muIdToSnapshotInfo.Unlock() + } + } }() - - // Start the order watcher. 
- orderWatcherErrChan := make(chan error, 1) + snapshotExpirationWatcherErrChan := make(chan error, 1) wg.Add(1) go func() { defer wg.Done() - log.Info("starting order watcher") - orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx) + snapshotExpirationWatcherErrChan <- app.snapshotExpirationWatcher.Watch(innerCtx, expirationPollingInterval) }() - // Start the block watcher. - blockWatcherErrChan := make(chan error, 1) + // Start the order watcher. + orderWatcherErrChan := make(chan error, 1) wg.Add(1) go func() { defer wg.Done() - log.Info("starting block watcher") - blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx) + log.Info("starting order watcher") + orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx) }() // Start the ETH balance watcher. @@ -354,23 +330,54 @@ func (app *App) Start(ctx context.Context) error { ethWatcherErrChan <- app.ethWatcher.Watch(innerCtx) }() - // Set up and start the snapshot expiration watcher. + // Backfill block events if needed. This is a blocking call so we won't + // continue setup until it's finished. + if err := app.blockWatcher.BackfillEventsIfNeeded(innerCtx); err != nil { + return err + } + + // Start the block watcher. + blockWatcherErrChan := make(chan error, 1) wg.Add(1) go func() { defer wg.Done() - for expiredSnapshots := range app.snapshotExpirationWatcher.ExpiredItems() { - for _, expiredSnapshot := range expiredSnapshots { - app.muIdToSnapshotInfo.Lock() - delete(app.idToSnapshotInfo, expiredSnapshot.ID) - app.muIdToSnapshotInfo.Unlock() - } - } + log.Info("starting block watcher") + blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx) }() - snapshotExpirationWatcherErrChan := make(chan error, 1) + + // Initialize the p2p node. 
+ nodeConfig := p2p.Config{ + Topic: getPubSubTopic(app.config.EthereumNetworkID), + ListenPort: app.config.P2PListenPort, + Insecure: false, + PrivateKey: app.privKey, + MessageHandler: app, + RendezvousString: getRendezvous(app.config.EthereumNetworkID), + UseBootstrapList: app.config.UseBootstrapList, + } + var err error + app.node, err = p2p.New(innerCtx, nodeConfig) + if err != nil { + return err + } + + // Start the p2p node. + p2pErrChan := make(chan error, 1) wg.Add(1) go func() { defer wg.Done() - snapshotExpirationWatcherErrChan <- app.snapshotExpirationWatcher.Watch(innerCtx, expirationPollingInterval) + addrs := app.node.Multiaddrs() + log.WithFields(map[string]interface{}{ + "addresses": addrs, + }).Info("starting p2p node") + + wg.Add(1) + go func() { + defer wg.Done() + app.periodicallyCheckForNewAddrs(innerCtx, addrs) + }() + + p2pErrChan <- app.node.Start() }() // If any error channel returns a non-nil error, we cancel the inner context diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index cbb91da41..b8fdcc255 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -84,6 +84,20 @@ func New(config Config) *Watcher { return bs } +// BackfillEventsIfNeeded finds missed events that might have occurred while the +// Mesh node was offline and sends them to event subscribers. It blocks until +// it is done backfilling or the given context is canceled. +func (w *Watcher) BackfillEventsIfNeeded(ctx context.Context) error { + events, err := w.getMissedEventsToBackfill(ctx) + if err != nil { + return err + } + if len(events) > 0 { + w.blockFeed.Send(events) + } + return nil +} + // Watch starts the Watcher. It will continuously look for new blocks and blocks // until there is a critical error or the given context is canceled. Typically, // you want to call Watch inside a goroutine. 
For non-critical errors, callers @@ -97,14 +111,6 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - events, err := w.getMissedEventsToBackfill(ctx) - if err != nil { - return err - } - if len(events) > 0 { - w.blockFeed.Send(events) - } - ticker := time.NewTicker(w.pollingInterval) for { select { @@ -297,7 +303,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro return events, nil } - log.WithField("blocksElapsed", blocksElapsed.Int64()).Info("Some blocks have elapsed since last boot. Backfilling events") + log.WithField("blocksElapsed", blocksElapsed.Int64()).Info("Some blocks have elapsed since last boot. Backfilling block events (this can take a while)...") startBlockNum := int(latestRetainedBlock.Number.Int64() + 1) endBlockNum := int(latestRetainedBlock.Number.Int64() + blocksElapsed.Int64()) logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, endBlockNum) @@ -354,6 +360,7 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro BlockHeader: blockHeader, }) } + log.Info("Done backfilling block events") return events, nil } return events, nil