forked from lightningnetwork/lnd
-
Notifications
You must be signed in to change notification settings - Fork 24
/
payment_control.go
807 lines (662 loc) · 22.1 KB
/
payment_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
package channeldb
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"github.com/decred/dcrlnd/kvdb"
"github.com/decred/dcrlnd/lntypes"
)
const (
// paymentSeqBlockSize is the block size used when we batch allocate
// payment sequences for future payments.
paymentSeqBlockSize = 1000
)
var (
// ErrAlreadyPaid signals we have already paid this payment hash.
ErrAlreadyPaid = errors.New("invoice is already paid")
// ErrPaymentInFlight signals that payment for this payment hash is
// already "in flight" on the network.
ErrPaymentInFlight = errors.New("payment is in transition")
// ErrPaymentNotInitiated is returned if the payment wasn't initiated.
ErrPaymentNotInitiated = errors.New("payment isn't initiated")
// ErrPaymentAlreadySucceeded is returned in the event we attempt to
// change the status of a payment already succeeded.
ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
// ErrPaymentAlreadyFailed is returned in the event we attempt to alter
// a failed payment.
ErrPaymentAlreadyFailed = errors.New("payment has already failed")
// ErrUnknownPaymentStatus is returned when we do not recognize the
// existing state of a payment.
ErrUnknownPaymentStatus = errors.New("unknown payment status")
// ErrPaymentTerminal is returned if we attempt to alter a payment that
// already has reached a terminal condition.
ErrPaymentTerminal = errors.New("payment has reached terminal condition")
// ErrAttemptAlreadySettled is returned if we try to alter an already
// settled HTLC attempt.
ErrAttemptAlreadySettled = errors.New("attempt already settled")
// ErrAttemptAlreadyFailed is returned if we try to alter an already
// failed HTLC attempt.
ErrAttemptAlreadyFailed = errors.New("attempt already failed")
// ErrValueMismatch is returned if we try to register a non-MPP attempt
// with an amount that doesn't match the payment amount.
ErrValueMismatch = errors.New("attempted value doesn't match payment" +
"amount")
// ErrValueExceedsAmt is returned if we try to register an attempt that
// would take the total sent amount above the payment amount.
ErrValueExceedsAmt = errors.New("attempted value exceeds payment" +
"amount")
// ErrNonMPPayment is returned if we try to register an MPP attempt for
// a payment that already has a non-MPP attempt regitered.
ErrNonMPPayment = errors.New("payment has non-MPP attempts")
// ErrMPPayment is returned if we try to register a non-MPP attempt for
// a payment that already has an MPP attempt regitered.
ErrMPPayment = errors.New("payment has MPP attempts")
// ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
// shard where the payment address doesn't match existing shards.
ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")
// ErrMPPTotalAmountMismatch is returned if we try to register an MPP
// shard where the total amount doesn't match existing shards.
ErrMPPTotalAmountMismatch = errors.New("mp payment total amount mismatch")
// errNoAttemptInfo is returned when no attempt info is stored yet.
errNoAttemptInfo = errors.New("unable to find attempt info for " +
"inflight payment")
// errNoSequenceNrIndex is returned when an attempt to lookup a payment
// index is made for a sequence number that is not indexed.
errNoSequenceNrIndex = errors.New("payment sequence number index " +
"does not exist")
)
// PaymentControl implements persistence for payments and payment attempts.
type PaymentControl struct {
paymentSeqMx sync.Mutex
currPaymentSeq uint64
storedPaymentSeq uint64
db *DB
}
// NewPaymentControl creates a new instance of the PaymentControl.
func NewPaymentControl(db *DB) *PaymentControl {
return &PaymentControl{
db: db,
}
}
// InitPayment checks or records the given PaymentCreationInfo with the DB,
// making sure it does not already exist as an in-flight payment. When this
// method returns successfully, the payment is guranteeed to be in the InFlight
// state.
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
info *PaymentCreationInfo) error {
// Obtain a new sequence number for this payment. This is used
// to sort the payments in order of creation, and also acts as
// a unique identifier for each payment.
sequenceNum, err := p.nextPaymentSequence()
if err != nil {
return err
}
var b bytes.Buffer
if err := serializePaymentCreationInfo(&b, info); err != nil {
return err
}
infoBytes := b.Bytes()
var updateErr error
err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
// Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
updateErr = nil
prefetchPayment(tx, paymentHash)
bucket, err := createPaymentBucket(tx, paymentHash)
if err != nil {
return err
}
// Get the existing status of this payment, if any.
paymentStatus, err := fetchPaymentStatus(bucket)
if err != nil {
return err
}
switch paymentStatus {
// We allow retrying failed payments.
case StatusFailed:
// This is a new payment that is being initialized for the
// first time.
case StatusUnknown:
// We already have an InFlight payment on the network. We will
// disallow any new payments.
case StatusInFlight:
updateErr = ErrPaymentInFlight
return nil
// We've already succeeded a payment to this payment hash,
// forbid the switch from sending another.
case StatusSucceeded:
updateErr = ErrAlreadyPaid
return nil
default:
updateErr = ErrUnknownPaymentStatus
return nil
}
// Before we set our new sequence number, we check whether this
// payment has a previously set sequence number and remove its
// index entry if it exists. This happens in the case where we
// have a previously attempted payment which was left in a state
// where we can retry.
seqBytes := bucket.Get(paymentSequenceKey)
if seqBytes != nil {
indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
if err := indexBucket.Delete(seqBytes); err != nil {
return err
}
}
// Once we have obtained a sequence number, we add an entry
// to our index bucket which will map the sequence number to
// our payment identifier.
err = createPaymentIndexEntry(
tx, sequenceNum, info.PaymentIdentifier,
)
if err != nil {
return err
}
err = bucket.Put(paymentSequenceKey, sequenceNum)
if err != nil {
return err
}
// Add the payment info to the bucket, which contains the
// static information for this payment
err = bucket.Put(paymentCreationInfoKey, infoBytes)
if err != nil {
return err
}
// We'll delete any lingering HTLCs to start with, in case we
// are initializing a payment that was attempted earlier, but
// left in a state where we could retry.
err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
// Mark payment as inflight in the inflight payments index.
err = createPaymentInflightIndexEntry(tx, sequenceNum)
if err != nil {
return err
}
// Also delete any lingering failure info now that we are
// re-attempting.
return bucket.Delete(paymentFailInfoKey)
})
if err != nil {
return err
}
return updateErr
}
// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
type paymentIndexType uint8
// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
const paymentIndexTypeHash paymentIndexType = 0
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
// index produced contains a payment index type (which can be used in future to
// signal different payment index types) and the payment identifier.
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
id lntypes.Hash) error {
var b bytes.Buffer
if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
return err
}
indexes := tx.ReadWriteBucket(paymentsIndexBucket)
return indexes.Put(sequenceNumber, b.Bytes())
}
// deserializePaymentIndex deserializes a payment index entry. This function
// currently only supports deserialization of payment hash indexes, and will
// fail for other types.
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
var (
indexType paymentIndexType
paymentHash []byte
)
if err := ReadElements(r, &indexType, &paymentHash); err != nil {
return lntypes.Hash{}, err
}
// While we only have on payment index type, we do not need to use our
// index type to deserialize the index. However, we sanity check that
// this type is as expected, since we had to read it out anyway.
if indexType != paymentIndexTypeHash {
return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
"type: %v", indexType)
}
hash, err := lntypes.MakeHash(paymentHash)
if err != nil {
return lntypes.Hash{}, err
}
return hash, nil
}
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
// DB.
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
attempt *HTLCAttemptInfo) (*MPPayment, error) {
// Serialize the information before opening the db transaction.
var a bytes.Buffer
err := serializeHTLCAttemptInfo(&a, attempt)
if err != nil {
return nil, err
}
htlcInfoBytes := a.Bytes()
htlcIDBytes := make([]byte, 8)
binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
var payment *MPPayment
err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
prefetchPayment(tx, paymentHash)
bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
if err != nil {
return err
}
p, err := fetchPayment(bucket)
if err != nil {
return err
}
// We cannot register a new attempt if the payment already has
// reached a terminal condition. We check this before
// ensureInFlight because it is a more general check.
settle, fail := p.TerminalInfo()
if settle != nil || fail != nil {
return ErrPaymentTerminal
}
// Ensure the payment is in-flight.
if err := ensureInFlight(p); err != nil {
return err
}
// Make sure any existing shards match the new one with regards
// to MPP options.
mpp := attempt.Route.FinalHop().MPP
for _, h := range p.InFlightHTLCs() {
hMpp := h.Route.FinalHop().MPP
switch {
// We tried to register a non-MPP attempt for a MPP
// payment.
case mpp == nil && hMpp != nil:
return ErrMPPayment
// We tried to register a MPP shard for a non-MPP
// payment.
case mpp != nil && hMpp == nil:
return ErrNonMPPayment
// Non-MPP payment, nothing more to validate.
case mpp == nil:
continue
}
// Check that MPP options match.
if mpp.PaymentAddr() != hMpp.PaymentAddr() {
return ErrMPPPaymentAddrMismatch
}
if mpp.TotalMAtoms() != hMpp.TotalMAtoms() {
return ErrMPPTotalAmountMismatch
}
}
// If this is a non-MPP attempt, it must match the total amount
// exactly.
amt := attempt.Route.ReceiverAmt()
if mpp == nil && amt != p.Info.Value {
return ErrValueMismatch
}
// Ensure we aren't sending more than the total payment amount.
sentAmt, _ := p.SentAmt()
if sentAmt+amt > p.Info.Value {
return ErrValueExceedsAmt
}
htlcsBucket, err := bucket.CreateBucketIfNotExists(
paymentHtlcsBucket,
)
if err != nil {
return err
}
err = htlcsBucket.Put(
htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
htlcInfoBytes,
)
if err != nil {
return err
}
// Retrieve attempt info for the notification.
payment, err = fetchPayment(bucket)
if err != nil {
return err
}
err = updatePaymentInflightIndexEntry(tx, payment)
return err
})
if err != nil {
return nil, err
}
return payment, err
}
// SettleAttempt marks the given attempt settled with the preimage. If this is
// a multi shard payment, this might implicitly mean that the full payment
// succeeded.
//
// After invoking this method, InitPayment should always return an error to
// prevent us from making duplicate payments to the same payment hash. The
// provided preimage is atomically saved to the DB for record keeping.
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
var b bytes.Buffer
if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
return nil, err
}
settleBytes := b.Bytes()
return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
}
// FailAttempt marks the given payment attempt failed.
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
var b bytes.Buffer
if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
return nil, err
}
failBytes := b.Bytes()
return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
}
// updateHtlcKey updates a database key for the specified htlc.
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
attemptID uint64, key, value []byte) (*MPPayment, error) {
aid := make([]byte, 8)
binary.BigEndian.PutUint64(aid, attemptID)
var payment *MPPayment
err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
payment = nil
prefetchPayment(tx, paymentHash)
bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
if err != nil {
return err
}
p, err := fetchPayment(bucket)
if err != nil {
return err
}
// We can only update keys of in-flight payments. We allow
// updating keys even if the payment has reached a terminal
// condition, since the HTLC outcomes must still be updated.
if err := ensureInFlight(p); err != nil {
return err
}
htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
if htlcsBucket == nil {
return fmt.Errorf("htlcs bucket not found")
}
if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
return fmt.Errorf("HTLC with ID %v not registered",
attemptID)
}
// Make sure the shard is not already failed or settled.
if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
return ErrAttemptAlreadyFailed
}
if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
return ErrAttemptAlreadySettled
}
// Add or update the key for this htlc.
err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
if err != nil {
return err
}
// Retrieve attempt info for the notification.
payment, err = fetchPayment(bucket)
if err != nil {
return err
}
err = updatePaymentInflightIndexEntry(tx, payment)
return err
})
if err != nil {
return nil, err
}
return payment, err
}
// Fail transitions a payment into the Failed state, and records the reason the
// payment failed. After invoking this method, InitPayment should return nil on
// its next call for this payment hash, allowing the switch to make a
// subsequent payment.
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
reason FailureReason) (*MPPayment, error) {
var (
updateErr error
payment *MPPayment
)
err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
// Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
updateErr = nil
payment = nil
prefetchPayment(tx, paymentHash)
bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
if err == ErrPaymentNotInitiated {
updateErr = ErrPaymentNotInitiated
return nil
} else if err != nil {
return err
}
// We mark the payment as failed as long as it is known. This
// lets the last attempt to fail with a terminal write its
// failure to the PaymentControl without synchronizing with
// other attempts.
paymentStatus, err := fetchPaymentStatus(bucket)
if err != nil {
return err
}
if paymentStatus == StatusUnknown {
updateErr = ErrPaymentNotInitiated
return nil
}
// Put the failure reason in the bucket for record keeping.
v := []byte{byte(reason)}
err = bucket.Put(paymentFailInfoKey, v)
if err != nil {
return err
}
// Retrieve attempt info for the notification, if available.
payment, err = fetchPayment(bucket)
if err != nil {
return err
}
err = updatePaymentInflightIndexEntry(tx, payment)
return err
})
if err != nil {
return nil, err
}
return payment, updateErr
}
// FetchPayment returns information about a payment from the database.
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
*MPPayment, error) {
var payment *MPPayment
err := kvdb.View(p.db, func(tx kvdb.RTx) error {
prefetchPayment(tx, paymentHash)
bucket, err := fetchPaymentBucket(tx, paymentHash)
if err != nil {
return err
}
payment, err = fetchPayment(bucket)
return err
}, func() {
payment = nil
})
if err != nil {
return nil, err
}
return payment, nil
}
// prefetchPayment attempts to prefetch as much of the payment as possible to
// reduce DB roundtrips.
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
rb := kvdb.RootBucket(tx)
kvdb.Prefetch(
rb,
[]string{
// Prefetch all keys in the payment's bucket.
string(paymentsRootBucket),
string(paymentHash[:]),
},
[]string{
// Prefetch all keys in the payment's htlc bucket.
string(paymentsRootBucket),
string(paymentHash[:]),
string(paymentHtlcsBucket),
},
)
}
// createPaymentBucket creates or fetches the sub-bucket assigned to this
// payment hash.
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
kvdb.RwBucket, error) {
payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
if err != nil {
return nil, err
}
return payments.CreateBucketIfNotExists(paymentHash[:])
}
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
// the bucket does not exist, it returns ErrPaymentNotInitiated.
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
kvdb.RBucket, error) {
payments := tx.ReadBucket(paymentsRootBucket)
if payments == nil {
return nil, ErrPaymentNotInitiated
}
bucket := payments.NestedReadBucket(paymentHash[:])
if bucket == nil {
return nil, ErrPaymentNotInitiated
}
return bucket, nil
}
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
// bucket that can be written to.
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
kvdb.RwBucket, error) {
payments := tx.ReadWriteBucket(paymentsRootBucket)
if payments == nil {
return nil, ErrPaymentNotInitiated
}
bucket := payments.NestedReadWriteBucket(paymentHash[:])
if bucket == nil {
return nil, ErrPaymentNotInitiated
}
return bucket, nil
}
// nextPaymentSequence returns the next sequence number to store for a new
// payment.
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
p.paymentSeqMx.Lock()
defer p.paymentSeqMx.Unlock()
// Set a new upper bound in the DB every 1000 payments to avoid
// conflicts on the sequence when using etcd.
if p.currPaymentSeq == p.storedPaymentSeq {
var currPaymentSeq, newUpperBound uint64
if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
paymentsBucket, err := tx.CreateTopLevelBucket(
paymentsRootBucket,
)
if err != nil {
return err
}
currPaymentSeq = paymentsBucket.Sequence()
newUpperBound = currPaymentSeq + paymentSeqBlockSize
return paymentsBucket.SetSequence(newUpperBound)
}, func() {}); err != nil {
return nil, err
}
// We lazy initialize the cached currPaymentSeq here using the
// first nextPaymentSequence() call. This if statement will auto
// initialize our stored currPaymentSeq, since by default both
// this variable and storedPaymentSeq are zero which in turn
// will have us fetch the current values from the DB.
if p.currPaymentSeq == 0 {
p.currPaymentSeq = currPaymentSeq
}
p.storedPaymentSeq = newUpperBound
}
p.currPaymentSeq++
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, p.currPaymentSeq)
return b, nil
}
// fetchPaymentStatus fetches the payment status of the payment. If the payment
// isn't found, it will default to "StatusUnknown".
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
// Creation info should be set for all payments, regardless of state.
// If not, it is unknown.
if bucket.Get(paymentCreationInfoKey) == nil {
return StatusUnknown, nil
}
payment, err := fetchPayment(bucket)
if err != nil {
return 0, err
}
return payment.Status, nil
}
// ensureInFlight checks whether the payment found in the given bucket has
// status InFlight, and returns an error otherwise. This should be used to
// ensure we only mark in-flight payments as succeeded or failed.
func ensureInFlight(payment *MPPayment) error {
paymentStatus := payment.Status
switch {
// The payment was indeed InFlight.
case paymentStatus == StatusInFlight:
return nil
// Our records show the payment as unknown, meaning it never
// should have left the switch.
case paymentStatus == StatusUnknown:
return ErrPaymentNotInitiated
// The payment succeeded previously.
case paymentStatus == StatusSucceeded:
return ErrPaymentAlreadySucceeded
// The payment was already failed.
case paymentStatus == StatusFailed:
return ErrPaymentAlreadyFailed
default:
return ErrUnknownPaymentStatus
}
}
// FetchInFlightPayments returns all payments with status InFlight.
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
var inFlights []*MPPayment
err := kvdb.View(p.db, func(tx kvdb.RTx) error {
payments := tx.ReadBucket(paymentsRootBucket)
if payments == nil {
return nil
}
// Try to load the list of inflight payments by index. If the
// index does not exist (which is signalled by a specific error),
// continue attempting to list inflight payments
var err error
inFlights, err = fetchInflightPaymentsByIndex(tx)
if err == nil {
return nil
}
if err != nil && !errors.Is(err, errPaymentsInflightIndexNotExists) {
return err
}
return payments.ForEach(func(k, _ []byte) error {
bucket := payments.NestedReadBucket(k)
if bucket == nil {
return fmt.Errorf("non bucket element")
}
// If the status is not InFlight, we can return early.
paymentStatus, err := fetchPaymentStatus(bucket)
if err != nil {
return err
}
if paymentStatus != StatusInFlight {
return nil
}
p, err := fetchPayment(bucket)
if err != nil {
return err
}
inFlights = append(inFlights, p)
return nil
})
}, func() {
inFlights = nil
})
if err != nil {
return nil, err
}
return inFlights, nil
}