// worker.go (renter package) — forked from NebulousLabs/Sia.
package renter
// TODO: Need to make sure that we do not end up with two workers for the same
// host, which could potentially happen over renewals because the contract ids
// will be different.
import (
"time"
"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/types"
)
type (
	// downloadWork contains instructions to download a piece from a host, and
	// a channel for returning the results.
	downloadWork struct {
		// dataRoot is the MerkleRoot of the data being requested, which serves
		// as an ID when requesting data from the host.
		dataRoot crypto.Hash

		// pieceIndex identifies which piece of the chunk is being fetched.
		pieceIndex uint64

		// chunkDownload is the parent download operation this piece belongs
		// to; it is echoed back in the finishedDownload result.
		chunkDownload *chunkDownload

		// resultChan is a channel that the worker will use to return the
		// results of the download.
		resultChan chan finishedDownload
	}

	// finishedDownload contains the data and error from performing a download.
	// Exactly one of data/err is meaningful: on failure data is nil.
	finishedDownload struct {
		chunkDownload *chunkDownload
		data          []byte
		err           error
		pieceIndex    uint64
		workerID      types.FileContractID
	}

	// finishedUpload contains the Merkle root and error from performing an
	// upload.
	finishedUpload struct {
		chunkID    chunkID
		dataRoot   crypto.Hash
		err        error
		pieceIndex uint64
		workerID   types.FileContractID
	}

	// uploadWork contains instructions to upload a piece to a host, and a
	// channel for returning the results.
	uploadWork struct {
		// data is the payload of the upload.
		chunkID    chunkID
		data       []byte
		file       *file
		pieceIndex uint64

		// resultChan is a channel that the worker will use to return the
		// results of the upload.
		resultChan chan finishedUpload
	}

	// A worker listens for work on a certain host. Workers are identified by
	// the file contract id of the contract they manage.
	worker struct {
		// contractID specifies which contract the worker specifically works
		// with.
		contract   modules.RenterContract
		contractID types.FileContractID

		// If there is work on all three channels, the worker will first do all
		// of the work in the priority download chan, then all of the work in the
		// download chan, and finally all of the work in the upload chan.
		//
		// A busy higher priority channel is able to entirely starve all of the
		// channels with lower priority.
		downloadChan         chan downloadWork // higher priority than all uploads
		killChan             chan struct{}     // highest priority
		priorityDownloadChan chan downloadWork // higher priority than downloads (used for user-initiated downloads)
		uploadChan           chan uploadWork   // lowest priority

		// recentUploadFailure documents the most recent time that an upload
		// has failed. consecutiveUploadFailures counts back-to-back upload
		// failures and is reset to zero on the first success.
		// NOTE(review): despite the "Only modified by primary repair loop"
		// comments, upload() below also writes these fields — confirm the
		// intended synchronization before relying on these notes.
		consecutiveUploadFailures time.Duration
		recentUploadFailure       time.Time // Only modified by primary repair loop.

		// recentDownloadFailure documents the most recent time that a download
		// has failed.
		recentDownloadFailure time.Time // Only modified by the primary download loop.

		// Utilities.
		renter *Renter
	}
)
// download fetches the sector named by dw.dataRoot from the worker's host
// and reports the outcome on dw.resultChan. Delivery is done from a fresh
// goroutine that also listens on the renter's stop channel, so a slow or
// absent receiver can never wedge the worker or block shutdown.
func (w *worker) download(dw downloadWork) {
	// deliver hands the result back to the requester without blocking the
	// worker; it gives up silently if the renter is shutting down.
	deliver := func(payload []byte, downloadErr error) {
		go func() {
			select {
			case dw.resultChan <- finishedDownload{dw.chunkDownload, payload, downloadErr, dw.pieceIndex, w.contractID}:
			case <-w.renter.tg.StopChan():
			}
		}()
	}

	downloader, err := w.renter.hostContractor.Downloader(w.contractID, w.renter.tg.StopChan())
	if err != nil {
		deliver(nil, err)
		return
	}
	defer downloader.Close()

	sector, err := downloader.Sector(dw.dataRoot)
	deliver(sector, err)
}
// upload will perform some upload work: acquire an editor for this worker's
// contract, upload the piece data, record the new piece in the file's
// contract metadata, and report the outcome on uw.resultChan. Results are
// always delivered from a goroutine that also watches the renter's stop
// channel, so a missing receiver cannot block the worker or shutdown.
func (w *worker) upload(uw uploadWork) {
	e, err := w.renter.hostContractor.Editor(w.contractID, w.renter.tg.StopChan())
	if err != nil {
		// Could not reach the host; record the failure and report it.
		w.recentUploadFailure = time.Now()
		w.consecutiveUploadFailures++
		go func() {
			select {
			case uw.resultChan <- finishedUpload{uw.chunkID, crypto.Hash{}, err, uw.pieceIndex, w.contractID}:
			case <-w.renter.tg.StopChan():
			}
		}()
		return
	}
	defer e.Close()

	root, err := e.Upload(uw.data)
	if err != nil {
		// The upload itself failed; record the failure and report it.
		w.recentUploadFailure = time.Now()
		w.consecutiveUploadFailures++
		go func() {
			select {
			case uw.resultChan <- finishedUpload{uw.chunkID, root, err, uw.pieceIndex, w.contractID}:
			case <-w.renter.tg.StopChan():
			}
		}()
		return
	}
	// Success - reset the consecutive upload failures count.
	w.consecutiveUploadFailures = 0

	// Update the renter metadata: attach the freshly uploaded piece to the
	// file's record of this contract, creating the record if this is the
	// first piece stored under the contract.
	addr := e.Address()
	endHeight := e.EndHeight()
	// Lock ordering: renter lock first, then the file lock.
	id := w.renter.mu.Lock()
	uw.file.mu.Lock()
	contract, exists := uw.file.contracts[w.contractID]
	if !exists {
		contract = fileContract{
			ID:          w.contractID,
			IP:          addr,
			WindowStart: endHeight,
		}
	}
	contract.Pieces = append(contract.Pieces, pieceData{
		Chunk:      uw.chunkID.index,
		Piece:      uw.pieceIndex,
		MerkleRoot: root,
	})
	uw.file.contracts[w.contractID] = contract
	// Persist the updated file metadata while still holding both locks.
	w.renter.saveFile(uw.file)
	uw.file.mu.Unlock()
	w.renter.mu.Unlock(id)

	// Report the successful upload.
	go func() {
		select {
		case uw.resultChan <- finishedUpload{uw.chunkID, root, err, uw.pieceIndex, w.contractID}:
		case <-w.renter.tg.StopChan():
		}
	}()
}
// work performs one unit of work, exiting early if there is a kill signal
// given before work is completed. Priority downloads are drained first,
// then standard downloads, and only when both queues are empty does the
// worker block waiting for anything (including uploads and shutdown).
func (w *worker) work() {
	// Non-blocking check: priority downloads preempt everything else.
	select {
	case dw := <-w.priorityDownloadChan:
		w.download(dw)
		return
	default:
	}

	// Non-blocking check: standard downloads preempt uploads.
	select {
	case dw := <-w.downloadChan:
		w.download(dw)
		return
	default:
	}

	// Nothing queued at higher priority; block until any event arrives.
	select {
	case <-w.killChan:
	case <-w.renter.tg.StopChan():
	case dw := <-w.priorityDownloadChan:
		w.download(dw)
	case dw := <-w.downloadChan:
		w.download(dw)
	case uw := <-w.uploadChan:
		w.upload(uw)
	}
}
// threadedWorkLoop repeatedly issues work to a worker, stopping when the
// thread group is closed or the worker is individually killed.
func (w *worker) threadedWorkLoop() {
	for w.workIteration() {
	}
}

