fix: fix messenger/protocol test
Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com>
gfanton committed Oct 20, 2022
1 parent ec651ce commit a90e34c
Showing 23 changed files with 179 additions and 113 deletions.
2 changes: 2 additions & 0 deletions config/config.yml
@@ -53,6 +53,8 @@ p2p:
# svc ams
- maddr: '/ip4/51.15.25.224/tcp/4040/p2p/12D3KooWHhDBv6DJJ4XDWjzEXq6sVNEs6VuxsV1WyBBEhPENHzcZ'
# - maddr: '/ip4/51.15.25.224/udp/quic/4040/p2p/12D3KooWHhDBv6DJJ4XDWjzEXq6sVNEs6VuxsV1WyBBEhPENHzcZ'
+ # svc test
+ # - maddr: '/ip4/51.159.34.32/tcp/4040/p2p/12D3KooWLRUtgsmEwg7zC2d2JhQfYiXsS1BJBRXvrfBJA1Eo6aiY'
static-relays:
# berty relays fr
# - '/ip4/51.159.21.214/tcp/4040/p2p/QmdT7AmhhnbuwvCpa5PH1ySK9HJVB82jr3fo1bxMxBPW6p'
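Each `maddr` entry above is a libp2p multiaddr: the transport stack reads left to right and ends with a `/p2p/<peer-id>` component that pins the identity of the peer being dialed. A minimal sketch of how such an entry splits into dialable addresses plus an expected peer ID (assumes the go-multiaddr and go-libp2p-core APIs; illustration only, not part of the diff):

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// The commented-out "svc test" entry from the config above.
	addr, err := ma.NewMultiaddr("/ip4/51.159.34.32/tcp/4040/p2p/12D3KooWLRUtgsmEwg7zC2d2JhQfYiXsS1BJBRXvrfBJA1Eo6aiY")
	if err != nil {
		panic(err)
	}

	// AddrInfoFromP2pAddr strips the trailing /p2p/ component into a peer.ID.
	info, err := peer.AddrInfoFromP2pAddr(addr)
	if err != nil {
		panic(err)
	}
	fmt.Println(info.ID)    // 12D3KooWLRU...
	fmt.Println(info.Addrs) // [/ip4/51.159.34.32/tcp/4040]
}
```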
3 changes: 2 additions & 1 deletion go.mod
@@ -132,7 +132,8 @@ replace (

github.com/agl/ed25519 => github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // latest commit before the author shutdown the repo; see https://github.com/golang/go/issues/20504

- github.com/libp2p/go-libp2p-rendezvous => github.com/berty/go-libp2p-rendezvous v0.0.0-20220927143406-f7347cb814d4 // use berty fork of go-libp2p-rendezvous
+ github.com/libp2p/go-libp2p-rendezvous => github.com/berty/go-libp2p-rendezvous v0.0.0-20221020165144-5fc9e26830f5 // use berty fork of go-libp2p-rendezvous

github.com/lucas-clemente/quic-go => github.com/lucas-clemente/quic-go v0.25.0

github.com/multiformats/go-multiaddr => github.com/berty/go-multiaddr v0.4.2-0.20220126184027-53e56f02fb68 // tmp, required for Android SDK30
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions go/internal/initutil/ipfs.go
@@ -715,8 +715,9 @@ func (m *Manager) configIPFSRouting(h host.Host, r p2p_routing.Routing) error {
// }

// disc = tinder.NewRotationDiscovery(logger.Named("rotation"), disc, rp)
- disc := tinder.NewDiscoveryAdaptater(logger.Named("disc"), m.Node.Protocol.tinder)
- popts = append(popts, pubsub.WithDiscovery(disc, pubsub.WithDiscoverConnector(backoffconnector)))
+ m.Node.Protocol.discAdaptater = tinder.NewDiscoveryAdaptater(logger.Named("disc"), m.Node.Protocol.tinder)
+ popts = append(popts, pubsub.WithDiscovery(
+ m.Node.Protocol.discAdaptater, pubsub.WithDiscoverConnector(backoffconnector)))

// pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval
m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h, popts...)
5 changes: 5 additions & 0 deletions go/internal/initutil/manager.go
@@ -149,6 +149,7 @@ type Manager struct {
ipfsAPI ipfsutil.ExtendedCoreAPI
mdnsService p2p_mdns.Service
localdisc *tinder.LocalDiscovery
+ discAdaptater *tinder.DiscoveryAdaptater
pubsub *pubsub.PubSub
tinder *tinder.Service
server bertyprotocol.Service
@@ -397,6 +398,10 @@ func (m *Manager) Close(prog *progress.Progress) error {
if m.Node.Protocol.localdisc != nil {
m.Node.Protocol.localdisc.Close()
}

+ if m.Node.Protocol.discAdaptater != nil {
+ m.Node.Protocol.discAdaptater.Close()
+ }
}

prog.Get("close-mdns-service").SetAsCurrent()
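This pairs with the ipfs.go hunk above: the discovery adapter now lives on the Protocol struct precisely so that `Manager.Close` can reach it, following the usual own-and-close pattern with a nil check for configurations where the adapter was never created. A self-contained sketch of the pattern (hypothetical names, not the actual Manager API):

```go
package main

import "fmt"

// closer stands in for *tinder.DiscoveryAdaptater: created lazily and owned
// by the struct that must tear it down on shutdown.
type closer struct{ name string }

func (c *closer) Close() error { fmt.Println("closed", c.name); return nil }

type node struct {
	discAdaptater *closer // may stay nil if discovery was never configured
}

// Close nil-checks each owned resource before closing it, so partial
// initialization does not panic on shutdown.
func (n *node) Close() error {
	if n.discAdaptater != nil {
		return n.discAdaptater.Close()
	}
	return nil
}

func main() {
	n := &node{discAdaptater: &closer{name: "disc"}}
	_ = n.Close()
}
```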
25 changes: 20 additions & 5 deletions go/internal/ipfsutil/testing.go
@@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"encoding/base64"
"fmt"
"math/rand"
"testing"

ipfs_mobile "github.com/ipfs-shipyard/gomobile-ipfs/go/pkg/ipfsmobile"
@@ -128,13 +129,20 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing

var ps *pubsub.PubSub
var stinder *tinder.Service
+ var discAdaptater *tinder.DiscoveryAdaptater

configureRouting := func(h host.Host, r routing.Routing) error {
var err error
drivers := []tinder.IDriver{}
if opts.RDVPeer.ID != "" {
h.Peerstore().AddAddrs(opts.RDVPeer.ID, opts.RDVPeer.Addrs, peerstore.PermanentAddrTTL)
- ms := tinder.NewMockDriverServer()
- drivers = append(drivers, ms.Client(h))
+ cl := rendezvous.NewSyncInMemClient(ctx, h)
+ ms := tinder.NewRendezvousDiscovery(opts.Logger.Named("rdvp"), h, opts.RDVPeer.ID, rand.New(rand.NewSource(rand.Int63())), cl)
+ if _, err = opts.Mocknet.LinkPeers(h.ID(), opts.RDVPeer.ID); err != nil {
+ return err
+ }
+
+ drivers = append(drivers, ms)
}

if r != nil {
@@ -148,8 +156,8 @@
return fmt.Errorf("unable to monitor discovery driver: %w", err)
}

- disc := tinder.NewDiscoveryAdaptater(opts.Logger, stinder)
- ps, err = pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(disc))
+ discAdaptater = tinder.NewDiscoveryAdaptater(opts.Logger, stinder)
+ ps, err = pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(discAdaptater))

return err
}
@@ -183,6 +191,10 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing
}

return api, func() {
+ if discAdaptater != nil {
+ discAdaptater.Close()
+ }
+
_ = mnode.Close()
_ = mnode.PeerHost().Close()
_ = repo.Close()
@@ -217,7 +229,10 @@ func TestingRDVP(ctx context.Context, t testing.TB, h host.Host) (*rendezvous.Re
db, err := p2p_rpdb.OpenDB(ctx, ":memory:")
require.NoError(t, err)

- svc := rendezvous.NewRendezvousService(h, db)
+ provider, err := rendezvous.NewSyncInMemProvider(h)
+ require.NoError(t, err)
+
+ svc := rendezvous.NewRendezvousService(h, db, provider)
cleanup := func() {
_ = db.Close() // don't use me for now as the db is opened in-memory
}
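The test helper drops the mock driver server in favor of a real rendezvous client (`NewRendezvousDiscovery`) talking to the RDVP peer over the in-memory mocknet, which is why the new `LinkPeers` call matters: mocknet peers cannot reach each other until a link between them exists. A reduced sketch of that constraint, assuming the 2022-era go-libp2p mocknet API used in this file:

```go
package main

import (
	"context"
	"fmt"

	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

func main() {
	ctx := context.Background()
	mn := mocknet.New(ctx) // this libp2p version still took a ctx
	h1, _ := mn.GenPeer()
	h2, _ := mn.GenPeer()

	// Without LinkPeers, ConnectPeers (and any dial) fails: mocknet only
	// simulates the network topology you explicitly describe.
	if _, err := mn.LinkPeers(h1.ID(), h2.ID()); err != nil {
		panic(err)
	}
	if _, err := mn.ConnectPeers(h1.ID(), h2.ID()); err != nil {
		panic(err)
	}
	fmt.Println("connections:", len(h1.Network().Conns()))
}
```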
12 changes: 9 additions & 3 deletions go/internal/rendezvous/emitterio_sync_test.go
@@ -2,6 +2,7 @@ package rendezvous_test

import (
"context"
"os"
"sync"
"sync/atomic"
"testing"
@@ -38,11 +39,16 @@ func getEmitterRendezvousClients(ctx context.Context, t *testing.T, hosts []host
}

func TestEmitterIOFlow(t *testing.T) {
- const serverAddr = "tcp://127.0.0.1:8080"
- const adminKey = "bPcKwpuIP3gjjAKqb9EaAJYJnilISQhJ"

+ // const adminKey = "bPcKwpuIP3gjjAKqb9EaAJYJnilISQhJ"
+ // const serverAddr = "tcp://127.0.0.1:8080"
const topic = "foo1"

+ serverAddr := os.Getenv("TEST_EMITTER_SERVER_ADDR")
+ adminKey := os.Getenv("TEST_EMITTER_ADMINKEY")
+ if adminKey == "" || serverAddr == "" {
+ t.Skip("cannot test emitter, no adminKey/serverAddr provided")
+ }

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

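Previously this test dialed a hardcoded `tcp://127.0.0.1:8080` with a baked-in admin key, so it failed on any machine not running an emitter.io server; it is now opt-in via environment variables and skips itself otherwise. To exercise it against a local server one would run something like `TEST_EMITTER_SERVER_ADDR=tcp://127.0.0.1:8080 TEST_EMITTER_ADMINKEY=<key> go test -run TestEmitterIOFlow ./go/internal/rendezvous/` (illustrative invocation, not from the diff).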
2 changes: 1 addition & 1 deletion go/internal/tinder/driver_localdiscovery_test.go
@@ -39,7 +39,7 @@ func TestLocalDiscorvery(t *testing.T) {

const topic = "test_topic"

- s1.Advertises(ctx, topic)
+ s1.StartAdvertises(ctx, topic)

// try a first lookup, should find nothing
{
6 changes: 3 additions & 3 deletions go/internal/tinder/driver_mock.go
@@ -57,7 +57,7 @@ func (s *MockDriverServer) FindPeers(topic string, limit int) <-chan peer.AddrIn
peer := peers[i]
if expire, ok := expires[peer.ID]; ok {
if time.Now().Before(expire) {
- out <- peers[i]
+ out <- peer
} else {
delete(expires, peer.ID)
}
@@ -84,11 +84,11 @@ func (s *MockDriverServer) Exist(topic string, p peer.ID) (ok bool) {

func (s *MockDriverServer) Subscribe(ctx context.Context, topic string, buffsize int) <-chan peer.AddrInfo {
+ // subtract 500ms to make sure we don't miss events due to a data race
+ start := time.Now().Add(-time.Millisecond * 500)

s.mx.Lock()
defer s.mx.Unlock()

- start := time.Now()

peers := s.pc.GetPeersForTopics(topic)
knownPeers := make(PeersUpdate)
for _, p := range peers {
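Two small fixes here: the channel send now uses the already-captured `peer` variable instead of re-indexing the slice, and `Subscribe` backdates its `start` cursor by 500ms so events recorded just before the lock was taken are still replayed rather than silently dropped. The cursor idea in isolation (a sketch with made-up names, not the MockDriverServer API):

```go
package main

import (
	"fmt"
	"time"
)

// replaySince returns the events recorded at or after the cursor. Backdating
// the cursor trades a few duplicate deliveries for never missing an event.
func replaySince(events map[string]time.Time, cursor time.Time) []string {
	var out []string
	for name, at := range events {
		if !at.Before(cursor) {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	now := time.Now()
	events := map[string]time.Time{
		"early":  now.Add(-time.Second),
		"recent": now.Add(-100 * time.Millisecond),
	}

	// A cursor of time.Now() would exclude "recent"; backdating catches it.
	cursor := time.Now().Add(-500 * time.Millisecond)
	fmt.Println(replaySince(events, cursor)) // [recent]
}
```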
2 changes: 1 addition & 1 deletion go/internal/tinder/driver_service_test.go
@@ -109,7 +109,7 @@ func testMultipleDriversSubscribe(t *testing.T, ctx context.Context, mn mocknet.
require.NoError(t, err)

// ...then advertise
- err = s1.Advertises(ctx, topic)
+ err = s1.StartAdvertises(ctx, topic)
require.NoError(t, err)

{
5 changes: 4 additions & 1 deletion go/internal/tinder/notify_network.go
@@ -100,7 +100,10 @@ func (n *NetworkUpdate) subscribeToNetworkUpdate() {

func (n *NetworkUpdate) Close() (err error) {
// use once to avoid panic if called twice
- n.once.Do(func() { err = n.sub.Close() })
+ n.once.Do(func() {
+ err = n.sub.Close()
+ })

return err
}

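The expansion from a one-liner is cosmetic, but the pattern itself is worth spelling out: wrapping the close in `sync.Once` keeps repeated `Close` calls from double-closing the underlying subscription. The shape in miniature (illustrative types, not the NetworkUpdate API):

```go
package main

import (
	"fmt"
	"sync"
)

type sub struct{ closed bool }

func (s *sub) Close() error { s.closed = true; return nil }

type networkUpdate struct {
	once sync.Once
	sub  *sub
}

// Close is safe to call any number of times; only the first call reaches the
// underlying subscription, later calls return the zero error untouched.
func (n *networkUpdate) Close() (err error) {
	n.once.Do(func() {
		err = n.sub.Close()
	})
	return err
}

func main() {
	n := &networkUpdate{sub: &sub{}}
	fmt.Println(n.Close(), n.Close()) // <nil> <nil>, second call is a no-op
}
```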
3 changes: 2 additions & 1 deletion go/internal/tinder/peer_cache.go
@@ -68,7 +68,8 @@ func (c *peersCache) UpdatePeer(topic string, p peer.AddrInfo) (isNew bool) {
c.peers[p.ID] = p
}

- tu.peerUpdate[p.ID] = time.Now()
+ t := time.Now()
+ tu.peerUpdate[p.ID] = t

// notify the topic that peers have been updated
tu.notify.Broadcast()
2 changes: 1 addition & 1 deletion go/internal/tinder/service.go
@@ -135,7 +135,7 @@ func (s *Service) fadeOut(ctx context.Context, topic string, bufsize int) <-chan
}

func (s *Service) Close() error {
- return nil
+ return s.networkNotify.Close()
}

func (s *Service) GetProcess() uint32 { return atomic.LoadUint32(&s.process) }
49 changes: 31 additions & 18 deletions go/internal/tinder/service_adaptater.go
@@ -34,6 +34,8 @@ type DiscoveryAdaptater struct {
muAdvertiser sync.Mutex
resetInterval time.Duration
ttl time.Duration

+ closeOnce sync.Once
}

func NewDiscoveryAdaptater(logger *zap.Logger, service *Service) *DiscoveryAdaptater {
@@ -112,6 +114,7 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ...
ctx := a.ctx

a.muAdvertiser.Lock()
+ defer a.muAdvertiser.Unlock()

start := time.Now()
if t, ok := a.watchdogAdvertise[topic]; ok {
@@ -123,16 +126,26 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ...
} else {
wctx, cancel := context.WithCancel(ctx)

+ // start advertising on this topic
+ if err := a.service.StartAdvertises(wctx, topic, StartAdvertisesFilterDrivers(LocalDiscoveryName)); err != nil {
+ a.logger.Error("advertise failed", logutil.PrivateString("topic", topic), zap.Error(err))
+ cancel()
+ return time.Minute, err
+ }
+
+ a.logger.Debug("advertise started", logutil.PrivateString("topic", topic))
+
// create a new watchdog
a.watchdogAdvertise[topic] = time.AfterFunc(a.resetInterval, func() {
// watchdog has expired, cancel advertise

+ a.muAdvertiser.Lock()
cancel()
a.logger.Debug("advertise expired",
logutil.PrivateString("topic", topic),
zap.Duration("duration", time.Since(start)),
)

- a.muAdvertiser.Lock()
delete(a.watchdogAdvertise, topic)
a.muAdvertiser.Unlock()

@@ -144,31 +157,31 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ...
)
}
})

- // start advertising on this topic
- if err := a.service.Advertises(wctx, topic, AdvertisesFilterDrivers(LocalDiscoveryName)); err != nil {
- a.logger.Error("advertise failed", logutil.PrivateString("topic", topic), zap.Error(err))
- } else {
- a.logger.Debug("advertise started", logutil.PrivateString("topic", topic))
- }
}

- a.muAdvertiser.Unlock()

return a.ttl, nil
}

func (a *DiscoveryAdaptater) Close() error {
- a.cancel()
-
- a.muDiscover.Lock()
- for _, st := range a.watchdogDiscover {
- if !st.timer.Stop() {
- <-st.timer.C
- }
- _ = st.sub.Close()
- }
- a.muDiscover.Unlock()
+ a.closeOnce.Do(func() {
+ a.muDiscover.Lock()
+ a.cancel()
+ for _, st := range a.watchdogDiscover {
+ if !st.timer.Stop() {
+ <-st.timer.C
+ }
+ _ = st.sub.Close()
+ }
+ a.muDiscover.Unlock()
+
+ a.muAdvertiser.Lock()
+ for _, t := range a.watchdogAdvertise {
+ if !t.Stop() {
+ <-t.C
+ }
+ }
+ a.muAdvertiser.Unlock()
+ })

return nil
}
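The reworked `Close` now runs inside a `sync.Once`, cancels the shared context, and stops every live watchdog under its respective lock; note the drain (`<-t.C`) when `Stop` reports the timer already fired, which is the correct way to retire a `time.Timer`. The watchdog idea reduced to its core (a sketch with hypothetical names, not the adapter's API):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startWatchdog cancels ctx after d unless reset is called first; reset
// rearms the timer, mirroring how each Advertise call keeps its topic alive.
func startWatchdog(parent context.Context, d time.Duration) (ctx context.Context, reset func()) {
	ctx, cancel := context.WithCancel(parent)
	t := time.AfterFunc(d, cancel)
	reset = func() { t.Reset(d) }
	return ctx, reset
}

func main() {
	ctx, reset := startWatchdog(context.Background(), 50*time.Millisecond)
	reset() // an Advertise arriving in time pushes the deadline back
	<-ctx.Done()
	fmt.Println("expired:", ctx.Err())
}
```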
7 changes: 4 additions & 3 deletions go/internal/tinder/service_advertises.go
@@ -27,7 +27,7 @@ func (o *AdvertiseOptions) apply(opts ...AdvertiseOption) error {
return nil
}

- func AdvertisesFilterDrivers(drivers ...string) AdvertiseOption {
+ func StartAdvertisesFilterDrivers(drivers ...string) AdvertiseOption {
return func(opts *AdvertiseOptions) error {
opts.Filters = map[string]struct{}{}
for _, driver := range drivers {
@@ -39,7 +39,7 @@ func AdvertisesFilterDrivers(drivers ...string) AdvertiseOption {
}

// StartAdvertises registers the advertise topic on each of its drivers
- func (s *Service) Advertises(ctx context.Context, topic string, opts ...AdvertiseOption) error {
+ func (s *Service) StartAdvertises(ctx context.Context, topic string, opts ...AdvertiseOption) error {
if len(s.drivers) == 0 {
return fmt.Errorf("no driver available to advertise")
}
@@ -48,7 +48,7 @@ func (s *Service) Advertises(ctx context.Context, topic string, opts ...Advertis
if err := aopts.apply(opts...); err != nil {
return fmt.Errorf("failed to advertise: %w", err)
}
- // @TODO(gfanton): add filter

for _, driver := range s.drivers {
if aopts.Filters != nil {
// skip filter driver
@@ -57,6 +57,7 @@ func (s *Service) Advertises(ctx context.Context, topic string, opts ...Advertis
}
}

+ // start background job
go func(driver IDriver) {
if err := s.advertise(ctx, driver, topic); err != nil {
s.logger.Debug("advertise ended", zap.Error(err))
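`Advertises` becomes `StartAdvertises` (the filter option follows suit), which better reflects that the call only launches the background per-driver advertise goroutines and returns immediately. A call-site fragment under the renamed API (sketch only; `svc` is a hypothetical `*tinder.Service`, and the package is internal, so this is illustration rather than usable code):

```go
// Advertise "my-topic" on every registered driver except local discovery;
// the per-driver loops keep re-advertising until ctx is canceled.
if err := svc.StartAdvertises(ctx, "my-topic",
	tinder.StartAdvertisesFilterDrivers(tinder.LocalDiscoveryName),
); err != nil {
	return fmt.Errorf("unable to start advertises: %w", err)
}
```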
10 changes: 5 additions & 5 deletions go/internal/tinder/service_mocked_test.go
@@ -99,12 +99,12 @@ func TestMockedServiceSubscribePull(t *testing.T) {
mn := mocknet.New(ctx)
srv := NewMockDriverServer()

t.Run("pull enable", func(t *testing.T) {
t.Run("with pull", func(t *testing.T) {
const topic = "test_topic_1"
p1, s1 := newTestMockedService(t, logger, mn, srv)
_, s2 := newTestMockedService(t, logger, mn, srv)

- err := s1.Advertises(ctx, topic)
+ err := s1.StartAdvertises(ctx, topic)
require.NoError(t, err)

err = srv.WaitForPeer(topic, p1.ID(), time.Second)
@@ -125,13 +125,13 @@ func TestMockedServiceSubscribePull(t *testing.T) {
}
})

t.Run("pull disable", func(t *testing.T) {
t.Run("no pull", func(t *testing.T) {
const topic = "test_topic_2"

p1, s1 := newTestMockedService(t, logger, mn, srv)
_, s2 := newTestMockedService(t, logger, mn, srv)

- err := s1.Advertises(ctx, topic)
+ err := s1.StartAdvertises(ctx, topic)
require.NoError(t, err)

err = srv.WaitForPeer(topic, p1.ID(), time.Second)
@@ -167,7 +167,7 @@ func TestMockedServiceSubscribeDuplicatePeer(t *testing.T) {
p1, s1 := newTestMockedService(t, logger, mn, servers...)
_, s2 := newTestMockedService(t, logger, mn, servers...)

- err := s1.Advertises(ctx, topic)
+ err := s1.StartAdvertises(ctx, topic)
require.NoError(t, err)

for _, s := range servers {
Expand Down
