forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
committer_impl.go
132 lines (104 loc) · 4.48 KB
/
committer_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package committer
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
// logger is the package-level logger, obtained from flogging under the
// name "committer".
var logger = flogging.MustGetLogger("committer")
//--------!!!IMPORTANT!!-!!IMPORTANT!!-!!IMPORTANT!!---------
// This is used merely to complete the loop for the "skeleton"
// path so we can reason about and modify the committer component
// more effectively using code.
// PeerLedgerSupport abstract out the API's of ledger.PeerLedger interface
// required to implement LedgerCommitter
type PeerLedgerSupport interface {
GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error)
GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error)
CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
CommitPvtDataOfOldBlocks(blockPvtData []*ledger.BlockPvtData) ([]*ledger.PvtdataHashMismatch, error)
GetBlockchainInfo() (*common.BlockchainInfo, error)
GetBlockByNumber(blockNumber uint64) (*common.Block, error)
GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error)
GetMissingPvtDataTracker() (ledger.MissingPvtDataTracker, error)
Close()
}
// LedgerCommitter is the implementation of the Committer interface. It holds
// a reference to the ledger, which it uses to commit blocks and to retrieve
// chain information.
type LedgerCommitter struct {
	// PeerLedgerSupport is embedded, so its methods are promoted onto
	// LedgerCommitter unless a method of the same name is declared on
	// LedgerCommitter itself.
	PeerLedgerSupport

	// eventer is the callback preCommit invokes for configuration blocks.
	eventer ConfigBlockEventer
}
// ConfigBlockEventer is the callback function type that defines the action to
// take upon arrival of a new configuration update block. It receives the
// block and returns an error if the action fails.
type ConfigBlockEventer func(block *common.Block) error
// NewLedgerCommitter builds a LedgerCommitter backed by the given ledger.
// It is equivalent to calling NewLedgerCommitterReactive with a callback
// that ignores the block and always returns nil.
func NewLedgerCommitter(ledger PeerLedgerSupport) *LedgerCommitter {
	// No reaction is wanted on configuration blocks, so install a callback
	// that always succeeds.
	noop := func(_ *common.Block) error { return nil }
	return NewLedgerCommitterReactive(ledger, noop)
}
// NewLedgerCommitterReactive builds a LedgerCommitter backed by the given
// ledger, like NewLedgerCommitter, and additionally lets the caller supply
// the callback that is invoked when a configuration block is about to be
// committed.
func NewLedgerCommitterReactive(ledger PeerLedgerSupport, eventer ConfigBlockEventer) *LedgerCommitter {
	committer := new(LedgerCommitter)
	committer.PeerLedgerSupport = ledger
	committer.eventer = eventer
	return committer
}
// preCommit performs the work that must happen before a block is committed.
// For a configuration block it invokes the registered ConfigBlockEventer and,
// if the callback fails, returns its error wrapped with context. For any
// other block it does nothing and returns nil.
func (lc *LedgerCommitter) preCommit(block *common.Block) error {
	// Only configuration blocks trigger the callback.
	if !utils.IsConfigBlock(block) {
		return nil
	}

	// Updating CSCC with the new configuration block.
	logger.Debug("Received configuration update, calling CSCC ConfigUpdate")
	cbErr := lc.eventer(block)
	if cbErr == nil {
		return nil
	}
	return errors.WithMessage(cbErr, "could not update CSCC with new configuration update")
}
// CommitWithPvtData runs the pre-commit step on the block and, if that
// succeeds, hands the block together with its private data to the underlying
// ledger. The first error encountered is returned as-is.
func (lc *LedgerCommitter) CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error {
	// Pre-commit processing has to succeed before the ledger is touched.
	preErr := lc.preCommit(blockAndPvtData.Block)
	if preErr != nil {
		return preErr
	}

	// Delegate the actual commit to the embedded ledger.
	return lc.PeerLedgerSupport.CommitWithPvtData(blockAndPvtData)
}
// GetPvtDataAndBlockByNum fetches the block with the given sequence number
// along with its private data by delegating to the underlying ledger with a
// nil filter.
func (lc *LedgerCommitter) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockAndPvtData, error) {
	// The zero value of the filter type is nil, i.e. no filter is supplied.
	var noFilter ledger.PvtNsCollFilter
	return lc.PeerLedgerSupport.GetPvtDataAndBlockByNum(seqNum, noFilter)
}
// LedgerHeight returns the Height field of the blockchain info reported by
// the underlying ledger.
//
// If the blockchain info cannot be obtained, the error is logged and returned
// together with a height of 0.
func (lc *LedgerCommitter) LedgerHeight() (uint64, error) {
	info, err := lc.GetBlockchainInfo()
	if err != nil {
		// Log the error itself: info carries nothing meaningful when the
		// call has failed, so printing it would hide the actual cause.
		logger.Errorf("Cannot get blockchain info, %s", err)
		return 0, err
	}

	return info.Height, nil
}
// GetBlocks retrieves the blocks whose sequence numbers are listed in
// blockSeqs, preserving the order of the input. A sequence number that cannot
// be read from the ledger is logged, together with the reason, and skipped,
// so the result may be shorter than blockSeqs. The result is nil when no
// block could be retrieved.
func (lc *LedgerCommitter) GetBlocks(blockSeqs []uint64) []*common.Block {
	var blocks []*common.Block

	for _, seqNum := range blockSeqs {
		blck, err := lc.GetBlockByNumber(seqNum)
		if err != nil {
			// Best effort: report why this block is unavailable and move on
			// to the next requested sequence number.
			logger.Errorf("Not able to acquire block num %d from the ledger, skipping: %s", seqNum, err)
			continue
		}

		logger.Debug("Appending next block with seqNum = ", seqNum, " to the resulting set")
		blocks = append(blocks, blck)
	}

	return blocks
}