Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

pss: Limit by time the retries of messages in the outbox #1861

Merged
merged 4 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pss/outbox/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,3 @@ type outboxMsg struct {
msg *message.Message
startedAt time.Time
}

// NewOutboxMessage wraps a pss message in an outboxMsg and stamps it with the
// current wall-clock time as its start time.
func NewOutboxMessage(msg *message.Message) *outboxMsg {
	wrapped := new(outboxMsg)
	wrapped.msg = msg
	wrapped.startedAt = time.Now()
	return wrapped
}
73 changes: 53 additions & 20 deletions pss/outbox/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,33 @@ package outbox

import (
"errors"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/pss/message"
"github.com/tilinna/clock"
)

// Config contains the Outbox configuration passed to NewOutbox.
// NumWorkers, MaxRetryTime and Clock are optional: NewOutbox substitutes
// defaultOutboxWorkers when NumWorkers is 0, defaultMaxRetryTime when
// MaxRetryTime is nil, and clock.Realtime() when Clock is nil.
type Config struct {
NumberSlots int // number of slots for messages in Outbox.
NumWorkers int // number of parallel goroutines forwarding messages.
Forward forwardFunction // function that executes the actual forwarding.
NumberSlots int // number of slots for messages in Outbox.
NumWorkers int // number of parallel goroutines forwarding messages.
Forward forwardFunction // function that executes the actual forwarding.
MaxRetryTime *time.Duration // max time a message will be retried in the outbox; nil selects the default.
Clock clock.Clock // clock used to timestamp messages and check expiry; nil selects the real-time clock.
}

// Outbox is in charge of forwarding messages. Messages are enqueued and
// retried until they are forwarded successfully; a message whose forward fails
// once more than maxRetryTime has passed since its start time is dropped
// instead of being requeued.
type Outbox struct {
forwardFunc forwardFunction
queue []*outboxMsg
slots chan int
process chan int
numWorkers int
stopC chan struct{}
forwardFunc forwardFunction
queue []*outboxMsg
slots chan int
process chan int
numWorkers int
stopC chan struct{}
maxRetryTime time.Duration // how long after its start time a failing message is still requeued.
clock clock.Clock // time source for message start times and expiry checks.
}

type forwardFunction func(msg *message.Message) error
Expand All @@ -46,19 +52,28 @@ type forwardFunction func(msg *message.Message) error
var ErrOutboxFull = errors.New("outbox full")

const defaultOutboxWorkers = 100
const defaultMaxRetryTime = 10 * time.Minute

// NewOutbox creates a new Outbox. Config must be provided. If NumWorkers is not provided, the default will be used; the same applies to MaxRetryTime and Clock.
func NewOutbox(config *Config) *Outbox {
if config.NumWorkers == 0 {
config.NumWorkers = defaultOutboxWorkers
}
outbox := &Outbox{
forwardFunc: config.Forward,
queue: make([]*outboxMsg, config.NumberSlots),
slots: make(chan int, config.NumberSlots),
process: make(chan int),
numWorkers: config.NumWorkers,
stopC: make(chan struct{}),
forwardFunc: config.Forward,
queue: make([]*outboxMsg, config.NumberSlots),
slots: make(chan int, config.NumberSlots),
process: make(chan int),
numWorkers: config.NumWorkers,
stopC: make(chan struct{}),
maxRetryTime: defaultMaxRetryTime,
clock: clock.Realtime(),
}
if config.MaxRetryTime != nil {
outbox.maxRetryTime = *config.MaxRetryTime
}
if config.Clock != nil {
outbox.clock = config.Clock
}
// fill up outbox slots
for i := 0; i < cap(outbox.slots); i++ {
Expand Down Expand Up @@ -86,7 +101,7 @@ func (o *Outbox) Enqueue(outboxMsg *outboxMsg) error {
select {
case slot := <-o.slots:
o.queue[slot] = outboxMsg
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.Len()))
// we send this message slot to process.
select {
case <-o.stopC:
Expand All @@ -104,6 +119,14 @@ func (o *Outbox) SetForward(forwardFunc forwardFunction) {
o.forwardFunc = forwardFunc
}

// NewOutboxMessage wraps a pss message in an outboxMsg whose start time is
// read from the outbox's clock.
func (o *Outbox) NewOutboxMessage(msg *message.Message) *outboxMsg {
	wrapped := new(outboxMsg)
	wrapped.msg = msg
	wrapped.startedAt = o.clock.Now()
	return wrapped
}

// processOutbox starts a routine that tries to forward messages present in the outbox queue.
func (o *Outbox) processOutbox() {
workerLimitC := make(chan struct{}, o.numWorkers)
Expand All @@ -112,23 +135,33 @@ func (o *Outbox) processOutbox() {
case <-o.stopC:
return
case slot := <-o.process:
log.Debug("Processing, taking worker", "workerLimit size", len(workerLimitC), "numWorkers", o.numWorkers)
workerLimitC <- struct{}{}
go func(slot int) {
//Free worker space
defer func() { <-workerLimitC }()
msg := o.queue[slot]
metrics.GetOrRegisterResettingTimer("pss.handle.outbox", nil).UpdateSince(msg.startedAt)
if err := o.forwardFunc(msg.msg); err != nil {
metrics.GetOrRegisterCounter("pss.forward.err", nil).Inc(1)
log.Debug(err.Error())
limit := msg.startedAt.Add(o.maxRetryTime)
now := o.clock.Now()
if now.After(limit) {
metrics.GetOrRegisterCounter("pss.forward.expired", nil).Inc(1)
log.Warn("Message expired, won't be requeued", "limit", limit, "now", now)
o.free(slot)
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.Len()))
return
}
// requeue the message for processing
o.requeue(slot)
log.Debug("Message requeued", "slot", slot)
return
}
//message processed, free the outbox slot
o.free(slot)
//Free worker space
<-workerLimitC
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.len()))
metrics.GetOrRegisterGauge("pss.outbox.len", nil).Update(int64(o.Len()))
}(slot)
}
}
Expand All @@ -148,6 +181,6 @@ func (o *Outbox) requeue(slot int) {
case o.process <- slot:
}
}
// Len returns the number of outbox slots currently in use: the total slot
// capacity minus the number of slot indices still available in o.slots.
func (o *Outbox) len() int {
func (o *Outbox) Len() int {
return cap(o.slots) - len(o.slots)
}
86 changes: 78 additions & 8 deletions pss/outbox/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"testing"
"time"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/pss/message"
"github.com/ethersphere/swarm/pss/outbox"
"github.com/tilinna/clock"
)

