diff --git a/config/config.yml b/config/config.yml index d96fa9adb2..95986d77ba 100644 --- a/config/config.yml +++ b/config/config.yml @@ -53,6 +53,8 @@ p2p: # svc ams - maddr: '/ip4/51.15.25.224/tcp/4040/p2p/12D3KooWHhDBv6DJJ4XDWjzEXq6sVNEs6VuxsV1WyBBEhPENHzcZ' # - maddr: '/ip4/51.15.25.224/udp/quic/4040/p2p/12D3KooWHhDBv6DJJ4XDWjzEXq6sVNEs6VuxsV1WyBBEhPENHzcZ' + # svc test + # - maddr: '/ip4/51.159.34.32/tcp/4040/p2p/12D3KooWLRUtgsmEwg7zC2d2JhQfYiXsS1BJBRXvrfBJA1Eo6aiY' static-relays: # berty relays fr # - '/ip4/51.159.21.214/tcp/4040/p2p/QmdT7AmhhnbuwvCpa5PH1ySK9HJVB82jr3fo1bxMxBPW6p' diff --git a/go.mod b/go.mod index 7f8bdb8f0e..fbc0ccc8a7 100644 --- a/go.mod +++ b/go.mod @@ -132,7 +132,8 @@ replace ( github.com/agl/ed25519 => github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // latest commit before the author shutdown the repo; see https://github.com/golang/go/issues/20504 - github.com/libp2p/go-libp2p-rendezvous => github.com/berty/go-libp2p-rendezvous v0.0.0-20220927143406-f7347cb814d4 // use berty fork of go-libp2p-rendezvous + github.com/libp2p/go-libp2p-rendezvous => github.com/berty/go-libp2p-rendezvous v0.0.0-20221020165144-5fc9e26830f5 // use berty fork of go-libp2p-rendezvous + github.com/lucas-clemente/quic-go => github.com/lucas-clemente/quic-go v0.25.0 github.com/multiformats/go-multiaddr => github.com/berty/go-multiaddr v0.4.2-0.20220126184027-53e56f02fb68 // tmp, required for Android SDK30 diff --git a/go.sum b/go.sum index 8646304fc7..aae3e83a00 100644 --- a/go.sum +++ b/go.sum @@ -151,8 +151,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/berty/go-libp2p-rendezvous v0.0.0-20220927143406-f7347cb814d4 h1:RPYxEk0CawP6CmLqpQ3AhoFHpIBoT2ZGCCbIesuKqZU= -github.com/berty/go-libp2p-rendezvous v0.0.0-20220927143406-f7347cb814d4/go.mod h1:T5TW6d2Xo9C4k9AhpPfxXcU8U6UJsZ9pQFEEb5I0Yj4= +github.com/berty/go-libp2p-rendezvous v0.0.0-20221020165144-5fc9e26830f5 h1:q8kaMh2a0/R2jrnPGY9tKWpPBJjS0e5bka2ssNkR8WE= +github.com/berty/go-libp2p-rendezvous v0.0.0-20221020165144-5fc9e26830f5/go.mod h1:T5TW6d2Xo9C4k9AhpPfxXcU8U6UJsZ9pQFEEb5I0Yj4= github.com/berty/go-multiaddr v0.4.2-0.20220126184027-53e56f02fb68 h1:Vzah9UzXtlo6aU3I+fGhRPQYBq3+El3DCga3u2TP+ig= github.com/berty/go-multiaddr v0.4.2-0.20220126184027-53e56f02fb68/go.mod h1:3KAxNkUqLTJ20AAwN4XVX4kZar+bR+gh4zgbfr3SNug= github.com/berty/go-sqlcipher/v4 v4.4.3-0.20220810151512-74ea78235b48 h1:tHmYUj9It7qV3PPzg0Z5VRNmFaJaaYiu49fkqyw5NYI= diff --git a/go/internal/initutil/ipfs.go b/go/internal/initutil/ipfs.go index 7722697a16..d5b1411a9d 100644 --- a/go/internal/initutil/ipfs.go +++ b/go/internal/initutil/ipfs.go @@ -715,8 +715,9 @@ func (m *Manager) configIPFSRouting(h host.Host, r p2p_routing.Routing) error { // } // disc = tinder.NewRotationDiscovery(logger.Named("rotation"), disc, rp) - disc := tinder.NewDiscoveryAdaptater(logger.Named("disc"), m.Node.Protocol.tinder) - popts = append(popts, pubsub.WithDiscovery(disc, pubsub.WithDiscoverConnector(backoffconnector))) + m.Node.Protocol.discAdaptater = tinder.NewDiscoveryAdaptater(logger.Named("disc"), m.Node.Protocol.tinder) + popts = append(popts, pubsub.WithDiscovery( + m.Node.Protocol.discAdaptater, pubsub.WithDiscoverConnector(backoffconnector))) // pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h, popts...) diff --git a/go/internal/initutil/manager.go b/go/internal/initutil/manager.go index d27bb4aa88..495c45f42f 100644 --- a/go/internal/initutil/manager.go +++ b/go/internal/initutil/manager.go @@ -149,6 +149,7 @@ type Manager struct { ipfsAPI ipfsutil.ExtendedCoreAPI mdnsService p2p_mdns.Service localdisc *tinder.LocalDiscovery + discAdaptater *tinder.DiscoveryAdaptater pubsub *pubsub.PubSub tinder *tinder.Service server bertyprotocol.Service @@ -397,6 +398,10 @@ func (m *Manager) Close(prog *progress.Progress) error { if m.Node.Protocol.localdisc != nil { m.Node.Protocol.localdisc.Close() } + + if m.Node.Protocol.discAdaptater != nil { + m.Node.Protocol.discAdaptater.Close() + } } prog.Get("close-mdns-service").SetAsCurrent() diff --git a/go/internal/ipfsutil/testing.go b/go/internal/ipfsutil/testing.go index 373f9ca207..31c74a31b4 100644 --- a/go/internal/ipfsutil/testing.go +++ b/go/internal/ipfsutil/testing.go @@ -5,6 +5,7 @@ import ( crand "crypto/rand" "encoding/base64" "fmt" + "math/rand" "testing" ipfs_mobile "github.com/ipfs-shipyard/gomobile-ipfs/go/pkg/ipfsmobile" @@ -128,13 +129,20 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing var ps *pubsub.PubSub var stinder *tinder.Service + var discAdaptater *tinder.DiscoveryAdaptater + configureRouting := func(h host.Host, r routing.Routing) error { var err error drivers := []tinder.IDriver{} if opts.RDVPeer.ID != "" { h.Peerstore().AddAddrs(opts.RDVPeer.ID, opts.RDVPeer.Addrs, peerstore.PermanentAddrTTL) - ms := tinder.NewMockDriverServer() - drivers = append(drivers, ms.Client(h)) + cl := rendezvous.NewSyncInMemClient(ctx, h) + ms := tinder.NewRendezvousDiscovery(opts.Logger.Named("rdvp"), h, opts.RDVPeer.ID, rand.New(rand.NewSource(rand.Int63())), cl) + if _, err = opts.Mocknet.LinkPeers(h.ID(), opts.RDVPeer.ID); err != nil { + return err + } + + drivers = append(drivers, ms) } if r != nil { @@ -148,8 +156,8 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing return fmt.Errorf("unable to monitor discovery driver: %w", err) } - disc := tinder.NewDiscoveryAdaptater(opts.Logger, stinder) - ps, err = pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(disc)) + discAdaptater = tinder.NewDiscoveryAdaptater(opts.Logger, stinder) + ps, err = pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(discAdaptater)) return err } @@ -183,6 +191,10 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing } return api, func() { + if discAdaptater != nil { + discAdaptater.Close() + } + _ = mnode.Close() _ = mnode.PeerHost().Close() _ = repo.Close() @@ -217,7 +229,10 @@ func TestingRDVP(ctx context.Context, t testing.TB, h host.Host) (*rendezvous.Re db, err := p2p_rpdb.OpenDB(ctx, ":memory:") require.NoError(t, err) - svc := rendezvous.NewRendezvousService(h, db) + provider, err := rendezvous.NewSyncInMemProvider(h) + require.NoError(t, err) + + svc := rendezvous.NewRendezvousService(h, db, provider) cleanup := func() { _ = db.Close() // dont use me for now as db is open in_memory } diff --git a/go/internal/rendezvous/emitterio_sync_test.go b/go/internal/rendezvous/emitterio_sync_test.go index f96fcfcc05..ea31900d77 100644 --- a/go/internal/rendezvous/emitterio_sync_test.go +++ b/go/internal/rendezvous/emitterio_sync_test.go @@ -2,6 +2,7 @@ package rendezvous_test import ( "context" + "os" "sync" "sync/atomic" "testing" @@ -38,11 +39,16 @@ func getEmitterRendezvousClients(ctx context.Context, t *testing.T, hosts []host } func TestEmitterIOFlow(t *testing.T) { - const serverAddr = "tcp://127.0.0.1:8080" - const adminKey = "bPcKwpuIP3gjjAKqb9EaAJYJnilISQhJ" - + // const adminKey = "bPcKwpuIP3gjjAKqb9EaAJYJnilISQhJ" + // const serverAddr = "tcp://127.0.0.1:8080" const topic = "foo1" + serverAddr := os.Getenv("TEST_EMITTER_SERVER_ADDR") + adminKey := os.Getenv("TEST_EMITTER_ADMINKEY") + if adminKey == "" || serverAddr == "" { + t.Skip("cannot test emitter, no adminKey/serverAddr provided") + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/internal/tinder/driver_localdiscovery_test.go b/go/internal/tinder/driver_localdiscovery_test.go index d0cdde9bf7..3dc1e70800 100644 --- a/go/internal/tinder/driver_localdiscovery_test.go +++ b/go/internal/tinder/driver_localdiscovery_test.go @@ -39,7 +39,7 @@ func TestLocalDiscorvery(t *testing.T) { const topic = "test_topic" - s1.Advertises(ctx, topic) + s1.StartAdvertises(ctx, topic) // try a first lookup, should find nothing { diff --git a/go/internal/tinder/driver_mock.go b/go/internal/tinder/driver_mock.go index 47df75e0bc..03c06b6f42 100644 --- a/go/internal/tinder/driver_mock.go +++ b/go/internal/tinder/driver_mock.go @@ -57,7 +57,7 @@ func (s *MockDriverServer) FindPeers(topic string, limit int) <-chan peer.AddrIn peer := peers[i] if expire, ok := expires[peer.ID]; ok { if time.Now().Before(expire) { - out <- peers[i] + out <- peer } else { delete(expires, peer.ID) } @@ -84,11 +84,11 @@ func (s *MockDriverServer) Exist(topic string, p peer.ID) (ok bool) { func (s *MockDriverServer) Subscribe(ctx context.Context, topic string, buffsize int) <-chan peer.AddrInfo { // subtract 500ms to make sure to avoid data race and miss some event - start := time.Now().Add(-time.Millisecond * 500) - s.mx.Lock() defer s.mx.Unlock() + start := time.Now() + peers := s.pc.GetPeersForTopics(topic) knownPeers := make(PeersUpdate) for _, p := range peers { diff --git a/go/internal/tinder/driver_service_test.go b/go/internal/tinder/driver_service_test.go index 6bf86996ac..fb05dec1d6 100644 --- a/go/internal/tinder/driver_service_test.go +++ b/go/internal/tinder/driver_service_test.go @@ -109,7 +109,7 @@ func testMultipleDriversSubscribe(t *testing.T, ctx context.Context, mn mocknet. require.NoError(t, err) // ...then advertise - err = s1.Advertises(ctx, topic) + err = s1.StartAdvertises(ctx, topic) require.NoError(t, err) { diff --git a/go/internal/tinder/notify_network.go b/go/internal/tinder/notify_network.go index 10f9afea52..86989271ee 100644 --- a/go/internal/tinder/notify_network.go +++ b/go/internal/tinder/notify_network.go @@ -100,7 +100,10 @@ func (n *NetworkUpdate) subscribeToNetworkUpdate() { func (n *NetworkUpdate) Close() (err error) { // use once to avoid panic if called twice - n.once.Do(func() { err = n.sub.Close() }) + n.once.Do(func() { + err = n.sub.Close() + }) + return err } diff --git a/go/internal/tinder/peer_cache.go b/go/internal/tinder/peer_cache.go index 7363d4408c..cc55db24bd 100644 --- a/go/internal/tinder/peer_cache.go +++ b/go/internal/tinder/peer_cache.go @@ -68,7 +68,8 @@ func (c *peersCache) UpdatePeer(topic string, p peer.AddrInfo) (isNew bool) { c.peers[p.ID] = p } - tu.peerUpdate[p.ID] = time.Now() + t := time.Now() + tu.peerUpdate[p.ID] = t // notify topic that peers has been updated tu.notify.Broadcast() diff --git a/go/internal/tinder/service.go b/go/internal/tinder/service.go index b22ca01900..2e449dd4d2 100644 --- a/go/internal/tinder/service.go +++ b/go/internal/tinder/service.go @@ -135,7 +135,7 @@ func (s *Service) fadeOut(ctx context.Context, topic string, bufsize int) <-chan } func (s *Service) Close() error { - return nil + return s.networkNotify.Close() } func (s *Service) GetProcess() uint32 { return atomic.LoadUint32(&s.process) } diff --git a/go/internal/tinder/service_adaptater.go b/go/internal/tinder/service_adaptater.go index 33750b88e2..7657f92d9e 100644 --- a/go/internal/tinder/service_adaptater.go +++ b/go/internal/tinder/service_adaptater.go @@ -34,6 +34,8 @@ type DiscoveryAdaptater struct { muAdvertiser sync.Mutex resetInterval time.Duration ttl time.Duration + + closeOnce sync.Once } func NewDiscoveryAdaptater(logger *zap.Logger, service *Service) *DiscoveryAdaptater { @@ -112,6 +114,7 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ... ctx := a.ctx a.muAdvertiser.Lock() + defer a.muAdvertiser.Unlock() start := time.Now() if t, ok := a.watchdogAdvertise[topic]; ok { @@ -123,16 +126,26 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ... } else { wctx, cancel := context.WithCancel(ctx) + // start advertising on this topic + if err := a.service.StartAdvertises(wctx, topic, StartAdvertisesFilterDrivers(LocalDiscoveryName)); err != nil { + a.logger.Error("advertise failed", logutil.PrivateString("topic", topic), zap.Error(err)) + cancel() + return time.Minute, err + } + + a.logger.Debug("advertise started", logutil.PrivateString("topic", topic)) + // create a new watchdog a.watchdogAdvertise[topic] = time.AfterFunc(a.resetInterval, func() { // watchdog has expired, cancel advertise + + a.muAdvertiser.Lock() cancel() a.logger.Debug("advertise expired", logutil.PrivateString("topic", topic), zap.Duration("duration", time.Since(start)), ) - a.muAdvertiser.Lock() delete(a.watchdogAdvertise, topic) a.muAdvertiser.Unlock() @@ -144,31 +157,31 @@ func (a *DiscoveryAdaptater) Advertise(_ context.Context, topic string, opts ... ) } }) - - // start advertising on this topic - if err := a.service.Advertises(wctx, topic, AdvertisesFilterDrivers(LocalDiscoveryName)); err != nil { - a.logger.Error("advertise failed", logutil.PrivateString("topic", topic), zap.Error(err)) - } else { - a.logger.Debug("advertise started", logutil.PrivateString("topic", topic)) - } } - a.muAdvertiser.Unlock() - return a.ttl, nil } func (a *DiscoveryAdaptater) Close() error { - a.cancel() + a.closeOnce.Do(func() { + a.muDiscover.Lock() + a.cancel() + for _, st := range a.watchdogDiscover { + if !st.timer.Stop() { + <-st.timer.C + } + _ = st.sub.Close() + } + a.muDiscover.Unlock() - a.muDiscover.Lock() - for _, st := range a.watchdogDiscover { - if !st.timer.Stop() { - <-st.timer.C + a.muAdvertiser.Lock() + for _, t := range a.watchdogAdvertise { + if !t.Stop() { + <-t.C + } } - _ = st.sub.Close() - } - a.muDiscover.Unlock() + a.muAdvertiser.Unlock() + }) return nil } diff --git a/go/internal/tinder/service_advertises.go b/go/internal/tinder/service_advertises.go index bc75f332c9..6153038454 100644 --- a/go/internal/tinder/service_advertises.go +++ b/go/internal/tinder/service_advertises.go @@ -27,7 +27,7 @@ func (o *AdvertiseOptions) apply(opts ...AdvertiseOption) error { return nil } -func AdvertisesFilterDrivers(drivers ...string) AdvertiseOption { +func StartAdvertisesFilterDrivers(drivers ...string) AdvertiseOption { return func(opts *AdvertiseOptions) error { opts.Filters = map[string]struct{}{} for _, driver := range drivers { @@ -39,7 +39,7 @@ func AdvertisesFilterDrivers(drivers ...string) AdvertiseOption { } // Register advertise topic on each of his driver -func (s *Service) Advertises(ctx context.Context, topic string, opts ...AdvertiseOption) error { +func (s *Service) StartAdvertises(ctx context.Context, topic string, opts ...AdvertiseOption) error { if len(s.drivers) == 0 { return fmt.Errorf("no driver available to advertise") } @@ -48,7 +48,7 @@ func (s *Service) Advertises(ctx context.Context, topic string, opts ...Advertis if err := aopts.apply(opts...); err != nil { return fmt.Errorf("failed to advertise: %w", err) } - // @TODO(gfanton): add filter + for _, driver := range s.drivers { if aopts.Filters != nil { // skip filter driver @@ -57,6 +57,7 @@ func (s *Service) Advertises(ctx context.Context, topic string, opts ...Advertis } } + // start background job go func(driver IDriver) { if err := s.advertise(ctx, driver, topic); err != nil { s.logger.Debug("advertise ended", zap.Error(err)) diff --git a/go/internal/tinder/service_mocked_test.go b/go/internal/tinder/service_mocked_test.go index 564b92cbe4..26bdc6afa9 100644 --- a/go/internal/tinder/service_mocked_test.go +++ b/go/internal/tinder/service_mocked_test.go @@ -99,12 +99,12 @@ func TestMockedServiceSubscribePull(t *testing.T) { mn := mocknet.New(ctx) srv := NewMockDriverServer() - t.Run("pull enable", func(t *testing.T) { + t.Run("with pull", func(t *testing.T) { const topic = "test_topic_1" p1, s1 := newTestMockedService(t, logger, mn, srv) _, s2 := newTestMockedService(t, logger, mn, srv) - err := s1.Advertises(ctx, topic) + err := s1.StartAdvertises(ctx, topic) require.NoError(t, err) err = srv.WaitForPeer(topic, p1.ID(), time.Second) @@ -125,13 +125,13 @@ func TestMockedServiceSubscribePull(t *testing.T) { } }) - t.Run("pull disable", func(t *testing.T) { + t.Run("no pull", func(t *testing.T) { const topic = "test_topic_2" p1, s1 := newTestMockedService(t, logger, mn, srv) _, s2 := newTestMockedService(t, logger, mn, srv) - err := s1.Advertises(ctx, topic) + err := s1.StartAdvertises(ctx, topic) require.NoError(t, err) err = srv.WaitForPeer(topic, p1.ID(), time.Second) @@ -167,7 +167,7 @@ func TestMockedServiceSubscribeDuplicatePeer(t *testing.T) { p1, s1 := newTestMockedService(t, logger, mn, servers...) _, s2 := newTestMockedService(t, logger, mn, servers...) - err := s1.Advertises(ctx, topic) + err := s1.StartAdvertises(ctx, topic) require.NoError(t, err) for _, s := range servers { diff --git a/go/internal/tinder/service_subscription.go b/go/internal/tinder/service_subscription.go index 3806f15432..19163fd460 100644 --- a/go/internal/tinder/service_subscription.go +++ b/go/internal/tinder/service_subscription.go @@ -3,8 +3,6 @@ package tinder import ( "context" "fmt" - "sync" - "sync/atomic" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/zap" @@ -34,11 +32,11 @@ func (s *Subscription) Close() error { func (s *Service) Subscribe(topic string) *Subscription { ctx, cancel := context.WithCancel(context.Background()) out := s.fadeOut(ctx, topic, 16) - go func() { - if err := s.WatchTopic(ctx, topic); err != nil { - s.logger.Error("unable to watch topic", zap.String("topic", topic), zap.Error(err)) - } - }() + + err := s.WatchTopic(ctx, topic) + if err != nil { + s.logger.Warn("unable to watch topic", zap.String("topic", topic), zap.Error(err)) + } return &Subscription{ service: s, @@ -50,30 +48,22 @@ func (s *Service) Subscribe(topic string) *Subscription { } func (s *Service) LookupPeers(ctx context.Context, topic string) error { - var wg sync.WaitGroup - var success int32 + var success int for _, d := range s.drivers { - wg.Add(1) - go func(d IDriver) { - in, err := d.FindPeers(ctx, topic) // find peers should not hang there - switch err { - case nil: // ok - atomic.AddInt32(&success, 1) - s.logger.Debug("lookup for topic started", zap.String("driver", d.Name()), zap.String("topic", topic)) - s.fadeIn(ctx, topic, in) - case ErrNotSupported: // do nothing - default: - s.logger.Error("lookup failed", - zap.String("driver", d.Name()), zap.String("topic", topic), zap.Error(err)) - } - - wg.Done() - }(d) + in, err := d.FindPeers(ctx, topic) // find peers should not hang there + switch err { + case nil: // ok + success++ + s.logger.Debug("lookup for topic started", zap.String("driver", d.Name()), zap.String("topic", topic)) + go s.fadeIn(ctx, topic, in) + case ErrNotSupported: // do nothing + default: + s.logger.Error("lookup failed", + zap.String("driver", d.Name()), zap.String("topic", topic), zap.Error(err)) + } } - wg.Wait() // wait for process to finish - if success == 0 { return fmt.Errorf("no driver(s) were available for lookup") } @@ -81,36 +71,29 @@ func (s *Service) LookupPeers(ctx context.Context, topic string) error { return nil } -func (s *Service) WatchTopic(ctx context.Context, topic string) error { - var wg sync.WaitGroup - var success int32 +func (s *Service) WatchTopic(ctx context.Context, topic string) (err error) { + var success int for _, d := range s.drivers { s.logger.Debug("start subscribe", zap.String("driver", d.Name()), zap.String("topic", topic)) - wg.Add(1) - go func(d IDriver) { - in, err := d.Subscribe(ctx, topic) - switch err { - case nil: // ok, start fadin - atomic.AddInt32(&success, 1) - s.logger.Debug("watching for topic update", zap.String("driver", d.Name()), zap.String("topic", topic)) - s.fadeIn(ctx, topic, in) - case ErrNotSupported: // not, supported skipping - default: - s.logger.Error("unable to subscribe on topic", - zap.String("driver", d.Name()), zap.String("topic", topic), zap.Error(err)) - } - - wg.Done() - }(d) + in, err := d.Subscribe(ctx, topic) + switch err { + case nil: // ok, start fadin + success++ + s.logger.Debug("watching for topic update", zap.String("driver", d.Name()), zap.String("topic", topic)) + + go s.fadeIn(ctx, topic, in) + case ErrNotSupported: // not, supported skipping + default: + s.logger.Error("unable to subscribe on topic", + zap.String("driver", d.Name()), zap.String("topic", topic), zap.Error(err)) + } } - wg.Wait() // wait for process to finish - if success == 0 { - return fmt.Errorf("no driver(s) were available for subscribe") + err = fmt.Errorf("no driver(s) were available for subscribe") } - return nil + return } diff --git a/go/pkg/bertymessenger/testing.go b/go/pkg/bertymessenger/testing.go index 9debacf012..8b55266d36 100644 --- a/go/pkg/bertymessenger/testing.go +++ b/go/pkg/bertymessenger/testing.go @@ -11,6 +11,7 @@ import ( "time" sqlite "github.com/flyingtime/gorm-sqlcipher" + "github.com/golang/protobuf/proto" libp2p_mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -125,13 +126,11 @@ func TestingInfra(ctx context.Context, t *testing.T, amount int, logger *zap.Log } func Testing1To1ProcessWholeStream(t *testing.T) (context.Context, []*TestingAccount, *zap.Logger, func()) { - t.Helper() - // PREPARE logger, cleanup := testutil.Logger(t) clean := cleanup - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) clean = u.CombineFuncs(cancel, clean) const l = 2 @@ -147,22 +146,58 @@ func Testing1To1ProcessWholeStream(t *testing.T) (context.Context, []*TestingAcc clean = u.CombineFuncs(close, clean) } - logger.Info("Started nodes, waiting for settlement") - time.Sleep(4 * time.Second) + // @FIXME: link should already be ready at this point - user := nodes[0] friend := nodes[1] + require.Eventually(t, func() bool { + return friend.GetAccount().GetLink() != "" + }, time.Second*5, time.Millisecond*100) + + user := nodes[0] + require.Eventually(t, func() bool { + fmt.Println("account", user.GetAccount().GetLink()) + return user.GetAccount().GetLink() != "" + }, time.Second*5, time.Millisecond*100) + userPK := user.GetAccount().GetPublicKey() - _, err := user.client.ContactRequest(ctx, &messengertypes.ContactRequest_Request{Link: friend.GetAccount().GetLink()}) + subctx, cancel := context.WithCancel(ctx) + defer cancel() + + cl, err := friend.client.EventStream(subctx, &messengertypes.EventStream_Request{}) + require.NoError(t, err) + + _, err = user.client.ContactRequest(ctx, &messengertypes.ContactRequest_Request{Link: friend.GetAccount().GetLink()}) require.NoError(t, err) logger.Info("waiting for request propagation") - time.Sleep(1 * time.Second) + + // wait to receive contact request + for { + evt, err := cl.Recv() + require.NoError(t, err) + + if evt.GetEvent().GetType() == messengertypes.StreamEvent_TypeNotified { + var notif messengertypes.StreamEvent_Notified + err := proto.Unmarshal(evt.GetEvent().Payload, ¬if) + require.NoError(t, err) + + if notif.GetType() == messengertypes.StreamEvent_Notified_TypeContactRequestReceived { + break + } + } + } + _, err = friend.client.ContactAccept(ctx, &messengertypes.ContactAccept_Request{PublicKey: userPK}) require.NoError(t, err) logger.Info("waiting for contact settlement") - time.Sleep(4 * time.Second) + for { + evt, err := cl.Recv() + require.NoError(t, err) + if evt.GetEvent().GetType() == messengertypes.StreamEvent_TypeDeviceUpdated { + break + } + } return ctx, nodes, logger, clean } diff --git a/go/pkg/bertyprotocol/account_export_test.go b/go/pkg/bertyprotocol/account_export_test.go index 090a679db0..1b5a80c0d9 100644 --- a/go/pkg/bertyprotocol/account_export_test.go +++ b/go/pkg/bertyprotocol/account_export_test.go @@ -33,6 +33,8 @@ func Test_service_exportAccountKey(t *testing.T) { }, dsA) defer closeNodeA() + // time.Sleep(time.Second * 5) + s, ok := nodeA.Service.(*service) require.True(t, ok) @@ -42,6 +44,7 @@ func Test_service_exportAccountKey(t *testing.T) { defer os.Remove(tmpFile.Name()) tw := tar.NewWriter(tmpFile) + err = s.exportAccountKey(tw) require.NoError(t, err) @@ -59,7 +62,6 @@ func Test_service_exportAccountKey(t *testing.T) { keyContents := make([]byte, header.Size) size, err := tr.Read(keyContents) - require.NoError(t, err) require.Equal(t, int(header.Size), size) sk, err := crypto.UnmarshalPrivateKey(keyContents) @@ -108,7 +110,6 @@ func Test_service_exportAccountProofKey(t *testing.T) { keyContents := make([]byte, header.Size) size, err := tr.Read(keyContents) - require.NoError(t, err) require.Equal(t, int(header.Size), size) sk, err := crypto.UnmarshalPrivateKey(keyContents) diff --git a/go/pkg/bertyprotocol/contact_request_manager.go b/go/pkg/bertyprotocol/contact_request_manager.go index 25d7514ae4..a075f2d7ea 100644 --- a/go/pkg/bertyprotocol/contact_request_manager.go +++ b/go/pkg/bertyprotocol/contact_request_manager.go @@ -315,6 +315,7 @@ func (c *contactRequestsManager) enableAnnounce(ctx context.Context, seed, accPK c.enabled = true tyber.LogStep(ctx, c.logger, "announcing on swipper") + // start announcing on swiper, this method should take care ton announce as // many time as needed c.swiper.Announce(ctx, accPK, seed) diff --git a/go/pkg/bertyprotocol/tinder_swiper.go b/go/pkg/bertyprotocol/tinder_swiper.go index c3fa55fee8..5d5249fc55 100644 --- a/go/pkg/bertyprotocol/tinder_swiper.go +++ b/go/pkg/bertyprotocol/tinder_swiper.go @@ -193,6 +193,7 @@ func (s *Swiper) watchPeers(ctx context.Context, _ discovery.BackoffStrategy, ou } } }() + for { // wait until the context is done select { @@ -223,7 +224,7 @@ func (s *Swiper) Announce(ctx context.Context, topic, seed []byte) { s.logger.Debug("self announce topic for time", logutil.PrivateString("topic", point.RotationTopic())) actx, cancel := context.WithDeadline(ctx, point.Deadline()) - if err := s.tinder.Advertises(actx, point.RotationTopic()); err != nil && err != ctx.Err() { + if err := s.tinder.StartAdvertises(actx, point.RotationTopic()); err != nil && err != ctx.Err() { cancel() <-time.After(time.Second * 10) // retry after 10sc continue @@ -232,8 +233,6 @@ func (s *Swiper) Announce(ctx context.Context, topic, seed []byte) { select { case <-actx.Done(): s.logger.Debug("rotation ended", logutil.PrivateString("topic", point.RotationTopic())) - // take a little breath and wait 1 second to avoid looping on advertise on network issue - time.Sleep(time.Second) case <-ctx.Done(): s.logger.Debug("announce advertise ended", logutil.PrivateString("topic", point.RotationTopic()), zap.Error(ctx.Err())) } diff --git a/js/ios/Podfile.lock b/js/ios/Podfile.lock index 262a3eded2..73e32ef52c 100644 --- a/js/ios/Podfile.lock +++ b/js/ios/Podfile.lock @@ -677,7 +677,7 @@ SPEC CHECKSUMS: Flipper-PeerTalk: 116d8f857dc6ef55c7a5a75ea3ceaafe878aadc9 Flipper-RSocket: 127954abe8b162fcaf68d2134d34dc2bd7076154 FlipperKit: 651f50a42eb95c01b3e89a60996dd6aded529eeb - glog: 73c2498ac6884b13ede40eda8228cb1eee9d9d62 + glog: 15e2ba430ddc61583e57e21be14de010560ba459 libevent: 4049cae6c81cdb3654a443be001fb9bdceff7913 lottie-ios: c058aeafa76daa4cf64d773554bccc8385d0150e lottie-react-native: b561920bba7ec727ec94a473b683cffba00faece diff --git a/js/packages/utils/accounts/defaultCLIArgs.ts b/js/packages/utils/accounts/defaultCLIArgs.ts index e2b3d0f828..0f005b6672 100644 --- a/js/packages/utils/accounts/defaultCLIArgs.ts +++ b/js/packages/utils/accounts/defaultCLIArgs.ts @@ -4,7 +4,6 @@ export const defaultCLIArgs: string[] = [ '--p2p.high-water=60', '--p2p.low-water=40', '--p2p.webui-listener=:3000', - '--p2p.rdvp=/ip4/192.168.50.29/tcp/4040/p2p/12D3KooWDb4GQkWAghyxaWfbYW2AgwMpPPeoEz6EGbw5fkcUArQa', // @FIXME(gfanton,aeddi): Disable randomwalk for now because it uses too many // resources on the mobile. We should re-enable it when everything is // stabilized.