forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dns_watcher.go
83 lines (69 loc) · 2.23 KB
/
dns_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package util
import (
"context"
"fmt"
"time"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
util_log "github.com/grafana/loki/pkg/util/log"
)
// Notifications about address resolution. All notifications are sent on the same goroutine.
type DNSNotifications interface {
// New address has been discovered by DNS watcher for supplied hostname.
AddressAdded(address string)
// Previously-discovered address is no longer resolved for the hostname.
AddressRemoved(address string)
}
type dnsWatcher struct {
watcher grpcutil.Watcher
notifications DNSNotifications
}
// NewDNSWatcher creates a new DNS watcher and returns a service that is wrapping it.
func NewDNSWatcher(address string, dnsLookupPeriod time.Duration, notifications DNSNotifications) (services.Service, error) {
resolver, err := grpcutil.NewDNSResolverWithFreq(dnsLookupPeriod, util_log.Logger)
if err != nil {
return nil, err
}
watcher, err := resolver.Resolve(address, "")
if err != nil {
return nil, err
}
w := &dnsWatcher{
watcher: watcher,
notifications: notifications,
}
return services.NewBasicService(nil, w.watchDNSLoop, nil), nil
}
// watchDNSLoop watches for changes in DNS and sends notifications.
func (w *dnsWatcher) watchDNSLoop(servCtx context.Context) error {
go func() {
// Close the watcher, when this service is asked to stop.
// Closing the watcher makes watchDNSLoop exit, since it only iterates on watcher updates, and has no other
// way to stop. We cannot close the watcher in `stopping` method, because it is only called *after*
// watchDNSLoop exits.
<-servCtx.Done()
w.watcher.Close()
}()
for {
updates, err := w.watcher.Next()
if err != nil {
// watcher.Next returns error when Close is called, but we call Close when our context is done.
// we don't want to report error in that case.
if servCtx.Err() != nil {
return nil
}
return errors.Wrapf(err, "error from DNS watcher")
}
for _, update := range updates {
switch update.Op {
case grpcutil.Add:
w.notifications.AddressAdded(update.Addr)
case grpcutil.Delete:
w.notifications.AddressRemoved(update.Addr)
default:
return fmt.Errorf("unknown op: %v", update.Op)
}
}
}
}