Skip to content

Commit

Permalink
feat: Maintain originated balance
Browse files Browse the repository at this point in the history
  • Loading branch information
metacertain committed May 25, 2021
1 parent b97efad commit 87534c0
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 18 deletions.
121 changes: 116 additions & 5 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

var (
_ Interface = (*Accounting)(nil)
balancesPrefix string = "accounting_balance_"
balancesSurplusPrefix string = "accounting_surplusbalance_"
_ Interface = (*Accounting)(nil)
balancesPrefix string = "accounting_balance_"
balancesSurplusPrefix string = "accounting_surplusbalance_"
balancesOriginatedPrefix string = "accounting_originatedbalance_"
// fraction of the refresh rate that is the minimum for monetary settlement
// this value is chosen so that tiny payments are prevented while still allowing small payments in environments with lower payment thresholds
minimumPaymentDivisor = int64(5)
Expand All @@ -42,7 +43,7 @@ type Interface interface {
// Release releases the reserved funds.
Release(peer swarm.Address, price uint64)
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64) error
Credit(peer swarm.Address, price uint64, orignated bool) error
// PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side.
PrepareDebit(peer swarm.Address, price uint64) Action
// Balance returns the current balance for the given peer.
Expand Down Expand Up @@ -240,7 +241,7 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) {

// Credit increases the amount of credit we have with the given peer
// (and decreases existing debt).
func (a *Accounting) Credit(peer swarm.Address, price uint64) error {
func (a *Accounting) Credit(peer swarm.Address, price uint64, originated bool) error {
accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()
Expand All @@ -265,6 +266,33 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error {

a.metrics.TotalCreditedAmount.Add(float64(price))
a.metrics.CreditEventsCount.Inc()

//

if originated {
originBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}
}

// Calculate next balance by decreasing current balance with the price we credit
nextOriginBalance := new(big.Int).Sub(originBalance, new(big.Int).SetUint64(price))

a.logger.Tracef("crediting peer %v with price %d, new originated balance is %d", peer, price, nextOriginBalance)

err = a.store.Put(originatedBalanceKey(peer), nextOriginBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}

a.metrics.TotalOriginatedCreditedAmount.Add(float64(price))
a.metrics.OriginatedCreditEventsCount.Inc()
}

//

return nil
}

Expand Down Expand Up @@ -312,6 +340,11 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error {
if err != nil {
return fmt.Errorf("settle: failed to persist balance: %w", err)
}

err = a.decreaseOriginatedBalanceTo(peer, oldBalance)
if err != nil {
a.logger.Warningf("settle: failed to decrease originated balance: %w", err)
}
}

if a.payFunction != nil && !balance.paymentOngoing {
Expand Down Expand Up @@ -344,6 +377,20 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) {
return balance, nil
}

// Balance returns the current balance for the given peer.
func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(originatedBalanceKey(peer), &balance)

if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return big.NewInt(0), ErrPeerNoBalance
}
return nil, err
}

return balance, nil
}

// SurplusBalance returns the current balance for the given peer.
func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) {
err = a.store.Get(peerSurplusBalanceKey(peer), &balance)
Expand Down Expand Up @@ -396,6 +443,10 @@ func peerSurplusBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesSurplusPrefix, peer.String())
}

func originatedBalanceKey(peer swarm.Address) string {
return fmt.Sprintf("%s%s", balancesOriginatedPrefix, peer.String())
}

// getAccountingPeer returns the accountingPeer for a given swarm address.
// If not found in memory it will initialize it.
func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
Expand Down Expand Up @@ -626,6 +677,12 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
a.logger.Errorf("accounting: notifypaymentsent failed to persist balance: %v", err)
return
}

err = a.decreaseOriginatedBalanceBy(peer, amount)
if err != nil {
a.logger.Warningf("accounting: notifypaymentsent failed to decrease originated balance: %w", err)
}

}

// NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold
Expand Down Expand Up @@ -821,6 +878,11 @@ func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *account
return nil, fmt.Errorf("failed to persist balance: %w", err)
}

err = a.decreaseOriginatedBalanceTo(peer, nextBalance)
if err != nil {
a.logger.Warningf("increase balance: failed to decrease originated balance: %w", err)
}

return nextBalance, nil
}

