From 49739f94e673a2e11ef4ca893ce4031c154d5b2f Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Wed, 31 Aug 2022 16:05:07 +0800 Subject: [PATCH 1/7] re-enqueue failed block after average block time --- parser/worker.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parser/worker.go b/parser/worker.go index c380eb93..57af1405 100644 --- a/parser/worker.go +++ b/parser/worker.go @@ -3,6 +3,7 @@ package parser import ( "encoding/json" "fmt" + "time" "github.com/forbole/juno/v3/logging" @@ -60,7 +61,9 @@ func (w Worker) Start() { for i := range w.queue { if err := w.ProcessIfNotExists(i); err != nil { - // re-enqueue any failed job + // re-enqueue any failed job after average block time + time.Sleep(config.Cfg.Parser.AvgBlockTime) + // TODO: Implement exponential backoff or max retries for a block height. go func() { w.logger.Error("re-enqueueing failed block", "height", i, "err", err) From 770adf780df15a84eb66ab0e911e8866b88d5a3f Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Wed, 31 Aug 2022 16:16:23 +0800 Subject: [PATCH 2/7] add change log --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65e8b94b..b3258be9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## Unreleased +### Changes +#### Blocks +- ([\#73](https://github.com/forbole/juno/pull/73)) re-enqueue failed block after average block time + ## v3.3.0 ### Changes - ([\#67](https://github.com/forbole/juno/pull/67)) Added support for concurrent transaction handling From 6211baf4ed794f47424b00e02b3d3c7fb0373e3c Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Wed, 31 Aug 2022 16:17:09 +0800 Subject: [PATCH 3/7] rm title --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3258be9..1dc9c61a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,5 @@ ## Unreleased ### Changes -#### Blocks - ([\#73](https://github.com/forbole/juno/pull/73)) re-enqueue failed block after average block time ## v3.3.0 From b4860d1865bb755f35731f37eba4006082f01341 Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Thu, 1 Sep 2022 19:21:08 +0800 Subject: [PATCH 4/7] use pointer --- cmd/migrate/v3/migrate.go | 2 +- cmd/start/cmd.go | 2 +- parser/config/config.go | 21 +++++++++++---------- types/config/utils.go | 8 ++++++++ 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/cmd/migrate/v3/migrate.go b/cmd/migrate/v3/migrate.go index 7fb7972e..ef1b87ee 100644 --- a/cmd/migrate/v3/migrate.go +++ b/cmd/migrate/v3/migrate.go @@ -97,7 +97,7 @@ func migrateConfig() (Config, error) { ParseGenesis: cfg.Parser.ParseGenesis, StartHeight: cfg.Parser.StartHeight, FastSync: cfg.Parser.FastSync, - AvgBlockTime: averageBlockTime, + AvgBlockTime: &averageBlockTime, }, Logging: cfg.Logging, Telemetry: cfg.Telemetry, diff --git a/cmd/start/cmd.go b/cmd/start/cmd.go index 92b05d94..f181bc33 100644 --- a/cmd/start/cmd.go +++ b/cmd/start/cmd.go @@ -172,7 +172,7 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) { ctx.Logger.Debug("enqueueing new block", "height", currHeight) exportQueue <- currHeight } - time.Sleep(config.Cfg.Parser.AvgBlockTime) + time.Sleep(*config.Cfg.Parser.AvgBlockTime) } } diff --git a/parser/config/config.go b/parser/config/config.go index 74101baf..230cd09c 100644 --- a/parser/config/config.go +++ b/parser/config/config.go @@ -3,14 +3,14 @@ package config import "time" type Config struct { - Workers int64 `yaml:"workers"` - ParseNewBlocks bool `yaml:"listen_new_blocks"` - ParseOldBlocks bool `yaml:"parse_old_blocks"` - GenesisFilePath string `yaml:"genesis_file_path,omitempty"` - ParseGenesis bool `yaml:"parse_genesis"` - StartHeight int64 `yaml:"start_height"` - FastSync bool `yaml:"fast_sync,omitempty"` - AvgBlockTime time.Duration `yaml:"average_block_time"` + Workers int64 `yaml:"workers"` + ParseNewBlocks bool `yaml:"listen_new_blocks"` + ParseOldBlocks bool `yaml:"parse_old_blocks"` + GenesisFilePath string `yaml:"genesis_file_path,omitempty"` + ParseGenesis bool `yaml:"parse_genesis"` + StartHeight int64 `yaml:"start_height"` + FastSync bool `yaml:"fast_sync,omitempty"` + AvgBlockTime *time.Duration `yaml:"average_block_time"` } // NewParsingConfig allows to build a new Config instance @@ -19,7 +19,7 @@ func NewParsingConfig( parseNewBlocks, parseOldBlocks bool, parseGenesis bool, genesisFilePath string, startHeight int64, fastSync bool, - avgBlockTime time.Duration, + avgBlockTime *time.Duration, ) Config { return Config{ Workers: workers, @@ -35,6 +35,7 @@ func NewParsingConfig( // DefaultParsingConfig returns the default instance of Config func DefaultParsingConfig() Config { + avgBlockTime := 5 * time.Second return NewParsingConfig( 1, true, @@ -43,6 +44,6 @@ func DefaultParsingConfig() Config { "", 1, false, - 5*time.Second, + &avgBlockTime, ) } diff --git a/types/config/utils.go b/types/config/utils.go index 823c2258..0eb8221b 100644 --- a/types/config/utils.go +++ b/types/config/utils.go @@ -2,6 +2,7 @@ package config import ( "path" + "time" ) var ( @@ -12,3 +13,10 @@ var ( func GetConfigFilePath() string { return path.Join(HomePath, "config.yaml") } + +func GetAvgBlockTime() time.Duration { + if Cfg.Parser.AvgBlockTime == nil { + return 3 * time.Second + } + return *Cfg.Parser.AvgBlockTime +} From d6c8f0d99221143bd2050acf9e97334cb6e6d605 Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Thu, 1 Sep 2022 19:27:17 +0800 Subject: [PATCH 5/7] use config.GetAvgBlockTime() for time.Sleep --- cmd/start/cmd.go | 2 +- parser/worker.go | 2 +- types/config/utils.go | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/start/cmd.go b/cmd/start/cmd.go index f181bc33..4b4e6c09 100644 --- a/cmd/start/cmd.go +++ b/cmd/start/cmd.go @@ -172,7 +172,7 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) { ctx.Logger.Debug("enqueueing new block", "height", currHeight) exportQueue <- currHeight } - time.Sleep(*config.Cfg.Parser.AvgBlockTime) + time.Sleep(config.GetAvgBlockTime()) } } diff --git a/parser/worker.go b/parser/worker.go index 57af1405..970c32bd 100644 --- a/parser/worker.go +++ b/parser/worker.go @@ -62,7 +62,7 @@ func (w Worker) Start() { for i := range w.queue { if err := w.ProcessIfNotExists(i); err != nil { // re-enqueue any failed job after average block time - time.Sleep(config.Cfg.Parser.AvgBlockTime) + time.Sleep(config.GetAvgBlockTime()) // TODO: Implement exponential backoff or max retries for a block height. go func() { diff --git a/types/config/utils.go b/types/config/utils.go index 0eb8221b..abb20524 100644 --- a/types/config/utils.go +++ b/types/config/utils.go @@ -18,5 +18,6 @@ func GetAvgBlockTime() time.Duration { if Cfg.Parser.AvgBlockTime == nil { return 3 * time.Second } + return *Cfg.Parser.AvgBlockTime } From fd18aa7f8f53db3595abddbdb00cf0a17cd701a5 Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Thu, 1 Sep 2022 21:09:44 +0800 Subject: [PATCH 6/7] add comment --- types/config/utils.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/types/config/utils.go b/types/config/utils.go index abb20524..3f51cbaa 100644 --- a/types/config/utils.go +++ b/types/config/utils.go @@ -14,10 +14,11 @@ func GetConfigFilePath() string { return path.Join(HomePath, "config.yaml") } +// GetAvgBlockTime returns the average_block_time in the configuration file or +// returns 3 seconds if it is not configured func GetAvgBlockTime() time.Duration { if Cfg.Parser.AvgBlockTime == nil { return 3 * time.Second } - return *Cfg.Parser.AvgBlockTime } From d4c280c2506d1bdaead104d11f2af788052e9d21 Mon Sep 17 00:00:00 2001 From: huichiaotsou Date: Fri, 2 Sep 2022 15:57:03 +0800 Subject: [PATCH 7/7] replace cfg.Parser.AvgBlockTime --- cmd/start/cmd.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/start/cmd.go b/cmd/start/cmd.go index 1ddba809..6d85b15c 100644 --- a/cmd/start/cmd.go +++ b/cmd/start/cmd.go @@ -184,8 +184,6 @@ func enqueueNewBlocks(exportQueue types.HeightQueue, ctx *parser.Context) { // mustGetLatestHeight tries getting the latest height from the RPC client. // If after 50 tries no latest height can be found, it returns 0. func mustGetLatestHeight(ctx *parser.Context) int64 { - avgBlockTime := config.Cfg.Parser.AvgBlockTime - for retryCount := 0; retryCount < 50; retryCount++ { latestBlockHeight, err := ctx.Node.LatestHeight() if err == nil { @@ -194,9 +192,10 @@ func mustGetLatestHeight(ctx *parser.Context) int64 { ctx.Logger.Error("failed to get last block from RPCConfig client", "err", err, - "retry interval", avgBlockTime, + "retry interval", config.GetAvgBlockTime(), "retry count", retryCount) - time.Sleep(avgBlockTime) + + time.Sleep(config.GetAvgBlockTime()) } return 0