From ff4d9e8dc9ddac44ad8e038bcfde1b9c7977b2ad Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Mon, 28 Jan 2019 19:29:39 +0800 Subject: [PATCH 01/17] Add set/clear support in blockNode struct --- blockproducer/blocknode.go | 25 +++++++++++++++++++------ blockproducer/branch.go | 16 +++++++++------- blockproducer/chain.go | 6 +++--- blockproducer/chain_io.go | 9 +++------ blockproducer/chain_test.go | 4 ++-- 5 files changed, 36 insertions(+), 24 deletions(-) diff --git a/blockproducer/blocknode.go b/blockproducer/blocknode.go index 8a0aaacc8..1a5d2eafa 100644 --- a/blockproducer/blocknode.go +++ b/blockproducer/blocknode.go @@ -17,6 +17,8 @@ package blockproducer import ( + "sync/atomic" + "github.com/CovenantSQL/CovenantSQL/crypto/hash" "github.com/CovenantSQL/CovenantSQL/types" ) @@ -27,12 +29,13 @@ type blockNode struct { count uint32 height uint32 // Cached fields for quick reference - hash hash.Hash - block *types.BPBlock + hash hash.Hash + txCount int + block atomic.Value } -func newBlockNode(h uint32, b *types.BPBlock, p *blockNode) *blockNode { - return &blockNode{ +func newBlockNode(h uint32, b *types.BPBlock, p *blockNode) (node *blockNode) { + node = &blockNode{ parent: p, count: func() uint32 { @@ -43,9 +46,19 @@ func newBlockNode(h uint32, b *types.BPBlock, p *blockNode) *blockNode { }(), height: h, - hash: b.SignedHeader.BlockHash, - block: b, + hash: b.SignedHeader.BlockHash, + txCount: len(b.Transactions), } + node.block.Store(b) + return +} + +func (n *blockNode) load() *types.BPBlock { + return n.block.Load().(*types.BPBlock) +} + +func (n *blockNode) clear() { + n.block.Store(nil) } // fetchNodeList returns the block node list within range (from, n.count] from node head n. diff --git a/blockproducer/branch.go b/blockproducer/branch.go index fcba6e5dc..d1a5d98f3 100644 --- a/blockproducer/branch.go +++ b/blockproducer/branch.go @@ -57,11 +57,12 @@ func newBranch( } // Apply new blocks to view and pool for _, bn := range list { - if len(bn.block.Transactions) > conf.MaxTransactionsPerBlock { + var block = bn.load() + if len(block.Transactions) > conf.MaxTransactionsPerBlock { return nil, ErrTooManyTransactionsInBlock } - for _, v := range bn.block.Transactions { + for _, v := range block.Transactions { var k = v.Hash() // Check in tx pool if _, ok := inst.unpacked[k]; ok { @@ -126,17 +127,18 @@ func (b *branch) addTx(tx pi.Transaction) { } func (b *branch) applyBlock(n *blockNode) (br *branch, err error) { - if !b.head.hash.IsEqual(n.block.ParentHash()) { + var block = n.load() + if !b.head.hash.IsEqual(block.ParentHash()) { err = ErrParentNotMatch return } var cpy = b.makeArena() - if len(n.block.Transactions) > conf.MaxTransactionsPerBlock { + if len(block.Transactions) > conf.MaxTransactionsPerBlock { return nil, ErrTooManyTransactionsInBlock } - for _, v := range n.block.Transactions { + for _, v := range block.Transactions { var k = v.Hash() // Check in tx pool if _, ok := cpy.unpacked[k]; ok { @@ -258,13 +260,13 @@ func (b *branch) sprint(from uint32) (buff string) { if i == 0 { var p = v.parent buff += fmt.Sprintf("* #%d:%d %s {%d}", - p.height, p.count, p.hash.Short(4), len(p.block.Transactions)) + p.height, p.count, p.hash.Short(4), p.txCount) } if d := v.height - v.parent.height; d > 1 { buff += fmt.Sprintf(" <-- (skip %d blocks)", d-1) } buff += fmt.Sprintf(" <-- #%d:%d %s {%d}", - v.height, v.count, v.hash.Short(4), len(v.block.Transactions)) + v.height, v.count, v.hash.Short(4), v.txCount) } return } diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 8b8def50a..21a6b2c6d 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -649,7 +649,7 @@ func (c *Chain) replaceAndSwitchToBranch( resultTxPool[k] = v } for _, b := range newIrres { - for _, tx := range b.block.Transactions { + for _, tx := range b.load().Transactions { if err := c.immutable.apply(tx); err != nil { log.WithError(err).Fatal("failed to apply block to immutable database") } @@ -680,7 +680,7 @@ func (c *Chain) replaceAndSwitchToBranch( sps = append(sps, addBlock(height, newBlock)) sps = append(sps, buildBlockIndex(height, newBlock)) for _, n := range newIrres { - sps = append(sps, deleteTxs(n.block.Transactions)) + sps = append(sps, deleteTxs(n.load().Transactions)) } if len(expiredTxs) > 0 { sps = append(sps, deleteTxs(expiredTxs)) @@ -726,7 +726,7 @@ func (c *Chain) replaceAndSwitchToBranch( // Clear transactions in each branch for _, b := range newIrres { for _, br := range c.branches { - br.clearPackedTxs(b.block.Transactions) + br.clearPackedTxs(b.load().Transactions) } } for _, br := range c.branches { diff --git a/blockproducer/chain_io.go b/blockproducer/chain_io.go index 7e0755dc2..c93ef536f 100644 --- a/blockproducer/chain_io.go +++ b/blockproducer/chain_io.go @@ -49,8 +49,7 @@ func (c *Chain) fetchLastIrreversibleBlock() ( b *types.BPBlock, count uint32, height uint32, err error, ) { var node = c.lastIrreversibleBlock() - if node.block != nil { - b = node.block + if b = node.load(); b != nil { height = node.height count = node.count return @@ -71,8 +70,7 @@ func (c *Chain) fetchBlockByHeight(h uint32) (b *types.BPBlock, count uint32, er return } // OK, and block is cached - if node.block != nil { - b = node.block + if b = node.load(); b != nil { count = node.count return } @@ -91,8 +89,7 @@ func (c *Chain) fetchBlockByCount(count uint32) (b *types.BPBlock, height uint32 return } // OK, and block is cached - if node.block != nil { - b = node.block + if b = node.load(); b != nil { height = node.height return } diff --git a/blockproducer/chain_test.go b/blockproducer/chain_test.go index 58fd88bf0..d048268f0 100644 --- a/blockproducer/chain_test.go +++ b/blockproducer/chain_test.go @@ -344,7 +344,7 @@ func TestChain(t *testing.T) { // Try to use the no-cache version var node = chain.headBranch.head.ancestorByCount(5) - node.block = nil // Clear cached block + node.clear() bl, count, err = chain.fetchBlockByHeight(node.height) So(err, ShouldBeNil) So(count, ShouldEqual, node.count) @@ -355,7 +355,7 @@ func TestChain(t *testing.T) { So(bl.BlockHash(), ShouldResemble, &node.hash) var irreBlock = chain.lastIrre.block - chain.lastIrre.block = nil // Clear cached block + node.clear() bl, count, height, err = chain.fetchLastIrreversibleBlock() So(err, ShouldBeNil) So(bl, ShouldResemble, irreBlock) From a2b8db4ada24c274093e56e49f54f0d98c053d19 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Mon, 28 Jan 2019 20:59:01 +0800 Subject: [PATCH 02/17] Fix bug: bad critical section for multiple values --- rpc/rpcutil.go | 6 ++++-- rpc/rpcutil_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/rpc/rpcutil.go b/rpc/rpcutil.go index 147a32726..f7a5adb90 100644 --- a/rpc/rpcutil.go +++ b/rpc/rpcutil.go @@ -182,7 +182,9 @@ func recordRPCCost(startTime time.Time, method string, err error) { // Optimistically, val will not be nil except the first Call of method // expvar uses sync.Map // So, we try it first without lock - if val = expvar.Get(name); val == nil { + val = expvar.Get(name) + valC = expvar.Get(nameC) + if val == nil || valC == nil { callRPCExpvarLock.Lock() val = expvar.Get(name) if val == nil { @@ -191,9 +193,9 @@ func recordRPCCost(startTime time.Time, method string, err error) { } callRPCExpvarLock.Unlock() val = expvar.Get(name) + valC = expvar.Get(nameC) } val.(mw.Metric).Add(costTime.Seconds()) - valC = expvar.Get(nameC) valC.(mw.Metric).Add(1) return } diff --git a/rpc/rpcutil_test.go b/rpc/rpcutil_test.go index 90a432095..ebde85893 100644 --- a/rpc/rpcutil_test.go +++ b/rpc/rpcutil_test.go @@ -18,6 +18,7 @@ package rpc import ( "context" + "fmt" "os" "path/filepath" "runtime" @@ -443,3 +444,28 @@ func BenchmarkPersistentCaller_Call(b *testing.B) { server.Stop() } + +func TestRecordRPCCost(t *testing.T) { + Convey("Bug: bad critical section for multiple values", t, func(c C) { + var ( + start = time.Now() + rounds = 1000 + concurrent = 10 + wg = &sync.WaitGroup{} + body = func(i int) { + defer func() { + c.So(recover(), ShouldBeNil) + wg.Done() + }() + recordRPCCost(start, fmt.Sprintf("M%d", i), nil) + } + ) + defer wg.Wait() + for i := 0; i < rounds; i++ { + for j := 0; j < concurrent; j++ { + wg.Add(1) + go body(i) + } + } + }) +} From 89bb26eae886a547a51a493f3108e03683601add Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 10:42:06 +0800 Subject: [PATCH 03/17] Remove the `New` method of config struct Since all needed fields are exported, just use its literal notation. --- blockproducer/config.go | 17 ----------------- cmd/cqld/bootstrap.go | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 27 deletions(-) diff --git a/blockproducer/config.go b/blockproducer/config.go index d6f5befb7..5722a16cb 100644 --- a/blockproducer/config.go +++ b/blockproducer/config.go @@ -52,20 +52,3 @@ type Config struct { Period time.Duration Tick time.Duration } - -// NewConfig creates new config. -func NewConfig(genesis *types.BPBlock, dataFile string, - server *rpc.Server, peers *proto.Peers, - nodeID proto.NodeID, period time.Duration, tick time.Duration) *Config { - config := Config{ - Mode: BPMode, - Genesis: genesis, - DataFile: dataFile, - Server: server, - Peers: peers, - NodeID: nodeID, - Period: period, - Tick: tick, - } - return &config -} diff --git a/cmd/cqld/bootstrap.go b/cmd/cqld/bootstrap.go index b93eb8617..370231b36 100644 --- a/cmd/cqld/bootstrap.go +++ b/cmd/cqld/bootstrap.go @@ -153,16 +153,16 @@ func runNode(nodeID proto.NodeID, listenAddr string) (err error) { // init main chain service log.Info("register main chain service rpc") - chainConfig := bp.NewConfig( - genesis, - conf.GConf.BP.ChainFileName, - server, - peers, - nodeID, - conf.GConf.BPPeriod, - conf.GConf.BPTick, - ) - chainConfig.Mode = mode + chainConfig := &bp.Config{ + Mode: mode, + Genesis: genesis, + DataFile: conf.GConf.BP.ChainFileName, + Server: server, + Peers: peers, + NodeID: nodeID, + Period: conf.GConf.BPPeriod, + Tick: conf.GConf.BPTick, + } chain, err := bp.NewChain(chainConfig) if err != nil { log.WithError(err).Error("init chain failed") From 27033445e8fa99e455e60ddcd26d45c8ba2e9f53 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 10:50:11 +0800 Subject: [PATCH 04/17] Format license --- blockproducer/branch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/blockproducer/branch.go b/blockproducer/branch.go index d1a5d98f3..346b6bea0 100644 --- a/blockproducer/branch.go +++ b/blockproducer/branch.go @@ -10,7 +10,8 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and * limitations under the License. + * See the License for the specific language governing permissions and + * limitations under the License. */ package blockproducer From 0857e5160fd957fb5d59c9cc5247f895c829dac4 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 10:51:19 +0800 Subject: [PATCH 05/17] Format imports --- blockproducer/chain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 21a6b2c6d..7b61d0c63 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/pkg/errors" mw "github.com/zserge/metric" pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces" @@ -41,7 +42,6 @@ import ( "github.com/CovenantSQL/CovenantSQL/types" "github.com/CovenantSQL/CovenantSQL/utils/log" xi "github.com/CovenantSQL/CovenantSQL/xenomint/interfaces" - "github.com/pkg/errors" ) // Chain defines the main chain. From e70d0eb4fa7f8ed5f769381bec7c1f8eb5056dd2 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 10:51:46 +0800 Subject: [PATCH 06/17] Add cached block setting --- conf/limits.go | 5 +++++ conf/parameters.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/conf/limits.go b/conf/limits.go index 8c12c08cb..3b72ff0ec 100644 --- a/conf/limits.go +++ b/conf/limits.go @@ -25,3 +25,8 @@ const ( // MaxTransactionsPerBlock defines the limit of transactions per block. MaxTransactionsPerBlock = 10000 ) + +// These limits will not cause inconsistency within certain range. +const ( + MaxCachedBlock = 1000 +) diff --git a/conf/parameters.go b/conf/parameters.go index 4fa2296ef..0025fda57 100644 --- a/conf/parameters.go +++ b/conf/parameters.go @@ -21,7 +21,7 @@ const ( DefaultConfirmThreshold = float64(2) / 3.0 ) -// This parameters will not cause inconsistency within certain range. +// These parameters will not cause inconsistency within certain range. const ( BPStartupRequiredReachableCount = 2 // NOTE: this includes myself ) From 4f51f5792ab79a23cca1ec2be0af552c2c4dc7d0 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 11:53:10 +0800 Subject: [PATCH 07/17] Fix test case and other minor fixes --- blockproducer/blocknode.go | 2 +- blockproducer/chain.go | 16 ++++++++-------- blockproducer/chain_test.go | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/blockproducer/blocknode.go b/blockproducer/blocknode.go index 1a5d2eafa..5d2577536 100644 --- a/blockproducer/blocknode.go +++ b/blockproducer/blocknode.go @@ -58,7 +58,7 @@ func (n *blockNode) load() *types.BPBlock { } func (n *blockNode) clear() { - n.block.Store(nil) + n.block.Store((*types.BPBlock)(nil)) } // fetchNodeList returns the block node list within range (from, n.count] from node head n. diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 7b61d0c63..db3ad1793 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -44,6 +44,10 @@ import ( xi "github.com/CovenantSQL/CovenantSQL/xenomint/interfaces" ) +func init() { + expvar.Publish("height", mw.NewCounter("5m1s")) +} + // Chain defines the main chain. type Chain struct { // Routine controlling components @@ -61,6 +65,7 @@ type Chain struct { pendingAddTxReqs chan *types.AddTxReq // The following fields are read-only in runtime address proto.AccountAddress + mode RunMode genesisTime time.Time period time.Duration tick time.Duration @@ -78,16 +83,10 @@ type Chain struct { headBranch *branch branches []*branch txPool map[hash.Hash]pi.Transaction - mode RunMode } // NewChain creates a new blockchain. func NewChain(cfg *Config) (c *Chain, err error) { - // Normally, NewChain() should only be called once in app. - // So, we just check expvar without a lock - if expvar.Get("height") == nil { - expvar.Publish("height", mw.NewGauge("5m1s")) - } return NewChainWithContext(context.Background(), cfg) } @@ -218,6 +217,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) pendingAddTxReqs: make(chan *types.AddTxReq), address: addr, + mode: cfg.Mode, genesisTime: cfg.Genesis.SignedHeader.Timestamp, period: cfg.Period, tick: cfg.Tick, @@ -235,8 +235,8 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) headBranch: head, branches: branches, txPool: txPool, - mode: cfg.Mode, } + expvar.Get("height").(mw.Metric).Add(float64(c.nextHeight)) log.WithFields(log.Fields{ "local": c.getLocalBPInfo(), "period": c.period, @@ -369,7 +369,6 @@ func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) { }).Warn("too much time elapsed in the new period, skip this block") return } - expvar.Get("height").(mw.Metric).Add(float64(c.getNextHeight())) log.WithField("height", c.getNextHeight()).Info("producing a new block") if err := c.produceBlock(now); err != nil { log.WithField("now", now.Format(time.RFC3339Nano)).WithError(err).Errorln( @@ -895,6 +894,7 @@ func (c *Chain) isMyTurn() bool { // increaseNextHeight prepares the chain state for the next turn. func (c *Chain) increaseNextHeight() { + expvar.Get("height").(mw.Metric).Add(1) c.Lock() defer c.Unlock() c.nextHeight++ diff --git a/blockproducer/chain_test.go b/blockproducer/chain_test.go index d048268f0..954a22702 100644 --- a/blockproducer/chain_test.go +++ b/blockproducer/chain_test.go @@ -340,7 +340,7 @@ func TestChain(t *testing.T) { So(err, ShouldBeNil) So(count, ShouldEqual, chain.lastIrre.count) So(height, ShouldEqual, chain.lastIrre.height) - So(bl, ShouldResemble, chain.lastIrre.block) + So(bl, ShouldResemble, chain.lastIrre.load()) // Try to use the no-cache version var node = chain.headBranch.head.ancestorByCount(5) @@ -354,7 +354,7 @@ func TestChain(t *testing.T) { So(height, ShouldEqual, node.height) So(bl.BlockHash(), ShouldResemble, &node.hash) - var irreBlock = chain.lastIrre.block + var irreBlock = chain.lastIrre.load() node.clear() bl, count, height, err = chain.fetchLastIrreversibleBlock() So(err, ShouldBeNil) From e5bddc07d8ef3d1c10febf457294e2088f1912e3 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 11:57:36 +0800 Subject: [PATCH 08/17] Move limits to correct section --- conf/limits.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/conf/limits.go b/conf/limits.go index 3b72ff0ec..5d835080b 100644 --- a/conf/limits.go +++ b/conf/limits.go @@ -17,9 +17,6 @@ package conf const ( - // MaxTxBroadcastTTL defines the TTL limit of a AddTx request broadcasting within the - // block producers. - MaxTxBroadcastTTL = 1 // MaxPendingTxsPerAccount defines the limit of pending transactions of one account. MaxPendingTxsPerAccount = 1000 // MaxTransactionsPerBlock defines the limit of transactions per block. @@ -28,5 +25,8 @@ const ( // These limits will not cause inconsistency within certain range. const ( - MaxCachedBlock = 1000 + // MaxTxBroadcastTTL defines the TTL limit of a AddTx request broadcasting within the + // block producers. + MaxTxBroadcastTTL = 1 + MaxCachedBlock = 1000 ) From 5f09562f318c7edb21b27836fd7ca2df805dc127 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 16:13:38 +0800 Subject: [PATCH 09/17] Add metric values --- blockproducer/chain.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index db3ad1793..f756da3f7 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -44,8 +44,16 @@ import ( xi "github.com/CovenantSQL/CovenantSQL/xenomint/interfaces" ) +// Metric keys +const ( + mwKeyHeight = "service:bp:height" + mwKeyTxPooled = "service:bp:pooled" + mwKeyTxConfirmed = "service:bp:confirmed" +) + func init() { - expvar.Publish("height", mw.NewCounter("5m1s")) + expvar.Publish(mwKeyTxPooled, mw.NewCounter("5m1m")) + expvar.Publish(mwKeyTxConfirmed, mw.NewCounter("5m1m")) } // Chain defines the main chain. @@ -236,7 +244,14 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) branches: branches, txPool: txPool, } - expvar.Get("height").(mw.Metric).Add(float64(c.nextHeight)) + + // NOTE(leventeliu): this implies that BP chain is a singleton, otherwise we will need + // independent metric key for each chain instance. + if expvar.Get(mwKeyHeight) == nil { + expvar.Publish(mwKeyHeight, mw.NewGauge(fmt.Sprintf("5m%.0fs", cfg.Period.Seconds()))) + } + expvar.Get(mwKeyHeight).(mw.Metric).Add(float64(c.head().height)) + log.WithFields(log.Fields{ "local": c.getLocalBPInfo(), "period": c.period, @@ -354,7 +369,11 @@ func (c *Chain) advanceNextHeight(now time.Time, d time.Duration) { "elapsed_seconds": elapsed.Seconds(), }).Info("enclosing current height and advancing to next height") - defer c.increaseNextHeight() + defer func() { + c.increaseNextHeight() + expvar.Get(mwKeyHeight).(mw.Metric).Add(float64(c.head().height)) + }() + // Skip if it's not my turn if c.mode == APINodeMode || !c.isMyTurn() { return @@ -500,7 +519,9 @@ func (c *Chain) processAddTxReq(addTxReq *types.AddTxReq) { // Add to tx pool if err = c.storeTx(tx); err != nil { le.WithError(err).Error("failed to add transaction") + return } + expvar.Get(mwKeyTxPooled).(mw.Metric).Add(1) } func (c *Chain) processTxs(ctx context.Context) { @@ -629,6 +650,7 @@ func (c *Chain) replaceAndSwitchToBranch( newIrres []*blockNode sps []storageProcedure up storageCallback + txCount int height = c.heightOfTime(newBlock.Timestamp()) resultTxPool = make(map[hash.Hash]pi.Transaction) @@ -648,6 +670,7 @@ func (c *Chain) replaceAndSwitchToBranch( resultTxPool[k] = v } for _, b := range newIrres { + txCount += b.txCount for _, tx := range b.load().Transactions { if err := c.immutable.apply(tx); err != nil { log.WithError(err).Fatal("failed to apply block to immutable database") @@ -738,7 +761,9 @@ func (c *Chain) replaceAndSwitchToBranch( // Write to immutable database and update cache if err = store(c.storage, sps, up); err != nil { c.immutable.clean() + return } + expvar.Get(mwKeyTxConfirmed).(mw.Metric).Add(float64(txCount)) // TODO(leventeliu): trigger ChainBus.Publish. // ... return @@ -894,7 +919,6 @@ func (c *Chain) isMyTurn() bool { // increaseNextHeight prepares the chain state for the next turn. func (c *Chain) increaseNextHeight() { - expvar.Get("height").(mw.Metric).Add(1) c.Lock() defer c.Unlock() c.nextHeight++ From 36e828458ef070125c852a9be568c61718c00429 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 29 Jan 2019 16:24:45 +0800 Subject: [PATCH 10/17] Format query strings --- blockproducer/storage.go | 94 ++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/blockproducer/storage.go b/blockproducer/storage.go index 414bed504..0fca97223 100644 --- a/blockproducer/storage.go +++ b/blockproducer/storage.go @@ -36,71 +36,71 @@ var ( ddls = [...]string{ // Chain state tables `CREATE TABLE IF NOT EXISTS "blocks" ( - "height" INT, - "hash" TEXT, - "parent" TEXT, - "encoded" BLOB, - UNIQUE ("hash") - );`, + "height" INT, + "hash" TEXT, + "parent" TEXT, + "encoded" BLOB, + UNIQUE ("hash") +);`, `CREATE TABLE IF NOT EXISTS "txPool" ( - "type" INT, - "hash" TEXT, - "encoded" BLOB, - UNIQUE ("hash") - );`, + "type" INT, + "hash" TEXT, + "encoded" BLOB, + UNIQUE ("hash") +);`, `CREATE TABLE IF NOT EXISTS "irreversible" ( - "id" INT, - "hash" TEXT, - UNIQUE ("id") - );`, + "id" INT, + "hash" TEXT, + UNIQUE ("id") +);`, // Meta state tables `CREATE TABLE IF NOT EXISTS "accounts" ( - "address" TEXT, - "encoded" BLOB, - UNIQUE ("address") - );`, + "address" TEXT, + "encoded" BLOB, + UNIQUE ("address") +);`, `CREATE TABLE IF NOT EXISTS "shardChain" ( - "address" TEXT, - "id" TEXT, - "encoded" BLOB, - UNIQUE ("address", "id") - );`, + "address" TEXT, + "id" TEXT, + "encoded" BLOB, + UNIQUE ("address", "id") +);`, `CREATE TABLE IF NOT EXISTS "provider" ( - "address" TEXT, - "encoded" BLOB, - UNIQUE ("address") - );`, + "address" TEXT, + "encoded" BLOB, + UNIQUE ("address") +);`, `CREATE TABLE IF NOT EXISTS "indexed_blocks" ( - "height" INTEGER PRIMARY KEY, - "hash" TEXT, - "timestamp" INTEGER, - "version" INTEGER, - "producer" TEXT, - "merkle_root" TEXT, - "parent" TEXT, - "tx_count" INTEGER - );`, + "height" INTEGER PRIMARY KEY, + "hash" TEXT, + "timestamp" INTEGER, + "version" INTEGER, + "producer" TEXT, + "merkle_root" TEXT, + "parent" TEXT, + "tx_count" INTEGER +);`, `CREATE INDEX IF NOT EXISTS "idx__indexed_blocks__hash" ON "indexed_blocks" ("hash");`, `CREATE INDEX IF NOT EXISTS "idx__indexed_blocks__timestamp" ON "indexed_blocks" ("timestamp" DESC);`, `CREATE TABLE IF NOT EXISTS "indexed_transactions" ( - "block_height" INTEGER, - "tx_index" INTEGER, - "hash" TEXT, - "block_hash" TEXT, - "timestamp" INTEGER, - "tx_type" INTEGER, - "address" TEXT, - "raw" TEXT, - PRIMARY KEY ("block_height", "tx_index") - );`, + "block_height" INTEGER, + "tx_index" INTEGER, + "hash" TEXT, + "block_hash" TEXT, + "timestamp" INTEGER, + "tx_type" INTEGER, + "address" TEXT, + "raw" TEXT, + PRIMARY KEY ("block_height", "tx_index") +);`, `CREATE INDEX IF NOT EXISTS "idx__indexed_transactions__hash" ON "indexed_transactions" ("hash");`, `CREATE INDEX IF NOT EXISTS "idx__indexed_transactions__block_hash" ON "indexed_transactions" ("block_hash");`, From be6b10d2fe0c29d05252365667cdf98f0d33ee06 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 14:27:58 +0800 Subject: [PATCH 11/17] Add block cache LRU list --- blockproducer/chain.go | 22 ++++++++++++++++++++++ blockproducer/config.go | 2 ++ 2 files changed, 24 insertions(+) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 2eb1a8163..0ebdcd86a 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -25,6 +25,7 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" mw "github.com/zserge/metric" @@ -67,6 +68,8 @@ type Chain struct { // Other components storage xi.Storage chainBus chainbus.Bus + // This LRU object is only used for block cache control, do NOT use it for query. + blockCache *lru.Cache // Channels for incoming blocks and transactions pendingBlocks chan *types.BPBlock pendingAddTxReqs chan *types.AddTxReq @@ -110,6 +113,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) m uint32 st xi.Storage + cache *lru.Cache irre *blockNode heads []*blockNode immutable *metaState @@ -151,6 +155,18 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) } }() + // Create block cache + if cfg.blockCacheSize > conf.MaxCachedBlock { + cfg.blockCacheSize = conf.MaxCachedBlock + } + if cache, err = lru.NewWithEvict(cfg.blockCacheSize, func(key interface{}, value interface{}) { + if node, ok := value.(*blockNode); ok && node != nil { + node.clear() + } + }); err != nil { + return + } + // Create initial state from genesis block and store if !existed { var init = newMetaState() @@ -239,6 +255,8 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) storage: st, chainBus: bus, + blockCache: cache, + pendingBlocks: make(chan *types.BPBlock), pendingAddTxReqs: make(chan *types.AddTxReq), @@ -755,6 +773,10 @@ func (c *Chain) replaceAndSwitchToBranch( } // Update txPool to result txPool (packed and expired transactions cleared!) c.txPool = resultTxPool + // Register new irreversible blocks to LRU cache list + for _, b := range newIrres { + c.blockCache.Add(b.count, b) + } } // Write to immutable database and update cache diff --git a/blockproducer/config.go b/blockproducer/config.go index 5722a16cb..3fb1b2ee1 100644 --- a/blockproducer/config.go +++ b/blockproducer/config.go @@ -51,4 +51,6 @@ type Config struct { Period time.Duration Tick time.Duration + + blockCacheSize int } From adf6bbdefcd9dfe66a89812842cfd8b5723b73ec Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 14:52:08 +0800 Subject: [PATCH 12/17] Use including notation for fetchNodeList method Which allows the genesis block to be included by a normal function call. --- blockproducer/blocknode.go | 6 +++--- blockproducer/blocknode_test.go | 6 +++--- blockproducer/branch.go | 2 +- blockproducer/chain.go | 18 ++++++++++++++---- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/blockproducer/blocknode.go b/blockproducer/blocknode.go index 2124c71f0..bc3e56c48 100644 --- a/blockproducer/blocknode.go +++ b/blockproducer/blocknode.go @@ -61,12 +61,12 @@ func (n *blockNode) clear() { n.block.Store((*types.BPBlock)(nil)) } -// fetchNodeList returns the block node list within range (from, n.count] from node head n. +// fetchNodeList returns the block node list within range [from, n.count] from node head n. func (n *blockNode) fetchNodeList(from uint32) (bl []*blockNode) { - if n.count <= from { + if n.count < from { return } - bl = make([]*blockNode, n.count-from) + bl = make([]*blockNode, n.count-from+1) var iter = n for i := len(bl) - 1; i >= 0; i-- { bl[i] = iter diff --git a/blockproducer/blocknode_test.go b/blockproducer/blocknode_test.go index 09299abdc..8c72f5111 100644 --- a/blockproducer/blocknode_test.go +++ b/blockproducer/blocknode_test.go @@ -115,11 +115,11 @@ func TestBlockNode(t *testing.T) { So(n0.count, ShouldEqual, 0) So(n1.count, ShouldEqual, n0.count+1) - So(n0.fetchNodeList(0), ShouldBeEmpty) So(n0.fetchNodeList(1), ShouldBeEmpty) So(n0.fetchNodeList(2), ShouldBeEmpty) - So(n3.fetchNodeList(0), ShouldResemble, []*blockNode{n1, n2, n3}) - So(n4p.fetchNodeList(2), ShouldResemble, []*blockNode{n3p, n4p}) + So(n0.fetchNodeList(3), ShouldBeEmpty) + So(n3.fetchNodeList(1), ShouldResemble, []*blockNode{n1, n2, n3}) + So(n4p.fetchNodeList(3), ShouldResemble, []*blockNode{n3p, n4p}) So(n0.ancestor(1), ShouldBeNil) So(n3.ancestor(3), ShouldEqual, n3) diff --git a/blockproducer/branch.go b/blockproducer/branch.go index 346b6bea0..83db975b1 100644 --- a/blockproducer/branch.go +++ b/blockproducer/branch.go @@ -44,7 +44,7 @@ func newBranch( br *branch, err error, ) { var ( - list = headNode.fetchNodeList(baseNode.count) + list = headNode.fetchNodeList(baseNode.count + 1) inst = &branch{ head: headNode, preview: baseState.makeCopy(), diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 0ebdcd86a..2f59895ab 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -185,16 +185,26 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) } } - // Load from database and rebuild branches + // Load from database if irre, heads, immutable, txPool, ierr = loadDatabase(st); ierr != nil { err = errors.Wrap(ierr, "failed to load data from storage") return } - if persistedGenesis := irre.ancestorByCount(0); persistedGenesis == nil || + + // Check genesis block + var irreBlocks = irre.fetchNodeList(0) + if persistedGenesis := irreBlocks[0]; persistedGenesis == nil || !persistedGenesis.hash.IsEqual(cfg.Genesis.BlockHash()) { err = ErrGenesisHashNotMatch return } + + // Add blocks to LRU list + for _, v := range irreBlocks { + cache.Add(v.count, v) + } + + // Rebuild branches for _, v := range heads { log.WithFields(log.Fields{ "irre_hash": irre.hash.Short(4), @@ -680,7 +690,7 @@ func (c *Chain) replaceAndSwitchToBranch( // May have multiple new irreversible blocks here if peer list shrinks. May also have // no new irreversible block at all if peer list expands. lastIrre = newBranch.head.lastIrreversible(c.confirms) - newIrres = lastIrre.fetchNodeList(c.lastIrre.count) + newIrres = lastIrre.fetchNodeList(c.lastIrre.count + 1) // Apply irreversible blocks to create dirty map on immutable cache for k, v := range c.txPool { @@ -800,7 +810,7 @@ func (c *Chain) stat() { } else { buff += fmt.Sprintf("[%04d] ", i) } - buff += v.sprint(c.lastIrre.count) + buff += v.sprint(c.lastIrre.count + 1) log.WithFields(log.Fields{ "branch": buff, }).Info("runtime state") From cea0aee04d399072bf36451c50c2ee763cf1326c Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 15:08:44 +0800 Subject: [PATCH 13/17] Rearrange some variable declarations --- blockproducer/chain.go | 45 +++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 2f59895ab..dbbb3e563 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -103,14 +103,7 @@ func NewChain(cfg *Config) (c *Chain, err error) { // NewChainWithContext creates a new blockchain with context. func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) { var ( - existed bool - ierr error - - cld context.Context - ccl context.CancelFunc - l = uint32(len(cfg.Peers.Servers)) - t float64 - m uint32 + ierr error st xi.Storage cache *lru.Cache @@ -119,11 +112,10 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) immutable *metaState txPool map[hash.Hash]pi.Transaction - branches []*branch - br, head *branch - headIndex int + branches []*branch + headBranch *branch + headIndex int - pubKey *asymmetric.PublicKey addr proto.AccountAddress bpInfos []*blockProducerInfo localBPInfo *blockProducerInfo @@ -142,6 +134,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) } // Open storage + var existed bool if fi, err := os.Stat(cfg.DataFile); err == nil && fi.Mode().IsRegular() { existed = true } @@ -213,6 +206,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) "head_count": v.count, }).Debug("checking head") if v.hasAncestor(irre) { + var br *branch if br, ierr = newBranch(irre, v, immutable, txPool); ierr != nil { err = errors.Wrapf(ierr, "failed to rebuild branch with head %s", v.hash.Short(4)) return @@ -227,13 +221,14 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) // Set head branch for i, v := range branches { - if head == nil || v.head.count > head.head.count { + if headBranch == nil || v.head.count > headBranch.head.count { headIndex = i - head = v + headBranch = v } } // Get accountAddress + var pubKey *asymmetric.PublicKey if pubKey, err = kms.GetLocalPublicKey(); err != nil { return } @@ -242,18 +237,24 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) } // Setup peer list + var ( + l = uint32(len(cfg.Peers.Servers)) + + threshold float64 + needConfirms uint32 + ) if localBPInfo, bpInfos, err = buildBlockProducerInfos(cfg.NodeID, cfg.Peers, cfg.Mode == APINodeMode); err != nil { return } - if t = cfg.ConfirmThreshold; t <= 0.0 { - t = conf.DefaultConfirmThreshold + if threshold = cfg.ConfirmThreshold; threshold <= 0.0 { + threshold = conf.DefaultConfirmThreshold } - if m = uint32(math.Ceil(float64(l)*t + 1)); m > l { - m = l + if needConfirms = uint32(math.Ceil(float64(l)*threshold + 1)); needConfirms > l { + needConfirms = l } // create chain - cld, ccl = context.WithCancel(ctx) + var cld, ccl = context.WithCancel(ctx) c = &Chain{ ctx: cld, cancel: ccl, @@ -279,14 +280,14 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) bpInfos: bpInfos, localBPInfo: localBPInfo, localNodeID: cfg.NodeID, - confirms: m, - nextHeight: head.head.height + 1, + confirms: needConfirms, + nextHeight: headBranch.head.height + 1, offset: time.Duration(0), // TODO(leventeliu): initialize offset lastIrre: irre, immutable: immutable, headIndex: headIndex, - headBranch: head, + headBranch: headBranch, branches: branches, txPool: txPool, } From 1474ad36322eaf505520db98fe199b626283e82b Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 15:18:25 +0800 Subject: [PATCH 14/17] Remove unused ChainBus field --- blockproducer/chain.go | 50 ++++++++++++++++++++-------------------- blockproducer/rpc.go | 7 ------ blockproducer/storage.go | 4 ++-- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index dbbb3e563..1b6d20711 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -30,7 +30,6 @@ import ( mw "github.com/zserge/metric" pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces" - "github.com/CovenantSQL/CovenantSQL/chainbus" "github.com/CovenantSQL/CovenantSQL/conf" "github.com/CovenantSQL/CovenantSQL/crypto" "github.com/CovenantSQL/CovenantSQL/crypto/asymmetric" @@ -62,17 +61,21 @@ type Chain struct { ctx context.Context cancel context.CancelFunc wg *sync.WaitGroup + // RPC components server *rpc.Server caller *rpc.Caller + // Other components - storage xi.Storage - chainBus chainbus.Bus - // This LRU object is only used for block cache control, do NOT use it for query. + storage xi.Storage + // NOTE(leventeliu): this LRU object is only used for block cache control, + // do NOT read it in any case. blockCache *lru.Cache + // Channels for incoming blocks and transactions pendingBlocks chan *types.BPBlock pendingAddTxReqs chan *types.AddTxReq + // The following fields are read-only in runtime address proto.AccountAddress mode RunMode @@ -107,7 +110,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) st xi.Storage cache *lru.Cache - irre *blockNode + lastIrre *blockNode heads []*blockNode immutable *metaState txPool map[hash.Hash]pi.Transaction @@ -119,8 +122,6 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) addr proto.AccountAddress bpInfos []*blockProducerInfo localBPInfo *blockProducerInfo - - bus = chainbus.New() ) // Verify genesis block in config @@ -179,13 +180,13 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) } // Load from database - if irre, heads, immutable, txPool, ierr = loadDatabase(st); ierr != nil { + if lastIrre, heads, immutable, txPool, ierr = loadDatabase(st); ierr != nil { err = errors.Wrap(ierr, "failed to load data from storage") return } // Check genesis block - var irreBlocks = irre.fetchNodeList(0) + var irreBlocks = lastIrre.fetchNodeList(0) if persistedGenesis := irreBlocks[0]; persistedGenesis == nil || !persistedGenesis.hash.IsEqual(cfg.Genesis.BlockHash()) { err = ErrGenesisHashNotMatch @@ -200,14 +201,14 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) // Rebuild branches for _, v := range heads { log.WithFields(log.Fields{ - "irre_hash": irre.hash.Short(4), - "irre_count": irre.count, + "irre_hash": lastIrre.hash.Short(4), + "irre_count": lastIrre.count, "head_hash": v.hash.Short(4), "head_count": v.count, }).Debug("checking head") - if v.hasAncestor(irre) { + if v.hasAncestor(lastIrre) { var br *branch - if br, ierr = newBranch(irre, v, immutable, txPool); ierr != nil { + if br, ierr = newBranch(lastIrre, v, immutable, txPool); ierr != nil { err = errors.Wrapf(ierr, "failed to rebuild branch with head %s", v.hash.Short(4)) return } @@ -219,7 +220,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) return } - // Set head branch + // Select head branch for i, v := range branches { if headBranch == nil || v.head.count > headBranch.head.count { headIndex = i @@ -243,7 +244,9 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) threshold float64 needConfirms uint32 ) - if localBPInfo, bpInfos, err = buildBlockProducerInfos(cfg.NodeID, cfg.Peers, cfg.Mode == APINodeMode); err != nil { + if localBPInfo, bpInfos, err = buildBlockProducerInfos( + cfg.NodeID, cfg.Peers, cfg.Mode == APINodeMode, + ); err != nil { return } if threshold = cfg.ConfirmThreshold; threshold <= 0.0 { @@ -263,9 +266,7 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) server: cfg.Server, caller: rpc.NewCaller(), - storage: st, - chainBus: bus, - + storage: st, blockCache: cache, pendingBlocks: make(chan *types.BPBlock), @@ -283,13 +284,12 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) confirms: needConfirms, nextHeight: headBranch.head.height + 1, offset: time.Duration(0), // TODO(leventeliu): initialize offset - - lastIrre: irre, - immutable: immutable, - headIndex: headIndex, - headBranch: headBranch, - branches: branches, - txPool: txPool, + lastIrre: lastIrre, + immutable: immutable, + headIndex: headIndex, + headBranch: headBranch, + branches: branches, + txPool: txPool, } // NOTE(leventeliu): this implies that BP chain is a singleton, otherwise we will need diff --git a/blockproducer/rpc.go b/blockproducer/rpc.go index 1b021614b..61e46b723 100644 --- a/blockproducer/rpc.go +++ b/blockproducer/rpc.go @@ -138,13 +138,6 @@ func (s *ChainRPCService) QueryTxState( return } -// Sub is the RPC method to subscribe some event. -func (s *ChainRPCService) Sub(req *types.SubReq, resp *types.SubResp) (err error) { - return s.chain.chainBus.Subscribe(req.Topic, func(request interface{}, response interface{}) { - s.chain.caller.CallNode(req.NodeID.ToNodeID(), req.Callback, request, response) - }) -} - // WaitDatabaseCreation waits for database creation complete. func WaitDatabaseCreation( ctx context.Context, diff --git a/blockproducer/storage.go b/blockproducer/storage.go index 0fca97223..23c1adaad 100644 --- a/blockproducer/storage.go +++ b/blockproducer/storage.go @@ -416,7 +416,7 @@ func loadTxPool(st xi.Storage) (txPool map[hash.Hash]pi.Transaction, err error) } func loadBlocks( - st xi.Storage, irreHash hash.Hash) (irre *blockNode, heads []*blockNode, err error, + st xi.Storage, irreHash hash.Hash) (lastIrre *blockNode, heads []*blockNode, err error, ) { var ( rows *sql.Rows @@ -494,7 +494,7 @@ func loadBlocks( headsIndex[bh] = bn } - if irre, ok = index[irreHash]; !ok { + if lastIrre, ok = index[irreHash]; !ok { err = errors.Wrapf(ErrParentNotFound, "irreversible block %s not found", ph.Short(4)) return } From 5d225a206a964e009451ce5345c51439bf8fdc10 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 15:23:43 +0800 Subject: [PATCH 15/17] Fix issue: must provide a positive size for LRU cache --- blockproducer/chain.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index 1b6d20711..d6e60b375 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -153,6 +153,9 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) if cfg.blockCacheSize > conf.MaxCachedBlock { cfg.blockCacheSize = conf.MaxCachedBlock } + if cfg.blockCacheSize <= 0 { + cfg.blockCacheSize = 1 // Must provide a positive size + } if cache, err = lru.NewWithEvict(cfg.blockCacheSize, func(key interface{}, value interface{}) { if node, ok := value.(*blockNode); ok && node != nil { node.clear() From 34858f00e0a6f87e55156a267d36551040da9236 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 17:03:41 +0800 Subject: [PATCH 16/17] Export block cache size field in config --- blockproducer/chain.go | 10 +++++----- blockproducer/config.go | 2 +- cmd/cqld/bootstrap.go | 17 +++++++++-------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/blockproducer/chain.go b/blockproducer/chain.go index d6e60b375..86789e2fc 100644 --- a/blockproducer/chain.go +++ b/blockproducer/chain.go @@ -150,13 +150,13 @@ func NewChainWithContext(ctx context.Context, cfg *Config) (c *Chain, err error) }() // Create block cache - if cfg.blockCacheSize > conf.MaxCachedBlock { - cfg.blockCacheSize = conf.MaxCachedBlock + if cfg.BlockCacheSize > conf.MaxCachedBlock { + cfg.BlockCacheSize = conf.MaxCachedBlock } - if cfg.blockCacheSize <= 0 { - cfg.blockCacheSize = 1 // Must provide a positive size + if cfg.BlockCacheSize <= 0 { + cfg.BlockCacheSize = 1 // Must provide a positive size } - if cache, err = lru.NewWithEvict(cfg.blockCacheSize, func(key interface{}, value interface{}) { + if cache, err = lru.NewWithEvict(cfg.BlockCacheSize, func(key interface{}, value interface{}) { if node, ok := value.(*blockNode); ok && node != nil { node.clear() } diff --git a/blockproducer/config.go b/blockproducer/config.go index 3fb1b2ee1..9c167a50f 100644 --- a/blockproducer/config.go +++ b/blockproducer/config.go @@ -52,5 +52,5 @@ type Config struct { Period time.Duration Tick time.Duration - blockCacheSize int + BlockCacheSize int } diff --git a/cmd/cqld/bootstrap.go b/cmd/cqld/bootstrap.go index 2b90d262b..80bf2d423 100644 --- a/cmd/cqld/bootstrap.go +++ b/cmd/cqld/bootstrap.go @@ -159,14 +159,15 @@ func runNode(nodeID proto.NodeID, listenAddr string) (err error) { // init main chain service log.Info("register main chain service rpc") chainConfig := &bp.Config{ - Mode: mode, - Genesis: genesis, - DataFile: conf.GConf.BP.ChainFileName, - Server: server, - Peers: peers, - NodeID: nodeID, - Period: conf.GConf.BPPeriod, - Tick: conf.GConf.BPTick, + Mode: mode, + Genesis: genesis, + DataFile: conf.GConf.BP.ChainFileName, + Server: server, + Peers: peers, + NodeID: nodeID, + Period: conf.GConf.BPPeriod, + Tick: conf.GConf.BPTick, + BlockCacheSize: 1000, } chain, err := bp.NewChain(chainConfig) if err != nil { From ec33085dca1264966e50e4ad72dca86b404747e0 Mon Sep 17 00:00:00 2001 From: Levente Liu Date: Tue, 12 Feb 2019 17:10:54 +0800 Subject: [PATCH 17/17] Make use of txCount field in blockNode --- blockproducer/branch.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/blockproducer/branch.go b/blockproducer/branch.go index 83db975b1..6182d3f7d 100644 --- a/blockproducer/branch.go +++ b/blockproducer/branch.go @@ -58,11 +58,11 @@ func newBranch( } // Apply new blocks to view and pool for _, bn := range list { - var block = bn.load() - if len(block.Transactions) > conf.MaxTransactionsPerBlock { + if bn.txCount > conf.MaxTransactionsPerBlock { return nil, ErrTooManyTransactionsInBlock } + var block = bn.load() for _, v := range block.Transactions { var k = v.Hash() // Check in tx pool @@ -135,7 +135,7 @@ func (b *branch) applyBlock(n *blockNode) (br *branch, err error) { } var cpy = b.makeArena() - if len(block.Transactions) > conf.MaxTransactionsPerBlock { + if n.txCount > conf.MaxTransactionsPerBlock { return nil, ErrTooManyTransactionsInBlock }