Skip to content

Commit

Permalink
feat(core): retry event sending when not acked
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Louvigny committed Oct 29, 2018
1 parent 5cac515 commit b78c6df
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 2 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Expand Up @@ -7,7 +7,7 @@ version: 2
GOTEST_TIMEOUT: 5m
GOPROXY: http://goproxy.berty.io:3000/
GO111MODULE: "on"
TEST_OPTS: "-coverprofile=profile.out -covermode=count -coverpkg=berty.tech/core/..."
TEST_OPTS: "-coverprofile=profile.out -covermode=count"
docker:
- image: circleci/golang:latest

Expand Down Expand Up @@ -49,7 +49,7 @@ jobs:
- run:
name: cleanup existing generated files
command: |
cd core
cd core
make clean
- run:
name: make generate_prepare
Expand Down
43 changes: 43 additions & 0 deletions core/api/p2p/event.go
Expand Up @@ -4,6 +4,9 @@ import (
"encoding/json"
"strings"
"time"

"github.com/jinzhu/gorm"
"github.com/pkg/errors"
)

func NewOutgoingEvent(sender, receiver string, kind Kind) *Event {
Expand Down Expand Up @@ -53,3 +56,43 @@ func (e Event) ToJSON() string {
}

func (e Event) IsNode() {} // required by gqlgen

// FindNonAcknowledgedEventDestinations finds non acknowledged event destinations emitted before the supplied time value
func FindNonAcknowledgedEventDestinations(db *gorm.DB, before time.Time) ([]*Event, error) {
var events []*Event

err := db.
Table("event").
Select("conversation_id, receiver_id").
Where("sent_at < :time", before).
Where("acked_at IS NULL").
Where(&Event{
Direction: Event_Outgoing,
}).
Group("conversation_id, receiver_id").
Scan(&events).
Error

if err != nil {
return nil, errors.Wrap(err, "unable to get non acknowledged events")
}

return events, nil
}

// FindNonAcknowledgedEventsForDestination finds non acknowledged events for the supplied destination (conversation/receiver)
func FindNonAcknowledgedEventsForDestination(db *gorm.DB, destination *Event) ([]*Event, error) {
var events []*Event

err := db.Find(&events, &Event{
ConversationID: destination.ConversationID,
ReceiverID: destination.ReceiverID,
Direction: Event_Outgoing,
}).Where("acked_at IS NULL").Error

if err != nil {
return nil, err
}

return events, nil
}
108 changes: 108 additions & 0 deletions core/api/p2p/event_test.go
@@ -0,0 +1,108 @@
package p2p

import (
"fmt"
"testing"
"time"

"berty.tech/core/test/mock"
"github.com/jinzhu/gorm"
)

func setupNonAcknowledgedEventDestinations() (string, *gorm.DB, time.Time, time.Time, time.Time) {
filename, db, _ := mock.GetMockedDb(Event{})

now := time.Now()
past := now.Add(-time.Second)
future := now.Add(time.Second)

db.Save(&Event{ID: "Event1", Direction: Event_Outgoing, ReceiverID: "Receiver1", SentAt: &past})
db.Save(&Event{ID: "Event2", Direction: Event_Outgoing, ReceiverID: "Receiver1", SentAt: &future})
db.Save(&Event{ID: "Event3", Direction: Event_Outgoing, ReceiverID: "Receiver2", SentAt: &future})

db.Save(&Event{ID: "Event4", Direction: Event_Incoming, ReceiverID: "Receiver1", SentAt: &past})
db.Save(&Event{ID: "Event5", Direction: Event_Incoming, ReceiverID: "Receiver1", SentAt: &future})
db.Save(&Event{ID: "Event6", Direction: Event_Incoming, ReceiverID: "Receiver2", SentAt: &future})

db.Save(&Event{ID: "Event7", Direction: Event_Outgoing, ConversationID: "Conversation1", SentAt: &past})
db.Save(&Event{ID: "Event8", Direction: Event_Outgoing, ConversationID: "Conversation1", SentAt: &future})
db.Save(&Event{ID: "Event9", Direction: Event_Outgoing, ConversationID: "Conversation2", SentAt: &future})

db.Save(&Event{ID: "Event10", Direction: Event_Incoming, ConversationID: "Conversation1", SentAt: &past})
db.Save(&Event{ID: "Event11", Direction: Event_Incoming, ConversationID: "Conversation1", SentAt: &future})
db.Save(&Event{ID: "Event12", Direction: Event_Incoming, ConversationID: "Conversation2", SentAt: &future})

return filename, db, past, now, future
}

func TestFindNonAcknowledgedEventDestinations(t *testing.T) {
filename, db, _, now, _ := setupNonAcknowledgedEventDestinations()
defer mock.RemoveDb(filename, db)

expected := map[string]bool{
"Receiver1:": false,
":Conversation1": false,
}

destinations, err := FindNonAcknowledgedEventDestinations(db, now)

if err != nil {
t.Error(err)
}

for _, destination := range destinations {
identifier := fmt.Sprintf("%s:%s", destination.ReceiverID, destination.ConversationID)

value, ok := expected[identifier]
if ok == false {
t.Error(fmt.Errorf("%s was not suppoesed to be found", identifier))
}

if value == true {
t.Error(fmt.Errorf("%s was found twice", identifier))
}

expected[identifier] = true
}

for identifier, value := range expected {
if value == false {
t.Error(fmt.Errorf("%s was supposed to be found but was not", identifier))
}
}
}

func TestFindNonAcknowledgedEventsForDestination(t *testing.T) {
filename, db, _, _, _ := setupNonAcknowledgedEventDestinations()
defer mock.RemoveDb(filename, db)

expected := map[string]bool{
"Event1": false,
"Event2": false,
}

events, err := FindNonAcknowledgedEventsForDestination(db, &Event{ReceiverID: "Receiver1"})

if err != nil {
t.Error(err)
}

for _, event := range events {
value, ok := expected[event.ID]
if ok == false {
t.Error(fmt.Errorf("%s was not suppoesed to be found", event.ID))
}

if value == true {
t.Error(fmt.Errorf("%s was found twice", event.ID))
}

expected[event.ID] = true
}

for identifier, value := range expected {
if value == false {
t.Error(fmt.Errorf("%s was supposed to be found but was not", identifier))
}
}
}
48 changes: 48 additions & 0 deletions core/node/mainloop.go
Expand Up @@ -2,16 +2,64 @@ package node

import (
"context"
"time"

"berty.tech/core/api/p2p"
"berty.tech/core/crypto/keypair"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)

// EventRetry sends events which lack an AckedAt value emitted before the supplied time value
func (n *Node) EventRetry(before time.Time) ([]*p2p.Event, error) {
var retriedEvents []*p2p.Event
destinations, err := p2p.FindNonAcknowledgedEventDestinations(n.sql, before)

if err != nil {
return nil, err
}

for _, destination := range destinations {
events, err := p2p.FindNonAcknowledgedEventsForDestination(n.sql, destination)

if err != nil {
logger().Error("error while retrieving events for dst", zap.Error(err))
continue
}

for _, event := range events {
now := time.Now()
event.SentAt = &now
if err := n.sql.Save(event).Error; err != nil {
logger().Error("error while updating SentAt on event", zap.Error(err))
continue
}
n.outgoingEvents <- event

retriedEvents = append(retriedEvents, event)
}
}

return retriedEvents, nil
}

// Start is the node's mainloop
func (n *Node) Start() error {
ctx := context.Background()

go func() {
for true {
before := time.Now().Add(-time.Second * 60 * 10)
_, err := n.EventRetry(before)

if err != nil {
logger().Error("error while retrieving non acked destinations", zap.Error(err))
}

time.Sleep(time.Second * 60)
}
}()

for {
select {
case event := <-n.outgoingEvents:
Expand Down
82 changes: 82 additions & 0 deletions core/test/mainloop_test.go
@@ -0,0 +1,82 @@
package test

import (
"fmt"
"testing"
"time"

"berty.tech/core/entity"

networkmock "berty.tech/core/network/mock"
"github.com/pkg/errors"

"berty.tech/core/api/p2p"
)

func setupNonAcknowledgedEventDestinations() (*AppMock, time.Time, time.Time, time.Time) {
n, err := NewAppMock(&entity.Device{Name: "Alice's iPhone"}, networkmock.NewEnqueuer(), WithUnencryptedDb())

if err != nil {
panic(err)
}

now := time.Now()
past := now.Add(-time.Second)
future := now.Add(time.Second)

n.db.Save(&p2p.Event{ID: "Event1", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver1", SentAt: &past})
n.db.Save(&p2p.Event{ID: "Event2", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver1", SentAt: &future})
n.db.Save(&p2p.Event{ID: "Event3", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver2", SentAt: &future})

n.db.Save(&p2p.Event{ID: "Event4", Direction: p2p.Event_Incoming, ReceiverID: "Receiver1", SentAt: &past})
n.db.Save(&p2p.Event{ID: "Event5", Direction: p2p.Event_Incoming, ReceiverID: "Receiver1", SentAt: &future})
n.db.Save(&p2p.Event{ID: "Event6", Direction: p2p.Event_Incoming, ReceiverID: "Receiver2", SentAt: &future})

n.db.Save(&p2p.Event{ID: "Event7", Direction: p2p.Event_Outgoing, ConversationID: "Conversation1", SentAt: &past})
n.db.Save(&p2p.Event{ID: "Event8", Direction: p2p.Event_Outgoing, ConversationID: "Conversation1", SentAt: &future})
n.db.Save(&p2p.Event{ID: "Event9", Direction: p2p.Event_Outgoing, ConversationID: "Conversation2", SentAt: &future})

n.db.Save(&p2p.Event{ID: "Event10", Direction: p2p.Event_Incoming, ConversationID: "Conversation1", SentAt: &past})
n.db.Save(&p2p.Event{ID: "Event11", Direction: p2p.Event_Incoming, ConversationID: "Conversation1", SentAt: &future})
n.db.Save(&p2p.Event{ID: "Event12", Direction: p2p.Event_Incoming, ConversationID: "Conversation2", SentAt: &future})

return n, past, now, future
}

func TestEventRetry(t *testing.T) {
appMock, _, now, _ := setupNonAcknowledgedEventDestinations()
defer appMock.Close()

expected := map[string]bool{
"Event1": false,
"Event2": false,
"Event7": false,
"Event8": false,
}

events, err := appMock.node.EventRetry(now)

if err != nil {
t.Error(err)
}

for _, event := range events {
value, ok := expected[event.ID]
if ok == false {
t.Error(errors.New(fmt.Sprintf("%s was not suppoesed to be found", event.ID)))
}

if value == true {
t.Error(errors.New(fmt.Sprintf("%s was found twice", event.ID)))
}

expected[event.ID] = true
}

for identifier, value := range expected {
if value == false {
t.Error(errors.New(fmt.Sprintf("%s was supposed to be found but was not", identifier)))
}
}

}

0 comments on commit b78c6df

Please sign in to comment.