/
lcopy.go
445 lines (414 loc) · 11.6 KB
/
lcopy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
// Package core provides core metadata and in-cluster API
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package core
import (
"fmt"
"os"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/fs"
)
//
// LOM copy management
//
// whingeCopy complains - loudly - if this LOM unexpectedly turns out to be
// a copy; returns true iff that is the case.
func (lom *LOM) whingeCopy() bool {
	if !lom.IsCopy() {
		return false
	}
	s := fmt.Sprintf("unexpected: %s([fqn=%s] [hrw=%s] %+v)", lom, lom.FQN, lom.HrwFQN, lom.md.copies)
	debug.Assert(false, s)
	nlog.Errorln(s)
	return true
}
// HasCopies returns true when the object has at least one replica in addition
// to the main one (metadata-wise, md.copies includes lom.FQN itself).
func (lom *LOM) HasCopies() bool { return len(lom.md.copies) > 1 }
// NumCopies returns the total number of replicas, never less than 1
// (the main replica always counts even when md.copies is nil).
func (lom *LOM) NumCopies() int { return max(len(lom.md.copies), 1) } // metadata-wise
// GetCopies returns all copies
// NOTE: a) copies include lom.FQN aka "main repl.", and b) caller must take a lock
func (lom *LOM) GetCopies() fs.MPI {
	// debug-build assertion: the caller must hold either a read or a write lock
	debug.AssertFunc(func() bool {
		rc, exclusive := lom.IsLocked()
		return exclusive || rc > 0
	})
	return lom.md.copies
}
// IsCopy: given an existing (on-disk) object, determines whether it is a _copy_
// (compare with isMirror below). An object at its HRW location is never a copy;
// otherwise it is either a tracked copy or simply misplaced.
func (lom *LOM) IsCopy() bool {
	if lom.IsHRW() {
		return false
	}
	_, present := lom.md.copies[lom.FQN]
	return present
}
// isMirror determines whether the two LOM _structures_ represent objects that
// must be _copies_ of each other (compare with IsCopy above): mirroring must be
// enabled, the object names must match, and the buckets must be identical.
func (lom *LOM) isMirror(dst *LOM) bool {
	if !lom.MirrorConf().Enabled {
		return false
	}
	if lom.ObjName != dst.ObjName {
		return false
	}
	return lom.Bck().Equal(dst.Bck(), true /* must have same BID*/, true /* same backend */)
}
// delCopyMd removes the given FQN from the copies metadata; once at most one
// entry remains (the main replica itself) the map is dropped altogether.
func (lom *LOM) delCopyMd(copyFQN string) {
	delete(lom.md.copies, copyFQN)
	if len(lom.md.copies) < 2 {
		lom.md.copies = nil
	}
}
// AddCopy registers (copyFQN, mpi) in the copies metadata - along with the main
// replica itself - and syncs the metadata across all copies.
// NOTE: called by lom.Copy (below) and by tests.
func (lom *LOM) AddCopy(copyFQN string, mpi *fs.Mountpath) error {
	if lom.md.copies == nil {
		lom.md.copies = make(fs.MPI, 2)
	}
	lom.md.copies[copyFQN] = mpi
	// the main replica is always part of md.copies
	lom.md.copies[lom.FQN] = lom.mi
	return lom.syncMetaWithCopies()
}
// DelCopies removes the specified copies: first from the metadata, then - after
// syncing metadata across the remaining replicas - from disk.
// Returns an error if any of the given FQNs is not a tracked copy.
func (lom *LOM) DelCopies(copiesFQN ...string) error {
	numCopies := lom.NumCopies()
	// 1. Delete all copies from the metadata
	for _, fqn := range copiesFQN {
		if _, ok := lom.md.copies[fqn]; !ok {
			return fmt.Errorf("lom %s(num: %d): copy %s does not exist", lom, numCopies, fqn)
		}
		lom.delCopyMd(fqn)
	}
	// 2. Update metadata on remaining copies, if any
	if err := lom.syncMetaWithCopies(); err != nil {
		debug.AssertNoErr(err)
		return err
	}
	// 3. Remove the copies from disk (best effort)
	for _, fqn := range copiesFQN {
		if err := cos.RemoveFile(fqn); err != nil {
			nlog.Errorln(err) // TODO: LRU should take care of that later.
		}
	}
	return nil
}
// DelAllCopies removes all replicas of the object except the main one.
func (lom *LOM) DelAllCopies() error {
	all := make([]string, 0, len(lom.md.copies))
	for fqn := range lom.md.copies {
		if fqn != lom.FQN { // keep the main replica
			all = append(all, fqn)
		}
	}
	return lom.DelCopies(all...)
}
// DelExtraCopies deletes obj replicas that are not part of the lom.md.copies metadata
// (cleanup): walks every available mountpath, derives the object's would-be FQN
// there, and removes the file unless that FQN is tracked in md.copies.
// Returns removed=true iff the optional fqn argument (first element only) was one
// of the removed files; err holds the last removal failure, if any.
// NOTE(review): when md.copies is nil, the FQN on lom's own mountpath is not in
// the map either - presumably the main replica is protected by the whingeCopy /
// call-site invariants; confirm against callers.
func (lom *LOM) DelExtraCopies(fqn ...string) (removed bool, err error) {
	// bail if this lom unexpectedly turns out to be a copy itself
	if lom.whingeCopy() {
		return
	}
	availablePaths := fs.GetAvail()
	for _, mi := range availablePaths {
		copyFQN := mi.MakePathFQN(lom.Bucket(), fs.ObjectType, lom.ObjName)
		if _, ok := lom.md.copies[copyFQN]; ok {
			continue // legitimate, tracked copy
		}
		if err1 := cos.RemoveFile(copyFQN); err1 != nil {
			err = err1 // remember the last error; keep cleaning up
			continue
		}
		if len(fqn) > 0 && fqn[0] == copyFQN {
			removed = true
		}
	}
	return
}
// syncMetaWithCopies tries to make sure that all copies have identical metadata.
// NOTE: uname for LOM must be already locked.
// NOTE: changes _may_ be made - the caller must call lom.Persist() upon return
func (lom *LOM) syncMetaWithCopies() (err error) {
	var copyFQN string
	if !lom.HasCopies() {
		return nil // nothing to sync
	}
	// NOTE: caller is responsible for write-locking
	debug.AssertFunc(func() bool {
		_, exclusive := lom.IsLocked()
		return exclusive
	})
	if !lom.WritePolicy().IsImmediate() {
		// deferred/never write policy: just mark the metadata dirty
		lom.md.makeDirty()
		return nil
	}
	// persist metadata on all copies; upon failure drop the offending copy
	// from the metadata and retry with the remaining ones
	for {
		if copyFQN, err = lom.persistMdOnCopies(); err == nil {
			break
		}
		lom.delCopyMd(copyFQN)
		// stat failure other than "does not exist" may indicate
		// mountpath-level trouble => involve the FS health checker
		if err1 := cos.Stat(copyFQN); err1 != nil && !os.IsNotExist(err1) {
			T.FSHC(err, copyFQN) // TODO: notify scrubber
		}
	}
	return
}
// RestoreToLocation tries to restore the object at its default (HRW) location
// from any replica found on the other mountpaths.
// Returns true if object exists, false otherwise
// TODO: locking vs concurrent restore: consider (read-lock object + write-lock meta) split
func (lom *LOM) RestoreToLocation() (exists bool) {
	lom.Lock(true)
	if err := lom.Load(true /*cache it*/, true /*locked*/); err == nil {
		lom.Unlock(true)
		return true // nothing to do
	}
	var (
		saved          = lom.md.pushrt() // save runtime state to re-apply after restore
		availablePaths = fs.GetAvail()
		buf, slab      = g.pmm.Alloc()
	)
	// scan all other mountpaths for a replica of this object
	for path, mi := range availablePaths {
		if path == lom.mi.Path {
			continue // that is the (missing) default location itself
		}
		fqn := mi.MakePathFQN(lom.Bucket(), fs.ObjectType, lom.ObjName)
		if err := cos.Stat(fqn); err != nil {
			continue // nothing on this mountpath
		}
		dst, err := lom._restore(fqn, buf)
		if err == nil {
			// restored: adopt the copy's metadata, then restore saved runtime state
			lom.md = dst.md
			lom.md.poprt(saved)
			exists = true
			FreeLOM(dst)
			break
		}
		if dst != nil {
			FreeLOM(dst)
		}
	}
	lom.Unlock(true)
	slab.Free(buf)
	return
}
// _restore loads the replica found at fqn and copies it to the object's
// default location (lom.FQN); returns the resulting destination LOM.
func (lom *LOM) _restore(fqn string, buf []byte) (*LOM, error) {
	src := lom.CloneMD(fqn)
	defer FreeLOM(src)
	if err := src.InitFQN(fqn, lom.Bucket()); err != nil {
		return nil, err
	}
	if err := src.Load(false /*cache it*/, true /*locked*/); err != nil {
		return nil, err
	}
	// restore at default location
	return src.Copy2FQN(lom.FQN, buf)
}
// Copy increments the object's num copies by (well) copying the former
// (compare with lom.Copy2FQN below): copies lom into a workfile on the given
// mountpath and renames it into place; skips the data copy altogether when an
// identical replica already exists at the destination.
func (lom *LOM) Copy(mi *fs.Mountpath, buf []byte) (err error) {
	var (
		copyFQN = mi.MakePathFQN(lom.Bucket(), fs.ObjectType, lom.ObjName)
		workFQN = mi.MakePathFQN(lom.Bucket(), fs.WorkfileType, fs.WorkfileCopy+"."+lom.ObjName)
	)
	// check if the copy destination exists and then skip copying if it's also identical
	if errExists := cos.Stat(copyFQN); errExists == nil {
		cplom := AllocLOM(lom.ObjName)
		defer FreeLOM(cplom)
		if errExists = cplom.InitFQN(copyFQN, lom.Bucket()); errExists == nil {
			if errExists = cplom.Load(false /*cache it*/, true /*locked*/); errExists == nil && cplom.Equal(lom) {
				goto add // identical replica already in place - just (re)register it
			}
		}
	}
	// copy into a workfile first, then rename (so the copy appears atomically)
	_, _, err = cos.CopyFile(lom.FQN, workFQN, buf, cos.ChecksumNone) // TODO: checksumming
	if err != nil {
		return
	}
	if err = cos.Rename(workFQN, copyFQN); err != nil {
		// cleanup the orphaned workfile; ignore "not exists"
		if errRemove := cos.RemoveFile(workFQN); errRemove != nil && !os.IsNotExist(errRemove) {
			nlog.Errorln("nested err:", errRemove)
		}
		return
	}
add:
	// add md and persist
	// NOTE(review): AddCopy's returned error (from its internal
	// syncMetaWithCopies call) is ignored here; syncMetaWithCopies is invoked
	// again below - confirm this is intended.
	lom.AddCopy(copyFQN, mi)
	err = lom.Persist()
	if err != nil {
		// roll back the metadata addition
		lom.delCopyMd(copyFQN)
		nlog.Errorln(err)
		return err
	}
	err = lom.syncMetaWithCopies()
	return
}
// Copy2FQN copies the object => any local destination;
// recommended for copying between different buckets (compare with lom.Copy() above)
// NOTE: `lom` source must be w-locked
// Returns the destination LOM on success; on failure the clone is freed and
// (nil, err) is returned.
func (lom *LOM) Copy2FQN(dstFQN string, buf []byte) (*LOM, error) {
	dst := lom.CloneMD(dstFQN)
	err := dst.InitFQN(dstFQN, nil)
	if err == nil {
		err = lom.copy2fqn(dst, buf)
	}
	if err != nil {
		FreeLOM(dst)
		return nil, err
	}
	return dst, nil
}
// copy2fqn performs the actual copy for Copy2FQN: workfile copy + rename,
// checksum validation, and metadata persistence - handling both the mirror
// (same-bucket) and the cross-bucket cases.
func (lom *LOM) copy2fqn(dst *LOM, buf []byte) (err error) {
	var (
		dstCksum  *cos.CksumHash
		dstFQN    = dst.FQN
		srcCksum  = lom.Checksum()
		cksumType = cos.ChecksumNone
	)
	// compute the destination's checksum with the source's checksum type, when known
	if !srcCksum.IsEmpty() {
		cksumType = srcCksum.Ty()
	}
	// mirror case: the destination inherits the source's copies metadata
	if dst.isMirror(lom) && lom.md.copies != nil {
		dst.md.copies = make(fs.MPI, len(lom.md.copies)+1)
		for fqn, mpi := range lom.md.copies {
			dst.md.copies[fqn] = mpi
		}
	}
	if !dst.Bck().Equal(lom.Bck(), true /*same ID*/, true /*same backend*/) {
		// The copy will be in a new bucket - completely separate object. Hence, we have to set initial version.
		dst.SetVersion(lomInitialVersion)
	}
	// copy into a workfile, then rename into place
	workFQN := fs.CSM.Gen(dst, fs.WorkfileType, fs.WorkfileCopy)
	_, dstCksum, err = cos.CopyFile(lom.FQN, workFQN, buf, cksumType)
	if err != nil {
		return
	}
	if err = cos.Rename(workFQN, dstFQN); err != nil {
		// cleanup the orphaned workfile; ignore "not exists"
		if errRemove := cos.RemoveFile(workFQN); errRemove != nil && !os.IsNotExist(errRemove) {
			nlog.Errorln("nested err:", errRemove)
		}
		return
	}
	// validate the just-computed checksum against the source's, then store it
	if cksumType != cos.ChecksumNone {
		if !dstCksum.Equal(lom.Checksum()) {
			return cos.NewErrDataCksum(&dstCksum.Cksum, lom.Checksum())
		}
		dst.SetCksum(dstCksum.Clone())
	}
	// persist
	if lom.isMirror(dst) {
		// register the new copy in both source's and destination's metadata
		if lom.md.copies == nil {
			lom.md.copies = make(fs.MPI, 2)
			dst.md.copies = make(fs.MPI, 2)
		}
		lom.md.copies[dstFQN], dst.md.copies[dstFQN] = dst.mi, dst.mi
		lom.md.copies[lom.FQN], dst.md.copies[lom.FQN] = lom.mi, lom.mi
		if err = lom.syncMetaWithCopies(); err != nil {
			// sync dropped the new copy from the md => remove its on-disk file
			if _, ok := lom.md.copies[dst.FQN]; !ok {
				if errRemove := os.Remove(dst.FQN); errRemove != nil && !os.IsNotExist(errRemove) {
					nlog.Errorln("nested err:", errRemove)
				}
			}
			// `lom.syncMetaWithCopies()` may have made changes notwithstanding
			if errPersist := lom.Persist(); errPersist != nil {
				nlog.Errorln("nested err:", errPersist)
			}
			return
		}
		err = lom.Persist()
	} else if err = dst.Persist(); err != nil {
		// failed to persist the destination => undo the file copy
		if errRemove := os.Remove(dst.FQN); errRemove != nil && !os.IsNotExist(errRemove) {
			nlog.Errorln("nested err:", errRemove)
		}
	}
	return
}
// LBGet returns the FQN to serve a load-balanced GET from: the least-utilized
// replica when copies exist, the main replica otherwise.
func (lom *LOM) LBGet() string {
	if lom.HasCopies() {
		return lom.leastUtilCopy()
	}
	return lom.FQN
}
// leastUtilCopy returns the FQN of the replica residing on the least-utilized
// mountpath, starting from the main replica as the baseline.
// NOTE: reconsider counting GETs (and the associated overhead)
// vs ios.refreshIostatCache (and the associated delay)
func (lom *LOM) leastUtilCopy() string {
	var (
		utils   = fs.GetAllMpathUtils()
		minUtil = utils.Get(lom.mi.Path)
		fqn     = lom.FQN
	)
	for cfqn, cmpi := range lom.GetCopies() {
		if cfqn == lom.FQN {
			continue
		}
		if u := utils.Get(cmpi.Path); u < minUtil {
			minUtil, fqn = u, cfqn
		}
	}
	return fqn
}
// LeastUtilNoCopy returns the least utilized mountpath that does _not_ have a
// copy of this `lom` yet (compare with leastUtilCopy()); mountpaths flagged
// for decommissioning/detach are excluded. Returns nil when no such mountpath.
func (lom *LOM) LeastUtilNoCopy() (mi *fs.Mountpath) {
	var (
		avail   = fs.GetAvail()
		utils   = fs.GetAllMpathUtils()
		minUtil = int64(101) // above any valid utilization percentage
	)
	for mpath, mpathInfo := range avail {
		if lom.haveMpath(mpath) || mpathInfo.IsAnySet(fs.FlagWaitingDD) {
			continue
		}
		if u := utils.Get(mpath); u < minUtil {
			minUtil, mi = u, mpathInfo
		}
	}
	return mi
}
// haveMpath reports whether the object (or any of its copies) already resides
// on the given mountpath.
func (lom *LOM) haveMpath(mpath string) bool {
	if len(lom.md.copies) == 0 {
		// no copies: only the main replica's mountpath counts
		return mpath == lom.mi.Path
	}
	for _, mpi := range lom.md.copies {
		if mpath == mpi.Path {
			return true
		}
	}
	return false
}
// ToMpath: must be called under w-lock
// returns mountpath destination to copy this object, or nil if no copying is required
// - checks hrw location first, and
// - checks copies (if any) against the current configuration and available mountpaths;
// - does not check `fstat` in either case (TODO: configurable or scrub);
// isHrw=true indicates the object itself is misplaced and must move to hrwMi.
func (lom *LOM) ToMpath() (mi *fs.Mountpath, isHrw bool) {
	var (
		availablePaths = fs.GetAvail()
		hrwMi, _, err  = fs.Hrw(lom.md.uname)
	)
	if err != nil {
		nlog.Errorln(err)
		return
	}
	debug.Assert(!hrwMi.IsAnySet(fs.FlagWaitingDD))
	// misplaced: relocate to the HRW mountpath first
	if lom.mi.Path != hrwMi.Path {
		return hrwMi, true
	}
	mirror := lom.MirrorConf()
	if !mirror.Enabled || mirror.Copies < 2 {
		return // mirroring disabled or single-copy - nothing to do
	}
	// count copies vs. configuration
	// take into account mountpath flags but stop short of `fstat`-ing
	expCopies, gotCopies := int(mirror.Copies), 0
	for fqn, mpi := range lom.md.copies {
		mpathInfo, ok := availablePaths[mpi.Path]
		if !ok || mpathInfo.IsAnySet(fs.FlagWaitingDD) {
			// copy resides on an unavailable or decommissioning mountpath -
			// drop it from the md (deleting during range is safe in Go)
			lom.delCopyMd(fqn)
		} else {
			gotCopies++
		}
	}
	if expCopies <= gotCopies {
		return // enough copies already
	}
	mi = lom.LeastUtilNoCopy() // NOTE: nil when not enough mountpaths
	return
}