Skip to content

Commit

Permalink
feat: emit outgoing events to the node + small locking fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Nov 16, 2018
1 parent 2660585 commit d04ffeb
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 41 deletions.
1 change: 1 addition & 0 deletions core/go.mod
Expand Up @@ -89,6 +89,7 @@ require (
github.com/libp2p/go-testutil v1.2.8 // indirect
github.com/libp2p/go-ws-transport v2.0.14+incompatible // indirect
github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43
github.com/maruel/ut v1.0.0 // indirect
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/mattn/go-sqlite3 v1.9.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions core/go.sum
@@ -1,7 +1,3 @@
berty.tech v0.0.0-20181022155922-f32cfaad3875 h1:idZm5uDumAQzxQHAfTnmaxGqFOO6KC6/qHIEo0pyX1w=
berty.tech v0.0.0-20181023103937-6f6ce2eea59a h1:5XLFVgNTFR7/oD1a6amaLQ5oCOXteavfPzwm9vwljF4=
berty.tech v0.0.0-20181023133502-f897998c663b h1:H1iL7aMkf5y+JE8z2e63xM3ceJYDSwVsXET5kNXyGss=
berty.tech v0.0.0-20181031153117-b329d9e0528f h1:4MN99eOxuAzwg+Po0PNDVD7EFtV44h9Tc3YCY8vCGSw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.30.0 h1:xKvyLgk56d0nksWq49J0UyGEeUIicTl4+UBiX1NPX9g=
cloud.google.com/go v0.30.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down Expand Up @@ -205,6 +201,8 @@ github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDe
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43 h1:Fefy2DSaKkme73mCUM2YJxNmi4dC1vrGzs+sjcoxXE4=
github.com/maruel/circular v0.0.0-20161028021427-97eeabbe7b43/go.mod h1:2R57jb7EgkTrwIT4IRrrJ2I4m10M83Zf7e6ej1Cer9c=
github.com/maruel/ut v1.0.0 h1:Tg5f5waOijrohsOwnMlr1bZmv+wHEbuMEacNBE8kQ7k=
github.com/maruel/ut v1.0.0/go.mod h1:I68ffiAt5qre9obEVTy7S2/fj2dJku2NYLvzPuY0gqE=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs=
Expand Down
2 changes: 1 addition & 1 deletion core/manager/account/account.go
Expand Up @@ -414,7 +414,7 @@ func (a *Account) startNode() error {
// start node
go func() {
defer a.PanicHandler()
a.errChan <- a.node.Start()
a.errChan <- a.node.Start(true)

}()

Expand Down
47 changes: 34 additions & 13 deletions core/node/mainloop.go
Expand Up @@ -56,25 +56,28 @@ func (n *Node) EventsRetry(before time.Time) ([]*p2p.Event, error) {
}

// Start is the node's mainloop
func (n *Node) Start() error {
func (n *Node) Start(withCron bool) error {
ctx := context.Background()

go func() {
for true {
before := time.Now().Add(-time.Second * 60 * 10)
_, err := n.EventsRetry(before)
if withCron {
go func() {
for true {
before := time.Now().Add(-time.Second * 60 * 10)
_, err := n.EventsRetry(before)

if err != nil {
logger().Error("error while retrieving non acked destinations", zap.Error(err))
}
if err != nil {
logger().Error("error while retrieving non acked destinations", zap.Error(err))
}

time.Sleep(time.Second * 60)
}
}()
time.Sleep(time.Second * 60)
}
}()
}

for {
select {
case event := <-n.outgoingEvents:
logger().Debug("outgoing event", zap.Stringer("event", event))
envelope := p2p.Envelope{}
eventBytes, err := proto.Marshal(event)
if err != nil {
Expand Down Expand Up @@ -103,10 +106,28 @@ func (n *Node) Start() error {
continue
}

if err := n.networkDriver.Emit(ctx, &envelope); err != nil {
logger().Error("failed to emit envelope on network", zap.Error(err))
// Async subscribe to conversation
// wait for 1s to simulate a sync subscription,
// if too long, the task will be done in background
done := make(chan bool, 1)
go func() {
// FIXME: make something smarter, i.e., grouping events by contact or network driver
if err := n.networkDriver.Emit(ctx, &envelope); err != nil {
logger().Error("failed to emit envelope on network", zap.Error(err))
}
done <- true
}()
select {
case <-done:
case <-time.After(1 * time.Second):
}

// push the outgoing event on the client stream
n.clientEvents <- event

// emit the outgoing event on the node event stream
case event := <-n.clientEvents:
logger().Debug("client event", zap.Stringer("event", event))
n.clientEventsMutex.Lock()
for _, sub := range n.clientEventsSubscribers {
if sub.filter(event) {
Expand Down
23 changes: 20 additions & 3 deletions core/node/nodeapi.go
Expand Up @@ -2,12 +2,14 @@ package node

import (
"context"
"time"

"berty.tech/core/api/node"
"berty.tech/core/api/p2p"
"berty.tech/core/entity"
"berty.tech/core/sql"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -241,6 +243,10 @@ func (n *Node) ConversationCreate(ctx context.Context, input *node.ConversationC
n.handleMutex.Lock()
defer n.handleMutex.Unlock()

return n.conversationCreate(ctx, input)
}

func (n *Node) conversationCreate(ctx context.Context, input *node.ConversationCreateInput) (*entity.Conversation, error) {
members := []*entity.ConversationMember{
{
ID: n.NewID(),
Expand Down Expand Up @@ -273,10 +279,21 @@ func (n *Node) ConversationCreate(ctx context.Context, input *node.ConversationC
return nil, errors.Wrap(err, "failed to load freshly created conversation")
}

// Subscribe to conversation
if err := n.networkDriver.Join(ctx, conversation.ID); err != nil {
return nil, err
// Async subscribe to conversation
// wait for 1s to simulate a sync subscription,
// if too long, the task will be done in background
done := make(chan bool, 1)
go func() {
if err := n.networkDriver.Join(ctx, conversation.ID); err != nil {
logger().Error("failed to join conversation", zap.Error(err))
}
done <- true
}()
select {
case <-done:
case <-time.After(1 * time.Second):
}

// send invite to peers
filtered := conversation.Filtered()
for _, member := range conversation.Members {
Expand Down
14 changes: 13 additions & 1 deletion core/node/nodeapi_devtools.go
Expand Up @@ -93,7 +93,7 @@ func (n *Node) GenerateFakeData(_ context.Context, input *node.Void) (*node.Void
ID: contacts[rand.Intn(len(contacts))].ID,
})
}
if _, err := n.ConversationCreate(context.Background(), &node.ConversationCreateInput{
if _, err := n.conversationCreate(context.Background(), &node.ConversationCreateInput{
Contacts: contactsMembers,
Title: strings.Title(fmt.Sprintf("%s %s", gofakeit.HipsterWord(), gofakeit.HackerNoun())),
Topic: gofakeit.HackerPhrase(),
Expand All @@ -102,6 +102,18 @@ func (n *Node) GenerateFakeData(_ context.Context, input *node.Void) (*node.Void
}
}

// enqueue fake incoming event
in := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueClientEvent(in); err != nil {
return nil, err
}

// enqueue fake outgoing event
out := n.NewContactEvent(&entity.Contact{ID: "abcde"}, p2p.Kind_DevtoolsMapset)
if err := n.EnqueueOutgoingEvent(out); err != nil {
return nil, err
}

return &node.Void{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/nodeapi_test.go
Expand Up @@ -55,7 +55,7 @@ func TestPagination(t *testing.T) {
gs.Serve(ic.Listener())
}()
go func() {
alice.Start()
alice.Start(false)
}()
})

Expand Down
3 changes: 3 additions & 0 deletions core/node/nodeclient.go
Expand Up @@ -32,6 +32,9 @@ func (n *Node) EventStream(input *node.EventStreamInput, stream node.Service_Eve
if input.Filter == nil {
return true
}
if input.Filter.Direction != p2p.Event_UnknownDirection && e.Direction != input.Filter.Direction {
return false
}
if input.Filter.Kind != p2p.Kind_Unknown && e.Kind != input.Filter.Kind {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion core/test/app_mock.go
Expand Up @@ -142,7 +142,7 @@ func (a *AppMock) Open() error {
}()

go func() {
if err := a.node.Start(); err != nil {
if err := a.node.Start(false); err != nil {
logger().Error("node routine error", zap.Error(err))
}
}()
Expand Down
64 changes: 47 additions & 17 deletions core/test/e2e_test.go
Expand Up @@ -85,9 +85,12 @@ func TestWithEnqueuer(t *testing.T) {
//jsonPrintIndent(event)
}
{
event := <-bob.eventStream
So(event.Kind, ShouldEqual, p2p.Kind_DevtoolsMapset)
//jsonPrintIndent(event)
in, out, err := asyncEventsWithTimeout(bob.eventStream, 2)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 1)
So(len(out), ShouldEqual, 1)
So(in[0].Kind, ShouldEqual, p2p.Kind_DevtoolsMapset)
So(out[0].Kind, ShouldEqual, p2p.Kind_Ack)
}
}
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 0, 0, 0})
Expand Down Expand Up @@ -175,6 +178,13 @@ func TestWithEnqueuer(t *testing.T) {
So(res.DisplayName, ShouldBeEmpty)
So(res.DisplayStatus, ShouldBeEmpty)
So(len(res.Devices), ShouldEqual, 0)

in, out, err := asyncEventsWithTimeout(alice.eventStream, 1)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 0)
So(len(out), ShouldEqual, 1)
So(out[0].Kind, ShouldEqual, p2p.Kind_ContactRequest)

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 0, 0, 0, 0})

everythingWentFine()
Expand Down Expand Up @@ -224,14 +234,18 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 1, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 2, 0, 0})

everythingWentFine()
})
Convey("Bob emits the ContactRequest event to its client", FailureHalts, func() {
shouldIContinue(t)

event := <-bob.eventStream
in, out, err := asyncEventsWithTimeout(bob.eventStream, 2)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 1)
So(len(out), ShouldEqual, 1)
event := in[0]

So(event.SenderID, ShouldEqual, alice.node.UserID())
So(event.Direction, ShouldEqual, p2p.Event_Incoming)
Expand Down Expand Up @@ -310,7 +324,7 @@ func TestWithEnqueuer(t *testing.T) {

time.Sleep(time.Second * 1)

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 2, 0, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 2, 2, 0, 0})

everythingWentFine()
})
Expand All @@ -332,22 +346,28 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 1, 1, 0, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 3, 1, 2, 0, 0})

everythingWentFine()
})
Convey("Alice emits the ContactRequestAccepted event to its clients", FailureHalts, func() {
shouldIContinue(t)

event := <-alice.eventStream
in, out, err := asyncEventsWithTimeout(alice.eventStream, 3)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 1)
So(len(out), ShouldEqual, 2)
event := in[0]

//jsonPrintIndent(event)

So(event.SenderID, ShouldEqual, bob.node.UserID())
So(event.Kind, ShouldEqual, p2p.Kind_ContactRequestAccepted)
So(event.ReceiverID, ShouldEqual, alice.node.UserID())
So(event.Direction, ShouldEqual, p2p.Event_Incoming)
_, err = event.GetContactRequestAcceptedAttrs()
So(err, ShouldBeNil)
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 0, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 2, 0, 0})

everythingWentFine()
})
Expand All @@ -370,14 +390,18 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 1, 0, 0, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 2, 0, 2, 0, 0})

