/
workerjobrenewcontract.go
151 lines (128 loc) · 4.11 KB
/
workerjobrenewcontract.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package renter
import (
"context"
"go.sia.tech/siad/modules"
"go.sia.tech/siad/types"
"gitlab.com/NebulousLabs/errors"
)
// jobRenew contains information about a Renew query.
type jobRenew struct {
	// staticResponseChan is the channel on which the outcome of the job is
	// delivered, both when the job is executed and when it is discarded.
	staticResponseChan chan *jobRenewResponse

	// The transaction builder, the contract parameters and the id of the
	// contract are passed through to the worker's renew call when the job is
	// executed.
	staticTransactionBuilder modules.TransactionBuilder
	staticParams             modules.ContractParams
	staticFCID               types.FileContractID

	*jobGeneric
}

// jobRenewQueue is a list of Renew queries that have been assigned to the
// worker.
type jobRenewQueue struct {
	*jobGenericQueue
}

// jobRenewResponse contains the result of a Renew query.
type jobRenewResponse struct {
	// staticNewContract and staticTxnSet carry the values returned by the
	// renew call; staticErr carries its error, or the discard error when the
	// job was discarded.
	staticNewContract modules.RenterContract
	staticTxnSet      []types.Transaction
	staticErr         error

	// The worker is included in the response so that the caller can listen
	// on one channel for a bunch of workers and still know which worker
	// produced a given response.
	staticWorker *worker
}
// renewJobExpectedBandwidth is a helper function that reports how much
// bandwidth a renew job is expected to consume. The first result is the
// expected upload bandwidth, the second the expected download bandwidth.
func renewJobExpectedBandwidth() (ul, dl uint64) {
	return 8760, 4380
}
// callDiscard will discard a job, sending the provided error.
//
// The error is extended with ErrJobDiscarded and placed in a response that
// also identifies the worker, matching the response built by callExecute, so
// that a caller listening on one channel for several workers can tell which
// worker discarded the job.
func (j *jobRenew) callDiscard(err error) {
	w := j.staticQueue.staticWorker()
	w.renter.tg.Launch(func() {
		response := &jobRenewResponse{
			staticErr:    errors.Extend(err, ErrJobDiscarded),
			staticWorker: w,
		}
		// Deliver the response unless the job's context is done or the
		// renter's thread group is stopping first.
		select {
		case j.staticResponseChan <- response:
		case <-j.staticCtx.Done():
		case <-w.renter.tg.StopChan():
		}
	})
}
// callExecute will run the renew job: it performs the renewal through the
// worker, hands the outcome to the job's response channel and reports success
// or failure to the job's queue.
func (j *jobRenew) callExecute() {
	w := j.staticQueue.staticWorker()

	// Give the worker a chance to repair a revision mismatch before the
	// renewal is attempted.
	w.externTryFixRevisionMismatch()

	renewed, txns, renewErr := w.managedRenew(j.staticFCID, j.staticParams, j.staticTransactionBuilder)

	// An error that could stem from a revision number mismatch is flagged on
	// the worker, which is then woken up.
	if errCausedByRevisionMismatch(renewErr) {
		w.staticSetSuspectRevisionMismatch()
		w.staticWake()
	}

	// Build the response and pass its delivery to the renter's thread group.
	// Delivery is abandoned if the job's context is done or the thread group
	// is stopping.
	result := &jobRenewResponse{
		staticNewContract: renewed,
		staticTxnSet:      txns,
		staticErr:         renewErr,
		staticWorker:      w,
	}
	deliver := func() {
		select {
		case j.staticResponseChan <- result:
		case <-j.staticCtx.Done():
		case <-w.renter.tg.StopChan():
		}
	}
	w.renter.tg.Launch(deliver)

	// Tell the queue how the job went.
	if renewErr == nil {
		// Update worker cache with the new fcid.
		w.managedUpdateCache()
		j.staticQueue.callReportSuccess()
		return
	}
	j.staticQueue.callReportFailure(renewErr)
}
// callExpectedBandwidth reports the upload and download bandwidth this job is
// expected to consume, as given by renewJobExpectedBandwidth.
func (j *jobRenew) callExpectedBandwidth() (ul, dl uint64) {
	ul, dl = renewJobExpectedBandwidth()
	return ul, dl
}
// initJobRenewQueue sets up the worker's queue for jobs that renew contracts
// with the worker's host. It is only meant to be run once at startup; calling
// it when a queue already exists is logged as a critical error and leaves the
// existing queue in place.
func (w *worker) initJobRenewQueue() {
	if w.staticJobRenewQueue == nil {
		w.staticJobRenewQueue = &jobRenewQueue{
			jobGenericQueue: newJobGenericQueue(w),
		}
		return
	}

	// Sanity check failed: a job queue was already initialized.
	w.renter.log.Critical("incorret call on initJobRenewQueue")
}
// RenewContract renews the contract with the worker's host.
//
// It creates a renew job for the contract identified by fcid using the given
// params and txnBuilder, adds it to the worker's renew queue and blocks until
// either the job's response arrives or ctx is done.
//
// On success it returns the new contract and the transaction set from the
// job's response. It returns an error if the queue does not accept the job
// ("worker unavailable"), if ctx is done before a response arrives ("Renew
// interrupted"), or if the job itself responds with an error.
func (w *worker) RenewContract(ctx context.Context, fcid types.FileContractID, params modules.ContractParams, txnBuilder modules.TransactionBuilder) (modules.RenterContract, []types.Transaction, error) {
	renewResponseChan := make(chan *jobRenewResponse)
	jro := &jobRenew{
		staticFCID:               fcid,
		staticParams:             params,
		staticResponseChan:       renewResponseChan,
		staticTransactionBuilder: txnBuilder,

		// Bind the job to the renew queue it is added to below. The job
		// reports success and failure through this queue, so binding it to
		// any other queue would attribute renew outcomes to that queue.
		jobGeneric: newJobGeneric(ctx, w.staticJobRenewQueue, nil),
	}

	// Add the job to the queue.
	if !w.staticJobRenewQueue.callAdd(jro) {
		return modules.RenterContract{}, nil, errors.New("worker unavailable")
	}

	// Wait for the response.
	select {
	case <-ctx.Done():
		return modules.RenterContract{}, nil, errors.New("Renew interrupted")
	case resp := <-renewResponseChan:
		return resp.staticNewContract, resp.staticTxnSet, resp.staticErr
	}
}