Skip to content

Commit

Permalink
feat: Always reset balances to zero after disconnection or blocklisti…
Browse files Browse the repository at this point in the history
…ng (#1983)
  • Loading branch information
metacertain committed Jun 16, 2021
1 parent 0a861f1 commit 0f889d8
Show file tree
Hide file tree
Showing 10 changed files with 518 additions and 41 deletions.
149 changes: 144 additions & 5 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Interface interface {
// Credit increases the balance the peer has with us (we "pay" the peer).
Credit(peer swarm.Address, price uint64, originated bool) error
// PrepareDebit returns an accounting Action for the later debit to be executed on and to implement shadowing a possibly credited part of reserve on the other side.
PrepareDebit(peer swarm.Address, price uint64) Action
PrepareDebit(peer swarm.Address, price uint64) (Action, error)
// Balance returns the current balance for the given peer.
Balance(peer swarm.Address) (*big.Int, error)
// SurplusBalance returns the current surplus balance for the given peer.
Expand Down Expand Up @@ -87,10 +87,13 @@ type accountingPeer struct {
lock sync.Mutex // lock to be held during any accounting action for this peer
reservedBalance *big.Int // amount currently reserved for active peer interaction
shadowReservedBalance *big.Int // amount potentially to be debited for active peer interaction
ghostBalance *big.Int // amount potentially could have been debited for but was not
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
reconnectAllowTimestamp int64
lastSettlementFailureTimestamp int64 // time of last unsuccessful attempt to issue a cheque
connected bool
}

// Accounting is the main implementation of the accounting interface.
Expand Down Expand Up @@ -120,6 +123,7 @@ type Accounting struct {
pricing pricing.Interface
metrics metrics
wg sync.WaitGroup
p2p p2p.Service
timeNow func() time.Time
}

Expand All @@ -143,6 +147,8 @@ func NewAccounting(
Store storage.StateStorer,
Pricing pricing.Interface,
refreshRate *big.Int,
p2pService p2p.Service,

) (*Accounting, error) {
return &Accounting{
accountingPeers: make(map[string]*accountingPeer),
Expand All @@ -157,6 +163,7 @@ func NewAccounting(
refreshRate: refreshRate,
timeNow: time.Now,
minimumPayment: new(big.Int).Div(refreshRate, big.NewInt(minimumPaymentDivisor)),
p2p: p2pService,
}, nil
}

Expand All @@ -165,6 +172,11 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()

if !accountingPeer.connected {
return fmt.Errorf("connection not initialized yet")
}

defer accountingPeer.lock.Unlock()

a.metrics.AccountingReserveCount.Inc()
Expand Down Expand Up @@ -485,8 +497,10 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
peerData = &accountingPeer{
reservedBalance: big.NewInt(0),
shadowReservedBalance: big.NewInt(0),
ghostBalance: big.NewInt(0),
// initially assume the peer has the same threshold as us
paymentThreshold: new(big.Int).Set(a.paymentThreshold),
connected: false,
}
a.accountingPeers[peer.String()] = peerData
}
Expand Down Expand Up @@ -633,6 +647,36 @@ func (a *Accounting) PeerDebt(peer swarm.Address) (*big.Int, error) {
return peerDebt, nil
}

// peerLatentDebt returns the sum of the positive part of the outstanding balance, shadow reserve and the ghost balance
func (a *Accounting) peerLatentDebt(peer swarm.Address) (*big.Int, error) {

accountingPeer := a.getAccountingPeer(peer)

balance := new(big.Int)
zero := big.NewInt(0)

err := a.store.Get(peerBalanceKey(peer), &balance)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
balance = big.NewInt(0)
}

if balance.Cmp(zero) < 0 {
balance.Set(zero)
}

peerDebt := new(big.Int).Add(balance, accountingPeer.shadowReservedBalance)
peerLatentDebt := new(big.Int).Add(peerDebt, accountingPeer.ghostBalance)

if peerLatentDebt.Cmp(zero) < 0 {
return zero, nil
}

return peerLatentDebt, nil
}

// shadowBalance returns the current debt reduced by any potentially debitable amount stored in shadowReservedBalance
// this represents how much less our debt could potentially be seen by the other party if it's ahead with processing credits corresponding to our shadow reserve
func (a *Accounting) shadowBalance(peer swarm.Address) (shadowBalance *big.Int, err error) {
Expand Down Expand Up @@ -827,12 +871,16 @@ func (a *Accounting) NotifyRefreshmentReceived(peer swarm.Address, amount *big.I
}

// PrepareDebit prepares a debit operation by increasing the shadowReservedBalance
func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) Action {
func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) (Action, error) {
accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

if !accountingPeer.connected {
return nil, fmt.Errorf("connection not initialized yet")
}

bigPrice := new(big.Int).SetUint64(price)

accountingPeer.shadowReservedBalance = new(big.Int).Add(accountingPeer.shadowReservedBalance, bigPrice)
Expand All @@ -843,7 +891,7 @@ func (a *Accounting) PrepareDebit(peer swarm.Address, price uint64) Action {
peer: peer,
accountingPeer: accountingPeer,
applied: false,
}
}, nil
}

