Skip to content

Commit

Permalink
fix(test): e2e_test and app mock context
Browse files Browse the repository at this point in the history
Signed-off-by: Godefroy Ponsinet <godefroy.ponsinet@outlook.com>
  • Loading branch information
90dy committed Mar 8, 2019
1 parent 39fd678 commit 40b99ed
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 125 deletions.
1 change: 0 additions & 1 deletion core/api/client/jsonclient/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/api/client/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/network/config/config.go
Expand Up @@ -83,6 +83,9 @@ func (cfg *Config) NewNode(ctx context.Context) (*host.BertyHost, error) {
// override conn manager
cfg.Config.ConnManager = host.NewBertyConnMgr(ctx, 10, 20, time.Duration(60*1000))

// override ping service
cfg.Config.DisablePing = true

h, err := cfg.Config.NewNode(ctx)
if err != nil {
return nil, err
Expand Down
14 changes: 5 additions & 9 deletions core/network/driver.go
Expand Up @@ -74,9 +74,9 @@ func (net *Network) ID(ctx context.Context) *metric.Peer {
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()
// ctx = tracer.Context()
addrs := make([]string, len(net.host.Addrs()))
for i, addr := range net.host.Addrs() {
addrs[i] = addr.String()
addrs := []string{}
for _, addr := range net.host.Addrs() {
addrs = append(addrs, addr.String())
}

return &metric.Peer{
Expand Down Expand Up @@ -336,9 +336,9 @@ func (net *Network) FindProvidersAndWait(ctx context.Context, id string, cache b
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

t := time.Now()
piChan := net.host.Routing.FindProvidersAsync(ctx, c, 10)

piSlice := []pstore.PeerInfo{}
Expand All @@ -348,11 +348,7 @@ func (net *Network) FindProvidersAndWait(ctx context.Context, id string, cache b
if pi.ID != "" {
piSlice = append(piSlice, pi)
}
if time.Now().Sub(t) >= time.Second*3 && len(piSlice) >= 1 {
cancel()
}
case <-ctx.Done():
cancel()
if len(piSlice) == 0 {
return nil, errors.New("no providers found")
}
Expand Down
2 changes: 1 addition & 1 deletion core/network/host/host.go
Expand Up @@ -173,7 +173,7 @@ func (bh *BertyHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID)
s.SetProtocol(pid)

lzcon := msmux.NewMSSelect(s, string(pid))
return &streamWrapper{
return &StreamWrapper{
Stream: s,
rw: lzcon,
}, nil
Expand Down
17 changes: 0 additions & 17 deletions core/network/host/notifee.go

This file was deleted.

10 changes: 5 additions & 5 deletions core/network/host/stream_wrapper.go
Expand Up @@ -8,25 +8,25 @@ import (
msmux "github.com/multiformats/go-multistream"
)

type streamWrapper struct {
type StreamWrapper struct {
inet.Stream
rw io.ReadWriter
}

func NewStreamWrapper(s inet.Stream, pid protocol.ID) inet.Stream {
func NewStreamWrapper(s inet.Stream, pid protocol.ID) *StreamWrapper {
s.SetProtocol(pid)

lzcon := msmux.NewMSSelect(s, string(pid))
return &streamWrapper{
return &StreamWrapper{
Stream: s,
rw: lzcon,
}
}

func (s *streamWrapper) Read(b []byte) (int, error) {
func (s *StreamWrapper) Read(b []byte) (int, error) {
return s.rw.Read(b)
}

func (s *streamWrapper) Write(b []byte) (int, error) {
func (s *StreamWrapper) Write(b []byte) (int, error) {
return s.rw.Write(b)
}
1 change: 1 addition & 0 deletions core/network/options.go
Expand Up @@ -38,6 +38,7 @@ func WithDefaultOptions() config.Option {
libp2p.EnableAutoRelay(),
),
EnableDefaultBootstrap(),
EnablePing(),
EnableMDNS(),
PrivateNetwork(config.DefaultSwarmKey),
EnableDHT(),
Expand Down
1 change: 1 addition & 0 deletions core/network/protocol/ble/transport.go
Expand Up @@ -64,6 +64,7 @@ func AddToPeerStore(peerID string, rAddr string) {
// created. It represents an entire tcp stack (though it might not necessarily be)
func NewTransport(h host.Host) (*Transport, error) {
// use deterministic id based on host peerID
logger().Debug("BLE: " + h.ID().String())
id := uuid.NewV5(uuid.UUID{}, h.ID().String())
srcMA, err := ma.NewMultiaddr(fmt.Sprintf("/ble/%s", id.String()))
ret := &Transport{
Expand Down
37 changes: 25 additions & 12 deletions core/network/protocol/mdns/discovery.go
Expand Up @@ -2,6 +2,7 @@ package mdns

import (
"context"
"sync"
"time"

"berty.tech/core/pkg/tracing"
Expand All @@ -17,6 +18,7 @@ type Discovery struct {
host host.Host
services map[string]service.Service
notifees map[string]*notifee
mutex sync.Mutex
}

func NewDiscovery(ctx context.Context, host host.Host) (discovery.Discovery, error) {
Expand All @@ -31,7 +33,7 @@ func (d *Discovery) Advertise(ctx context.Context, ns string, opts ...discovery.
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx, ns); err != nil {
if err := d.wakeService(ctx, ns, false); err != nil {
return 0, err
}
time.Sleep(10 * time.Second)
Expand All @@ -42,22 +44,17 @@ func (d *Discovery) FindPeers(ctx context.Context, ns string, opts ...discovery.
tracer := tracing.EnterFunc(ctx)
defer tracer.Finish()

if err := d.wakeService(ctx, ns); err != nil {
if err := d.wakeService(ctx, ns, true); err != nil {
return nil, err
}
_, ok := d.notifees[ns]
if !ok {
d.notifees[ns] = &notifee{
piChan: make(chan pstore.PeerInfo, 1),
}
d.services[ns].RegisterNotifee(d.notifees[ns])
}

return d.notifees[ns].piChan, nil
}

func (d *Discovery) wakeService(ctx context.Context, ns string) error {
func (d *Discovery) wakeService(ctx context.Context, ns string, regiterNotifee bool) error {
var err error

d.mutex.Lock()
_, ok := d.services[ns]
if ok {
return nil
Expand All @@ -68,15 +65,31 @@ func (d *Discovery) wakeService(ctx context.Context, ns string) error {
return err
}

_, ok = d.notifees[ns]
if !ok {
d.notifees[ns] = &notifee{
piChan: make(chan pstore.PeerInfo, 1),
}
}

if regiterNotifee && d.notifees[ns].registered == false {
d.services[ns].RegisterNotifee(d.notifees[ns])
d.notifees[ns].registered = true
}
d.mutex.Unlock()

return nil
}

var _ service.Notifee = (*notifee)(nil)

type notifee struct {
piChan chan pstore.PeerInfo
piChan chan pstore.PeerInfo
registered bool
}

func (n *notifee) HandlePeerFound(pi pstore.PeerInfo) {
n.piChan <- pi
if pi.ID != "" {
n.piChan <- pi
}
}
3 changes: 2 additions & 1 deletion core/node/network.go
Expand Up @@ -40,10 +40,11 @@ func (n *Node) UseNetworkDriver(ctx context.Context, driver network.Driver) erro
// configure network
n.networkDriver.OnEnvelopeHandler(n.HandleEnvelope)
if err := n.networkDriver.Join(ctx, n.UserID()); err != nil {
logger().Warn("failed to join user channel",
logger().Error("failed to join user channel",
zap.String("id", n.UserID()),
zap.Error(err),
)
return err
}

// FIXME: subscribe to every owned device IDs
Expand Down
18 changes: 12 additions & 6 deletions core/test/app_mock.go
Expand Up @@ -63,7 +63,7 @@ func WithUnencryptedDb() AppMockOption {
}
}

func NewAppMock(device *entity.Device, networkDriver network.Driver, options ...AppMockOption) (*AppMock, error) {
func NewAppMock(ctx context.Context, device *entity.Device, networkDriver network.Driver, options ...AppMockOption) (*AppMock, error) {
tmpFile, err := ioutil.TempFile("", "sqlite")
if err != nil {
return nil, err
Expand All @@ -76,8 +76,10 @@ func NewAppMock(device *entity.Device, networkDriver network.Driver, options ...
crypto: &keypair.InsecureCrypto{},
options: options,
}
a.ctx, a.cancel = context.WithCancel(ctx)

if err := a.Open(); err != nil {
a.cancel()
return nil, err
}

Expand Down Expand Up @@ -121,8 +123,6 @@ func (a *AppMock) Open() error {
}
}

a.ctx, a.cancel = context.WithCancel(context.Background())

if a.node, err = node.New(
a.ctx,
node.WithSQL(a.db),
Expand Down Expand Up @@ -158,7 +158,7 @@ func (a *AppMock) Open() error {
return nil
}

func (a *AppMock) InitEventStream() error {
func (a *AppMock) InitEventStream(ctx context.Context) error {
a.eventStream = make(chan *entity.Event, 100)
stream, err := a.client.Node().EventStream(a.ctx, &nodeapi.EventStreamInput{})
if err != nil {
Expand All @@ -172,10 +172,16 @@ func (a *AppMock) InitEventStream() error {
return
}
if err != nil {
logger().Warn("failed to receive stream data", zap.Error(err))
logger().Warn("failed to receive stream data", zap.String("app", fmt.Sprintf("%+v", a)), zap.Error(err))
return
}
select {
default:
a.eventStream <- data
case <-ctx.Done():
logger().Debug("event stream context done")
return
}
a.eventStream <- data
}
}()
return nil
Expand Down

0 comments on commit 40b99ed

Please sign in to comment.