Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Syncer #89

Merged
merged 14 commits into from
Aug 6, 2020
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.PHONY: deps gen lint format check-format test test-coverage add-license \
check-license shorten-lines shellcheck salus release
check-license shorten-lines shellcheck salus release mocks

# To run the the following packages as commands,
# it is necessary to use `go run <pkg>`. Running `go get` does
Expand All @@ -14,7 +14,7 @@ GOVERALLS_CMD=go run github.com/mattn/goveralls
GO_PACKAGES=./asserter/... ./fetcher/... ./types/... ./client/... ./server/... \
./parser/... ./syncer/... ./reconciler/... ./keys/...
GO_FOLDERS=$(shell echo ${GO_PACKAGES} | sed -e "s/\.\///g" | sed -e "s/\/\.\.\.//g")
TEST_SCRIPT=go test -v ${GO_PACKAGES}
TEST_SCRIPT=go test ${GO_PACKAGES}
LINT_SETTINGS=golint,misspell,gocyclo,gocritic,whitespace,goconst,gocognit,bodyclose,unconvert,lll,unparam

deps:
Expand Down Expand Up @@ -63,3 +63,7 @@ salus:
docker run --rm -t -v ${PWD}:/home/repo coinbase/salus

release: shellcheck check-gen check-license check-format test lint salus

mocks:
rm -rf mocks;
mockery --dir syncer --all --case underscore --outpkg syncer --output mocks/syncer;
14 changes: 0 additions & 14 deletions examples/fetcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,4 @@ func main() {

// Step 5: Print the block
log.Printf("Current Block: %s\n", types.PrettyPrintStruct(block))

// Step 6: Get a range of blocks
blockMap, err := newFetcher.BlockRange(
ctx,
primaryNetwork,
networkStatus.GenesisBlockIdentifier.Index,
networkStatus.GenesisBlockIdentifier.Index+10,
)
if err != nil {
log.Fatal(err)
}

// Step 7: Print the block range
log.Printf("Block Range: %s\n", types.PrettyPrintStruct(blockMap))
}
97 changes: 0 additions & 97 deletions fetcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,100 +232,3 @@ func (f *Fetcher) BlockRetry(
types.PrettyPrintStruct(blockIdentifier),
)
}

// addIndicies appends a range of indicies (from
// startIndex to endIndex, inclusive) to the
// blockIndicies channel. When all indicies are added,
// the channel is closed.
func addBlockIndicies(
ctx context.Context,
blockIndicies chan int64,
startIndex int64,
endIndex int64,
) error {
defer close(blockIndicies)
for i := startIndex; i <= endIndex; i++ {
select {
case blockIndicies <- i:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

// fetchChannelBlocks fetches blocks from a
// channel with retries until there are no
// more blocks in the channel or there is an
// error.
func (f *Fetcher) fetchChannelBlocks(
ctx context.Context,
network *types.NetworkIdentifier,
blockIndicies chan int64,
results chan *types.Block,
) error {
for b := range blockIndicies {
block, err := f.BlockRetry(
ctx,
network,
&types.PartialBlockIdentifier{
Index: &b,
},
)
if err != nil {
return err
}

select {
case results <- block:
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

// BlockRange concurrently fetches blocks from startIndex to endIndex,
// inclusive. Blocks returned by this method may not contain a path
// from the endBlock to the startBlock over Block.ParentBlockIdentifers
// if a re-org occurs during the fetch. This should be handled gracefully
// by any callers.
func (f *Fetcher) BlockRange(
ctx context.Context,
network *types.NetworkIdentifier,
startIndex int64,
endIndex int64,
) (map[int64]*types.Block, error) {
blockIndicies := make(chan int64)
results := make(chan *types.Block)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return addBlockIndicies(ctx, blockIndicies, startIndex, endIndex)
})

for j := uint64(0); j < f.blockConcurrency; j++ {
g.Go(func() error {
return f.fetchChannelBlocks(ctx, network, blockIndicies, results)
})
}

// Wait for all block fetching goroutines to exit
// before closing the results channel.
go func() {
_ = g.Wait()
close(results)
}()

m := make(map[int64]*types.Block)
for b := range results {
m[b.BlockIdentifier.Index] = b
}

err := g.Wait()
if err != nil {
return nil, err
}

return m, nil
}
7 changes: 0 additions & 7 deletions fetcher/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ func WithClient(client *client.APIClient) Option {
}
}

// WithBlockConcurrency overrides the default block concurrency.
func WithBlockConcurrency(concurrency uint64) Option {
return func(f *Fetcher) {
f.blockConcurrency = concurrency
}
}

// WithTransactionConcurrency overrides the default transaction
// concurrency.
func WithTransactionConcurrency(concurrency uint64) Option {
Expand Down
6 changes: 0 additions & 6 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ const (
// attempt a retry on a failed request.
DefaultRetries = 10

// DefaultBlockConcurrency is the default number of
// blocks a Fetcher will try to get concurrently.
DefaultBlockConcurrency = 8

// DefaultHTTPTimeout is the default timeout for
// HTTP requests.
DefaultHTTPTimeout = 10 * time.Second
Expand Down Expand Up @@ -68,7 +64,6 @@ type Fetcher struct {
// be applied.
Asserter *asserter.Asserter
rosettaClient *client.APIClient
blockConcurrency uint64
transactionConcurrency uint64
maxRetries uint64
retryElapsedTime time.Duration
Expand All @@ -90,7 +85,6 @@ func New(

f := &Fetcher{
rosettaClient: client,
blockConcurrency: DefaultBlockConcurrency,
transactionConcurrency: DefaultTransactionConcurrency,
maxRetries: DefaultRetries,
retryElapsedTime: DefaultElapsedTime,
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
58 changes: 58 additions & 0 deletions mocks/syncer/handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 76 additions & 0 deletions mocks/syncer/helper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions syncer/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncer

import (
"github.com/coinbase/rosetta-sdk-go/types"
)

// Option is used to overwrite default values in
// Syncer construction. Any Option not provided
// falls back to the default value.
type Option func(s *Syncer)

// WithConcurrency overrides the default block concurrency.
func WithConcurrency(concurrency uint64) Option {
return func(s *Syncer) {
s.concurrency = concurrency
}
}

// WithPastBlocks provides the syncer with a cache
// of previously processed blocks to handle reorgs.
func WithPastBlocks(blocks []*types.BlockIdentifier) Option {
return func(s *Syncer) {
s.pastBlocks = blocks
}
}
Loading