everythingWentFine()
})
Convey("Alice emits the ContactShareMe event to its client", FailureHalts, func() {
shouldIContinue(t)

event := <-alice.eventStream
in, out, err := asyncEventsWithTimeout(alice.eventStream, 2)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 1)
So(len(out), ShouldEqual, 1)
event := in[0]

So(event.SenderID, ShouldEqual, bob.node.UserID())
So(event.Kind, ShouldEqual, p2p.Kind_ContactShareMe)
Expand All @@ -387,7 +411,7 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(attrs.Me.DisplayName, ShouldEqual, "Bob")
So(attrs.Me.DisplayStatus, ShouldBeEmpty)
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 0, 0, 0, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{3, 0, 0, 2, 0, 0})

everythingWentFine()
})
Expand All @@ -412,7 +436,7 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 1, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{2, 0, 1, 4, 0, 0})

everythingWentFine()
})
Expand All @@ -435,7 +459,7 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 1, 1, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{1, 0, 1, 4, 0, 0})

everythingWentFine()
})
Expand All @@ -458,7 +482,7 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 1, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 1, 4, 0, 0})

everythingWentFine()
})
Expand All @@ -480,14 +504,18 @@ func TestWithEnqueuer(t *testing.T) {
So(err, ShouldBeNil)
So(res, ShouldResemble, &node.Void{})

So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 1, 0, 0})
So(nodeChansLens(alice, bob, eve), ShouldResemble, []int{0, 0, 0, 4, 0, 0})

