5
5
package metamorphic
6
6
7
7
import (
8
- "context"
9
8
"fmt"
10
9
"io"
11
10
"os"
12
11
"path"
13
- "path/filepath"
14
12
"sort"
15
13
"strings"
16
14
"time"
@@ -108,12 +106,6 @@ func (t *Test) init(
108
106
if numInstances < 1 {
109
107
numInstances = 1
110
108
}
111
- if t .testOpts .externalStorageEnabled {
112
- t .externalStorage = t .testOpts .externalStorageFS
113
- } else {
114
- t .externalStorage = remote .NewInMem ()
115
- }
116
-
117
109
t .opsWaitOn , t .opsDone = computeSynchronizationPoints (t .ops )
118
110
119
111
if t .opts .Cache != nil {
@@ -192,10 +184,7 @@ func (t *Test) init(
192
184
dir = path .Join (t .dir , fmt .Sprintf ("db%d" , i + 1 ))
193
185
}
194
186
err = t .withRetries (func () error {
195
- // Give each DB its own CompactionScheduler.
196
- o := * t .opts
197
- o .Experimental .CompactionScheduler =
198
- pebble .NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest ()
187
+ o := t .finalizeOptions (dir )
199
188
db , err = pebble .Open (dir , & o )
200
189
return err
201
190
})
@@ -252,6 +241,45 @@ func (t *Test) init(
252
241
return nil
253
242
}
254
243
244
+ // finalizeOptions returns the options that need to be passed to pebble.Open().
245
+ //
246
+ // It initializes t.externalStorage and creates the compaction scheduler and
247
+ // remote storage factory.
248
+ func (t * Test ) finalizeOptions (dataDir string ) pebble.Options {
249
+ o := * t .opts
250
+ // Give each DB its own CompactionScheduler.
251
+ o .Experimental .CompactionScheduler =
252
+ pebble .NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest ()
253
+
254
+ // Set up external/shared storage.
255
+ externalDir := o .FS .PathJoin (dataDir , "external" )
256
+ if err := o .FS .MkdirAll (externalDir , 0755 ); err != nil {
257
+ panic (fmt .Sprintf ("failed to create directory %q: %s" , externalDir , err ))
258
+ }
259
+ // Even if externalStorageEnabled is false, the test uses externalStorage to
260
+ // emulate external ingestion.
261
+ t .externalStorage = remote .NewLocalFS (externalDir , o .FS )
262
+
263
+ m := make (map [remote.Locator ]remote.Storage )
264
+ // If we are starting from an initial state (initialStatePath != ""), the
265
+ // existing store might use shared or external storage, so we set them up
266
+ // unconditionally.
267
+ if t .testOpts .sharedStorageEnabled || t .testOpts .initialStatePath != "" {
268
+ sharedDir := o .FS .PathJoin (dataDir , "shared" )
269
+ if err := o .FS .MkdirAll (sharedDir , 0755 ); err != nil {
270
+ panic (fmt .Sprintf ("failed to create directory %q: %s" , sharedDir , err ))
271
+ }
272
+ m ["" ] = remote .NewLocalFS (sharedDir , o .FS )
273
+ }
274
+ if t .testOpts .externalStorageEnabled || t .testOpts .initialStatePath != "" {
275
+ m ["external" ] = t .externalStorage
276
+ }
277
+ if len (m ) > 0 {
278
+ o .Experimental .RemoteStorage = remote .MakeSimpleFactory (m )
279
+ }
280
+ return o
281
+ }
282
+
255
283
func (t * Test ) withRetries (fn func () error ) error {
256
284
return withRetries (fn , t .testOpts .RetryPolicy )
257
285
}
@@ -279,13 +307,6 @@ func (t *Test) restartDB(dbID objID) error {
279
307
if ! t .testOpts .strictFS {
280
308
return nil
281
309
}
282
- if t .testOpts .sharedStorageEnabled {
283
- // We simulate a crash by essentially ignoring writes to disk after a
284
- // certain point. However, we cannot prevent the process (which didn't
285
- // actually crash) from deleting an external object before we call Close().
286
- // TODO(radu): perhaps we want all syncs to fail after the "crash" point?
287
- return nil
288
- }
289
310
// We can't do this if we have more than one database since they share the
290
311
// same FS (and we only close/reopen one of them).
291
312
// TODO(radu): have each database use its own MemFS.
@@ -326,10 +347,7 @@ func (t *Test) restartDB(dbID objID) error {
326
347
if len (t .dbs ) > 1 {
327
348
dir = path .Join (dir , fmt .Sprintf ("db%d" , dbID .slot ()))
328
349
}
329
- // Give each DB its own CompactionScheduler.
330
- o := * t .opts
331
- o .Experimental .CompactionScheduler =
332
- pebble .NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest ()
350
+ o := t .finalizeOptions (dir )
333
351
t .dbs [dbID .slot ()- 1 ], err = pebble .Open (dir , & o )
334
352
if err != nil {
335
353
return err
@@ -349,49 +367,6 @@ func (t *Test) saveInMemoryDataInternal() error {
349
367
return err
350
368
}
351
369
}
352
- if t .testOpts .sharedStorageEnabled {
353
- if err := copyRemoteStorage (t .testOpts .sharedStorageFS , filepath .Join (t .dir , "shared" )); err != nil {
354
- return err
355
- }
356
- }
357
- if t .testOpts .externalStorageEnabled {
358
- if err := copyRemoteStorage (t .testOpts .externalStorageFS , filepath .Join (t .dir , "external" )); err != nil {
359
- return err
360
- }
361
- }
362
- return nil
363
- }
364
-
365
- func copyRemoteStorage (fs remote.Storage , outputDir string ) error {
366
- if err := vfs .Default .MkdirAll (outputDir , 0755 ); err != nil {
367
- return err
368
- }
369
- objs , err := fs .List ("" , "" )
370
- if err != nil {
371
- return err
372
- }
373
- for i := range objs {
374
- reader , readSize , err := fs .ReadObject (context .TODO (), objs [i ])
375
- if err != nil {
376
- return err
377
- }
378
- buf := make ([]byte , readSize )
379
- if err := reader .ReadAt (context .TODO (), buf , 0 ); err != nil {
380
- return err
381
- }
382
- outputPath := vfs .Default .PathJoin (outputDir , objs [i ])
383
- outputFile , err := vfs .Default .Create (outputPath , vfs .WriteCategoryUnspecified )
384
- if err != nil {
385
- return err
386
- }
387
- if _ , err := outputFile .Write (buf ); err != nil {
388
- outputFile .Close ()
389
- return err
390
- }
391
- if err := outputFile .Close (); err != nil {
392
- return err
393
- }
394
- }
395
370
return nil
396
371
}
397
372
0 commit comments