From baec241787fc1b7632f2c8f182aadd06c073cdd3 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Mon, 7 Jun 2021 18:15:07 +0200 Subject: [PATCH] feat: block on swap payment --- pkg/accounting/accounting.go | 73 ++++++++++++++++++++++++------- pkg/accounting/accounting_test.go | 5 ++- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index c15fd958569..e0eeea0c6d1 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -88,6 +88,7 @@ type accountingPeer struct { paymentThreshold *big.Int // the threshold at which the peer expects us to pay refreshTimestamp int64 // last time we attempted time-based settlement paymentOngoing bool // indicate if we are currently settling with the peer + paymentChan chan struct{} } // Accounting is the main implementation of the accounting interface. @@ -156,6 +157,33 @@ func NewAccounting( }, nil } +func (a *Accounting) increasedExpectedDebt(peer swarm.Address, accountingPeer *accountingPeer, bigPrice *big.Int) (*big.Int, error) { + currentBalance, err := a.Balance(peer) + if err != nil { + if !errors.Is(err, ErrPeerNoBalance) { + return nil, fmt.Errorf("failed to load balance: %w", err) + } + } + currentDebt := new(big.Int).Neg(currentBalance) + if currentDebt.Cmp(big.NewInt(0)) < 0 { + currentDebt.SetInt64(0) + } + + nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice) + + // debt if all reserved operations are successfully credited excluding debt created by surplus balance + expectedDebt := new(big.Int).Add(currentDebt, nextReserved) + + // additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes + additionalDebt, err := a.SurplusBalance(peer) + if err != nil { + return nil, fmt.Errorf("failed to load surplus balance: %w", err) + } + + // debt if all reserved operations are successfully credited including debt created by surplus balance + return new(big.Int).Add(expectedDebt, additionalDebt), nil +} + // Reserve reserves a portion of the balance for peer and attempts settlements if necessary. func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error { accountingPeer := a.getAccountingPeer(peer) @@ -171,16 +199,8 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint return fmt.Errorf("failed to load balance: %w", err) } } - currentDebt := new(big.Int).Neg(currentBalance) - if currentDebt.Cmp(big.NewInt(0)) < 0 { - currentDebt.SetInt64(0) - } bigPrice := new(big.Int).SetUint64(price) - nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice) - - // debt if all reserved operations are successfully credited excluding debt created by surplus balance - expectedDebt := new(big.Int).Add(currentDebt, nextReserved) threshold := new(big.Int).Set(accountingPeer.paymentThreshold) if threshold.Cmp(a.earlyPayment) > 0 { @@ -189,14 +209,10 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint threshold.SetInt64(0) } - // additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes - additionalDebt, err := a.SurplusBalance(peer) + increasedExpectedDebt, err := a.increasedExpectedDebt(peer, accountingPeer, bigPrice) if err != nil { - return fmt.Errorf("failed to load surplus balance: %w", err) + return err } - - // debt if all reserved operations are successfully credited including debt created by surplus balance - increasedExpectedDebt := new(big.Int).Add(expectedDebt, additionalDebt) // debt if all reserved operations are successfully credited and all shadow reserved operations are debited including debt created by surplus balance // in other words this the debt the other node sees if everything pending is successful increasedExpectedDebtReduced := new(big.Int).Sub(increasedExpectedDebt, accountingPeer.shadowReservedBalance) @@ -214,11 +230,32 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint // if expectedDebt would still exceed the paymentThreshold at this point block this request // this can happen if there is a large number of concurrent requests to the same peer if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 { - a.metrics.AccountingBlocksCount.Inc() - return ErrOverdraft + if accountingPeer.paymentOngoing { + accountingPeer.lock.Unlock() + select { + case <-accountingPeer.paymentChan: + case <-ctx.Done(): + accountingPeer.lock.Lock() + return ErrOverdraft + } + accountingPeer.lock.Lock() + + increasedExpectedDebt, err = a.increasedExpectedDebt(peer, accountingPeer, bigPrice) + if err != nil { + return err + } + + if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 { + a.metrics.AccountingBlocksCount.Inc() + return ErrOverdraft + } + } else { + a.metrics.AccountingBlocksCount.Inc() + return ErrOverdraft + } } - accountingPeer.reservedBalance = nextReserved + accountingPeer.reservedBalance = new(big.Int).Add(accountingPeer.reservedBalance, bigPrice) return nil } @@ -323,6 +360,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { // if the remaining debt is still larger than some minimum amount, trigger monetary settlement if paymentAmount.Cmp(a.minimumPayment) >= 0 { balance.paymentOngoing = true + balance.paymentChan = make(chan struct{}) // add settled amount to shadow reserve before sending it balance.shadowReservedBalance.Add(balance.shadowReservedBalance, paymentAmount) go a.payFunction(context.Background(), peer, paymentAmount) @@ -602,6 +640,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece defer accountingPeer.lock.Unlock() accountingPeer.paymentOngoing = false + close(accountingPeer.paymentChan) // decrease shadow reserve by payment value accountingPeer.shadowReservedBalance.Sub(accountingPeer.shadowReservedBalance, amount) diff --git a/pkg/accounting/accounting_test.go b/pkg/accounting/accounting_test.go index c7a173124a5..72b08ca3fe7 100644 --- a/pkg/accounting/accounting_test.go +++ b/pkg/accounting/accounting_test.go @@ -449,7 +449,10 @@ func TestAccountingCallSettlementMonetary(t *testing.T) { // Credit until the expected debt exceeds payment threshold expectedAmount := testPaymentThreshold.Uint64() - err = acc.Reserve(context.Background(), peer1Addr, expectedAmount) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = acc.Reserve(ctx, peer1Addr, expectedAmount) if !errors.Is(err, accounting.ErrOverdraft) { t.Fatalf("expected overdraft, got %v", err) }