-
Notifications
You must be signed in to change notification settings - Fork 2
/
task_sdr.go
277 lines (225 loc) · 7.91 KB
/
task_sdr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
package seal
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/nonffi"
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
ffi2 "github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/filler"
"github.com/filecoin-project/curio/lib/paths"
lbuild "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var isDevnet = lbuild.BlockDelaySecs < 30
func SetDevnet(value bool) {
isDevnet = value
}
func GetDevnet() bool {
return isDevnet
}
type SDRAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
StateGetRandomnessFromTickets(context.Context, crypto.DomainSeparationTag, abi.ChainEpoch, []byte, types.TipSetKey) (abi.Randomness, error)
}
type SDRTask struct {
api SDRAPI
db *harmonydb.DB
sp *SealPoller
sc *ffi2.SealCalls
max int
}
func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR int) *SDRTask {
return &SDRTask{
api: api,
db: db,
sp: sp,
sc: sc,
max: maxSDR,
}
}
func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()
var sectorParamsArr []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
}
err = s.db.Select(ctx, §orParamsArr, `
SELECT sp_id, sector_number, reg_seal_proof
FROM sectors_sdr_pipeline
WHERE task_id_sdr = $1`, taskID)
if err != nil {
return false, xerrors.Errorf("getting sector params: %w", err)
}
if len(sectorParamsArr) != 1 {
return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr))
}
sectorParams := sectorParamsArr[0]
var pieces []struct {
PieceIndex int64 `db:"piece_index"`
PieceCID string `db:"piece_cid"`
PieceSize int64 `db:"piece_size"`
DataRawSize *int64 `db:"data_raw_size"`
}
err = s.db.Select(ctx, &pieces, `
SELECT piece_index, piece_cid, piece_size, data_raw_size
FROM sectors_sdr_initial_pieces
WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber)
if err != nil {
return false, xerrors.Errorf("getting pieces: %w", err)
}
ssize, err := sectorParams.RegSealProof.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}
var commd cid.Cid
var offset abi.UnpaddedPieceSize
var pieceInfos []abi.PieceInfo
if len(pieces) > 0 {
for _, p := range pieces {
c, err := cid.Parse(p.PieceCID)
if err != nil {
return false, xerrors.Errorf("parsing piece cid: %w", err)
}
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), abi.PaddedPieceSize(p.PieceSize))
offset += padLength.Unpadded()
for _, pad := range pads {
pieceInfos = append(pieceInfos, abi.PieceInfo{
Size: pad,
PieceCID: zerocomm.ZeroPieceCommitment(pad.Unpadded()),
})
}
pieceInfos = append(pieceInfos, abi.PieceInfo{
Size: abi.PaddedPieceSize(p.PieceSize),
PieceCID: c,
})
offset += abi.UnpaddedPieceSize(*p.DataRawSize)
}
fillerSize, err := filler.FillersFromRem(abi.PaddedPieceSize(ssize).Unpadded() - offset)
if err != nil {
return false, xerrors.Errorf("failed to calculate the final padding: %w", err)
}
for _, fil := range fillerSize {
pieceInfos = append(pieceInfos, abi.PieceInfo{
Size: fil.Padded(),
PieceCID: zerocomm.ZeroPieceCommitment(fil),
})
}
commd, err = nonffi.GenerateUnsealedCID(sectorParams.RegSealProof, pieceInfos)
if err != nil {
return false, xerrors.Errorf("computing CommD: %w", err)
}
} else {
commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
}
sref := storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(sectorParams.SpID),
Number: abi.SectorNumber(sectorParams.SectorNumber),
},
ProofType: sectorParams.RegSealProof,
}
// get ticket
maddr, err := address.NewIDAddress(uint64(sectorParams.SpID))
if err != nil {
return false, xerrors.Errorf("getting miner address: %w", err)
}
// FAIL: api may be down
// FAIL-RESP: rely on harmony retry
ticket, ticketEpoch, err := s.getTicket(ctx, maddr)
if err != nil {
return false, xerrors.Errorf("getting ticket: %w", err)
}
// do the SDR!!
// FAIL: storage may not have enough space
// FAIL-RESP: rely on harmony retry
// LATEFAIL: compute error in sdr
// LATEFAIL-RESP: Check in Trees task should catch this; Will retry computing
// Trees; After one retry, it should return the sector to the
// SDR stage; max number of retries should be configurable
err = s.sc.GenerateSDR(ctx, taskID, sref, ticket, commd)
if err != nil {
return false, xerrors.Errorf("generating sdr: %w", err)
}
// store success!
n, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_sdr = true, ticket_epoch = $3, ticket_value = $4, task_id_sdr = NULL
WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, ticketEpoch, []byte(ticket))
if err != nil {
return false, xerrors.Errorf("store sdr success: updating pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("store sdr success: updated %d rows", n)
}
return true, nil
}
func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.SealRandomness, abi.ChainEpoch, error) {
ts, err := s.api.ChainHead(ctx)
if err != nil {
return nil, 0, xerrors.Errorf("getting chain head: %w", err)
}
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
buf := new(bytes.Buffer)
if err := maddr.MarshalCBOR(buf); err != nil {
return nil, 0, xerrors.Errorf("marshaling miner address: %w", err)
}
rand, err := s.api.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
if err != nil {
return nil, 0, xerrors.Errorf("getting randomness from tickets: %w", err)
}
return abi.SealRandomness(rand), ticketEpoch, nil
}
func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := ids[0]
return &id, nil
}
func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails {
ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
if isDevnet {
ssize = abi.SectorSize(2 << 20)
}
res := harmonytask.TaskTypeDetails{
Max: s.max,
Name: "SDR",
Cost: resources.Resources{ // todo offset for prefetch?
Cpu: 4, // todo multicore sdr
Gpu: 0,
Ram: (64 << 30) + (256 << 20),
Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage),
},
MaxFailures: 2,
Follows: nil,
}
if isDevnet {
res.Cost.Ram = 1 << 30
res.Cost.Cpu = 1
}
return res
}
func (s *SDRTask) Adder(taskFunc harmonytask.AddTaskFunc) {
s.sp.pollers[pollerSDR].Set(taskFunc)
}
func (s *SDRTask) taskToSector(id harmonytask.TaskID) (ffi2.SectorRef, error) {
var refs []ffi2.SectorRef
err := s.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_sdr = $1`, id)
if err != nil {
return ffi2.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
}
if len(refs) != 1 {
return ffi2.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
}
return refs[0], nil
}
var _ harmonytask.TaskInterface = &SDRTask{}