everythingWentFine()
})
Convey("Bob emits the ContactShareMe event to its client", FailureHalts, func() {
shouldIContinue(t)

event := <-bob.eventStream
in, out, err := asyncEventsWithTimeout(bob.eventStream, 4)
So(err, ShouldBeNil)
So(len(in), ShouldEqual, 1)
So(len(out), ShouldEqual, 3)
event := in[0]

So(event.SenderID, ShouldEqual, alice.node.UserID())
So(event.Direction, ShouldEqual, p2p.Event_Incoming)
Expand Down Expand Up @@ -672,6 +700,8 @@ func TestAliasesFlow(t *testing.T) {

So(err, ShouldBeNil)

time.Sleep(200 * time.Millisecond)

conversations, err := bob.client.ConversationList(internalCtx, &node.ConversationListInput{})
So(err, ShouldBeNil)
So(len(conversations), ShouldEqual, 1)
Expand Down
1 change: 1 addition & 0 deletions core/test/scenario_test.go
Expand Up @@ -75,6 +75,7 @@ func scenario(t *testing.T, alice, bob, eve *AppMock) {
})
Convey("Alice has Bob as friend", FailureHalts, func() {
shouldIContinue(t)
time.Sleep(200 * time.Millisecond)

contacts, err := alice.client.ContactList(internalCtx, &node.ContactListInput{
Filter: &entity.Contact{Status: entity.Contact_IsFriend},
Expand Down

0 comments on commit d04ffeb

Please sign in to comment.