Skip to content

Commit

Permalink
Merge f13d976 into 643eda8
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilthoniel committed May 1, 2019
2 parents 643eda8 + f13d976 commit 09dbd3f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 17 deletions.
21 changes: 20 additions & 1 deletion network/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type Router struct {
connectionErrorHandlers []func(*ServerIdentity)

// keep bandwidth of closed connections
traffic counterSafe
traffic counterSafe
msgTraffic counterSafe
// If paused is not nil, then handleConn will stop processing. When unpaused
// it will break the connection. This is for testing node failure cases.
paused chan bool
Expand Down Expand Up @@ -171,6 +172,9 @@ func (r *Router) Send(e *ServerIdentity, msg Message) (uint64, error) {
return 0, errors.New("Can't send nil-packet")
}

// Update the message counter with the new message about to be sent.
r.msgTraffic.updateTx(1)

// If sending to ourself, directly dispatch it
if e.ID.Equal(r.ServerIdentity.ID) {
log.Lvlf4("Sending to ourself (%s) msg: %+v", e, msg)
Expand Down Expand Up @@ -337,6 +341,9 @@ func (r *Router) handleConn(remote *ServerIdentity, c Conn) {

packet.ServerIdentity = remote

// Update the message counter with the new message about to be processed.
r.msgTraffic.updateRx(1)

if err := r.Dispatch(packet); err != nil {
log.Lvl3("Error dispatching:", err)
}
Expand Down Expand Up @@ -423,6 +430,18 @@ func (r *Router) Rx() uint64 {
return rx
}

// MsgTx implements monitor/CounterIO.
// It returns the number of messages transmitted by the interface.
func (r *Router) MsgTx() uint64 {
return r.msgTraffic.Tx()
}

// MsgRx implements monitor/CounterIO.
// It returns the number of messages received by the interface.
func (r *Router) MsgRx() uint64 {
return r.msgTraffic.Rx()
}

// Listening returns true if this router is started.
func (r *Router) Listening() bool {
return r.host.Listening()
Expand Down
4 changes: 4 additions & 0 deletions network/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ func TestRouterRxTx(t *testing.T) {
return len(router1.connections[si2]) == 0
})
require.Equal(t, rx, router1.Rx())
require.Equal(t, uint64(1), router1.MsgRx())
require.Equal(t, uint64(0), router1.MsgTx())
require.Equal(t, uint64(1), router2.MsgTx())
require.Equal(t, uint64(0), router2.MsgRx())
defer router1.Stop()
}

Expand Down
43 changes: 30 additions & 13 deletions simul/monitor/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,32 +141,40 @@ func (tm *TimeMeasure) reset() {
// CounterIO is an interface that can be used to count how many bytes does an
// object have written and how many bytes does it have read. For example it is
// implemented by cothority/network/ Conn + Host to know how many bytes a
// connection / Host has written /read
// connection / Host has written /read.
type CounterIO interface {
// Rx returns the number of bytes read by this interface
// Rx returns the number of bytes read by this interface.
Rx() uint64
// Tx returns the number of bytes transmitted / written by this interface
// Tx returns the number of bytes transmitted / written by this interface.
Tx() uint64
// MsgRx returns the number of messages read by this interface.
MsgRx() uint64
// MsgTx returns the number of messages transmitted / written by this interface.
MsgTx() uint64
}

// CounterIOMeasure is a struct that takes a CounterIO and can send the
// measurements to the monitor. Each time Record() is called, the measurements
// are put back to 0 (while the CounterIO still sends increased bytes number).
type CounterIOMeasure struct {
name string
counter CounterIO
baseTx uint64
baseRx uint64
name string
counter CounterIO
baseTx uint64
baseRx uint64
baseMsgTx uint64
baseMsgRx uint64
}

// NewCounterIOMeasure returns an CounterIOMeasure fresh. The base value are set
// to the current value of counter.Rx() and counter.Tx()
// to the current value of counter.Rx() and counter.Tx().
func NewCounterIOMeasure(name string, counter CounterIO) *CounterIOMeasure {
return &CounterIOMeasure{
name: name,
counter: counter,
baseTx: counter.Tx(),
baseRx: counter.Rx(),
name: name,
counter: counter,
baseTx: counter.Tx(),
baseRx: counter.Rx(),
baseMsgTx: counter.MsgTx(),
baseMsgRx: counter.MsgRx(),
}
}

Expand All @@ -182,13 +190,22 @@ func (cm *CounterIOMeasure) Record() {
bTx := cm.counter.Tx()
written := newSingleMeasure(cm.name+"_tx", float64(bTx-cm.baseTx))

// send them both
bMsgRx := cm.counter.MsgRx()
readMsg := newSingleMeasure(cm.name+"_msg_rx", float64(bMsgRx-cm.baseMsgRx))
bMsgTx := cm.counter.MsgTx()
writtenMsg := newSingleMeasure(cm.name+"_msg_tx", float64(bMsgTx-cm.baseMsgTx))

// send them
read.Record()
written.Record()
readMsg.Record()
writtenMsg.Record()

// reset counters
cm.baseRx = bRx
cm.baseTx = bTx
cm.baseMsgRx = bMsgRx
cm.baseMsgTx = bMsgTx
}

// Send transmits the given struct over the network.
Expand Down
25 changes: 22 additions & 3 deletions simul/monitor/measure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
)

type DummyCounterIO struct {
rvalue uint64
wvalue uint64
rvalue uint64
wvalue uint64
msgrvalue uint64
msgwvalue uint64
}

func (dm *DummyCounterIO) Rx() uint64 {
Expand All @@ -21,9 +23,19 @@ func (dm *DummyCounterIO) Tx() uint64 {
return dm.wvalue
}

func (dm *DummyCounterIO) MsgRx() uint64 {
dm.msgrvalue++
return dm.msgrvalue
}

func (dm *DummyCounterIO) MsgTx() uint64 {
dm.msgwvalue++
return dm.msgwvalue
}

func TestCounterIOMeasureRecord(t *testing.T) {
mon, _ := setupMonitor(t)
dm := &DummyCounterIO{0, 0}
dm := &DummyCounterIO{0, 0, 0, 0}
// create the counter measure
cm := NewCounterIOMeasure("dummy", dm)
if cm.baseRx != dm.rvalue || cm.baseTx != dm.wvalue {
Expand Down Expand Up @@ -55,6 +67,13 @@ func TestCounterIOMeasureRecord(t *testing.T) {
if re == nil || re.Avg() != 10 {
t.Fatal("Stats doesn't have the right value (read)")
}
mwr, mre := stat.Value("dummy_msg_tx"), stat.Value("dummy_msg_rx")
if mwr == nil || mwr.Avg() != 1 {
t.Fatal("Stats doesn't have the right value (msg written)")
}
if mre == nil || mre.Avg() != 1 {
t.Fatal("Stats doesn't have the right value (msg read)")
}
EndAndCleanup()
time.Sleep(100 * time.Millisecond)
}

0 comments on commit 09dbd3f

Please sign in to comment.