/
targz.go
119 lines (106 loc) · 3.12 KB
/
targz.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Package shard provides Extract(shard), Create(shard), and associated methods
// across all supported archival formats (see cmn/archive/mime.go)
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package shard
import (
"archive/tar"
"compress/gzip"
"io"
"github.com/NVIDIA/aistore/cmn/archive"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/ext/dsort/ct"
"github.com/NVIDIA/aistore/fs"
)
// tgzRW implements the RW interface for gzip-compressed tar shards.
type tgzRW struct {
	ext string // filename extension this reader/writer handles (e.g. ".tgz" or ".tar.gz")
}
// compile-time interface guard: *tgzRW must implement RW
var _ RW = (*tgzRW)(nil)
// NewTargzRW returns an RW for tar.gz shards with the given filename extension.
func NewTargzRW(ext string) RW {
	return &tgzRW{ext: ext}
}
// IsCompressed is always true: tar.gz shards are gzip-compressed.
func (*tgzRW) IsCompressed() bool {
	return true
}
// SupportsOffset reports whether records can be loaded by offset; always true for this RW.
func (*tgzRW) SupportsOffset() bool {
	return true
}
// MetadataSize returns the size of a tar header with padding.
func (*tgzRW) MetadataSize() int64 {
	return archive.TarBlockSize
}
// Extract reads the tarball r and extracts its metadata via the given
// RecordExtractor. While iterating, it also writes a plain (uncompressed)
// work tar into a dsort-generated workfile — presumably so that later reads
// can be served by offset without re-decompressing (see SupportsOffset).
// Returns the total extracted size and the number of extracted records.
func (trw *tgzRW) Extract(lom *core.LOM, r cos.ReadReaderAt, extractor RecordExtractor, toDisk bool) (int64, int, error) {
	ar, err := archive.NewReader(trw.ext, r)
	if err != nil {
		return 0, 0, err
	}
	workFQN := fs.CSM.Gen(lom, ct.DsortFileType, "") // tarFQN
	wfh, err := cos.CreateFile(workFQN)
	if err != nil {
		return 0, 0, err
	}
	// rcbCtx carries the per-shard extraction state; c.xtar is invoked once per tar record
	c := &rcbCtx{parent: trw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk}
	c.tw = tar.NewWriter(wfh)
	// scratch buffer sized to the shard; returned to the slab below
	buf, slab := core.T.PageMM().AllocSize(lom.SizeBytes())
	c.buf = buf
	_, err = ar.Range("", c.xtar)
	slab.Free(buf)
	if err == nil {
		cos.Close(c.tw)
	} else {
		_ = c.tw.Close() // range already failed; close is best-effort
	}
	cos.Close(wfh)
	return c.extractedSize, c.extractedCount, err
}
// Create creates a new tar.gz shard locally based on the Shard descriptor,
// streaming records through a gzip writer (gzip.BestSpeed) into tarball.
// Records stored by offset are copied verbatim (they are already in tar
// format); SGL/disk-stored records go through the tar writer.
// Note that the order of closing must be tw, gzw, and then finally tarball.
// Returns the total number of bytes written.
func (*tgzRW) Create(s *Shard, tarball io.Writer, loader ContentLoader) (written int64, err error) {
	var (
		n         int64
		needFlush bool
		gzw, _    = gzip.NewWriterLevel(tarball, gzip.BestSpeed) // BestSpeed is a valid level => error is always nil
		tw        = tar.NewWriter(gzw)
		rdReader  = newTarRecordDataReader()
	)
	defer func() {
		rdReader.free()
		cos.Close(tw)
		cos.Close(gzw)
	}()
	for _, rec := range s.Records.All() {
		for _, obj := range rec.Objects {
			switch obj.StoreType {
			case OffsetStoreType:
				if needFlush {
					// We now will write directly to the tarball file so we need
					// to flush everything we have written so far.
					if err := tw.Flush(); err != nil {
						return written, err
					}
					needFlush = false
				}
				// copy the raw (already tar-formatted) range straight into the
				// compressed stream, bypassing the tar writer
				if n, err = loader.Load(gzw, rec, obj); err != nil {
					return written + n, err
				}
				// pad to 512 bytes (tar block size)
				diff := cos.CeilAlignInt64(n, archive.TarBlockSize) - n
				if diff > 0 {
					if _, err = gzw.Write(padBuf[:diff]); err != nil {
						return written + n, err
					}
					n += diff
				}
				debug.Assert(diff >= 0 && diff < archive.TarBlockSize)
			case SGLStoreType, DiskStoreType:
				rdReader.reinit(tw, obj.Size, obj.MetadataSize)
				if n, err = loader.Load(rdReader, rec, obj); err != nil {
					return written + n, err
				}
				needFlush = true
			default:
				debug.Assert(false, obj.StoreType)
			}
			// account for this object exactly once (the previous version also
			// incremented `written` inside the SGL/Disk case, double-counting
			// those objects)
			written += n
		}
	}
	return written, nil
}