-
Notifications
You must be signed in to change notification settings - Fork 378
/
watchdogs_advertiser.go
152 lines (126 loc) · 3.39 KB
/
watchdogs_advertiser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package tinder
import (
"context"
"fmt"
"sync"
"time"
p2p_discovery "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"go.uber.org/zap"
)
// watchdogsAdvertiser fans advertisements for a namespace out to a set of
// drivers and keeps one watchdog timer per namespace. The advertise loops of a
// namespace are cancelled when its watchdog fires, i.e. when Advertise has not
// been called again for that namespace within resetInterval.
type watchdogsAdvertiser struct {
// drivers are the backends every Advertise/Unregister call is forwarded to.
drivers []*Driver
// logger receives debug and warning output from the advertiser.
logger *zap.Logger
// host is stored by the constructor; it is not read in this part of the file.
host host.Host
// networkNotify provides the address snapshot and the "wait for update"
// primitive used by the advertise loop.
networkNotify *NetworkUpdate
// resetInterval is the watchdog timeout (the constructor sets it to the
// requested reset interval plus the grace period). It is also the retry
// delay after a failed driver advertise.
resetInterval time.Duration
// ttl is the duration returned to Advertise callers and the fallback used
// when a driver reports a zero ttl.
ttl time.Duration
// watchdogs maps a namespace to its pending watchdog timer.
watchdogs map[string]*time.Timer
// muAdvertiser guards watchdogs.
muAdvertiser sync.Mutex
// rootCtx is the parent context of every advertise loop; it replaces the
// context passed to Advertise.
rootCtx context.Context
}
// newWatchdogsAdvertiser builds a watchdogsAdvertiser bound to ctx.
//
// resetInterval is handed back to callers as the advertise ttl, while the
// watchdog itself only fires after resetInterval+gracePeriod, leaving callers
// gracePeriod of slack to renew their advertisement. The watchdog table starts
// out empty.
func newWatchdogsAdvertiser(ctx context.Context, l *zap.Logger, h host.Host, n *NetworkUpdate,
	resetInterval time.Duration, gracePeriod time.Duration, drivers []*Driver) *watchdogsAdvertiser {
	wa := new(watchdogsAdvertiser)

	// collaborators
	wa.rootCtx = ctx
	wa.logger = l
	wa.host = h
	wa.networkNotify = n
	wa.drivers = drivers

	// timing: callers see the bare interval, the watchdog gets the grace period on top
	wa.ttl = resetInterval
	wa.resetInterval = resetInterval + gracePeriod

	wa.watchdogs = map[string]*time.Timer{}

	return wa
}
// Advertise starts, or keeps alive, the advertisement of ns on every driver.
//
// The context supplied by the caller is deliberately ignored: advertise loops
// are long-lived and are bound to the advertiser's root context instead.
//
// The first call for a namespace spawns one advertise loop per driver and arms
// a watchdog timer of resetInterval. Calls made while that watchdog is still
// pending only re-arm it. When the watchdog fires it cancels the loops and
// removes its own entry, so the next call starts from scratch.
//
// It returns the ttl after which the caller is expected to call Advertise
// again, or an error when no driver is configured.
func (wa *watchdogsAdvertiser) Advertise(_ context.Context, ns string, opts ...p2p_discovery.Option) (time.Duration, error) {
	if len(wa.drivers) == 0 {
		return 0, fmt.Errorf("no drivers to advertise")
	}

	wa.muAdvertiser.Lock()
	defer wa.muAdvertiser.Unlock()

	// Stop reports true only if the watchdog had not fired yet; in that case
	// the advertise loops are still running and re-arming is all that is needed.
	//
	// The watchdog is created with time.AfterFunc, so its C channel is nil and
	// must never be drained: receiving from it would block forever while
	// holding the mutex the expiry callback is waiting for.
	if t, ok := wa.watchdogs[ns]; ok && t.Stop() {
		t.Reset(wa.resetInterval)
		return wa.ttl, nil
	}

	// Either there is no watchdog for ns, or the previous one already fired
	// (its callback has cancelled, or is about to cancel, the old loops).
	// Start a fresh set of loops with a fresh watchdog.
	started := time.Now()
	wctx, cancel := context.WithCancel(wa.rootCtx) // override given ctx with root ctx

	var watchdog *time.Timer
	watchdog = time.AfterFunc(wa.resetInterval, func() {
		cancel()
		wa.logger.Debug("advertise expired",
			zap.String("ns", ns),
			zap.Duration("duration", time.Since(started)),
		)

		wa.muAdvertiser.Lock()
		defer wa.muAdvertiser.Unlock()
		// Only drop the entry if it is still ours: a concurrent Advertise may
		// already have replaced an expired watchdog with a new one.
		// (watchdog is read under the mutex, after Advertise assigned it.)
		if wa.watchdogs[ns] == watchdog {
			delete(wa.watchdogs, ns)
		}

		// unregister drivers (?)
		// wa.unregister(ctx, ns)
	})
	wa.watchdogs[ns] = watchdog

	wa.advertises(wctx, ns, opts...)
	wa.logger.Debug("advertise started", zap.String("ns", ns))

	return wa.ttl, nil
}
// unregister asks each driver in turn to withdraw ns. A driver that fails is
// reported at warn level and does not prevent the remaining drivers from being
// called; nothing is returned to the caller.
func (wa *watchdogsAdvertiser) unregister(ctx context.Context, ns string) {
	for i := range wa.drivers {
		err := wa.drivers[i].Unregister(ctx, ns)
		if err == nil {
			continue
		}
		wa.logger.Warn("unable to unsubscribe", zap.Error(err))
	}
}
// advertises launches one advertise loop per driver, each in its own
// goroutine. It returns immediately; the loops end when ctx is done.
func (wa *watchdogsAdvertiser) advertises(ctx context.Context, ns string, opts ...p2p_discovery.Option) {
	for i := range wa.drivers {
		go wa.advertise(ctx, wa.drivers[i], ns, opts...)
	}
}
// advertise is the per-driver loop: it advertises ns on d, then sleeps until
// either the next deadline elapses or the network addresses change, and
// repeats. The loop exits once ctx is done.
//
// After a successful advertise the next round is scheduled at 4/5 of the ttl
// reported by the driver (falling back to wa.ttl when the driver reports 0);
// after a failed one it is scheduled after wa.resetInterval.
func (wa *watchdogsAdvertiser) advertise(ctx context.Context, d *Driver, ns string, opts ...p2p_discovery.Option) {
	for {
		// snapshot taken before advertising, used below to detect address changes
		addrs := wa.networkNotify.GetLastUpdatedAddrs(ctx)

		begin := time.Now()
		ttl, err := d.Advertise(ctx, ns, opts...)
		elapsed := time.Since(begin)

		next := wa.resetInterval
		if err == nil {
			if ttl == 0 {
				ttl = wa.ttl
			}
			// re-advertise a bit before the record expires
			next = 4 * ttl / 5
		} else {
			wa.logger.Warn("unable to advertise",
				zap.String("driver", d.Name),
				zap.String("ns", ns), zap.Error(err))

			// a failure caused by our own cancellation ends the loop
			if ctx.Err() != nil {
				return
			}
		}

		wa.logger.Debug("advertise",
			zap.String("driver", d.Name),
			zap.String("ns", ns),
			zap.Duration("ttl", ttl),
			zap.Duration("took", elapsed),
			zap.Duration("next", next),
		)

		waitCtx, stopWaiting := context.WithTimeout(ctx, next)
		updated := wa.networkNotify.WaitForUpdate(waitCtx, addrs, d.AddrsFactory)
		stopWaiting()

		// no update: either the deadline elapsed (loop again) or the parent
		// ctx is done (stop)
		if !updated && ctx.Err() != nil {
			return
		}
	}
}
// Unregister withdraws ns from every configured driver by delegating to
// unregister. Per-driver failures are logged there rather than returned, so
// the result is always nil.
//
// NOTE(review): this does not touch the namespace's watchdog timer, so any
// running advertise loops for ns are left as they are — confirm this is
// intended.
func (wa *watchdogsAdvertiser) Unregister(ctx context.Context, ns string) error {
wa.unregister(ctx, ns)
return nil
}