forked from lightningnetwork/lnd
/
session_queue.go
666 lines (562 loc) · 20 KB
/
session_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
package wtclient
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/John-Tonny/lnd/input"
"github.com/John-Tonny/lnd/keychain"
"github.com/John-Tonny/lnd/lnwire"
"github.com/John-Tonny/lnd/watchtower/wtdb"
"github.com/John-Tonny/lnd/watchtower/wtserver"
"github.com/John-Tonny/lnd/watchtower/wtwire"
"github.com/John-Tonny/vclsuite_vcld/chaincfg/chainhash"
"github.com/btcsuite/btclog"
)
// reserveStatus is an enum that signals how full a particular session is. It
// is returned by sessionQueue.AcceptTask and sessionQueue.reserveStatus.
type reserveStatus uint8

const (
	// reserveAvailable indicates that the session has space for at least
	// one more backup. It is the zero value of reserveStatus.
	reserveAvailable reserveStatus = iota

	// reserveExhausted indicates that all slots in the session have been
	// allocated, i.e. no further backups can be accepted by the session.
	reserveExhausted
)
// sessionQueueConfig bundles the resources required by the sessionQueue to
// perform its duties. All entries MUST be non-nil.
type sessionQueueConfig struct {
	// ClientSession provides access to the negotiated session parameters
	// and updating its persistent storage. The queue reads its ID, Tower,
	// Policy, SeqNum, SessionKeyECDH and CommittedUpdates from it.
	ClientSession *wtdb.ClientSession

	// ChainHash identifies the chain for which the session's justice
	// transactions are targeted. It is embedded in the Init message sent
	// to the tower.
	ChainHash chainhash.Hash

	// Dial allows the client to dial the tower using its public key and
	// net address.
	Dial func(keychain.SingleKeyECDH, *lnwire.NetAddress) (wtserver.Peer,
		error)

	// SendMessage encodes, encrypts, and writes a message to the given peer.
	SendMessage func(wtserver.Peer, wtwire.Message) error

	// ReadMessage receives, decrypts, and decodes a message from the given
	// peer.
	ReadMessage func(wtserver.Peer) (wtwire.Message, error)

	// Signer facilitates signing of inputs, used to construct the witnesses
	// for justice transaction inputs.
	Signer input.Signer

	// DB provides access to the client's stable storage. The queue uses it
	// to commit updates before sending and to ack them after the tower
	// replies.
	DB DB

	// MinBackoff defines the initial backoff applied by the session
	// queue before reconnecting to the tower after a failed or partially
	// successful batch is sent. Subsequent backoff durations will grow
	// exponentially up until MaxBackoff.
	MinBackoff time.Duration

	// MaxBackoff defines the maximum backoff applied by the session
	// queue before reconnecting to the tower after a failed or partially
	// successful batch is sent. If the exponential backoff produces a
	// timeout greater than this value, the backoff duration will be clamped
	// to MaxBackoff.
	MaxBackoff time.Duration

	// Log specifies the desired log output, which should be prefixed by the
	// client type, e.g. anchor or legacy.
	Log btclog.Logger
}
// sessionQueue is a reliable delivery queue for a single watchtower session.
// Accepted backups are encrypted and uploaded to the tower described by the
// config's ClientSession. Stop requests a clean shutdown, which waits until
// both internal queues have been drained before the event loop exits.
// ForceQuit aborts a clean shutdown in progress and makes the event loop exit
// as soon as possible.
type sessionQueue struct {
	// One-shot guards that make Start, Stop and ForceQuit idempotent.
	started, stopped, forced sync.Once

	// cfg holds the queue's dependencies; log is a copy of cfg.Log.
	cfg *sessionQueueConfig
	log btclog.Logger

	// commitQueue holds wtdb.CommittedUpdate values that were already
	// committed to disk but not yet acked. pendingQueue holds *backupTask
	// values that were accepted but not yet committed. Committed updates
	// are always sent before pending ones.
	commitQueue, pendingQueue *list.List

	// queueMtx guards the two queues and seqNum. queueCond is bound to
	// queueMtx and is signaled when a task is accepted or while a shutdown
	// is being requested.
	queueMtx  sync.Mutex
	queueCond *sync.Cond

	// localInit is the Init message sent at the start of each connection,
	// and towerAddr is the address dialed to reach the tower.
	localInit *wtwire.Init
	towerAddr *lnwire.NetAddress

	// seqNum is the sequence number of the last pending update that was
	// acked; it is seeded from the ClientSession at construction.
	seqNum uint16

	// retryBackoff is the delay applied before the next connection attempt
	// after a failure.
	retryBackoff time.Duration

	// quit is closed by Stop, forceQuit is closed by ForceQuit, and
	// shutdown is closed by the event loop when it exits.
	quit, forceQuit, shutdown chan struct{}
}
// newSessionQueue initializes a fresh sessionQueue from cfg. The tower is
// addressed via the first entry in the session's tower address list, the
// sequence number is seeded from the session, and the retry backoff starts at
// cfg.MinBackoff. Any updates already committed for the session are loaded
// into the commit queue so that they are retransmitted first.
func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
	session := cfg.ClientSession

	queue := &sessionQueue{
		cfg:          cfg,
		log:          cfg.Log,
		commitQueue:  list.New(),
		pendingQueue: list.New(),
		localInit: wtwire.NewInitMessage(
			lnwire.NewRawFeatureVector(
				wtwire.AltruistSessionsRequired,
			),
			cfg.ChainHash,
		),
		towerAddr: &lnwire.NetAddress{
			IdentityKey: session.Tower.IdentityKey,
			Address:     session.Tower.Addresses[0],
		},
		seqNum:       session.SeqNum,
		retryBackoff: cfg.MinBackoff,
		quit:         make(chan struct{}),
		forceQuit:    make(chan struct{}),
		shutdown:     make(chan struct{}),
	}

	// The condition variable shares the queue mutex so that waiters and
	// producers synchronize on the same lock.
	queue.queueCond = sync.NewCond(&queue.queueMtx)

	// The database is expected to hand back the committed updates in
	// sorted order, with the session's sequence number matching that of
	// the last committed update.
	for _, committed := range session.CommittedUpdates {
		queue.commitQueue.PushBack(committed)
	}

	return queue
}
// Start launches the sessionQueue's event loop in its own goroutine so that
// accepted backups get delivered. Only the first call has any effect.
func (q *sessionQueue) Start() {
	launch := func() {
		go q.sessionManager()
	}

	q.started.Do(launch)
}
// Stop requests a clean shutdown of the sessionQueue and blocks until the
// event loop has exited, which it only does once both queues are empty (or a
// force quit is issued). Only the first call has any effect.
func (q *sessionQueue) Stop() {
	shutdown := func() {
		q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())

		close(q.quit)
		q.signalUntilShutdown()

		// The closing log line is omitted when a force quit was also
		// requested, since ForceQuit emits its own.
		select {
		case <-q.forceQuit:
		default:
			q.log.Debugf("SessionQueue(%s) stopped", q.ID())
		}
	}

	q.stopped.Do(shutdown)
}
// ForceQuit aborts any clean shutdown in progress and returns to the caller
// once the event loop has exited. Only the first call has any effect.
func (q *sessionQueue) ForceQuit() {
	abort := func() {
		q.log.Infof("SessionQueue(%s) force quitting...", q.ID())

		close(q.forceQuit)
		q.signalUntilShutdown()

		q.log.Infof("SessionQueue(%s) force quit", q.ID())
	}

	q.forced.Do(abort)
}
// ID returns a pointer to the wtdb.SessionID of the underlying ClientSession,
// which uniquely identifies this queue.
func (q *sessionQueue) ID() *wtdb.SessionID {
	session := q.cfg.ClientSession

	return &session.ID
}
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
// tower. The task is only accepted if the session is not already exhausted and
// the task can be bound to the ClientSession.
//
// The returned reserveStatus is the status observed before the attempt when
// the task is rejected, and the status after queueing when it is accepted. The
// boolean reports whether the task was accepted.
func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
	q.queueCond.L.Lock()

	q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
		"pending=%d max-updates=%d",
		q.ID(), task.id, q.seqNum, uint32(q.pendingQueue.Len()),
		q.cfg.ClientSession.Policy.MaxUpdates)

	status := q.reserveStatus()

	// A full session cannot take the task. Reject it so that the caller can
	// try it against a different session.
	if status == reserveExhausted {
		q.queueCond.L.Unlock()
		return status, false
	}

	// With room left, derive the sweep and reward outputs from the session
	// parameters. If the outputs are dusty or uneconomical to back up, the
	// task is rejected and will not be tried again.
	//
	// TODO(conner): queue backups and retry with different session params.
	if status == reserveAvailable {
		bindErr := task.bindSession(
			&q.cfg.ClientSession.ClientSessionBody,
		)
		if bindErr != nil {
			q.queueCond.L.Unlock()

			q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
				q.ID(), task.id, bindErr)

			return status, false
		}
	}

	// The task satisfies the session's policy, so hand it to the event
	// loop for final signing and delivery.
	q.pendingQueue.PushBack(task)

	// Report the status *after* queueing so the client can tell whether it
	// may keep using this session or must negotiate a new one.
	status = q.reserveStatus()
	q.queueCond.L.Unlock()

	// Wake the event loop in case it is idle.
	q.queueCond.Signal()

	return status, true
}
// sessionManager is the sessionQueue's main event loop. It sleeps on the
// condition variable while both queues are empty and calls drainBackups
// whenever there is work. The shutdown channel is closed when it returns.
func (q *sessionQueue) sessionManager() {
	defer close(q.shutdown)

	// idle reports whether there is nothing left to send. It must be called
	// with the queue lock held.
	idle := func() bool {
		return q.commitQueue.Len() == 0 && q.pendingQueue.Len() == 0
	}

	for {
		q.queueCond.L.Lock()
		for idle() {
			q.queueCond.Wait()

			// After each wake-up, check whether a shutdown was
			// requested. A clean quit is only honored once the
			// queues are empty; a force quit always is.
			select {
			case <-q.quit:
				if idle() {
					q.queueCond.L.Unlock()
					return
				}

			case <-q.forceQuit:
				q.queueCond.L.Unlock()
				return

			default:
			}
		}
		q.queueCond.L.Unlock()

		// With work still queued the wait loop above is skipped, so a
		// force quit must also be checked here; otherwise the loop
		// could spin forever while the tower is unreachable.
		select {
		case <-q.forceQuit:
			return
		default:
		}

		// Connect to the tower and try to flush everything queued.
		q.drainBackups()
	}
}
// drainBackups dials the session's tower and attempts to upload every queued
// update over that single connection. It returns when the update flagged as
// complete has been acked, when any step fails, or when a force quit is
// requested.
//
// Every failure path (dial, preparing the next update, or sending it) grows the
// retry backoff and sleeps for it before returning, so that the caller's loop
// does not immediately re-dial the tower while the failure persists. The
// backoff sleep is interrupted by a force quit. A fully successful drain resets
// the backoff to its minimum.
func (q *sessionQueue) drainBackups() {
	// backoff increases the retry delay and then blocks for that long,
	// returning early if a force quit is requested.
	backoff := func() {
		q.increaseBackoff()

		select {
		case <-time.After(q.retryBackoff):
		case <-q.forceQuit:
		}
	}

	// First, check that we are able to dial this session's tower.
	conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr)
	if err != nil {
		q.log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
			q.ID(), q.towerAddr, err)

		backoff()
		return
	}
	defer conn.Close()

	// Begin draining the queue of pending state updates. Before the first
	// update is sent, we will precede it with an Init message. If the first
	// is successful, subsequent updates can be streamed without sending an
	// Init.
	for sendInit := true; ; sendInit = false {
		// Generate the next state update to upload to the tower. This
		// method will first proceed in dequeueing committed updates
		// before attempting to dequeue any pending updates.
		stateUpdate, isPending, backupID, err := q.nextStateUpdate()
		if err != nil {
			q.log.Errorf("SessionQueue(%v) unable to get next state "+
				"update: %v", q.ID(), err)

			// The failed item stays at the front of its queue, so
			// the caller will retry it right away. Back off first
			// to avoid re-dialing the tower in a tight loop.
			backoff()
			return
		}

		// Now, send the state update to the tower and wait for a reply.
		err = q.sendStateUpdate(
			conn, stateUpdate, q.localInit, sendInit, isPending,
		)
		if err != nil {
			q.log.Errorf("SessionQueue(%s) unable to send state "+
				"update: %v", q.ID(), err)

			backoff()
			return
		}

		q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
			q.ID(), backupID, stateUpdate.SeqNum)

		// If the last task was backed up successfully, we'll exit and
		// continue once more tasks are added to the queue. We'll also
		// clear any accumulated backoff as this batch was able to be
		// sent reliably.
		if stateUpdate.IsComplete == 1 {
			q.resetBackoff()
			return
		}

		// Always apply a small delay between sends, which makes the
		// unit tests more reliable. A force quit cuts the delay short
		// and ends the drain.
		select {
		case <-time.After(time.Millisecond):
		case <-q.forceQuit:
			return
		}
	}
}
// nextStateUpdate builds the next wtwire.StateUpdate to upload to the tower.
//
// Committed updates take priority: if the commit queue is non-empty, its front
// entry is reused as-is. Otherwise the front task of the pending queue is
// turned into a new committed update with sequence number seqNum+1. In both
// cases the update is passed to the database's CommitUpdate, whose returned
// last-applied value is placed in the outgoing message.
//
// The boolean result is true when the update came from the pending queue,
// which tells the caller which queue to pop once the update is acked. The
// BackupID of the update is returned for logging. On error, all other results
// are zero values.
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
	wtdb.BackupID, error) {

	var (
		update      wtdb.CommittedUpdate
		seqNum      uint16
		fromPending bool
		finalItem   bool
	)

	q.queueCond.L.Lock()
	if q.commitQueue.Len() > 0 {
		// Replay the oldest committed-but-unacked update.
		update = q.commitQueue.Front().Value.(wtdb.CommittedUpdate)
		seqNum = update.SeqNum

		// When this is the only committed update and nothing is
		// pending, the IsComplete flag is used to tell the tower that
		// it may release the connection after replying.
		finalItem = q.commitQueue.Len() == 1 &&
			q.pendingQueue.Len() == 0

		q.queueCond.L.Unlock()

		q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
			"update for %v seqnum=%d",
			q.ID(), update.BackupID, seqNum)
	} else {
		// Nothing committed is outstanding, so craft a new update from
		// the oldest pending task using the next sequence number.
		fromPending = true
		seqNum = q.seqNum + 1
		task := q.pendingQueue.Front().Value.(*backupTask)

		// When this is the only pending task, the IsComplete flag is
		// used to tell the tower that it may release the connection
		// after replying.
		finalItem = q.pendingQueue.Len() == 1

		q.queueCond.L.Unlock()

		hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
		if err != nil {
			// TODO(conner): mark will not send
			return nil, false, wtdb.BackupID{}, fmt.Errorf(
				"unable to craft session payload: %v", err,
			)
		}
		// TODO(conner): special case other obscure errors

		update = wtdb.CommittedUpdate{
			SeqNum: seqNum,
			CommittedUpdateBody: wtdb.CommittedUpdateBody{
				BackupID:      task.id,
				Hint:          hint,
				EncryptedBlob: encBlob,
			},
		}

		q.log.Debugf("SessionQueue(%s) committing state update "+
			"%v seqnum=%d", q.ID(), update.BackupID, seqNum)
	}

	// Persist the update under its sequence number before it goes on the
	// wire. For an update that was already committed, the call is expected
	// to succeed and only serves to fetch the last applied value.
	//
	// Committing first means that, after a crash before the ack arrives,
	// the very same update is retransmitted for this sequence number. This
	// prevents us from believing one state was backed up when a different
	// one actually was.
	lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
	if err != nil {
		// TODO(conner): mark failed/reschedule
		return nil, false, wtdb.BackupID{}, fmt.Errorf(
			"unable to commit state update for "+
				"%v seqnum=%d: %v", update.BackupID, seqNum, err,
		)
	}

	msg := &wtwire.StateUpdate{
		SeqNum:        update.SeqNum,
		LastApplied:   lastApplied,
		Hint:          update.Hint,
		EncryptedBlob: update.EncryptedBlob,
	}

	// Flag the message as complete if it was the last queued item.
	if finalItem {
		msg.IsComplete = 1
	}

	return msg, fromPending, update.BackupID, nil
}
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower over conn and
// processes the tower's reply before returning.
//
// If sendInit is true, localInit is sent first and the tower's Init reply is
// validated against it, establishing that the tower supports the feature bits
// we require. isPending tells the method which queue the update came from:
// once the update is acked, the front of the pending queue is removed and the
// sequence number incremented when true, otherwise the front of the commit
// queue is removed.
//
// An error is returned if any message cannot be sent or read, if the tower
// replies with an unexpected message type or a non-OK code, or if the ack
// cannot be recorded in the database. In all of those cases the queues are
// left untouched so the update is retried.
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
	stateUpdate *wtwire.StateUpdate, localInit *wtwire.Init,
	sendInit, isPending bool) error {

	// If this is the first message being sent to the tower, we must send an
	// Init message to establish that server supports the features we
	// require.
	if sendInit {
		// Send Init to tower.
		err := q.cfg.SendMessage(conn, localInit)
		if err != nil {
			return err
		}

		// Receive Init from tower.
		remoteMsg, err := q.cfg.ReadMessage(conn)
		if err != nil {
			return err
		}

		remoteInit, ok := remoteMsg.(*wtwire.Init)
		if !ok {
			return fmt.Errorf("watchtower %s responded with %T "+
				"to Init", q.towerAddr, remoteMsg)
		}

		// Validate Init.
		err = localInit.CheckRemoteInit(
			remoteInit, wtwire.FeatureNames,
		)
		if err != nil {
			return err
		}
	}

	// Send StateUpdate to tower.
	err := q.cfg.SendMessage(conn, stateUpdate)
	if err != nil {
		return err
	}

	// Receive StateUpdateReply from tower.
	remoteMsg, err := q.cfg.ReadMessage(conn)
	if err != nil {
		return err
	}

	stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
	if !ok {
		return fmt.Errorf("watchtower %s responded with %T to "+
			"StateUpdate", q.towerAddr, remoteMsg)
	}

	// Anything other than an OK code means the tower did not accept the
	// update.
	//
	// TODO(conner): handle other error cases properly, ban towers, etc.
	if stateUpdateReply.Code != wtwire.CodeOK {
		err := fmt.Errorf("received error code %v in "+
			"StateUpdateReply for seqnum=%d",
			stateUpdateReply.Code, stateUpdate.SeqNum)
		q.log.Warnf("SessionQueue(%s) unable to upload state update to "+
			"tower=%s: %v", q.ID(), q.towerAddr, err)
		return err
	}

	// The tower reported a successful update; record the ack along with
	// the last applied value it returned.
	lastApplied := stateUpdateReply.LastApplied
	err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
	if err != nil {
		// TODO(conner): wtdb.ErrUnallocatedLastApplied and
		// wtdb.ErrLastAppliedReversion indicate a borked watchtower and
		// should eventually be handled separately from other failures.
		err = fmt.Errorf("unable to ack seqnum=%d: %v",
			stateUpdate.SeqNum, err)
		q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
			q.ID(), err)
		return err
	}

	q.queueCond.L.Lock()
	if isPending {
		// If a pending update was successfully sent, increment the
		// sequence number and remove the item from the queue. This
		// ensures the total number of backups in the session remains
		// unchanged, which maintains the external view of the session's
		// reserve status.
		q.seqNum++
		q.pendingQueue.Remove(q.pendingQueue.Front())
	} else {
		// Otherwise, simply remove the update from the committed queue.
		// This has no effect on the queue's reserve status since the
		// update had already been committed.
		q.commitQueue.Remove(q.commitQueue.Front())
	}
	q.queueCond.L.Unlock()

	return nil
}
// reserveStatus reports whether the sessionQueue has room for another task.
// The number of allocated slots is the current sequence number plus the number
// of pending tasks; reserveExhausted is returned once that reaches the
// session policy's MaxUpdates, and reserveAvailable otherwise.
//
// NOTE: This method MUST be called with queueCond's exclusive lock held.
func (q *sessionQueue) reserveStatus() reserveStatus {
	allocated := uint32(q.seqNum) + uint32(q.pendingQueue.Len())
	capacity := uint32(q.cfg.ClientSession.Policy.MaxUpdates)

	if allocated >= capacity {
		return reserveExhausted
	}

	return reserveAvailable
}
// resetBackoff restores the connection backoff to the configured minimum.
func (q *sessionQueue) resetBackoff() {
	floor := q.cfg.MinBackoff
	q.retryBackoff = floor
}
// increaseBackoff doubles the current connection backoff, capping the result
// at the configured maximum backoff.
func (q *sessionQueue) increaseBackoff() {
	next := q.retryBackoff * 2
	if ceiling := q.cfg.MaxBackoff; next > ceiling {
		next = ceiling
	}

	q.retryBackoff = next
}
// signalUntilShutdown strobes the sessionQueue's condition variable once per
// millisecond until the main event loop exits, i.e. until the shutdown channel
// is closed. The repeated signal ensures that an event loop blocked waiting on
// the condition variable wakes up and observes a quit or force quit request.
func (q *sessionQueue) signalUntilShutdown() {
	// A single ticker is reused for the whole wait rather than allocating
	// a fresh timer on every iteration.
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			q.queueCond.Signal()

		case <-q.shutdown:
			return
		}
	}
}
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
// sessionQueue. The map itself performs no locking.
type sessionQueueSet map[wtdb.SessionID]*sessionQueue
// Add stores the given sessionQueue in the set, keyed by its session ID. An
// existing entry with the same ID is overwritten.
func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) {
	id := *sessionQueue.ID()
	queues := *s

	queues[id] = sessionQueue
}
// ApplyAndWait runs, for every sessionQueue in the set, the nil-adic function
// that getApply returns for that queue. Each queue is handled in its own
// goroutine (including the call to getApply itself), and the method blocks
// until all of them have finished.
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
	queues := *s

	var pending sync.WaitGroup
	pending.Add(len(queues))

	for _, queue := range queues {
		// Pass the queue as an argument so each goroutine works on its
		// own copy of the loop variable.
		go func(target *sessionQueue) {
			defer pending.Done()

			apply := getApply(target)
			apply()
		}(queue)
	}

	pending.Wait()
}