-
Notifications
You must be signed in to change notification settings - Fork 0
/
outgoingsender.go
263 lines (227 loc) · 9.94 KB
/
outgoingsender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package logicrunner
import (
"context"
"fmt"
"sync/atomic"
"github.com/pkg/errors"
"go.opencensus.io/stats"
"github.com/insolar/go-actors/actor"
aerr "github.com/insolar/go-actors/actor/errors"
"github.com/insolar/insolar/insolar"
"github.com/insolar/insolar/insolar/payload"
"github.com/insolar/insolar/insolar/pulse"
"github.com/insolar/insolar/insolar/record"
"github.com/insolar/insolar/insolar/reply"
"github.com/insolar/insolar/instrumentation/inslogger"
"github.com/insolar/insolar/logicrunner/artifacts"
"github.com/insolar/insolar/logicrunner/metrics"
"github.com/insolar/insolar/logicrunner/requestresult"
)
var (
	// OutgoingRequestSenderDefaultQueueLimit is the queue limit that both sender
	// actors report to the actor system when they are spawned.
	OutgoingRequestSenderDefaultQueueLimit = 1000

	// OutgoingRequestSenderDefaultGoroutineLimit caps the number of request
	// goroutines the outgoing sender actor keeps running at the same time;
	// requests arriving above the cap are rejected with an error.
	OutgoingRequestSenderDefaultGoroutineLimit = int32(5000)
)
//go:generate minimock -i github.com/insolar/insolar/logicrunner.OutgoingRequestSender -o ./ -s _mock.go -g
// OutgoingRequestSender is a type-safe wrapper for an actor implementation.
// Currently OutgoingRequestSender is implemented as a pair of actors: OutgoingSenderActor is
// responsible for sending regular outgoing requests and AbandonedSenderActor is responsible for
// sending abandoned requests. Generally we want to limit the number of outgoing requests, i.e. use
// some sort of queue. While this is easy for abandoned requests, it's a bit tricky for regular requests
// (see the comments in outgoingSenderActorState.Receive). Also, a synchronous abandoned request can
// create new outgoing requests, which may cause a deadlock when a single actor is responsible for both
// types of messages. This is why two actors with two independent queues are used, and why their logic
// differs a little.
type OutgoingRequestSender interface {
// SendOutgoingRequest sends a regular outgoing request registered under reqRef.
// The implementation in this file blocks until the outgoing sender actor delivers a
// result and returns the object reference (only set for CTSaveAsChild), the call
// result, the incoming request that was sent, and an error.
SendOutgoingRequest(ctx context.Context, reqRef insolar.Reference, req *record.OutgoingRequest) (*insolar.Reference, insolar.Arguments, *record.IncomingRequest, error)
// SendAbandonedOutgoingRequest hands an abandoned outgoing request over for sending.
// It returns nothing; the implementation in this file only logs failures.
SendAbandonedOutgoingRequest(ctx context.Context, reqRef insolar.Reference, req *record.OutgoingRequest)
// Stop asks the underlying actors to terminate. The implementation in this file
// waits until both actors have acknowledged the stop message.
Stop(ctx context.Context)
}
// outgoingRequestSender implements OutgoingRequestSender on top of two actors
// living in a single actor system.
type outgoingRequestSender struct {
// as is the actor system used to deliver messages to both actors.
as actor.System
// outgoingSenderPid addresses the actor that handles regular outgoing requests.
outgoingSenderPid actor.Pid
// abandonedSenderPid addresses the actor that handles abandoned outgoing requests.
abandonedSenderPid actor.Pid
}
// actorDeps bundles the external dependencies shared by both sender actors.
// The actual sending logic (sendOutgoingRequest) is defined on this type.
type actorDeps struct {
// cr sends the call built from an outgoing request.
cr insolar.ContractRequester
// am registers the result of an outgoing request.
am artifacts.Client
// pa provides the latest pulse, whose number is stamped on the call.
pa pulse.Accessor
}
// outgoingSenderActorState is the state of the actor that sends regular outgoing requests.
type outgoingSenderActorState struct {
deps actorDeps
// atomicRunningGoroutineCounter counts the request goroutines that are currently
// running. It is read and modified only through sync/atomic.
atomicRunningGoroutineCounter int32
}
// abandonedSenderActorState is the state of the actor that sends abandoned outgoing
// requests. It carries nothing but the shared dependencies.
type abandonedSenderActorState struct {
deps actorDeps
}
// sendOutgoingResult carries the outcome of a regular outgoing request from the
// outgoing sender actor back to the caller of SendOutgoingRequest.
type sendOutgoingResult struct {
object *insolar.Reference // only for CTSaveAsChild
result insolar.Arguments // result of the call
incoming *record.IncomingRequest // incoming request is used in a transcript
err error // non-nil when the request failed or was rejected
}
// sendOutgoingRequestMessage asks the outgoing sender actor to send a regular
// outgoing request and to report the outcome through resultChan.
type sendOutgoingRequestMessage struct {
ctx context.Context // context of the caller, passed on to the send logic
requestReference insolar.Reference // registered request id
outgoingRequest *record.OutgoingRequest // outgoing request body
resultChan chan sendOutgoingResult // result that will be returned to the contract proxy
}
// sendAbandonedOutgoingRequestMessage asks the abandoned sender actor to send an
// abandoned outgoing request. There is no result channel: nobody waits for the outcome.
type sendAbandonedOutgoingRequestMessage struct {
ctx context.Context // context of the caller, passed on to the send logic
requestReference insolar.Reference // registered request id
outgoingRequest *record.OutgoingRequest // outgoing request body
}
// stopRequestSenderMessage tells a sender actor to terminate.
type stopRequestSenderMessage struct {
// resultChan receives an empty struct from the actor right before it terminates.
resultChan chan struct{}
}
// NewOutgoingRequestSender spawns the two sender actors in the given actor system
// and returns an OutgoingRequestSender that talks to them.
//
// The outgoing sender actor is spawned first, then the abandoned sender actor.
// Both receive the same dependencies (cr, am, pa) and both report
// OutgoingRequestSenderDefaultQueueLimit as their queue limit.
func NewOutgoingRequestSender(as actor.System, cr insolar.ContractRequester, am artifacts.Client, pa pulse.Accessor) OutgoingRequestSender {
	sender := &outgoingRequestSender{as: as}

	sender.outgoingSenderPid = as.Spawn(func(actor.System, actor.Pid) (actor.Actor, int) {
		return newOutgoingSenderActorState(cr, am, pa), OutgoingRequestSenderDefaultQueueLimit
	})
	sender.abandonedSenderPid = as.Spawn(func(actor.System, actor.Pid) (actor.Actor, int) {
		return newAbandonedSenderActorState(cr, am, pa), OutgoingRequestSenderDefaultQueueLimit
	})

	return sender
}
// SendOutgoingRequest passes a regular outgoing request to the outgoing sender
// actor and blocks until the actor reports the outcome.
//
// If the message cannot be delivered to the actor, the error is logged and
// returned together with an empty insolar.Arguments value. Otherwise the
// object reference, result, incoming request and error reported by the actor
// are returned as they are.
func (rs *outgoingRequestSender) SendOutgoingRequest(ctx context.Context, reqRef insolar.Reference, req *record.OutgoingRequest) (*insolar.Reference, insolar.Arguments, *record.IncomingRequest, error) {
	// Capacity 1: the actor sends exactly one result per message, so that
	// send never has to wait for this receiver.
	replyCh := make(chan sendOutgoingResult, 1)

	if err := rs.as.Send(rs.outgoingSenderPid, sendOutgoingRequestMessage{
		ctx:              ctx,
		requestReference: reqRef,
		outgoingRequest:  req,
		resultChan:       replyCh,
	}); err != nil {
		inslogger.FromContext(ctx).Errorf("SendOutgoingRequest failed: %v", err)
		return nil, insolar.Arguments{}, nil, err
	}

	outcome := <-replyCh
	return outcome.object, outcome.result, outcome.incoming, outcome.err
}
// SendAbandonedOutgoingRequest passes an abandoned outgoing request to the
// abandoned sender actor and returns without waiting for it to be processed.
// A delivery failure is logged and otherwise ignored.
func (rs *outgoingRequestSender) SendAbandonedOutgoingRequest(ctx context.Context, reqRef insolar.Reference, req *record.OutgoingRequest) {
	sendErr := rs.as.Send(rs.abandonedSenderPid, sendAbandonedOutgoingRequestMessage{
		ctx:              ctx,
		requestReference: reqRef,
		outgoingRequest:  req,
	})
	if sendErr == nil {
		return
	}

	// The actor's mailbox is most likely full. It is OK to lose an abandoned
	// OutgoingRequest in this case: LME will re-send a corresponding
	// notification anyway.
	inslogger.FromContext(ctx).Errorf("SendAbandonedOutgoingRequest failed: %v", sendErr)
}
// Stop sends a priority stop message to both sender actors and blocks until
// each of them has acknowledged it. The ctx argument is not used.
func (rs *outgoingRequestSender) Stop(ctx context.Context) {
	outgoingDone := make(chan struct{}, 1)
	abandonedDone := make(chan struct{}, 1)

	// Both errors are ignored on purpose: SendPriority can only fail when the
	// actor doesn't exist or has already terminated. Neither is expected here
	// and there is no reasonable way to handle it. Should it happen anyway,
	// Stop() will probably block forever, which is acceptable (e.g. easy to
	// debug using SIGABRT).
	rs.as.SendPriority(rs.outgoingSenderPid, stopRequestSenderMessage{resultChan: outgoingDone})   //nolint: errcheck
	rs.as.SendPriority(rs.abandonedSenderPid, stopRequestSenderMessage{resultChan: abandonedDone}) //nolint: errcheck

	// Wait for both terminations, outgoing sender first.
	for _, done := range []chan struct{}{outgoingDone, abandonedDone} {
		<-done
	}
}
// newOutgoingSenderActorState builds the initial state of the outgoing sender
// actor: the given dependencies and a zero running-goroutine counter.
func newOutgoingSenderActorState(cr insolar.ContractRequester, am artifacts.Client, pa pulse.Accessor) actor.Actor {
	deps := actorDeps{cr: cr, am: am, pa: pa}
	return &outgoingSenderActorState{deps: deps}
}
// newAbandonedSenderActorState builds the initial state of the abandoned sender
// actor from the given dependencies.
func newAbandonedSenderActorState(cr insolar.ContractRequester, am artifacts.Client, pa pulse.Accessor) actor.Actor {
	deps := actorDeps{cr: cr, am: am, pa: pa}
	return &abandonedSenderActorState{deps: deps}
}
// Receive handles one message addressed to the outgoing sender actor.
//
//   - stopRequestSenderMessage: acknowledges on the message's channel and
//     terminates the actor.
//   - sendOutgoingRequestMessage: rejects the request with an error when the
//     goroutine limit is reached; otherwise starts a goroutine that sends the
//     request and delivers the outcome on the message's result channel.
//   - anything else: logs an error and keeps the actor running.
func (a *outgoingSenderActorState) Receive(message actor.Message) (actor.Actor, error) {
	switch msg := message.(type) {
	case stopRequestSenderMessage:
		msg.resultChan <- struct{}{}
		return a, aerr.Terminate

	case sendOutgoingRequestMessage:
		running := atomic.LoadInt32(&a.atomicRunningGoroutineCounter)
		if running >= OutgoingRequestSenderDefaultGoroutineLimit {
			msg.resultChan <- sendOutgoingResult{
				err: fmt.Errorf("OutgoingRequestActor: goroutine limit exceeded"),
			}
			return a, nil
		}

		// A goroutine is needed here because an outgoing request can result in
		// creating a new outgoing request directed to the same VE, which would
		// still be waiting for the reply to the first request, i.e. a deadlock.
		// The number of simultaneously running goroutines is limited to prevent
		// resource leakage. Atomics are sufficient because Receive is always
		// executed by one goroutine, so the limit can never be exceeded. For a
		// short period of time we may allow slightly fewer goroutines than the
		// limit says, but that's fine.
		atomic.AddInt32(&a.atomicRunningGoroutineCounter, 1)
		stats.Record(msg.ctx, metrics.OutgoingSenderActorGoroutines.M(1))

		go func() {
			defer func() {
				atomic.AddInt32(&a.atomicRunningGoroutineCounter, -1)
				stats.Record(msg.ctx, metrics.OutgoingSenderActorGoroutines.M(-1))
			}()

			object, result, incoming, err := a.deps.sendOutgoingRequest(msg.ctx, msg.requestReference, msg.outgoingRequest)
			msg.resultChan <- sendOutgoingResult{
				object:   object,
				result:   result,
				incoming: incoming,
				err:      err,
			}
		}()
		return a, nil

	default:
		inslogger.FromContext(context.Background()).Errorf("OutgoingRequestActor: unexpected message %v", msg)
		return a, nil
	}
}
// Receive handles one message addressed to the abandoned sender actor.
//
//   - sendAbandonedOutgoingRequestMessage: sends the request synchronously. A
//     failure is only logged, using the logger taken from the request's own
//     context so the log entry carries whatever that context holds.
//   - stopRequestSenderMessage: acknowledges on the message's channel and
//     terminates the actor.
//   - anything else: logs an error and keeps the actor running.
//
// The actor itself is always returned unchanged; the error is aerr.Terminate
// for a stop message and nil otherwise.
func (a *abandonedSenderActorState) Receive(message actor.Message) (actor.Actor, error) {
	switch v := message.(type) {
	case sendAbandonedOutgoingRequestMessage:
		_, _, _, err := a.deps.sendOutgoingRequest(v.ctx, v.requestReference, v.outgoingRequest)
		// It's OK to just log an error, LME will re-send a corresponding notification anyway.
		if err != nil {
			// Log through the request's context rather than context.Background(),
			// matching how SendAbandonedOutgoingRequest logs its own failures.
			inslogger.FromContext(v.ctx).Errorf("AbandonedSenderActor: sendOutgoingRequest failed %v", err)
		}
		return a, nil
	case stopRequestSenderMessage:
		v.resultChan <- struct{}{}
		return a, aerr.Terminate
	default:
		// No request context is available for an unknown message type.
		inslogger.FromContext(context.Background()).Errorf("AbandonedSenderActor: unexpected message %v", v)
		return a, nil
	}
}
// sendOutgoingRequest performs the call described by an outgoing request and
// registers its result.
//
// It builds an incoming request from outgoing, sends it through the contract
// requester together with the latest pulse number, and registers the returned
// payload as the result of outgoingReqRef on behalf of outgoing.Caller.
//
// On success it returns the object reference (set only for *reply.CallMethod
// replies), the raw result, and the incoming request that was sent. On any
// failure the first three results are nil and the error is returned.
func (a *actorDeps) sendOutgoingRequest(ctx context.Context, outgoingReqRef insolar.Reference, outgoing *record.OutgoingRequest) (*insolar.Reference, insolar.Arguments, *record.IncomingRequest, error) {
	incoming := buildIncomingRequestFromOutgoing(outgoing)

	currentPulse, err := a.pa.Latest(ctx)
	if err != nil {
		return nil, nil, nil, errors.Wrapf(err, "sendOutgoingRequest: failed to get current pulse")
	}

	// Actually make a call.
	rep, _, err := a.cr.SendRequest(ctx, &payload.CallMethod{
		Request:     incoming,
		PulseNumber: currentPulse.PulseNumber,
	})
	if err != nil {
		return nil, nil, nil, err
	}

	var (
		object *insolar.Reference
		result []byte
	)
	switch typed := rep.(type) {
	case *reply.RegisterRequest: // no-wait call
		result = typed.Request.Bytes()
	case *reply.CallMethod: // regular call; Object is only set for CTSaveAsChild
		object, result = typed.Object, typed.Result
	default:
		return nil, nil, nil, fmt.Errorf("sendOutgoingRequest: cr.Call returned unexpected type %T", rep)
	}

	// Register result of the outgoing method.
	if err := a.am.RegisterResult(ctx, outgoingReqRef, requestresult.New(result, outgoing.Caller)); err != nil {
		return nil, nil, nil, errors.Wrap(err, "can't register result")
	}
	return object, result, incoming, nil
}