diff --git a/CHANGELOG.md b/CHANGELOG.md index 4265f992..06cfe815 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Changes - ([\#71](https://github.com/forbole/juno/pull/71)) Retry RPC client connection upon failure instead of panic - ([\#72](https://github.com/forbole/juno/pull/72)) Updated missing blocks parsing +- ([\#73](https://github.com/forbole/juno/pull/73)) Re-enqueue failed block after average block time ## v3.3.0 ### Changes diff --git a/cmd/migrate/v3/migrate.go b/cmd/migrate/v3/migrate.go index 7fb7972e..ef1b87ee 100644 --- a/cmd/migrate/v3/migrate.go +++ b/cmd/migrate/v3/migrate.go @@ -97,7 +97,7 @@ func migrateConfig() (Config, error) { ParseGenesis: cfg.Parser.ParseGenesis, StartHeight: cfg.Parser.StartHeight, FastSync: cfg.Parser.FastSync, - AvgBlockTime: averageBlockTime, + AvgBlockTime: &averageBlockTime, }, Logging: cfg.Logging, Telemetry: cfg.Telemetry, diff --git a/cmd/start/cmd.go b/cmd/start/cmd.go index b2562a73..6d85b15c 100644 --- a/cmd/start/cmd.go +++ b/cmd/start/cmd.go @@ -177,15 +177,13 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) { ctx.Logger.Debug("enqueueing new block", "height", currHeight) exportQueue <- currHeight } - time.Sleep(config.Cfg.Parser.AvgBlockTime) + time.Sleep(config.GetAvgBlockTime()) } } // mustGetLatestHeight tries getting the latest height from the RPC client. // If after 50 tries no latest height can be found, it returns 0. func mustGetLatestHeight(ctx *parser.Context) int64 { - avgBlockTime := config.Cfg.Parser.AvgBlockTime - for retryCount := 0; retryCount < 50; retryCount++ { latestBlockHeight, err := ctx.Node.LatestHeight() if err == nil { @@ -194,9 +192,10 @@ func mustGetLatestHeight(ctx *parser.Context) int64 { ctx.Logger.Error("failed to get last block from RPCConfig client", "err", err, - "retry interval", avgBlockTime, + "retry interval", config.GetAvgBlockTime(), "retry count", retryCount) - time.Sleep(avgBlockTime) + + time.Sleep(config.GetAvgBlockTime()) } return 0 diff --git a/parser/config/config.go b/parser/config/config.go index 74101baf..230cd09c 100644 --- a/parser/config/config.go +++ b/parser/config/config.go @@ -3,14 +3,14 @@ package config import "time" type Config struct { - Workers int64 `yaml:"workers"` - ParseNewBlocks bool `yaml:"listen_new_blocks"` - ParseOldBlocks bool `yaml:"parse_old_blocks"` - GenesisFilePath string `yaml:"genesis_file_path,omitempty"` - ParseGenesis bool `yaml:"parse_genesis"` - StartHeight int64 `yaml:"start_height"` - FastSync bool `yaml:"fast_sync,omitempty"` - AvgBlockTime time.Duration `yaml:"average_block_time"` + Workers int64 `yaml:"workers"` + ParseNewBlocks bool `yaml:"listen_new_blocks"` + ParseOldBlocks bool `yaml:"parse_old_blocks"` + GenesisFilePath string `yaml:"genesis_file_path,omitempty"` + ParseGenesis bool `yaml:"parse_genesis"` + StartHeight int64 `yaml:"start_height"` + FastSync bool `yaml:"fast_sync,omitempty"` + AvgBlockTime *time.Duration `yaml:"average_block_time"` } // NewParsingConfig allows to build a new Config instance @@ -19,7 +19,7 @@ func NewParsingConfig( parseNewBlocks, parseOldBlocks bool, parseGenesis bool, genesisFilePath string, startHeight int64, fastSync bool, - avgBlockTime time.Duration, + avgBlockTime *time.Duration, ) Config { return Config{ Workers: workers, @@ -35,6 +35,7 @@ func NewParsingConfig( // DefaultParsingConfig returns the default instance of Config func DefaultParsingConfig() Config { + avgBlockTime := 5 * time.Second return NewParsingConfig( 1, true, @@ -43,6 +44,6 @@ func DefaultParsingConfig() Config { "", 1, false, - 5*time.Second, + &avgBlockTime, ) } diff --git a/parser/worker.go b/parser/worker.go index c380eb93..970c32bd 100644 --- a/parser/worker.go +++ b/parser/worker.go @@ -3,6 +3,7 @@ package parser import ( "encoding/json" "fmt" + "time" "github.com/forbole/juno/v3/logging" @@ -60,7 +61,9 @@ func (w Worker) Start() { for i := range w.queue { if err := w.ProcessIfNotExists(i); err != nil { - // re-enqueue any failed job + // re-enqueue any failed job after average block time + time.Sleep(config.GetAvgBlockTime()) + // TODO: Implement exponential backoff or max retries for a block height. go func() { w.logger.Error("re-enqueueing failed block", "height", i, "err", err) diff --git a/types/config/utils.go b/types/config/utils.go index 823c2258..3f51cbaa 100644 --- a/types/config/utils.go +++ b/types/config/utils.go @@ -2,6 +2,7 @@ package config import ( "path" + "time" ) var ( @@ -12,3 +13,12 @@ var ( func GetConfigFilePath() string { return path.Join(HomePath, "config.yaml") } + +// GetAvgBlockTime returns the average_block_time in the configuration file or +// returns 3 seconds if it is not configured +func GetAvgBlockTime() time.Duration { + if Cfg.Parser.AvgBlockTime == nil { + return 3 * time.Second + } + return *Cfg.Parser.AvgBlockTime +}