/
queued_blob_replicator.go
88 lines (78 loc) · 2.85 KB
/
queued_blob_replicator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package replication
import (
"context"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/util"
)
// queuedBlobReplicator decorates a BlobReplicator so that replication
// requests are serialized through a single-token channel and
// deduplicated through an existence cache.
type queuedBlobReplicator struct {
	// source is the storage backend from which read requests are
	// served directly, independent of the replication queue.
	source blobstore.BlobAccess
	// base is the underlying replicator that performs the actual
	// replication work.
	base BlobReplicator
	// existenceCache records digests of objects that have already
	// been replicated, so repeated requests can be skipped.
	existenceCache *digest.ExistenceCache
	// wait holds at most one token. A caller must receive the token
	// before forwarding a replication request to base, and must send
	// it back afterwards, serializing all replication traffic.
	wait chan struct{}
}
// NewQueuedBlobReplicator creates a decorator for BlobReplicator that
// serializes and deduplicates requests. It can be used to place a limit
// on the amount of replication traffic.
//
// TODO: The current implementation is a bit simplistic, in that it does
// not guarantee fairness. Should all requests be processed in FIFO
// order? Alternatively, should we replicate objects with most waiters
// first?
func NewQueuedBlobReplicator(source blobstore.BlobAccess, base BlobReplicator, existenceCache *digest.ExistenceCache) BlobReplicator {
	// Seed the channel with a single token, so that exactly one
	// replication request may be in flight at any point in time.
	tokens := make(chan struct{}, 1)
	tokens <- struct{}{}
	return &queuedBlobReplicator{
		source:         source,
		base:           base,
		existenceCache: existenceCache,
		wait:           tokens,
	}
}
// ReplicateSingle streams an object from the source backend, while the
// replication of that object goes through the regular queueing process
// as a background task attached to the returned buffer.
//
// This causes a duplicate read on the source, but this cannot be
// prevented reasonably: the client and the replication process may each
// run at a different pace.
func (br *queuedBlobReplicator) ReplicateSingle(ctx context.Context, blobDigest digest.Digest) buffer.Buffer {
	b := br.source.Get(ctx, blobDigest)
	replicate := func() error {
		err := br.ReplicateMultiple(ctx, blobDigest.ToSingletonSet())
		if err != nil {
			return util.StatusWrap(err, "Replication failed")
		}
		return nil
	}
	return b.WithTask(replicate)
}
// ReplicateComposite streams a child object out of a composite parent
// from the source backend, while replication of the parent object goes
// through the regular queueing process as a background task attached to
// the returned buffer.
func (br *queuedBlobReplicator) ReplicateComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
	b := br.source.GetFromComposite(ctx, parentDigest, childDigest, slicer)
	replicate := func() error {
		err := br.ReplicateMultiple(ctx, parentDigest.ToSingletonSet())
		if err != nil {
			return util.StatusWrap(err, "Replication failed")
		}
		return nil
	}
	return b.WithTask(replicate)
}
// ReplicateMultiple replicates a set of objects through the underlying
// replicator, holding the serialization token for the duration of the
// call. Objects already recorded in the existence cache are skipped.
func (br *queuedBlobReplicator) ReplicateMultiple(ctx context.Context, digests digest.Set) error {
	// Fast path: every requested object was replicated previously,
	// so there is no need to enter the queue at all.
	if br.existenceCache.RemoveExisting(digests).Empty() {
		return nil
	}

	// Acquire the single replication token, giving up when the
	// caller's context is canceled while waiting.
	select {
	case <-ctx.Done():
		return util.StatusFromContext(ctx)
	case <-br.wait:
	}
	// Return the token upon completion, unblocking the next request.
	defer func() {
		br.wait <- struct{}{}
	}()

	// While we were waiting, another request may have replicated
	// some of these objects already. Filter those out once more
	// before forwarding the call.
	remaining := br.existenceCache.RemoveExisting(digests)
	if err := br.base.ReplicateMultiple(ctx, remaining); err != nil {
		return err
	}
	br.existenceCache.Add(remaining)
	return nil
}