diff --git a/.circleci/config.yml b/.circleci/config.yml index 62e015c985..22217fb27a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,7 +7,7 @@ version: 2 GOTEST_TIMEOUT: 5m GOPROXY: http://goproxy.berty.io:3000/ GO111MODULE: "on" - TEST_OPTS: "-coverprofile=profile.out -covermode=count -coverpkg=berty.tech/core/..." + TEST_OPTS: "-coverprofile=profile.out -covermode=count" docker: - image: circleci/golang:latest @@ -49,7 +49,7 @@ jobs: - run: name: cleanup existing generated files command: | - cd core + cd core make clean - run: name: make generate_prepare diff --git a/core/api/p2p/event.go b/core/api/p2p/event.go index b96e0cedd3..f209fb6fea 100644 --- a/core/api/p2p/event.go +++ b/core/api/p2p/event.go @@ -4,6 +4,9 @@ import ( "encoding/json" "strings" "time" + + "github.com/jinzhu/gorm" + "github.com/pkg/errors" ) func NewOutgoingEvent(sender, receiver string, kind Kind) *Event { @@ -53,3 +56,43 @@ func (e Event) ToJSON() string { } func (e Event) IsNode() {} // required by gqlgen + +// FindNonAcknowledgedEventDestinations finds non acknowledged event destinations emitted before the supplied time value +func FindNonAcknowledgedEventDestinations(db *gorm.DB, before time.Time) ([]*Event, error) { + var events []*Event + + err := db. + Table("event"). + Select("conversation_id, receiver_id"). + Where("sent_at < :time", before). + Where("acked_at IS NULL"). + Where(&Event{ + Direction: Event_Outgoing, + }). + Group("conversation_id, receiver_id"). + Scan(&events). + Error + + if err != nil { + return nil, errors.Wrap(err, "unable to get non acknowledged events") + } + + return events, nil +} + +// FindNonAcknowledgedEventsForDestination finds non acknowledged events for the supplied destination (conversation/receiver) +func FindNonAcknowledgedEventsForDestination(db *gorm.DB, destination *Event) ([]*Event, error) { + var events []*Event + + err := db.Find(&events, &Event{ + ConversationID: destination.ConversationID, + ReceiverID: destination.ReceiverID, + Direction: Event_Outgoing, + }).Where("acked_at IS NULL").Error + + if err != nil { + return nil, err + } + + return events, nil +} diff --git a/core/api/p2p/event_test.go b/core/api/p2p/event_test.go new file mode 100644 index 0000000000..a71a0fb5dd --- /dev/null +++ b/core/api/p2p/event_test.go @@ -0,0 +1,108 @@ +package p2p + +import ( + "fmt" + "testing" + "time" + + "berty.tech/core/test/mock" + "github.com/jinzhu/gorm" +) + +func setupNonAcknowledgedEventDestinations() (string, *gorm.DB, time.Time, time.Time, time.Time) { + filename, db, _ := mock.GetMockedDb(Event{}) + + now := time.Now() + past := now.Add(-time.Second) + future := now.Add(time.Second) + + db.Save(&Event{ID: "Event1", Direction: Event_Outgoing, ReceiverID: "Receiver1", SentAt: &past}) + db.Save(&Event{ID: "Event2", Direction: Event_Outgoing, ReceiverID: "Receiver1", SentAt: &future}) + db.Save(&Event{ID: "Event3", Direction: Event_Outgoing, ReceiverID: "Receiver2", SentAt: &future}) + + db.Save(&Event{ID: "Event4", Direction: Event_Incoming, ReceiverID: "Receiver1", SentAt: &past}) + db.Save(&Event{ID: "Event5", Direction: Event_Incoming, ReceiverID: "Receiver1", SentAt: &future}) + db.Save(&Event{ID: "Event6", Direction: Event_Incoming, ReceiverID: "Receiver2", SentAt: &future}) + + db.Save(&Event{ID: "Event7", Direction: Event_Outgoing, ConversationID: "Conversation1", SentAt: &past}) + db.Save(&Event{ID: "Event8", Direction: Event_Outgoing, ConversationID: "Conversation1", SentAt: &future}) + db.Save(&Event{ID: "Event9", Direction: Event_Outgoing, ConversationID: "Conversation2", SentAt: &future}) + + db.Save(&Event{ID: "Event10", Direction: Event_Incoming, ConversationID: "Conversation1", SentAt: &past}) + db.Save(&Event{ID: "Event11", Direction: Event_Incoming, ConversationID: "Conversation1", SentAt: &future}) + db.Save(&Event{ID: "Event12", Direction: Event_Incoming, ConversationID: "Conversation2", SentAt: &future}) + + return filename, db, past, now, future +} + +func TestFindNonAcknowledgedEventDestinations(t *testing.T) { + filename, db, _, now, _ := setupNonAcknowledgedEventDestinations() + defer mock.RemoveDb(filename, db) + + expected := map[string]bool{ + "Receiver1:": false, + ":Conversation1": false, + } + + destinations, err := FindNonAcknowledgedEventDestinations(db, now) + + if err != nil { + t.Error(err) + } + + for _, destination := range destinations { + identifier := fmt.Sprintf("%s:%s", destination.ReceiverID, destination.ConversationID) + + value, ok := expected[identifier] + if ok == false { + t.Error(fmt.Errorf("%s was not suppoesed to be found", identifier)) + } + + if value == true { + t.Error(fmt.Errorf("%s was found twice", identifier)) + } + + expected[identifier] = true + } + + for identifier, value := range expected { + if value == false { + t.Error(fmt.Errorf("%s was supposed to be found but was not", identifier)) + } + } +} + +func TestFindNonAcknowledgedEventsForDestination(t *testing.T) { + filename, db, _, _, _ := setupNonAcknowledgedEventDestinations() + defer mock.RemoveDb(filename, db) + + expected := map[string]bool{ + "Event1": false, + "Event2": false, + } + + events, err := FindNonAcknowledgedEventsForDestination(db, &Event{ReceiverID: "Receiver1"}) + + if err != nil { + t.Error(err) + } + + for _, event := range events { + value, ok := expected[event.ID] + if ok == false { + t.Error(fmt.Errorf("%s was not suppoesed to be found", event.ID)) + } + + if value == true { + t.Error(fmt.Errorf("%s was found twice", event.ID)) + } + + expected[event.ID] = true + } + + for identifier, value := range expected { + if value == false { + t.Error(fmt.Errorf("%s was supposed to be found but was not", identifier)) + } + } +} diff --git a/core/node/mainloop.go b/core/node/mainloop.go index 77014837dc..5358f7602a 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -2,6 +2,7 @@ package node import ( "context" + "time" "berty.tech/core/api/p2p" "berty.tech/core/crypto/keypair" @@ -9,9 +10,56 @@ import ( "go.uber.org/zap" ) +// EventRetry sends events which lack an AckedAt value emitted before the supplied time value +func (n *Node) EventRetry(before time.Time) ([]*p2p.Event, error) { + var retriedEvents []*p2p.Event + destinations, err := p2p.FindNonAcknowledgedEventDestinations(n.sql, before) + + if err != nil { + return nil, err + } + + for _, destination := range destinations { + events, err := p2p.FindNonAcknowledgedEventsForDestination(n.sql, destination) + + if err != nil { + logger().Error("error while retrieving events for dst", zap.Error(err)) + continue + } + + for _, event := range events { + now := time.Now() + event.SentAt = &now + if err := n.sql.Save(event).Error; err != nil { + logger().Error("error while updating SentAt on event", zap.Error(err)) + continue + } + n.outgoingEvents <- event + + retriedEvents = append(retriedEvents, event) + } + } + + return retriedEvents, nil +} + // Start is the node's mainloop func (n *Node) Start() error { ctx := context.Background() + + go func() { + for true { + before := time.Now().Add(-time.Second * 60 * 10) + _, err := n.EventRetry(before) + + if err != nil { + logger().Error("error while retrieving non acked destinations", zap.Error(err)) + } + + time.Sleep(time.Second * 60) + } + }() + for { select { case event := <-n.outgoingEvents: diff --git a/core/test/mainloop_test.go b/core/test/mainloop_test.go new file mode 100644 index 0000000000..0346a85f6e --- /dev/null +++ b/core/test/mainloop_test.go @@ -0,0 +1,82 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "berty.tech/core/entity" + + networkmock "berty.tech/core/network/mock" + "github.com/pkg/errors" + + "berty.tech/core/api/p2p" +) + +func setupNonAcknowledgedEventDestinations() (*AppMock, time.Time, time.Time, time.Time) { + n, err := NewAppMock(&entity.Device{Name: "Alice's iPhone"}, networkmock.NewEnqueuer(), WithUnencryptedDb()) + + if err != nil { + panic(err) + } + + now := time.Now() + past := now.Add(-time.Second) + future := now.Add(time.Second) + + n.db.Save(&p2p.Event{ID: "Event1", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver1", SentAt: &past}) + n.db.Save(&p2p.Event{ID: "Event2", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver1", SentAt: &future}) + n.db.Save(&p2p.Event{ID: "Event3", Direction: p2p.Event_Outgoing, ReceiverID: "Receiver2", SentAt: &future}) + + n.db.Save(&p2p.Event{ID: "Event4", Direction: p2p.Event_Incoming, ReceiverID: "Receiver1", SentAt: &past}) + n.db.Save(&p2p.Event{ID: "Event5", Direction: p2p.Event_Incoming, ReceiverID: "Receiver1", SentAt: &future}) + n.db.Save(&p2p.Event{ID: "Event6", Direction: p2p.Event_Incoming, ReceiverID: "Receiver2", SentAt: &future}) + + n.db.Save(&p2p.Event{ID: "Event7", Direction: p2p.Event_Outgoing, ConversationID: "Conversation1", SentAt: &past}) + n.db.Save(&p2p.Event{ID: "Event8", Direction: p2p.Event_Outgoing, ConversationID: "Conversation1", SentAt: &future}) + n.db.Save(&p2p.Event{ID: "Event9", Direction: p2p.Event_Outgoing, ConversationID: "Conversation2", SentAt: &future}) + + n.db.Save(&p2p.Event{ID: "Event10", Direction: p2p.Event_Incoming, ConversationID: "Conversation1", SentAt: &past}) + n.db.Save(&p2p.Event{ID: "Event11", Direction: p2p.Event_Incoming, ConversationID: "Conversation1", SentAt: &future}) + n.db.Save(&p2p.Event{ID: "Event12", Direction: p2p.Event_Incoming, ConversationID: "Conversation2", SentAt: &future}) + + return n, past, now, future +} + +func TestEventRetry(t *testing.T) { + appMock, _, now, _ := setupNonAcknowledgedEventDestinations() + defer appMock.Close() + + expected := map[string]bool{ + "Event1": false, + "Event2": false, + "Event7": false, + "Event8": false, + } + + events, err := appMock.node.EventRetry(now) + + if err != nil { + t.Error(err) + } + + for _, event := range events { + value, ok := expected[event.ID] + if ok == false { + t.Error(errors.New(fmt.Sprintf("%s was not suppoesed to be found", event.ID))) + } + + if value == true { + t.Error(errors.New(fmt.Sprintf("%s was found twice", event.ID))) + } + + expected[event.ID] = true + } + + for identifier, value := range expected { + if value == false { + t.Error(errors.New(fmt.Sprintf("%s was supposed to be found but was not", identifier))) + } + } + +}