Skip to content

Commit

Permalink
feat(tinder): Add multi driver
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Mar 6, 2020
1 parent b423019 commit 4b73fac
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 38 deletions.
1 change: 1 addition & 0 deletions go/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 34 additions & 1 deletion go/internal/tinder/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,48 @@ package tinder

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/discovery"
libp2p_discovery "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/peer"
)

// Driver is a libp2p_discovery.Discovery
var _ libp2p_discovery.Discovery = (Driver)(nil)

type Unregister interface {
Unregister(ctx context.Context, ns string) error
}

// Driver is Discovery with a unregister method
type Driver interface {
libp2p_discovery.Discovery
Unregister
}

Unregister(ctx context.Context, ns string) error
type composeDriver struct {
advertiser libp2p_discovery.Advertiser
discoverer libp2p_discovery.Discoverer
unregister Unregister
}

func (d *composeDriver) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return d.advertiser.Advertise(ctx, ns, opts...)
}

func (d *composeDriver) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
return d.discoverer.FindPeers(ctx, ns, opts...)
}

func (d *composeDriver) Unregister(ctx context.Context, ns string) error {
return d.unregister.Unregister(ctx, ns)
}

func ComposeDriver(advertiser libp2p_discovery.Advertiser, discover libp2p_discovery.Discovery, unregister Unregister) Driver {
return &composeDriver{
advertiser: advertiser,
discoverer: discover,
unregister: unregister,
}
}
135 changes: 135 additions & 0 deletions go/internal/tinder/driver_multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package tinder

import (
"context"
"fmt"
"reflect"
"sync"
"time"

libp2p_discovery "github.com/libp2p/go-libp2p-core/discovery"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
disc "github.com/libp2p/go-libp2p-discovery"
)

// MultiDriver is a simple driver manager, that forward request across multiple driver
type MultiDriver struct {
drivers []Driver

mapc map[string]context.CancelFunc
muc sync.Mutex
}

func NewMultiDriver(drivers ...Driver) Driver {
return &MultiDriver{
drivers: drivers,
mapc: make(map[string]context.CancelFunc),
}
}

// Advertise simply dispatch Advertise request accross all the drivers
func (md *MultiDriver) Advertise(ctx context.Context, ns string, opts ...libp2p_discovery.Option) (time.Duration, error) {
// Get options
var options libp2p_discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}

md.muc.Lock()
if _, ok := md.mapc[ns]; ok {
md.muc.Unlock()
// @NOTE(gfanton): should we return an error here?
return 0, fmt.Errorf("already advertising")
}

ctx, cf := context.WithCancel(ctx)
md.mapc[ns] = cf
md.muc.Unlock()

for _, driver := range md.drivers {
disc.Advertise(ctx, driver, ns, opts...)
}

return options.Ttl, nil
}

// FindPeers for MultiDriver doesn't care about duplicate peers, his only
// job here is to dispatch FindPeers request across all the drivers.
func (md *MultiDriver) FindPeers(ctx context.Context, ns string, opts ...libp2p_discovery.Option) (<-chan libp2p_peer.AddrInfo, error) {
ctx, cancel := context.WithCancel(ctx)

// @NOTE(gfanton): I prefer the use of select to limit the number of goroutines
const selDone = 0
selCases := []reflect.SelectCase{
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done),
},
}

ndrivers := 0
for _, driver := range md.drivers {
ch, err := driver.FindPeers(ctx, ns, opts...)
if err != nil { // @TODO(gfanton): log this
continue
}

selCases = append(selCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
ndrivers++
}

cpeers := make(chan libp2p_peer.AddrInfo, ndrivers)
go func() {
defer cancel()
defer close(cpeers)

for ndrivers > 0 {
idx, value, ok := reflect.Select(selCases)
if idx == selDone { // context has been cancel stop and close chan
return
}

if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
selCases[idx].Chan = reflect.ValueOf(nil)
ndrivers -= 1
continue
}

// we can safly get our peer
peer := value.Interface().(libp2p_peer.AddrInfo)

// forward the peer
select {
case cpeers <- peer:
case <-ctx.Done():
return
}
}

return
}()

return cpeers, nil
}

func (md *MultiDriver) Unregister(ctx context.Context, ns string) error {
// first cancel advertiser
md.muc.Lock()
if cf, ok := md.mapc[ns]; ok {
cf()
delete(md.mapc, ns)
}
md.muc.Unlock()

// unregister drivers
for _, driver := range md.drivers {
_ = driver.Unregister(ctx, ns) // @TODO(gfanton): log this
}

return nil
}
4 changes: 3 additions & 1 deletion go/internal/tinder/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
libp2p_discovery "github.com/libp2p/go-libp2p-core/discovery"
)

const ScheduleKey = "Schedule"

// Schedule advertise at the given time
func Schedule(t time.Time) libp2p_discovery.Option {
return func(opts *libp2p_discovery.Options) error {
opts.Other["Schedule"] = t
opts.Other[ScheduleKey] = t
return nil
}
}
46 changes: 10 additions & 36 deletions go/internal/tinder/service.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,17 @@
// Tinder service is a multi discoverer service backed by a cache

package tinder

import (
"context"
"fmt"
"time"

libp2p_discovery "github.com/libp2p/go-libp2p-core/discovery"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_discovery "github.com/libp2p/go-libp2p-discovery"
)

// Service is a Driver
var _ Driver = (Service)(nil)

// Service is a libp2p_discovery.Discovery
var _ libp2p_discovery.Discovery = (Service)(nil)

// Tinder Service
type Service interface {
Driver

RegisterDriver(driver Driver)
}

type service struct {
drivers []Driver
}

func (s *service) RegisterDriver(driver Driver) {
s.drivers = append(s.drivers, driver)
}

func (s *service) Advertise(ctx context.Context, ns string, opts ...libp2p_discovery.Option) (time.Duration, error) {
return 0, fmt.Errorf("not implemented")
}

func (s *service) FindPeers(ctx context.Context, ns string, opts ...libp2p_discovery.Option) (<-chan libp2p_peer.AddrInfo, error) {
return nil, fmt.Errorf("not implemented")
}
func New(drivers []Driver, stratFactory libp2p_discovery.BackoffFactory, opts ...libp2p_discovery.BackoffDiscoveryOption) (Driver, error) {
mdriver := NewMultiDriver(drivers...)
disc, err := libp2p_discovery.NewBackoffDiscovery(mdriver, nil)
if err != nil {
return nil, err
}

func (s *service) Unregister(ctx context.Context, ns string) error {
return fmt.Errorf("not implemented")
return ComposeDriver(disc, disc, mdriver), nil
}

0 comments on commit 4b73fac

Please sign in to comment.