-
Notifications
You must be signed in to change notification settings - Fork 235
/
compressor.go
220 lines (195 loc) · 5.53 KB
/
compressor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package compressor
// NOTE: This is used from github.com/containers/image by callers that
// don't otherwise use containers/storage, so don't make this depend on any
// larger software like the graph drivers.
import (
"encoding/base64"
"io"
"io/ioutil"
"github.com/containers/storage/pkg/chunked/internal"
"github.com/containers/storage/pkg/ioutils"
"github.com/opencontainers/go-digest"
"github.com/vbatts/tar-split/archive/tar"
)
// writeZstdChunkedStream reads a tar stream from reader, writes it zstd-compressed
// (at the given level) to destFile, and finally appends the manifest by calling
// internal.WriteZstdChunkedManifest.
//
// For every tar entry the raw header bytes reported by the tar reader are written
// to the compressed stream first.  If the entry has a payload, compression is
// restarted right before the first payload byte and again right after the last
// one; the number of bytes written to destFile at those two points is recorded
// as the entry's Offset and EndOffset, together with the digest of the
// uncompressed payload.
//
// outMetadata is handed to internal.WriteZstdChunkedManifest unchanged.
//
// It returns the first error hit while reading the tar stream, compressing, or
// writing the manifest.
func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
	// total written so far.  Used to retrieve partial offsets in the file
	dest := ioutils.NewWriteCounter(destFile)

	tr := tar.NewReader(reader)
	// Keep the raw header bytes around so they can be copied verbatim.
	tr.RawAccounting = true

	buf := make([]byte, 4096)

	zstdWriter, err := internal.ZstdWriterWithLevel(dest, level)
	if err != nil {
		return err
	}
	// Best-effort cleanup for the error paths; on success zstdWriter is set to
	// nil below after it has been closed explicitly.
	defer func() {
		if zstdWriter != nil {
			zstdWriter.Close()
			zstdWriter.Flush()
		}
	}()

	// restartCompression terminates the current compressed stream, returns the
	// number of bytes written to dest so far, and re-arms the writer on dest.
	restartCompression := func() (int64, error) {
		var offset int64
		if zstdWriter != nil {
			if err := zstdWriter.Close(); err != nil {
				return 0, err
			}
			if err := zstdWriter.Flush(); err != nil {
				return 0, err
			}
			offset = dest.Count
			zstdWriter.Reset(dest)
		}
		return offset, nil
	}

	var metadata []internal.FileMetadata
	for {
		hdr, err := tr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}

		rawBytes := tr.RawBytes()
		if _, err := zstdWriter.Write(rawBytes); err != nil {
			return err
		}

		// The payload is hashed uncompressed while it is being compressed.
		payloadDigester := digest.Canonical.Digester()
		payloadChecksum := payloadDigester.Hash()

		payloadDest := io.MultiWriter(payloadChecksum, zstdWriter)

		// Now handle the payload, if any
		var startOffset, endOffset int64
		checksum := ""
		for {
			read, errRead := tr.Read(buf)
			if errRead != nil && errRead != io.EOF {
				// Report the read failure itself.  The outer err is nil at
				// this point, so returning it would signal success for a
				// truncated stream.
				return errRead
			}

			// restart the compression only if there is
			// a payload.
			if read > 0 {
				if startOffset == 0 {
					startOffset, err = restartCompression()
					if err != nil {
						return err
					}
				}
				_, err := payloadDest.Write(buf[:read])
				if err != nil {
					return err
				}
			}
			if errRead == io.EOF {
				if startOffset > 0 {
					endOffset, err = restartCompression()
					if err != nil {
						return err
					}
					checksum = payloadDigester.Digest().String()
				}
				break
			}
		}

		typ, err := internal.GetType(hdr.Typeflag)
		if err != nil {
			return err
		}
		// Extended attribute values are stored base64-encoded in the manifest.
		xattrs := make(map[string]string)
		for k, v := range hdr.Xattrs {
			xattrs[k] = base64.StdEncoding.EncodeToString([]byte(v))
		}
		m := internal.FileMetadata{
			Type:       typ,
			Name:       hdr.Name,
			Linkname:   hdr.Linkname,
			Mode:       hdr.Mode,
			Size:       hdr.Size,
			UID:        hdr.Uid,
			GID:        hdr.Gid,
			ModTime:    hdr.ModTime,
			AccessTime: hdr.AccessTime,
			ChangeTime: hdr.ChangeTime,
			Devmajor:   hdr.Devmajor,
			Devminor:   hdr.Devminor,
			Xattrs:     xattrs,
			Digest:     checksum,
			Offset:     startOffset,
			EndOffset:  endOffset,

			// ChunkSize is 0 for the last chunk
			ChunkSize:   0,
			ChunkOffset: 0,
			ChunkDigest: checksum,
		}
		metadata = append(metadata, m)
	}

	// Copy whatever raw bytes the tar reader accounted for after the last entry.
	rawBytes := tr.RawBytes()
	if _, err := zstdWriter.Write(rawBytes); err != nil {
		return err
	}
	if err := zstdWriter.Flush(); err != nil {
		return err
	}
	if err := zstdWriter.Close(); err != nil {
		return err
	}
	// Already closed; disarm the deferred cleanup.
	zstdWriter = nil

	return internal.WriteZstdChunkedManifest(dest, outMetadata, uint64(dest.Count), metadata, level)
}
// zstdChunkedWriter is the io.WriteCloser handed to callers.  Bytes written to
// it are forwarded through a pipe to a background goroutine, whose final result
// is delivered on tarSplitErr.
type zstdChunkedWriter struct {
	// tarSplitOut is the write end of the pipe read by the background goroutine.
	tarSplitOut *io.PipeWriter
	// tarSplitErr carries the result of the background goroutine.
	tarSplitErr chan error
}

