Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

P2p validate accounting #2051

Merged
merged 7 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions p2p/protocols/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Price struct {
// A protocol provides the message price in absolute value
// This method then returns the correct signed amount,
// depending on who pays, which is identified by the `payer` argument:
// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
// Sending will pass a `Sender` payer, receiving will pass the `Receiver` argument.
mortelli marked this conversation as resolved.
Show resolved Hide resolved
// Thus: If Sending and sender pays, amount negative, otherwise positive
// If Receiving, and receiver pays, amount negative, otherwise positive
func (p *Price) For(payer Payer, size uint32) int64 {
Expand All @@ -93,6 +93,10 @@ type Balance interface {
// positive amount = credit local node
// negative amount = debit local node
Add(amount int64, peer *Peer) error
// Check is a dry-run for the Add operation:
// As the accounting takes place **after** the actual send/receive operation happens,
// we want to make sure that that operation would not result in any problem
Check(amount int64, peer *Peer) error
}

// Accounting implements the Hook interface
Expand All @@ -118,44 +122,35 @@ func SetupAccountingMetrics(reportInterval time.Duration, path string) *Accounti
return NewAccountingMetrics(metrics.AccountingRegistry, reportInterval, path)
}

// Send takes a peer, a size and a msg and
// - calculates the cost for the local node sending a msg of size to peer querying the message for its price
// - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
// get the price for a message
var pricedMessage PricedMessage
var ok bool
// if the msg implements `Price`, it is an accounted message
if pricedMessage, ok = msg.(PricedMessage); !ok {
return nil
}
// evaluate the price for sending messages
costToLocalNode := pricedMessage.Price().For(Sender, size)
// Apply takes a peer, the signed cost for the local node and the msg size and credits/debits local node using balance interface
func (ah *Accounting) Apply(peer *Peer, costToLocalNode int64, size uint32) error {
// do the accounting
err := ah.Add(costToLocalNode, peer)
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}

// Receive takes a peer, a size and a msg and
// - calculates the cost for the local node receiving a msg of size from peer querying the message for its price
// - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
// Validate calculates the cost for the local node sending or receiving a msg to/from a peer querying the message for its price.
// It returns either the signed cost for the local node as int64 or an error, signaling that the accounting operation would fail
// (no change has been applied at this point)
func (ah *Accounting) Validate(peer *Peer, size uint32, msg interface{}, payer Payer) (int64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conceptually, i don't understand the difference between this function and Check, can you please elaborate on this?

this one isn't mentioned in the PR description but it seems to be new

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well Validate is the interface we need to call it from the p2p/protocols package, and does some common work like evaluating the cost for the local node and get the message price.

Check is then implemented by the swap package, and only really does the actual check to see if the operation would incur in some problems in the swap package.

mortelli marked this conversation as resolved.
Show resolved Hide resolved
// get the price for a message (by querying the message type via the PricedMessage interface)
var pricedMessage PricedMessage
var ok bool
// if the msg implements `Price`, it is an accounted message
if pricedMessage, ok = msg.(PricedMessage); !ok {
return nil
return 0, nil
}
// evaluate the price for receiving messages
costToLocalNode := pricedMessage.Price().For(Receiver, size)
// do the accounting
err := ah.Add(costToLocalNode, peer)
// record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
costToLocalNode := pricedMessage.Price().For(payer, size)
// check that the operation would perform correctly
err := ah.Check(costToLocalNode, peer)
if err != nil {
// signal to caller that the operation would fail
return 0, err
}
return costToLocalNode, nil
}

// record some metrics
Expand Down
16 changes: 10 additions & 6 deletions p2p/protocols/accounting_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestAccountingSimulation(t *testing.T) {
}
defer os.RemoveAll(dir)
SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db"))
//define the node.Service for this test
services := adapters.Services{
"accounting": func(ctx *adapters.ServiceContext) (node.Service, error) {
return bal.newNode(), nil
Expand All @@ -82,8 +81,9 @@ func TestAccountingSimulation(t *testing.T) {
net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{DefaultService: "accounting"})
defer net.Shutdown()

// we send msgs messages per node, wait for all messages to arrive
bal.wg.Add(*nodes * *msgs)
// we send msgs messages per node, wait for all messages to have been accounted
// (Add runs for each send and receive, so we need double the amount)
bal.wg.Add(*nodes * *msgs * 2)
trigger := make(chan enode.ID)
go func() {
// wait for all of them to arrive
Expand Down Expand Up @@ -168,13 +168,14 @@ func newMatrix(n int) *matrix {
}

// called from the testBalance's Add accounting function: register balance change
func (m *matrix) add(i, j int, v int64) error {
func (m *matrix) add(i, j int, v int64, wg *sync.WaitGroup) error {
// index for the balance of local node i with remote nodde j is
// i * number of nodes + remote node
mi := i*m.n + j
// register that balance
m.lock.Lock()
m.m[mi] += v
wg.Done()
m.lock.Unlock()
return nil
}
Expand Down Expand Up @@ -229,13 +230,17 @@ type testNode struct {
peerCount int
}

func (t *testNode) Check(a int64, p *Peer) error {
return nil
}

// do the accounting for the peer's test protocol
// testNode implements protocols.Balance
func (t *testNode) Add(a int64, p *Peer) error {
//get the index for the remote peer
remote := t.bal.id2n[p.ID()]
log.Debug("add", "local", t.i, "remote", remote, "amount", a)
return t.bal.add(t.i, remote, a)
return t.bal.add(t.i, remote, a, t.bal.wg)
}

//run the p2p protocol
Expand All @@ -260,7 +265,6 @@ func (t *testNode) run(p *p2p.Peer, rw p2p.MsgReadWriter) error {

// p2p message receive handler function
func (tp *testPeer) handle(ctx context.Context, msg interface{}) error {
tp.wg.Done()
log.Debug("receive", "from", tp.remote, "to", tp.local, "type", reflect.TypeOf(msg), "msg", msg)
return nil
}
Expand Down
20 changes: 17 additions & 3 deletions p2p/protocols/accounting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (m *zeroPriceMsg) Price() *Price {
}
}

//dummy accounting implementation, only stores values for later check
func (d *dummyBalance) Check(amount int64, peer *Peer) error {
return nil
}

//dummy accounting implementation, only stores values for later check
func (d *dummyBalance) Add(amount int64, peer *Peer) error {
d.amount = amount
Expand Down Expand Up @@ -168,16 +173,25 @@ func TestBalance(t *testing.T) {
}

func checkAccountingTestCases(t *testing.T, cases []testCase, acc *Accounting, peer *Peer, balance *dummyBalance, send bool) {
t.Helper()
for _, c := range cases {
var err error
var expectedResult int64
var expectedResult, cost int64
//reset balance before every check
balance.amount = 0
if send {
err = acc.Send(peer, c.size, c.msg)
cost, err = acc.Validate(peer, c.size, c.msg, Sender)
Eknir marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatal(err)
}
err = acc.Apply(peer, cost, c.size)
expectedResult = c.sendResult
Eknir marked this conversation as resolved.
Show resolved Hide resolved
} else {
err = acc.Receive(peer, c.size, c.msg)
cost, err = acc.Validate(peer, c.size, c.msg, Receiver)
if err != nil {
t.Fatal(err)
}
err = acc.Apply(peer, cost, c.size)
expectedResult = c.recvResult
}

Expand Down
70 changes: 52 additions & 18 deletions p2p/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ func errorf(code int, format string, params ...interface{}) *Error {
//To access this functionality, we provide a Hook interface which will call accounting methods
//NOTE: there could be more such (horizontal) hooks in the future
type Hook interface {
//A hook for sending messages
Send(peer *Peer, size uint32, msg interface{}) error
//A hook for receiving messages
Receive(peer *Peer, size uint32, msg interface{}) error
// A hook for applying accounting
Apply(peer *Peer, costToLocalNode int64, size uint32) error
// Run some validation before applying accounting
Validate(peer *Peer, size uint32, msg interface{}, payer Payer) (int64, error)
}

// Spec is a protocol specification including its name and version as well as
Expand Down Expand Up @@ -203,6 +203,7 @@ type Peer struct {
spec *Spec
encode func(context.Context, interface{}) (interface{}, int, error)
decode func(p2p.Msg) (context.Context, []byte, error)
lock sync.Mutex
}

// NewPeer constructs a new peer
Expand Down Expand Up @@ -277,15 +278,31 @@ func (p *Peer) Send(ctx context.Context, msg interface{}) error {
}
size = len(r)
}
// if the accounting hook is set, call it

// if the accounting hook is set, do accounting logic
if p.spec.Hook != nil {
err = p.spec.Hook.Send(p, uint32(size), msg)

// let's lock, we want to avoid that after validating, a separate call might interfere
p.lock.Lock()
defer p.lock.Unlock()
// validate that this operation would succeed...
costToLocalNode, err := p.spec.Hook.Validate(p, uint32(size), wmsg, Sender)
holisticode marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// ...because if it would fail, we return and don't send the message
return err
}
// seems like accounting would succeed, thus send the message first...
err = p2p.Send(p.rw, code, wmsg)
if err != nil {
return err
}
// ...and finally apply (write) the accounting change
err = p.spec.Hook.Apply(p, costToLocalNode, uint32(size))
} else {
err = p2p.Send(p.rw, code, wmsg)
}

return p2p.Send(p.rw, code, wmsg)
return err
}

// handleIncoming(code)
Expand Down Expand Up @@ -322,23 +339,40 @@ func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{})
return errorf(ErrDecode, "<= %v: %v", msg, err)
}

// if the accounting hook is set, call it
// if the accounting hook is set, do accounting logic
if p.spec.Hook != nil {
err := p.spec.Hook.Receive(p, uint32(len(msgBytes)), val)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a warning, a potential conflict with #2018, but ti should be easy to resolve, whichever PR gets merged first. @pradovic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, thanks for the heads-up!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up

p.lock.Lock()
defer p.lock.Unlock()

size := uint32(len(msgBytes))

// validate that the accounting call would succeed...
costToLocalNode, err := p.spec.Hook.Validate(p, size, val, Receiver)
if err != nil {
// ...because if it would fail, we return and don't handle the message
return err
}
}

// call the registered handler callbacks
// a registered callback take the decoded message as argument as an interface
// which the handler is supposed to cast to the appropriate type
// it is entirely safe not to check the cast in the handler since the handler is
// chosen based on the proper type in the first place
if err := handle(ctx, val); err != nil {
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
// seems like accounting would be fine, so handle the message
if err := handle(ctx, val); err != nil {
ralph-pichler marked this conversation as resolved.
Show resolved Hide resolved
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
}

// handling succeeded, finally apply accounting
err = p.spec.Hook.Apply(p, costToLocalNode, size)
} else {
// call the registered handler callbacks
// a registered callback take the decoded message as argument as an interface
// which the handler is supposed to cast to the appropriate type
// it is entirely safe not to check the cast in the handler since the handler is
// chosen based on the proper type in the first place
if err := handle(ctx, val); err != nil {
return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
}
}
return nil

return err
}

// Handshake negotiates a handshake on the peer connection
Expand Down
19 changes: 9 additions & 10 deletions p2p/protocols/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,25 @@ type dummyMsg struct {
Content string
}

func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
func (d *dummyHook) Validate(peer *Peer, size uint32, msg interface{}, payer Payer) (int64, error) {
d.mu.Lock()
defer d.mu.Unlock()

d.peer = peer
d.size = size
d.msg = msg
d.send = true
return d.err
if payer == Sender {
d.send = true
} else {
d.send = false
d.waitC <- struct{}{}
}
return 0, d.err
}

func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
func (d *dummyHook) Apply(peer *Peer, cost int64, size uint32) error {
d.mu.Lock()
defer d.mu.Unlock()

d.peer = peer
d.size = size
d.msg = msg
d.send = false
d.waitC <- struct{}{}
return d.err
}

Expand Down
37 changes: 31 additions & 6 deletions swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func keyToID(key string, prefix string) enode.ID {
return enode.HexID(key[len(prefix):])
}

// createOwner assings keys and addresses
// createOwner assigns keys and addresses
func createOwner(prvkey *ecdsa.PrivateKey) *Owner {
pubkey := &prvkey.PublicKey
return &Owner{
Expand All @@ -314,20 +314,45 @@ func createOwner(prvkey *ecdsa.PrivateKey) *Owner {
}
}

// modifyBalanceOk checks that the amount would not result in crossing the disconnection threshold
func (s *Swap) modifyBalanceOk(amount int64, swapPeer *Peer) (err error) {
// check if balance with peer is over the disconnect threshold and if the message would increase the existing debt
balance := swapPeer.getBalance()
if balance >= s.params.DisconnectThreshold && amount > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you did not introduce this line, but it seems to me that here, we would need to test whether balance + amount < s.params.DisconnectTreshold. If not, the following situation would be allowed:
balance = 499, disconnectThreshold = 500, amount = 1000

But I think we should, in this case, we should disconnect whenever we receive a single message with a positive amount.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be sure, it would be great to see where and by whom this line was introduced.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was introduced by @mortelli with the idea to always allow debt-reducing messages. Let him comment on this; in any case, if this would have to change, it is an important change and then it should be done in a separate PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vaguely remember this... This comment is not a blocker for me, but would like to get reassurance from @mortelli :)

Copy link
Contributor

@mortelli mortelli Jan 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, i introduced this change through PR #1922.

the only difference in checking amount instead of balance + amount is one message.

in the first case, one message will go over the threshold and then stop the flow; in the second, the threshold will never be reached or surpassed, but there might be a bigger difference between the threshold and the final balance before subsequent messages are stopped.

while correct, the situation you mentioned is unlikely to come up: if a message is priced at 1000, we should never have set a threshold at 500.

assuming then, that amount is comparatively small in regards to the threshold, the impact of having this little bit of extra slack should be minimal.

return fmt.Errorf("balance for peer %s is over the disconnect threshold %d and cannot incur more debt, disconnecting", swapPeer.ID().String(), s.params.DisconnectThreshold)
}

return nil
}

// Check is called as a *dry run* before applying the actual accounting to an operation.
// It only checks that performing a given accounting operation would not incur in an error.
// If it returns no error, this signals to the caller that the operation is safe
func (s *Swap) Check(amount int64, peer *protocols.Peer) (err error) {
swapPeer := s.getPeer(peer.ID())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do s.getPeer(peer.ID()) here and you also do it in the function checkBalance (swap:319)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because getPeer is under a lock.

getBalance is called from different places (also from Add) and thus needs to be called independently

Copy link
Contributor

@Eknir Eknir Jan 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand then why you would call it at this place at all. Seemingly all you are doing is verifying whether the peer is a swapPeer, but this is also the first thing you are doing in checkBalance.
I think that this function can be improved by not doing a call to getPeer at all, or by modifying checkBalance to only accept a swapPeer and then pass the swapPeer to checkBalance.
Let me know if I see it correctly!

Copy link
Contributor Author

@holisticode holisticode Jan 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to explain before but I'll try again, a bit more explicit.

There are two things here:

  • Check is called from the p2p/protocols package, which does not know the notion of a Swap peer, thus, Check needs to have a protocols.Peer in its signature - hence, we need a conversion
  • checkBalance is also called from Add, which also for the same reason has a protocols.Peer in its signature, thus also needs a conversion
  • the swapPeer is also used elsewhere in the Add function

checkBalance is called independently, from two places.

The other thing is that we are locking the swapPeer during these operations. The lock is on the swapPeer, that is why we need it,

Your optimization suggestions would be more stringent if we can remove the lock from swapPeer in this case as we are already locking the protocols.Peer in the protocols package, as @mortelli also noted. However, I am not 100% sure we can omit that lock, it is the same network remote peer but two different objects (protocols vs swap). I don't feel safe doing this at the moment, which is why I suggest keeping it as-is, but it is a valid suggestion to revisit this part and analyze if the lock can be omitted, in which case we could probably optimize those methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However your second suggestion is good, I changed checkBalance to only accept a swap Peer

if swapPeer == nil {
return fmt.Errorf("peer %s not a swap enabled peer", peer.ID().String())
}

swapPeer.lock.Lock()
defer swapPeer.lock.Unlock()
// currently this is the only real check needed:
return s.modifyBalanceOk(amount, swapPeer)
}

// Add is the (sole) accounting function
// Swap implements the protocols.Balance interface
func (s *Swap) Add(amount int64, peer *protocols.Peer) (err error) {
swapPeer := s.getPeer(peer.ID())
if swapPeer == nil {
return fmt.Errorf("peer %s not a swap enabled peer", peer.ID().String())
}

swapPeer.lock.Lock()
defer swapPeer.lock.Unlock()

// check if balance with peer is over the disconnect threshold and if the message would increase the existing debt
balance := swapPeer.getBalance()
if balance >= s.params.DisconnectThreshold && amount > 0 {
return fmt.Errorf("balance for peer %s is over the disconnect threshold %d and cannot incur more debt, disconnecting", peer.ID().String(), s.params.DisconnectThreshold)
// we should probably check here again:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? With the peer under lock the balance couldn't have changed since the call to Check, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. However, just the nature of the two being separate functions could lead to the situation that Add be called in a different workflow. For example, I do not recall if all tests now do this Validate-Apply workflow - some tests might be calling directly Add (I would actually assume so).

Even if the probability is very low, it is correct to check again. However, I am fine removing the check if a majority thinks it is superfluous.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I don't really like this double-check approach. From just looking at it, it looks as if you are doing something redundantly and it is probably also not water-tight.

A different solution could be that the Check function would leave a trace in Swap that a certain amount has been checked and Add only executes on a validated amount. For example, we could create a mapping where each amount maps to a bool (Checked)), to be set to true during the function Check and toggled again after Add has executed.

Might be too much engineering though. Let me know your thoughts!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the additional check, and adding a comment that it is the responsibility of the calling function to check the balance would be more than enough, I think though!

if err = s.modifyBalanceOk(amount, swapPeer); err != nil {
return err
}

if err = swapPeer.updateBalance(amount); err != nil {
Expand Down