-
Notifications
You must be signed in to change notification settings - Fork 199
/
shardChainMessenger.go
186 lines (153 loc) · 4.84 KB
/
shardChainMessenger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package broadcast
import (
"fmt"
"github.com/ElrondNetwork/elrond-go/consensus"
"github.com/ElrondNetwork/elrond-go/consensus/spos"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
)
// shardChainMessenger provides the broadcast methods defined in this file
// (blocks, headers, miniblocks and transactions). It embeds *commonMessenger
// and additionally keeps its own references to the marshalizer, messenger and
// shard coordinator, which are the ones its methods access directly.
type shardChainMessenger struct {
*commonMessenger
// marshalizer serializes headers and block bodies before broadcast and is handed to the data packer for transactions.
marshalizer marshal.Marshalizer
// messenger is the component on which Broadcast(topic, data) is invoked.
messenger consensus.P2PMessenger
// shardCoordinator provides SelfId and CommunicationIdentifier, used to build topic names.
shardCoordinator sharding.Coordinator
}
// NewShardChainMessenger validates the supplied components and assembles a
// shardChainMessenger from them.
//
// If checkShardChainNilParameters rejects any argument, its error is returned
// together with a nil messenger. Otherwise the embedded commonMessenger receives
// all five components, while the outer struct keeps direct references to the
// marshalizer, messenger and shard coordinator.
func NewShardChainMessenger(
	marshalizer marshal.Marshalizer,
	messenger consensus.P2PMessenger,
	privateKey crypto.PrivateKey,
	shardCoordinator sharding.Coordinator,
	singleSigner crypto.SingleSigner,
) (*shardChainMessenger, error) {
	if err := checkShardChainNilParameters(marshalizer, messenger, shardCoordinator, privateKey, singleSigner); err != nil {
		return nil, err
	}

	return &shardChainMessenger{
		commonMessenger: &commonMessenger{
			marshalizer:      marshalizer,
			messenger:        messenger,
			privateKey:       privateKey,
			shardCoordinator: shardCoordinator,
			singleSigner:     singleSigner,
		},
		marshalizer:      marshalizer,
		messenger:        messenger,
		shardCoordinator: shardCoordinator,
	}, nil
}
// checkShardChainNilParameters reports the first nil dependency it finds.
//
// The arguments are examined in the order marshalizer, messenger,
// shardCoordinator, privateKey, singleSigner; each is rejected when it is either
// a nil interface or reports IsInterfaceNil() == true. The matching spos.ErrNil*
// error is returned for the first offender, or nil when every argument is set.
func checkShardChainNilParameters(
	marshalizer marshal.Marshalizer,
	messenger consensus.P2PMessenger,
	shardCoordinator sharding.Coordinator,
	privateKey crypto.PrivateKey,
	singleSigner crypto.SingleSigner,
) error {
	// Cases are evaluated top to bottom, so the check order is the case order.
	switch {
	case marshalizer == nil || marshalizer.IsInterfaceNil():
		return spos.ErrNilMarshalizer
	case messenger == nil || messenger.IsInterfaceNil():
		return spos.ErrNilMessenger
	case shardCoordinator == nil || shardCoordinator.IsInterfaceNil():
		return spos.ErrNilShardCoordinator
	case privateKey == nil || privateKey.IsInterfaceNil():
		return spos.ErrNilPrivateKey
	case singleSigner == nil || singleSigner.IsInterfaceNil():
		return spos.ErrNilSingleSigner
	}

	return nil
}
// BroadcastBlock marshals the header and the block body and broadcasts them on
// the headers topic and the miniblocks topic of the node's own shard.
//
// It returns spos.ErrNilBody for a nil body, the error from
// blockBody.IntegrityAndValidity() if that check fails, spos.ErrNilHeader for a
// nil header, or the marshalizer error if either value cannot be serialized.
// The two broadcasts are started in their own goroutines and are not awaited,
// so a nil return only means the sends were launched.
func (scm *shardChainMessenger) BroadcastBlock(blockBody data.BodyHandler, header data.HeaderHandler) error {
	if blockBody == nil || blockBody.IsInterfaceNil() {
		return spos.ErrNilBody
	}
	if err := blockBody.IntegrityAndValidity(); err != nil {
		return err
	}
	if header == nil || header.IsInterfaceNil() {
		return spos.ErrNilHeader
	}

	// Header is marshalled before the body; nothing is sent unless both succeed.
	headerBytes, err := scm.marshalizer.Marshal(header)
	if err != nil {
		return err
	}
	bodyBytes, err := scm.marshalizer.Marshal(blockBody)
	if err != nil {
		return err
	}

	selfShardID := scm.shardCoordinator.SelfId()
	topicSuffix := scm.shardCoordinator.CommunicationIdentifier(selfShardID)
	headersTopic := factory.HeadersTopic + topicSuffix
	miniBlocksTopic := factory.MiniBlocksTopic + topicSuffix

	go scm.messenger.Broadcast(headersTopic, headerBytes)
	go scm.messenger.Broadcast(miniBlocksTopic, bodyBytes)

	return nil
}
// BroadcastHeader marshals the header and broadcasts it on the
// shard-headers-for-metachain topic, whose suffix is the communication
// identifier computed for sharding.MetachainShardId.
//
// It returns spos.ErrNilHeader for a nil header or the marshalizer error if
// serialization fails. The broadcast runs in its own goroutine and is not awaited.
func (scm *shardChainMessenger) BroadcastHeader(header data.HeaderHandler) error {
	if header == nil || header.IsInterfaceNil() {
		return spos.ErrNilHeader
	}

	headerBytes, err := scm.marshalizer.Marshal(header)
	if err != nil {
		return err
	}

	metaSuffix := scm.shardCoordinator.CommunicationIdentifier(sharding.MetachainShardId)
	topic := factory.ShardHeadersForMetachainTopic + metaSuffix

	go scm.messenger.Broadcast(topic, headerBytes)

	return nil
}
// BroadcastMiniBlocks broadcasts every entry of miniBlocks on a miniblocks
// topic. The topic for an entry is factory.MiniBlocksTopic followed by the
// communication identifier that the shard coordinator returns for the map key;
// the map value is sent as the payload unchanged.
//
// Each broadcast runs in its own goroutine and is not awaited. When at least
// one entry was sent, the number of entries is logged. The method always
// returns nil.
func (scm *shardChainMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte) error {
	for shardID, payload := range miniBlocks {
		topic := factory.MiniBlocksTopic + scm.shardCoordinator.CommunicationIdentifier(shardID)
		go scm.messenger.Broadcast(topic, payload)
	}

	// One broadcast is launched per map entry, so the map length is the sent count.
	if sent := len(miniBlocks); sent > 0 {
		log.Info(fmt.Sprintf("sent %d miniblocks\n", sent))
	}

	return nil
}
// BroadcastTransactions packs the transactions of every topic into chunks of at
// most core.MaxBulkTransactionSize and broadcasts each chunk on that topic.
//
// transactions maps a topic name to the list of already-serialized transactions
// to send on it.
//
// The work is done in two phases so that the call is all-or-nothing with respect
// to packing errors: first every topic is packed, and only when all of them
// succeeded are the broadcasts launched. Packing and sending in a single loop
// would let a failure on one topic return an error after other topics had
// already been sent, and, because map iteration order is unspecified, which
// topics went out would differ from run to run.
//
// It returns the error from creating the data packer or from packing any topic;
// in both cases nothing has been broadcast. Broadcasts run in their own
// goroutines and are not awaited. When at least one transaction was sent, the
// total count is logged.
func (scm *shardChainMessenger) BroadcastTransactions(transactions map[string][][]byte) error {
	dataPacker, err := partitioning.NewSimpleDataPacker(scm.marshalizer)
	if err != nil {
		return err
	}

	// Phase 1: pack everything up front so a failure cannot leave a partial send.
	txs := 0
	packetsByTopic := make(map[string][][]byte, len(transactions))
	for topic, topicTxs := range transactions {
		packets, errPack := dataPacker.PackDataInChunks(topicTxs, core.MaxBulkTransactionSize)
		if errPack != nil {
			return errPack
		}

		packetsByTopic[topic] = packets
		txs += len(topicTxs)
	}

	// Phase 2: forward the packets to their topics.
	for topic, packets := range packetsByTopic {
		for _, buff := range packets {
			go scm.messenger.Broadcast(topic, buff)
		}
	}

	if txs > 0 {
		log.Info(fmt.Sprintf("sent %d transactions\n", txs))
	}

	return nil
}
// IsInterfaceNil reports whether the receiver pointer is nil. It lets callers
// holding this type behind an interface detect a typed-nil value, which a plain
// `iface == nil` comparison would miss.
func (scm *shardChainMessenger) IsInterfaceNil() bool {
	return scm == nil
}