-
Notifications
You must be signed in to change notification settings - Fork 199
/
crossShardPeerTopicNotifier.go
111 lines (93 loc) · 3.17 KB
/
crossShardPeerTopicNotifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package monitor
import (
"fmt"
"strconv"
"strings"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go/common"
"github.com/ElrondNetwork/elrond-go/heartbeat"
"github.com/ElrondNetwork/elrond-go/sharding"
)
const topicSeparator = "_"
// ArgsCrossShardPeerTopicNotifier represents the arguments for the cross shard peer topic notifier
type ArgsCrossShardPeerTopicNotifier struct {
ShardCoordinator sharding.Coordinator
PeerShardMapper heartbeat.PeerShardMapper
}
type crossShardPeerTopicNotifier struct {
shardCoordinator sharding.Coordinator
peerShardMapper heartbeat.PeerShardMapper
}
// NewCrossShardPeerTopicNotifier create a new cross shard peer topic notifier instance
func NewCrossShardPeerTopicNotifier(args ArgsCrossShardPeerTopicNotifier) (*crossShardPeerTopicNotifier, error) {
err := checkArgsCrossShardPeerTopicNotifier(args)
if err != nil {
return nil, err
}
notifier := &crossShardPeerTopicNotifier{
shardCoordinator: args.ShardCoordinator,
peerShardMapper: args.PeerShardMapper,
}
return notifier, nil
}
func checkArgsCrossShardPeerTopicNotifier(args ArgsCrossShardPeerTopicNotifier) error {
if check.IfNil(args.PeerShardMapper) {
return heartbeat.ErrNilPeerShardMapper
}
if check.IfNil(args.ShardCoordinator) {
return heartbeat.ErrNilShardCoordinator
}
return nil
}
// NewPeerFound is called whenever a new peer was found
func (notifier *crossShardPeerTopicNotifier) NewPeerFound(pid core.PeerID, topic string) {
splt := strings.Split(topic, topicSeparator)
if len(splt) != 3 {
// not a cross shard peer or the topic is global
return
}
shardID1, err := notifier.getShardID(splt[1])
if err != nil {
log.Error("failed to extract first shard for topic", "topic", topic, "error", err.Error())
return
}
shardID2, err := notifier.getShardID(splt[2])
if err != nil {
log.Error("failed to extract second shard for topic", "topic", topic, "error", err.Error())
return
}
if shardID1 == shardID2 {
return
}
notifier.checkAndAddShardID(pid, shardID1, topic, shardID2)
notifier.checkAndAddShardID(pid, shardID2, topic, shardID1)
}
// TODO make a standalone component out of this
func (notifier *crossShardPeerTopicNotifier) getShardID(data string) (uint32, error) {
if data == common.MetachainTopicIdentifier {
return common.MetachainShardId, nil
}
val, err := strconv.Atoi(data)
if err != nil {
return 0, err
}
if uint32(val) >= notifier.shardCoordinator.NumberOfShards() || val < 0 {
return 0, fmt.Errorf("invalid value in crossShardPeerTopicNotifier.getShardID %d", val)
}
return uint32(val), nil
}
func (notifier *crossShardPeerTopicNotifier) checkAndAddShardID(pid core.PeerID, shardID1 uint32, topic string, shardID2 uint32) {
if shardID1 != notifier.shardCoordinator.SelfId() {
return
}
log.Trace("crossShardPeerTopicNotifier.NewPeerFound found a cross shard peer",
"topic", topic,
"pid", pid.Pretty(),
"shard", shardID2)
notifier.peerShardMapper.PutPeerIdShardId(pid, shardID2)
}
// IsInterfaceNil returns true if there is no value under the interface
func (notifier *crossShardPeerTopicNotifier) IsInterfaceNil() bool {
return notifier == nil
}