Skip to content

Commit

Permalink
feat: block on swap payment
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Jun 7, 2021
1 parent ebedbfe commit bd42310
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 25 deletions.
95 changes: 71 additions & 24 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ type RefreshFunc func(context.Context, swarm.Address, *big.Int, *big.Int) (*big.

// accountingPeer holds all in-memory accounting information for one peer.
type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
paymentChan chan struct{} // channel which is closed on sent monetary settlement
}

// Accounting is the main implementation of the accounting interface.
Expand Down Expand Up @@ -156,6 +157,34 @@ func NewAccounting(
}, nil
}

// compute the increasedExpectedDebt
func (a *Accounting) increasedExpectedDebt(peer swarm.Address, accountingPeer *accountingPeer, bigPrice *big.Int) (*big.Int, error) {
currentBalance, err := a.Balance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return nil, fmt.Errorf("failed to load balance: %w", err)
}
}
currentDebt := new(big.Int).Neg(currentBalance)
if currentDebt.Cmp(big.NewInt(0)) < 0 {
currentDebt.SetInt64(0)
}

nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)

// debt if all reserved operations are successfully credited excluding debt created by surplus balance
expectedDebt := new(big.Int).Add(currentDebt, nextReserved)

// additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes
additionalDebt, err := a.SurplusBalance(peer)
if err != nil {
return nil, fmt.Errorf("failed to load surplus balance: %w", err)
}

// debt if all reserved operations are successfully credited including debt created by surplus balance
return new(big.Int).Add(expectedDebt, additionalDebt), nil
}

// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
accountingPeer := a.getAccountingPeer(peer)
Expand All @@ -171,16 +200,8 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
return fmt.Errorf("failed to load balance: %w", err)
}
}
currentDebt := new(big.Int).Neg(currentBalance)
if currentDebt.Cmp(big.NewInt(0)) < 0 {
currentDebt.SetInt64(0)
}

bigPrice := new(big.Int).SetUint64(price)
nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)

// debt if all reserved operations are successfully credited excluding debt created by surplus balance
expectedDebt := new(big.Int).Add(currentDebt, nextReserved)

threshold := new(big.Int).Set(accountingPeer.paymentThreshold)
if threshold.Cmp(a.earlyPayment) > 0 {
Expand All @@ -189,14 +210,11 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
threshold.SetInt64(0)
}

// additionalDebt is debt created by incoming payments which we don't consider debt for monetary settlement purposes
additionalDebt, err := a.SurplusBalance(peer)
increasedExpectedDebt, err := a.increasedExpectedDebt(peer, accountingPeer, bigPrice)
if err != nil {
return fmt.Errorf("failed to load surplus balance: %w", err)
return err
}

// debt if all reserved operations are successfully credited including debt created by surplus balance
increasedExpectedDebt := new(big.Int).Add(expectedDebt, additionalDebt)
// debt if all reserved operations are successfully credited and all shadow reserved operations are debited including debt created by surplus balance
// in other words this the debt the other node sees if everything pending is successful
increasedExpectedDebtReduced := new(big.Int).Sub(increasedExpectedDebt, accountingPeer.shadowReservedBalance)
Expand All @@ -211,14 +229,40 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
}
}

// if expectedDebt would still exceed the paymentThreshold at this point block this request
// this can happen if there is a large number of concurrent requests to the same peer
// if expectedDebt would still exceed the paymentThreshold at this point we cannot (immediately) progress
if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
// if there is already a monetary settlement happening attempt to wait
if accountingPeer.paymentOngoing {
accountingPeer.lock.Unlock()
select {
case <-accountingPeer.paymentChan:
case <-ctx.Done():
// if the request context is already done there is not point in waiting further
// we have to reacquire the lock due to the defer unlock
accountingPeer.lock.Lock()
return ErrOverdraft
}
accountingPeer.lock.Lock()

// recompute increasedExpectedDebt as much can have changed in the meantime
increasedExpectedDebt, err = a.increasedExpectedDebt(peer, accountingPeer, bigPrice)
if err != nil {
return err
}

// if we still cannot afford it return overdraft
if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
} else {
// otherwise return overdraft immediately
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
}

accountingPeer.reservedBalance = nextReserved
accountingPeer.reservedBalance = new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)
return nil
}

Expand Down Expand Up @@ -323,6 +367,7 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
balance.paymentChan = make(chan struct{})
// add settled amount to shadow reserve before sending it
balance.shadowReservedBalance.Add(balance.shadowReservedBalance, paymentAmount)
go a.payFunction(context.Background(), peer, paymentAmount)
Expand Down Expand Up @@ -602,6 +647,8 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
defer accountingPeer.lock.Unlock()

accountingPeer.paymentOngoing = false
// close the payment notify channel so all waiting reserve calls get unblocked
close(accountingPeer.paymentChan)
// decrease shadow reserve by payment value
accountingPeer.shadowReservedBalance.Sub(accountingPeer.shadowReservedBalance, amount)

Expand Down
18 changes: 17 additions & 1 deletion pkg/accounting/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,11 @@ func TestAccountingCallSettlementMonetary(t *testing.T) {
// Credit until the expected debt exceeds payment threshold
expectedAmount := testPaymentThreshold.Uint64()

err = acc.Reserve(context.Background(), peer1Addr, expectedAmount)
ctx, cancel := context.WithCancel(context.Background())
// immediately cancel to simulate a timed out request
cancel()

err = acc.Reserve(ctx, peer1Addr, expectedAmount)
if !errors.Is(err, accounting.ErrOverdraft) {
t.Fatalf("expected overdraft, got %v", err)
}
Expand All @@ -471,6 +475,18 @@ func TestAccountingCallSettlementMonetary(t *testing.T) {
t.Fatal("pay called twice")
case <-time.After(1 * time.Second):
}

acc.SetRefreshFunc(func(c context.Context, a swarm.Address, i1, i2 *big.Int) (*big.Int, int64, error) {
// while time settling for the next reserve, the monetary settlement finally goes through
// because the refresh call happens under the accounting lock this can only process once reserve is waiting on the swap payment
go acc.NotifyPaymentSent(peer1Addr, notTimeSettledAmount, nil)
return big.NewInt(0), 0, nil
})

err = acc.Reserve(context.Background(), peer1Addr, expectedAmount)
if err != nil {
t.Fatal(err)
}
}

func TestAccountingCallSettlementTooSoon(t *testing.T) {
Expand Down

0 comments on commit bd42310

Please sign in to comment.