diff --git a/core/go.mod b/core/go.mod index 17206cf72c..4e1fadb992 100644 --- a/core/go.mod +++ b/core/go.mod @@ -89,6 +89,7 @@ require ( github.com/libp2p/go-testutil v1.2.8 // indirect github.com/libp2p/go-ws-transport v2.0.14+incompatible // indirect github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43 + github.com/maruel/ut v1.0.0 // indirect github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.4 // indirect github.com/mattn/go-sqlite3 v1.9.0 // indirect diff --git a/core/go.sum b/core/go.sum index ac62ceff5c..72192f3ae5 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1,7 +1,3 @@ -berty.tech v0.0.0-20181022155922-f32cfaad3875 h1:idZm5uDumAQzxQHAfTnmaxGqFOO6KC6/qHIEo0pyX1w= -berty.tech v0.0.0-20181023103937-6f6ce2eea59a h1:5XLFVgNTFR7/oD1a6amaLQ5oCOXteavfPzwm9vwljF4= -berty.tech v0.0.0-20181023133502-f897998c663b h1:H1iL7aMkf5y+JE8z2e63xM3ceJYDSwVsXET5kNXyGss= -berty.tech v0.0.0-20181031153117-b329d9e0528f h1:4MN99eOxuAzwg+Po0PNDVD7EFtV44h9Tc3YCY8vCGSw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.30.0 h1:xKvyLgk56d0nksWq49J0UyGEeUIicTl4+UBiX1NPX9g= cloud.google.com/go v0.30.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -205,6 +201,8 @@ github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDe github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43 h1:Fefy2DSaKkme73mCUM2YJxNmi4dC1vrGzs+sjcoxXE4= github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43/go.mod h1:2R57jb7EgkTrwIT4IRrrJ2I4m10M83Zf7e6ej1Cer9c= +github.com/maruel/ut v1.0.0 h1:Tg5f5waOijrohsOwnMlr1bZmv+wHEbuMEacNBE8kQ7k= +github.com/maruel/ut v1.0.0/go.mod h1:I68ffiAt5qre9obEVTy7S2/fj2dJku2NYLvzPuY0gqE= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= diff --git a/core/manager/account/account.go b/core/manager/account/account.go index ffdd5835b8..7a9349296b 100644 --- a/core/manager/account/account.go +++ b/core/manager/account/account.go @@ -414,7 +414,7 @@ func (a *Account) startNode() error { // start node go func() { defer a.PanicHandler() - a.errChan <- a.node.Start() + a.errChan <- a.node.Start(true) }() diff --git a/core/node/mainloop.go b/core/node/mainloop.go index 5010bf736c..1386821f10 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -56,25 +56,28 @@ func (n *Node) EventsRetry(before time.Time) ([]*p2p.Event, error) { } // Start is the node's mainloop -func (n *Node) Start() error { +func (n *Node) Start(withCron bool) error { ctx := context.Background() - go func() { - for true { - before := time.Now().Add(-time.Second * 60 * 10) - _, err := n.EventsRetry(before) + if withCron { + go func() { + for true { + before := time.Now().Add(-time.Second * 60 * 10) + _, err := n.EventsRetry(before) - if err != nil { - logger().Error("error while retrieving non acked destinations", zap.Error(err)) - } + if err != nil { + logger().Error("error while retrieving non acked destinations", zap.Error(err)) + } - time.Sleep(time.Second * 60) - } - }() + time.Sleep(time.Second * 60) + } + }() + } for { select { case event := <-n.outgoingEvents: + logger().Debug("outgoing event", zap.Stringer("event", event)) envelope := p2p.Envelope{} eventBytes, err := proto.Marshal(event) if err != nil { @@ -103,10 +106,28 @@ func (n *Node) Start() error { continue } - if err := n.networkDriver.Emit(ctx, &envelope); err != nil { - logger().Error("failed to emit envelope on network", zap.Error(err)) + // Async subscribe to conversation + // wait for 1s to simulate a sync subscription, + // if too long, the task will be done in background + done := make(chan bool, 1) + go func() { + // FIXME: make something smarter, i.e., grouping events by contact or network driver + if err := n.networkDriver.Emit(ctx, &envelope); err != nil { + logger().Error("failed to emit envelope on network", zap.Error(err)) + } + done <- true + }() + select { + case <-done: + case <-time.After(1 * time.Second): } + + // push the outgoing event on the client stream + n.clientEvents <- event + + // emit the outgoing event on the node event stream case event := <-n.clientEvents: + logger().Debug("client event", zap.Stringer("event", event)) n.clientEventsMutex.Lock() for _, sub := range n.clientEventsSubscribers { if sub.filter(event) { diff --git a/core/node/nodeapi.go b/core/node/nodeapi.go index 046169ed6c..9e54f45283 100644 --- a/core/node/nodeapi.go +++ b/core/node/nodeapi.go @@ -2,12 +2,14 @@ package node import ( "context" + "time" "berty.tech/core/api/node" "berty.tech/core/api/p2p" "berty.tech/core/entity" "berty.tech/core/sql" "github.com/pkg/errors" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -241,6 +243,10 @@ func (n *Node) ConversationCreate(ctx context.Context, input *node.ConversationC n.handleMutex.Lock() defer n.handleMutex.Unlock() + return n.conversationCreate(ctx, input) +} + +func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationCreateInput) (*entity.Conversation, error) { members := []*entity.ConversationMember{ { ID: n.NewID(), @@ -273,10 +279,21 @@ func (n *Node) ConversationCreate(ctx context.Context, input *node.ConversationC return nil, errors.Wrap(err, "failed to load freshly created conversation") } - // Subscribe to conversation - if err := n.networkDriver.Join(ctx, conversation.ID); err != nil { - return nil, err + // Async subscribe to conversation + // wait for 1s to simulate a sync subscription, + // if too long, the task will be done in background + done := make(chan bool, 1) + go func() { + if err := n.networkDriver.Join(ctx, conversation.ID); err != nil { + logger().Error("failed to join conversation", zap.Error(err)) + } + done <- true + }() + select { + case <-done: + case <-time.After(1 * time.Second): } + // send invite to peers filtered := conversation.Filtered() for _, member := range conversation.Members { diff --git a/core/node/nodeapi_devtools.go b/core/node/nodeapi_devtools.go index cec72b8529..554f4a1c4f 100644 --- a/core/node/nodeapi_devtools.go +++ b/core/node/nodeapi_devtools.go @@ -93,7 +93,7 @@ func (n *Node) GenerateFakeData(_ context.Context, input *node.Void) (*node.Void ID: contacts[rand.Intn(len(contacts))].ID, }) } - if _, err := n.ConversationCreate(context.Background(), &node.ConversationCreateInput{ + if _, err := n.conversationCreate(context.Background(), &node.ConversationCreateInput{ Contacts: contactsMembers, Title: strings.Title(fmt.Sprintf("%s %s", gofakeit.HipsterWord(), gofakeit.HackerNoun())), Topic: gofakeit.HackerPhrase(), @@ -102,6 +102,18 @@ func (n *Node) GenerateFakeData(_ context.Context, input *node.Void) (*node.Void } } + // enqueue fake incoming event + in := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) + if err := n.EnqueueClientEvent(in); err != nil { + return nil, err + } + + // enqueue fake outgoing event + out := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset) + if err := n.EnqueueOutgoingEvent(out); err != nil { + return nil, err + } + return &node.Void{}, nil } diff --git a/core/node/nodeapi_test.go b/core/node/nodeapi_test.go index 064c7fa622..5f3b574515 100644 --- a/core/node/nodeapi_test.go +++ b/core/node/nodeapi_test.go @@ -55,7 +55,7 @@ func TestPagination(t *testing.T) { gs.Serve(ic.Listener()) }() go func() { - alice.Start() + alice.Start(false) }() }) diff --git a/core/node/nodeclient.go b/core/node/nodeclient.go index 797f5bc87b..174e8616d7 100644 --- a/core/node/nodeclient.go +++ b/core/node/nodeclient.go @@ -32,6 +32,9 @@ func (n *Node) EventStream(input *node.EventStreamInput, stream node.Service_Eve if input.Filter == nil { return true } + if input.Filter.Direction != p2p.Event_UnknownDirection && e.Direction != input.Filter.Direction { + return false + } if input.Filter.Kind != p2p.Kind_Unknown && e.Kind != input.Filter.Kind { return false } diff --git a/core/test/app_mock.go b/core/test/app_mock.go index caa1252162..7926c5d243 100644 --- a/core/test/app_mock.go +++ b/core/test/app_mock.go @@ -142,7 +142,7 @@ func (a *AppMock) Open() error { }() go func() { - if err := a.node.Start(); err != nil { + if err := a.node.Start(false); err != nil { logger().Error("node routine error", zap.Error(err)) } }() diff --git a/core/test/e2e_test.go b/core/test/e2e_test.go index bdc5e9466b..f65cf86dc8 100644 --- a/core/test/e2e_test.go +++ b/core/test/e2e_test.go @@ -85,9 +85,12 @@ func TestWithEnqueuer(t *testing.T) { //jsonPrintIndent(event) } { - event := <-bob.eventStream - So(event.Kind, ShouldEqual, p2p.Kind_DevtoolsMapset) - //jsonPrintIndent(event) + in, out, err := asyncEventsWithTimeout(bob.eventStream, 2) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 1) + So(len(out), ShouldEqual, 1) + So(in[0].Kind, ShouldEqual, p2p.Kind_DevtoolsMapset) + So(out[0].Kind, ShouldEqual, p2p.Kind_Ack) } } So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 0, 0, 0}) @@ -175,6 +178,13 @@ func TestWithEnqueuer(t *testing.T) { So(res.DisplayName, ShouldBeEmpty) So(res.DisplayStatus, ShouldBeEmpty) So(len(res.Devices), ShouldEqual, 0) + + in, out, err := asyncEventsWithTimeout(alice.eventStream, 1) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 0) + So(len(out), ShouldEqual, 1) + So(out[0].Kind, ShouldEqual, p2p.Kind_ContactRequest) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 0, 0, 0, 0}) everythingWentFine() @@ -224,14 +234,18 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 1, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 2, 0, 0}) everythingWentFine() }) Convey("Bob emits the ContactRequest event to its client", FailureHalts, func() { shouldIContinue(t) - event := <-bob.eventStream + in, out, err := asyncEventsWithTimeout(bob.eventStream, 2) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 1) + So(len(out), ShouldEqual, 1) + event := in[0] So(event.SenderID, ShouldEqual, alice.node.UserID()) So(event.Direction, ShouldEqual, p2p.Event_Incoming) @@ -310,7 +324,7 @@ func TestWithEnqueuer(t *testing.T) { time.Sleep(time.Second * 1) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 2, 0, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 2, 2, 0, 0}) everythingWentFine() }) @@ -332,14 +346,20 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 1, 1, 0, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 3, 1, 2, 0, 0}) everythingWentFine() }) Convey("Alice emits the ContactRequestAccepted event to its clients", FailureHalts, func() { shouldIContinue(t) - event := <-alice.eventStream + in, out, err := asyncEventsWithTimeout(alice.eventStream, 3) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 1) + So(len(out), ShouldEqual, 2) + event := in[0] + + //jsonPrintIndent(event) So(event.SenderID, ShouldEqual, bob.node.UserID()) So(event.Kind, ShouldEqual, p2p.Kind_ContactRequestAccepted) @@ -347,7 +367,7 @@ func TestWithEnqueuer(t *testing.T) { So(event.Direction, ShouldEqual, p2p.Event_Incoming) _, err = event.GetContactRequestAcceptedAttrs() So(err, ShouldBeNil) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 0, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 2, 0, 0}) everythingWentFine() }) @@ -370,14 +390,18 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 1, 0, 0, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 2, 0, 2, 0, 0}) everythingWentFine() }) Convey("Alice emits the ContactShareMe event to its client", FailureHalts, func() { shouldIContinue(t) - event := <-alice.eventStream + in, out, err := asyncEventsWithTimeout(alice.eventStream, 2) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 1) + So(len(out), ShouldEqual, 1) + event := in[0] So(event.SenderID, ShouldEqual, bob.node.UserID()) So(event.Kind, ShouldEqual, p2p.Kind_ContactShareMe) @@ -387,7 +411,7 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(attrs.Me.DisplayName, ShouldEqual, "Bob") So(attrs.Me.DisplayStatus, ShouldBeEmpty) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 0, 0, 0, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 0, 0, 2, 0, 0}) everythingWentFine() }) @@ -412,7 +436,7 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 1, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 4, 0, 0}) everythingWentFine() }) @@ -435,7 +459,7 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 1, 1, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 1, 4, 0, 0}) everythingWentFine() }) @@ -458,7 +482,7 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 1, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 4, 0, 0}) everythingWentFine() }) @@ -480,14 +504,18 @@ func TestWithEnqueuer(t *testing.T) { So(err, ShouldBeNil) So(res, ShouldResemble, &node.Void{}) - So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 1, 0, 0}) + So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 4, 0, 0}) everythingWentFine() }) Convey("Bob emits the ContactShareMe event to its client", FailureHalts, func() { shouldIContinue(t) - event := <-bob.eventStream + in, out, err := asyncEventsWithTimeout(bob.eventStream, 4) + So(err, ShouldBeNil) + So(len(in), ShouldEqual, 1) + So(len(out), ShouldEqual, 3) + event := in[0] So(event.SenderID, ShouldEqual, alice.node.UserID()) So(event.Direction, ShouldEqual, p2p.Event_Incoming) @@ -672,6 +700,8 @@ func TestAliasesFlow(t *testing.T) { So(err, ShouldBeNil) + time.Sleep(200 * time.Millisecond) + conversations, err := bob.client.ConversationList(internalCtx, &node.ConversationListInput{}) So(err, ShouldBeNil) So(len(conversations), ShouldEqual, 1) diff --git a/core/test/scenario_test.go b/core/test/scenario_test.go index 9b71937b29..feb486491a 100644 --- a/core/test/scenario_test.go +++ b/core/test/scenario_test.go @@ -75,6 +75,7 @@ func scenario(t *testing.T, alice, bob, eve *AppMock) { }) Convey("Alice has Bob as friend", FailureHalts, func() { shouldIContinue(t) + time.Sleep(200 * time.Millisecond) contacts, err := alice.client.ContactList(internalCtx, &node.ContactListInput{ Filter: &entity.Contact{Status: entity.Contact_IsFriend}, diff --git a/core/test/test_test.go b/core/test/test_test.go index 47383a2d1a..32600022ce 100644 --- a/core/test/test_test.go +++ b/core/test/test_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "berty.tech/core/api/p2p" "berty.tech/core/network/mock" ) @@ -96,3 +97,26 @@ func shouldIContinue(t *testing.T) { func everythingWentFine() { lastTestSucceed = true } + +// +// get async events with timeout +// + +func asyncEventsWithTimeout(eventStream chan *p2p.Event, n int) ([]*p2p.Event, []*p2p.Event, error) { + var incomings, outgoings []*p2p.Event + + for i := 0; i < n; i++ { + select { + case event := <-eventStream: + if event.Direction == p2p.Event_Incoming { + incomings = append(incomings, event) + } else { + outgoings = append(outgoings, event) + } + case <-time.After(1 * time.Second): // max 1 sec timeout + return nil, nil, fmt.Errorf("timeout") + } + } + + return incomings, outgoings, nil +}