From 87534c0ddd3c28302ea0f6d86ad10c84d3a82463 Mon Sep 17 00:00:00 2001 From: Metacertain Date: Tue, 25 May 2021 18:42:03 +0200 Subject: [PATCH] feat: Maintain originated balance --- pkg/accounting/accounting.go | 121 +++++++++++++++++++++++++++++++++-- pkg/accounting/metrics.go | 26 ++++++-- pkg/pushsync/pushsync.go | 8 +-- pkg/retrieval/retrieval.go | 6 +- 4 files changed, 143 insertions(+), 18 deletions(-) diff --git a/pkg/accounting/accounting.go b/pkg/accounting/accounting.go index 64c352f5746..3e53a5ec0f5 100644 --- a/pkg/accounting/accounting.go +++ b/pkg/accounting/accounting.go @@ -23,9 +23,10 @@ import ( ) var ( - _ Interface = (*Accounting)(nil) - balancesPrefix string = "accounting_balance_" - balancesSurplusPrefix string = "accounting_surplusbalance_" + _ Interface = (*Accounting)(nil) + balancesPrefix string = "accounting_balance_" + balancesSurplusPrefix string = "accounting_surplusbalance_" + balancesOriginatedPrefix string = "accounting_originatedbalance_" // fraction of the refresh rate that is the minimum for monetary settlement // this value is chosen so that tiny payments are prevented while still allowing small payments in environments with lower payment thresholds minimumPaymentDivisor = int64(5) @@ -42,7 +43,7 @@ type Interface interface { // Release releases the reserved funds. Release(peer swarm.Address, price uint64) // Credit increases the balance the peer has with us (we "pay" the peer). - Credit(peer swarm.Address, price uint64) error + Credit(peer swarm.Address, price uint64, orignated bool) error // PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side. PrepareDebit(peer swarm.Address, price uint64) Action // Balance returns the current balance for the given peer. @@ -240,7 +241,7 @@ func (a *Accounting) Release(peer swarm.Address, price uint64) { // Credit increases the amount of credit we have with the given peer // (and decreases existing debt). -func (a *Accounting) Credit(peer swarm.Address, price uint64) error { +func (a *Accounting) Credit(peer swarm.Address, price uint64, originated bool) error { accountingPeer := a.getAccountingPeer(peer) accountingPeer.lock.Lock() @@ -265,6 +266,33 @@ func (a *Accounting) Credit(peer swarm.Address, price uint64) error { a.metrics.TotalCreditedAmount.Add(float64(price)) a.metrics.CreditEventsCount.Inc() + + // + + if originated { + originBalance, err := a.OriginatedBalance(peer) + if err != nil { + if !errors.Is(err, ErrPeerNoBalance) { + return fmt.Errorf("failed to load originated balance: %w", err) + } + } + + // Calculate next balance by decreasing current balance with the price we credit + nextOriginBalance := new(big.Int).Sub(originBalance, new(big.Int).SetUint64(price)) + + a.logger.Tracef("crediting peer %v with price %d, new originated balance is %d", peer, price, nextOriginBalance) + + err = a.store.Put(originatedBalanceKey(peer), nextOriginBalance) + if err != nil { + return fmt.Errorf("failed to persist originated balance: %w", err) + } + + a.metrics.TotalOriginatedCreditedAmount.Add(float64(price)) + a.metrics.OriginatedCreditEventsCount.Inc() + } + + // + return nil } @@ -312,6 +340,11 @@ func (a *Accounting) settle(peer swarm.Address, balance *accountingPeer) error { if err != nil { return fmt.Errorf("settle: failed to persist balance: %w", err) } + + err = a.decreaseOriginatedBalanceTo(peer, oldBalance) + if err != nil { + a.logger.Warningf("settle: failed to decrease originated balance: %w", err) + } } if a.payFunction != nil && !balance.paymentOngoing { @@ -344,6 +377,20 @@ func (a *Accounting) Balance(peer swarm.Address) (balance *big.Int, err error) { return balance, nil } +// Balance returns the current balance for the given peer. +func (a *Accounting) OriginatedBalance(peer swarm.Address) (balance *big.Int, err error) { + err = a.store.Get(originatedBalanceKey(peer), &balance) + + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return big.NewInt(0), ErrPeerNoBalance + } + return nil, err + } + + return balance, nil +} + // SurplusBalance returns the current balance for the given peer. func (a *Accounting) SurplusBalance(peer swarm.Address) (balance *big.Int, err error) { err = a.store.Get(peerSurplusBalanceKey(peer), &balance) @@ -396,6 +443,10 @@ func peerSurplusBalanceKey(peer swarm.Address) string { return fmt.Sprintf("%s%s", balancesSurplusPrefix, peer.String()) } +func originatedBalanceKey(peer swarm.Address) string { + return fmt.Sprintf("%s%s", balancesOriginatedPrefix, peer.String()) +} + // getAccountingPeer returns the accountingPeer for a given swarm address. // If not found in memory it will initialize it. func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer { @@ -626,6 +677,12 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece a.logger.Errorf("accounting: notifypaymentsent failed to persist balance: %v", err) return } + + err = a.decreaseOriginatedBalanceBy(peer, amount) + if err != nil { + a.logger.Warningf("accounting: notifypaymentsent failed to decrease originated balance: %w", err) + } + } // NotifyPaymentThreshold should be called to notify accounting of changes in the payment threshold @@ -821,6 +878,11 @@ func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *account return nil, fmt.Errorf("failed to persist balance: %w", err) } + err = a.decreaseOriginatedBalanceTo(peer, nextBalance) + if err != nil { + a.logger.Warningf("increase balance: failed to decrease originated balance: %w", err) + } + return nextBalance, nil } @@ -864,6 +926,55 @@ func (d *debitAction) Cleanup() { } } +// decreaseOriginatedBalanceTo decreases the originated balance to provided limit or 0 if limit is positive +func (a *Accounting) decreaseOriginatedBalanceTo(peer swarm.Address, limit *big.Int) error { + + zero := big.NewInt(0) + + originatedBalance, err := a.OriginatedBalance(peer) + if err != nil { + if !errors.Is(err, ErrPeerNoBalance) { + return fmt.Errorf("failed to load originated balance: %w", err) + } + } + + // If originated balance is more into the negative domain, set it to limit + if originatedBalance.Cmp(limit) < 0 { + if limit.Cmp(zero) > 0 { + limit.Set(zero) + } + err = a.store.Put(originatedBalanceKey(peer), limit) + if err != nil { + return fmt.Errorf("failed to persist originated balance: %w", err) + } + a.logger.Tracef("decreasing originated balance to peer %v to current balance %d", peer, limit) + } + + return nil +} + +// decreaseOriginatedBalanceTo decreases the originated balance by provided amount even below 0 +func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big.Int) error { + + originatedBalance, err := a.OriginatedBalance(peer) + if err != nil { + if !errors.Is(err, ErrPeerNoBalance) { + return fmt.Errorf("failed to load balance: %w", err) + } + } + + // Move originated balance into the positive domain by amount + newOriginatedBalance := new(big.Int).Add(originatedBalance, amount) + + err = a.store.Put(originatedBalanceKey(peer), newOriginatedBalance) + if err != nil { + return fmt.Errorf("failed to persist originated balance: %w", err) + } + a.logger.Tracef("decreasing originated balance to peer %v by amount %d to current balance %d", peer, amount, newOriginatedBalance) + + return nil +} + func (a *Accounting) SetRefreshFunc(f RefreshFunc) { a.refreshFunction = f } diff --git a/pkg/accounting/metrics.go b/pkg/accounting/metrics.go index cfb3a0698da..1073e8173e3 100644 --- a/pkg/accounting/metrics.go +++ b/pkg/accounting/metrics.go @@ -13,12 +13,14 @@ type metrics struct { // all metrics fields must be exported // to be able to return them by Metrics() // using reflection - TotalDebitedAmount prometheus.Counter - TotalCreditedAmount prometheus.Counter - DebitEventsCount prometheus.Counter - CreditEventsCount prometheus.Counter - AccountingDisconnectsCount prometheus.Counter - AccountingBlocksCount prometheus.Counter + TotalDebitedAmount prometheus.Counter + TotalCreditedAmount prometheus.Counter + TotalOriginatedCreditedAmount prometheus.Counter + DebitEventsCount prometheus.Counter + CreditEventsCount prometheus.Counter + OriginatedCreditEventsCount prometheus.Counter + AccountingDisconnectsCount prometheus.Counter + AccountingBlocksCount prometheus.Counter } func newMetrics() metrics { @@ -37,6 +39,12 @@ func newMetrics() metrics { Name: "total_credited_amount", Help: "Amount of BZZ credited to peers (potential cost of the node)", }), + TotalOriginatedCreditedAmount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_originated_credited_amount", + Help: "Amount of BZZ credited to peers (potential cost of the node) for originated traffic", + }), DebitEventsCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, @@ -49,6 +57,12 @@ func newMetrics() metrics { Name: "credit_events_count", Help: "Number of occurrences of BZZ credit events towards peers", }), + OriginatedCreditEventsCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "originated_credit_events_count", + Help: "Number of occurrences of BZZ credit events as originator towards peers", + }), AccountingDisconnectsCount: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 18332aeba57..e6c07cad1fc 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -283,7 +283,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) return } - err = ps.accounting.Credit(peer, receiptPrice) + err = ps.accounting.Credit(peer, receiptPrice, false) }(peer) @@ -374,7 +374,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo ctxd, canceld := context.WithTimeout(ctx, defaultTTL) defer canceld() - r, attempted, err := ps.pushPeer(ctxd, peer, ch) + r, attempted, err := ps.pushPeer(ctxd, peer, ch, retryAllowed) // attempted is true if we get past accounting and actually attempt // to send the request to the peer. If we dont get past accounting, we // should not count the retry and try with a different peer again @@ -411,7 +411,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo return nil, ErrNoPush } -func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (*pb.Receipt, bool, error) { +func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk, originated bool) (*pb.Receipt, bool, error) { // compute the price we pay for this receipt and reserve it for the rest of this function receiptPrice := ps.pricer.PeerPrice(peer, ch.Address()) @@ -465,7 +465,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C return nil, true, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer) } - err = ps.accounting.Credit(peer, receiptPrice) + err = ps.accounting.Credit(peer, receiptPrice, originated) if err != nil { return nil, true, err } diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 9c2e6e5fd17..5d056bf08cc 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -137,7 +137,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin s.metrics.PeerRequestCounter.Inc() go func() { - chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp) + chunk, peer, requested, err := s.retrieveChunk(ctx, addr, sp, origin) resultC <- retrievalResult{ chunk: chunk, peer: peer, @@ -193,7 +193,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address, origin return v.(swarm.Chunk), nil } -func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) { +func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *skipPeers, originated bool) (chunk swarm.Chunk, peer swarm.Address, requested bool, err error) { startTimer := time.Now() v := ctx.Value(requestSourceContextKey{}) @@ -293,7 +293,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, sp *ski } // credit the peer after successful delivery - err = s.accounting.Credit(peer, chunkPrice) + err = s.accounting.Credit(peer, chunkPrice, originated) if err != nil { return nil, peer, true, err }