/
worker.go
381 lines (319 loc) · 11.1 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
// Copyright 2019 DxChain, All rights reserved.
// Use of this source code is governed by an Apache
// License 2.0 that can be found in the LICENSE file
package storageclient
import (
"errors"
"sync"
"time"
"github.com/DxChainNetwork/godx/log"
"github.com/DxChainNetwork/godx/p2p/enode"
"github.com/DxChainNetwork/godx/storage"
)
var (
	// ErrNoContractsWithHost is returned when the client has no contract with the host;
	// the worker will be terminated.
	ErrNoContractsWithHost = errors.New("no contract with host which is need to terminate")
	// ErrUnableRetrieveHostInfo is returned when host information cannot be retrieved;
	// the worker will be terminated.
	ErrUnableRetrieveHostInfo = errors.New("can't retrieve host info")
	// ErrContractRenewing is returned while the client and host are renewing their contract;
	// the worker will return directly.
	ErrContractRenewing = errors.New("client and host is renewing contract")
)
// worker listens for download and upload tasks targeting a single host.
// One worker exists per active contract; it is created and torn down by
// activateWorkerPool and runs workLoop in its own goroutine.
type worker struct {
	// The contract and host used by this worker.
	contract storage.ContractMetaData
	hostID   enode.ID
	client   *StorageClient
	// How many download failures in a row?
	ownedDownloadConsecutiveFailures int
	// the time of the last download failure (basis for cooldown)
	ownedDownloadRecentFailure time.Time
	// Notifications of new download work. Takes priority over uploads.
	downloadChan     chan struct{}
	downloadSegments []*unfinishedDownloadSegment
	downloadMu       sync.Mutex
	// Has downloading been terminated for this worker?
	downloadTerminated bool
	// pending upload segment in heap
	pendingSegments           []*unfinishedUploadSegment
	uploadChan                chan struct{} // Notifications of new segment
	uploadConsecutiveFailures int           // How many times in a row uploading has failed.
	uploadRecentFailure       time.Time     // How recent was the last failure?
	uploadTerminated          bool          // Have we stopped uploading?
	// Worker will shut down if a signal is sent down this channel.
	killChan chan struct{}
	mu       sync.Mutex
}
// activateWorkerPool grabs the set of contracts from the contract manager and
// updates the worker pool to match: a worker is spawned for every contract
// that lacks one, and workers whose contract disappeared are shut down.
func (client *StorageClient) activateWorkerPool() {
	// get all contracts in client
	contractMap := client.contractManager.GetStorageContractSet().Contracts()
	// new a worker for a contract that doesn't have one yet
	for id, contract := range contractMap {
		client.lock.Lock()
		_, exists := client.workerPool[id]
		if !exists {
			worker := &worker{
				contract:     contract.Metadata(),
				hostID:       contract.Header().EnodeID,
				downloadChan: make(chan struct{}, 1),
				uploadChan:   make(chan struct{}, 1),
				killChan:     make(chan struct{}),
				client:       client,
			}
			// BUGFIX: register with the thread manager BEFORE publishing the
			// worker in the pool. Previously the worker was inserted first, so
			// a failing Add left a pool entry with no running workLoop
			// goroutine to serve or drain its queues.
			if err := client.tm.Add(); err != nil {
				log.Error("storage client failed to add in worker progress", "error", err)
				client.lock.Unlock()
				break
			}
			client.workerPool[id] = worker
			// start worker goroutine
			go func() {
				defer client.tm.Done()
				worker.workLoop()
			}()
		}
		client.lock.Unlock()
	}
	// Remove a worker for any worker that is not in the set of new contracts.
	client.lock.Lock()
	for id, worker := range client.workerPool {
		_, exists := contractMap[storage.ContractID(id)]
		if !exists {
			delete(client.workerPool, id)
			// closing killChan makes the worker's workLoop return
			close(worker.killChan)
		}
	}
	client.lock.Unlock()
}
// workLoop repeatedly issues tasks to the worker: queued downloads take
// priority over uploads; when both queues are empty it blocks until a new
// task arrives or a kill/stop signal is received. On exit both queues are
// drained via the deferred kill helpers.
func (w *worker) workLoop() {
	defer w.killUploading()
	defer w.killDownloading()
	for {
		downloadSegment := w.nextDownloadSegment()
		if downloadSegment != nil {
			err := w.download(downloadSegment)
			// no contract / no host info: this worker is useless, shut it down
			if err == ErrNoContractsWithHost || err == ErrUnableRetrieveHostInfo {
				break
			}
			// the contract is renewing: back off briefly.
			// NOTE(review): the err != nil check below still returns after the
			// wait, so the worker shuts down on ErrContractRenewing — confirm
			// whether a `continue` was intended here instead.
			if err == ErrContractRenewing {
				<-time.After(50 * time.Millisecond)
			}
			if err != nil {
				return
			}
			continue
		}
		segment, sectorIndex := w.nextUploadSegment()
		if segment != nil {
			err := w.upload(segment, sectorIndex)
			if err == ErrNoContractsWithHost || err == ErrUnableRetrieveHostInfo {
				break
			}
			// the client is renewing, we wait for some millisecond
			// NOTE(review): same fall-through-to-return behavior as the
			// download path above.
			if err == ErrContractRenewing {
				<-time.After(50 * time.Millisecond)
			}
			if err != nil {
				return
			}
			continue
		}
		// keep listening for a new upload/download task, or a stop signal
		select {
		case <-w.downloadChan:
			continue
		case <-w.uploadChan:
			continue
		case <-w.killChan:
			return
		case <-w.client.tm.StopChan():
			return
		}
	}
}
// killDownloading drops every queued download task for this worker, marks
// downloading as terminated, and then — outside the lock — detaches the
// worker from each dropped segment so their connections can be released.
func (w *worker) killDownloading() {
	w.downloadMu.Lock()
	dropped := make([]*unfinishedDownloadSegment, len(w.downloadSegments))
	copy(dropped, w.downloadSegments)
	w.downloadSegments = w.downloadSegments[:0]
	w.downloadTerminated = true
	w.downloadMu.Unlock()
	// close connection after downloading
	for _, uds := range dropped {
		uds.removeWorker()
	}
}
// queueDownloadSegment adds a segment to the worker's download queue and
// signals the work loop. If the worker has already stopped downloading, the
// segment is not queued and the worker detaches itself from it instead.
func (w *worker) queueDownloadSegment(uds *unfinishedDownloadSegment) {
	w.downloadMu.Lock()
	if w.downloadTerminated {
		w.downloadMu.Unlock()
		// worker already shut down; drop the segment association
		uds.removeWorker()
		return
	}
	// accept the segment and notify the work loop (non-blocking send)
	w.downloadSegments = append(w.downloadSegments, uds)
	select {
	case w.downloadChan <- struct{}{}:
	default:
	}
	w.downloadMu.Unlock()
}
// nextDownloadSegment pops the head of the download queue, or returns nil
// when no download work is pending.
func (w *worker) nextDownloadSegment() *unfinishedDownloadSegment {
	w.downloadMu.Lock()
	defer w.downloadMu.Unlock()
	if len(w.downloadSegments) > 0 {
		head := w.downloadSegments[0]
		w.downloadSegments = w.downloadSegments[1:]
		return head
	}
	return nil
}
// checkConnection resolves the worker's (possibly renewed) contract, fetches
// the host info, sets up a peer connection and claims the revise/renew
// session on it. On success the caller owns the session and must call
// sp.RevisionOrRenewingDone() when finished.
func (w *worker) checkConnection() (storage.Peer, *storage.HostInfo, error) {
	// check this contract whether is renewing
	contractID := w.contract.ID
	// get the storage host information
	hostInfo, err := w.updateWorkerContractID(contractID)
	if err != nil {
		return nil, nil, err
	}
	// set up the connection
	sp, err := w.client.SetupConnection(hostInfo.EnodeURL)
	// BUGFIX: the connection error was previously ignored, so a failed setup
	// could lead to TryToRenewOrRevise being invoked on a nil/invalid peer.
	if err != nil {
		return nil, nil, err
	}
	// start contract revision; if it fails, a renew is already in progress
	if ok := sp.TryToRenewOrRevise(); !ok {
		return nil, nil, errors.New("the contract is currently renewing or revising")
	}
	return sp, hostInfo, nil
}
// download performs one download task: connect to the host, fetch the sector
// this worker is responsible for, decrypt it and record the result on the
// download segment. Returns the first error encountered; a segment that no
// longer needs this worker is not an error.
func (w *worker) download(uds *unfinishedDownloadSegment) error {
	sp, hostInfo, err := w.checkConnection()
	if err != nil {
		w.client.log.Error("failed to check the connection", "err", err)
		return err
	}
	// BUGFIX: register this defer only after the error check; previously it
	// ran before the check, so a failed checkConnection (nil peer) panicked
	// in the deferred call on return.
	defer sp.RevisionOrRenewingDone()
	// check whether this worker can still usefully serve the segment
	uds = w.processDownloadSegment(uds)
	if uds == nil {
		// segment complete/failed or worker put on standby — nothing to do
		return nil
	}
	// whether download success or fail, we should remove the worker at last
	defer uds.removeWorker()
	// for not supporting partial encoding, we need to download the whole sector every time.
	fetchOffset, fetchLength := 0, storage.SectorSize
	root := uds.segmentMap[w.hostID.String()].root
	// call rpc request the data from host, if get error, unregister the worker.
	sectorData, err := w.client.Download(sp, root, uint32(fetchOffset), uint32(fetchLength), hostInfo)
	if err != nil {
		w.client.log.Error("worker failed to download sector", "error", err)
		uds.unregisterWorker(w)
		return err
	}
	// decrypt the sector
	key := uds.clientFile.CipherKey()
	decryptedSector, err := key.DecryptInPlace(sectorData)
	if err != nil {
		w.client.log.Error("worker failed to decrypt sector", "error", err)
		uds.unregisterWorker(w)
		return err
	}
	// mark the sector as completed
	sectorIndex := uds.segmentMap[w.hostID.String()].index
	uds.mu.Lock()
	uds.markSectorCompleted(sectorIndex)
	uds.sectorsRegistered--
	// if the num of sectorsCompleted has not exceeded the required min sector
	// num, keep the decrypted sector for recovery.
	if uds.sectorsCompleted <= uds.erasureCode.MinSectors() {
		uds.physicalSegmentData[sectorIndex] = decryptedSector
		w.client.log.Debug("received a sector,but not enough to recover", "sectors_completed", uds.sectorsCompleted)
	}
	// exactly when the minimum is reached, kick off logical-data recovery once
	if uds.sectorsCompleted == uds.erasureCode.MinSectors() {
		go uds.recoverLogicalData()
		w.client.log.Debug("received enough sectors to recover", "sectors_completed", uds.sectorsCompleted)
	}
	uds.mu.Unlock()
	return nil
}
// processDownloadSegment decides whether this worker should act on the given
// download segment. It returns the segment (with the worker registered on a
// sector) when the worker should download now, or nil when the worker was
// removed from the segment or parked on standby.
func (w *worker) processDownloadSegment(uds *unfinishedDownloadSegment) *unfinishedDownloadSegment {
	uds.mu.Lock()
	// enough sectors already recovered, or the overall download finished
	segmentComplete := uds.sectorsCompleted >= uds.erasureCode.MinSectors() || uds.download.isComplete()
	// even if every remaining worker succeeds, the minimum can't be reached
	segmentFailed := uds.sectorsCompleted+uds.workersRemaining < uds.erasureCode.MinSectors()
	// NOTE(review): on a map miss sectorData is the zero value, so
	// sectorData.index is 0 below; workerHasSector gates removal but not the
	// completedSectors lookup — confirm index 0 lookup is always safe here.
	sectorData, workerHasSector := uds.segmentMap[w.hostID.String()]
	sectorCompleted := uds.completedSectors[sectorData.index]
	// if the given segment downloading complete/fail, or no sector associated with host for downloading,
	// or the sector has completed, the worker should be removed.
	if segmentComplete || segmentFailed || w.onDownloadCooldown() || !workerHasSector || sectorCompleted {
		uds.mu.Unlock()
		uds.removeWorker()
		return nil
	}
	defer uds.mu.Unlock()
	// if need more sector, and the sector has not been fetched yet,
	// should register the worker and return the segment for downloading.
	sectorTaken := uds.sectorUsage[sectorData.index]
	sectorsInProgress := uds.sectorsRegistered + uds.sectorsCompleted
	desiredSectorsInProgress := uds.erasureCode.MinSectors() + uds.overdrive
	workersDesired := sectorsInProgress < desiredSectorsInProgress && !sectorTaken
	if workersDesired {
		uds.sectorsRegistered++
		uds.sectorUsage[sectorData.index] = true
		return uds
	}
	// put this worker on standby for this segment, we can use it to download later.
	uds.workersStandby = append(uds.workersStandby, w)
	return nil
}
// onDownloadCooldown reports whether the worker is still cooling down after
// download failures. The cooldown doubles per consecutive failure, capped at
// MaxConsecutivePenalty doublings.
func (w *worker) onDownloadCooldown() bool {
	penalty := w.ownedDownloadConsecutiveFailures
	if penalty > MaxConsecutivePenalty {
		penalty = MaxConsecutivePenalty
	}
	cooldown := DownloadFailureCooldown
	for j := 0; j < penalty; j++ {
		cooldown *= 2
	}
	return time.Now().Before(w.ownedDownloadRecentFailure.Add(cooldown))
}
// unregisterWorker rolls back a worker's registration on this download
// segment, freeing the sector it had claimed so another worker may take it.
//
// NOTE: This function should only be called when a worker download fails.
func (uds *unfinishedDownloadSegment) unregisterWorker(w *worker) {
	uds.mu.Lock()
	defer uds.mu.Unlock()
	uds.sectorsRegistered--
	uds.sectorUsage[uds.segmentMap[w.hostID.String()].index] = false
}
// updateWorkerContractID resolves the worker's active contract, following a
// renewal to the new contract ID when necessary, and returns the host info.
// Returns ErrUnableRetrieveHostInfo or ErrNoContractsWithHost on failure.
//
// NOTE(review): w.contract / w.hostID are updated without holding w.mu —
// confirm callers serialize access.
func (w *worker) updateWorkerContractID(contractID storage.ContractID) (*storage.HostInfo, error) {
	info, found := w.client.storageHostManager.RetrieveHostInfo(w.hostID)
	if !found {
		return nil, ErrUnableRetrieveHostInfo
	}
	manager := w.client.contractManager
	// fast path: the contract is still active under its current ID
	if _, active := manager.RetrieveActiveContract(contractID); active {
		return &info, nil
	}
	// otherwise the contract may have been renewed under a new ID for the same host
	renewedID := manager.GetStorageContractSet().GetContractIDByHostID(w.hostID)
	if renewed, active := manager.RetrieveActiveContract(renewedID); active {
		w.contract = renewed
		w.hostID = renewed.EnodeID
		return &info, nil
	}
	return nil, ErrNoContractsWithHost
}