Skip to content

Commit

Permalink
feat(p2p-grpc): added ping grpc method
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Louvigny committed Sep 19, 2018
1 parent f9e3fd5 commit d71323d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 28 deletions.
15 changes: 15 additions & 0 deletions core/api/client/jsonclient/berty.p2p.service.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/api/p2p/service.proto
Expand Up @@ -9,7 +9,13 @@ import "api/p2p/envelope.proto";
service Service {
// HandleEnvelope is the default p2p handler, it receives an envelope containing an encrypted event
rpc HandleEnvelope(Envelope) returns (Void) {};

rpc Ping(PingInput) returns (Void) {};
}

message PingInput {
string destination = 1;
};


message Void {};
7 changes: 6 additions & 1 deletion core/cmd/berty/daemon.go
Expand Up @@ -204,15 +204,20 @@ func daemon(opts *daemonOptions) error {
p2pOpts = append(p2pOpts, p2p.WithRelayClient())
}

driver, err = p2p.NewDriver(context.Background(), p2pOpts...)
p2pDriver, err := p2p.NewDriver(context.Background(), p2pOpts...)

if err != nil {
return err
}

defer func() {
if err := driver.Close(); err != nil {
logger().Warn("failed to close network driver", zap.Error(err))
}
}()

driver = p2pDriver
fmt.Printf("driver ID - %s\n", p2pDriver.ID())
}

