Skip to content

Commit 7598e14

Browse files
committed
objstorage: check for missing file in remoteReadable
An external file can go missing after we "open" it (i.e. after we obtain a `remote.ObjectReader`). This should be marked as a corruption error; this commit fixes this and adds tests.
1 parent 4c72d8f commit 7598e14

File tree

7 files changed

+152
-22
lines changed

7 files changed

+152
-22
lines changed

objstorage/objstorageprovider/remote.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (p *provider) remoteOpenForReading(
348348
}
349349
return nil, err
350350
}
351-
return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
351+
return p.newRemoteReadable(reader, size, meta.DiskFileNum, meta.Remote.Storage.IsNotExistError), nil
352352
}
353353

354354
func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {

objstorage/objstorageprovider/remote_backing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (p *provider) AttachRemoteObjects(
263263
_ = p.sharedUnref(d.meta)
264264
// TODO(radu): clean up references previously created in this loop.
265265
if d.meta.Remote.Storage.IsNotExistError(err) {
266-
return nil, errors.Errorf("origin marker object %q does not exist;"+
266+
return nil, base.CorruptionErrorf("origin marker object %q does not exist;"+
267267
" object probably removed from the provider which created the backing", refName)
268268
}
269269
return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)

objstorage/objstorageprovider/remote_readable.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,27 @@ const remoteReadaheadSizeForCompaction = 8 * 1024 * 1024 /* 8MB */
3535
// remote.ObjectReader returned by remote.Storage.ReadObject. It is stateless
3636
// and can be called concurrently.
3737
type remoteReadable struct {
38-
objReader remote.ObjectReader
39-
size int64
40-
fileNum base.DiskFileNum
41-
cache *sharedcache.Cache
38+
objReader remote.ObjectReader
39+
size int64
40+
fileNum base.DiskFileNum
41+
cache *sharedcache.Cache
42+
errIsNotExist func(error) bool
4243
}
4344

4445
var _ objstorage.Readable = (*remoteReadable)(nil)
4546

4647
func (p *provider) newRemoteReadable(
47-
objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
48+
objReader remote.ObjectReader,
49+
size int64,
50+
fileNum base.DiskFileNum,
51+
errIsNotExist func(error) bool,
4852
) *remoteReadable {
4953
return &remoteReadable{
50-
objReader: objReader,
51-
size: size,
52-
fileNum: fileNum,
53-
cache: p.remote.cache,
54+
objReader: objReader,
55+
size: size,
56+
fileNum: fileNum,
57+
cache: p.remote.cache,
58+
errIsNotExist: errIsNotExist,
5459
}
5560
}
5661

@@ -64,14 +69,21 @@ func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) err
6469
func (r *remoteReadable) readInternal(
6570
ctx context.Context, p []byte, offset int64, forCompaction bool,
6671
) error {
72+
var err error
6773
if r.cache != nil {
6874
flags := sharedcache.ReadFlags{
6975
// Don't add data to the cache if this read is for a compaction.
7076
ReadOnly: forCompaction,
7177
}
72-
return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
78+
err = r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
79+
} else {
80+
err = r.objReader.ReadAt(ctx, p, offset)
81+
}
82+
if err != nil && r.errIsNotExist(err) {
83+
// If a file goes missing, we consider this a corruption error.
84+
err = base.MarkCorruptionError(err)
7385
}
74-
return r.objReader.ReadAt(ctx, p, offset)
86+
return err
7587
}
7688

7789
func (r *remoteReadable) Close() error {

objstorage/objstorageprovider/remote_readable_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"testing"
1414

1515
"github.com/cockroachdb/datadriven"
16+
"github.com/cockroachdb/pebble/internal/base"
1617
"github.com/cockroachdb/pebble/objstorage"
18+
"github.com/cockroachdb/pebble/objstorage/remote"
19+
"github.com/cockroachdb/pebble/vfs"
1720
"github.com/stretchr/testify/require"
1821
)
1922

@@ -106,3 +109,40 @@ func TestRemoteReadHandle(t *testing.T) {
106109
}
107110
})
108111
}
112+
113+
// TestErrorWhenObjectDisappears verifies that the provider returns a corruption
114+
// error when we read from an opened object that disappears from under us.
115+
func TestErrorWhenObjectDisappears(t *testing.T) {
116+
remoteStorage := remote.NewInMem()
117+
settings := DefaultSettings(vfs.NewMem(), "")
118+
settings.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
119+
"locator": remoteStorage,
120+
})
121+
settings.Remote.CreateOnSharedLocator = "locator"
122+
settings.Remote.CreateOnShared = remote.CreateOnSharedAll
123+
provider, err := Open(settings)
124+
require.NoError(t, err)
125+
defer provider.Close()
126+
require.NoError(t, provider.SetCreatorID(1))
127+
128+
ctx := context.Background()
129+
writable, objMeta, err := provider.Create(ctx, base.FileTypeTable, 1, objstorage.CreateOptions{
130+
PreferSharedStorage: true,
131+
})
132+
require.NoError(t, err)
133+
require.NotNil(t, objMeta.Remote.Storage)
134+
require.NoError(t, writable.Write([]byte("hello")))
135+
require.NoError(t, writable.Finish())
136+
137+
readable, err := provider.OpenForReading(ctx, base.FileTypeTable, 1, objstorage.OpenOptions{})
138+
require.NoError(t, err)
139+
140+
// Delete all objects from the store and expect to get a corruption error.
141+
objects, err := remoteStorage.List("", "")
142+
require.NoError(t, err)
143+
for _, o := range objects {
144+
require.NoError(t, remoteStorage.Delete(o))
145+
}
146+
err = readable.ReadAt(ctx, make([]byte, 1), 0)
147+
require.True(t, base.IsCorruptionError(err))
148+
}

