Commit 430ae82

tool: analyze-data: support remote storage
We add support for analyzing sstables from cloud storage (backups). A tool option registers a function that takes a URI and returns a `remote.Storage` implementation. With remote storage, retrieving the size of each file one by one takes too long when there are many files, so we don't do that upfront. Most of the refactoring supports tracking progress by file count only in the remote storage case.
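
For context, a sketch of how an embedder of the tool package might register the hook. `tool.New` and the `remote.Storage` interface are real; the `WithRemoteStorageFn` option name and the resolver body are assumptions for illustration, since this commit only shows the function being threaded through `newDB` as `remoteStorageFn DBRemoteStorageFn`:

package main

import (
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/objstorage/remote"
	"github.com/cockroachdb/pebble/tool"
)

// resolveRemoteStorage maps a URI such as "s3://bucket/backup" to a
// remote.Storage implementation. The cloud-specific wiring is elided; a real
// embedder would dispatch on the URI scheme.
func resolveRemoteStorage(uri string) (remote.Storage, error) {
	return nil, errors.Newf("no remote storage registered for %q", uri)
}

func main() {
	// WithRemoteStorageFn is a hypothetical option name; the commit does not
	// show how the option is exposed.
	t := tool.New(tool.WithRemoteStorageFn(resolveRemoteStorage))
	_ = t
}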
1 parent bc68d6c

5 files changed: +116 −55 lines

sstable/compressionanalyzer/file_analyzer.go

Lines changed: 3 additions & 1 deletion
@@ -48,10 +48,12 @@ func (fa *FileAnalyzer) Close() {
 	*fa = FileAnalyzer{}
 }
 
-// SSTable analyzes the blocks in an sstable file.
+// SSTable analyzes the blocks in an sstable file and closes the readable (even
+// in error cases).
 func (fa *FileAnalyzer) SSTable(ctx context.Context, readable objstorage.Readable) error {
 	r, err := sstable.NewReader(ctx, readable, fa.sstReadOpts)
 	if err != nil {
+		_ = readable.Close()
 		return err
 	}
 	defer func() { _ = r.Close() }()
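
The contract change matters to callers: `SSTable` now takes ownership of the readable even on the error path, which is why `analyzeSSTable` in tool/db_analyze_data.go below drops its deferred `Close`. A minimal caller-side sketch (package and helper names are illustrative):

package analyzeexample

import (
	"context"

	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable/compressionanalyzer"
)

// analyzeReadable hands the readable to the analyzer. No defer is needed:
// with this change, SSTable closes the readable in both the success and the
// error case.
func analyzeReadable(
	ctx context.Context,
	fa *compressionanalyzer.FileAnalyzer,
	readable objstorage.Readable,
) error {
	return fa.SSTable(ctx, readable)
}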

tool/db.go

Lines changed: 2 additions & 0 deletions
@@ -88,6 +88,7 @@ func newDB(
 	openErrEnhancer func(error) error,
 	openOptions []OpenOption,
 	exciseSpanFn DBExciseSpanFn,
+	remoteStorageFn DBRemoteStorageFn,
 ) *dbT {
 	d := &dbT{
 		opts: opts,
@@ -96,6 +97,7 @@ func newDB(
 		openErrEnhancer: openErrEnhancer,
 		openOptions:     openOptions,
 		exciseSpanFn:    exciseSpanFn,
+		remoteStorageFn: remoteStorageFn,
 	}
 	d.fmtKey.mustSet("quoted")
 	d.fmtValue.mustSet("[%x]")

tool/db_analyze_data.go

Lines changed: 109 additions & 52 deletions
@@ -13,13 +13,16 @@ import (
 	"math/rand/v2"
 	"os"
 	"slices"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/humanize"
 	"github.com/cockroachdb/pebble/objstorage"
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
+	"github.com/cockroachdb/pebble/objstorage/remote"
+	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/sstable/compressionanalyzer"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/tokenbucket"
@@ -34,28 +37,38 @@ func (d *dbT) runAnalyzeData(cmd *cobra.Command, args []string) {
 	isTTY := isTTY(stdout)
 
 	dir := args[0]
-	if err := d.initOptions(dir); err != nil {
-		fmt.Fprintf(stderr, "error initializing options: %s\n", err)
-		return
+
+	var dbStorage dbStorage
+	var isRemote bool
+	if strings.Contains(dir, "://") {
+		if d.remoteStorageFn == nil {
+			fmt.Fprintf(stderr, "path looks like remote storage, but remote storage not configured.\n")
+			return
+		}
+		remoteStorageImpl, err := d.remoteStorageFn(dir)
+		if err != nil {
+			fmt.Fprintf(stderr, "error initializing remote storage: %s\n", err)
+			return
+		}
+		dbStorage = newRemoteStorage(remoteStorageImpl)
+		isRemote = true
+	} else {
+		dbStorage = newVFSStorage(d.opts.FS, dir)
 	}
 	rng := rand.New(rand.NewPCG(rand.Uint64(), rand.Uint64()))
-	dbStorage := newVFSStorage(d.opts.FS, dir)
+	if isTTY {
+		fmt.Fprintf(stdout, "Listing files and sizes...\n")
+	}
 	files, err := makeFileSet(dbStorage, rng)
 	if err != nil {
 		fmt.Fprintf(stderr, "error loading file list: %s\n", err)
 		return
 	}
-	if files.Done() {
+	numFiles, totalSize := files.Remaining()
+	if numFiles == 0 {
 		fmt.Fprintf(stderr, "no sstables found\n")
 		return
 	}
-	totalSize := files.TotalSize()
-	// We do not recalculate the target size every time we refresh the file list.
-	// If the database is growing rapidly, we might not be able to keep up.
-	targetSize := totalSize
-	if d.analyzeData.samplePercent > 0 && d.analyzeData.samplePercent < 100 {
-		targetSize = (totalSize*int64(d.analyzeData.samplePercent) + 99) / 100
-	}
 	var readLimiter *tokenbucket.TokenBucket
 	if d.analyzeData.readMBPerSec > 0 {
 		readLimiter = &tokenbucket.TokenBucket{}
@@ -64,7 +77,12 @@ func (d *dbT) runAnalyzeData(cmd *cobra.Command, args []string) {
 		readLimiter.Init(rate, burst)
 	}
 	if isTTY {
-		fmt.Fprintf(stdout, "Found %d files, total size %s.\n", len(files.files), humanize.Bytes.Int64(totalSize))
+		if isRemote {
+			// We don't obtain the sizes of the remote objects.
+			fmt.Fprintf(stdout, "Found %d objects.\n", numFiles)
+		} else {
+			fmt.Fprintf(stdout, "Found %d files, total size %s.\n", numFiles, humanize.Bytes.Int64(totalSize))
+		}
 		if d.analyzeData.readMBPerSec > 0 {
 			fmt.Fprintf(stdout, "Limiting read bandwidth to %s/s.\n", humanize.Bytes.Int64(int64(d.analyzeData.readMBPerSec)<<20))
 		} else {
@@ -84,11 +102,28 @@ func (d *dbT) runAnalyzeData(cmd *cobra.Command, args []string) {
 	startTime := time.Now()
 	lastReportTime := startTime
 
-	analyzer := compressionanalyzer.NewFileAnalyzer(readLimiter, d.opts.MakeReaderOptions())
-	var sampled int64
+	readerOptions := sstable.ReaderOptions{
+		Comparers:  d.comparers,
+		Mergers:    d.mergers,
+		KeySchemas: d.opts.KeySchemas,
+	}
+	analyzer := compressionanalyzer.NewFileAnalyzer(readLimiter, readerOptions)
+	var sampledFiles int
+	var sampledBytes int64
 	const reportPeriod = 10 * time.Second
 	for {
-		shouldStop := files.Done() || sampled >= targetSize ||
+		remainingFiles, remainingBytes := files.Remaining()
+		var percentage float64
+		if remainingFiles == 0 {
+			percentage = 100
+		} else if isRemote {
+			// We don't obtain the sizes of all remote objects, so we use the number
+			// of files.
+			percentage = float64(sampledFiles) * 100 / float64(sampledFiles+remainingFiles)
+		} else {
+			percentage = float64(sampledBytes) * 100 / float64(sampledBytes+remainingBytes)
+		}
+		shouldStop := percentage >= float64(d.analyzeData.samplePercent) ||
 			(d.analyzeData.timeout > 0 && time.Since(startTime) > d.analyzeData.timeout)
 		// Every 10 seconds, we:
 		// - print the current results and progress (if on a tty);
@@ -103,11 +138,8 @@ func (d *dbT) runAnalyzeData(cmd *cobra.Command, args []string) {
 		if isTTY || shouldStop {
 			partialResults := analyzer.Buckets().String(minSamples)
 			fmt.Fprintf(stdout, "\n%s\n", partialResults)
-			percentage := min(float64(sampled*100)/float64(totalSize), 100)
-			if files.Done() {
-				percentage = 100
-			}
-			fmt.Fprintf(stdout, "Sampled %.2f%% (%s)\n", percentage, humanize.Bytes.Int64(sampled))
+			fmt.Fprintf(stdout, "Sampled %s files, %s (%.2f%%)\n",
+				humanize.Count.Int64(int64(sampledFiles)), humanize.Bytes.Int64(sampledBytes), percentage)
 		}
 		if err := analyzeSaveCSVFile(analyzer, d.analyzeData.outputCSVFile); err != nil {
 			fmt.Fprintf(stderr, "error writing CSV file: %s\n", err)
@@ -116,39 +148,40 @@ func (d *dbT) runAnalyzeData(cmd *cobra.Command, args []string) {
 			if shouldStop {
 				return
 			}
-			if err := files.Refresh(); err != nil {
-				fmt.Fprintf(stderr, "error loading file list: %s\n", err)
-				return
+			if !isRemote {
+				if err := files.Refresh(); err != nil {
+					fmt.Fprintf(stderr, "error loading file list: %s\n", err)
+					return
+				}
 			}
 			lastReportTime = time.Now()
 		}
 		// Sample a file and analyze it.
-		filename, size := files.Sample()
-		path := d.opts.FS.PathJoin(dir, filename)
-		if err := d.analyzeSSTable(analyzer, path); err != nil {
+		filename := files.Sample()
+		size, err := d.analyzeSSTable(analyzer, dbStorage, filename)
+		if err != nil {
 			// We silently ignore errors from files that are deleted from under us.
 			if !errors.Is(err, os.ErrNotExist) {
 				// Note that errors can happen if the sstable file wasn't completed;
 				// they should not stop the process.
-				fmt.Fprintf(stderr, "error reading file %s: %s\n", path, err)
+				fmt.Fprintf(stderr, "error reading file %s: %s\n", filename, err)
 			}
 			continue
 		}
-		sampled += size
+		sampledBytes += size
+		sampledFiles++
 	}
 }
 
-func (d *dbT) analyzeSSTable(analyzer *compressionanalyzer.FileAnalyzer, path string) error {
-	file, err := d.opts.FS.Open(path)
-	if err != nil {
-		return err
-	}
-	readable, err := objstorageprovider.NewFileReadable(file, d.opts.FS, objstorageprovider.NewReadaheadConfig(), path)
+func (d *dbT) analyzeSSTable(
+	analyzer *compressionanalyzer.FileAnalyzer, dbStorage dbStorage, name string,
+) (size int64, _ error) {
+	readable, err := dbStorage.Open(name)
 	if err != nil {
-		return errors.CombineErrors(err, file.Close())
+		return 0, err
 	}
-	defer func() { _ = readable.Close() }()
-	return analyzer.SSTable(context.Background(), readable)
+	size = readable.Size()
+	return size, analyzer.SSTable(context.Background(), readable)
 }
 
 func analyzeSaveCSVFile(a *compressionanalyzer.FileAnalyzer, path string) error {
@@ -201,6 +234,34 @@ func (l *vfsStorage) Open(name string) (objstorage.Readable, error) {
 	return readable, nil
 }
 
+type remoteStorage struct {
+	storage remote.Storage
+}
+
+func newRemoteStorage(storage remote.Storage) *remoteStorage {
+	return &remoteStorage{storage: storage}
+}
+
+var _ dbStorage = (*remoteStorage)(nil)
+
+func (r *remoteStorage) List() ([]string, error) {
+	return r.storage.List("", "")
+}
+
+func (r *remoteStorage) Size(name string) int64 {
+	// Retrieving the size for each file from cloud storage would take too long;
+	// just make up a fixed value.
+	return 1024 * 1024
+}
+
+func (r *remoteStorage) Open(name string) (objstorage.Readable, error) {
+	objReader, size, err := r.storage.ReadObject(context.Background(), name)
+	if err != nil {
+		return nil, err
+	}
+	return objstorageprovider.NewRemoteReadable(objReader, size), nil
+}
+
 // We avoid files that are very large to prevent excessive memory usage. Note
 // that we have seen cases where large files contain a giant top index block, so
 // even getting the block layout of the file would use a lot of memory.
@@ -210,8 +271,9 @@ type fileSet struct {
 	dbStorage dbStorage
 	rng       *rand.Rand
 
-	files     []fileInSet
-	sampleIdx []int
+	files         []fileInSet
+	sampleIdx     []int
+	bytesToSample int64
 }
 
 type fileInSet struct {
@@ -299,9 +361,11 @@ func (s *fileSet) Refresh() error {
 	}
 	// Generate the samples.
 	s.sampleIdx = s.sampleIdx[:0]
+	s.bytesToSample = 0
 	for i := range s.files {
 		if !s.files[i].sampled {
 			s.sampleIdx = append(s.sampleIdx, i)
+			s.bytesToSample += s.files[i].size
 		}
 	}
 	slices.SortFunc(s.sampleIdx, func(i, j int) int {
@@ -310,25 +374,18 @@ func (s *fileSet) Refresh() error {
 	return nil
 }
 
-func (s *fileSet) TotalSize() int64 {
-	var sum int64
-	for i := range s.files {
-		sum += s.files[i].size
-	}
-	return sum
-}
-
-func (s *fileSet) Done() bool {
-	return len(s.sampleIdx) == 0
+func (s *fileSet) Remaining() (files int, bytes int64) {
+	return len(s.sampleIdx), s.bytesToSample
 }
 
 // Sample returns a random file from the set (which was not previously sampled),
 // weighted by size.
-func (s *fileSet) Sample() (filename string, size int64) {
+func (s *fileSet) Sample() (filename string) {
 	idx := s.sampleIdx[0]
 	s.sampleIdx = s.sampleIdx[1:]
 	s.files[idx].sampled = true
-	return s.files[idx].filename, s.files[idx].size
+	s.bytesToSample -= s.files[idx].size
+	return s.files[idx].filename
 }
 
 func isTTY(out io.Writer) bool {
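
Two details in the remote path are worth spelling out: `Size` fabricates the same 1 MiB value for every object, so the size-weighted sampling in `Sample` degenerates to uniform sampling, and progress is therefore tracked by file count rather than by bytes. A tiny worked example of the file-count percentage from the loop above (numbers invented):

package main

import "fmt"

func main() {
	// Say 25 remote objects have been sampled and 75 remain unsampled.
	sampledFiles, remainingFiles := 25, 75
	// Remote case: progress by file count, as in runAnalyzeData above.
	percentage := float64(sampledFiles) * 100 / float64(sampledFiles+remainingFiles)
	fmt.Printf("Sampled %.2f%%\n", percentage) // prints "Sampled 25.00%"
}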

tool/db_analyze_data_test.go

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ func TestFileSetSampling(t *testing.T) {
 	dbStorage := newVFSStorage(fsWrapper{memFS}, "")
 	fs, err := makeFileSet(dbStorage, rng)
 	require.NoError(t, err)
-	file, _ := fs.Sample()
+	file := fs.Sample()
 	if file != largeFileName {
 		smallFileChosen++
 	}

tool/tool.go

Lines changed: 1 addition & 1 deletion
@@ -183,7 +183,7 @@ func New(opts ...Option) *T {
 		opt(t)
 	}
 
-	t.db = newDB(&t.opts, t.comparers, t.mergers, t.openErrEnhancer, t.openOptions, t.exciseSpanFn)
+	t.db = newDB(&t.opts, t.comparers, t.mergers, t.openErrEnhancer, t.openOptions, t.exciseSpanFn, t.remoteStorageFn)
 	t.find = newFind(&t.opts, t.comparers, t.defaultComparer, t.mergers)
 	t.lsm = newLSM(&t.opts, t.comparers)
 	t.manifest = newManifest(&t.opts, t.comparers)
