Merge pull request #17079 from danhhz/addsstable_noclear
storageccl: don't clear existing data in AddSSTable
danhhz committed Jul 18, 2017
2 parents 2dcf8ed + db604c3 commit ec0963d
Showing 7 changed files with 378 additions and 246 deletions.
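
The add_sstable.go diff below shows the new approach: rather than clearing whatever is already in the target span (or falling back to copying the sstable into a WriteBatch), evaluation now subtracts the MVCC stats of the existing data and adds back the stats of the merged result of existing data plus sstable, so the sstable can always be ingested as-is. A minimal standalone sketch of that bookkeeping, assuming toy stand-ins (the stats type and computeStats below) for enginepb.MVCCStats and the engine's ComputeStats — this is not CockroachDB code:

package main

import "fmt"

// stats is a drastically simplified stand-in for enginepb.MVCCStats:
// just key and value byte totals.
type stats struct{ KeyBytes, ValBytes int64 }

func (s *stats) add(o stats)      { s.KeyBytes += o.KeyBytes; s.ValBytes += o.ValBytes }
func (s *stats) subtract(o stats) { s.KeyBytes -= o.KeyBytes; s.ValBytes -= o.ValBytes }

// computeStats tallies the stats of a snapshot of the keyspace (here a map).
func computeStats(kvs map[string]string) stats {
    var s stats
    for k, v := range kvs {
        s.KeyBytes += int64(len(k))
        s.ValBytes += int64(len(v))
    }
    return s
}

func main() {
    // Existing data in the target span, and the incoming "sstable". The
    // overlap on "b" is the interesting case; a retried request overlaps fully.
    existing := map[string]string{"a": "old", "b": "old"}
    sst := map[string]string{"b": "new", "c": "new"}

    // ms plays the role of cArgs.Stats: the delta this command contributes.
    var ms stats

    // The new bookkeeping: subtract stats for what is already in the span,
    // overlay the sstable on top of it, and add back stats for the merged
    // result. Nothing is cleared first.
    ms.subtract(computeStats(existing))
    merged := map[string]string{}
    for k, v := range existing {
        merged[k] = v
    }
    for k, v := range sst {
        merged[k] = v // the sstable wins on overlapping keys
    }
    ms.add(computeStats(merged))

    // Applying the delta to the range's current stats gives the post-ingest
    // totals, with no clearing of existing data required.
    total := computeStats(existing)
    total.add(ms)
    fmt.Printf("delta %+v, new totals %+v\n", ms, total) // total == computeStats(merged)
}
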
134 changes: 42 additions & 92 deletions pkg/ccl/storageccl/add_sstable.go
@@ -36,54 +36,43 @@ func evalAddSSTable(
     args := cArgs.Args.(*roachpb.AddSSTableRequest)
     h := cArgs.Header
     ms := cArgs.Stats
+    mvccStartKey, mvccEndKey := engine.MVCCKey{Key: args.Key}, engine.MVCCKey{Key: args.EndKey}
 
     // TODO(tschottdorf): restore the below in some form (gets in the way of testing).
     // _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
     // defer tracing.FinishSpan(span)
     log.Eventf(ctx, "evaluating AddSSTable")
 
+    // Compute the stats for any existing data in the affected span. The sstable
+    // being ingested can overwrite all, some, or none of the existing kvs.
+    // (Note: the expected case is that it's none or, in the case of a retry of
+    // the request, all.) So subtract out the existing mvcc stats, and add back
+    // what they'll be after the sstable is ingested.
+    existingIter := batch.NewIterator(false)
+    defer existingIter.Close()
+    existingIter.Seek(mvccStartKey)
+    if ok, err := existingIter.Valid(); err != nil {
+        return storage.EvalResult{}, errors.Wrap(err, "computing existing stats")
+    } else if ok && existingIter.UnsafeKey().Less(mvccEndKey) {
+        log.Eventf(ctx, "target key range not empty, will merge existing data with sstable")
+    }
+    // This ComputeStats is cheap if the span is empty.
+    existingStats, err := existingIter.ComputeStats(mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
+    if err != nil {
+        return storage.EvalResult{}, errors.Wrap(err, "computing existing stats")
+    }
+    ms.Subtract(existingStats)
+
     // Verify that the keys in the sstable are within the range specified by the
     // request header, verify the key-value checksums, and compute the new
     // MVCCStats.
-    mvccStartKey, mvccEndKey := engine.MVCCKey{Key: args.Key}, engine.MVCCKey{Key: args.EndKey}
-    stats, err := verifySSTable(args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
+    stats, err := verifySSTable(
+        existingIter, args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
     if err != nil {
         return storage.EvalResult{}, errors.Wrap(err, "verifying sstable data")
     }
 
-    // Check if there was data in the affected keyrange. If so, delete it.
-    existingStats, err := clearExistingData(ctx, batch, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
-    if err != nil {
-        return storage.EvalResult{}, errors.Wrap(err, "clearing existing data")
-    }
-
-    if existingStats != (enginepb.MVCCStats{}) {
-        log.Eventf(ctx, "key range contains existing data %+v, falling back to regular WriteBatch", existingStats)
-        ms.Subtract(existingStats) // account for clearExistingData
-
-        // If we're overwriting data, linking the SSTable is tricky since we have
-        // to link before we delete, and so the deletions could clobber the data
-        // in the SSTable. Instead, we put everything into a WriteBatch. Not
-        // pretty, but this case should be the exception.
-        reader := engine.MakeRocksDBSstFileReader()
-        defer reader.Close()
-        if err := reader.IngestExternalFile(args.Data); err != nil {
-            return storage.EvalResult{}, errors.Wrap(err, "while preparing SSTable for copy into WriteBatch")
-        }
-        var v roachpb.Value
-        if err := reader.Iterate(mvccStartKey, mvccEndKey, func(kv engine.MVCCKeyValue) (bool, error) {
-            v.RawBytes = kv.Value
-            return false, engine.MVCCPut(ctx, batch, ms, kv.Key.Key, kv.Key.Timestamp, v, nil /* txn */)
-        }); err != nil {
-            return storage.EvalResult{}, errors.Wrap(err, "copying SSTable into batch")
-        }
-        // Return with only a WriteBatch and no sideloading.
-        return storage.EvalResult{}, nil
-    }
-
-    // More frequent case: the underlying key range is empty and we can ingest an SSTable.
-    log.Event(ctx, "key range is empty; commencing with SSTable proposal")
     ms.Add(stats)
 
     return storage.EvalResult{
         Replicated: storagebase.ReplicatedEvalResult{
             AddSSTable: &storagebase.ReplicatedEvalResult_AddSSTable{
@@ -95,18 +84,18 @@
 }
 
 func verifySSTable(
-    data []byte, start, end engine.MVCCKey, nowNanos int64,
+    existingIter engine.SimpleIterator, data []byte, start, end engine.MVCCKey, nowNanos int64,
 ) (enginepb.MVCCStats, error) {
-    iter, err := engineccl.NewMemSSTIterator(data)
+    dataIter, err := engineccl.NewMemSSTIterator(data)
     if err != nil {
         return enginepb.MVCCStats{}, err
     }
-    defer iter.Close()
+    defer dataIter.Close()
 
-    iter.Seek(engine.MVCCKey{Key: keys.MinKey})
-    ok, err := iter.Valid()
-    for ; ok; ok, err = iter.Valid() {
-        unsafeKey := iter.UnsafeKey()
+    dataIter.Seek(engine.MVCCKey{Key: keys.MinKey})
+    ok, err := dataIter.Valid()
+    for ; ok; ok, err = dataIter.Valid() {
+        unsafeKey := dataIter.UnsafeKey()
         if unsafeKey.Less(start) || !unsafeKey.Less(end) {
             // TODO(dan): Add a new field in roachpb.Error, so the client can
             // catch this and retry. It can happen if the range splits between
@@ -115,65 +104,26 @@
                 unsafeKey.Key, start.Key, end.Key)
         }
 
-        v := roachpb.Value{RawBytes: iter.UnsafeValue()}
+        v := roachpb.Value{RawBytes: dataIter.UnsafeValue()}
         if err := v.Verify(unsafeKey.Key); err != nil {
             return enginepb.MVCCStats{}, err
         }
-        iter.Next()
+        dataIter.Next()
     }
     if err != nil {
         return enginepb.MVCCStats{}, err
     }
 
+    // In the case that two iterators have an entry with the same key and
+    // timestamp, MultiIterator breaks ties by preferring later ones in the
+    // ordering. So it's important that the sstable iterator comes after the one
+    // for the existing data (because the sstable will overwrite it when
+    // ingested).
+    mergedIter := engineccl.MakeMultiIterator([]engine.SimpleIterator{existingIter, dataIter})
+    defer mergedIter.Close()
+
     // TODO(dan): This unnecessarily iterates the sstable a second time, see if
     // combining this computation with the above checksum verification speeds
     // anything up.
-    return engine.ComputeStatsGo(iter, start, end, nowNanos)
-}
-
-func clearExistingData(
-    ctx context.Context, batch engine.ReadWriter, start, end engine.MVCCKey, nowNanos int64,
-) (enginepb.MVCCStats, error) {
-    {
-        isEmpty := true
-        if err := batch.Iterate(start, end, func(_ engine.MVCCKeyValue) (bool, error) {
-            isEmpty = false
-            return true, nil // stop right away
-        }); err != nil {
-            return enginepb.MVCCStats{}, errors.Wrap(err, "while checking for empty key space")
-        }
-
-        if isEmpty {
-            return enginepb.MVCCStats{}, nil
-        }
-    }
-
-    iter := batch.NewIterator(false)
-    defer iter.Close()
-
-    iter.Seek(start)
-    if ok, err := iter.Valid(); err != nil {
-        return enginepb.MVCCStats{}, err
-    } else if ok && !iter.UnsafeKey().Less(end) {
-        return enginepb.MVCCStats{}, nil
-    }
-
-    existingStats, err := iter.ComputeStats(start, end, nowNanos)
-    if err != nil {
-        return enginepb.MVCCStats{}, err
-    }
-
-    log.Eventf(ctx, "target key range not empty, will clear existing data: %+v", existingStats)
-    // If this is a SpanSetIterator, we have to unwrap it because
-    // ClearIterRange needs a plain rocksdb iterator (and can't unwrap
-    // it itself because of import cycles).
-    if ssi, ok := iter.(*storage.SpanSetIterator); ok {
-        iter = ssi.Iterator()
-    }
-    // TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't
-    // yet work with read-write batches (or IngestExternalData).
-    if err := batch.ClearIterRange(iter, start, end); err != nil {
-        return enginepb.MVCCStats{}, err
-    }
-    return existingStats, nil
+    return engine.ComputeStatsGo(mergedIter, start, end, nowNanos)
 }
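
The comment added in verifySSTable above is the subtle invariant here: MultiIterator breaks (key, timestamp) ties in favor of iterators later in the slice, so dataIter (the sstable) has to come after existingIter for the sstable's versions to win. A toy sketch of that tie-breaking rule, using simplified entry/merge stand-ins rather than the real engine.SimpleIterator and MakeMultiIterator:

package main

import "fmt"

// entry is a toy versioned KV pair; within a key, newer timestamps sort first,
// mirroring MVCC key ordering.
type entry struct {
    key   string
    ts    int64
    value string
}

func less(a, b entry) bool {
    if a.key != b.key {
        return a.key < b.key
    }
    return a.ts > b.ts
}

// merge combines already-sorted inputs into one sorted stream. When two inputs
// hold an entry with the same key and timestamp, the entry from the LATER
// input wins -- the property that lets the sstable override existing data when
// its iterator is passed last.
func merge(inputs ...[]entry) []entry {
    idx := make([]int, len(inputs))
    var out []entry
    for {
        // Find the smallest head entry across all inputs.
        best := -1
        for i := range inputs {
            if idx[i] < len(inputs[i]) && (best == -1 || less(inputs[i][idx[i]], inputs[best][idx[best]])) {
                best = i
            }
        }
        if best == -1 {
            return out // every input is exhausted
        }
        smallest := inputs[best][idx[best]]
        // Emit one entry for this (key, ts): later inputs overwrite earlier
        // ones, and every input positioned at this version is advanced.
        winner := smallest
        for i := range inputs {
            if idx[i] < len(inputs[i]) &&
                inputs[i][idx[i]].key == smallest.key && inputs[i][idx[i]].ts == smallest.ts {
                winner = inputs[i][idx[i]]
                idx[i]++
            }
        }
        out = append(out, winner)
    }
}

func main() {
    existing := []entry{{"a", 1, "a@1 (existing)"}, {"b", 2, "b@2 (existing)"}}
    sstable := []entry{{"b", 2, "b@2 (sstable)"}, {"c", 3, "c@3 (sstable)"}}

    // The sstable goes last so its version of b@2 wins the tie.
    for _, e := range merge(existing, sstable) {
        fmt.Printf("%s@%d = %s\n", e.key, e.ts, e.value)
    }
    // Prints a@1 (existing), b@2 (sstable), c@3 (sstable).
}
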
