Skip to content

Commit 8ee98ea

Browse files
committed
xs/xact: initial rechunk xaction implementation
Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent c04d838 commit 8ee98ea

File tree

13 files changed

+1028
-14
lines changed

13 files changed

+1028
-14
lines changed

ais/test/rechunk_test.go

Lines changed: 771 additions & 0 deletions
Large diffs are not rendered by default.

ais/tgtimpl.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (t *target) Health(si *meta.Snode, timeout time.Duration, query url.Values)
3737
return t.reqHealth(si, timeout, query, t.owner.smap.get(), false /*retry*/)
3838
}
3939

40-
func (t *target) PutObject(lom *core.LOM, params *core.PutParams) error {
40+
func (t *target) PutObject(lom *core.LOM, params *core.PutParams) (err error) {
4141
debug.Assert(params.WorkTag != "" && !params.Atime.IsZero())
4242
workFQN := lom.GenFQN(fs.WorkCT, params.WorkTag)
4343

@@ -54,11 +54,19 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutParams) error {
5454
poi.owt = params.OWT
5555
poi.skipEC = params.SkipEC
5656
poi.coldGET = params.ColdGET
57+
poi.locked = params.Locked
5758
}
5859
if poi.owt != cmn.OwtPut {
5960
poi.cksumToUse = params.Cksum
6061
}
61-
_, err := poi.putObject()
62+
63+
switch {
64+
case params.ChunkSize > 0:
65+
_, err = poi.chunk(params.ChunkSize)
66+
default:
67+
_, err = poi.putObject()
68+
}
69+
6270
freePOI(poi)
6371
debug.Func(func() {
6472
if err == nil {

ais/tgtmpt.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type (
6060
parts apc.MptCompletedParts
6161
isS3 bool
6262
coldGET bool
63+
locked bool // true if the LOM is already locked by the caller
6364
}
6465
)
6566

@@ -529,8 +530,8 @@ func (ups *ups) complete(args *completeArgs) (string, int, error) {
529530

530531
// atomically flip: persist manifest, mark chunked, persist main
531532
// NOTE: coldGET implies the LOM's lock has been promoted to wlock
532-
debug.Assertf(!args.coldGET || lom.IsLocked() == apc.LockWrite, "expecting wlock, have %d", lom.IsLocked())
533-
if err = lom.CompleteUfest(manifest, args.coldGET); err != nil {
533+
debug.Assertf(!args.locked || lom.IsLocked() == apc.LockWrite, "expecting wlock, have %d", lom.IsLocked())
534+
if err = lom.CompleteUfest(manifest, args.locked); err != nil {
534535
nlog.Errorf("upload %q: failed to complete %s locally: %v", uploadID, lom.Cname(), err)
535536
return "", http.StatusInternalServerError, err
536537
}

ais/tgtobj.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type (
6868
skipEC bool // do not erasure-encode when finalizing
6969
skipVC bool // skip loading existing Version and skip comparing Checksums (skip VC)
7070
coldGET bool // (one implication: proceed to write)
71+
locked bool // true if the LOM is already locked by the caller
7172
remoteErr bool // to exclude `putRemote` errors when counting soft IO errors
7273
}
7374

@@ -180,6 +181,7 @@ func (poi *putOI) chunk(chunkSize int64) (ecode int, err error) {
180181
lom = poi.lom
181182
uploadID string
182183
)
184+
debug.Assertf(!poi.coldGET || poi.locked, "expecting locked LOM for cold-GET")
183185

184186
debug.Assertf(poi.size > 0, "poi.size is required in chunk, object name: %s", poi.lom.Cname())
185187
if uploadID, err = poi.t.ups.start(poi.oreq, lom, poi.coldGET); err != nil {
@@ -239,12 +241,14 @@ func (poi *putOI) chunk(chunkSize int64) (ecode int, err error) {
239241
parts: completedParts,
240242
isS3: false,
241243
coldGET: poi.coldGET,
244+
locked: poi.locked,
242245
})
243246
return ecode, err
244247
}
245248

246249
func (poi *putOI) putObject() (ecode int, err error) {
247250
maxMonoSize := int64(poi.lom.Bprops().Chunks.MaxMonolithicSize)
251+
// protect the bucket: if the object size exceeds the max monolithic size, MUST chunk
248252
// NOTE: if `poi.size` is not set, don't trigger chunking
249253
if maxMonoSize > 0 && poi.size > maxMonoSize {
250254
if cmn.Rom.V(5, cos.ModAIS) {
@@ -453,8 +457,9 @@ func (poi *putOI) fini() (ecode int, err error) {
453457
// locking strategies: optimistic and otherwise
454458
// (see GetCold() implementation and cmn.OWT enum)
455459
switch poi.owt {
456-
case cmn.OwtGetTryLock, cmn.OwtGetLock, cmn.OwtGet:
460+
case cmn.OwtGetTryLock, cmn.OwtGetLock, cmn.OwtGet, cmn.OwtChunks:
457461
// do nothing: lom is already wlocked
462+
debug.Assertf(lom.IsLocked() == apc.LockWrite, "lom %s is not write-locked", lom.Cname())
458463
case cmn.OwtGetPrefetchLock:
459464
if !lom.TryLock(true) {
460465
nlog.Warningln(poi.loghdr(), "is busy")
@@ -879,6 +884,7 @@ func (goi *getOI) coldPut(res *core.GetReaderResult) (int, error) {
879884
poi.owt = cmn.OwtGet
880885
poi.cksumToUse = res.ExpCksum // expected checksum (to validate if the bucket's `validate_cold_get == true`)
881886
poi.coldGET = true
887+
poi.locked = true
882888
}
883889
code, err := poi.putObject()
884890
freePOI(poi)

ais/tgtxact.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,20 @@ func (t *target) xstart(args *xact.ArgsMsg, bck *meta.Bck, msg *apc.ActMsg) (xid
282282
}
283283
go t.runResilver(resargs, wg)
284284
wg.Wait()
285+
case apc.ActRechunk:
286+
if err := xreg.LimitedCoexistence(t.si, bck, args.Kind); err != nil {
287+
return "", err
288+
}
289+
rns := xreg.RenewBckRechunks(bck, args.ID, &xreg.RechunkArgs{
290+
ObjSizeLimit: int64(bck.Props.Chunks.ObjSizeLimit),
291+
ChunkSize: int64(bck.Props.Chunks.ChunkSize),
292+
})
293+
if rns.Err != nil {
294+
return "", rns.Err
295+
}
296+
xctn := rns.Entry.Get()
297+
xact.GoRunW(xctn)
298+
return xctn.ID(), nil
285299
case apc.ActLoadLomCache:
286300
rns := xreg.RenewBckLoadLomCache(args.ID, bck)
287301
return xid, rns.Err

api/apc/actmsg.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const (
3939

4040
ActMakeNCopies = "make-n-copies"
4141
ActPutCopies = "put-copies"
42+
ActRechunk = "rechunk"
4243

4344
ActRebalance = "rebalance"
4445
ActMoveBck = "move-bck"

cmn/owt.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const (
2525
OwtArchive // multi-obj arch
2626
OwtTransform // ETL
2727
OwtCopy // copy and move objects within cluster
28+
OwtChunks // chunks
2829
OwtRebalance // NOTE: must be the last in PUT* group
2930
//
3031
// GET and friends

core/target.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,17 @@ import (
2626
// intra-cluster data path: control structures and types
2727
type (
2828
PutParams struct {
29-
Reader io.ReadCloser
30-
Cksum *cos.Cksum // checksum to check
31-
Atime time.Time
32-
Xact Xact
33-
WorkTag string // (=> work fqn)
34-
Size int64
35-
OWT cmn.OWT
36-
SkipEC bool // don't erasure-code when finalizing
37-
ColdGET bool // this PUT is in fact a cold-GET
29+
Reader io.ReadCloser
30+
Cksum *cos.Cksum // checksum to check
31+
Atime time.Time
32+
Xact Xact
33+
WorkTag string // (=> work fqn)
34+
Size int64
35+
ChunkSize int64 // if set, the object will be chunked with this size regardless of the bucket's chunk properties
36+
OWT cmn.OWT
37+
SkipEC bool // don't erasure-code when finalizing
38+
ColdGET bool // this PUT is in fact a cold-GET
39+
Locked bool // true if the LOM is already locked by the caller
3840
}
3941
PromoteParams struct {
4042
Bck *meta.Bck // destination bucket

xact/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ var Table = map[string]Descriptor{
145145

146146
// single target (node)
147147
apc.ActResilver: {Scope: ScopeT, Startable: true, Resilver: true},
148+
apc.ActRechunk: {Scope: ScopeB, Startable: true, RefreshCap: true, ConflictRebRes: true},
148149

149150
// on-demand EC and n-way replication
150151
// (non-startable, triggered by PUT => erasure-coded or mirrored bucket)

xact/xreg/bucket.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func RenewBckLoadLomCache(uuid string, bck *meta.Bck) RenewRes {
8181
return RenewBucketXact(apc.ActLoadLomCache, bck, Args{UUID: uuid})
8282
}
8383

84+
func RenewBckRechunks(bck *meta.Bck, uuid string, args *RechunkArgs) RenewRes {
85+
return RenewBucketXact(apc.ActRechunk, bck, Args{Custom: args, UUID: uuid})
86+
}
87+
8488
func RenewPutMirror(lom *core.LOM) RenewRes {
8589
return RenewBucketXact(apc.ActPutCopies, lom.Bck(), Args{Custom: lom})
8690
}

0 commit comments

Comments
 (0)