/
mdns.go
97 lines (82 loc) · 2.3 KB
/
mdns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package discovery
import (
"context"
"fmt"
"log"
"sync"
"github.com/bingoohuang/braft/util"
"github.com/bingoohuang/gg/pkg/ss"
"github.com/grandcat/zeroconf"
)
type mdnsDiscovery struct {
ctx context.Context
mdnsServer *zeroconf.Server
discoveryChan chan string
tempQueue *util.UniqueQueue
cancel context.CancelFunc
wg *sync.WaitGroup
nodeID string
serviceName string
nodePort int
}
func NewMdnsDiscovery(serviceName string) Discovery {
return &mdnsDiscovery{
// https://github.com/grandcat/zeroconf
// Multiple subtypes may be added to service name, separated by commas.
// e.g _workstation._tcp,_windows has subtype _windows.
serviceName: serviceName,
discoveryChan: make(chan string),
tempQueue: util.NewUniqueQueue(100),
}
}
// Name gives the name of the discovery.
func (k *mdnsDiscovery) Name() string { return "mdns://" + k.serviceName }
func (k *mdnsDiscovery) Start(nodeID string, nodePort int) (chan string, error) {
k.nodeID, k.nodePort = ss.Left(nodeID, 27), nodePort
k.ctx, k.cancel = context.WithCancel(context.Background())
go k.discovery()
return k.discoveryChan, nil
}
func (k *mdnsDiscovery) Search() (dest []string, err error) { return k.tempQueue.Get(), nil }
func (k *mdnsDiscovery) discovery() {
// expose mdns server
mdnsServer, err := zeroconf.Register(k.nodeID, k.serviceName,
"local.", k.nodePort, []string{"txtv=0", "lo=1", "la=2"}, nil)
if err != nil {
log.Fatal(err)
}
k.mdnsServer = mdnsServer
// fetch mDNS enabled raft nodes
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
log.Fatalln("Failed to initialize mDNS resolver:", err.Error())
}
entries := make(chan *zeroconf.ServiceEntry)
k.wg = &sync.WaitGroup{}
k.wg.Add(1)
go k.receive(entries)
if err = resolver.Browse(k.ctx, k.serviceName, "local.", entries); err != nil {
log.Printf("Error during mDNS lookup: %v", err)
}
}
func (k *mdnsDiscovery) receive(entries chan *zeroconf.ServiceEntry) {
defer k.wg.Done()
for {
select {
case <-k.ctx.Done():
return
case entry, ok := <-entries:
if !ok {
break
}
value := fmt.Sprintf("%s:%d", entry.AddrIPv4[0], entry.Port)
k.discoveryChan <- value
k.tempQueue.Put(value)
}
}
}
func (k *mdnsDiscovery) Stop() {
k.cancel()
k.wg.Wait()
k.mdnsServer.Shutdown()
}