objstorage/objstorageprovider/testdata/provider/shared_attach_after_unref

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ p5b2 102
8383
----
8484
<remote> create object "1ab5-5-000002.sst.ref.6.000102"
8585
<remote> close writer for "1ab5-5-000002.sst.ref.6.000102" after 0 bytes
86-
<remote> size of object "1ab5-5-000002.sst.ref.5.000002": error: file does not exist
86+
<remote> size of object "1ab5-5-000002.sst.ref.5.000002": error: in-mem remote storage object does not exist
8787
<remote> delete object "1ab5-5-000002.sst.ref.6.000102"
8888
<remote> list (prefix="1ab5-5-000002.sst.ref.", delimiter="")
8989
<remote> delete object "1ab5-5-000002.sst"

objstorage/remote/mem.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import (
88
"bytes"
99
"context"
1010
"io"
11-
"os"
1211
"strings"
1312
"sync"
13+
14+
"github.com/cockroachdb/errors"
1415
)
1516

1617
// NewInMem returns an in-memory implementation of the remote.Storage
@@ -49,25 +50,32 @@ func (s *inMemStore) ReadObject(
4950
if err != nil {
5051
return nil, 0, err
5152
}
52-
return &inMemReader{data: obj.data}, int64(len(obj.data)), nil
53+
return &inMemReader{objName: objName, store: s}, int64(len(obj.data)), nil
5354
}
5455

5556
type inMemReader struct {
56-
data []byte
57+
objName string
58+
store *inMemStore
5759
}
5860

5961
var _ ObjectReader = (*inMemReader)(nil)
6062

