This repository has been archived by the owner on May 3, 2019. It is now read-only.
/
address_table.go
234 lines (202 loc) · 5.2 KB
/
address_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package addresstable
import (
"fmt"
"sync"
"time"
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager"
)
// AddressTable keeps, for every fully-qualified hostname, the list of IP
// entries registered for it, and periodically prunes entries that have not
// been refreshed within stalenessThreshold.
type AddressTable struct {
	// addresses is keyed by hostname with a trailing dot (see fqdn).
	addresses map[string][]entry
	// clock supplies the current time for update stamps and staleness checks.
	clock clock.Clock
	// stalenessThreshold is the maximum age an entry may reach before the
	// pruning pass drops it.
	stalenessThreshold time.Duration
	// mutex is held around accesses to addresses, pausedPruning and lastResume.
	mutex sync.RWMutex
	// ticker drives the background pruning loop.
	ticker clock.Ticker
	// pausedPruning makes the pruning loop skip its ticks while true.
	pausedPruning bool
	logger        lager.Logger
	// lastResume is the time recorded by the most recent ResumePruning call.
	lastResume time.Time
	// resumePruningDelay is how long after lastResume pruning keeps being
	// skipped.
	resumePruningDelay time.Duration

	// warm is the flag set by SetWarm and read by IsWarm; warmMutex guards it.
	warm      bool
	warmMutex sync.RWMutex
}
// entry is a single IP registered for a hostname, together with the time it
// was last added or refreshed.
type entry struct {
	// ip is the address as supplied by the caller of Add.
	ip string
	// updateTime is compared against the staleness threshold when pruning.
	updateTime time.Time
}
// NewAddressTable builds an empty AddressTable and starts its background
// pruning goroutine.
//
// stalenessThreshold is the age beyond which entries are pruned,
// pruningInterval is the period of the ticker created from clock, and
// resumePruningDelay is how long pruning stays suppressed after
// ResumePruning. Pruning starts out unpaused.
func NewAddressTable(stalenessThreshold, pruningInterval, resumePruningDelay time.Duration, clock clock.Clock, logger lager.Logger) *AddressTable {
	at := new(AddressTable)
	at.addresses = make(map[string][]entry)
	at.clock = clock
	at.stalenessThreshold = stalenessThreshold
	at.ticker = clock.NewTicker(pruningInterval)
	at.pausedPruning = false
	at.logger = logger
	at.resumePruningDelay = resumePruningDelay

	at.pruneStaleEntriesOnInterval(pruningInterval)
	return at
}
// Add registers ip for each of the given hostnames. Hostnames are normalized
// with fqdn first. If ip is already recorded for a hostname, only its update
// time is refreshed; otherwise a new entry stamped with the current clock
// time is appended.
func (at *AddressTable) Add(hostnames []string, ip string) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	for _, name := range hostnames {
		key := fqdn(name)
		known := at.entriesForHostname(key)

		if pos := indexOf(known, ip); pos >= 0 {
			// Already registered: just mark it as freshly seen.
			at.addresses[key][pos].updateTime = at.clock.Now()
			continue
		}
		at.addresses[key] = append(known, entry{ip: ip, updateTime: at.clock.Now()})
	}
}
// Remove drops ip from each of the given hostnames (normalized with fqdn).
// Hostnames that do not list ip are left untouched. When ip was the only
// entry for a hostname, the hostname itself is deleted from the table.
func (at *AddressTable) Remove(hostnames []string, ip string) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	for _, name := range hostnames {
		key := fqdn(name)
		current := at.entriesForHostname(key)
		pos := indexOf(current, ip)

		switch {
		case pos < 0:
			// ip is not registered under this hostname.
		case len(current) == 1:
			delete(at.addresses, key)
		default:
			at.addresses[key] = append(current[:pos], current[pos+1:]...)
		}
	}
}
// Lookup returns the IPs currently registered for hostname (normalized with
// fqdn). The result is a freshly built slice; it is empty when the hostname
// is unknown.
func (at *AddressTable) Lookup(hostname string) []string {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	return entriesToIPs(at.entriesForHostname(fqdn(hostname)))
}
// GetAllAddresses returns a snapshot of the whole table: a new map from each
// stored hostname to a new slice of its IPs.
func (at *AddressTable) GetAllAddresses() map[string][]string {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	snapshot := make(map[string][]string)
	for hostname, hostEntries := range at.addresses {
		snapshot[hostname] = entriesToIPs(hostEntries)
	}
	return snapshot
}
// SetWarm marks the table as warm. The flag is never reset by this type.
func (at *AddressTable) SetWarm() {
	at.warmMutex.Lock()
	defer at.warmMutex.Unlock()

	at.warm = true
}
// IsWarm reports whether SetWarm has been called on this table.
func (at *AddressTable) IsWarm() bool {
	at.warmMutex.RLock()
	defer at.warmMutex.RUnlock()

	return at.warm
}
// Shutdown stops the ticker that drives background pruning.
//
// NOTE(review): the pruning goroutine ranges over the ticker's channel. If
// clock.Ticker.Stop behaves like time.Ticker.Stop and does not close that
// channel, the goroutine stays blocked after Shutdown instead of exiting —
// confirm against the clock package.
func (at *AddressTable) Shutdown() {
at.ticker.Stop()
}
// PausePruning logs the pause and makes the background pruning loop skip its
// ticks until ResumePruning is called.
func (at *AddressTable) PausePruning() {
	at.logger.Info("pruning-pause")

	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.pausedPruning = true
}
// ResumePruning logs the resume, clears the paused flag and records the
// current clock time as the moment of resumption. The pruning loop keeps
// skipping ticks until resumePruningDelay has elapsed since that moment.
func (at *AddressTable) ResumePruning() {
	at.logger.Info("pruning-resume")

	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.pausedPruning = false
	at.lastResume = at.clock.Now()
}
// entriesForHostname returns the slice stored under hostname, or a new empty
// (non-nil) slice when the hostname has no key in the table. It does no
// locking and no hostname normalization itself.
func (at *AddressTable) entriesForHostname(hostname string) []entry {
	known, found := at.addresses[hostname]
	if !found {
		return []entry{}
	}
	return known
}
// entriesToIPs extracts the ip of every entry, in order, into a new slice.
// The result is non-nil even when entries is empty.
func entriesToIPs(entries []entry) []string {
	ips := make([]string, 0, len(entries))
	for i := range entries {
		ips = append(ips, entries[i].ip)
	}
	return ips
}
// pruneStaleEntriesOnInterval starts a goroutine that, on every tick of
// at.ticker, prunes stale entries unless pruning is paused or the resume
// delay has not yet elapsed since the last ResumePruning.
//
// The pruningInterval argument is not read here; the tick period is whatever
// at.ticker was created with.
func (at *AddressTable) pruneStaleEntriesOnInterval(pruningInterval time.Duration) {
	go func() {
		defer at.ticker.Stop()

		ticks := at.ticker.C()
		for range ticks {
			// Decide under the read lock, then release it before the
			// pruning helpers take their own locks.
			at.mutex.RLock()
			skip := at.pausedPruning || at.clock.Since(at.lastResume) < at.resumePruningDelay
			at.mutex.RUnlock()

			if skip {
				continue
			}

			candidates := at.addressesWithStaleEntriesWithReadLock()
			at.pruneStaleEntriesWithWriteLock(candidates)
		}
	}()
}
// pruneStaleEntriesWithWriteLock re-checks every hostname in
// candidateAddresses under the write lock and drops the entries whose age
// exceeds stalenessThreshold. Each dropped entry is logged at debug level,
// and the before/after entry totals for the hostnames that were still
// present are logged once at the end.
//
// A hostname left with no fresh entries is deleted from the table, matching
// what Remove does when it removes the last IP. Previously the key was kept
// with an empty slice, so GetAllAddresses reported hostnames without any IPs
// and the map only ever grew.
//
// It returns immediately, without locking or logging, when there are no
// candidates.
func (at *AddressTable) pruneStaleEntriesWithWriteLock(candidateAddresses []string) {
	if len(candidateAddresses) == 0 {
		return
	}

	var oldTotal, newTotal int

	at.mutex.Lock()
	for _, staleAddr := range candidateAddresses {
		// The candidate list was gathered under a separate read lock, so the
		// hostname may have been removed in the meantime.
		entries, ok := at.addresses[staleAddr]
		if !ok {
			continue
		}

		freshEntries := make([]entry, 0, len(entries))
		for _, e := range entries {
			if at.clock.Since(e.updateTime) <= at.stalenessThreshold {
				freshEntries = append(freshEntries, e)
			} else {
				at.logger.Debug(fmt.Sprintf("pruning address %s from %s", e.ip, staleAddr))
			}
		}

		if len(freshEntries) == 0 {
			delete(at.addresses, staleAddr)
		} else {
			at.addresses[staleAddr] = freshEntries
		}

		oldTotal += len(entries)
		newTotal += len(freshEntries)
	}
	at.mutex.Unlock()

	at.logger.Info("pruned", lager.Data{"old-total": oldTotal, "new-total": newTotal})
}
// addressesWithStaleEntriesWithReadLock scans the table under the read lock
// and returns every hostname that has at least one entry older than
// stalenessThreshold. Each hostname appears at most once; the result is an
// empty, non-nil slice when nothing is stale.
func (at *AddressTable) addressesWithStaleEntriesWithReadLock() []string {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	stale := []string{}
scan:
	for hostname, hostEntries := range at.addresses {
		for i := range hostEntries {
			if at.clock.Since(hostEntries[i].updateTime) > at.stalenessThreshold {
				stale = append(stale, hostname)
				// One stale entry is enough to flag the hostname.
				continue scan
			}
		}
	}
	return stale
}
// indexOf returns the position of the first entry whose ip equals value, or
// -1 when there is none.
func indexOf(entries []entry, value string) int {
	for i := 0; i < len(entries); i++ {
		if entries[i].ip == value {
			return i
		}
	}
	return -1
}
// isFqdn reports whether s is non-empty and ends with a dot.
func isFqdn(s string) bool {
	return len(s) > 0 && s[len(s)-1] == '.'
}
// fqdn returns s unchanged when isFqdn accepts it, and s with a trailing dot
// appended otherwise. The empty string therefore becomes ".".
func fqdn(s string) string {
	if !isFqdn(s) {
		s += "."
	}
	return s
}