From 5dce7df5c03949c24a748d2ef9ec826b171f1b7f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 08:57:18 -0600 Subject: [PATCH 01/30] spike on encounter --- mocks/syncer/handler.go | 14 ++++++++++++++ syncer/syncer.go | 28 +++++++++++++++++++++++++++- syncer/types.go | 7 +++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/mocks/syncer/handler.go b/mocks/syncer/handler.go index f2bf144c..abc0aa3f 100644 --- a/mocks/syncer/handler.go +++ b/mocks/syncer/handler.go @@ -29,6 +29,20 @@ func (_m *Handler) BlockAdded(ctx context.Context, block *types.Block) error { return r0 } +// BlockEncountered provides a mock function with given fields: ctx, block +func (_m *Handler) BlockEncountered(ctx context.Context, block *types.Block) error { + ret := _m.Called(ctx, block) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *types.Block) error); ok { + r0 = rf(ctx, block) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // BlockRemoved provides a mock function with given fields: ctx, block func (_m *Handler) BlockRemoved(ctx context.Context, block *types.BlockIdentifier) error { ret := _m.Called(ctx, block) diff --git a/syncer/syncer.go b/syncer/syncer.go index 9617c908..50a8fa02 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -21,6 +21,7 @@ import ( "log" "time" + lerrgroup "github.com/neilotoole/errgroup" "golang.org/x/sync/errgroup" "github.com/coinbase/rosetta-sdk-go/types" @@ -355,6 +356,10 @@ func (s *Syncer) processBlocks( if err != nil { return fmt.Errorf("%w: %v", ErrFetchBlockReorgFailed, err) } + + if err := s.handler.BlockEncountered(ctx, br.block); err != nil { + return err + } } else { // Anytime we re-fetch an index, we // will need to make another call to the node @@ -507,8 +512,29 @@ func (s *Syncer) syncRange( close(results) }() + c := make(chan *blockResult, 100) + g.Go(func() error { // TODO: ensures exit is coordinated + encounterG, encounterCtx := lerrgroup.WithContextN(pipelineCtx, 24, 24) // TODO: find a food value for this + for b := range results { + encounterG.Go(func() error { + if err := s.handler.BlockEncountered(encounterCtx, b.block); err != nil { + return err + } + + select { + case c <- b: + return nil + case <-encounterCtx.Done(): + return encounterCtx.Err() + } + }) + } + + return encounterCtx.Err() + }) + cache := make(map[int64]*blockResult) - for b := range results { + for b := range c { cache[b.index] = b if err := s.processBlocks(ctx, cache, endIndex); err != nil { diff --git a/syncer/types.go b/syncer/types.go index f52e7417..0c6fa219 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -81,6 +81,13 @@ const ( // to handle different events. It is common to write logs or // perform reconciliation in the sync processor. type Handler interface { + // Guaranteed that we will not invoke BlockAdded + // until blockencountered has returned. + BlockEncountered( + ctx context.Context, + block *types.Block, + ) error + BlockAdded( ctx context.Context, block *types.Block, From b1fe8408281bc81f105a212e633b98cd6ceefc95 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 09:04:28 -0600 Subject: [PATCH 02/30] initial support for encounter block --- statefulsyncer/stateful_syncer.go | 15 +++++++++++++ storage/modules/block_storage.go | 37 ++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/statefulsyncer/stateful_syncer.go b/statefulsyncer/stateful_syncer.go index 1218789e..63d7dd87 100644 --- a/statefulsyncer/stateful_syncer.go +++ b/statefulsyncer/stateful_syncer.go @@ -219,6 +219,21 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error { return ctx.Err() } +// BlockEncountered is called by the syncer when a block is encountered. +func (s *StatefulSyncer) BlockEncountered(ctx context.Context, block *types.Block) error { + err := s.blockStorage.EncounterBlock(ctx, block) + if err != nil { + return fmt.Errorf( + "%w: unable to encounter block to storage %s:%d", + err, + block.BlockIdentifier.Hash, + block.BlockIdentifier.Index, + ) + } + + return nil +} + // BlockAdded is called by the syncer when a block is added. func (s *StatefulSyncer) BlockAdded(ctx context.Context, block *types.Block) error { err := s.blockStorage.AddBlock(ctx, block) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 078c613e..d43cf2b1 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -561,7 +561,7 @@ func (b *BlockStorage) GetBlock( return b.GetBlockTransactional(ctx, transaction, blockIdentifier) } -func (b *BlockStorage) storeBlock( +func (b *BlockStorage) encounterBlock( ctx context.Context, transaction database.Transaction, blockResponse *types.BlockResponse, @@ -577,6 +577,16 @@ func (b *BlockStorage) storeBlock( return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) } + return nil +} + +func (b *BlockStorage) storeBlock( + ctx context.Context, + transaction database.Transaction, + blockIdentifier *types.BlockIdentifier, +) error { + _, key := getBlockHashKey(blockIdentifier.Hash) + if err := storeUniqueKey( ctx, transaction, @@ -598,12 +608,12 @@ func (b *BlockStorage) storeBlock( return nil } -// AddBlock stores a block or returns an error. -func (b *BlockStorage) AddBlock( +// EncounterBlock pre-stores a block or returns an error. +func (b *BlockStorage) EncounterBlock( ctx context.Context, block *types.Block, ) error { - transaction := b.db.WriteTransaction(ctx, blockSyncIdentifier, true) + transaction := b.db.WriteTransaction(ctx, block.BlockIdentifier.Hash, true) defer transaction.Discard(ctx) // Store all transactions in order and check for duplicates @@ -639,7 +649,7 @@ func (b *BlockStorage) AddBlock( } // Store block - err := b.storeBlock(ctx, transaction, blockWithoutTransactions) + err := b.encounterBlock(ctx, transaction, blockWithoutTransactions) if err != nil { return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) } @@ -668,6 +678,23 @@ func (b *BlockStorage) AddBlock( return err } + return transaction.Commit(ctx) +} + +// AddBlock stores a block or returns an error. +func (b *BlockStorage) AddBlock( + ctx context.Context, + block *types.Block, +) error { + transaction := b.db.WriteTransaction(ctx, blockSyncIdentifier, true) + defer transaction.Discard(ctx) + + // Store block + err := b.storeBlock(ctx, transaction, block.BlockIdentifier) + if err != nil { + return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) + } + return b.callWorkersAndCommit(ctx, block, transaction, true) } From aa8b096f2fb9bc5ac21585aa55c268a3f7a42b9c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 09:08:29 -0600 Subject: [PATCH 03/30] allow unfinalized txs --- storage/modules/block_storage.go | 13 +++++++++++++ storage/modules/block_storage_test.go | 1 + 2 files changed, 14 insertions(+) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index d43cf2b1..7f61f3d8 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -1004,6 +1004,7 @@ func (b *BlockStorage) FindTransaction( ctx context.Context, transactionIdentifier *types.TransactionIdentifier, txn database.Transaction, + allowUnfinalized bool, ) (*types.BlockIdentifier, *types.Transaction, error) { blockTransactions, err := b.getAllTransactionsByIdentifier(ctx, transactionIdentifier, txn) if err != nil { @@ -1014,10 +1015,22 @@ func (b *BlockStorage) FindTransaction( return nil, nil, nil } + head, err := b.GetHeadBlockIdentifier(ctx) + if err != nil { + return nil, nil, err + } + var newestBlock *types.BlockIdentifier var newestTransaction *types.Transaction for _, blockTransaction := range blockTransactions { if newestBlock == nil || blockTransaction.BlockIdentifier.Index > newestBlock.Index { + // Now that we are optimistically storing data, there is a change + // we may fetch a transaction from an unfinalized block. In some cases (rosetta-bitcoin), + // we want this! + if !allowUnfinalized && head != nil && blockTransaction.BlockIdentifier.Index > head.Index { + continue + } + newestBlock = blockTransaction.BlockIdentifier newestTransaction = blockTransaction.Transaction } diff --git a/storage/modules/block_storage_test.go b/storage/modules/block_storage_test.go index ca60f8cb..ff2722fc 100644 --- a/storage/modules/block_storage_test.go +++ b/storage/modules/block_storage_test.go @@ -296,6 +296,7 @@ func findTransactionWithDbTransaction( ctx, transactionIdentifier, txn, + false, ) } From 6cbc441877a650b8a416b435fad7a6cc23e3397b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 09:10:48 -0600 Subject: [PATCH 04/30] cleanup encounter --- syncer/syncer.go | 10 ++++++++-- syncer/types.go | 2 ++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 50a8fa02..0d455fc4 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "log" + "runtime" "time" lerrgroup "github.com/neilotoole/errgroup" @@ -512,9 +513,14 @@ func (s *Syncer) syncRange( close(results) }() - c := make(chan *blockResult, 100) + c := make(chan *blockResult, defaultEncounterBacklog) g.Go(func() error { // TODO: ensures exit is coordinated - encounterG, encounterCtx := lerrgroup.WithContextN(pipelineCtx, 24, 24) // TODO: find a food value for this + numCPU := runtime.NumCPU() + encounterG, encounterCtx := lerrgroup.WithContextN( + pipelineCtx, + numCPU, + numCPU, + ) // TODO: find a good value for this for b := range results { encounterG.Go(func() error { if err := s.handler.BlockEncountered(encounterCtx, b.block); err != nil { diff --git a/syncer/types.go b/syncer/types.go index 0c6fa219..2acbcc36 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -75,6 +75,8 @@ const ( // when we are loading more blocks to fetch but we // already have a backlog >= to concurrency. defaultFetchSleep = 500 * time.Millisecond + + defaultEncounterBacklog = 100 ) // Handler is called at various times during the sync cycle From 04a04558f4a94d89bbc800c7501e9c2de4f4c9bb Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 09:16:14 -0600 Subject: [PATCH 05/30] fix restore when already exists --- storage/modules/block_storage.go | 49 +++++++++++++++++++++++++++---- syncer/syncer.go | 50 ++++++++++++++++++-------------- 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 7f61f3d8..fe0b15a1 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -565,19 +565,40 @@ func (b *BlockStorage) encounterBlock( ctx context.Context, transaction database.Transaction, blockResponse *types.BlockResponse, -) error { +) (bool, error) { blockIdentifier := blockResponse.Block.BlockIdentifier namespace, key := getBlockHashKey(blockIdentifier.Hash) buf, err := b.db.Encoder().Encode(namespace, blockResponse) if err != nil { - return fmt.Errorf("%w: %v", storageErrs.ErrBlockEncodeFailed, err) + return false, fmt.Errorf("%w: %v", storageErrs.ErrBlockEncodeFailed, err) } - if err := storeUniqueKey(ctx, transaction, key, buf, true); err != nil { - return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) + exists, val, err := transaction.Get(ctx, key) + if err != nil { + return false, err } - return nil + if !exists { + return false, transaction.Set(ctx, key, buf, true) + } + + var rosettaBlockResponse types.BlockResponse + err = b.db.Encoder().Decode(namespace, val, &rosettaBlockResponse, true) + if err != nil { + return false, err + } + + // Exit early if block already exists! + if blockResponse.Block.BlockIdentifier.Hash == rosettaBlockResponse.Block.BlockIdentifier.Hash && + blockResponse.Block.BlockIdentifier.Index == rosettaBlockResponse.Block.BlockIdentifier.Index { + return true, nil + } + + return false, fmt.Errorf( + "%w: duplicate key %s found", + storageErrs.ErrDuplicateKey, + string(key), + ) } func (b *BlockStorage) storeBlock( @@ -616,6 +637,18 @@ func (b *BlockStorage) EncounterBlock( transaction := b.db.WriteTransaction(ctx, block.BlockIdentifier.Hash, true) defer transaction.Discard(ctx) + // Check if block already saved + // TODO: only perform if we find a duplicate + bl, err := b.GetBlockLazyTransactional(ctx, types.ConstructPartialBlockIdentifier(block.BlockIdentifier), transaction) + if err != nil && !errors.Is(err, storageErrs.ErrBlockNotFound) { + return err + } + + // Exit early if block already exists! + if bl != nil && block.BlockIdentifier.Hash == bl.Block.BlockIdentifier.Hash && block.BlockIdentifier.Index == bl.Block.BlockIdentifier.Index { + return nil + } + // Store all transactions in order and check for duplicates identifiers := make([]*types.TransactionIdentifier, len(block.Transactions)) identiferSet := map[string]struct{}{} @@ -649,11 +682,15 @@ func (b *BlockStorage) EncounterBlock( } // Store block - err := b.encounterBlock(ctx, transaction, blockWithoutTransactions) + exists, err := b.encounterBlock(ctx, transaction, blockWithoutTransactions) if err != nil { return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) } + if exists { + return nil + } + g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range block.Transactions { // We need to set variable before calling goroutine diff --git a/syncer/syncer.go b/syncer/syncer.go index 0d455fc4..54b60cd9 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -506,13 +506,6 @@ func (s *Syncer) syncRange( }) } - // Wait for all block fetching goroutines to exit - // before closing the results channel. - go func() { - _ = g.Wait() - close(results) - }() - c := make(chan *blockResult, defaultEncounterBacklog) g.Go(func() error { // TODO: ensures exit is coordinated numCPU := runtime.NumCPU() @@ -521,24 +514,37 @@ func (s *Syncer) syncRange( numCPU, numCPU, ) // TODO: find a good value for this - for b := range results { - encounterG.Go(func() error { - if err := s.handler.BlockEncountered(encounterCtx, b.block); err != nil { - return err - } - - select { - case c <- b: - return nil - case <-encounterCtx.Done(): - return encounterCtx.Err() - } - }) - } - return encounterCtx.Err() + for { + select { + case result := <-results: + encounterG.Go(func() error { + if err := s.handler.BlockEncountered(encounterCtx, result.block); err != nil { + fmt.Println(err) + return err + } + + select { + case c <- result: + return nil + case <-encounterCtx.Done(): + return encounterCtx.Err() + } + }) + case <-encounterCtx.Done(): + close(c) // TODO: will cause error to be silenced + return encounterCtx.Err() + } + } }) + // Wait for all block fetching goroutines to exit + // before closing the results channel. + go func() { + _ = g.Wait() + close(results) + }() + cache := make(map[int64]*blockResult) for b := range c { cache[b.index] = b From 3f765639138eac560b66a56eb70726b186f8cfc2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 10:00:37 -0600 Subject: [PATCH 06/30] remove unnecessary lookup --- storage/modules/block_storage.go | 3 +-- storage/modules/block_storage_test.go | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index fe0b15a1..50772a4a 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -1041,7 +1041,6 @@ func (b *BlockStorage) FindTransaction( ctx context.Context, transactionIdentifier *types.TransactionIdentifier, txn database.Transaction, - allowUnfinalized bool, ) (*types.BlockIdentifier, *types.Transaction, error) { blockTransactions, err := b.getAllTransactionsByIdentifier(ctx, transactionIdentifier, txn) if err != nil { @@ -1064,7 +1063,7 @@ func (b *BlockStorage) FindTransaction( // Now that we are optimistically storing data, there is a change // we may fetch a transaction from an unfinalized block. In some cases (rosetta-bitcoin), // we want this! - if !allowUnfinalized && head != nil && blockTransaction.BlockIdentifier.Index > head.Index { + if head != nil && blockTransaction.BlockIdentifier.Index > head.Index { continue } diff --git a/storage/modules/block_storage_test.go b/storage/modules/block_storage_test.go index ff2722fc..ca60f8cb 100644 --- a/storage/modules/block_storage_test.go +++ b/storage/modules/block_storage_test.go @@ -296,7 +296,6 @@ func findTransactionWithDbTransaction( ctx, transactionIdentifier, txn, - false, ) } From 5e50069967e4b4ef48954a2a4e512aaecd9aac45 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 11:00:04 -0600 Subject: [PATCH 07/30] debug nil result --- syncer/syncer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 54b60cd9..41c521fc 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -518,8 +518,10 @@ func (s *Syncer) syncRange( for { select { case result := <-results: + // TODO: nil block result? + rb := result.block encounterG.Go(func() error { - if err := s.handler.BlockEncountered(encounterCtx, result.block); err != nil { + if err := s.handler.BlockEncountered(encounterCtx, rb); err != nil { fmt.Println(err) return err } From 1be70220f2de7405aa378a1c8fbf10bdebe83c76 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:22:24 -0600 Subject: [PATCH 08/30] skip encounter block if nil --- syncer/syncer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 41c521fc..b12a04af 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -518,12 +518,12 @@ func (s *Syncer) syncRange( for { select { case result := <-results: - // TODO: nil block result? - rb := result.block encounterG.Go(func() error { - if err := s.handler.BlockEncountered(encounterCtx, rb); err != nil { - fmt.Println(err) - return err + if result.block != nil { // can occur when ErrOrphanHead + if err := s.handler.BlockEncountered(encounterCtx, result.block); err != nil { + fmt.Println(err) + return err + } } select { From 20e9956fc3862245c3ec9afaa3e2796cc82abfdd Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:26:26 -0600 Subject: [PATCH 09/30] remove duplicate check --- storage/modules/block_storage.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 50772a4a..2e2172f4 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -637,18 +637,6 @@ func (b *BlockStorage) EncounterBlock( transaction := b.db.WriteTransaction(ctx, block.BlockIdentifier.Hash, true) defer transaction.Discard(ctx) - // Check if block already saved - // TODO: only perform if we find a duplicate - bl, err := b.GetBlockLazyTransactional(ctx, types.ConstructPartialBlockIdentifier(block.BlockIdentifier), transaction) - if err != nil && !errors.Is(err, storageErrs.ErrBlockNotFound) { - return err - } - - // Exit early if block already exists! - if bl != nil && block.BlockIdentifier.Hash == bl.Block.BlockIdentifier.Hash && block.BlockIdentifier.Index == bl.Block.BlockIdentifier.Index { - return nil - } - // Store all transactions in order and check for duplicates identifiers := make([]*types.TransactionIdentifier, len(block.Transactions)) identiferSet := map[string]struct{}{} From 099e909ec2ba320db0a6439dacb47c7cdd1c88ba Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 6 Dec 2020 10:04:22 -0600 Subject: [PATCH 10/30] cleanup syncer code --- mocks/syncer/handler.go | 12 +-- syncer/syncer.go | 167 +++++++++++++++++++++++----------------- syncer/types.go | 2 +- 3 files changed, 103 insertions(+), 78 deletions(-) diff --git a/mocks/syncer/handler.go b/mocks/syncer/handler.go index abc0aa3f..d435c6df 100644 --- a/mocks/syncer/handler.go +++ b/mocks/syncer/handler.go @@ -29,12 +29,12 @@ func (_m *Handler) BlockAdded(ctx context.Context, block *types.Block) error { return r0 } -// BlockEncountered provides a mock function with given fields: ctx, block -func (_m *Handler) BlockEncountered(ctx context.Context, block *types.Block) error { +// BlockRemoved provides a mock function with given fields: ctx, block +func (_m *Handler) BlockRemoved(ctx context.Context, block *types.BlockIdentifier) error { ret := _m.Called(ctx, block) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.Block) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *types.BlockIdentifier) error); ok { r0 = rf(ctx, block) } else { r0 = ret.Error(0) @@ -43,12 +43,12 @@ func (_m *Handler) BlockEncountered(ctx context.Context, block *types.Block) err return r0 } -// BlockRemoved provides a mock function with given fields: ctx, block -func (_m *Handler) BlockRemoved(ctx context.Context, block *types.BlockIdentifier) error { +// BlockSeen provides a mock function with given fields: ctx, block +func (_m *Handler) BlockSeen(ctx context.Context, block *types.Block) error { ret := _m.Called(ctx, block) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.BlockIdentifier) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *types.Block) error); ok { r0 = rf(ctx, block) } else { r0 = ret.Error(0) diff --git a/syncer/syncer.go b/syncer/syncer.go index b12a04af..ba2aa20e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -358,7 +358,7 @@ func (s *Syncer) processBlocks( return fmt.Errorf("%w: %v", ErrFetchBlockReorgFailed, err) } - if err := s.handler.BlockEncountered(ctx, br.block); err != nil { + if err := s.handleSeenBlock(ctx, br); err != nil { return err } } else { @@ -457,6 +457,53 @@ func (s *Syncer) adjustWorkers() bool { return shouldCreate } +func (s *Syncer) handleSeenBlock( + ctx context.Context, + result *blockResult, +) error { + // If the helper returns ErrOrphanHead + // for a block fetch, result.block will + // be nil. + if result.block == nil { + return nil + } + + return s.handler.BlockSeen(ctx, result.block) +} + +func (s *Syncer) handleSeenBlocks( + ctx context.Context, + input chan *blockResult, + output chan *blockResult, +) error { + numCPU := runtime.NumCPU() + g, ctx := lerrgroup.WithContextN( + ctx, + numCPU, + numCPU, + ) + + for { + select { + case result := <-input: + g.Go(func() error { + if err := s.handleSeenBlock(ctx, result); err != nil { + return err + } + + select { + case output <- result: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) + case <-ctx.Done(): + return ctx.Err() + } + } +} + // syncRange fetches and processes a range of blocks // (from syncer.nextIndex to endIndex, inclusive) // with syncer.concurrency. @@ -506,40 +553,6 @@ func (s *Syncer) syncRange( }) } - c := make(chan *blockResult, defaultEncounterBacklog) - g.Go(func() error { // TODO: ensures exit is coordinated - numCPU := runtime.NumCPU() - encounterG, encounterCtx := lerrgroup.WithContextN( - pipelineCtx, - numCPU, - numCPU, - ) // TODO: find a good value for this - - for { - select { - case result := <-results: - encounterG.Go(func() error { - if result.block != nil { // can occur when ErrOrphanHead - if err := s.handler.BlockEncountered(encounterCtx, result.block); err != nil { - fmt.Println(err) - return err - } - } - - select { - case c <- result: - return nil - case <-encounterCtx.Done(): - return encounterCtx.Err() - } - }) - case <-encounterCtx.Done(): - close(c) // TODO: will cause error to be silenced - return encounterCtx.Err() - } - } - }) - // Wait for all block fetching goroutines to exit // before closing the results channel. go func() { @@ -547,44 +560,56 @@ func (s *Syncer) syncRange( close(results) }() - cache := make(map[int64]*blockResult) - for b := range c { - cache[b.index] = b - - if err := s.processBlocks(ctx, cache, endIndex); err != nil { - return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) - } - - // Determine if concurrency should be adjusted. - s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) - s.lastAdjustment++ - - s.concurrencyLock.Lock() - shouldCreate := s.adjustWorkers() - if !shouldCreate { - s.concurrencyLock.Unlock() - continue - } + c := make(chan *blockResult, defaultEncounterBacklog) + g.Go(func() error { + return s.handleSeenBlocks(ctx, results, c) + }) - // If we have finished loading blocks or the pipelineCtx - // has an error (like context.Canceled), we should avoid - // creating more goroutines (as there is a chance that - // Wait has returned). Attempting to create more goroutines - // after Wait has returned will cause a panic. - s.doneLoadingLock.Lock() - if !s.doneLoading && pipelineCtx.Err() == nil { - g.Go(func() error { - return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, results) - }) - } else { - s.concurrency-- + cache := make(map[int64]*blockResult) + g.Go(func() error { + for { + select { + case b := <-c: + cache[b.index] = b + + if err := s.processBlocks(ctx, cache, endIndex); err != nil { + return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) + } + + // Determine if concurrency should be adjusted. + s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) + s.lastAdjustment++ + + s.concurrencyLock.Lock() + shouldCreate := s.adjustWorkers() + if !shouldCreate { + s.concurrencyLock.Unlock() + continue + } + + // If we have finished loading blocks or the pipelineCtx + // has an error (like context.Canceled), we should avoid + // creating more goroutines (as there is a chance that + // Wait has returned). Attempting to create more goroutines + // after Wait has returned will cause a panic. + s.doneLoadingLock.Lock() + if !s.doneLoading && pipelineCtx.Err() == nil { + g.Go(func() error { + return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, results) + }) + } else { + s.concurrency-- + } + s.doneLoadingLock.Unlock() + + // Hold concurrencyLock until after we attempt to create another + // new goroutine in the case we accidentally go to 0 during shutdown. + s.concurrencyLock.Unlock() + case <-pipelineCtx.Done(): + return pipelineCtx.Err() + } } - s.doneLoadingLock.Unlock() - - // Hold concurrencyLock until after we attempt to create another - // new goroutine in the case we accidentally go to 0 during shutdown. - s.concurrencyLock.Unlock() - } + }) err := g.Wait() if err != nil { diff --git a/syncer/types.go b/syncer/types.go index 2acbcc36..c4ac7166 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -85,7 +85,7 @@ const ( type Handler interface { // Guaranteed that we will not invoke BlockAdded // until blockencountered has returned. - BlockEncountered( + BlockSeen( ctx context.Context, block *types.Block, ) error From 6155702fc1d7ce65822f55aa5147ba309963a1c7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 6 Dec 2020 10:06:37 -0600 Subject: [PATCH 11/30] rename functions --- statefulsyncer/stateful_syncer.go | 6 +++--- storage/modules/block_storage.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/statefulsyncer/stateful_syncer.go b/statefulsyncer/stateful_syncer.go index 63d7dd87..6e5df6f0 100644 --- a/statefulsyncer/stateful_syncer.go +++ b/statefulsyncer/stateful_syncer.go @@ -219,9 +219,9 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error { return ctx.Err() } -// BlockEncountered is called by the syncer when a block is encountered. -func (s *StatefulSyncer) BlockEncountered(ctx context.Context, block *types.Block) error { - err := s.blockStorage.EncounterBlock(ctx, block) +// BlockSeen is called by the syncer when a block is seen. +func (s *StatefulSyncer) BlockSeen(ctx context.Context, block *types.Block) error { + err := s.blockStorage.SeeBlock(ctx, block) if err != nil { return fmt.Errorf( "%w: unable to encounter block to storage %s:%d", diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 2e2172f4..8e86cbb0 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -561,7 +561,7 @@ func (b *BlockStorage) GetBlock( return b.GetBlockTransactional(ctx, transaction, blockIdentifier) } -func (b *BlockStorage) encounterBlock( +func (b *BlockStorage) seeBlock( ctx context.Context, transaction database.Transaction, blockResponse *types.BlockResponse, @@ -629,8 +629,8 @@ func (b *BlockStorage) storeBlock( return nil } -// EncounterBlock pre-stores a block or returns an error. -func (b *BlockStorage) EncounterBlock( +// SeeBlock pre-stores a block or returns an error. +func (b *BlockStorage) SeeBlock( ctx context.Context, block *types.Block, ) error { @@ -670,7 +670,7 @@ func (b *BlockStorage) EncounterBlock( } // Store block - exists, err := b.encounterBlock(ctx, transaction, blockWithoutTransactions) + exists, err := b.seeBlock(ctx, transaction, blockWithoutTransactions) if err != nil { return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) } From fffc69dd8208fe407c66ef14e0dc0ce2c23c469b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 08:52:42 -0600 Subject: [PATCH 12/30] see how long adding coins takes --- storage/modules/coin_storage.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/storage/modules/coin_storage.go b/storage/modules/coin_storage.go index ac9e8c58..0a637360 100644 --- a/storage/modules/coin_storage.go +++ b/storage/modules/coin_storage.go @@ -20,6 +20,7 @@ import ( "math/big" "runtime" "strings" + "time" "github.com/neilotoole/errgroup" @@ -357,6 +358,10 @@ func (c *CoinStorage) AddingBlock( block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { + start := time.Now() + defer func() { + fmt.Println("adding coins", time.Since(start)) + }() return nil, c.updateCoins(ctx, block, true, transaction) } From 56004cda52e9bb81bfa0114ce7c90145648306ec Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 09:17:48 -0600 Subject: [PATCH 13/30] Add encoder for AccountCurrency --- storage/encoder/encoder.go | 210 ++++++++++++++++++++++++++++++++++--- 1 file changed, 194 insertions(+), 16 deletions(-) diff --git a/storage/encoder/encoder.go b/storage/encoder/encoder.go index 1cefd9e7..a7e5645c 100644 --- a/storage/encoder/encoder.go +++ b/storage/encoder/encoder.go @@ -221,22 +221,6 @@ const ( unicodeRecordSeparator = '\u001E' ) -// Indexes of encoded AccountCoin struct -const ( - accountAddress = iota - coinIdentifier - amountValue - amountCurrencySymbol - amountCurrencyDecimals - - // If none exist below, we stop after amount. - accountMetadata - subAccountAddress - subAccountMetadata - amountMetadata - currencyMetadata -) - func (e *Encoder) encodeAndWrite(output *bytes.Buffer, object interface{}) error { buf := e.pool.Get() err := getEncoder(buf).Encode(object) @@ -368,6 +352,22 @@ func (e *Encoder) DecodeAccountCoin( // nolint:gocognit accountCoin *types.AccountCoin, reclaimInput bool, ) error { + // Indexes of encoded AccountCoin struct + const ( + accountAddress = iota + coinIdentifier + amountValue + amountCurrencySymbol + amountCurrencyDecimals + + // If none exist below, we stop after amount. + accountMetadata + subAccountAddress + subAccountMetadata + amountMetadata + currencyMetadata + ) + count := 0 currentBytes := b for { @@ -475,3 +475,181 @@ func (e *Encoder) DecodeAccountCoin( // nolint:gocognit return nil } + +func (e *Encoder) EncodeAccountCurrency( // nolint:gocognit + accountCurrency *types.AccountCurrency, +) ([]byte, error) { + output := e.pool.Get() + if _, err := output.WriteString(accountCurrency.Account.Address); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + if _, err := output.WriteString(accountCurrency.Currency.Symbol); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + if _, err := output.WriteString( + strconv.FormatInt(int64(accountCurrency.Currency.Decimals), 10), + ); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + + // Exit early if we don't have any complex data to record (this helps + // us save a lot of space on the happy path). + if accountCurrency.Account.Metadata == nil && + accountCurrency.Account.SubAccount == nil && + accountCurrency.Currency.Metadata == nil { + return output.Bytes(), nil + } + + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + if accountCurrency.Account.Metadata != nil { + if err := e.encodeAndWrite(output, accountCurrency.Account.Metadata); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + } + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + + if accountCurrency.Account.SubAccount != nil { + if _, err := output.WriteString(accountCurrency.Account.SubAccount.Address); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + } + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + + if accountCurrency.Account.SubAccount != nil && accountCurrency.Account.SubAccount.Metadata != nil { + if err := e.encodeAndWrite(output, accountCurrency.Account.SubAccount.Metadata); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + } + if _, err := output.WriteRune(unicodeRecordSeparator); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + + if accountCurrency.Currency.Metadata != nil { + if err := e.encodeAndWrite(output, accountCurrency.Currency.Metadata); err != nil { + return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) + } + } + + return output.Bytes(), nil +} + +func (e *Encoder) DecodeAccountCurrency( // nolint:gocognit + b []byte, + accountCurrency *types.AccountCurrency, + reclaimInput bool, +) error { + // Indexes of encoded AccountCurrency struct + const ( + accountAddress = iota + currencySymbol + currencyDecimals + + // If none exist below, we stop after amount. + accountMetadata + subAccountAddress + subAccountMetadata + currencyMetadata + ) + + count := 0 + currentBytes := b + for { + nextRune := bytes.IndexRune(currentBytes, unicodeRecordSeparator) + if nextRune == -1 { + if count != currencyDecimals && count != currencyMetadata { + return fmt.Errorf("%w: next rune is -1 at %d", errors.ErrRawDecodeFailed, count) + } + + nextRune = len(currentBytes) + } + + val := currentBytes[:nextRune] + if len(val) == 0 { + goto handleNext + } + + switch count { + case accountAddress: + accountCurrency.Account = &types.AccountIdentifier{ + Address: string(val), + } + case currencySymbol: + accountCurrency.Currency = &types.Currency{ + Symbol: string(val), + } + case currencyDecimals: + i, err := strconv.ParseInt(string(val), 10, 32) + if err != nil { + return fmt.Errorf("%w: %s", errors.ErrRawDecodeFailed, err.Error()) + } + + accountCurrency.Currency.Decimals = int32(i) + case accountMetadata: + m, err := e.decodeMap(val) + if err != nil { + return fmt.Errorf("%w: account metadata %s", errors.ErrRawDecodeFailed, err.Error()) + } + + accountCurrency.Account.Metadata = m + case subAccountAddress: + accountCurrency.Account.SubAccount = &types.SubAccountIdentifier{ + Address: string(val), + } + case subAccountMetadata: + if accountCurrency.Account.SubAccount == nil { + return errors.ErrRawDecodeFailed // must have address + } + + m, err := e.decodeMap(val) + if err != nil { + return fmt.Errorf( + "%w: subaccount metadata %s", + errors.ErrRawDecodeFailed, + err.Error(), + ) + } + + accountCurrency.Account.SubAccount.Metadata = m + case currencyMetadata: + m, err := e.decodeMap(val) + if err != nil { + return fmt.Errorf( + "%w: currency metadata %s", + errors.ErrRawDecodeFailed, + err.Error(), + ) + } + + accountCurrency.Currency.Metadata = m + default: + return fmt.Errorf("%w: count %d > end", errors.ErrRawDecodeFailed, count) + } + + handleNext: + if nextRune == len(currentBytes) && + (count == currencyDecimals || count == currencyMetadata) { + break + } + + currentBytes = currentBytes[nextRune+1:] + count++ + } + + if reclaimInput { + e.pool.PutByteSlice(b) + } + + return nil +} From 89b1075cef44809fde78e79f9452b82301d55e7e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 09:22:33 -0600 Subject: [PATCH 14/30] Use custom encoder in balance storage --- storage/modules/balance_storage.go | 33 +++++++++--------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/storage/modules/balance_storage.go b/storage/modules/balance_storage.go index 49b9a349..244d7f54 100644 --- a/storage/modules/balance_storage.go +++ b/storage/modules/balance_storage.go @@ -329,11 +329,6 @@ func (b *BalanceStorage) RemovingBlock( }, nil } -type accountEntry struct { - Account *types.AccountIdentifier `json:"account"` - Currency *types.Currency `json:"currency"` -} - // SetBalance allows a client to set the balance of an account in a database // transaction (removing all historical states). This is particularly useful // for bootstrapping balances. @@ -360,7 +355,7 @@ func (b *BalanceStorage) SetBalance( } // Serialize account entry - serialAcc, err := b.db.Encoder().Encode(accountNamespace, accountEntry{ + serialAcc, err := b.db.Encoder().EncodeAccountCurrency(&types.AccountCurrency{ Account: account, Currency: amount.Currency, }) @@ -480,7 +475,7 @@ func (b *BalanceStorage) ReconciliationCoverage( ) (float64, error) { seen := 0 validCoverage := 0 - err := b.getAllAccountEntries(ctx, func(txn database.Transaction, entry accountEntry) error { + err := b.getAllAccountEntries(ctx, func(txn database.Transaction, entry *types.AccountCurrency) error { seen++ // Fetch last reconciliation index in same database.Transaction @@ -859,7 +854,7 @@ func (b *BalanceStorage) UpdateBalance( if !exists { newAccount = true key := GetAccountKey(accountNamespace, change.Account, change.Currency) - serialAcc, err := b.db.Encoder().Encode(accountNamespace, accountEntry{ + serialAcc, err := b.db.Encoder().EncodeAccountCurrency(&types.AccountCurrency{ Account: change.Account, Currency: change.Currency, }) @@ -1137,7 +1132,7 @@ func (b *BalanceStorage) BootstrapBalances( func (b *BalanceStorage) getAllAccountEntries( ctx context.Context, - handler func(database.Transaction, accountEntry) error, + handler func(database.Transaction, *types.AccountCurrency) error, ) error { txn := b.db.ReadTransaction(ctx) defer txn.Discard(ctx) @@ -1146,9 +1141,9 @@ func (b *BalanceStorage) getAllAccountEntries( []byte(accountNamespace), []byte(accountNamespace), func(k []byte, v []byte) error { - var accEntry accountEntry + var accCurrency types.AccountCurrency // We should not reclaim memory during a scan!! - err := b.db.Encoder().Decode(accountNamespace, v, &accEntry, false) + err := b.db.Encoder().DecodeAccountCurrency(v, &accCurrency, false) if err != nil { return fmt.Errorf( "%w: unable to parse balance entry for %s", @@ -1157,7 +1152,7 @@ func (b *BalanceStorage) getAllAccountEntries( ) } - return handler(txn, accEntry) + return handler(txn, &accCurrency) }, false, false, @@ -1177,22 +1172,14 @@ func (b *BalanceStorage) GetAllAccountCurrency( ) ([]*types.AccountCurrency, error) { log.Println("Loading previously seen accounts (this could take a while)...") - accountEntries := []*accountEntry{} - if err := b.getAllAccountEntries(ctx, func(_ database.Transaction, entry accountEntry) error { - accountEntries = append(accountEntries, &entry) + accounts := []*types.AccountCurrency{} + if err := b.getAllAccountEntries(ctx, func(_ database.Transaction, account *types.AccountCurrency) error { + accounts = append(accounts, account) return nil }); err != nil { return nil, fmt.Errorf("%w: unable to get all balance entries", err) } - accounts := make([]*types.AccountCurrency, len(accountEntries)) - for i, account := range accountEntries { - accounts[i] = &types.AccountCurrency{ - Account: account.Account, - Currency: account.Currency, - } - } - return accounts, nil } From 923ca3fd86318082039c39637a6da4b069fec2db Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 09:42:11 -0600 Subject: [PATCH 15/30] Add test for encoder --- storage/encoder/encoder_test.go | 72 +++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/storage/encoder/encoder_test.go b/storage/encoder/encoder_test.go index 99dd444c..3bfd97a2 100644 --- a/storage/encoder/encoder_test.go +++ b/storage/encoder/encoder_test.go @@ -257,3 +257,75 @@ func TestEncodeDecodeAccountCoin(t *testing.T) { }) } } + +func TestEncodeDecodeAccountCurrency(t *testing.T) { + tests := map[string]struct { + accountCurrency *types.AccountCurrency + }{ + "simple": { + accountCurrency: &types.AccountCurrency{ + Account: &types.AccountIdentifier{ + Address: "hello", + }, + Currency: &types.Currency{ + Symbol: "BTC", + Decimals: 8, + }, + }, + }, + "sub account info": { + accountCurrency: &types.AccountCurrency{ + Account: &types.AccountIdentifier{ + Address: "hello", + SubAccount: &types.SubAccountIdentifier{ + Address: "sub", + Metadata: map[string]interface{}{ + "test": "stuff", + }, + }, + }, + Currency: &types.Currency{ + Symbol: "BTC", + Decimals: 8, + }, + }, + }, + "currency metadata": { + accountCurrency: &types.AccountCurrency{ + Account: &types.AccountIdentifier{ + Address: "hello", + }, + Currency: &types.Currency{ + Symbol: "BTC", + Decimals: 8, + Metadata: map[string]interface{}{ + "issuer": "satoshi", + }, + }, + }, + }, + } + + for name, test := range tests { + e, err := NewEncoder(nil, NewBufferPool(), true) + assert.NoError(t, err) + + t.Run(name, func(t *testing.T) { + standardResult, err := e.Encode("", test.accountCurrency) + assert.NoError(t, err) + optimizedResult, err := e.EncodeAccountCurrency(test.accountCurrency) + assert.NoError(t, err) + fmt.Printf( + "Uncompressed: %d, Standard Compressed: %d, Optimized: %d\n", + len(types.PrintStruct(test.accountCurrency)), + len(standardResult), + len(optimizedResult), + ) + + var decoded types.AccountCurrency + assert.NoError(t, e.DecodeAccountCurrency(optimizedResult, &decoded, true)) + + assert.Equal(t, test.accountCurrency, &decoded) + }) + } +} From c11b51235b3598dd22882ff0f8884b06717f8b33 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 10:13:29 -0600 Subject: [PATCH 16/30] nits --- syncer/syncer.go | 134 +++++++++++++++++++++++++---------------------- syncer/types.go | 2 +- 2 files changed, 73 insertions(+), 63 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index ba2aa20e..603f3192 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -471,7 +471,7 @@ func (s *Syncer) handleSeenBlock( return s.handler.BlockSeen(ctx, result.block) } -func (s *Syncer) handleSeenBlocks( +func (s *Syncer) seeBlocks( ctx context.Context, input chan *blockResult, output chan *blockResult, @@ -504,6 +504,59 @@ func (s *Syncer) handleSeenBlocks( } } +func (s *Syncer) sequenceBlocks( + ctx context.Context, + g *errgroup.Group, + blockIndices chan int64, + fetchedBlocks chan *blockResult, + seenBlocks chan *blockResult, + endIndex int64, +) error { + cache := make(map[int64]*blockResult) + for { + select { + case b := <-seenBlocks: + cache[b.index] = b + + if err := s.processBlocks(ctx, cache, endIndex); err != nil { + return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) + } + + // Determine if concurrency should be adjusted. + s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) + s.lastAdjustment++ + + s.concurrencyLock.Lock() + shouldCreate := s.adjustWorkers() + if !shouldCreate { + s.concurrencyLock.Unlock() + continue + } + + // If we have finished loading blocks or the pipelineCtx + // has an error (like context.Canceled), we should avoid + // creating more goroutines (as there is a chance that + // Wait has returned). Attempting to create more goroutines + // after Wait has returned will cause a panic. + s.doneLoadingLock.Lock() + if !s.doneLoading && ctx.Err() == nil { + g.Go(func() error { + return s.fetchChannelBlocks(ctx, s.network, blockIndices, fetchedBlocks) + }) + } else { + s.concurrency-- + } + s.doneLoadingLock.Unlock() + + // Hold concurrencyLock until after we attempt to create another + // new goroutine in the case we accidentally go to 0 during shutdown. + s.concurrencyLock.Unlock() + case <-ctx.Done(): + return ctx.Err() + } + } +} + // syncRange fetches and processes a range of blocks // (from syncer.nextIndex to endIndex, inclusive) // with syncer.concurrency. @@ -512,7 +565,7 @@ func (s *Syncer) syncRange( endIndex int64, ) error { blockIndices := make(chan int64) - results := make(chan *blockResult) + fetchedBlocks := make(chan *blockResult) // Ensure default concurrency is less than max concurrency. startingConcurrency := DefaultConcurrency @@ -534,85 +587,42 @@ func (s *Syncer) syncRange( s.concurrency = startingConcurrency s.goalConcurrency = s.concurrency - // We create a separate derivative context here instead of - // replacing the provided ctx because the context returned - // by errgroup.WithContext is canceled as soon as Wait returns. - // If this canceled context is passed to a handler or helper, - // it can have unintended consequences (some functions - // return immediately if the context is canceled). - // - // Source: https://godoc.org/golang.org/x/sync/errgroup - g, pipelineCtx := errgroup.WithContext(ctx) + // Spawn syncing goroutines + g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return s.addBlockIndices(pipelineCtx, blockIndices, s.nextIndex, endIndex) + return s.addBlockIndices(ctx, blockIndices, s.nextIndex, endIndex) }) for j := int64(0); j < s.concurrency; j++ { g.Go(func() error { - return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, results) + return s.fetchChannelBlocks(ctx, s.network, blockIndices, fetchedBlocks) }) } // Wait for all block fetching goroutines to exit - // before closing the results channel. + // before closing the fetchedBlocks channel. go func() { _ = g.Wait() - close(results) + close(fetchedBlocks) }() - c := make(chan *blockResult, defaultEncounterBacklog) + seenBlocks := make(chan *blockResult, defaultSeenBacklog) g.Go(func() error { - return s.handleSeenBlocks(ctx, results, c) + return s.seeBlocks(ctx, fetchedBlocks, seenBlocks) }) - cache := make(map[int64]*blockResult) g.Go(func() error { - for { - select { - case b := <-c: - cache[b.index] = b - - if err := s.processBlocks(ctx, cache, endIndex); err != nil { - return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) - } - - // Determine if concurrency should be adjusted. - s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) - s.lastAdjustment++ - - s.concurrencyLock.Lock() - shouldCreate := s.adjustWorkers() - if !shouldCreate { - s.concurrencyLock.Unlock() - continue - } - - // If we have finished loading blocks or the pipelineCtx - // has an error (like context.Canceled), we should avoid - // creating more goroutines (as there is a chance that - // Wait has returned). Attempting to create more goroutines - // after Wait has returned will cause a panic. - s.doneLoadingLock.Lock() - if !s.doneLoading && pipelineCtx.Err() == nil { - g.Go(func() error { - return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, results) - }) - } else { - s.concurrency-- - } - s.doneLoadingLock.Unlock() - - // Hold concurrencyLock until after we attempt to create another - // new goroutine in the case we accidentally go to 0 during shutdown. - s.concurrencyLock.Unlock() - case <-pipelineCtx.Done(): - return pipelineCtx.Err() - } - } + return s.sequenceBlocks( + ctx, + g, + blockIndices, + fetchedBlocks, + seenBlocks, + endIndex, + ) }) - err := g.Wait() - if err != nil { + if err := g.Wait(); err != nil { return fmt.Errorf("%w: unable to sync to %d", err, endIndex) } diff --git a/syncer/types.go b/syncer/types.go index c4ac7166..a9d9837e 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -76,7 +76,7 @@ const ( // already have a backlog >= to concurrency. defaultFetchSleep = 500 * time.Millisecond - defaultEncounterBacklog = 100 + defaultSeenBacklog = 100 ) // Handler is called at various times during the sync cycle From bf5ace87fa0608fe14f8e404502b8af155a706f2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 10:35:09 -0600 Subject: [PATCH 17/30] pass process block tests --- syncer/syncer_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index e31c2717..953b929f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -468,6 +468,13 @@ func TestSync_NoReorg(t *testing.T) { continue } + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -533,6 +540,13 @@ func TestSync_SpecificStart(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -596,6 +610,13 @@ func TestSync_Cancel(t *testing.T) { b, nil, ).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -649,6 +670,15 @@ func TestSync_Reorg(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -703,6 +733,16 @@ func TestSync_Reorg(t *testing.T) { }).Once() } + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + newBlocks[0], + ).Run(func(args mock.Arguments) { + err := args.Get(0).(context.Context) + assert.NoError(t, err.Err()) + }).Return( + nil, + ).Once() // only fetch this block once mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -727,6 +767,13 @@ func TestSync_Reorg(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -790,6 +837,15 @@ func TestSync_ManualReorg(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -838,6 +894,13 @@ func TestSync_ManualReorg(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -941,6 +1004,15 @@ func TestSync_Dynamic(t *testing.T) { continue } + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -1022,6 +1094,15 @@ func TestSync_DynamicOverhead(t *testing.T) { continue } + mockHandler.On( + "BlockSeen", + mock.AnythingOfType("*context.cancelCtx"), + b, + ).Return( + nil, + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), From 6d3cbf508eeee9805e32cddb40eefe77e6173e15 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 11:24:16 -0600 Subject: [PATCH 18/30] progress towards syncer tests passing --- syncer/syncer.go | 129 +++++++++++++++++++++--------------------- syncer/syncer_test.go | 7 +-- 2 files changed, 67 insertions(+), 69 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 603f3192..1615e9c6 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -473,8 +473,8 @@ func (s *Syncer) handleSeenBlock( func (s *Syncer) seeBlocks( ctx context.Context, - input chan *blockResult, - output chan *blockResult, + fetchedBlocks chan *blockResult, + seenBlocks chan *blockResult, ) error { numCPU := runtime.NumCPU() g, ctx := lerrgroup.WithContextN( @@ -483,29 +483,28 @@ func (s *Syncer) seeBlocks( numCPU, ) - for { - select { - case result := <-input: - g.Go(func() error { - if err := s.handleSeenBlock(ctx, result); err != nil { - return err - } - - select { - case output <- result: - return nil - case <-ctx.Done(): - return ctx.Err() - } - }) - case <-ctx.Done(): - return ctx.Err() - } + for result := range fetchedBlocks { + r := result + g.Go(func() error { + if err := s.handleSeenBlock(ctx, r); err != nil { + return err + } + + select { + case seenBlocks <- r: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) } + + return g.Wait() } func (s *Syncer) sequenceBlocks( - ctx context.Context, + pipelineCtx context.Context, + handleCtx context.Context, g *errgroup.Group, blockIndices chan int64, fetchedBlocks chan *blockResult, @@ -513,48 +512,45 @@ func (s *Syncer) sequenceBlocks( endIndex int64, ) error { cache := make(map[int64]*blockResult) - for { - select { - case b := <-seenBlocks: - cache[b.index] = b - - if err := s.processBlocks(ctx, cache, endIndex); err != nil { - return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) - } + for b := range seenBlocks { + cache[b.index] = b - // Determine if concurrency should be adjusted. - s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) - s.lastAdjustment++ - - s.concurrencyLock.Lock() - shouldCreate := s.adjustWorkers() - if !shouldCreate { - s.concurrencyLock.Unlock() - continue - } + if err := s.processBlocks(handleCtx, cache, endIndex); err != nil { + return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) + } - // If we have finished loading blocks or the pipelineCtx - // has an error (like context.Canceled), we should avoid - // creating more goroutines (as there is a chance that - // Wait has returned). Attempting to create more goroutines - // after Wait has returned will cause a panic. - s.doneLoadingLock.Lock() - if !s.doneLoading && ctx.Err() == nil { - g.Go(func() error { - return s.fetchChannelBlocks(ctx, s.network, blockIndices, fetchedBlocks) - }) - } else { - s.concurrency-- - } - s.doneLoadingLock.Unlock() + // Determine if concurrency should be adjusted. + s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) + s.lastAdjustment++ - // Hold concurrencyLock until after we attempt to create another - // new goroutine in the case we accidentally go to 0 during shutdown. + s.concurrencyLock.Lock() + shouldCreate := s.adjustWorkers() + if !shouldCreate { s.concurrencyLock.Unlock() - case <-ctx.Done(): - return ctx.Err() + continue } + + // If we have finished loading blocks or the pipelineCtx + // has an error (like context.Canceled), we should avoid + // creating more goroutines (as there is a chance that + // Wait has returned). Attempting to create more goroutines + // after Wait has returned will cause a panic. + s.doneLoadingLock.Lock() + if !s.doneLoading && pipelineCtx.Err() == nil { + g.Go(func() error { + return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, fetchedBlocks) + }) + } else { + s.concurrency-- + } + s.doneLoadingLock.Unlock() + + // Hold concurrencyLock until after we attempt to create another + // new goroutine in the case we accidentally go to 0 during shutdown. + s.concurrencyLock.Unlock() } + + return nil } // syncRange fetches and processes a range of blocks @@ -588,14 +584,14 @@ func (s *Syncer) syncRange( s.goalConcurrency = s.concurrency // Spawn syncing goroutines - g, ctx := errgroup.WithContext(ctx) + g, pipelineCtx := errgroup.WithContext(ctx) g.Go(func() error { - return s.addBlockIndices(ctx, blockIndices, s.nextIndex, endIndex) + return s.addBlockIndices(pipelineCtx, blockIndices, s.nextIndex, endIndex) }) for j := int64(0); j < s.concurrency; j++ { g.Go(func() error { - return s.fetchChannelBlocks(ctx, s.network, blockIndices, fetchedBlocks) + return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, fetchedBlocks) }) } @@ -606,14 +602,17 @@ func (s *Syncer) syncRange( close(fetchedBlocks) }() + g2, handleCtx := errgroup.WithContext(ctx) seenBlocks := make(chan *blockResult, defaultSeenBacklog) - g.Go(func() error { - return s.seeBlocks(ctx, fetchedBlocks, seenBlocks) + g2.Go(func() error { + defer close(seenBlocks) + return s.seeBlocks(handleCtx, fetchedBlocks, seenBlocks) }) - g.Go(func() error { + g2.Go(func() error { return s.sequenceBlocks( - ctx, + pipelineCtx, + handleCtx, g, blockIndices, fetchedBlocks, @@ -622,7 +621,7 @@ func (s *Syncer) syncRange( ) }) - if err := g.Wait(); err != nil { + if err := g2.Wait(); err != nil { return fmt.Errorf("%w: unable to sync to %d", err, endIndex) } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 953b929f..a259acce 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -474,7 +474,9 @@ func TestSync_NoReorg(t *testing.T) { b, ).Return( nil, - ).Once() + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Once() mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), @@ -483,9 +485,6 @@ func TestSync_NoReorg(t *testing.T) { nil, ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) - if index == 1100 { - assert.Equal(t, int64(3), syncer.concurrency) - } // Test tip method if index > 200 { From 1cc31f98a56fd13574aebaf30a06060205bc5a49 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 14:46:47 -0600 Subject: [PATCH 19/30] fixed syncer tests --- syncer/syncer.go | 92 ++++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 62 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 1615e9c6..0eec6760 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -19,10 +19,8 @@ import ( "errors" "fmt" "log" - "runtime" "time" - lerrgroup "github.com/neilotoole/errgroup" "golang.org/x/sync/errgroup" "github.com/coinbase/rosetta-sdk-go/types" @@ -213,6 +211,7 @@ func (s *Syncer) addBlockIndices( endIndex int64, ) error { defer close(blockIndices) + i := startIndex for i <= endIndex { s.concurrencyLock.Lock() @@ -282,11 +281,11 @@ func (s *Syncer) safeExit(err error) error { return err } -// fetchChannelBlocks fetches blocks from a +// fetchBlocks fetches blocks from a // channel with retries until there are no // more blocks in the channel or there is an // error. -func (s *Syncer) fetchChannelBlocks( +func (s *Syncer) fetchBlocks( ctx context.Context, network *types.NetworkIdentifier, blockIndices chan int64, @@ -302,6 +301,10 @@ func (s *Syncer) fetchChannelBlocks( return s.safeExit(fmt.Errorf("%w %d: %v", ErrFetchBlockFailed, b, err)) } + if err := s.handleSeenBlock(ctx, br); err != nil { + return err + } + select { case results <- br: case <-ctx.Done(): @@ -471,56 +474,24 @@ func (s *Syncer) handleSeenBlock( return s.handler.BlockSeen(ctx, result.block) } -func (s *Syncer) seeBlocks( - ctx context.Context, - fetchedBlocks chan *blockResult, - seenBlocks chan *blockResult, -) error { - numCPU := runtime.NumCPU() - g, ctx := lerrgroup.WithContextN( - ctx, - numCPU, - numCPU, - ) - - for result := range fetchedBlocks { - r := result - g.Go(func() error { - if err := s.handleSeenBlock(ctx, r); err != nil { - return err - } - - select { - case seenBlocks <- r: - return nil - case <-ctx.Done(): - return ctx.Err() - } - }) - } - - return g.Wait() -} - func (s *Syncer) sequenceBlocks( + ctx context.Context, pipelineCtx context.Context, - handleCtx context.Context, g *errgroup.Group, blockIndices chan int64, fetchedBlocks chan *blockResult, - seenBlocks chan *blockResult, endIndex int64, ) error { cache := make(map[int64]*blockResult) - for b := range seenBlocks { - cache[b.index] = b + for result := range fetchedBlocks { + cache[result.index] = result - if err := s.processBlocks(handleCtx, cache, endIndex); err != nil { + if err := s.processBlocks(ctx, cache, endIndex); err != nil { return fmt.Errorf("%w: %v", ErrBlocksProcessMultipleFailed, err) } // Determine if concurrency should be adjusted. - s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(b)) + s.recentBlockSizes = append(s.recentBlockSizes, utils.SizeOf(result)) s.lastAdjustment++ s.concurrencyLock.Lock() @@ -538,7 +509,12 @@ func (s *Syncer) sequenceBlocks( s.doneLoadingLock.Lock() if !s.doneLoading && pipelineCtx.Err() == nil { g.Go(func() error { - return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, fetchedBlocks) + return s.fetchBlocks( + pipelineCtx, + s.network, + blockIndices, + fetchedBlocks, + ) }) } else { s.concurrency-- @@ -591,7 +567,7 @@ func (s *Syncer) syncRange( for j := int64(0); j < s.concurrency; j++ { g.Go(func() error { - return s.fetchChannelBlocks(pipelineCtx, s.network, blockIndices, fetchedBlocks) + return s.fetchBlocks(pipelineCtx, s.network, blockIndices, fetchedBlocks) }) } @@ -602,26 +578,18 @@ func (s *Syncer) syncRange( close(fetchedBlocks) }() - g2, handleCtx := errgroup.WithContext(ctx) - seenBlocks := make(chan *blockResult, defaultSeenBacklog) - g2.Go(func() error { - defer close(seenBlocks) - return s.seeBlocks(handleCtx, fetchedBlocks, seenBlocks) - }) - - g2.Go(func() error { - return s.sequenceBlocks( - pipelineCtx, - handleCtx, - g, - blockIndices, - fetchedBlocks, - seenBlocks, - endIndex, - ) - }) + if err := s.sequenceBlocks( + ctx, + pipelineCtx, + g, + blockIndices, + fetchedBlocks, + endIndex, + ); err != nil { + return err + } - if err := g2.Wait(); err != nil { + if err := g.Wait(); err != nil { return fmt.Errorf("%w: unable to sync to %d", err, endIndex) } From d65b8f3822888b17e3f644a9453dd542ad14ff77 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 15:01:44 -0600 Subject: [PATCH 20/30] remove unnecessary variable --- syncer/types.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/syncer/types.go b/syncer/types.go index a9d9837e..4dfb5fef 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -75,8 +75,6 @@ const ( // when we are loading more blocks to fetch but we // already have a backlog >= to concurrency. defaultFetchSleep = 500 * time.Millisecond - - defaultSeenBacklog = 100 ) // Handler is called at various times during the sync cycle From a880c039794524a0d637e40c23de95aaf9047401 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 15:03:02 -0600 Subject: [PATCH 21/30] fix linting --- syncer/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 0eec6760..cc0c8df1 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -474,7 +474,7 @@ func (s *Syncer) handleSeenBlock( return s.handler.BlockSeen(ctx, result.block) } -func (s *Syncer) sequenceBlocks( +func (s *Syncer) sequenceBlocks( // nolint:golint ctx context.Context, pipelineCtx context.Context, g *errgroup.Group, From fe70cb0be568b1900ac3063bb8ea8525fb6da6e5 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:15:24 -0600 Subject: [PATCH 22/30] fix syncing tests --- syncer/syncer.go | 12 ++++-------- syncer/syncer_test.go | 14 +++++++++----- syncer/types.go | 6 ++++-- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index cc0c8df1..4b1d3f29 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -267,6 +267,10 @@ func (s *Syncer) fetchBlockResult( br.block = block } + if err := s.handleSeenBlock(ctx, br); err != nil { + return nil, err + } + return br, nil } @@ -301,10 +305,6 @@ func (s *Syncer) fetchBlocks( return s.safeExit(fmt.Errorf("%w %d: %v", ErrFetchBlockFailed, b, err)) } - if err := s.handleSeenBlock(ctx, br); err != nil { - return err - } - select { case results <- br: case <-ctx.Done(): @@ -360,10 +360,6 @@ func (s *Syncer) processBlocks( if err != nil { return fmt.Errorf("%w: %v", ErrFetchBlockReorgFailed, err) } - - if err := s.handleSeenBlock(ctx, br); err != nil { - return err - } } else { // Anytime we re-fetch an index, we // will need to make another call to the node diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index a259acce..7e730546 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -736,10 +736,7 @@ func TestSync_Reorg(t *testing.T) { "BlockSeen", mock.AnythingOfType("*context.cancelCtx"), newBlocks[0], - ).Run(func(args mock.Arguments) { - err := args.Get(0).(context.Context) - assert.NoError(t, err.Err()) - }).Return( + ).Return( nil, ).Once() // only fetch this block once mockHandler.On( @@ -766,13 +763,20 @@ func TestSync_Reorg(t *testing.T) { ).Run(func(args mock.Arguments) { assertNotCanceled(t, args) }).Once() + + seenTimes := 2 + if b.BlockIdentifier.Index > 801 { + seenTimes = 1 + } mockHandler.On( "BlockSeen", mock.AnythingOfType("*context.cancelCtx"), b, ).Return( nil, - ).Once() + ).Run(func(args mock.Arguments) { + assertNotCanceled(t, args) + }).Times(seenTimes) mockHandler.On( "BlockAdded", mock.AnythingOfType("*context.cancelCtx"), diff --git a/syncer/types.go b/syncer/types.go index 4dfb5fef..fc06421a 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -81,8 +81,10 @@ const ( // to handle different events. It is common to write logs or // perform reconciliation in the sync processor. type Handler interface { - // Guaranteed that we will not invoke BlockAdded - // until blockencountered has returned. + // BlockSeen is invoked AT LEAST ONCE + // by the syncer prior to calling BlockAdded + // with the same arguments. This allows for + // storing block data before it is sequenced. BlockSeen( ctx context.Context, block *types.Block, From 5de7863ef8e8cb2c7426633526d4f51c765b1d40 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:24:19 -0600 Subject: [PATCH 23/30] pass storage tests --- storage/modules/block_storage_test.go | 53 +++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/storage/modules/block_storage_test.go b/storage/modules/block_storage_test.go index ca60f8cb..4f866ae3 100644 --- a/storage/modules/block_storage_test.go +++ b/storage/modules/block_storage_test.go @@ -335,6 +335,9 @@ func TestBlock(t *testing.T) { assert.Equal(t, int64(-1), oldestIndex) assert.Error(t, storageErrs.ErrOldestIndexMissing, err) + err = storage.SeeBlock(ctx, genesisBlock) + assert.NoError(t, err) + err = storage.AddBlock(ctx, genesisBlock) assert.NoError(t, err) @@ -349,6 +352,9 @@ func TestBlock(t *testing.T) { }) t.Run("Set and get block", func(t *testing.T) { + err = storage.SeeBlock(ctx, newBlock) + assert.NoError(t, err) + err = storage.AddBlock(ctx, newBlock) assert.NoError(t, err) @@ -419,6 +425,9 @@ func TestBlock(t *testing.T) { }) t.Run("Set duplicate transaction hash (from prior block)", func(t *testing.T) { + err = storage.SeeBlock(ctx, newBlock2) + assert.NoError(t, err) + err = storage.AddBlock(ctx, newBlock2) assert.NoError(t, err) @@ -486,6 +495,9 @@ func TestBlock(t *testing.T) { assert.NoError(t, err) assert.Equal(t, newBlock2.ParentBlockIdentifier, head) + err = storage.SeeBlock(ctx, newBlock2) + assert.NoError(t, err) + err = storage.AddBlock(ctx, newBlock2) assert.NoError(t, err) @@ -504,7 +516,10 @@ func TestBlock(t *testing.T) { }) t.Run("Add block with complex metadata", func(t *testing.T) { - err := storage.AddBlock(ctx, complexBlock) + err := storage.SeeBlock(ctx, complexBlock) + assert.NoError(t, err) + + err = storage.AddBlock(ctx, complexBlock) assert.NoError(t, err) oldestIndex, err := storage.GetOldestBlockIndex(ctx) @@ -533,7 +548,7 @@ func TestBlock(t *testing.T) { }) t.Run("Set duplicate transaction hash (same block)", func(t *testing.T) { - err = storage.AddBlock(ctx, duplicateTxBlock) + err = storage.SeeBlock(ctx, duplicateTxBlock) assert.Contains(t, err.Error(), storageErrs.ErrDuplicateTransactionHash.Error()) head, err := storage.GetHeadBlockIdentifier(ctx) @@ -542,7 +557,10 @@ func TestBlock(t *testing.T) { }) t.Run("Add block after omitted", func(t *testing.T) { - err := storage.AddBlock(ctx, gapBlock) + err := storage.SeeBlock(ctx, gapBlock) + assert.NoError(t, err) + + err = storage.AddBlock(ctx, gapBlock) assert.NoError(t, err) block, err := storage.GetBlock( @@ -565,6 +583,9 @@ func TestBlock(t *testing.T) { assert.NoError(t, err) assert.Equal(t, gapBlock.ParentBlockIdentifier, head) + err = storage.SeeBlock(ctx, gapBlock) + assert.NoError(t, err) + err = storage.AddBlock(ctx, gapBlock) assert.NoError(t, err) @@ -611,6 +632,7 @@ func TestBlock(t *testing.T) { ParentBlockIdentifier: parentBlockIdentifier, } + assert.NoError(t, storage.SeeBlock(ctx, block)) assert.NoError(t, storage.AddBlock(ctx, block)) head, err := storage.GetHeadBlockIdentifier(ctx) assert.NoError(t, err) @@ -736,6 +758,9 @@ func TestCreateBlockCache(t *testing.T) { }) t.Run("1 block processed", func(t *testing.T) { + err = storage.SeeBlock(ctx, genesisBlock) + assert.NoError(t, err) + err = storage.AddBlock(ctx, genesisBlock) assert.NoError(t, err) assert.Equal( @@ -746,6 +771,9 @@ func TestCreateBlockCache(t *testing.T) { }) t.Run("2 blocks processed", func(t *testing.T) { + err = storage.SeeBlock(ctx, newBlock) + assert.NoError(t, err) + err = storage.AddBlock(ctx, newBlock) assert.NoError(t, err) assert.Equal( @@ -764,6 +792,9 @@ func TestCreateBlockCache(t *testing.T) { ParentBlockIdentifier: newBlock.BlockIdentifier, } + err = storage.SeeBlock(ctx, simpleGap) + assert.NoError(t, err) + err = storage.AddBlock(ctx, simpleGap) assert.NoError(t, err) assert.Equal( @@ -804,7 +835,7 @@ func TestAtTip(t *testing.T) { }) t.Run("Add old block", func(t *testing.T) { - err := storage.AddBlock(ctx, &types.Block{ + b := &types.Block{ BlockIdentifier: &types.BlockIdentifier{ Hash: "block 0", Index: 0, @@ -814,7 +845,11 @@ func TestAtTip(t *testing.T) { Index: 0, }, Timestamp: utils.Milliseconds() - (3 * tipDelay * utils.MillisecondsInSecond), - }) + } + err := storage.SeeBlock(ctx, b) + assert.NoError(t, err) + + err = storage.AddBlock(ctx, b) assert.NoError(t, err) atTip, blockIdentifier, err := storage.AtTip(ctx, tipDelay) @@ -828,7 +863,7 @@ func TestAtTip(t *testing.T) { }) t.Run("Add new block", func(t *testing.T) { - err := storage.AddBlock(ctx, &types.Block{ + b := &types.Block{ BlockIdentifier: &types.BlockIdentifier{ Hash: "block 1", Index: 1, @@ -838,7 +873,11 @@ func TestAtTip(t *testing.T) { Index: 0, }, Timestamp: utils.Milliseconds(), - }) + } + err := storage.SeeBlock(ctx, b) + assert.NoError(t, err) + + err = storage.AddBlock(ctx, b) assert.NoError(t, err) atTip, blockIdentifier, err := storage.AtTip(ctx, tipDelay) From ac005dd2102344748324c08330d0cd81c93505db Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:26:45 -0600 Subject: [PATCH 24/30] don't lint comments --- Makefile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Makefile b/Makefile index fcb68a2b..8ab13a51 100644 --- a/Makefile +++ b/Makefile @@ -38,8 +38,7 @@ lint-examples: golangci-lint run -v -E ${LINT_SETTINGS} lint: | lint-examples - golangci-lint run --timeout 2m0s -v -E ${LINT_SETTINGS},gomnd && \ - make check-comments; + golangci-lint run --timeout 2m0s -v -E ${LINT_SETTINGS},gomnd format: gofmt -s -w -l . From 6172863fd3944d6e2d205a16c1339f717f8b666a Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:29:01 -0600 Subject: [PATCH 25/30] fix long lines --- storage/encoder/encoder.go | 3 ++- storage/modules/balance_storage.go | 35 ++++++++++++++++-------------- storage/modules/block_storage.go | 5 ++--- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/storage/encoder/encoder.go b/storage/encoder/encoder.go index a7e5645c..67a98272 100644 --- a/storage/encoder/encoder.go +++ b/storage/encoder/encoder.go @@ -527,7 +527,8 @@ func (e *Encoder) EncodeAccountCurrency( // nolint:gocognit return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) } - if accountCurrency.Account.SubAccount != nil && accountCurrency.Account.SubAccount.Metadata != nil { + if accountCurrency.Account.SubAccount != nil && + accountCurrency.Account.SubAccount.Metadata != nil { if err := e.encodeAndWrite(output, accountCurrency.Account.SubAccount.Metadata); err != nil { return nil, fmt.Errorf("%w: %s", errors.ErrObjectEncodeFailed, err.Error()) } diff --git a/storage/modules/balance_storage.go b/storage/modules/balance_storage.go index 244d7f54..3108147c 100644 --- a/storage/modules/balance_storage.go +++ b/storage/modules/balance_storage.go @@ -475,26 +475,29 @@ func (b *BalanceStorage) ReconciliationCoverage( ) (float64, error) { seen := 0 validCoverage := 0 - err := b.getAllAccountEntries(ctx, func(txn database.Transaction, entry *types.AccountCurrency) error { - seen++ + err := b.getAllAccountEntries( + ctx, + func(txn database.Transaction, entry *types.AccountCurrency) error { + seen++ - // Fetch last reconciliation index in same database.Transaction - key := GetAccountKey(reconciliationNamepace, entry.Account, entry.Currency) - exists, lastReconciled, err := BigIntGet(ctx, key, txn) - if err != nil { - return err - } + // Fetch last reconciliation index in same database.Transaction + key := GetAccountKey(reconciliationNamepace, entry.Account, entry.Currency) + exists, lastReconciled, err := BigIntGet(ctx, key, txn) + if err != nil { + return err + } - if !exists { - return nil - } + if !exists { + return nil + } - if lastReconciled.Int64() >= minimumIndex { - validCoverage++ - } + if lastReconciled.Int64() >= minimumIndex { + validCoverage++ + } - return nil - }) + return nil + }, + ) if err != nil { return -1, fmt.Errorf("%w: unable to get all account entries", err) } diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 8e86cbb0..910d35c6 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -1048,9 +1048,8 @@ func (b *BlockStorage) FindTransaction( var newestTransaction *types.Transaction for _, blockTransaction := range blockTransactions { if newestBlock == nil || blockTransaction.BlockIdentifier.Index > newestBlock.Index { - // Now that we are optimistically storing data, there is a change - // we may fetch a transaction from an unfinalized block. In some cases (rosetta-bitcoin), - // we want this! + // Now that we are optimistically storing data, there is a chance + // we may fetch a transaction from a seen but unsequenced block. if head != nil && blockTransaction.BlockIdentifier.Index > head.Index { continue } From 247df358f4d370f255c06b1130498af036544623 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:31:37 -0600 Subject: [PATCH 26/30] Add comments to encoder --- storage/encoder/encoder.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/storage/encoder/encoder.go b/storage/encoder/encoder.go index 67a98272..988aa634 100644 --- a/storage/encoder/encoder.go +++ b/storage/encoder/encoder.go @@ -476,6 +476,14 @@ func (e *Encoder) DecodeAccountCoin( // nolint:gocognit return nil } +// EncodeAccountCurrency is used to encode an AccountCurrency using the scheme (on the happy path): +// accountAddress|currencySymbol|currencyDecimals +// +// And the following scheme on the unhappy path: +// accountAddress|currencySymbol|currencyDecimals|accountMetadata| +// subAccountAddress|subAccountMetadata|currencyMetadata +// +// In both cases, the | character is represented by the unicodeRecordSeparator rune. func (e *Encoder) EncodeAccountCurrency( // nolint:gocognit accountCurrency *types.AccountCurrency, ) ([]byte, error) { @@ -546,6 +554,8 @@ func (e *Encoder) EncodeAccountCurrency( // nolint:gocognit return output.Bytes(), nil } +// DecodeAccountCurrency decodes an AccountCurrency and optionally +// reclaims the memory associated with the input. func (e *Encoder) DecodeAccountCurrency( // nolint:gocognit b []byte, accountCurrency *types.AccountCurrency, From 6a3d17c4fe1f4e039488737a06c320cd2e68a99b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:32:47 -0600 Subject: [PATCH 27/30] nits --- storage/encoder/encoder.go | 4 ++-- storage/modules/coin_storage.go | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/storage/encoder/encoder.go b/storage/encoder/encoder.go index 988aa634..d8470c62 100644 --- a/storage/encoder/encoder.go +++ b/storage/encoder/encoder.go @@ -352,7 +352,7 @@ func (e *Encoder) DecodeAccountCoin( // nolint:gocognit accountCoin *types.AccountCoin, reclaimInput bool, ) error { - // Indexes of encoded AccountCoin struct + // Indices of encoded AccountCoin struct const ( accountAddress = iota coinIdentifier @@ -561,7 +561,7 @@ func (e *Encoder) DecodeAccountCurrency( // nolint:gocognit accountCurrency *types.AccountCurrency, reclaimInput bool, ) error { - // Indexes of encoded AccountCurrency struct + // Indices of encoded AccountCurrency struct const ( accountAddress = iota currencySymbol diff --git a/storage/modules/coin_storage.go b/storage/modules/coin_storage.go index 0a637360..ac9e8c58 100644 --- a/storage/modules/coin_storage.go +++ b/storage/modules/coin_storage.go @@ -20,7 +20,6 @@ import ( "math/big" "runtime" "strings" - "time" "github.com/neilotoole/errgroup" @@ -358,10 +357,6 @@ func (c *CoinStorage) AddingBlock( block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { - start := time.Now() - defer func() { - fmt.Println("adding coins", time.Since(start)) - }() return nil, c.updateCoins(ctx, block, true, transaction) } From 6dab4bf07aefb9ff1881939090922492b41ea214 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 17:10:16 -0600 Subject: [PATCH 28/30] limit seen invocation concurrency --- syncer/configuration.go | 9 +++++++++ syncer/syncer.go | 35 ++++++++++++++++++++++++----------- syncer/types.go | 11 +++++++++++ 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/syncer/configuration.go b/syncer/configuration.go index cd63664c..259b979f 100644 --- a/syncer/configuration.go +++ b/syncer/configuration.go @@ -65,3 +65,12 @@ func WithAdjustmentWindow(adjustmentWindow int64) Option { s.adjustmentWindow = adjustmentWindow } } + +// WithSeenConcurrency overrides the number of concurrent +// invocations of BlockSeen we will make. We default +// to the value of runtime.NumCPU(). +func WithSeenConcurrency(concurrency int64) Option { + return func(s *Syncer) { + s.seenSemaphoreSize = concurrency + } +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 4b1d3f29..f295fc52 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -19,9 +19,11 @@ import ( "errors" "fmt" "log" + "runtime" "time" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" @@ -37,17 +39,18 @@ func New( options ...Option, ) *Syncer { s := &Syncer{ - network: network, - helper: helper, - handler: handler, - concurrency: DefaultConcurrency, - cacheSize: DefaultCacheSize, - maxConcurrency: DefaultMaxConcurrency, - sizeMultiplier: DefaultSizeMultiplier, - cancel: cancel, - pastBlocks: []*types.BlockIdentifier{}, - pastBlockLimit: DefaultPastBlockLimit, - adjustmentWindow: DefaultAdjustmentWindow, + network: network, + helper: helper, + handler: handler, + concurrency: DefaultConcurrency, + cacheSize: DefaultCacheSize, + maxConcurrency: DefaultMaxConcurrency, + sizeMultiplier: DefaultSizeMultiplier, + cancel: cancel, + pastBlocks: []*types.BlockIdentifier{}, + pastBlockLimit: DefaultPastBlockLimit, + adjustmentWindow: DefaultAdjustmentWindow, + seenSemaphoreSize: int64(runtime.NumCPU()), } // Override defaults with any provided options @@ -55,6 +58,11 @@ func New( opt(s) } + // We set this after options because the caller + // has the ability to set the max concurrency + // of seen invocations. + s.seenSemaphore = semaphore.NewWeighted(s.seenSemaphoreSize) + return s } @@ -467,6 +475,11 @@ func (s *Syncer) handleSeenBlock( return nil } + if err := s.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil { + return err + } + defer s.seenSemaphore.Release(semaphoreWeight) + return s.handler.BlockSeen(ctx, result.block) } diff --git a/syncer/types.go b/syncer/types.go index fc06421a..31f25fe8 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -19,6 +19,8 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" + "github.com/coinbase/rosetta-sdk-go/types" ) @@ -75,6 +77,9 @@ const ( // when we are loading more blocks to fetch but we // already have a backlog >= to concurrency. defaultFetchSleep = 500 * time.Millisecond + + // semaphoreWeight is the weight of each semaphore request. + semaphoreWeight = int64(1) ) // Handler is called at various times during the sync cycle @@ -158,6 +163,12 @@ type Syncer struct { adjustmentWindow int64 concurrencyLock sync.Mutex + // SeenSemaphore controls how many concurrent + // invocations we make to the SeenBlock function + // in the handler. + seenSemaphore *semaphore.Weighted + seenSemaphoreSize int64 + // doneLoading is used to coordinate adding goroutines // when close to the end of syncing a range. doneLoading bool From 597f23386a18de6dc18e6d40fb52f8ad75a46ad3 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 10:08:06 -0600 Subject: [PATCH 29/30] move seen semaphore to statefulsyncer --- statefulsyncer/configuration.go | 9 ++++++++ statefulsyncer/stateful_syncer.go | 38 ++++++++++++++++++++++++------- syncer/configuration.go | 9 -------- syncer/syncer.go | 35 +++++++++------------------- syncer/types.go | 11 --------- 5 files changed, 50 insertions(+), 52 deletions(-) diff --git a/statefulsyncer/configuration.go b/statefulsyncer/configuration.go index 3ef3f1c3..d0f294a7 100644 --- a/statefulsyncer/configuration.go +++ b/statefulsyncer/configuration.go @@ -57,3 +57,12 @@ func WithPruneSleepTime(sleepTime int) Option { s.pruneSleepTime = time.Duration(sleepTime) * time.Second } } + +// WithSeenConcurrency overrides the number of concurrent +// invocations of BlockSeen we will make. We default +// to the value of runtime.NumCPU(). +func WithSeenConcurrency(concurrency int64) Option { + return func(s *StatefulSyncer) { + s.seenSemaphoreSize = concurrency + } +} diff --git a/statefulsyncer/stateful_syncer.go b/statefulsyncer/stateful_syncer.go index 6e5df6f0..da8a8579 100644 --- a/statefulsyncer/stateful_syncer.go +++ b/statefulsyncer/stateful_syncer.go @@ -19,8 +19,11 @@ import ( "errors" "fmt" "log" + "runtime" "time" + "golang.org/x/sync/semaphore" + "github.com/coinbase/rosetta-sdk-go/fetcher" storageErrs "github.com/coinbase/rosetta-sdk-go/storage/errors" "github.com/coinbase/rosetta-sdk-go/storage/modules" @@ -40,6 +43,9 @@ const ( // pruneBuffer is the cushion we apply to pastBlockLimit // when pruning. pruneBuffer = 2 + + // semaphoreWeight is the weight of each semaphore request. + semaphoreWeight = int64(1) ) // StatefulSyncer is an abstraction layer over @@ -61,6 +67,12 @@ type StatefulSyncer struct { pastBlockLimit int adjustmentWindow int64 pruneSleepTime time.Duration + + // SeenSemaphore controls how many concurrent + // invocations we make to the SeenBlock function + // in the handler. + seenSemaphore *semaphore.Weighted + seenSemaphoreSize int64 } // Logger is used by the statefulsyncer to @@ -103,11 +115,12 @@ func New( logger: logger, // Optional args - cacheSize: syncer.DefaultCacheSize, - maxConcurrency: syncer.DefaultMaxConcurrency, - pastBlockLimit: syncer.DefaultPastBlockLimit, - adjustmentWindow: syncer.DefaultAdjustmentWindow, - pruneSleepTime: DefaultPruneSleepTime, + cacheSize: syncer.DefaultCacheSize, + maxConcurrency: syncer.DefaultMaxConcurrency, + pastBlockLimit: syncer.DefaultPastBlockLimit, + adjustmentWindow: syncer.DefaultAdjustmentWindow, + pruneSleepTime: DefaultPruneSleepTime, + seenSemaphoreSize: int64(runtime.NumCPU()), } // Override defaults with any provided options @@ -115,6 +128,11 @@ func New( opt(s) } + // We set this after options because the caller + // has the ability to set the max concurrency + // of seen invocations. + s.seenSemaphore = semaphore.NewWeighted(s.seenSemaphoreSize) + return s } @@ -221,10 +239,14 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error { // BlockSeen is called by the syncer when a block is seen. func (s *StatefulSyncer) BlockSeen(ctx context.Context, block *types.Block) error { - err := s.blockStorage.SeeBlock(ctx, block) - if err != nil { + if err := s.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil { + return err + } + defer s.seenSemaphore.Release(semaphoreWeight) + + if err := s.blockStorage.SeeBlock(ctx, block); err != nil { return fmt.Errorf( - "%w: unable to encounter block to storage %s:%d", + "%w: unable to pre-store block %s:%d", err, block.BlockIdentifier.Hash, block.BlockIdentifier.Index, diff --git a/syncer/configuration.go b/syncer/configuration.go index 259b979f..cd63664c 100644 --- a/syncer/configuration.go +++ b/syncer/configuration.go @@ -65,12 +65,3 @@ func WithAdjustmentWindow(adjustmentWindow int64) Option { s.adjustmentWindow = adjustmentWindow } } - -// WithSeenConcurrency overrides the number of concurrent -// invocations of BlockSeen we will make. We default -// to the value of runtime.NumCPU(). -func WithSeenConcurrency(concurrency int64) Option { - return func(s *Syncer) { - s.seenSemaphoreSize = concurrency - } -} diff --git a/syncer/syncer.go b/syncer/syncer.go index f295fc52..4b1d3f29 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -19,11 +19,9 @@ import ( "errors" "fmt" "log" - "runtime" "time" "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" @@ -39,18 +37,17 @@ func New( options ...Option, ) *Syncer { s := &Syncer{ - network: network, - helper: helper, - handler: handler, - concurrency: DefaultConcurrency, - cacheSize: DefaultCacheSize, - maxConcurrency: DefaultMaxConcurrency, - sizeMultiplier: DefaultSizeMultiplier, - cancel: cancel, - pastBlocks: []*types.BlockIdentifier{}, - pastBlockLimit: DefaultPastBlockLimit, - adjustmentWindow: DefaultAdjustmentWindow, - seenSemaphoreSize: int64(runtime.NumCPU()), + network: network, + helper: helper, + handler: handler, + concurrency: DefaultConcurrency, + cacheSize: DefaultCacheSize, + maxConcurrency: DefaultMaxConcurrency, + sizeMultiplier: DefaultSizeMultiplier, + cancel: cancel, + pastBlocks: []*types.BlockIdentifier{}, + pastBlockLimit: DefaultPastBlockLimit, + adjustmentWindow: DefaultAdjustmentWindow, } // Override defaults with any provided options @@ -58,11 +55,6 @@ func New( opt(s) } - // We set this after options because the caller - // has the ability to set the max concurrency - // of seen invocations. - s.seenSemaphore = semaphore.NewWeighted(s.seenSemaphoreSize) - return s } @@ -475,11 +467,6 @@ func (s *Syncer) handleSeenBlock( return nil } - if err := s.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil { - return err - } - defer s.seenSemaphore.Release(semaphoreWeight) - return s.handler.BlockSeen(ctx, result.block) } diff --git a/syncer/types.go b/syncer/types.go index 31f25fe8..fc06421a 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -19,8 +19,6 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - "github.com/coinbase/rosetta-sdk-go/types" ) @@ -77,9 +75,6 @@ const ( // when we are loading more blocks to fetch but we // already have a backlog >= to concurrency. defaultFetchSleep = 500 * time.Millisecond - - // semaphoreWeight is the weight of each semaphore request. - semaphoreWeight = int64(1) ) // Handler is called at various times during the sync cycle @@ -163,12 +158,6 @@ type Syncer struct { adjustmentWindow int64 concurrencyLock sync.Mutex - // SeenSemaphore controls how many concurrent - // invocations we make to the SeenBlock function - // in the handler. - seenSemaphore *semaphore.Weighted - seenSemaphoreSize int64 - // doneLoading is used to coordinate adding goroutines // when close to the end of syncing a range. doneLoading bool From 9aca901afdc34f1067048c5859d964e518e1fea0 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 10:10:26 -0600 Subject: [PATCH 30/30] nits --- statefulsyncer/configuration.go | 2 +- statefulsyncer/stateful_syncer.go | 5 ++--- storage/modules/block_storage.go | 3 ++- syncer/syncer.go | 9 ++++++++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/statefulsyncer/configuration.go b/statefulsyncer/configuration.go index d0f294a7..de1e37a0 100644 --- a/statefulsyncer/configuration.go +++ b/statefulsyncer/configuration.go @@ -59,7 +59,7 @@ func WithPruneSleepTime(sleepTime int) Option { } // WithSeenConcurrency overrides the number of concurrent -// invocations of BlockSeen we will make. We default +// invocations of BlockSeen we will handle. We default // to the value of runtime.NumCPU(). func WithSeenConcurrency(concurrency int64) Option { return func(s *StatefulSyncer) { diff --git a/statefulsyncer/stateful_syncer.go b/statefulsyncer/stateful_syncer.go index da8a8579..9056645c 100644 --- a/statefulsyncer/stateful_syncer.go +++ b/statefulsyncer/stateful_syncer.go @@ -68,9 +68,8 @@ type StatefulSyncer struct { adjustmentWindow int64 pruneSleepTime time.Duration - // SeenSemaphore controls how many concurrent - // invocations we make to the SeenBlock function - // in the handler. + // SeenSemaphore limits how many executions of + // BlockSeen occur concurrently. seenSemaphore *semaphore.Weighted seenSemaphoreSize int64 } diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 910d35c6..42e519e0 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -634,7 +634,8 @@ func (b *BlockStorage) SeeBlock( ctx context.Context, block *types.Block, ) error { - transaction := b.db.WriteTransaction(ctx, block.BlockIdentifier.Hash, true) + _, key := getBlockHashKey(block.BlockIdentifier.Hash) + transaction := b.db.WriteTransaction(ctx, string(key), true) defer transaction.Discard(ctx) // Store all transactions in order and check for duplicates diff --git a/syncer/syncer.go b/syncer/syncer.go index 4b1d3f29..c4067233 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -555,7 +555,14 @@ func (s *Syncer) syncRange( s.concurrency = startingConcurrency s.goalConcurrency = s.concurrency - // Spawn syncing goroutines + // We create a separate derivative context here instead of + // replacing the provided ctx because the context returned + // by errgroup.WithContext is canceled as soon as Wait returns. + // If this canceled context is passed to a handler or helper, + // it can have unintended consequences (some functions + // return immediately if the context is canceled). + // + // Source: https://godoc.org/golang.org/x/sync/errgroup g, pipelineCtx := errgroup.WithContext(ctx) g.Go(func() error { return s.addBlockIndices(pipelineCtx, blockIndices, s.nextIndex, endIndex)