6163
func (r *inMemReader) ReadAt(ctx context.Context, p []byte, offset int64) error {
62-
if offset+int64(len(p)) > int64(len(r.data)) {
64+
// We don't just store obj.data in the inMemReader because we want to emit an
65+
// error if the object is deleted from under us.
66+
obj, err := r.store.getObj(r.objName)
67+
if err != nil {
68+
return err
69+
}
70+
if offset+int64(len(p)) > int64(len(obj.data)) {
6371
return io.EOF
6472
}
65-
copy(p, r.data[offset:])
73+
copy(p, obj.data[offset:])
6674
return nil
6775
}
6876

6977
func (r *inMemReader) Close() error {
70-
r.data = nil
78+
r.store = nil
7179
return nil
7280
}
7381

@@ -135,15 +143,19 @@ func (s *inMemStore) Size(objName string) (int64, error) {
135143
}
136144

137145
func (s *inMemStore) IsNotExistError(err error) bool {
138-
return err == os.ErrNotExist
146+
return errors.Is(err, inMemStoreNotExistErr)
139147
}
140148

149+
// We use a custom "not exists" error to make sure that callers correctly use
150+
// IsNotExistError.
151+
var inMemStoreNotExistErr = errors.Newf("in-mem remote storage object does not exist")
152+
141153
func (s *inMemStore) getObj(name string) (*inMemObj, error) {
142154
s.mu.Lock()
143155
defer s.mu.Unlock()
144156
obj, ok := s.mu.objects[name]
145157
if !ok {
146-
return nil, os.ErrNotExist
158+
return nil, inMemStoreNotExistErr
147159
}
148160
return obj, nil
149161
}

sstable/reader_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/cockroachdb/pebble/internal/testkeys"
3434
"github.com/cockroachdb/pebble/objstorage"
3535
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
36+
"github.com/cockroachdb/pebble/objstorage/remote"
3637
"github.com/cockroachdb/pebble/sstable/block"
3738
"github.com/cockroachdb/pebble/sstable/valblk"
3839
"github.com/cockroachdb/pebble/vfs"
@@ -2535,3 +2536,68 @@ func newReader(r ReadableFile, o ReaderOptions) (*Reader, error) {
25352536
}
25362537
return NewReader(context.Background(), readable, o)
25372538
}
2539+
2540+
// TestReaderReportsCorruption tests that the reader reports corruption when
2541+
// an external file goes missing after obtaining a remote.ObjectReader for it.
2542+
func TestReaderReportsCorruption(t *testing.T) {
2543+
defer leaktest.AfterTest(t)()
2544+
2545+
ctx := context.Background()
2546+
remoteStorage := remote.NewInMem()
2547+
settings := objstorageprovider.DefaultSettings(vfs.NewMem(), "")
2548+
settings.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
2549+
"locator": remoteStorage,
2550+
})
2551+
settings.Remote.CreateOnSharedLocator = "locator"
2552+
settings.Remote.CreateOnShared = remote.CreateOnSharedAll
2553+
provider, err := objstorageprovider.Open(settings)
2554+
require.NoError(t, err)
2555+
defer provider.Close()
2556+
require.NoError(t, provider.SetCreatorID(1))
2557+
2558+
writable, objMeta, err := provider.Create(ctx, base.FileTypeTable, 1, objstorage.CreateOptions{
2559+
PreferSharedStorage: true,
2560+
})
2561+
require.NoError(t, err)
2562+
require.NotNil(t, objMeta.Remote.Storage)
2563+
2564+
// Create an sst file with multiple data blocks.
2565+
w := NewWriter(writable, WriterOptions{BlockSize: 1})
2566+
for i := range 100 {
2567+
require.NoError(t, w.Set([]byte(fmt.Sprintf("%04d", i)), []byte(fmt.Sprintf("value%04d", i))))
2568+
}
2569+
require.NoError(t, w.Close())
2570+
2571+
readable, err := provider.OpenForReading(ctx, base.FileTypeTable, 1, objstorage.OpenOptions{})
2572+
require.NoError(t, err)
2573+
r, err := NewReader(context.Background(), readable, ReaderOptions{})
2574+
require.NoError(t, err)
2575+
defer r.Close()
2576+
2577+
var lastReportedCorruption error
2578+
env := block.ReadEnv{
2579+
ReportCorruptionFn: func(opaque any, err error) error {
2580+
lastReportedCorruption = err
2581+
return errors.Wrap(err, "error passed through ReportCorruptionFn")
2582+
},
2583+
}
2584+
iter, err := r.NewPointIter(context.Background(), NoTransforms, nil, nil, nil, 0, env, nil)
2585+
require.NoError(t, err)
2586+
defer iter.Close()
2587+
kv := iter.First()
2588+
require.NotNil(t, kv)
2589+
2590+
// Delete all objects from the store and expect to get a corruption error.
2591+
objects, err := remoteStorage.List("", "")
2592+
require.NoError(t, err)
2593+
for _, o := range objects {
2594+
require.NoError(t, remoteStorage.Delete(o))
2595+
}
2596+
for ; kv != nil; kv = iter.Next() {
2597+
}
2598+
iterErr := iter.Error()
2599+
require.ErrorContains(t, iterErr, "error passed through ReportCorruptionFn: in-mem remote storage object does not exist")
2600+
require.True(t, base.IsCorruptionError(iterErr))
2601+
require.ErrorContains(t, lastReportedCorruption, "in-mem remote storage object does not exist")
2602+
require.True(t, base.IsCorruptionError(lastReportedCorruption))
2603+
}

0 commit comments

Comments
 (0)