/
relayMsgs.go
168 lines (146 loc) · 4.07 KB
/
relayMsgs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package core
import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
)
// RelayMsgs contains the msgs that need to be sent to both a src and dst chain
// after a given relay round, together with the batching limits that Send
// applies and the outcome recorded by the most recent Send. MaxTxSize and
// MaxMsgLength are ignored if they are set to zero.
type RelayMsgs struct {
// Src holds the messages that Send submits to the src chain.
Src []sdk.Msg `json:"src"`
// Dst holds the messages that Send submits to the dst chain.
Dst []sdk.Msg `json:"dst"`
// MaxTxSize is compared by IsMaxTx against the summed proto-marshaled
// length of the messages in a batch; zero disables the check.
MaxTxSize uint64 `json:"max_tx_size"` // maximum permitted size of the msgs in a bundled relay transaction
// MaxMsgLength is compared by IsMaxTx against the number of messages in a
// batch; zero disables the check.
MaxMsgLength uint64 `json:"max_msg_length"` // maximum amount of messages in a bundled relay transaction
// Last is initialised to false by NewRelayMsgs and is not otherwise read or
// written in this file.
// NOTE(review): presumably flags the final relay round — confirm against callers.
Last bool `json:"last"`
// Succeeded is set by Send: true only if every SendMsgs call returned a nil
// error. Note that its JSON key is "success", not "succeeded".
Succeeded bool `json:"success"`
// SrcMsgIDs is replaced by Send with a slice as long as Src holding the IDs
// returned by the src chain's SendMsgs; entries for messages in a batch
// whose send failed keep the zero MsgID.
SrcMsgIDs []MsgID `json:"src_msg_ids"`
// DstMsgIDs is the dst-chain counterpart of SrcMsgIDs.
DstMsgIDs []MsgID `json:"dst_msg_ids"`
}
// NewRelayMsgs allocates a RelayMsgs whose Src and Dst are empty but non-nil
// slices; every other field is left at its zero value (no limits, Last and
// Succeeded false, no message IDs).
func NewRelayMsgs() *RelayMsgs {
	msgs := new(RelayMsgs)
	msgs.Src = []sdk.Msg{}
	msgs.Dst = []sdk.Msg{}
	return msgs
}
// Ready reports whether r carries at least one message for either chain.
// A nil receiver is never ready.
func (r *RelayMsgs) Ready() bool {
	return r != nil && (len(r.Src) > 0 || len(r.Dst) > 0)
}
// Success returns the Succeeded flag, which Send sets to true only when every
// batch was submitted without error. Like Ready, it is safe to call on a nil
// receiver, for which it returns false.
func (r *RelayMsgs) Success() bool {
	if r == nil {
		return false
	}
	return r.Succeeded
}
// IsMaxTx reports whether a batch of msgLen messages with a combined size of
// txSize exceeds either configured limit. A limit set to zero is not enforced.
func (r *RelayMsgs) IsMaxTx(msgLen, txSize uint64) bool {
	tooManyMsgs := r.MaxMsgLength != 0 && msgLen > r.MaxMsgLength
	tooLarge := r.MaxTxSize != 0 && txSize > r.MaxTxSize
	return tooManyMsgs || tooLarge
}
// Send submits r.Src to the src chain and then r.Dst to the dst chain,
// splitting each list into batches bounded by MaxMsgLength and MaxTxSize
// (see IsMaxTx).
//
// Succeeded ends up true only if every SendMsgs call returned a nil error. A
// failed batch is logged and the remaining batches are still attempted.
// SrcMsgIDs and DstMsgIDs are replaced with slices as long as Src and Dst;
// messages that belonged to a failed batch keep the zero MsgID.
//
// Send panics if a message cannot be proto-marshaled.
// TODO: Parallelize? Maybe?
func (r *RelayMsgs) Send(src, dst Chain) {
	logger := GetChannelPairLogger(src, dst)

	r.Succeeded = true

	// sendAll pushes msgs to chain in limit-respecting batches and returns the
	// collected IDs. Both directions share it so their behavior cannot drift.
	sendAll := func(chain Chain, msgs []sdk.Msg) []MsgID {
		ids := make([]MsgID, len(msgs))

		//nolint:prealloc // can not be pre allocated
		var (
			msgLen, txSize uint64
			batch          []sdk.Msg
			sent           int // number of msgs already handed to SendMsgs
		)

		// flush submits the pending batch, records the outcome and starts a
		// fresh batch.
		flush := func() {
			// A single message that exceeds the limits on its own trips
			// IsMaxTx while nothing is pending; submitting an empty batch
			// would be a spurious transaction, so skip it.
			if len(batch) == 0 {
				return
			}
			msgIDs, err := chain.SendMsgs(batch)
			if err != nil {
				logger.Error("failed to send msgs", err, "msgs", batch)
				r.Succeeded = false
			} else {
				// assumes SendMsgs returns one ID per submitted message, in
				// order — TODO confirm against the Chain implementations
				for i := range batch {
					ids[sent+i] = msgIDs[i]
				}
			}
			sent += len(batch)
			// Start from nil rather than batch[:0] so the slice just handed
			// to SendMsgs is never overwritten by later appends.
			batch = nil
		}

		for _, msg := range msgs {
			bz, err := proto.Marshal(msg)
			if err != nil {
				logger.Error("failed to marshal msg", err)
				panic(err)
			}
			msgLen++
			txSize += uint64(len(bz))
			if r.IsMaxTx(msgLen, txSize) {
				// Adding msg would overflow the pending batch: send what is
				// pending and let msg open the next one.
				flush()
				msgLen, txSize = 1, uint64(len(bz))
			}
			batch = append(batch, msg)
		}
		// submit leftover msgs
		flush()

		return ids
	}

	srcMsgIDs := sendAll(src, r.Src)
	dstMsgIDs := sendAll(dst, r.Dst)

	r.SrcMsgIDs = srcMsgIDs
	r.DstMsgIDs = dstMsgIDs
}
// Merge appends other's Src and Dst messages to the receiver's, preserving
// their order. Only the two message lists are merged; the receiver's limits,
// flags and message IDs are left untouched. A nil other is a no-op.
func (r *RelayMsgs) Merge(other *RelayMsgs) {
	if other == nil {
		return
	}
	r.Src = append(r.Src, other.Src...)
	r.Dst = append(r.Dst, other.Dst...)
}