if driver == nil {
Expand Down
6 changes: 6 additions & 0 deletions core/network/driver.go
Expand Up @@ -2,14 +2,20 @@ package network

import (
"context"
"net"
"time"

"berty.tech/core/api/p2p"
"github.com/libp2p/go-libp2p-protocol"
)

type Driver interface {
// Emit sends an envelope to a channel
Emit(context.Context, *p2p.Envelope) error

// Dial get a raw connection
Dial(context.Context, string, time.Duration, protocol.ID) (net.Conn, error)

// Join subscribe for new envelope in a channel
Join(context.Context, string) error

Expand Down
16 changes: 14 additions & 2 deletions core/network/p2p/p2p.go
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"fmt"
"net"
"sync"
"time"

Expand All @@ -15,7 +16,7 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopt "github.com/libp2p/go-libp2p-kad-dht/opts"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
mdns "github.com/libp2p/go-libp2p/p2p/discovery"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -27,9 +28,12 @@ import (
"berty.tech/core/network"
"berty.tech/core/network/p2p/p2putil"
"berty.tech/core/network/p2p/protocol/service/p2pgrpc"
"github.com/libp2p/go-libp2p-protocol"
)

const ID = "api/p2p/envelope"
const ID = "api/p2p/methods"

var ProtocolID = protocol.ID(p2pgrpc.GetGrpcID(ID))

// driverConfig configure the driver
type driverConfig struct {
Expand Down Expand Up @@ -260,6 +264,10 @@ func (d *Driver) Connect(ctx context.Context, pi pstore.PeerInfo) error {
return d.host.Connect(ctx, pi)
}

func (d *Driver) Dial(ctx context.Context, peerId string, timeout time.Duration, pid protocol.ID) (net.Conn, error) {
return p2putil.NewDialer(d.host, pid)(peerId, timeout)
}

func (d *Driver) createCid(id string) (*cid.Cid, error) {
h, err := mh.Sum([]byte(id), mh.SHA2_256, -1)
if err != nil {
Expand Down Expand Up @@ -371,6 +379,10 @@ func (ds *DriverService) HandleEnvelope(ctx context.Context, e *p2p.Envelope) (*
return nil, fmt.Errorf("no handler set")
}

func (ds *DriverService) Ping(ctx context.Context, e *p2p.PingInput) (*p2p.Void, error) {
return &p2p.Void{}, nil
}

type DriverDiscoveryNotifee Driver

func (ddn *DriverDiscoveryNotifee) HandlePeerFound(pi pstore.PeerInfo) {
Expand Down
39 changes: 39 additions & 0 deletions core/network/p2p/p2putil/manager.go
Expand Up @@ -2,8 +2,16 @@ package p2putil

import (
"context"
"net"
"sync"

"fmt"
"time"

"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-protocol"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -98,3 +106,34 @@ func (m *Manager) GetConn(ctx context.Context, target string) (*grpc.ClientConn,

return cl, nil
}

func NewDialer(host host.Host, pid protocol.ID) func(string, time.Duration) (net.Conn, error) {
return func(target string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

peerID, err := peer.IDB58Decode(target)
if err != nil {
return nil, fmt.Errorf("failed to parse `%s`: %s", target, err.Error())
}

// No stream exist, creating a new one
logger().Debug("Dialing", zap.String("addr", target))

p, err := host.Peerstore().GetProtocols(peerID)

if err != nil {
return nil, errors.Wrap(err, "shit appends while resolving protocols")
}

fmt.Printf("%+v\n\n\n\n\n", p)

s, err := host.NewStream(ctx, peerID, pid)
if err != nil {
logger().Error("new stream failed", zap.Error(err))
return nil, errors.Wrap(err, "new stream failed")
}

return NewConnFromStream(s)
}
}
29 changes: 4 additions & 25 deletions core/network/p2p/protocol/service/p2pgrpc/grpc.go
@@ -1,13 +1,10 @@
package p2pgrpc

import (
"context"
"fmt"
"net"
"time"

host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
"go.uber.org/zap"

Expand All @@ -16,7 +13,7 @@ import (

const ID = "/berty/grpc"

func getGrpcID(proto string) string {
func GetGrpcID(proto string) string {
return ID + "/" + proto
}

Expand All @@ -39,7 +36,7 @@ func (pg *P2Pgrpc) hasProtocol(proto string) bool {
}

func (pg *P2Pgrpc) NewListener(proto string) net.Listener {
id := getGrpcID(proto)
id := GetGrpcID(proto)

if pg.hasProtocol(id) {
logger().Warn("protocol already registered", zap.String("pid", id))
Expand All @@ -59,25 +56,7 @@ func (pg *P2Pgrpc) NewListener(proto string) net.Listener {
}

func (pg *P2Pgrpc) NewDialer(proto string) func(string, time.Duration) (net.Conn, error) {
pid := protocol.ID(getGrpcID(proto))
return func(target string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

peerID, err := peer.IDB58Decode(target)
if err != nil {
return nil, fmt.Errorf("failed to parse `%s`: %s", target, err.Error())
}

// No stream exist, creating a new one
logger().Debug("dialing", zap.String("addr", target))
pid := protocol.ID(GetGrpcID(proto))

s, err := pg.host.NewStream(ctx, peerID, pid)
if err != nil {
logger().Error("new stream failed ", zap.Error(err))
return nil, err
}

return p2putil.NewConnFromStream(s)
}
return p2putil.NewDialer(pg.host, pid)
}
14 changes: 14 additions & 0 deletions core/node/p2papi.go
Expand Up @@ -7,7 +7,11 @@ import (
"github.com/gogo/protobuf/proto"
"google.golang.org/grpc"

"time"

"berty.tech/core/api/p2p"
netp2p "berty.tech/core/network/p2p"
"github.com/pkg/errors"
)

// WithP2PGrpcServer registers the Node as a 'berty.p2p' protobuf server implementation
Expand All @@ -21,6 +25,16 @@ func (n *Node) HandleEnvelope(ctx context.Context, input *p2p.Envelope) (*p2p.Vo
return &p2p.Void{}, n.handleEnvelope(ctx, input)
}

func (n *Node) Ping(ctx context.Context, input *p2p.PingInput) (*p2p.Void, error) {
_, err := n.networkDriver.Dial(ctx, input.Destination, time.Second*5, netp2p.ProtocolID)

if err != nil {
return nil, errors.Wrap(err, "unable to ping")
}

return &p2p.Void{}, nil
}

func (n *Node) handleEnvelope(ctx context.Context, input *p2p.Envelope) error {
event, err := n.OpenEnvelope(input)
if err != nil {
Expand Down

0 comments on commit d71323d

Please sign in to comment.