func (a *Accounting) increaseBalance(peer swarm.Address, accountingPeer *accountingPeer, price *big.Int) (*big.Int, error) {
Expand Down Expand Up @@ -940,7 +988,13 @@ func (d *debitAction) Apply() error {
if nextBalance.Cmp(a.disconnectLimit) >= 0 {
// peer too much in debt
a.metrics.AccountingDisconnectsCount.Inc()
return p2p.NewBlockPeerError(24*time.Hour, ErrDisconnectThresholdExceeded)

disconnectFor, err := a.blocklistUntil(d.peer, 1)
if err != nil {
return p2p.NewBlockPeerError(1*time.Minute, ErrDisconnectThresholdExceeded)
}
return p2p.NewBlockPeerError(time.Duration(disconnectFor), ErrDisconnectThresholdExceeded)

}

return nil
Expand All @@ -951,7 +1005,75 @@ func (d *debitAction) Cleanup() {
if !d.applied {
d.accountingPeer.lock.Lock()
defer d.accountingPeer.lock.Unlock()
a := d.accounting
d.accountingPeer.shadowReservedBalance = new(big.Int).Sub(d.accountingPeer.shadowReservedBalance, d.price)
d.accountingPeer.ghostBalance = new(big.Int).Add(d.accountingPeer.ghostBalance, d.price)
if d.accountingPeer.ghostBalance.Cmp(a.disconnectLimit) > 0 {
_ = a.blocklist(d.peer, 1)
}
}
}

func (a *Accounting) blocklistUntil(peer swarm.Address, multiplier int64) (int64, error) {

debt, err := a.peerLatentDebt(peer)
if err != nil {
return 0, err
}

if debt.Cmp(a.refreshRate) < 0 {
debt.Set(a.refreshRate)
}

additionalDebt := new(big.Int).Add(debt, a.paymentThreshold)

multiplyDebt := new(big.Int).Mul(additionalDebt, big.NewInt(multiplier))

k := new(big.Int).Div(multiplyDebt, a.refreshRate)

kInt := k.Int64()

return kInt, nil
}

func (a *Accounting) blocklist(peer swarm.Address, multiplier int64) error {

disconnectFor, err := a.blocklistUntil(peer, multiplier)
if err != nil {
return a.p2p.Blocklist(peer, 1*time.Minute)
}

return a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}

func (a *Accounting) Connect(peer swarm.Address) {
accountingPeer := a.getAccountingPeer(peer)
zero := big.NewInt(0)

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

accountingPeer.connected = true
accountingPeer.shadowReservedBalance.Set(zero)
accountingPeer.ghostBalance.Set(zero)
accountingPeer.reservedBalance.Set(zero)

err := a.store.Put(peerBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist balance: %w", err)
}

err = a.store.Put(peerSurplusBalanceKey(peer), zero)
if err != nil {
a.logger.Errorf("failed to persist surplus balance: %w", err)
}

if accountingPeer.reconnectAllowTimestamp != 0 {
timeNow := a.timeNow().Unix()
if timeNow < accountingPeer.reconnectAllowTimestamp {
disconnectFor := accountingPeer.reconnectAllowTimestamp - timeNow
_ = a.p2p.Blocklist(peer, time.Duration(disconnectFor)*time.Second)
}
}
}

Expand Down Expand Up @@ -1003,6 +1125,23 @@ func (a *Accounting) decreaseOriginatedBalanceBy(peer swarm.Address, amount *big
return nil
}

func (a *Accounting) Disconnect(peer swarm.Address) {
accountingPeer := a.getAccountingPeer(peer)

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()

timeNow := a.timeNow().Unix()

disconnectFor, err := a.blocklistUntil(peer, 1)
if err != nil {
disconnectFor = int64(60)
}
timestamp := timeNow + disconnectFor
accountingPeer.connected = false
accountingPeer.reconnectAllowTimestamp = timestamp
}

func (a *Accounting) SetRefreshFunc(f RefreshFunc) {
a.refreshFunction = f
}
Expand Down
Loading

0 comments on commit 0f889d8

Please sign in to comment.