Expand Down Expand Up @@ -864,6 +926,55 @@ func (d *debitAction) Cleanup() {
}
}

// decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive
func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.Int) error {

zero := big.NewInt(0)

originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load originated balance: %w", err)
}
}

// If originated balance is more into the negative domain, set it to limit
if originatedBalance.Cmp(limit) < 0 {
if limit.Cmp(zero) > 0 {
limit.Set(zero)
}
err = a.store.Put(originatedBalanceKey(peer), limit)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, limit)
}

return nil
}

// decreaseOriginatedBalanceTo decreases the originated balance by provided amount even below 0
func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big.Int) error {

originatedBalance, err := a.OriginatedBalance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return fmt.Errorf("failed to load balance: %w", err)
}
}

// Move originated balance into the positive domain by amount
newOriginatedBalance := new(big.Int).Add(originatedBalance, amount)

err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance)
if err != nil {
return fmt.Errorf("failed to persist originated balance: %w", err)
}
a.logger.Tracef("decreasing originated balance to peer %v by amount %d to current balance %d", peer, amount, newOriginatedBalance)

return nil
}

func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/accounting/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
// using reflection
TotalDebitedAmount prometheus.Counter
TotalCreditedAmount prometheus.Counter
DebitEventsCount prometheus.Counter
CreditEventsCount prometheus.Counter
AccountingDisconnectsCount prometheus.Counter
AccountingBlocksCount prometheus.Counter
TotalDebitedAmount prometheus.Counter
TotalCreditedAmount prometheus.Counter
TotalOriginatedCreditedAmount prometheus.Counter
DebitEventsCount prometheus.Counter
CreditEventsCount prometheus.Counter
OriginatedCreditEventsCount prometheus.Counter
AccountingDisconnectsCount prometheus.Counter
AccountingBlocksCount prometheus.Counter
}

func newMetrics() metrics {
Expand All @@ -37,6 +39,12 @@ func newMetrics() metrics {
Name: "total_credited_amount",
Help: "Amount of BZZ credited to peers (potential cost of the node)",
}),
TotalOriginatedCreditedAmount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_originated_credited_amount",
Help: "Amount of BZZ credited to peers (potential cost of the node) for originated traffic",
}),
DebitEventsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand All @@ -49,6 +57,12 @@ func newMetrics() metrics {
Name: "credit_events_count",
Help: "Number of occurrences of BZZ credit events towards peers",
}),
OriginatedCreditEventsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "originated_credit_events_count",
Help: "Number of occurrences of BZZ credit events as originator towards peers",
}),
AccountingDisconnectsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand Down
8 changes: 4 additions & 4 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return
}

err = ps.accounting.Credit(peer, receiptPrice)
err = ps.accounting.Credit(peer, receiptPrice, false)

}(peer)

Expand Down Expand Up @@ -374,7 +374,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
defer canceld()

r, attempted, err := ps.pushPeer(ctxd, peer, ch)
r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed)
// attempted is true if we get past accounting and actually attempt
// to send the request to the peer. If we dont get past accounting, we
// should not count the retry and try with a different peer again
Expand Down Expand Up @@ -411,7 +411,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return nil, ErrNoPush
}

func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (*pb.Receipt, bool, error) {
func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk, originated bool) (*pb.Receipt, bool, error) {
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())

Expand Down Expand Up @@ -465,7 +465,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
return nil, true, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer)
}

err = ps.accounting.Credit(peer, receiptPrice)
err = ps.accounting.Credit(peer, receiptPrice, originated)
if err != nil {
return nil, true, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
s.metrics.PeerRequestCounter.Inc()

go func() {
chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp)
chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp, origin)
resultC <- retrievalResult{
chunk: chunk,
peer: peer,
Expand Down Expand Up @@ -193,7 +193,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin
return v.(swarm.Chunk), nil
}

func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) {
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers, originated bool) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) {
startTimer := time.Now()

v := ctx.Value(requestSourceContextKey{})
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *ski
}

// credit the peer after successful delivery
err = s.accounting.Credit(peer, chunkPrice)
err = s.accounting.Credit(peer, chunkPrice, originated)
if err != nil {
return nil, peer, true, err
}
Expand Down

0 comments on commit 87534c0

Please sign in to comment.