Skip to content

Commit

Permalink
feat: block on swap payment
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Jun 7, 2021
1 parent ebedbfe commit baec241
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 18 deletions.
73 changes: 56 additions & 17 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type accountingPeer struct {
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
paymentChan chan struct{}
}

// Accounting is the main implementation of the accounting interface.
Expand Down Expand Up @@ -156,6 +157,33 @@ func NewAccounting(
}, nil
}

func (a *Accounting) increasedExpectedDebt(peer swarm.Address, accountingPeer *accountingPeer, bigPrice *big.Int) (*big.Int, error) {
currentBalance, err := a.Balance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return nil, fmt.Errorf("failed to load balance: %w", err)
}
}
currentDebt := new(big.Int).Neg(currentBalance)
if currentDebt.Cmp(big.NewInt(0)) < 0 {
currentDebt.SetInt64(0)
}

nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)

// debt if all reserved operations are successfully credited excluding debt created by surplus balance
expectedDebt := new(big.Int).Add(currentDebt, nextReserved)

// additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes
additionalDebt, err := a.SurplusBalance(peer)
if err != nil {
return nil, fmt.Errorf("failed to load surplus balance: %w", err)
}

// debt if all reserved operations are successfully credited including debt created by surplus balance
return new(big.Int).Add(expectedDebt, additionalDebt), nil
}

// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
accountingPeer := a.getAccountingPeer(peer)
Expand All @@ -171,16 +199,8 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
return fmt.Errorf("failed to load balance: %w", err)
}
}
currentDebt := new(big.Int).Neg(currentBalance)
if currentDebt.Cmp(big.NewInt(0)) < 0 {
currentDebt.SetInt64(0)
}

bigPrice := new(big.Int).SetUint64(price)
nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)

// debt if all reserved operations are successfully credited excluding debt created by surplus balance
expectedDebt := new(big.Int).Add(currentDebt, nextReserved)

threshold := new(big.Int).Set(accountingPeer.paymentThreshold)
if threshold.Cmp(a.earlyPayment) > 0 {
Expand All @@ -189,14 +209,10 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
threshold.SetInt64(0)
}

// additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes
additionalDebt, err := a.SurplusBalance(peer)
increasedExpectedDebt, err := a.increasedExpectedDebt(peer, accountingPeer, bigPrice)
if err != nil {
return fmt.Errorf("failed to load surplus balance: %w", err)
return err
}

// debt if all reserved operations are successfully credited including debt created by surplus balance
increasedExpectedDebt := new(big.Int).Add(expectedDebt, additionalDebt)
// debt if all reserved operations are successfully credited and all shadow reserved operations are debited including debt created by surplus balance
// in other words this the debt the other node sees if everything pending is successful
increasedExpectedDebtReduced := new(big.Int).Sub(increasedExpectedDebt, accountingPeer.shadowReservedBalance)
Expand All @@ -214,11 +230,32 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// if expectedDebt would still exceed the paymentThreshold at this point block this request
// this can happen if there is a large number of concurrent requests to the same peer
if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
if accountingPeer.paymentOngoing {
accountingPeer.lock.Unlock()
select {
case <-accountingPeer.paymentChan:
case <-ctx.Done():
accountingPeer.lock.Lock()
return ErrOverdraft
}
accountingPeer.lock.Lock()

increasedExpectedDebt, err = a.increasedExpectedDebt(peer, accountingPeer, bigPrice)
if err != nil {
return err
}

if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
} else {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
}

accountingPeer.reservedBalance = nextReserved
accountingPeer.reservedBalance = new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)
return nil
}

Expand Down Expand Up @@ -323,6 +360,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
balance.paymentChan = make(chan struct{})
// add settled amount to shadow reserve before sending it
balance.shadowReservedBalance.Add(balance.shadowReservedBalance, paymentAmount)
go a.payFunction(context.Background(), peer, paymentAmount)
Expand Down Expand Up @@ -602,6 +640,7 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
defer accountingPeer.lock.Unlock()

accountingPeer.paymentOngoing = false
close(accountingPeer.paymentChan)
// decrease shadow reserve by payment value
accountingPeer.shadowReservedBalance.Sub(accountingPeer.shadowReservedBalance, amount)

Expand Down
5 changes: 4 additions & 1 deletion pkg/accounting/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,10 @@ func TestAccountingCallSettlementMonetary(t *testing.T) {
// Credit until the expected debt exceeds payment threshold
expectedAmount := testPaymentThreshold.Uint64()

err = acc.Reserve(context.Background(), peer1Addr, expectedAmount)
ctx, cancel := context.WithCancel(context.Background())
cancel()

err = acc.Reserve(ctx, peer1Addr, expectedAmount)
if !errors.Is(err, accounting.ErrOverdraft) {
t.Fatalf("expected overdraft, got %v", err)
}
Expand Down

0 comments on commit baec241

Please sign in to comment.