-
Notifications
You must be signed in to change notification settings - Fork 338
/
accounting.go
778 lines (647 loc) · 23.2 KB
/
accounting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package accounting
import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/settlement"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
// Compile-time assertion that *Accounting satisfies Interface.
_ Interface = (*Accounting)(nil)
// balancesPrefix is the state store key prefix of per-peer balances
// (see peerBalanceKey).
balancesPrefix string = "balance_"
// balancesSurplusPrefix is the state store key prefix of per-peer surplus
// balances (see peerSurplusBalanceKey).
balancesSurplusPrefix string = "surplusbalance_"
)
// Interface is the Accounting interface.
//
// Balances are signed: the Accounting implementation lowers a peer's balance
// on Credit and raises it on Debit.
type Interface interface {
// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
// Returns an error if the operation risks exceeding the disconnect threshold or an attempted settlement failed.
//
// This has to be called (always in combination with Release) before a
// Credit action to prevent overspending in case of concurrent requests.
Reserve(ctx context.Context, peer swarm.Address, price uint64) error
// Release releases the reserved funds.
// The price should match the one passed to the paired Reserve call.
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64) error
// Debit increases the balance we have with the peer (we get "paid" back).
Debit(peer swarm.Address, price uint64) error
// Balance returns the current balance for the given peer.
Balance(peer swarm.Address) (int64, error)
// SurplusBalance returns the current surplus balance for the given peer.
SurplusBalance(peer swarm.Address) (int64, error)
// Balances returns balances for all known peers.
// The map is keyed by the string form of the peer address.
Balances() (map[string]int64, error)
// CompensatedBalance returns the current balance deducted by current surplus balance for the given peer.
CompensatedBalance(peer swarm.Address) (int64, error)
// CompensatedBalances returns the compensated balances for all known peers.
// The map is keyed by the string form of the peer address.
CompensatedBalances() (map[string]int64, error)
}
// accountingPeer holds all in-memory accounting information for one peer.
// Instances are created on demand by getAccountingPeer and shared by pointer,
// so the embedded mutex is never copied.
type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance uint64 // amount currently reserved for active peer interaction (raised by Reserve, lowered by Release)
paymentThreshold uint64 // the threshold at which the peer expects us to pay (updated by NotifyPaymentThreshold)
}
// Accounting is the main implementation of the accounting interface.
// Balances and surplus balances are persisted in the state store, while
// reservations and peer payment thresholds are only kept in memory.
type Accounting struct {
// Mutex for accessing the accountingPeers map.
accountingPeersMu sync.Mutex
// In-memory accounting state, keyed by the string form of the peer address.
accountingPeers map[string]*accountingPeer
logger logging.Logger
// State store holding the persisted balances and surplus balances.
store storage.StateStorer
// The payment threshold in BZZ we communicate to our peers.
paymentThreshold uint64
// The amount in BZZ we let peers exceed the payment threshold before we
// disconnect them.
paymentTolerance uint64
// The amount by which Reserve lowers a peer's payment threshold when
// deciding whether to settle ahead of time.
earlyPayment uint64
// Used by settle to pay our debt to a peer.
settlement settlement.Interface
pricing pricing.Interface
metrics metrics
}
var (
// ErrOverdraft denotes the expected debt in Reserve would exceed the payment thresholds.
ErrOverdraft = errors.New("attempted overdraft")
// ErrDisconnectThresholdExceeded denotes a peer has exceeded the disconnect threshold.
// Debit passes it to the block-peer error it returns in that case.
ErrDisconnectThresholdExceeded = errors.New("disconnect threshold exceeded")
// ErrPeerNoBalance is the error returned if no balance in store exists for a peer.
ErrPeerNoBalance = errors.New("no balance for peer")
// ErrOverflow denotes an arithmetic operation overflowed.
ErrOverflow = errors.New("overflow error")
// ErrInvalidValue denotes an invalid value read from store, such as a
// negative surplus balance.
ErrInvalidValue = errors.New("invalid value")
)
// NewAccounting creates a new Accounting instance with the provided options.
//
// PaymentThreshold is the threshold communicated to peers, PaymentTolerance
// is the amount peers may exceed that threshold by before being disconnected
// and EarlyPayment is the amount by which a peer's payment threshold is
// lowered when deciding whether to settle ahead of time.
//
// An error wrapping ErrOverflow is returned when PaymentThreshold plus
// PaymentTolerance does not fit into an int64.
func NewAccounting(
	PaymentThreshold,
	PaymentTolerance,
	EarlyPayment uint64,
	Logger logging.Logger,
	Store storage.StateStorer,
	Settlement settlement.Interface,
	Pricing pricing.Interface,
) (*Accounting, error) {
	// The sum of both values is converted to int64 in Debit. Compare without
	// adding them first: the uint64 addition itself can wrap around, which
	// would let an oversized pair slip through the check.
	if PaymentThreshold > math.MaxInt64 || PaymentTolerance > math.MaxInt64-PaymentThreshold {
		return nil, fmt.Errorf("tolerance plus threshold too big: %w", ErrOverflow)
	}

	return &Accounting{
		accountingPeers:  make(map[string]*accountingPeer),
		paymentThreshold: PaymentThreshold,
		paymentTolerance: PaymentTolerance,
		earlyPayment:     EarlyPayment,
		logger:           Logger,
		store:            Store,
		settlement:       Settlement,
		pricing:          Pricing,
		metrics:          newMetrics(),
	}, nil
}
// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
//
// It adds price to the amount already reserved for the peer and derives the
// debt to expect once every reservation has been spent, including the peer's
// surplus balance. If that debt reaches the peer's payment threshold lowered
// by earlyPayment while the stored balance is negative, a settlement is
// attempted first.
//
// ErrOverdraft is returned, and nothing is reserved, when the expected debt
// would still exceed the peer's payment threshold. A failed settlement is
// returned wrapped, so the cause stays reachable through errors.Is and
// errors.As.
func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
	accountingPeer, err := a.getAccountingPeer(peer)
	if err != nil {
		return err
	}

	accountingPeer.lock.Lock()
	defer accountingPeer.lock.Unlock()

	// A peer without a stored balance is treated as having a balance of 0.
	currentBalance, err := a.Balance(peer)
	if err != nil && !errors.Is(err, ErrPeerNoBalance) {
		return fmt.Errorf("failed to load balance: %w", err)
	}

	// Check for safety of increase of reservedBalance by price.
	nextReserved := accountingPeer.reservedBalance + price
	if nextReserved < accountingPeer.reservedBalance {
		return ErrOverflow
	}

	// Subtract already reserved amount from actual balance, to get expected balance.
	expectedBalance, err := subtractI64mU64(currentBalance, nextReserved)
	if err != nil {
		return err
	}

	// Determine if we will owe anything to the peer, if we owe less than 0, we conclude we owe nothing.
	// This conversion is made safe by previous subtractI64mU64 not allowing MinInt64.
	expectedDebt := -expectedBalance
	if expectedDebt < 0 {
		expectedDebt = 0
	}

	// Settle once the debt is within earlyPayment of the peer's threshold.
	threshold := accountingPeer.paymentThreshold
	if threshold > a.earlyPayment {
		threshold -= a.earlyPayment
	} else {
		threshold = 0
	}

	additionalDebt, err := a.SurplusBalance(peer)
	if err != nil {
		return fmt.Errorf("failed to load surplus balance: %w", err)
	}

	// uint64 conversion of surplusbalance is safe because surplusbalance is always positive.
	if additionalDebt < 0 {
		return ErrInvalidValue
	}

	increasedExpectedDebt, err := addI64pU64(expectedDebt, uint64(additionalDebt))
	if err != nil {
		return err
	}

	// If our expected debt is less than earlyPayment away from our payment threshold
	// and we are actually in debt, trigger settlement.
	// We pay early to avoid needlessly blocking request later when concurrent
	// requests occur and we are already close to the payment threshold.
	if increasedExpectedDebt >= int64(threshold) && currentBalance < 0 {
		err = a.settle(ctx, peer, accountingPeer)
		if err != nil {
			// Wrap with %w rather than %v to keep the error chain intact.
			return fmt.Errorf("failed to settle with peer %v: %w", peer, err)
		}
		// If we settled successfully our balance is back at 0
		// and the expected debt therefore equals next reserved amount.
		expectedDebt = int64(nextReserved)
		increasedExpectedDebt, err = addI64pU64(expectedDebt, uint64(additionalDebt))
		if err != nil {
			return err
		}
	}

	// If expectedDebt would still exceed the paymentThreshold at this point block this request.
	// This can happen if there is a large number of concurrent requests to the same peer.
	if increasedExpectedDebt > int64(accountingPeer.paymentThreshold) {
		a.metrics.AccountingBlocksCount.Inc()
		return ErrOverdraft
	}

	accountingPeer.reservedBalance = nextReserved
	return nil
}
// Release gives back funds that were previously set aside with Reserve.
// Failures are only logged, as there is no error to return.
func (a *Accounting) Release(peer swarm.Address, price uint64) {
	peerState, err := a.getAccountingPeer(peer)
	if err != nil {
		a.logger.Errorf("cannot release balance for peer: %v", err)
		return
	}

	peerState.lock.Lock()
	defer peerState.lock.Unlock()

	if price <= peerState.reservedBalance {
		peerState.reservedBalance -= price
		return
	}

	// NOTE: unreachable as long as Reserve and Release calls are paired.
	// Clamp to zero instead of letting the unsigned counter wrap around.
	a.logger.Error("attempting to release more balance than was reserved for peer")
	peerState.reservedBalance = 0
}
// Credit lowers the stored balance of the given peer by price and persists
// the result. A peer without a stored balance starts from 0.
//
// ErrOverflow is returned when the new balance cannot be represented.
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
	peerState, err := a.getAccountingPeer(peer)
	if err != nil {
		return err
	}

	peerState.lock.Lock()
	defer peerState.lock.Unlock()

	balance, err := a.Balance(peer)
	if err != nil && !errors.Is(err, ErrPeerNoBalance) {
		return fmt.Errorf("failed to load balance: %w", err)
	}

	// Safely decrease the current balance by the credited price.
	updated, err := subtractI64mU64(balance, price)
	if err != nil {
		return err
	}

	a.logger.Tracef("crediting peer %v with price %d, new balance is %d", peer, price, updated)

	if err := a.store.Put(peerBalanceKey(peer), updated); err != nil {
		return fmt.Errorf("failed to persist balance: %w", err)
	}

	a.metrics.TotalCreditedAmount.Add(float64(price))
	a.metrics.CreditEventsCount.Inc()
	return nil
}
// settle pays off the entire debt towards a peer through the settlement
// service. The lock on the accountingPeer must be held when called.
//
// It is a no-op when the stored balance is not negative. ErrOverflow is
// returned when the balance is math.MinInt64, as that debt cannot be negated.
func (a *Accounting) settle(ctx context.Context, peer swarm.Address, balance *accountingPeer) error {
	debt, err := a.Balance(peer)
	if err != nil && !errors.Is(err, ErrPeerNoBalance) {
		return fmt.Errorf("failed to load balance: %w", err)
	}

	switch {
	case debt >= 0:
		// Nothing to pay. This might be the case if the peer owes us and the
		// total reserve for the peer exceeds the payment threshold.
		return nil
	case debt == math.MinInt64:
		// Every negative int64 has a positive counterpart except MinInt64.
		return ErrOverflow
	}

	// Safe: debt is negative and not MinInt64.
	paymentAmount := uint64(-debt)
	key := peerBalanceKey(peer)

	// Persist the cleared balance before paying. Otherwise we might pay and
	// then fail to save, forcing us to pay again after a restart.
	cleared := 0
	if err := a.store.Put(key, cleared); err != nil {
		return fmt.Errorf("failed to persist balance: %w", err)
	}

	if err := a.settlement.Pay(ctx, peer, paymentAmount); err != nil {
		// The payment did not go through: put the old balance back.
		if storeErr := a.store.Put(key, debt); storeErr != nil {
			a.logger.Errorf("failed to restore balance after failed settlement for peer %v: %v", peer, storeErr)
		}
		return fmt.Errorf("settlement for amount %d failed: %w", paymentAmount, err)
	}

	return nil
}
// Debit charges the given peer with price.
//
// The charge is taken from the peer's surplus balance first. Whatever the
// surplus does not cover is added to the peer's stored balance. If the
// resulting balance reaches paymentThreshold plus paymentTolerance, the
// balance is still persisted and a block-peer error created with
// ErrDisconnectThresholdExceeded is returned.
func (a *Accounting) Debit(peer swarm.Address, price uint64) error {
	peerState, err := a.getAccountingPeer(peer)
	if err != nil {
		return err
	}

	peerState.lock.Lock()
	defer peerState.lock.Unlock()

	// Part of the price that has to be charged against the regular balance.
	remaining := price

	surplus, err := a.SurplusBalance(peer)
	if err != nil {
		return fmt.Errorf("failed to get surplus balance: %w", err)
	}

	if surplus > 0 {
		leftoverSurplus, err := subtractI64mU64(surplus, price)
		if err != nil {
			return err
		}

		// The surplus covers the whole price: store what is left and finish.
		if leftoverSurplus >= 0 {
			a.logger.Tracef("surplus debiting peer %v with value %d, new surplus balance is %d", peer, price, leftoverSurplus)

			if err := a.store.Put(peerSurplusBalanceKey(peer), leftoverSurplus); err != nil {
				return fmt.Errorf("failed to persist surplus balance: %w", err)
			}

			a.metrics.TotalDebitedAmount.Add(float64(price))
			a.metrics.DebitEventsCount.Inc()
			return nil
		}

		// The surplus covers only a part: carry on with the uncovered rest.
		uncovered, err := subtractU64mI64(price, surplus)
		if err != nil {
			return err
		}

		// The price exceeds the surplus here, so the rest must be positive.
		if uncovered <= 0 {
			return errors.New("sanity check failed for partial debit after surplus balance drawn")
		}
		remaining = uint64(uncovered)

		// The surplus is used up entirely, store 0 for it.
		a.logger.Tracef("surplus debiting peer %v with value %d, new surplus balance is 0", peer, uncovered)

		if err := a.store.Put(peerSurplusBalanceKey(peer), 0); err != nil {
			return fmt.Errorf("failed to persist surplus balance: %w", err)
		}
	}

	balance, err := a.Balance(peer)
	if err != nil && !errors.Is(err, ErrPeerNoBalance) {
		return fmt.Errorf("failed to load balance: %w", err)
	}

	// Safely increase the current balance by the uncovered part of the price.
	updated, err := addI64pU64(balance, remaining)
	if err != nil {
		return err
	}

	a.logger.Tracef("debiting peer %v with price %d, new balance is %d", peer, price, updated)

	if err := a.store.Put(peerBalanceKey(peer), updated); err != nil {
		return fmt.Errorf("failed to persist balance: %w", err)
	}

	a.metrics.TotalDebitedAmount.Add(float64(price))
	a.metrics.DebitEventsCount.Inc()

	disconnectLimit := int64(a.paymentThreshold + a.paymentTolerance)
	if updated < disconnectLimit {
		return nil
	}

	// Peer too much in debt.
	a.metrics.AccountingDisconnectsCount.Inc()
	return p2p.NewBlockPeerError(10000*time.Hour, ErrDisconnectThresholdExceeded)
}
// Balance reads the stored balance of the given peer from the state store.
// ErrPeerNoBalance is returned when the store holds no entry for the peer.
func (a *Accounting) Balance(peer swarm.Address) (balance int64, err error) {
	err = a.store.Get(peerBalanceKey(peer), &balance)
	switch {
	case err == nil:
		return balance, nil
	case errors.Is(err, storage.ErrNotFound):
		return 0, ErrPeerNoBalance
	default:
		return 0, err
	}
}
// SurplusBalance reads the stored surplus balance of the given peer from the
// state store. A peer without an entry has a surplus balance of 0.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance int64, err error) {
	err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
	switch {
	case err == nil:
		return balance, nil
	case errors.Is(err, storage.ErrNotFound):
		return 0, nil
	default:
		return 0, err
	}
}
// CompensatedBalance returns the balance of the given peer lowered by the
// peer's surplus balance.
//
// ErrInvalidValue is returned for a negative surplus balance and
// ErrPeerNoBalance when the peer has neither a balance nor a surplus.
func (a *Accounting) CompensatedBalance(peer swarm.Address) (compensated int64, err error) {
	surplus, err := a.SurplusBalance(peer)
	switch {
	case err != nil:
		return 0, err
	case surplus < 0:
		return 0, ErrInvalidValue
	}

	balance, balanceErr := a.Balance(peer)
	if balanceErr != nil {
		// A missing balance is tolerated only if there is a surplus to
		// report. Every other failure is passed on as is.
		if !errors.Is(balanceErr, ErrPeerNoBalance) || surplus == 0 {
			return 0, balanceErr
		}
	}

	compensated, err = subtractI64mU64(balance, uint64(surplus))
	if err != nil {
		return 0, err
	}
	return compensated, nil
}
// peerBalanceKey builds the state store key of a peer's balance:
// balancesPrefix followed by the string form of the address.
func peerBalanceKey(peer swarm.Address) string {
	return balancesPrefix + peer.String()
}
// peerSurplusBalanceKey builds the state store key of a peer's surplus
// balance: balancesSurplusPrefix followed by the string form of the address.
func peerSurplusBalanceKey(peer swarm.Address) string {
	return balancesSurplusPrefix + peer.String()
}
// getAccountingPeer looks up the in-memory accounting state of a peer and
// creates it on first use. The returned error is always nil.
func (a *Accounting) getAccountingPeer(peer swarm.Address) (*accountingPeer, error) {
	key := peer.String()

	a.accountingPeersMu.Lock()
	defer a.accountingPeersMu.Unlock()

	if known, ok := a.accountingPeers[key]; ok {
		return known, nil
	}

	created := &accountingPeer{
		reservedBalance: 0,
		// Until told otherwise, assume the peer uses our own threshold.
		paymentThreshold: a.paymentThreshold,
	}
	a.accountingPeers[key] = created
	return created, nil
}
// Balances collects the stored balance of every peer that has a balance
// entry in the state store, keyed by the string form of the peer address.
func (a *Accounting) Balances() (map[string]int64, error) {
	balances := make(map[string]int64)

	visit := func(key, val []byte) (stop bool, err error) {
		addr, err := balanceKeyPeer(key)
		if err != nil {
			return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
		}

		id := addr.String()
		if _, seen := balances[id]; seen {
			return false, nil
		}

		var stored int64
		if err = a.store.Get(peerBalanceKey(addr), &stored); err != nil {
			return false, fmt.Errorf("get peer %s balance: %v", id, err)
		}
		balances[id] = stored
		return false, nil
	}

	if err := a.store.Iterate(balancesPrefix, visit); err != nil {
		return nil, err
	}
	return balances, nil
}
// CompensatedBalances collects the compensated balance of every peer that
// has a balance or a surplus balance entry in the state store, keyed by the
// string form of the peer address.
func (a *Accounting) CompensatedBalances() (map[string]int64, error) {
	result := make(map[string]int64)

	// record adds the compensated balance of addr unless already present.
	record := func(addr swarm.Address) error {
		id := addr.String()
		if _, seen := result[id]; seen {
			return nil
		}
		value, err := a.CompensatedBalance(addr)
		if err != nil {
			return fmt.Errorf("get peer %s balance: %v", id, err)
		}
		result[id] = value
		return nil
	}

	// First pass: peers that have a balance entry.
	err := a.store.Iterate(balancesPrefix, func(key, val []byte) (stop bool, err error) {
		addr, err := balanceKeyPeer(key)
		if err != nil {
			return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
		}
		return false, record(addr)
	})
	if err != nil {
		return nil, err
	}

	// Second pass: peers that only have a surplus balance entry.
	err = a.store.Iterate(balancesSurplusPrefix, func(key, val []byte) (stop bool, err error) {
		addr, err := surplusBalanceKeyPeer(key)
		if err != nil {
			return false, fmt.Errorf("parse address from key: %s: %v", string(key), err)
		}
		return false, record(addr)
	})
	if err != nil {
		return nil, err
	}

	return result, nil
}
// balanceKeyPeer returns the embedded peer from the balance storage key.
//
// The key has to start with balancesPrefix. Matching the prefix anywhere in
// the key would also accept surplus balance keys, as balancesSurplusPrefix
// itself ends in balancesPrefix. An error is returned when the prefix is
// missing or the remainder is not a valid hex address.
func balanceKeyPeer(key []byte) (swarm.Address, error) {
	k := string(key)

	if !strings.HasPrefix(k, balancesPrefix) {
		return swarm.ZeroAddress, errors.New("no peer in key")
	}

	addr, err := swarm.ParseHexAddress(strings.TrimPrefix(k, balancesPrefix))
	if err != nil {
		return swarm.ZeroAddress, err
	}

	return addr, nil
}
// surplusBalanceKeyPeer returns the embedded peer from the surplus balance
// storage key.
//
// The key has to start with balancesSurplusPrefix rather than merely contain
// it. An error is returned when the prefix is missing or the remainder is not
// a valid hex address.
func surplusBalanceKeyPeer(key []byte) (swarm.Address, error) {
	k := string(key)

	if !strings.HasPrefix(k, balancesSurplusPrefix) {
		return swarm.ZeroAddress, errors.New("no peer in key")
	}

	addr, err := swarm.ParseHexAddress(strings.TrimPrefix(k, balancesSurplusPrefix))
	if err != nil {
		return swarm.ZeroAddress, err
	}

	return addr, nil
}
// NotifyPayment is called by Settlement when we receive a payment.
//
// The payment lowers the peer's stored balance, but never below 0. Whatever
// is paid beyond the balance, or the full amount if the balance is not
// positive to begin with, is added to the peer's surplus balance.
func (a *Accounting) NotifyPayment(peer swarm.Address, amount uint64) error {
	peerState, err := a.getAccountingPeer(peer)
	if err != nil {
		return err
	}

	peerState.lock.Lock()
	defer peerState.lock.Unlock()

	owed, err := a.Balance(peer)
	if err != nil && !errors.Is(err, ErrPeerNoBalance) {
		return err
	}

	// Nothing is owed: the whole payment goes to the surplus balance.
	if owed <= 0 {
		surplus, err := a.SurplusBalance(peer)
		if err != nil {
			return fmt.Errorf("failed to get surplus balance: %w", err)
		}

		grown, err := addI64pU64(surplus, amount)
		if err != nil {
			return err
		}

		a.logger.Tracef("surplus crediting peer %v with amount %d due to payment, new surplus balance is %d", peer, amount, grown)

		if err := a.store.Put(peerSurplusBalanceKey(peer), grown); err != nil {
			return fmt.Errorf("failed to persist surplus balance: %w", err)
		}
		return nil
	}

	// The peer owes us something: apply the payment to the balance first.
	remainder, err := subtractI64mU64(owed, amount)
	if err != nil {
		return err
	}

	// Don't allow a payment to put us into debt.
	// This is to prevent another node tricking us into settling by settling
	// first (e.g. send a bouncing cheque to trigger an honest cheque in swap).
	overpaid := remainder < 0
	stored := remainder
	if overpaid {
		stored = 0
	}

	a.logger.Tracef("crediting peer %v with amount %d due to payment, new balance is %d", peer, amount, stored)

	if err := a.store.Put(peerBalanceKey(peer), stored); err != nil {
		return fmt.Errorf("failed to persist balance: %w", err)
	}

	if !overpaid {
		return nil
	}

	// The payment exceeded the balance. Keep the excess as surplus balance,
	// so that future charges can be deducted from it.
	excess, err := subtractU64mI64(amount, owed)
	if err != nil {
		return err
	}

	surplus, err := a.SurplusBalance(peer)
	if err != nil {
		return fmt.Errorf("failed to get surplus balance: %w", err)
	}

	grown := surplus + excess
	if grown < surplus {
		return ErrOverflow
	}

	a.logger.Tracef("surplus crediting peer %v with amount %d due to payment, new surplus balance is %d", peer, excess, grown)

	if err := a.store.Put(peerSurplusBalanceKey(peer), grown); err != nil {
		return fmt.Errorf("failed to persist surplus balance: %w", err)
	}

	return nil
}
// AsyncNotifyPayment runs NotifyPayment in a separate goroutine and returns
// nil right away. A failure of NotifyPayment is only logged.
// This is needed when accounting has to be notified while the accounting
// lock is already held.
func (a *Accounting) AsyncNotifyPayment(peer swarm.Address, amount uint64) error {
	go func() {
		if err := a.NotifyPayment(peer, amount); err != nil {
			a.logger.Errorf("failed to notify accounting of payment: %v", err)
		}
	}()

	return nil
}
// subtractI64mU64 safely computes base - subtracted for an int64 and a uint64.
//
// ErrOverflow is returned when
// - subtracted does not fit into an int64,
// - the difference is smaller than math.MinInt64, or
// - the difference equals math.MinInt64.
// The last case is refused on purpose: callers negate the result, and
// -math.MinInt64 is not an int64. Such a value must not end up stored as a
// balance either, because it could never be settled.
func subtractI64mU64(base int64, subtracted uint64) (result int64, err error) {
	if subtracted > math.MaxInt64 {
		return 0, ErrOverflow
	}
	delta := int64(subtracted)

	// base-delta <= MinInt64 is the same as base <= MinInt64+delta. The right
	// hand side cannot overflow, because delta lies within [0, MaxInt64].
	if base <= math.MinInt64+delta {
		return 0, ErrOverflow
	}

	return base - delta, nil
}
// subtractU64mI64 safely computes base - subtracted for a uint64 and an int64.
//
// ErrOverflow is returned when base does not fit into an int64 or when the
// difference is larger than math.MaxInt64.
func subtractU64mI64(base uint64, subtracted int64) (result int64, err error) {
	if base > math.MaxInt64 {
		return 0, ErrOverflow
	}
	minuend := int64(base)

	// minuend is not negative, so only a negative subtrahend can push the
	// difference past MaxInt64. MaxInt64+subtracted cannot overflow here.
	if subtracted < 0 && minuend > math.MaxInt64+subtracted {
		return 0, ErrOverflow
	}

	return minuend - subtracted, nil
}
// addI64pU64 safely computes a + b for an int64 and a uint64.
//
// ErrOverflow is returned when b does not fit into an int64 or when the sum
// is larger than math.MaxInt64.
func addI64pU64(a int64, b uint64) (result int64, err error) {
	if b > math.MaxInt64 {
		return 0, ErrOverflow
	}
	increment := int64(b)

	// a+increment > MaxInt64 is the same as a > MaxInt64-increment. The right
	// hand side cannot overflow, because increment lies within [0, MaxInt64].
	if a > math.MaxInt64-increment {
		return 0, ErrOverflow
	}

	return a + increment, nil
}
// NotifyPaymentThreshold records the payment threshold announced by a peer.
// It should be called whenever that threshold changes. The value replaces
// the one Reserve uses for this peer.
func (a *Accounting) NotifyPaymentThreshold(peer swarm.Address, paymentThreshold uint64) error {
	peerState, err := a.getAccountingPeer(peer)
	if err != nil {
		return err
	}

	peerState.lock.Lock()
	peerState.paymentThreshold = paymentThreshold
	peerState.lock.Unlock()

	return nil
}