From 9e884b87967ee929829e5448b9c1576e728c5c04 Mon Sep 17 00:00:00 2001 From: Manfred Touron Date: Fri, 3 Aug 2018 08:47:11 +0200 Subject: [PATCH] feat(core): implement Node.EventStream() --- core/node/errors.go | 9 +++++---- core/node/node.go | 15 ++++++++------- core/node/nodeapi.go | 25 +++++++++++++++++++++++-- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/core/node/errors.go b/core/node/errors.go index f3364763a2..56a6e8ccca 100644 --- a/core/node/errors.go +++ b/core/node/errors.go @@ -3,8 +3,9 @@ package node import "errors" var ( - ErrEntityAlreadyExists = errors.New("entity already exists") - ErrInvalidEventSender = errors.New("invalid event sender") - ErrInvalidInput = errors.New("invalid input") - ErrNotImplemented = errors.New("not implemented") + ErrEntityAlreadyExists = errors.New("entity already exists") + ErrInvalidEventSender = errors.New("invalid event sender") + ErrInvalidInput = errors.New("invalid input") + ErrNotImplemented = errors.New("not implemented") + ErrAnotherClientIsAlreadyConnected = errors.New("another client is already connected") ) diff --git a/core/node/node.go b/core/node/node.go index 4b7f5416c6..925260547e 100644 --- a/core/node/node.go +++ b/core/node/node.go @@ -18,13 +18,14 @@ import ( // Node is the top-level object of a Berty peer type Node struct { - clientEvents chan *p2p.Event - outgoingEvents chan *p2p.Event - sql *gorm.DB - config *entity.Config - initDevice *entity.Device - handleMutex sync.Mutex - networkDriver network.Driver + clientEvents chan *p2p.Event + outgoingEvents chan *p2p.Event + clientEventsConnected bool + sql *gorm.DB + config *entity.Config + initDevice *entity.Device + handleMutex sync.Mutex + networkDriver network.Driver } // New initializes a new Node object diff --git a/core/node/nodeapi.go b/core/node/nodeapi.go index 773d737895..aac8c4528c 100644 --- a/core/node/nodeapi.go +++ b/core/node/nodeapi.go @@ -4,6 +4,7 @@ import ( "context" "github.com/pkg/errors" + "go.uber.org/zap" "google.golang.org/grpc" "github.com/berty/berty/core/api/node" @@ -40,8 +41,28 @@ func (n *Node) EventList(input *node.EventListInput, stream node.Service_EventLi } // EventStream implements berty.node.EventStream -func (n *Node) EventStream(*node.Void, node.Service_EventStreamServer) error { - return ErrNotImplemented +func (n *Node) EventStream(_ *node.Void, stream node.Service_EventStreamServer) error { + if n.clientEventsConnected { + return ErrAnotherClientIsAlreadyConnected + } + + zap.L().Debug("EventStream connected") + n.clientEventsConnected = true + defer func() { + zap.L().Debug("EventStream disconnected") + n.clientEventsConnected = false + }() + + for { + select { + case <-stream.Context().Done(): + return stream.Context().Err() + case event := <-n.clientEvents: + if err := stream.Send(event); err != nil { + return err + } + } + } } //