// Close closes the write end of the pipe and then waits for the result of the
// background goroutine.
//
// The pipe is closed before waiting: a consumer that is still blocked reading
// from the pipe can only finish once it observes EOF, so waiting first would
// deadlock whenever the consumer has not already terminated by itself.
//
// It returns the error reported on tarSplitErr if there is one, otherwise the
// error from closing the pipe.
func (w zstdChunkedWriter) Close() error {
	errClose := w.tarSplitOut.Close()

	if err := <-w.tarSplitErr; err != nil {
		return err
	}
	return errClose
}

// Write forwards p to the pipe unless a value can already be received from
// tarSplitErr, in which case the pipe is closed and 0 is returned together with
// the received value.
//
// NOTE(review): the received value is nil when the background goroutine ended
// without error or the channel has been closed, so this path can return
// (0, nil) for a non-empty p — confirm callers tolerate that.
func (w zstdChunkedWriter) Write(p []byte) (int, error) {
	select {
	case err := <-w.tarSplitErr:
		w.tarSplitOut.Close()
		return 0, err
	default:
		return w.tarSplitOut.Write(p)
	}
}
// zstdChunkedWriterWithLevel returns a writer that produces a zstd compressed
// tarball in which each file is compressed separately, so that it can be
// addressed separately.  The idea is based on CRFS:
// https://github.com/google/crfs
// Unlike CRFS, zstd is used instead of gzip, because zstd supports embedding
// metadata that the decoder ignores as part of the compressed stream.
// A manifest json file with all the metadata is appended at the end of the
// tarball stream, using zstd skippable frames.
// The final file will look like:
// [FILE_1][FILE_2]..[FILE_N][SKIPPABLE FRAME 1][SKIPPABLE FRAME 2]
// Where:
// [FILE_N]: [ZSTD HEADER][TAR HEADER][PAYLOAD FILE_N][ZSTD FOOTER]
// [SKIPPABLE FRAME 1]: [ZSTD SKIPPABLE FRAME, SIZE=MANIFEST LENGTH][MANIFEST]
// [SKIPPABLE FRAME 2]: [ZSTD SKIPPABLE FRAME, SIZE=16][MANIFEST_OFFSET][MANIFEST_LENGTH][MANIFEST_LENGTH_UNCOMPRESSED][MANIFEST_TYPE][CHUNKED_ZSTD_MAGIC_NUMBER]
// MANIFEST_OFFSET, MANIFEST_LENGTH, MANIFEST_LENGTH_UNCOMPRESSED and CHUNKED_ZSTD_MAGIC_NUMBER are 64 bits unsigned in little endian format.
//
// The tar stream written to the returned writer travels through a pipe to a
// goroutine running writeZstdChunkedStream, which writes the result to out.
func zstdChunkedWriterWithLevel(out io.Writer, metadata map[string]string, level int) (io.WriteCloser, error) {
	pipeReader, pipeWriter := io.Pipe()
	// Buffered so the goroutine can post its result without a receiver waiting.
	result := make(chan error, 1)

	consume := func() {
		result <- writeZstdChunkedStream(out, metadata, pipeReader, level)
		// Drain whatever is still written to the pipe so the write end
		// does not block, then shut the read end down.
		io.Copy(ioutil.Discard, pipeReader)
		pipeReader.Close()
		close(result)
	}
	go consume()

	chunkedWriter := zstdChunkedWriter{
		tarSplitOut: pipeWriter,
		tarSplitErr: result,
	}
	return chunkedWriter, nil
}
// ZstdCompressor is a CompressorFunc for the zstd compression algorithm.
// Data written to the returned writer ends up, compressed, in r.  A nil level
// selects compression level 3; otherwise the pointed-to value is used.
func ZstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
	const defaultLevel = 3

	chosen := defaultLevel
	if level != nil {
		chosen = *level
	}
	return zstdChunkedWriterWithLevel(r, metadata, chosen)
}