Skip to content

Commit

Permalink
feat(core): add new way of retry
Browse files Browse the repository at this point in the history
Signed-off-by: Sacha <sfroment42@gmail.com>
  • Loading branch information
sfroment committed Apr 19, 2019
1 parent fb8967c commit d9bdc06
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 9 deletions.
19 changes: 19 additions & 0 deletions core/entity/event.go
Expand Up @@ -182,6 +182,25 @@ func FindDevicesWithNonAcknowledgedEvents(db *gorm.DB, before time.Time) ([]stri
return deviceIDs, nil
}

func FindDispatchesWithNonAcknowledgedEvents(db *gorm.DB, before time.Time) ([]*EventDispatch, error) {
var dispatches []*EventDispatch

err := db.
Model(&EventDispatch{}).
Joins("JOIN event ON event_dispatch.event_id = event.id").
Where("event.direction = ?", Event_Outgoing).
Where("event_dispatch.acked_at IS NULL").
Where("event_dispatch.sent_at > ?", before).
Find(&dispatches).
Error

if err != nil {
return nil, errorcodes.ErrDb.Wrap(err)
}

return dispatches, nil
}

func FindNonAcknowledgedDispatchesForDestination(db *gorm.DB, deviceID string) ([]*EventDispatch, error) {
var dispatches []*EventDispatch

Expand Down
90 changes: 84 additions & 6 deletions core/node/mainloop.go
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"math/rand"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -14,6 +15,12 @@ import (
"berty.tech/core/sql"
)

var rgenerator *rand.Rand

func init() {
rgenerator = rand.New(rand.NewSource(time.Now().UnixNano()))
}

// EventsRetry updates SentAt and requeue an event
func (n *Node) EventRequeue(ctx context.Context, event *entity.Event) error {
tracer := tracing.EnterFunc(ctx, event)
Expand Down Expand Up @@ -43,8 +50,21 @@ func (n *Node) EventDispatchRequeue(ctx context.Context, dispatch *entity.EventD
return nil
}

// EventsRetry sends events which lack an AckedAt value emitted before the supplied time value
func (n *Node) EventsRetry(ctx context.Context, before time.Time) ([]*entity.Event, error) {
func getRetriableEvents(dispatches []*entity.EventDispatch) []*entity.EventDispatch {
ret := make([]*entity.EventDispatch, 0)
now := time.Now()
for _, dispatch := range dispatches {
eventTime := dispatch.SentAt.Add(time.Duration(dispatch.RetryBackoff) * time.Millisecond)
if eventTime.Before(now) {
ret = append(ret, dispatch)
}
}

return ret
}

// OldEventsRetry sends events which lack an AckedAt value emitted before the supplied time value
func (n *Node) OldEventsRetry(ctx context.Context, before time.Time) ([]*entity.Event, error) {
tracer := tracing.EnterFunc(ctx, before)
defer tracer.Finish()
ctx = tracer.Context()
Expand Down Expand Up @@ -92,18 +112,67 @@ func (n *Node) EventsRetry(ctx context.Context, before time.Time) ([]*entity.Eve
return retriedEvents, nil
}

// EventsRetry sends events which lack an AckedAt value emitted sent -24h
func (n *Node) EventsRetry(ctx context.Context) (time.Duration, []*entity.Event, error) {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()
ctx = tracer.Context()
db := n.sql(ctx)

retriedEventsMap := map[string]*entity.Event{}
var retriedEvents []*entity.Event

dispatches, err := entity.FindDispatchesWithNonAcknowledgedEvents(db, time.Now().Add(-time.Hour*24))
if err != nil {
return 20000, nil, err
}

dispatches = getRetriableEvents(dispatches)
var ret int64
for i, dispatch := range dispatches {
if i == 0 {
ret = dispatch.RetryBackoff
}

if err := n.EventDispatchRequeue(ctx, dispatch); err != nil {
n.LogBackgroundError(ctx, errors.Wrap(err, "error while enqueuing event"))
continue
}
event, err := sql.EventByID(db, dispatch.EventID)
if err != nil {
n.LogBackgroundError(ctx, errors.Wrap(err, "error while getting event detail"))
continue
}

if dispatch.RetryBackoff < ret {
ret = dispatch.RetryBackoff
}

retriedEventsMap[event.ID] = event
}

for _, event := range retriedEventsMap {
retriedEvents = append(retriedEvents, event)
}

if ret == 0 {
ret = 20000
}

return time.Duration(ret), retriedEvents, nil
}

func (n *Node) cron(ctx context.Context) {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()
ctx = tracer.Context()

for {
before := time.Now().Add(-time.Second * 60 * 10)
if _, err := n.EventsRetry(ctx, before); err != nil {
waitTime, _, err := n.EventsRetry(ctx)
if err != nil {
n.LogBackgroundError(ctx, err)
}

time.Sleep(time.Second * 60)
time.Sleep(waitTime * time.Millisecond)
}
}

Expand Down Expand Up @@ -312,6 +381,15 @@ func (n *Node) handleOutgoingEventDispatch(ctx context.Context, dispatch *entity
db := n.sql(ctx)
now := time.Now()
dispatch.SentAt = &now
if dispatch.RetryBackoff == 0 {
dispatch.RetryBackoff = 1600
} else {
dispatch.RetryBackoff *= 16
dispatch.RetryBackoff /= 10
dispatch.RetryBackoff *= 10 + 2*((int64(rgenerator.Float64()*100)*2.0-1.0)/100)
dispatch.RetryBackoff /= 10
}

if err := db.Save(dispatch).Error; err != nil {
n.LogBackgroundError(ctx, errors.Wrap(sql.GenericError(err), "error while updating SentAt on event dispatch"))
return
Expand Down
2 changes: 1 addition & 1 deletion core/node/nodeapi_devtools.go
Expand Up @@ -399,7 +399,7 @@ func (n *Node) DebugRequeueAll(ctx context.Context, _ *node.Void) (*node.Void, e
defer tracer.Finish()
ctx = tracer.Context()

if _, err := n.EventsRetry(ctx, time.Now()); err != nil {
if _, err := n.OldEventsRetry(ctx, time.Now()); err != nil {
return nil, errorcodes.ErrNetQueue.Wrap(err)
}

Expand Down
2 changes: 2 additions & 0 deletions core/sql/migrations/list_migrations.go
Expand Up @@ -7,6 +7,7 @@ import (
"berty.tech/core/sql/migrations/v0004conversationinfos"
"berty.tech/core/sql/migrations/v0005eventdispatch"
"berty.tech/core/sql/migrations/v0006conversationLogic"
"berty.tech/core/sql/migrations/v0007eventdispatchretry"
gormigrate "gopkg.in/gormigrate.v1"
)

Expand All @@ -19,6 +20,7 @@ func GetMigrations() []*gormigrate.Migration {
migrations = append(migrations, v0004conversationinfos.GetMigration())
migrations = append(migrations, v0005eventdispatch.GetMigration())
migrations = append(migrations, v0006conversationLogic.GetMigration())
migrations = append(migrations, v0007eventdispatchretry.GetMigration())

return migrations
}
2 changes: 1 addition & 1 deletion core/sql/migrations/v0007eventdispatchretry/migration.go
Expand Up @@ -18,7 +18,7 @@ type EventDispatch struct {
SeenAt *time.Time `protobuf:"bytes,6,opt,name=seen_at,json=seenAt,proto3,stdtime" json:"seen_at,omitempty"`
AckMedium EventDispatch_Medium `protobuf:"varint,7,opt,name=ack_medium,json=ackMedium,proto3,enum=berty.entity.EventDispatch_Medium" json:"ack_medium,omitempty"`
SeenMedium EventDispatch_Medium `protobuf:"varint,8,opt,name=seen_medium,json=seenMedium,proto3,enum=berty.entity.EventDispatch_Medium" json:"seen_medium,omitempty"`
RetryBackoff float64 `protobuf:"fixed64,9,opt,name=retry_backoff,json=retryBackoff,proto3" json:"retry_backoff,omitempty"`
RetryBackoff int64 `protobuf:"varint,9,opt,name=retry_backoff,json=retryBackoff,proto3" json:"retry_backoff,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Expand Down
2 changes: 1 addition & 1 deletion core/test/mainloop_test.go
Expand Up @@ -65,7 +65,7 @@ func TestEventRetry(t *testing.T) {
"Event8": false,
}

events, err := appMock.node.EventsRetry(context.Background(), now)
events, err := appMock.node.OldEventsRetry(context.Background(), now)

if err != nil {
t.Error(err)
Expand Down

0 comments on commit d9bdc06

Please sign in to comment.