/
hostdb.go
314 lines (284 loc) · 9.47 KB
/
hostdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// Package hostdb provides a HostDB object that implements the renter.hostDB
// interface. The blockchain is scanned for host announcements and hosts that
// are found get added to the host database. The database continually scans the
// set of hosts it has found and updates who is online.
package hostdb
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/HyperspaceApp/Hyperspace/build"
"github.com/HyperspaceApp/Hyperspace/modules"
"github.com/HyperspaceApp/Hyperspace/modules/renter/hostdb/hosttree"
"github.com/HyperspaceApp/Hyperspace/persist"
"github.com/HyperspaceApp/Hyperspace/types"
"github.com/HyperspaceApp/fastrand"
"github.com/HyperspaceApp/threadgroup"
)
var (
// ErrInitialScanIncomplete is returned whenever an operation is not
// allowed to be executed before the initial host scan has finished.
ErrInitialScanIncomplete = errors.New("initial hostdb scan is not yet completed")
errNilCS = errors.New("cannot create hostdb with nil consensus set")
errNilGateway = errors.New("cannot create hostdb with nil gateway")
)
// The HostDB is a database of potential hosts. It assigns a weight to each
// host based on their hosting parameters, and then can select hosts at random
// for uploading files.
type HostDB struct {
// dependencies
cs modules.ConsensusSet
deps modules.Dependencies
gateway modules.Gateway
log *persist.Logger
mu sync.RWMutex
persistDir string
tg threadgroup.ThreadGroup
// The hostTree is the root node of the tree that organizes hosts by
// weight. The tree is necessary for selecting weighted hosts at
// random.
hostTree *hosttree.HostTree
// the scanPool is a set of hosts that need to be scanned. There are a
// handful of goroutines constantly waiting on the channel for hosts to
// scan. The scan map is used to prevent duplicates from entering the scan
// pool.
initialScanComplete bool
initialScanLatencies []time.Duration
scanList []modules.HostDBEntry
scanMap map[string]struct{}
scanWait bool
scanningThreads int
blockHeight types.BlockHeight
lastChange modules.ConsensusChangeID
}
// New returns a new HostDB.
func New(g modules.Gateway, cs modules.ConsensusSet, persistDir string) (*HostDB, error) {
// Check for nil inputs.
if g == nil {
return nil, errNilGateway
}
if cs == nil {
return nil, errNilCS
}
// Create HostDB using production dependencies.
return NewCustomHostDB(g, cs, persistDir, modules.ProdDependencies)
}
// NewCustomHostDB creates a HostDB using the provided dependencies. It loads the old
// persistence data, spawns the HostDB's scanning threads, and subscribes it to
// the consensusSet.
func NewCustomHostDB(g modules.Gateway, cs modules.ConsensusSet, persistDir string, deps modules.Dependencies) (*HostDB, error) {
// Create the HostDB object.
hdb := &HostDB{
cs: cs,
deps: deps,
gateway: g,
persistDir: persistDir,
scanMap: make(map[string]struct{}),
}
// Create the persist directory if it does not yet exist.
err := os.MkdirAll(persistDir, 0700)
if err != nil {
return nil, err
}
// Create the logger.
logger, err := persist.NewFileLogger(filepath.Join(persistDir, "hostdb.log"))
if err != nil {
return nil, err
}
hdb.log = logger
err = hdb.tg.AfterStop(func() error {
if err := hdb.log.Close(); err != nil {
// Resort to println as the logger is in an uncertain state.
fmt.Println("Failed to close the hostdb logger:", err)
return err
}
return nil
})
if err != nil {
return nil, err
}
// The host tree is used to manage hosts and query them at random.
hdb.hostTree = hosttree.New(hdb.calculateHostWeight, deps.Resolver())
// Load the prior persistence structures.
hdb.mu.Lock()
err = hdb.load()
hdb.mu.Unlock()
if err != nil && !os.IsNotExist(err) {
return nil, err
}
err = hdb.tg.AfterStop(func() error {
hdb.mu.Lock()
err := hdb.saveSync()
hdb.mu.Unlock()
if err != nil {
hdb.log.Println("Unable to save the hostdb:", err)
return err
}
return nil
})
if err != nil {
return nil, err
}
// Loading is complete, establish the save loop.
go hdb.threadedSaveLoop()
// Don't perform the remaining startup in the presence of a quitAfterLoad
// disruption.
if hdb.deps.Disrupt("quitAfterLoad") {
return hdb, nil
}
if hdb.cs.SpvMode() {
err = cs.HeaderConsensusSetSubscribe(hdb, hdb.lastChange, hdb.tg.StopChan())
if err == modules.ErrInvalidConsensusChangeID {
// Subscribe again using the new ID. This will cause a triggered scan
// on all of the hosts, but that should be acceptable.
hdb.mu.Lock()
hdb.blockHeight = 0
hdb.lastChange = modules.ConsensusChangeBeginning
hdb.mu.Unlock()
err = cs.HeaderConsensusSetSubscribe(hdb, hdb.lastChange, hdb.tg.StopChan())
}
if err != nil {
return nil, errors.New("hostdb header consensus subscription failed: " + err.Error())
}
} else {
err = cs.ConsensusSetSubscribe(hdb, hdb.lastChange, hdb.tg.StopChan())
if err == modules.ErrInvalidConsensusChangeID {
// Subscribe again using the new ID. This will cause a triggered scan
// on all of the hosts, but that should be acceptable.
hdb.mu.Lock()
hdb.blockHeight = 0
hdb.lastChange = modules.ConsensusChangeBeginning
hdb.mu.Unlock()
err = cs.ConsensusSetSubscribe(hdb, hdb.lastChange, hdb.tg.StopChan())
}
if err != nil {
return nil, errors.New("hostdb subscription failed: " + err.Error())
}
}
err = hdb.tg.OnStop(func() error {
cs.Unsubscribe(hdb)
return nil
})
if err != nil {
return nil, err
}
// Spawn the scan loop during production, but allow it to be disrupted
// during testing. Primary reason is so that we can fill the hostdb with
// fake hosts and not have them marked as offline as the scanloop operates.
if !hdb.deps.Disrupt("disableScanLoop") {
go hdb.threadedScan()
} else {
hdb.initialScanComplete = true
}
return hdb, nil
}
// ActiveHosts returns a list of hosts that are currently online, sorted by
// weight.
func (hdb *HostDB) ActiveHosts() (activeHosts []modules.HostDBEntry) {
allHosts := hdb.hostTree.All()
for _, entry := range allHosts {
if len(entry.ScanHistory) == 0 {
continue
}
if !entry.ScanHistory[len(entry.ScanHistory)-1].Success {
continue
}
if !entry.AcceptingContracts {
continue
}
activeHosts = append(activeHosts, entry)
}
return activeHosts
}
// AllHosts returns all of the hosts known to the hostdb, including the
// inactive ones.
func (hdb *HostDB) AllHosts() (allHosts []modules.HostDBEntry) {
return hdb.hostTree.All()
}
// AverageContractPrice returns the average price of a host.
func (hdb *HostDB) AverageContractPrice() (totalPrice types.Currency) {
sampleSize := 32
hosts := hdb.hostTree.SelectRandom(sampleSize, nil, nil)
if len(hosts) == 0 {
return totalPrice
}
for _, host := range hosts {
totalPrice = totalPrice.Add(host.ContractPrice)
}
return totalPrice.Div64(uint64(len(hosts)))
}
// CheckForIPViolations accepts a number of host public keys and returns the
// ones that violate the rules of the addressFilter.
func (hdb *HostDB) CheckForIPViolations(hosts []types.SiaPublicKey) []types.SiaPublicKey {
// Shuffle the hosts to non-deterministically decide which host is bad. The
// reason being that the address which is passed to the filter first, has
// priority over addresses which are passed in later. So if address A and B
// together violate the rules, passing B first will result in A being
// considered a bad host and vice versa.
if build.Release != "testing" {
fastrand.Shuffle(len(hosts), func(i, j int) { hosts[i], hosts[j] = hosts[j], hosts[i] })
}
// Create a filter.
filter := hosttree.NewFilter(hdb.deps.Resolver())
var badHosts []types.SiaPublicKey
for _, host := range hosts {
// Get the host from the db.
node, exists := hdb.hostTree.Select(host)
if !exists {
// A host that's not in the hostdb is bad.
badHosts = append(badHosts, host)
continue
}
// Check if the host violates the rules.
if filter.Filtered(node.NetAddress) {
badHosts = append(badHosts, host)
continue
}
// If it didn't then we add it to the filter.
filter.Add(node.NetAddress)
}
return badHosts
}
// Close closes the hostdb, terminating its scanning threads
func (hdb *HostDB) Close() error {
return hdb.tg.Stop()
}
// Host returns the HostSettings associated with the specified NetAddress. If
// no matching host is found, Host returns false.
func (hdb *HostDB) Host(spk types.SiaPublicKey) (modules.HostDBEntry, bool) {
host, exists := hdb.hostTree.Select(spk)
if !exists {
return host, exists
}
hdb.mu.RLock()
updateHostHistoricInteractions(&host, hdb.blockHeight)
hdb.mu.RUnlock()
return host, exists
}
// InitialScanComplete returns a boolean indicating if the initial scan of the
// hostdb is completed.
func (hdb *HostDB) InitialScanComplete() (complete bool, err error) {
if err = hdb.tg.Add(); err != nil {
return
}
defer hdb.tg.Done()
hdb.mu.Lock()
defer hdb.mu.Unlock()
complete = hdb.initialScanComplete
return
}
// RandomHosts implements the HostDB interface's RandomHosts() method. It takes
// a number of hosts to return, and a slice of netaddresses to ignore, and
// returns a slice of entries.
func (hdb *HostDB) RandomHosts(n int, blacklist, addressBlacklist []types.SiaPublicKey) ([]modules.HostDBEntry, error) {
hdb.mu.RLock()
initialScanComplete := hdb.initialScanComplete
hdb.mu.RUnlock()
if !initialScanComplete {
return []modules.HostDBEntry{}, ErrInitialScanIncomplete
}
return hdb.hostTree.SelectRandom(n, blacklist, addressBlacklist), nil
}