Skip to content

Commit

Permalink
feat: maintain originated balance
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed Jun 11, 2021
1 parent 18bdc86 commit a3e7050
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 41 deletions.
137 changes: 131 additions & 6 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

var (
_ Interface = (*Accounting)(nil)
balancesPrefix string = "accounting_balance_"
balancesSurplusPrefix string = "accounting_surplusbalance_"
_ Interface = (*Accounting)(nil)
balancesPrefix = "accounting_balance_"
balancesSurplusPrefix = "accounting_surplusbalance_"
balancesOriginatedPrefix = "accounting_originatedbalance_"
// fraction of the refresh rate that is the minimum for monetary settlement
// this value is chosen so that tiny payments are prevented while still allowing small payments in environments with lower payment thresholds
minimumPaymentDivisor = int64(5)
Expand All @@ -42,7 +43,7 @@ type Interface interface {
// Release releases the reserved funds.
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64) error
Credit(peer swarm.Address, price uint64, originated bool) error
// PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side.
PrepareDebit(peer swarm.Address, price uint64) Action
// Balance returns the current balance for the given peer.
Expand Down Expand Up @@ -242,7 +243,7 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {

// Credit increases the amount of credit we have with the given peer
// (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
func (a *Accounting) Credit(peer swarm.Address, price uint64, originated bool) error {
accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()
Expand All @@ -267,6 +268,41 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {

a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc()

if !originated {
return nil
}

originBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}

// Calculate next balance by decreasing current balance with the price we credit
nextOriginBalance := new(big.Int).Sub(originBalance, new(big.Int).SetUint64(price))

a.logger.Tracef("crediting peer %v with price %d, new originated balance is %d", peer, price, nextOriginBalance)

zero := big.NewInt(0)
// only consider negative balance for limiting originated balance
if nextBalance.Cmp(zero) > 0 {
nextBalance.Set(zero)
}

// If originated balance is more into the negative domain, set it to balance
if nextOriginBalance.Cmp(nextBalance) < 0 {
nextOriginBalance.Set(nextBalance)
a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, nextOriginBalance)
}

err = a.store.Put(originatedBalanceKey(peer), nextOriginBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}

a.metrics.TotalOriginatedCreditedAmount.Add(float64(price))
a.metrics.OriginatedCreditEventsCount.Inc()

return nil
}

Expand Down Expand Up @@ -314,12 +350,24 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
if err != nil {
return fmt.Errorf("settle: failed to persist balance: %w", err)
}

err = a.decreaseOriginatedBalanceTo(peer, oldBalance)
if err != nil {
return fmt.Errorf("settle: failed to decrease originated balance: %w", err)
}
}

if a.payFunction != nil && !balance.paymentOngoing {
// if there is no monetary settlement happening, check if there is something to settle
// compute debt excluding debt created by incoming payments
paymentAmount := new(big.Int).Neg(oldBalance)
originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance to settle: %w", err)
}
}

paymentAmount := new(big.Int).Neg(originatedBalance)
// if the remaining debt is still larger than some minimum amount, trigger monetary settlement
if paymentAmount.Cmp(a.minimumPayment) >= 0 {
balance.paymentOngoing = true
Expand All @@ -346,6 +394,20 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
return balance, nil
}

// Balance returns the current balance for the given peer.
func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(originatedBalanceKey(peer), &balance)

if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), ErrPeerNoBalance
}
return nil, err
}

return balance, nil
}

// SurplusBalance returns the current balance for the given peer.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
Expand Down Expand Up @@ -398,6 +460,10 @@ func peerSurplusBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesSurplusPrefix, peer.String())
}

func originatedBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesOriginatedPrefix, peer.String())
}

// getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
Expand Down Expand Up @@ -628,6 +694,12 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
a.logger.Errorf("accounting: notifypaymentsent failed to persist balance: %v", err)
return
}

err = a.decreaseOriginatedBalanceBy(peer, amount)
if err != nil {
a.logger.Warningf("accounting: notifypaymentsent failed to decrease originated balance: %v", err)
}

}

// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
Expand Down Expand Up @@ -823,6 +895,11 @@ func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *account
return nil, fmt.Errorf("failed to persist balance: %w", err)
}

err = a.decreaseOriginatedBalanceTo(peer, nextBalance)
if err != nil {
a.logger.Warningf("increase balance: failed to decrease originated balance: %v", err)
}

return nextBalance, nil
}

Expand Down Expand Up @@ -866,6 +943,54 @@ func (d *debitAction) Cleanup() {
}
}

// decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive
func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.Int) error {

zero := big.NewInt(0)

toSet := new(big.Int).Set(limit)

originatedBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}

if toSet.Cmp(zero) > 0 {
toSet.Set(zero)
}

// If originated balance is more into the negative domain, set it to limit
if originatedBalance.Cmp(toSet) < 0 {
err = a.store.Put(originatedBalanceKey(peer), toSet)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, toSet)
}

return nil
}

// decreaseOriginatedBalanceTo decreases the originated balance by provided amount even below 0
func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big.Int) error {

originatedBalance, err := a.OriginatedBalance(peer)
if err != nil && !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load balance: %w", err)
}

// Move originated balance into the positive domain by amount
newOriginatedBalance := new(big.Int).Add(originatedBalance, amount)

err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v by amount %d to current balance %d", peer, amount, newOriginatedBalance)

return nil
}

func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
Expand Down
Loading

0 comments on commit a3e7050

Please sign in to comment.