Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swap: refactor lastReceivedCheque, lastSentCheque, balances to peer #1725

Merged
merged 9 commits into from
Sep 10, 2019
132 changes: 129 additions & 3 deletions swap/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package swap

import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"github.com/ethereum/go-ethereum/common"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/p2p/protocols"
)

Expand All @@ -30,18 +34,140 @@ var ErrDontOwe = errors.New("no negative balance")
// Peer is a devp2p peer for the Swap protocol
type Peer struct {
mortelli marked this conversation as resolved.
Show resolved Hide resolved
*protocols.Peer
lock sync.RWMutex
swap *Swap
beneficiary common.Address
contractAddress common.Address
lastReceivedCheque *Cheque
lastSentCheque *Cheque
balance int64
}

// NewPeer creates a new swap Peer instance
func NewPeer(p *protocols.Peer, s *Swap, beneficiary common.Address, contractAddress common.Address) *Peer {
return &Peer{
func NewPeer(p *protocols.Peer, s *Swap, beneficiary common.Address, contractAddress common.Address) (peer *Peer, err error) {
peer = &Peer{
Peer: p,
swap: s,
beneficiary: beneficiary,
contractAddress: contractAddress,
}

if peer.lastReceivedCheque, err = s.loadLastReceivedCheque(p.ID()); err != nil {
return nil, err
}

if peer.lastSentCheque, err = s.loadLastSentCheque(p.ID()); err != nil {
return nil, err
}

if peer.balance, err = s.loadBalance(p.ID()); err != nil {
return nil, err
}
return peer, nil
}

func (p *Peer) getLastReceivedCheque() *Cheque {
holisticode marked this conversation as resolved.
Show resolved Hide resolved
return p.lastReceivedCheque
mortelli marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *Peer) getLastSentCheque() *Cheque {
return p.lastSentCheque
}

func (p *Peer) setLastReceivedCheque(cheque *Cheque) error {
p.lastReceivedCheque = cheque
return p.swap.saveLastReceivedCheque(p.ID(), cheque)
mortelli marked this conversation as resolved.
Show resolved Hide resolved
holisticode marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *Peer) setLastSentCheque(cheque *Cheque) error {
p.lastSentCheque = cheque
return p.swap.saveLastSentCheque(p.ID(), cheque)
}

func (p *Peer) getLastCumulativePayout() uint64 {
lastCheque := p.getLastReceivedCheque()
if lastCheque != nil {
return lastCheque.CumulativePayout
}
return 0
}

func (p *Peer) setBalance(balance int64) error {
p.balance = balance
return p.swap.saveBalance(p.ID(), balance)
Eknir marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *Peer) getBalance() int64 {
return p.balance
}

// To be called with mutex already held
func (p *Peer) updateBalance(amount int64) error {
//adjust the balance
//if amount is negative, it will decrease, otherwise increase
newBalance := p.getBalance() + amount
if err := p.setBalance(newBalance); err != nil {
return err
}
log.Debug("balance for peer after accounting", "peer", p.ID().String(), "balance", strconv.FormatInt(newBalance, 10))
return nil
}

// createCheque creates a new cheque whose beneficiary will be the peer and
// whose amount is based on the last cheque and current balance for this peer
// The cheque will be signed and point to the issuer's contract
// To be called with mutex already held
// Caller must be careful that the same resources aren't concurrently read and written by multiple routines
func (p *Peer) createCheque() (*Cheque, error) {
var cheque *Cheque
var err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i understand our coding standards correctly, we could use named parameters here so as not to have to initialize these variables (as long as we don't use naked returns).

however, i don't know if this is still acceptable for relatively long functions like this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you dont need this declarations, just put a comma after cheque on line 138


if p.getBalance() >= 0 {
return nil, fmt.Errorf("expected negative balance, found: %d", p.getBalance())
}
// the balance should be negative here, we take the absolute value:
honey := uint64(-p.getBalance())

amount, err := p.swap.oracle.GetPrice(honey)
if err != nil {
return nil, fmt.Errorf("error getting price from oracle: %v", err)
}

total := p.getLastCumulativePayout()

cheque = &Cheque{
ChequeParams: ChequeParams{
CumulativePayout: total + amount,
Contract: p.swap.owner.Contract,
Beneficiary: p.beneficiary,
},
Honey: honey,
mortelli marked this conversation as resolved.
Show resolved Hide resolved
}
cheque.Signature, err = cheque.Sign(p.swap.owner.privateKey)

return cheque, err
}

// sendCheque sends a cheque to peer
// To be called with mutex already held
// Caller must be careful that the same resources aren't concurrently read and written by multiple routines
func (p *Peer) sendCheque() error {
cheque, err := p.createCheque()
if err != nil {
return fmt.Errorf("error while creating cheque: %v", err)
}

if err := p.setLastSentCheque(cheque); err != nil {
return fmt.Errorf("error while storing the last cheque: %v", err)
}

if err := p.updateBalance(int64(cheque.Honey)); err != nil {
return err
}

log.Info("sending cheque", "honey", cheque.Honey, "cumulativePayout", cheque.ChequeParams.CumulativePayout, "beneficiary", cheque.Beneficiary, "contract", cheque.Contract)

return p.Send(context.Background(), &EmitChequeMsg{
Cheque: cheque,
})
}
19 changes: 13 additions & 6 deletions swap/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ func (s *Swap) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
return err
}

swapPeer := NewPeer(protoPeer, s, beneficiary, response.ContractAddress)
s.addPeer(swapPeer)
swapPeer, err := s.addPeer(protoPeer, beneficiary, response.ContractAddress)
if err != nil {
return err
}
defer s.removePeer(swapPeer)

return swapPeer.Run(s.handleMsg(swapPeer))
Expand All @@ -134,17 +136,22 @@ func (s *Swap) removePeer(p *Peer) {
delete(s.peers, p.ID())
}

func (s *Swap) addPeer(p *Peer) {
func (s *Swap) addPeer(protoPeer *protocols.Peer, beneficiary common.Address, contractAddress common.Address) (*Peer, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep addPeer as it was and put the peer initialisation in the run function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to keep peer initialisation and adding it to the swaps peer mapping in one common function as creating a Peer without adding it to the map is always a mistake (this only happens at one place in the main code but would lead to a lot of duplicate code in the tests). I used to have this in a separate separate function but @holisticode recommended doing this in addPeer (see #1725 (comment)).

s.peersLock.Lock()
defer s.peersLock.Unlock()
p, err := NewPeer(protoPeer, s, beneficiary, contractAddress)
if err != nil {
return nil, err
}
s.peers[p.ID()] = p
return p, nil
}

func (s *Swap) getPeer(id enode.ID) (*Peer, bool) {
func (s *Swap) getPeer(id enode.ID) *Peer {
s.peersLock.RLock()
defer s.peersLock.RUnlock()
peer, ok := s.peers[id]
return peer, ok
peer := s.peers[id]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we removing the ok check here? What if the peer is not in the map? Do you just want to force the caller to check for nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was suggested here (#1725 (comment))

return peer
}

type swapAPI interface {
Expand Down
91 changes: 46 additions & 45 deletions swap/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestHandshake(t *testing.T) {
creditor := protocolTester.Nodes[1]

// set balance artifially
swap.balances[creditor.ID()] = -42
swap.saveBalance(creditor.ID(), -42)

// create the expected cheque to be received
cheque := newTestCheque()
Expand Down Expand Up @@ -126,14 +126,17 @@ func TestEmitCheque(t *testing.T) {
// create the debitor peer
dPtpPeer := p2p.NewPeer(enode.ID{}, "debitor", []p2p.Cap{})
dProtoPeer := protocols.NewPeer(dPtpPeer, nil, Spec)
debitor := NewPeer(dProtoPeer, creditorSwap, debitorSwap.owner.address, debitorSwap.owner.Contract)
debitor, err := creditorSwap.addPeer(dProtoPeer, debitorSwap.owner.address, debitorSwap.owner.Contract)
if err != nil {
t.Fatal(err)
}

// set balance artificially
creditorSwap.balances[debitor.ID()] = 42
log.Debug("balance", "balance", creditorSwap.balances[debitor.ID()])
debitor.setBalance(42)
log.Debug("balance", "balance", debitor.getBalance())
// a safe check: at this point no cheques should be in the swap
if len(creditorSwap.cheques) != 0 {
t.Fatalf("Expected no cheques at creditor, but there are %d:", len(creditorSwap.cheques))
if debitor.getLastReceivedCheque() != nil {
t.Fatalf("Expected no cheques at creditor, but there is %v:", debitor.getLastReceivedCheque())
}

log.Debug("create a cheque")
Expand Down Expand Up @@ -170,15 +173,15 @@ func TestEmitCheque(t *testing.T) {
case <-time.After(4 * time.Second):
t.Fatalf("Timeout waiting for cash transaction to complete")
}
log.Debug("balance", "balance", creditorSwap.balances[debitor.ID()])
log.Debug("balance", "balance", debitor.getBalance())
// check that the balance has been reset
if creditorSwap.balances[debitor.ID()] != 0 {
t.Fatalf("Expected debitor balance to have been reset to %d, but it is %d", 0, creditorSwap.balances[debitor.ID()])
if debitor.getBalance() != 0 {
t.Fatalf("Expected debitor balance to have been reset to %d, but it is %d", 0, debitor.getBalance())
}
recvCheque := creditorSwap.loadLastReceivedCheque(debitor)
recvCheque := debitor.getLastReceivedCheque()
log.Debug("expected cheque", "cheque", recvCheque)
if recvCheque != cheque {
t.Fatalf("Expected exactly one cheque at creditor, but there are %d:", len(creditorSwap.cheques))
t.Fatalf("Expected cheque at creditor, but it was %v:", recvCheque)
}
}

Expand All @@ -196,48 +199,48 @@ func TestTriggerPaymentThreshold(t *testing.T) {

// create a dummy pper
cPeer := newDummyPeerWithSpec(Spec)
creditor := NewPeer(cPeer.Peer, debitorSwap, common.Address{}, common.Address{})
// set the creditor as peer into the debitor's swap
debitorSwap.peers[creditor.ID()] = creditor
creditor, err := debitorSwap.addPeer(cPeer.Peer, common.Address{}, common.Address{})
if err != nil {
t.Fatal(err)
}

// set the balance to manually be at PaymentThreshold
overDraft := 42
debitorSwap.balances[creditor.ID()] = 0 - DefaultPaymentThreshold
creditor.setBalance(-DefaultPaymentThreshold)

// we expect a cheque at the end of the test, but not yet
lenCheques := len(debitorSwap.cheques)
if lenCheques != 0 {
t.Fatalf("Expected no cheques yet, but there are %d", lenCheques)
if creditor.getLastSentCheque() != nil {
t.Fatalf("Expected no cheques yet, but there is %v:", creditor.getLastSentCheque())
}
// do some accounting, no error expected, just a WARN
err := debitorSwap.Add(int64(-overDraft), creditor.Peer)
err = debitorSwap.Add(int64(-overDraft), creditor.Peer)
if err != nil {
t.Fatal(err)
}

// we should now have a cheque
lenCheques = len(debitorSwap.cheques)
if lenCheques != 1 {
t.Fatalf("Expected one cheque, but there are %d", lenCheques)
if creditor.getLastSentCheque() == nil {
t.Fatal("Expected one cheque, but there is none")
}
cheque := debitorSwap.cheques[creditor.ID()]

cheque := creditor.getLastSentCheque()
expectedAmount := uint64(overDraft) + DefaultPaymentThreshold
if cheque.CumulativePayout != expectedAmount {
t.Fatalf("Expected cheque cumulative payout to be %d, but is %d", expectedAmount, cheque.CumulativePayout)
}

// because no other accounting took place in the meantime the balance should be exactly 0
if debitorSwap.balances[creditor.ID()] != 0 {
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", debitorSwap.balances[creditor.ID()])
if creditor.getBalance() != 0 {
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", creditor.getBalance())
}

// do some accounting again to trigger a second cheque
if err = debitorSwap.Add(int64(-DefaultPaymentThreshold), creditor.Peer); err != nil {
t.Fatal(err)
}

if debitorSwap.balances[creditor.ID()] != 0 {
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", debitorSwap.balances[creditor.ID()])
if creditor.getBalance() != 0 {
t.Fatalf("Expected debitorSwap balance to be 0, but is %d", creditor.getBalance())
}
}

Expand All @@ -251,34 +254,33 @@ func TestTriggerDisconnectThreshold(t *testing.T) {

// create a dummy pper
cPeer := newDummyPeerWithSpec(Spec)
debitor := NewPeer(cPeer.Peer, creditorSwap, common.Address{}, common.Address{})
// set the debitor as peer into the creditor's swap
creditorSwap.peers[debitor.ID()] = debitor
debitor, err := creditorSwap.addPeer(cPeer.Peer, common.Address{}, common.Address{})
if err != nil {
t.Fatal(err)
}

// set the balance to manually be at DisconnectThreshold
overDraft := 42
expectedBalance := int64(DefaultDisconnectThreshold)
// we don't expect any change after the test
creditorSwap.balances[debitor.ID()] = expectedBalance
debitor.setBalance(expectedBalance)
// we also don't expect any cheques yet
lenCheques := len(creditorSwap.cheques)
if lenCheques != 0 {
t.Fatalf("Expected no cheques yet, but there are %d", lenCheques)
if debitor.getLastSentCheque() != nil {
t.Fatalf("Expected no cheques yet, but there is %v", debitor.getLastSentCheque())
}
// now do some accounting
err := creditorSwap.Add(int64(overDraft), debitor.Peer)
err = creditorSwap.Add(int64(overDraft), debitor.Peer)
// it should fail due to overdraft
if err == nil {
t.Fatal("Expected an error due to overdraft, but did not get any")
}
// no balance change expected
if creditorSwap.balances[debitor.ID()] != expectedBalance {
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, creditorSwap.balances[debitor.ID()])
if debitor.getBalance() != expectedBalance {
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, debitor.getBalance())
}
// still no cheques expected
lenCheques = len(creditorSwap.cheques)
if lenCheques != 0 {
t.Fatalf("Expected still no cheque, but there are %d", lenCheques)
if debitor.getLastSentCheque() != nil {
t.Fatalf("Expected still no cheques yet, but there is %v", debitor.getLastSentCheque())
}

// let's do the whole thing again (actually a bit silly, it's somehow simulating the peer would have been dropped)
Expand All @@ -287,12 +289,11 @@ func TestTriggerDisconnectThreshold(t *testing.T) {
t.Fatal("Expected an error due to overdraft, but did not get any")
}

if creditorSwap.balances[debitor.ID()] != expectedBalance {
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, creditorSwap.balances[debitor.ID()])
if debitor.getBalance() != expectedBalance {
t.Fatalf("Expected balance to be %d, but is %d", expectedBalance, debitor.getBalance())
}

lenCheques = len(creditorSwap.cheques)
if lenCheques != 0 {
t.Fatalf("Expected still no cheque, but there are %d", lenCheques)
if debitor.getLastSentCheque() != nil {
t.Fatalf("Expected no cheques yet, but there is %v", debitor.getLastSentCheque())
}
}
Loading