-
Notifications
You must be signed in to change notification settings - Fork 0
/
mdns.go
101 lines (90 loc) · 2.34 KB
/
mdns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package discovery
import (
"context"
"fmt"
"strings"
"github.com/charlesderek/actor-model/actor"
"github.com/charlesderek/actor-model/log"
"github.com/grandcat/zeroconf"
)
type mdns struct {
id string
announcer *announcer
resolver *zeroconf.Resolver
eventStream *actor.EventStream
ctx context.Context
cancelFn context.CancelFunc
}
func NewMdnsDiscovery(eventStream *actor.EventStream, opts ...DiscoveryOption) actor.Producer {
ctx, cancel := context.WithCancel(context.Background())
cfg := applyDiscoveryOptions(opts...)
announcer := newAnnouncer(cfg)
return func() actor.Receiver {
ret := &mdns{
id: cfg.id,
announcer: announcer,
ctx: ctx,
cancelFn: cancel,
eventStream: eventStream,
}
return ret
}
}
func (d *mdns) Receive(ctx *actor.Context) {
switch msg := ctx.Message().(type) {
case actor.Initialized:
d.createResolver()
case actor.Started:
go d.startDiscovery(ctx)
d.announcer.start()
case actor.Stopped:
d.shutdown()
_ = msg
}
}
func (d *mdns) shutdown() {
if d.announcer != nil {
d.announcer.shutdown()
}
d.cancelFn()
}
func (d *mdns) createResolver() {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
panic(err)
}
d.resolver = resolver
}
// Starts multicast dns discovery process.
// Searches matching entries with `serviceName` and `domain`.
func (d *mdns) startDiscovery(c *actor.Context) {
ctx, cancel := context.WithCancel(d.ctx)
defer cancel()
entries := make(chan *zeroconf.ServiceEntry)
go func(results <-chan *zeroconf.ServiceEntry) {
for entry := range results {
d.sendDiscoveryEvent(entry)
}
}(entries)
err := d.resolver.Browse(ctx, serviceName, domain, entries)
if err != nil {
log.Infow("[DISCOVERY] starting discovery failed", log.M{"err": err.Error()})
panic(err)
}
<-ctx.Done()
}
// Sends discovered peer as `DiscoveryEvent` to event stream.
func (d *mdns) sendDiscoveryEvent(entry *zeroconf.ServiceEntry) {
// avoid to discover myself
if entry.Instance != d.id {
event := &DiscoveryEvent{
ID: entry.Instance,
Addr: []string{},
}
for _, addr := range entry.AddrIPv4 {
event.Addr = append(event.Addr, fmt.Sprintf("%s:%d", addr.String(), entry.Port))
}
log.Infow("[DISCOVERY] remote discovered", log.M{"addrs": strings.Join(event.Addr, ","), "ID": entry.Instance})
d.eventStream.Publish(event)
}
}