Skip to content

Commit

Permalink
feat(core): add network.Driver.SetReceiveEventHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Jul 31, 2018
1 parent baa2adf commit 05c00cc
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
3 changes: 2 additions & 1 deletion core/network/driver.go
Expand Up @@ -7,5 +7,6 @@ import (
)

type Driver interface {
SendEvent(ctx context.Context, event *p2p.Event) error
SendEvent(context.Context, *p2p.Event) error
SetReceiveEventHandler(func(context.Context, *p2p.Event) (*p2p.Void, error))
}
4 changes: 4 additions & 0 deletions core/network/drivermock/euqueuer.go
Expand Up @@ -27,3 +27,7 @@ func (e *Enqueuer) SendEvent(_ context.Context, event *p2p.Event) error {
e.queue <- event
return nil
}

func (e *Enqueuer) SetReceiveEventHandler(_ func(context.Context, *p2p.Event) (*p2p.Void, error)) {
// doing nothing, enqueuer does not support receiving events
}
53 changes: 36 additions & 17 deletions core/network/drivermock/simple.go
Expand Up @@ -5,45 +5,64 @@ import (
"fmt"

"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/berty/berty/core/api/p2p"
"github.com/berty/berty/core/client"
"github.com/berty/berty/core/network"
)

type peer struct {
id string
conn *grpc.ClientConn
}

type Simple struct {
network.Driver
//
// Manager
//

type SimpleManager struct {
peers []peer
}

func NewSimple() *Simple {
return &Simple{
type peer struct {
id string
driver *SimpleDriver
}

func NewSimple() *SimpleManager {
return &SimpleManager{
peers: make([]peer, 0),
}
}

func (s *Simple) SendEvent(ctx context.Context, event *p2p.Event) error {
for _, peer := range s.peers {
func (m *SimpleManager) Driver() *SimpleDriver {
return &SimpleDriver{
manager: m,
}
}

func (m *SimpleManager) AddPeer(id string, driver *SimpleDriver) {
m.peers = append(m.peers, peer{id: id, driver: driver})
}

//
// Driver
//

type SimpleDriver struct {
network.Driver
manager *SimpleManager
handler func(context.Context, *p2p.Event) (*p2p.Void, error)
}

func (d *SimpleDriver) SendEvent(ctx context.Context, event *p2p.Event) error {
for _, peer := range d.manager.peers {
if peer.id == event.ReceiverID {
zap.L().Debug("Simple.SendEvent",
zap.String("sender", event.SenderID),
zap.String("receiver", event.ReceiverID),
)
_, err := client.New(peer.conn).P2p().Handle(p2p.SetSender(ctx, event.SenderID), event)
_, err := peer.driver.handler(p2p.SetSender(ctx, event.SenderID), event)
return err
}
}
zap.L().Error("AAAAAAAAAAAAAAAAA")
return fmt.Errorf("peer not found")
}

func (s *Simple) AddPeer(id string, conn *grpc.ClientConn) {
s.peers = append(s.peers, peer{id: id, conn: conn})
func (d *SimpleDriver) SetReceiveEventHandler(handler func(context.Context, *p2p.Event) (*p2p.Void, error)) {
d.handler = handler
}
2 changes: 2 additions & 0 deletions core/node/node.go
Expand Up @@ -57,6 +57,8 @@ func New(opts ...NewNodeOption) (*Node, error) {
}
n.config = config

n.networkDriver.SetReceiveEventHandler(n.Handle)

return n, nil
}

Expand Down

0 comments on commit 05c00cc

Please sign in to comment.