Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
85138: importer/kv:  add ImportEpoch field to MVCCValueHeader and write to it during IMPORT INTO r=dt,erikgrinaker a=msbutler

storage: add ImportEpoch field to MVCCValueHeader
This patch adds the ImportEpoch field to an MVCCValue's MVCCValueHeader,
which allows kv clients (namely the sst_batcher in an IMPORT INTO) to write
the importing table's ImportEpoch to the metadata of each ingesting MVCCValue.

Unlike the MVCCValueHeader.LocalTimestamp field, the ImportEpoch field should
be exported to other clusters (e.g. via ExportRequests from BACKUP/RESTORE and
streaming). Consequently, this PR relaxes the invariant that the
MVCCValueHeader field must be stripped in an Export Request and must be empty
in an AddSSTable Request. Now, Export Request only strips the
MVCCValueHeader.LocalTimestamp field and AddSSTable will only require the
LocalTimestamp to be empty.

Release note: none

----

bulk/kv write the table's ImportEpoch to each MVCCValue during IMPORT
This patch makes IMPORT INTO on a non-empty table write the table's ImportEpoch
to each ingested MVCC Value, via the SSTBatcher. In a future PR, the
ImportEpoch will be used to track and rollback an IMPORT INTO. This additional
information will allow IMPORTing tables to be backed up and restored, as
described in this [RFC](https://docs.google.com/document/d/16TbkFznqbsu3mialSw6o1sxOEn1pKhhJ9HTxNdw0-WE/edit#heading=h.bpox0bmkz77i).

Informs #76722

Release note: None

120080: sql: remove default_target_cluster.check_service.enabled r=dt a=dt

Release note (enterprise change): default_target_cluster can now be set to any tenant name by default, including a tenant yet to be created or have service started.
Epic: none.

120342: roachtest: admission-control/elastic-io deflake r=sumeerbhola a=aadityasondhi

Similar to #114446, we now take the mean over the last two minutes for determining high L0 sublevel count.

Fixes #119838.

Release note: None

120350: backfill: finish the tracing span after closing the account r=yuzefovich a=yuzefovich

We recently fixed an issue where we forgot to stop the index backfill merger monitor, but we had a minor bug in that fix - we captured the context that contains the tracing span that is finished before the account is closed leading to "use after finish" assertions. This is now fixed.

Fixes: #120266.

Release note: None

120356: cmd: link on the `large` pool r=jlinder a=rickystewart

The `default` pool seems to be too small to perform linking efficiently. This should speed things up.

Epic: CRDB-8308

Release note: None

120359: logictest: skip some tests under `race` r=jlinder a=rickystewart

These tests specifically are prone to failing/timing out under `race`.

Epic: CRDB-8308
Release note: None

120361: roachtest: bump max ranges threshold in splits/load/ycsb/e/nodes=3/obj=cpu r=nvanbenschoten a=nvanbenschoten

Fixes #120163.

Avoids rare test flakes.

Release note: None

120363: go.mod: bump Pebble to 51faab0a3555 r=aadityasondhi a=jbowens

Changes:

 * [`51faab0a`](cockroachdb/pebble@51faab0a) tool: add DirectoryLock option
 * [`ec69e9a2`](cockroachdb/pebble@ec69e9a2) ingest test: fix merge skew
 * [`635c6003`](cockroachdb/pebble@635c6003) manifest: add VersionEdit tests with virtual tables
 * [`cb660884`](cockroachdb/pebble@cb660884) manifest: improve VersionEdit stringification
 * [`31b37248`](cockroachdb/pebble@31b37248) ingest_test: support reopening in ingest tests
 * [`64ebec94`](cockroachdb/pebble@64ebec94) ingest_test: set correct sizes for external ingests
 * [`4bf09d5e`](cockroachdb/pebble@4bf09d5e) manifest: improve VersionEdit tests
 * [`a034560d`](cockroachdb/pebble@a034560d) manifest: add a helper for DebugString parsing
 * [`623524f1`](cockroachdb/pebble@623524f1) manifest: use Lx for levels in version String/DebugString

Release note: none.
Epic: none.

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Jackson Owens <jackson@cockroachlabs.com>
  • Loading branch information
8 people committed Mar 12, 2024
9 parents 0da5d45 + 2d4b601 + 26acc25 + b3ead23 + 5ae5417 + d0bb7e4 + 5ca24e3 + de41ff6 + 1201b17 commit 9adee4e
Show file tree
Hide file tree
Showing 54 changed files with 644 additions and 175 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1693,10 +1693,10 @@ def go_deps():
patches = [
"@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch",
],
sha256 = "f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0",
strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240308204553-8df4320c24e4",
sha256 = "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6",
strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20240312180812-51faab0a3555",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240308204553-8df4320c24e4.zip": "f68528557224c2af2fd0b46199602f982fd44f813029040a1f3ceaf134633dc0",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20240312180812-51faab0a3555.zip": "bffe4ef26087a4e25f9deaad87ed1ea9849d3ea6032badce7cacb919d6614cc6",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.19.0
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b
github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4
github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555
github.com/cockroachdb/redact v1.1.5
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4 h1:yuJAmwkJTQjB5YyoNkmXlK9/2YR+jWizfE7crErqGhU=
github.com/cockroachdb/pebble v0.0.0-20240308204553-8df4320c24e4/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c=
github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555 h1:NRPlms/+HfNgTPMrvSTU/bKDDps4/6vSvPnogZ4HzYw=
github.com/cockroachdb/pebble v0.0.0-20240312180812-51faab0a3555/go.mod h1:g0agBmtwky6biPBw0MO+GkiYRv9krOTZgpPw2rfha8c=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func backup(
backupManifest.StartTime,
backupManifest.EndTime,
backupManifest.ElidedPrefix,
backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()),
)
if err != nil {
return roachpb.RowCount{}, 0, err
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,14 @@ func runBackupProcessor(
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
SplitMidKey: splitMidKey,
RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
SplitMidKey: splitMidKey,
IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader,
}

// If we're doing re-attempts but are not yet in the priority regime,
Expand Down
45 changes: 24 additions & 21 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func distBackupPlanSpecs(
mvccFilter kvpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
elide execinfrapb.ElidePrefix,
includeValueHeader bool,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
Expand Down Expand Up @@ -98,17 +99,18 @@ func distBackupPlanSpecs(
sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
for _, partition := range spanPartitions {
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
Spans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
ElidePrefix: elide,
JobID: jobID,
Spans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
ElidePrefix: elide,
IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
Expand All @@ -121,16 +123,17 @@ func distBackupPlanSpecs(
// which is not the leaseholder for any of the spans, but is for an
// introduced span.
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
IntroducedSpans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
JobID: jobID,
IntroducedSpans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
return summary, err
}
valueScratch = append(valueScratch[:0], v...)
value := roachpb.Value{RawBytes: valueScratch}
value, err := storage.DecodeValueFromMVCCValue(valueScratch)
if err != nil {
return summary, err
}

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

Expand All @@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if verbose {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {

// Using valueScratch here assumes that
// DecodeValueFromMVCCValue, ClearChecksum, and
// InitChecksum don't copy/reallocate the slice they
// were given. We expect that value.ClearChecksum and
// value.InitChecksum calls above have modified
// valueScratch.
if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/cockroach-oss/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
go_binary(
name = "cockroach-oss",
embed = [":cockroach-oss_lib"],
exec_properties = {"Pool": "large"},
visibility = ["//visibility:public"],
)

Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/cockroach-short/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ go_library(
go_binary(
name = "cockroach-short",
embed = [":cockroach-short_lib"],
exec_properties = {"Pool": "large"},
visibility = ["//visibility:public"],
)
1 change: 1 addition & 0 deletions pkg/cmd/cockroach-sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ go_library(
go_binary(
name = "cockroach-sql",
embed = [":cockroach-sql_lib"],
exec_properties = {"Pool": "large"},
visibility = ["//visibility:public"],
)
1 change: 1 addition & 0 deletions pkg/cmd/cockroach/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ disallowed_imports_test(
go_binary(
name = "cockroach",
embed = [":cockroach_lib"],
exec_properties = {"Pool": "large"},
visibility = ["//visibility:public"],
)
1 change: 1 addition & 0 deletions pkg/cmd/roachprod/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ go_library(
go_binary(
name = "roachprod",
embed = [":roachprod_lib"],
exec_properties = {"Pool": "large"},
visibility = ["//visibility:public"],
)
14 changes: 11 additions & 3 deletions pkg/cmd/roachtest/tests/admission_control_elastic_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,24 @@ func registerElasticIO(r registry.Registry) {
// not working, the threshold of 7 will be easily breached, since
// regular tokens allow sub-levels to exceed 10.
const subLevelThreshold = 7
const sampleCountForL0Sublevel = 12
var l0SublevelCount []float64
// Sleep initially for stability to be achieved, before measuring.
time.Sleep(5 * time.Minute)
for {
time.Sleep(30 * time.Second)
time.Sleep(10 * time.Second)
val, err := getMetricVal(subLevelMetric)
if err != nil {
continue
}
if val > subLevelThreshold {
t.Fatalf("sub-level count %f exceeded threshold", val)
l0SublevelCount = append(l0SublevelCount, val)
// We want to use the mean of the last 2m of data to avoid short-lived
// spikes causing failures.
if len(l0SublevelCount) >= sampleCountForL0Sublevel {
latestSampleMeanL0Sublevels := getMeanOverLastN(sampleCountForL0Sublevel, l0SublevelCount)
if latestSampleMeanL0Sublevels > subLevelThreshold {
t.Fatalf("sub-level mean %f over last %d iterations exceeded threshold", latestSampleMeanL0Sublevels, sampleCountForL0Sublevel)
}
}
if timeutil.Now().After(endTime) {
return nil
Expand Down
16 changes: 0 additions & 16 deletions pkg/cmd/roachtest/tests/admission_control_intent_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,3 @@ func registerIntentResolutionOverload(r registry.Registry) {
},
})
}

// Returns the mean over the last n samples. If n > len(items), returns the mean
// over the entire items slice.
func getMeanOverLastN(n int, items []float64) float64 {
count := n
if len(items) < n {
count = len(items)
}
sum := float64(0)
i := 0
for i < count {
sum += items[len(items)-1-i]
i++
}
return sum / float64(count)
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func registerLoadSplits(r registry.Registry) {
// YCSB/E has a zipfian distribution with 95% scans (limit 1k) and 5%
// inserts.
minimumRanges: 5,
maximumRanges: 15,
maximumRanges: 18,
initialRangeCount: 2,
load: ycsbSplitLoad{
workload: "e",
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/roachtest/tests/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,19 @@ func maybeUseMemoryBudget(t test.Test, budget int) option.StartOpts {
}
return startOpts
}

// Returns the mean over the last n samples. If n > len(items), returns the mean
// over the entire items slice.
func getMeanOverLastN(n int, items []float64) float64 {
count := n
if len(items) < n {
count = len(items)
}
sum := float64(0)
i := 0
for i < count {
sum += items[len(items)-1-i]
i++
}
return sum / float64(count)
}
20 changes: 17 additions & 3 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type BufferingAdder struct {
// name of the BufferingAdder for the purpose of logging only.
name string

// importEpoch specifies the ImportEpoch of the table the BufferingAdder
// is ingesting data as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
importEpoch uint32

bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

Expand Down Expand Up @@ -96,7 +102,8 @@ func MakeBulkAdder(
}

b := &BufferingAdder{
name: opts.Name,
name: opts.Name,
importEpoch: opts.ImportEpoch,
sink: SSTBatcher{
name: opts.Name,
db: db,
Expand Down Expand Up @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

for i := range b.curBuf.entries {
mvccKey.Key = b.curBuf.Key(i)
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
if b.importEpoch != 0 {
if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i),
b.importEpoch); err != nil {
return err
}
} else {
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
}
}
}
if err := b.sink.Flush(ctx); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
b.mu.onFlush = onFlush
}

func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
) error {
mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
encVal, err := storage.EncodeMVCCValue(mvccVal)
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
Expand Down Expand Up @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}

return b.sstWriter.Put(key, value)
return b.sstWriter.PutRawMVCC(key, value)
}

// Reset clears all state in the batcher and prepares it for reuse.
Expand Down

0 comments on commit 9adee4e

Please sign in to comment.