// workIteration performs one pass of the work loop, returning false when the
// worker should exit: either its kill channel has been closed or the renter's
// thread group is shutting down.
func (w *worker) workIteration() bool {
	// Bail out immediately if this specific worker has been killed.
	select {
	case <-w.killChan:
		return false
	default:
	}
	// Register with the thread group so shutdown waits for the current unit
	// of work; a non-nil error means the renter is stopping.
	if w.renter.tg.Add() != nil {
		return false
	}
	w.work()
	w.renter.tg.Done()
	return true
}
// updateWorkerPool reconciles the renter's worker pool against the given set
// of contracts: a worker is spawned for every contract that lacks one, and
// any worker whose contract has disappeared is removed and killed.
func (r *Renter) updateWorkerPool(contracts []modules.RenterContract) {
	// Index the contractor's contracts by ID for membership checks.
	current := make(map[types.FileContractID]modules.RenterContract, len(contracts))
	for _, c := range contracts {
		current[c.ID] = c
	}

	// Spin up a worker for every contract that does not have one yet.
	for id, c := range current {
		if _, ok := r.workerPool[id]; ok {
			continue
		}
		w := &worker{
			contract:   c,
			contractID: id,

			downloadChan:         make(chan downloadWork, 1),
			killChan:             make(chan struct{}),
			priorityDownloadChan: make(chan downloadWork, 1),
			uploadChan:           make(chan uploadWork, 1),

			renter: r,
		}
		r.workerPool[id] = w
		go w.threadedWorkLoop()
	}

	// Retire any worker whose contract is no longer in the contractor's set.
	// Closing killChan signals the worker's loop to exit.
	for id, w := range r.workerPool {
		if _, ok := current[id]; !ok {
			delete(r.workerPool, id)
			close(w.killChan)
		}
	}
}