/
copy.go
78 lines (64 loc) · 1.77 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package vparquet
import (
"context"
"fmt"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/pkg/errors"
)
func CopyBlock(ctx context.Context, meta *backend.BlockMeta, from backend.Reader, to backend.Writer) error {
blockID := meta.BlockID
tenantID := meta.TenantID
// Copy streams, efficient but can't cache.
copyStream := func(name string) error {
reader, size, err := from.StreamReader(ctx, name, blockID, tenantID)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
}
defer reader.Close()
return to.StreamWriter(ctx, name, blockID, tenantID, reader, size)
}
// Read entire object and attempt to cache
copy := func(name string) error {
b, err := from.Read(ctx, name, blockID, tenantID, true)
if err != nil {
return errors.Wrapf(err, "error reading %s", name)
}
return to.Write(ctx, name, blockID, tenantID, b, true)
}
// Data
err := copyStream(DataFileName)
if err != nil {
return err
}
// Bloom
for i := 0; i < common.ValidateShardCount(int(meta.BloomShardCount)); i++ {
err = copy(common.BloomName(i))
if err != nil {
return err
}
}
// Meta
err = to.WriteBlockMeta(ctx, meta)
return err
}
func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, bloom *common.ShardedBloomFilter) error {
// bloom
blooms, err := bloom.Marshal()
if err != nil {
return err
}
for i, bloom := range blooms {
nameBloom := common.BloomName(i)
err := w.Write(ctx, nameBloom, meta.BlockID, meta.TenantID, bloom, true)
if err != nil {
return fmt.Errorf("unexpected error writing bloom-%d %w", i, err)
}
}
// meta
err = w.WriteBlockMeta(ctx, meta)
if err != nil {
return fmt.Errorf("unexpected error writing meta %w", err)
}
return nil
}