This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
copy_impl.go
60 lines (50 loc) · 1.87 KB
/
copy_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package storage
import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/lyft/flytestdlib/ioutils"
	"github.com/lyft/flytestdlib/promutils"
	"github.com/lyft/flytestdlib/promutils/labeled"
)
// copyImpl copies raw data between two references by reading from and writing to a single RawStore.
type copyImpl struct {
// rawStore is the store CopyRaw reads the source from and writes the destination to.
rawStore RawStore
// metrics holds the stopwatches associated with copy operations.
metrics copyMetrics
}
// copyMetrics groups the stopwatches created by newCopyMetrics for copy operations.
type copyMetrics struct {
// CopyLatency is registered as "overall" (overall copy latency).
// NOTE(review): nothing in this file starts this stopwatch — confirm whether it is used elsewhere.
CopyLatency labeled.StopWatch
// ComputeLengthLatency is registered as "length"; CopyRaw starts it while reading a non-seekable source fully
// into memory to compute its content length.
ComputeLengthLatency labeled.StopWatch
}
// CopyRaw is a naive implementation of copy that reads the source through the raw store and writes the same
// content to destination.
//
// If the reader returned for source implements io.Seeker, it is handed to WriteRaw as-is with a size of 0, leaving
// the underlying writer to compute the content length on its own. Otherwise the whole content is buffered in memory
// (timed by ComputeLengthLatency) so an explicit length can be supplied, and the buffered bytes are what get
// written.
//
// Any error from ReadRaw, from buffering the content, or from WriteRaw is returned unchanged.
//
// NOTE(review): opts is not forwarded; the write is always issued with empty Options, as before.
//
// TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy:
// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (c copyImpl) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error {
	rc, err := c.rawStore.ReadRaw(ctx, source)
	if err != nil {
		return err
	}

	// Release the source reader on every return path. The close error is deliberately ignored: by then the data
	// has either been fully consumed or a more relevant error is already being returned.
	defer func() {
		_ = rc.Close()
	}()

	if _, isSeeker := rc.(io.Seeker); isSeeker {
		return c.rawStore.WriteRaw(ctx, destination, 0, Options{}, rc)
	}

	// If the returned ReadCloser doesn't implement the Seeker interface, then the underlying writer won't be able to
	// calculate content length on its own. Some implementations (e.g. S3 Stow Store) will error if it can't.
	raw, err := ioutils.ReadAll(rc, c.metrics.ComputeLengthLatency.Start(ctx))
	if err != nil {
		return err
	}

	// rc has been drained by ReadAll, so the write must be fed from the buffered bytes; passing rc here would
	// write an empty payload while declaring a non-zero length.
	return c.rawStore.WriteRaw(ctx, destination, int64(len(raw)), Options{}, bytes.NewReader(raw))
}
// newCopyMetrics registers the copy stopwatches under scope and returns them bundled as copyMetrics. Both
// stopwatches use a millisecond scale and also emit an unlabeled metric.
func newCopyMetrics(scope promutils.Scope) copyMetrics {
	overall := labeled.NewStopWatch("overall", "Overall copy latency", time.Millisecond, scope, labeled.EmitUnlabeledMetric)
	computeLength := labeled.NewStopWatch("length", "Latency involved in computing length of content before writing.", time.Millisecond, scope, labeled.EmitUnlabeledMetric)

	return copyMetrics{CopyLatency: overall, ComputeLengthLatency: computeLength}
}
// newCopyImpl builds a copyImpl backed by store, with its metrics registered under the "copy" sub-scope of
// metricsScope.
func newCopyImpl(store RawStore, metricsScope promutils.Scope) copyImpl {
	var impl copyImpl
	impl.rawStore = store

	copyScope := metricsScope.NewSubScope("copy")
	impl.metrics = newCopyMetrics(copyScope)

	return impl
}