forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
blockpuller.go
109 lines (94 loc) · 3.22 KB
/
blockpuller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"encoding/pem"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/pkg/errors"
)
// LedgerBlockPuller pulls blocks upon demand, or fetches them from the ledger
type LedgerBlockPuller struct {
BlockPuller
BlockRetriever cluster.BlockRetriever
Height func() uint64
}
func (lp *LedgerBlockPuller) PullBlock(seq uint64) *common.Block {
lastSeq := lp.Height() - 1
if lastSeq >= seq {
return lp.BlockRetriever.Block(seq)
}
return lp.BlockPuller.PullBlock(seq)
}
// EndpointconfigFromSupport extracts TLS CA certificates and endpoints from the ConsenterSupport
func EndpointconfigFromSupport(support consensus.ConsenterSupport, bccsp bccsp.BCCSP) ([]cluster.EndpointCriteria, error) {
lastConfigBlock, err := lastConfigBlockFromSupport(support)
if err != nil {
return nil, err
}
endpointconf, err := cluster.EndpointconfigFromConfigBlock(lastConfigBlock, bccsp)
if err != nil {
return nil, err
}
return endpointconf, nil
}
func lastConfigBlockFromSupport(support consensus.ConsenterSupport) (*common.Block, error) {
lastBlockSeq := support.Height() - 1
lastBlock := support.Block(lastBlockSeq)
if lastBlock == nil {
return nil, errors.Errorf("unable to retrieve block [%d]", lastBlockSeq)
}
lastConfigBlock, err := cluster.LastConfigBlock(lastBlock, support)
if err != nil {
return nil, err
}
return lastConfigBlock, nil
}
// NewBlockPuller creates a new block puller
func NewBlockPuller(support consensus.ConsenterSupport,
baseDialer *cluster.PredicateDialer,
clusterConfig localconfig.Cluster,
bccsp bccsp.BCCSP,
) (BlockPuller, error) {
verifyBlockSequence := func(blocks []*common.Block, _ string) error {
return cluster.VerifyBlocks(blocks, support)
}
stdDialer := &cluster.StandardDialer{
Config: baseDialer.Config.Clone(),
}
stdDialer.Config.AsyncConnect = false
stdDialer.Config.SecOpts.VerifyCertificate = nil
// Extract the TLS CA certs and endpoints from the configuration,
endpoints, err := EndpointconfigFromSupport(support, bccsp)
if err != nil {
return nil, err
}
der, _ := pem.Decode(stdDialer.Config.SecOpts.Certificate)
if der == nil {
return nil, errors.Errorf("client certificate isn't in PEM format: %v",
string(stdDialer.Config.SecOpts.Certificate))
}
bp := &cluster.BlockPuller{
VerifyBlockSequence: verifyBlockSequence,
Logger: flogging.MustGetLogger("orderer.common.cluster.puller").With("channel", support.ChannelID()),
RetryTimeout: clusterConfig.ReplicationRetryTimeout,
MaxTotalBufferBytes: clusterConfig.ReplicationBufferSize,
FetchTimeout: clusterConfig.ReplicationPullTimeout,
Endpoints: endpoints,
Signer: support,
TLSCert: der.Bytes,
Channel: support.ChannelID(),
Dialer: stdDialer,
}
return &LedgerBlockPuller{
Height: support.Height,
BlockRetriever: support,
BlockPuller: bp,
}, nil
}