-
Notifications
You must be signed in to change notification settings - Fork 75
/
module.go
278 lines (254 loc) · 8.02 KB
/
module.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manifest
import (
"bytes"
"context"
"fmt"
"io"
"os"
)
import (
"go.uber.org/multierr"
)
// Blob is an anonymous file associated with a digest.
type Blob interface {
Digest() *Digest
Open(context.Context) (io.ReadCloser, error)
}
type memoryBlob struct {
digest Digest
content []byte
}
var _ Blob = (*memoryBlob)(nil)
type memoryBlobOptions struct {
validateDigest bool
}
// MemoryBlobOption are options passed when creating a new memory blob.
type MemoryBlobOption func(*memoryBlobOptions)
// MemoryBlobWithDigestValidation checks that the passed content and digest match.
func MemoryBlobWithDigestValidation() MemoryBlobOption {
return func(opts *memoryBlobOptions) {
opts.validateDigest = true
}
}
// NewMemoryBlob takes a digest and a content, and turns it into an in-memory
// representation of a blob, which returns the digest and an io.ReadCloser for
// its content.
func NewMemoryBlob(digest Digest, content []byte, opts ...MemoryBlobOption) (Blob, error) {
var config memoryBlobOptions
for _, option := range opts {
option(&config)
}
if config.validateDigest {
digester, err := NewDigester(digest.Type())
if err != nil {
return nil, err
}
contentDigest, err := digester.Digest(bytes.NewReader(content))
if err != nil {
return nil, err
}
if !digest.Equal(*contentDigest) {
return nil, fmt.Errorf("digest and content mismatch")
}
}
return &memoryBlob{
digest: digest,
content: content,
}, nil
}
func (b *memoryBlob) Digest() *Digest {
if b == nil {
return nil
}
return &b.digest
}
func (b *memoryBlob) Open(context.Context) (io.ReadCloser, error) {
if b == nil {
return nil, os.ErrNotExist
}
return io.NopCloser(bytes.NewReader(b.content)), nil
}
// BlobSet represents a set of deduplicated blobs by their digests.
type BlobSet struct {
digestToBlob map[string]Blob
}
type blobSetOptions struct {
validateContent bool
skipNilBlobs bool
}
// BlobSetOption are options passed when creating a new blob set.
type BlobSetOption func(*blobSetOptions)
// BlobSetWithContentValidation turns on content validation for all the blobs
// when creating a new BlobSet. If this option is on, blobs with the same digest
// must have the same content (in case blobs with the same digest are sent). If
// this option is not passed, then the latest duplicated blob digest content
// will prevail in the set.
func BlobSetWithContentValidation() BlobSetOption {
return func(opts *blobSetOptions) {
opts.validateContent = true
}
}
// BlobSetWithSkipNilBlobs allows passing nil blobs in the slice of blobs. The default behavior is
// that if you pass a nil blob in the slice, you'll get an error from the `NewBlobSet` constructor.
// If you pass this option, any nil blob will be skipped and the blob set will be built only from
// the non-nil ones.
func BlobSetWithSkipNilBlobs() BlobSetOption {
return func(opts *blobSetOptions) {
opts.skipNilBlobs = true
}
}
// NewBlobSet receives a slice of blobs, and de-duplicates them into a BlobSet.
func NewBlobSet(ctx context.Context, blobs []Blob, opts ...BlobSetOption) (*BlobSet, error) {
var config blobSetOptions
for _, option := range opts {
option(&config)
}
digestToBlobs := make(map[string]Blob, len(blobs))
for i, b := range blobs {
if b == nil {
if config.skipNilBlobs {
continue
}
return nil, fmt.Errorf("blobs[%d]: nil blob", i)
}
digestStr := b.Digest().String()
if config.validateContent {
existingBlob, alreadyPresent := digestToBlobs[digestStr]
if alreadyPresent {
equalContent, err := BlobEqual(ctx, b, existingBlob)
if err != nil {
return nil, fmt.Errorf("compare duplicated blobs with digest %q: %w", digestStr, err)
}
if !equalContent {
return nil, fmt.Errorf("duplicated blobs with digest %q have different contents", digestStr)
}
}
}
digestToBlobs[digestStr] = b
}
return &BlobSet{digestToBlob: digestToBlobs}, nil
}
// BlobFor returns the blob for the passed digest string, or nil, ok=false if
// the digest has no blob in the set.
func (s *BlobSet) BlobFor(digest string) (Blob, bool) {
blob, ok := s.digestToBlob[digest]
if !ok {
return nil, false
}
return blob, true
}
// Blobs returns a slice of the blobs in the set.
func (s *BlobSet) Blobs() []Blob {
blobs := make([]Blob, 0, len(s.digestToBlob))
for _, b := range s.digestToBlob {
blobs = append(blobs, b)
}
return blobs
}
// NewMemoryBlobFromReader creates a memory blob from content, which is read
// until completion. The returned blob contains all bytes read. If you are using
// this in a loop, you might better use NewMemoryBlobFromReaderWithDigester so
// you can reuse your digester.
func NewMemoryBlobFromReader(content io.Reader) (Blob, error) {
digester, err := NewDigester(DigestTypeShake256)
if err != nil {
return nil, err
}
return NewMemoryBlobFromReaderWithDigester(content, digester)
}
// NewMemoryBlobFromReaderWithDigester creates a memory blob from content with
// the passed digester. The content is read until completion. The returned blob
// contains all bytes read.
func NewMemoryBlobFromReaderWithDigester(content io.Reader, digester Digester) (Blob, error) {
var contentInMemory bytes.Buffer
tee := io.TeeReader(content, &contentInMemory)
digest, err := digester.Digest(tee)
if err != nil {
return nil, err
}
return &memoryBlob{
digest: *digest,
content: contentInMemory.Bytes(),
}, nil
}
// BlobEqual returns true if blob a is the same as blob b. The digest is
// checked for equality and the content bytes compared.
//
// An error is returned if an unexpected I/O error occurred when opening,
// reading, or closing either blob.
func BlobEqual(ctx context.Context, a, b Blob) (_ bool, retErr error) {
const blockSize = 4096
if !a.Digest().Equal(*b.Digest()) {
// digests don't match
return false, nil
}
aFile, err := a.Open(ctx)
if err != nil {
return false, err
}
defer func() { retErr = multierr.Append(retErr, aFile.Close()) }()
bFile, err := b.Open(ctx)
if err != nil {
return false, err
}
defer func() { retErr = multierr.Append(retErr, bFile.Close()) }()
// Read blockSize from a, then from b, and compare.
aBlock := make([]byte, blockSize)
bBlock := make([]byte, blockSize)
for {
aN, aErr := aFile.Read(aBlock)
bN, bErr := io.ReadAtLeast(bFile, bBlock[:aN], aN) // exactly aN bytes
// We're running unexpected error processing (not EOF) before comparing
// bytes because it doesn't matter if the returned bytes match if an
// error occurred before an expected EOF.
if bErr == io.ErrUnexpectedEOF {
// b is shorter; we can error early
return false, nil
}
if aErr != nil && aErr != io.EOF {
// unexpected read error
return false, aErr
}
if bErr != nil && bErr != io.EOF {
// unexpected read error
return false, bErr
}
if !bytes.Equal(aBlock[:aN], bBlock[:bN]) {
// Read content doesn't match.
return false, nil
}
if aErr == io.EOF || bErr == io.EOF {
// EOF
break
}
}
aN, aErr := aFile.Read(aBlock[:1])
bN, bErr := bFile.Read(bBlock[:1])
if aN == 0 && bN == 0 && aErr == io.EOF && bErr == io.EOF {
// a and b are at EOF with no more data for us
return true, nil
}
// either a or b are longer
return false, multierr.Append(nilEOF(aErr), nilEOF(bErr))
}
// nilEOF maps io.EOF to nil
func nilEOF(err error) error {
if err == io.EOF {
return nil
}
return err
}