Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Always reset balances to zero after disconnection or blocklisting #1983

Merged
merged 10 commits into from
Jun 16, 2021
131 changes: 129 additions & 2 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
ghostBalance *big.Int // amount potentially could have been debited for but was not
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
reconnectAllowTimestamp int64
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
}

// Accounting is the main implementation of the accounting interface.
Expand Down Expand Up @@ -120,6 +122,7 @@ type Accounting struct {
pricing pricing.Interface
metrics metrics
wg sync.WaitGroup
p2p p2p.Service
timeNow func() time.Time
}

Expand All @@ -143,6 +146,8 @@ func NewAccounting(
Store storage.StateStorer,
Pricing pricing.Interface,
refreshRate *big.Int,
p2pService p2p.Service,

) (*Accounting, error) {
return &Accounting{
accountingPeers: make(map[string]*accountingPeer),
Expand All @@ -157,6 +162,7 @@ func NewAccounting(
refreshRate: refreshRate,
timeNow: time.Now,
minimumPayment: new(big.Int).Div(refreshRate, big.NewInt(minimumPaymentDivisor)),
p2p: p2pService,
}, nil
}

Expand Down Expand Up @@ -485,6 +491,7 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
peerData = &accountingPeer{
reservedBalance: big.NewInt(0),
shadowReservedBalance: big.NewInt(0),
ghostBalance: big.NewInt(0),
// initially assume the peer has the same threshold as us
paymentThreshold: new(big.Int).Set(a.paymentThreshold),
}
Expand Down Expand Up @@ -633,6 +640,36 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
return peerDebt, nil
}

// peerLatentDebt returns the sum of the positive part of the outstanding balance, shadow reserve and the ghost balance
func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {

accountingPeer := a.getAccountingPeer(peer)

balance := new(big.Int)
zero := big.NewInt(0)
anatollupacescu marked this conversation as resolved.
Show resolved Hide resolved

err := a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance = big.NewInt(0)
}

if balance.Cmp(zero) < 0 {
balance.Set(zero)
}

peerDebt := new(big.Int).Add(balance, accountingPeer.shadowReservedBalance)
peerLatentDebt := new(big.Int).Add(peerDebt, accountingPeer.ghostBalance)

if peerLatentDebt.Cmp(zero) < 0 {
return zero, nil
}

return peerLatentDebt, nil
}

// shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance
// this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve
func (a *Accounting) shadowBalance(peer swarm.Address) (shadowBalance *big.Int, err error) {
Expand Down Expand Up @@ -940,7 +977,13 @@ func (d *debitAction) Apply() error {
if nextBalance.Cmp(a.disconnectLimit) >= 0 {
// peer too much in debt
a.metrics.AccountingDisconnectsCount.Inc()
return p2p.NewBlockPeerError(24*time.Hour, ErrDisconnectThresholdExceeded)

disconnectFor, err := a.blocklistUntil(d.peer, 1)
if err != nil {
return p2p.NewBlockPeerError(1*time.Minute, ErrDisconnectThresholdExceeded)
}
return p2p.NewBlockPeerError(time.Duration(disconnectFor), ErrDisconnectThresholdExceeded)

}

return nil
Expand All @@ -951,7 +994,74 @@ func (d *debitAction) Cleanup() {
if !d.applied {
d.accountingPeer.lock.Lock()
defer d.accountingPeer.lock.Unlock()
a := d.accounting
d.accountingPeer.shadowReservedBalance = new(big.Int).Sub(d.accountingPeer.shadowReservedBalance, d.price)
d.accountingPeer.ghostBalance = new(big.Int).Add(d.accountingPeer.ghostBalance, d.price)
if d.accountingPeer.ghostBalance.Cmp(a.disconnectLimit) > 0 {
_ = a.blocklist(d.peer, 1)
}
}
}

func (a *Accounting) blocklistUntil(peer swarm.Address, multiplier int64) (int64, error) {

debt, err := a.peerLatentDebt(peer)
if err != nil {
return 0, err
}

if debt.Cmp(a.refreshRate) < 0 {
debt.Set(a.refreshRate)
}

additionalDebt := new(big.Int).Add(debt, a.paymentThreshold)

multiplyDebt := new(big.Int).Mul(additionalDebt, big.NewInt(multiplier))

k := new(big.Int).Div(multiplyDebt, a.refreshRate)

kInt := k.Int64()

return kInt, nil
}

func (a *Accounting) blocklist(peer swarm.Address, multiplier int64) error {

disconnectFor, err := a.blocklistUntil(peer, multiplier)
if err != nil {
return a.p2p.Blocklist(peer, 1*time.Minute)
}

return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}

func (a *Accounting) Connect(peer swarm.Address) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can unit test these functions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, LGTM. Nice work!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, planning to add a unit test still, thanks!

accountingPeer := a.getAccountingPeer(peer)
zero := big.NewInt(0)

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

accountingPeer.shadowReservedBalance.Set(zero)
accountingPeer.ghostBalance.Set(zero)
accountingPeer.reservedBalance.Set(zero)

err := a.store.Put(peerBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist balance: %w", err)
}

err = a.store.Put(peerSurplusBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist surplus balance: %w", err)
}

if accountingPeer.reconnectAllowTimestamp != 0 {
timeNow := a.timeNow().Unix()
if timeNow < accountingPeer.reconnectAllowTimestamp {
disconnectFor := accountingPeer.reconnectAllowTimestamp - timeNow
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}
}
}

Expand Down Expand Up @@ -1003,6 +1113,23 @@ func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big
return nil
}

func (a *Accounting) Disconnect(peer swarm.Address) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can unit test these functions?

accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

timeNow := a.timeNow().Unix()

disconnectFor, err := a.blocklistUntil(peer, 1)
if err != nil {
disconnectFor = int64(60)
}
timestamp := timeNow + disconnectFor

accountingPeer.reconnectAllowTimestamp = timestamp
}

func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
Expand Down