-
Notifications
You must be signed in to change notification settings - Fork 112
Conversation
… Now the size of the outbox and the number of workers are the same
pss/pubsub.go
Outdated
@@ -65,6 +68,8 @@ func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p | |||
|
|||
// Send sends a message using pss SendRaw | |||
func (p *PubSub) Send(to []byte, topic string, msg []byte) error { | |||
metrics.GetOrRegisterCounter("pss.pubsub.send.num", nil).Inc(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to include a counter
where you have a timer
- the timer
data structure already includes a counter, that would be pss.pubsub.send.count
in this example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh! that's nice. I was wondering why the framework failed when trying to register a counter with the same name. I will change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, when you use the same name, then they collide, and it panics if I remember correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, not exactly, the value is pss.pubsub.send.span
and the select
field(count).
pss/pss.go
Outdated
@@ -740,6 +741,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *message.Message) bool { | |||
//// successfully forwarded to at least one peer. | |||
func (p *Pss) forward(msg *message.Message) error { | |||
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | |||
defer metrics.GetOrRegisterResettingTimer("pss.forward.time", nil).UpdateSince(time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the counter from the line above please, as there is already a counter part of the timer.
pss/outbox/outbox_whitebox_test.go
Outdated
|
||
const blockTimeout = 100 * time.Millisecond | ||
|
||
func expectNotTimeout(completionC chan struct{}, t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think test helpers generally have t *testing.T
as a first argument, not as a last. I find this construct a bit weird when t
is the last argument.
pss/outbox/outbox_whitebox_test.go
Outdated
func expectTimeout(completionC chan struct{}, t *testing.T) { | ||
select { | ||
case <-completionC: | ||
t.Fatalf("epxected blocking enqueue") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo - expected
.
pss/pss.go
Outdated
@@ -441,6 +439,7 @@ func (p *Pss) handle(ctx context.Context, peer *protocols.Peer, msg interface{}) | |||
// If yes, it CAN be for us, and we process it | |||
// Only passes error to pss protocol handler if payload is not valid pssmsg | |||
func (p *Pss) handlePssMsg(ctx context.Context, pssmsg *message.Message) error { | |||
metrics.GetOrRegisterCounter("pss.handle.num", nil).Inc(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of a counter here, as we have a timer, which includes a counter.
pss/pss.go
Outdated
@@ -495,6 +492,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, pssmsg *message.Message) error { | |||
// Attempts symmetric and asymmetric decryption with stored keys. | |||
// Dispatches message to all handlers matching the message topic | |||
func (p *Pss) process(pssmsg *message.Message, raw bool, prox bool) error { | |||
metrics.GetOrRegisterCounter("pss.process.num", nil).Inc(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of a counter here, as we have a timer.
pss/pss.go
Outdated
@@ -589,18 +587,19 @@ func (p *Pss) isSelfPossibleRecipient(msg *message.Message, prox bool) bool { | |||
// SECTION: Message sending | |||
///////////////////////////////////////////////////////////////////// | |||
|
|||
func (p *Pss) enqueue(msg *message.Message) error { | |||
func (p *Pss) enqueue(msg *message.Message) { | |||
metrics.GetOrRegisterCounter("pss.enqueue.num", nil).Inc(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of a counter.
pss/pss.go
Outdated
@@ -724,7 +725,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *message.Message) bool { | |||
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1) | |||
log.Error(err.Error()) | |||
} | |||
|
|||
metrics.GetOrRegisterCounter("pss.pp.send", nil).Inc(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a timer for the whole function would provide more visibility, than just a counter after the pp.Send has returned.
pss/pubsub.go
Outdated
pt := message.NewTopic([]byte(topic)) | ||
return p.pss.SendRaw(PssAddress(to), pt, msg) | ||
return p.pss.SendRaw(PssAddress(to), pt, msg, 20*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we must include TTL for PSS, this should be configurable for any given PubSub
, rather than hidden as it currently is. Please at least extract in a documented variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I was not aware this Pubsub was used for other use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't used currently for anything else, but this hardcoded value is rather hidden in its implementation. It'd be better if it was a var
at the top of the package, or a config when you create a PubSub value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have initialized PubSub in swarm.go with 20 seconds, maybe this value (and also outbox length should be configurable using flags, but I am not sure how to do it or if it makes sense do it now)
pss/outbox/outbox.go
Outdated
outbox := &Outbox{ | ||
forwardFunc: config.Forward, | ||
queue: make([]*outboxMsg, config.NumberSlots), | ||
slots: make(chan int, config.NumberSlots), | ||
process: make(chan int), | ||
numWorkers: config.NumWorkers, | ||
numWorkers: config.NumberSlots, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we need this anymore, we could just use len(queue)
(or cap(queue)
) - also the naming is confusing - is it slots or workers? I thought we are removing workers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Workers stay, what we remove is to have a huge queue bigger than workers, now we have one slot / one worker.
I will remove the numWorkers field and use cap(queue).
I added a few comments, mostly cosmetics, overall the PR looks good and I think it is an improvement, because makes using PSS easier, as we no longer have to handle errors when the outbox is full. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice. It would be good to choose only one term between worker and slot.
pss/outbox/outbox.go
Outdated
// first we try to obtain a slot in the outbox. | ||
slot := <-o.slots |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a select with return on o.stopC. It is a potential deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
pss/pss.go
Outdated
@@ -495,6 +492,7 @@ func (p *Pss) handlePssMsg(ctx context.Context, pssmsg *message.Message) error { | |||
// Attempts symmetric and asymmetric decryption with stored keys. | |||
// Dispatches message to all handlers matching the message topic | |||
func (p *Pss) process(pssmsg *message.Message, raw bool, prox bool) error { | |||
metrics.GetOrRegisterCounter("pss.process.num", nil).Inc(1) | |||
defer metrics.GetOrRegisterResettingTimer("pss.process", nil).UpdateSince(time.Now()) | |||
|
|||
var err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error variable is no longer needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is needed but I have moved it closer to where is declared and return nil at the end of the function.
…covered by timers
Slots and workers are different things, even though the current implementation has one worker per slot. An slot is an space in a local memory so a message is stored until it is forwarded. It can be retried several times but the message always keep the same slot. |
pss/outbox/outbox_test.go
Outdated
@@ -168,6 +170,24 @@ func newTestMessage(num byte) *message.Message { | |||
} | |||
} | |||
|
|||
const blockTimeout = 100 * time.Millisecond | |||
|
|||
func expectNotTimeout(completionC chan struct{}, t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move t
as a first argument, please.
pss/outbox/outbox_test.go
Outdated
} | ||
} | ||
|
||
func expectTimeout(completionC chan struct{}, t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move t
as a first argument, please.
pss/pss.go
Outdated
@@ -739,7 +737,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *message.Message) bool { | |||
//// forwarding fails, the node should try to forward it to the next best peer, until the message is | |||
//// successfully forwarded to at least one peer. | |||
func (p *Pss) forward(msg *message.Message) error { | |||
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1) | |||
defer metrics.GetOrRegisterResettingTimer("pss.forward.time", nil).UpdateSince(time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add a time
suffix to metrics names. This will be emitted as pss.forward.span
and pss.forward.count
by the reporter - the time part is implicit when you register a resetting timer.
pss/pss.go
Outdated
} | ||
|
||
// Send a raw message (any encryption is responsibility of calling client) | ||
// | ||
// Will fail if raw messages are disallowed | ||
func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte) error { | ||
func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte, messageTtl time.Duration) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
messageTtl
-> messageTTL
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few more nitpicks, please fix if you agree. Overall LGTM.
I renamed the PR so that when you merge it actually includes the package that has been changed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks.
@kortatu tests seem to be failing on PSS. |
@kortatu once you fix the tests, could you piggy-back and remove of the
It is spammy out log infrastructure and doesn't provide much valuable input - it is always the bootnode. Thanks! |
Agree, in fact we removed it temporary from the code during tests to allow checking the logs better ;-) |
Well, it is not such a big deal to have in stats one removed peer, it is just more a cleanup thing. |
I am sorry for posting this question to a wrong PR, this is the intended comment #1774 (comment). I think that it is possible situation and that at some point someone would need to address the problem of invalid state. |
This PR tries to improve the performance of PSS under the push-sync use case. It has been tested with the integration tests and several minor fixes has been applied:
A new dashboard could be added to Grafana. I have an example combining PSS and pushsync metrics in epiclabs repo
Because Enqueue doesn't return an error anymore, the tests have been complicated to check for blocking/non-blocking of the Enqueue method.