Skip to content

Commit

Permalink
feat(core): implement Node.EventStream()
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Aug 3, 2018
1 parent a958847 commit 9e884b8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
9 changes: 5 additions & 4 deletions core/node/errors.go
Expand Up @@ -3,8 +3,9 @@ package node
import "errors"

var (
ErrEntityAlreadyExists = errors.New("entity already exists")
ErrInvalidEventSender = errors.New("invalid event sender")
ErrInvalidInput = errors.New("invalid input")
ErrNotImplemented = errors.New("not implemented")
ErrEntityAlreadyExists = errors.New("entity already exists")
ErrInvalidEventSender = errors.New("invalid event sender")
ErrInvalidInput = errors.New("invalid input")
ErrNotImplemented = errors.New("not implemented")
ErrAnotherClientIsAlreadyConnected = errors.New("another client is already connected")
)
15 changes: 8 additions & 7 deletions core/node/node.go
Expand Up @@ -18,13 +18,14 @@ import (

// Node is the top-level object of a Berty peer
type Node struct {
clientEvents chan *p2p.Event
outgoingEvents chan *p2p.Event
sql *gorm.DB
config *entity.Config
initDevice *entity.Device
handleMutex sync.Mutex
networkDriver network.Driver
clientEvents chan *p2p.Event
outgoingEvents chan *p2p.Event
clientEventsConnected bool
sql *gorm.DB
config *entity.Config
initDevice *entity.Device
handleMutex sync.Mutex
networkDriver network.Driver
}

// New initializes a new Node object
Expand Down
25 changes: 23 additions & 2 deletions core/node/nodeapi.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/berty/berty/core/api/node"
Expand Down Expand Up @@ -40,8 +41,28 @@ func (n *Node) EventList(input *node.EventListInput, stream node.Service_EventLi
}

// EventStream implements berty.node.EventStream
func (n *Node) EventStream(*node.Void, node.Service_EventStreamServer) error {
return ErrNotImplemented
func (n *Node) EventStream(_ *node.Void, stream node.Service_EventStreamServer) error {
if n.clientEventsConnected {
return ErrAnotherClientIsAlreadyConnected
}

zap.L().Debug("EventStream connected")
n.clientEventsConnected = true
defer func() {
zap.L().Debug("EventStream disconnected")
n.clientEventsConnected = false
}()

for {
select {
case <-stream.Context().Done():
return stream.Context().Err()
case event := <-n.clientEvents:
if err := stream.Send(event); err != nil {
return err
}
}
}
}

//
Expand Down

0 comments on commit 9e884b8

Please sign in to comment.