forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventer.go
114 lines (92 loc) · 3.13 KB
/
eventer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package service
import (
"reflect"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/protos/peer"
)
// Config enumerates the configuration methods required by gossip
type Config interface {
// ChainID returns the chainID for this channel
ChainID() string
// Organizations returns a map of org ID to ApplicationOrgConfig
Organizations() map[string]channelconfig.ApplicationOrg
// Sequence should return the sequence number of the current configuration
Sequence() uint64
// OrdererAddresses returns the list of valid orderer addresses to connect to to invoke Broadcast/Deliver
OrdererAddresses() []string
}
// ConfigProcessor receives config updates
type ConfigProcessor interface {
// ProcessConfig should be invoked whenever a channel's configuration is initialized or updated
ProcessConfigUpdate(config Config)
}
type configStore struct {
anchorPeers []*peer.AnchorPeer
orgMap map[string]channelconfig.ApplicationOrg
}
type configEventReceiver interface {
updateAnchors(config Config)
updateEndpoints(chainID string, endpoints []string)
}
type configEventer struct {
lastConfig *configStore
receiver configEventReceiver
}
func newConfigEventer(receiver configEventReceiver) *configEventer {
return &configEventer{
receiver: receiver,
}
}
// ProcessConfigUpdate should be invoked whenever a channel's configuration is initialized or updated
// it invokes the associated method in configEventReceiver when configuration is updated
// but only if the configuration value actually changed
// Note, that a changing sequence number is ignored as changing configuration
func (ce *configEventer) ProcessConfigUpdate(config Config) {
logger.Debugf("Processing new config for channel %s", config.ChainID())
orgMap := cloneOrgConfig(config.Organizations())
if ce.lastConfig != nil && reflect.DeepEqual(ce.lastConfig.orgMap, orgMap) {
logger.Debugf("Ignoring new config for channel %s because it contained no anchor peer updates", config.ChainID())
} else {
var newAnchorPeers []*peer.AnchorPeer
for _, group := range config.Organizations() {
newAnchorPeers = append(newAnchorPeers, group.AnchorPeers()...)
}
newConfig := &configStore{
orgMap: orgMap,
anchorPeers: newAnchorPeers,
}
ce.lastConfig = newConfig
logger.Debugf("Calling out because config was updated for channel %s", config.ChainID())
ce.receiver.updateAnchors(config)
}
ce.receiver.updateEndpoints(config.ChainID(), config.OrdererAddresses())
}
func cloneOrgConfig(src map[string]channelconfig.ApplicationOrg) map[string]channelconfig.ApplicationOrg {
clone := make(map[string]channelconfig.ApplicationOrg)
for k, v := range src {
clone[k] = &appGrp{
name: v.Name(),
mspID: v.MSPID(),
anchorPeers: v.AnchorPeers(),
}
}
return clone
}
type appGrp struct {
name string
mspID string
anchorPeers []*peer.AnchorPeer
}
func (ag *appGrp) Name() string {
return ag.name
}
func (ag *appGrp) MSPID() string {
return ag.mspID
}
func (ag *appGrp) AnchorPeers() []*peer.AnchorPeer {
return ag.anchorPeers
}