const timeout = 2 * time.Second
Expand Down Expand Up @@ -57,6 +59,14 @@ func TestOutbox(t *testing.T) {
testOutbox.Start()
defer testOutbox.Stop()

testOutboxMessage := testOutbox.NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})

err := testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
Expand Down Expand Up @@ -129,7 +139,7 @@ func TestOutboxWorkers(t *testing.T) {
wg.Add(numMessages)
for i := 0; i < numMessages; i++ {
go func(num byte) {
testOutbox.Enqueue(outbox.NewOutboxMessage(newTestMessage(num)))
testOutbox.Enqueue(testOutbox.NewOutboxMessage(newTestMessage(num)))
}(byte(i))
}

Expand Down Expand Up @@ -158,10 +168,70 @@ func newTestMessage(num byte) *message.Message {
}
}

// testOutboxMessage is a package-level outbox message wrapping a pss message
// whose fields are all set to their zero values.
var testOutboxMessage = outbox.NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})
func TestMessageRetriesExpired(t *testing.T) {
failForwardFunction := func(msg *message.Message) error {
return errors.New("forward error")
}

// We are going to simulate that 5 minutes has passed with a mock Clock
duration := 5 * time.Minute
now := time.Now()
mockClock := clock.NewMock(now)
testOutbox := outbox.NewMock(&outbox.Config{
NumberSlots: 1,
Forward: failForwardFunction,
MaxRetryTime: &duration,
Clock: mockClock,
})

testOutbox.Start()
defer testOutbox.Stop()

msg := testOutbox.NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})

err := testOutbox.Enqueue(msg)
if err != nil {
t.Errorf("Expected no error enqueueing, instead got %v", err)
}

numMessages := testOutbox.Len()
if numMessages != 1 {
t.Errorf("Expected one message in outbox, instead got %v", numMessages)
}

mockClock.Set(now.Add(duration / 2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you not need the same retry as below to verify here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, below I'm waiting for something to happen; in this case I would have to wait for something not to happen. That is hard to do, because when everything works fine I would have to wait the whole time, delaying the test execution. In the last case, even though there are up to 10 waiting iterations, in the normal case it will take only 10 milliseconds to finish.
What I can do is wait a couple of iterations.

numMessages = testOutbox.Len()
// Now we wait a bit expecting that the number of messages doesn't change
iterations := 0
for numMessages == 1 && iterations < 2 {
// Wait a bit more to check that the message has not been expired.
time.Sleep(10 * time.Millisecond)
iterations++
numMessages = testOutbox.Len()
}
if numMessages != 1 {
t.Errorf("Expected one message in outbox after half maxRetryTime, instead got %v", numMessages)
}

mockClock.Set(now.Add(duration + 1*time.Millisecond))
numMessages = testOutbox.Len()
// Now we wait for the process routine to retry and expire message at least 10 iterations
iterations = 0
for numMessages != 0 && iterations < 10 {
// Still not expired, wait a bit more
log.Debug("Still not there, waiting another iteration", numMessages, iterations)
time.Sleep(10 * time.Millisecond)
iterations++
numMessages = testOutbox.Len()
}
if numMessages != 0 {
t.Errorf("Expected 0 message in outbox after expired message, instead got %v", numMessages)
}

}
16 changes: 8 additions & 8 deletions pss/outbox/outbox_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func TestFullOutbox(t *testing.T) {
testOutbox.Start()
defer testOutbox.Stop()

testOutboxMessage := testOutbox.NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})

err := testOutbox.Enqueue(testOutboxMessage)
if err != nil {
t.Fatalf("unexpected error enqueueing, %v", err)
Expand All @@ -64,11 +72,3 @@ func TestFullOutbox(t *testing.T) {
t.Fatalf("timeout waiting for a free slot")
}
}

// testOutboxMessage is a package-level outbox message wrapping a pss message
// whose fields are all set to their zero values.
var testOutboxMessage = NewOutboxMessage(&message.Message{
To: nil,
Flags: message.Flags{},
Expire: 0,
Topic: message.Topic{},
Payload: nil,
})
3 changes: 2 additions & 1 deletion pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ func (p *Pss) isSelfPossibleRecipient(msg *message.Message, prox bool) bool {
// enqueue wraps msg in an outbox message and hands it to p.outbox, returning
// whatever error Enqueue reports. The duration of the call is recorded in the
// "pss.enqueue" resetting timer.
func (p *Pss) enqueue(msg *message.Message) error {
// time.Now() is evaluated when the defer statement executes, so the timer spans the whole call.
defer metrics.GetOrRegisterResettingTimer("pss.enqueue", nil).UpdateSince(time.Now())

outboxMsg := outbox.NewOutboxMessage(msg)
// TODO: create and enqueue in one outbox method
outboxMsg := p.outbox.NewOutboxMessage(msg)
return p.outbox.Enqueue(outboxMsg)
}

Expand Down