
storage: build SSTs from KV_BATCH snapshot #38932

Merged

merged 4 commits on Aug 9, 2019

Conversation

jeffrey-xiao
Contributor

@jeffrey-xiao jeffrey-xiao commented Jul 17, 2019

Implements the SST snapshot strategy discussed in #16954 and partially implemented in #25134 and #38873, but only includes the logic on the receiver side for ease of testing and compatibility. This PR also handles the complications of subsumed replicas that are not fully contained by the current replica.

The maximum number of SSTs created using this strategy is 4 + SR + 2 where SR is the number of subsumed replicas.

  • Three SSTs get streamed from the sender (range local keys, replicated range-id local keys, and data keys)
  • One SST is constructed for the unreplicated range-id local keys.
  • One SST is constructed for every subsumed replica to clear the range-id local keys. These SSTs consist of one range deletion tombstone and one RaftTombstone key.
  • A maximum of two SSTs for all subsumed replicas to account for the case of not fully contained subsumed replicas. Note that currently, subsumed replicas can have keys right of the current replica, but not left of, so there will be a maximum of one SST created for the range-local keys and one for the data keys. These SSTs consist of one range deletion tombstone.

This number can be further reduced to 3 + SR if we pass the file handles and SST writers from the receiving step to the application step. We can combine the SSTs for the unreplicated range-id and replicated range-id keys, and the range-local and data SSTs of the subsumed replicas. We probably don't want to do this optimization, though, since we'd have to undo it if we start constructing the SSTs on the sender or start chunking large SSTs into smaller SSTs.

Blocked by facebook/rocksdb#5649.

Test Plan

  • Testing knob to inspect SSTs before ingestion. Ensure that expected SSTs for subsumed replicas are ingested.
  • Unit tests for SSTSnapshotStorage.

Metrics and Evaluation

One way to evaluate this change is to follow these steps:

  1. Set up a 3-node cluster.
  2. Set default Raft log truncation threshold to some low constant:
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
    "COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 128<<10 /* 128 KB */)
  3. Set range_min_bytes to 0 and range_max_bytes to some large number (steps 3 to 5 are sketched in code after this list).
  4. Increase kv.snapshot_recovery.max_rate and kv.snapshot_rebalance.max_rate to some large number.
  5. Disable load-based splitting.
  6. Stop node 2.
  7. Run an insert-heavy workload (kv0) on the cluster.
  8. Start node 2.
  9. Time how long it takes for node 2 to have all the ranges.
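For reference, steps 3 to 5 can be applied with SQL against any node. Below is a minimal sketch; the connection string and the load-based-splitting setting name kv.range_split.by_load_enabled are assumptions, while the other names come directly from the steps above.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Connect to any node of the test cluster (the address is an assumption).
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		// Step 3: drop the lower bound on range size and raise the upper bound.
		`ALTER RANGE default CONFIGURE ZONE USING range_min_bytes = 0, range_max_bytes = 17179869184`,
		// Step 4: lift the snapshot rate limits.
		`SET CLUSTER SETTING kv.snapshot_recovery.max_rate = '1 GiB'`,
		`SET CLUSTER SETTING kv.snapshot_rebalance.max_rate = '1 GiB'`,
		// Step 5: disable load-based splitting (setting name is an assumption).
		`SET CLUSTER SETTING kv.range_split.by_load_enabled = false`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("%s: %v", stmt, err)
		}
	}
}
```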

Roachtest: https://gist.github.com/jeffrey-xiao/e69fcad04968822d603f6807ca77ef3b

We can vary two independent variables:

  1. Fixed total data size (4000000 ops; ~3.81 GiB), variable number of splits
  • 1024 splits (~3.9 MiB ranges)
  • 512 splits (~7.9 MiB ranges)
  • 256 splits (~15.7 MiB ranges)
  • 128 splits (~31.2 MiB ranges)
  • 64 splits (~61.0 MiB ranges)
  • 32 splits (~121 MiB ranges)
name         old secs   new secs   delta
AvgBytes3    14.0 ±24%  12.7 ±21%     ~     (p=0.279 n=8+8)
AvgBytes7    11.3 ± 1%  13.4 ±25%     ~     (p=0.283 n=4+8)
AvgBytes15   11.8 ±17%  12.6 ±27%     ~     (p=0.755 n=6+8)
AvgBytes30   23.5 ±11%  14.9 ±45%  -36.74%  (p=0.001 n=8+8)
AvgBytes60   32.3 ±13%  23.4 ± 9%  -27.49%  (p=0.000 n=8+8)
AvgBytes121  53.1 ± 6%  38.8 ±19%  -26.86%  (p=0.002 n=5+8)
  2. Fixed number of splits (32), variable total data size
  • 125000 (~ 3.7 MiB ranges)
  • 250000 (~7.5 MiB ranges)
  • 500000 (~15 MiB ranges)
  • 1000000 (~30 MiB ranges)
  • 2000000 (60 MiB ranges)
  • 4000000 (121 MiB ranges)
name         old secs   new secs  delta
AvgBytes3     740 ±22%   883 ± 8%     ~     (p=0.143 n=5+3)
AvgBytes7     681 ±14%   728 ± 9%     ~     (p=0.310 n=5+5)
AvgBytes15    418 ±10%   441 ±11%     ~     (p=0.310 n=5+5)
AvgBytes31   54.3 ± 6%  43.6 ± 5%  -19.72%  (p=0.008 n=5+5)
AvgBytes61   51.8 ± 3%  42.4 ± 6%  -18.16%  (p=0.008 n=5+5)
AvgBytes121  53.1 ± 6%  38.8 ±19%  -26.86%  (p=0.002 n=5+8)

Fsync Chunk Size

The size of the SST chunk that we write before fsync-ing impacts how fast node 2 has all the ranges. I've experimented with 32 splits and a median range size of 121 MB: no fsync-ing (~27s recovery), fsync-ing in 8 MB chunks (~30s recovery), fsync-ing in 2 MB chunks (~40s recovery), and fsync-ing in 256 KB chunks (~42s recovery). The default bulk SST sync rate is 2 MB and #20352 sets bytes_per_sync to 512 KB, so something between those options is probably good. The reason we would want to fsync is to prevent the OS from accumulating such a large buffer that it blocks unrelated small/fast writes for a long time when it flushes.
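For illustration, the chunked-fsync pattern being compared here looks roughly like the following. This is a standalone sketch with a placeholder file path and chunk size, not the PR's actual SST writer.

```go
package main

import (
	"log"
	"os"
)

// writeChunked appends data to f and fsyncs after roughly chunkSize bytes,
// so the OS never accumulates much more than chunkSize of dirty data for
// this file before being forced to flush it.
func writeChunked(f *os.File, data []byte, chunkSize int) error {
	for len(data) > 0 {
		n := chunkSize
		if n > len(data) {
			n = len(data)
		}
		if _, err := f.Write(data[:n]); err != nil {
			return err
		}
		if err := f.Sync(); err != nil {
			return err
		}
		data = data[n:]
	}
	return nil
}

func main() {
	// Placeholder path and sizes: write 8 MB in 2 MB chunks, matching the
	// default bulk SST sync rate mentioned above.
	f, err := os.Create("/tmp/sst-chunk-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := writeChunked(f, make([]byte, 8<<20), 2<<20); err != nil {
		log.Fatal(err)
	}
}
```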

Impact on Foreground Traffic

For testing the impact on foreground traffic, I ran kv0 on a four node cluster with the merge queue and split queue disabled and starting with a constant number of splits. After 5 minutes, I decommissioned node 1 so its replicas would drain to other nodes using snapshots.

Roachtest: https://gist.github.com/jeffrey-xiao/5d9443a37b0929884aca927f9c320b6c

Average Range Size of 3 MiB

  • Before: https://user-images.githubusercontent.com/8853434/62398633-41a2bb00-b547-11e9-9e3d-747ee724943b.png
  • After: https://user-images.githubusercontent.com/8853434/62398634-41a2bb00-b547-11e9-85e7-445b7989d173.png

Average Range Size of 32 MiB

  • Before: https://user-images.githubusercontent.com/8853434/62398631-410a2480-b547-11e9-9019-86d3bd2e6f73.png
  • After: https://user-images.githubusercontent.com/8853434/62398632-410a2480-b547-11e9-9513-8763e132e76b.png

Average Range Size of 128 MiB

  • Before: https://user-images.githubusercontent.com/8853434/62398558-15873a00-b547-11e9-8ab6-2e8e9bae658c.png
  • After: https://user-images.githubusercontent.com/8853434/62398559-15873a00-b547-11e9-9c72-b3e90fce1acc.png

We see p99 latency wins for larger range sizes and comparable performance for smaller range sizes.

Release note (performance improvement): Snapshots sent between replicas are now applied more performantly and use less memory.

@jeffrey-xiao jeffrey-xiao requested review from tbg, nvanbenschoten and a team July 17, 2019 15:55

Member

@nvanbenschoten nvanbenschoten left a comment

This is still a WIP, but I gave it a pass to get a feel for the general approach. I think this is turning out cleaner than the other attempt, even with the need to worry about partially applied snapshots and recovery from this state.

One of the bigger open questions right now is the testing plan for this change. Could you enumerate the tests that you're thinking of adding to exercise the new surface area of this approach?

Reviewed 14 of 14 files at r1, 4 of 4 files at r2, 7 of 11 files at r3, 2 of 3 files at r5.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jeffrey-xiao, @nvanbenschoten, and @tbg)


pkg/keys/constants.go, line 156 at r5 (raw file):

	// if any. If this key is set, the replica must finish processing the
	// snapshot by ingesting the SSTs of the snapshot before performing any other
	// action.

"on startup"


pkg/roachpb/internal_raft.proto, line 55 at r5 (raw file):

}

// SSTSnapshotInProgressData is the data needed to process an in-progress snapshot.

This comment is a little misleading. This isn't the data needed to process the snapshot, it's the persisted record that a snapshot is in progress, durably written to coordinate recovery from an untimely crash.


pkg/roachpb/internal_raft.proto, line 58 at r5 (raw file):

message SSTSnapshotInProgressData {
  // The uuid of the in-progress snapshot.
  optional bytes uuid = 1 [(gogoproto.customname) = "UUID"];

We can improve the generated code using:

(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", (gogoproto.nullable) = false

At which point, ID might be a better name.
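To make the effect on callers concrete, here is a rough sketch of the two shapes of generated field; the struct names are hypothetical stand-ins for the generated types.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// Hypothetical stand-ins for the two shapes of generated code.
type withoutCustomtype struct {
	UUID []byte // raw bytes; every caller converts by hand
}

type withCustomtype struct {
	ID uuid.UUID // typed field; no conversion needed
}

func main() {
	id := uuid.MakeV4()

	// Without customtype: manual conversion and an error path at each use.
	a := withoutCustomtype{UUID: id.GetBytes()}
	parsed, err := uuid.FromBytes(a.UUID)
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed)

	// With customtype and nullable=false: the value is usable directly.
	b := withCustomtype{ID: id}
	fmt.Println(b.ID)
}
```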


pkg/storage/client_raft_test.go, line 1074 at r5 (raw file):

		State:      storagepb.ReplicaState{Desc: rep.Desc()},
	}
	header.RaftMessageRequest.Message.Snapshot.Data = uuid.UUID{}.GetBytes()

Is this really needed?


pkg/storage/replica_init.go, line 163 at r5 (raw file):

	sstSnapshotInProgressKey := keys.RangeSSTSnapshotInProgress(desc.RangeID)
	var sstSnapshotInProgressData roachpb.SSTSnapshotInProgressData
	if ok, err := engine.MVCCGetProto(

I think we'll want to build the keys.RangeSSTSnapshotInProgress manipulation code in as methods on the r.mu.stateLoader. It provides some degree of abstraction here.

Also, we might as well pull this into a method that lives near the rest of snapshot ingestion logic.
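A sketch of what such stateloader methods could look like, assuming they live in the stateloader package alongside the existing StateLoader type; the method names are hypothetical, and the MVCC helpers mirror how other per-range state keys are written.

```go
package stateloader

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// Hypothetical accessors for the in-progress snapshot record; StateLoader and
// RangeSSTSnapshotInProgress are assumed to already exist in this package.
func (rsl StateLoader) SetSSTSnapshotInProgress(
	ctx context.Context, eng engine.ReadWriter, data roachpb.SSTSnapshotInProgressData,
) error {
	return engine.MVCCPutProto(
		ctx, eng, nil /* ms */, rsl.RangeSSTSnapshotInProgress(),
		hlc.Timestamp{}, nil /* txn */, &data,
	)
}

func (rsl StateLoader) ClearSSTSnapshotInProgress(
	ctx context.Context, eng engine.ReadWriter,
) error {
	return engine.MVCCDelete(
		ctx, eng, nil /* ms */, rsl.RangeSSTSnapshotInProgress(),
		hlc.Timestamp{}, nil, /* txn */
	)
}
```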


pkg/storage/replica_init.go, line 168 at r5 (raw file):

		return err
	} else if ok {
		sss, err := newSstSnapshotStorage(r.store.cfg.Settings, desc.RangeID, uuid.Must(uuid.FromBytes(sstSnapshotInProgressData.UUID)),

s/newSstSnapshotStorage/newSSTSnapshotStorage/


pkg/storage/replica_init.go, line 173 at r5 (raw file):

			return err
		}
		if err := r.store.Engine().IngestExternalFiles(ctx, sss.ssts, true /* skipWritingSeqNo */, true /* modify */); err != nil {

For now, do you mind leaving // TODO(jeffreyxiao): Test an untimely crash here. comments at each place that you'd like to test a crash?


pkg/storage/replica_raftstorage.go, line 484 at r5 (raw file):

	// The RocksDB BatchReprs that make up this snapshot.
	Batches [][]byte
	// SSTables that make put this snapshot.

s/put/up/


pkg/storage/replica_raftstorage.go, line 485 at r5 (raw file):

	Batches [][]byte
	// SSTables that make put this snapshot.
	SSTs []string

What are these? Filenames? Do we need to carry these around here if we can already look up all of the SSTs using an SstSnapshotStorage?


pkg/storage/replica_raftstorage.go, line 485 at r5 (raw file):

	Batches [][]byte
	// SSTables that make put this snapshot.
	SSTs []string

nit: put in same order as the variable declaration in kvBatchSnapshotStrategy.Receive


pkg/storage/replica_raftstorage.go, line 852 at r5 (raw file):

	// We need to delete any old Raft log entries here because any log entries
	// that predate the snapshot will be orphaned and never truncated or GC'd.
	if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil {

I'm realizing that this might have a downside of causing the SST we ingest to always get stuck high in the LSM instead of making its way lower down. We're going to want to play with both approaches to deleting this data (in the batch, in the SSTs) to determine which way performs better.


pkg/storage/replica_raftstorage.go, line 885 at r5 (raw file):

	}

	sstSnapshotInProgressKey := stateloader.Make(s.Desc.RangeID).RangeSSTSnapshotInProgress()

Here's another place where we'd use the stateloader instead of constructing this directly.


pkg/storage/replica_raftstorage.go, line 948 at r5 (raw file):

	// The on-disk state is now committed, but the corresponding in-memory state
	// has not yet been updated and the data SST has not been ingested. Any

nit: break the SST ingestion and the in-memory state into separate logical steps instead of putting them together in this block.

Once you do that, please add a good comment about the entire strategy here, what happens if things fail in different places, and how this is all correct.


pkg/storage/replica_sst_snapshot_storage.go, line 28 at r5 (raw file):

// SstSnapshotStorage keeps track of the SST files created when receiving a
// snapshot with the SST strategy.
type SstSnapshotStorage struct {

s/SstSnapshotStorage/SSTSnapshotStorage/

I think it will be worth pulling out an interface here like we do with SideloadStorage so that we can create a real and an in-memory implementation.


pkg/storage/store_snapshot.go, line 134 at r5 (raw file):

		return noSnap, sendSnapshotError(stream, err)
	}
	sstFile, err := kvSS.sss.CreateFile()

Let's do this lazily in case the SST has no non-local keys.


pkg/storage/store_snapshot.go, line 134 at r5 (raw file):

		return noSnap, sendSnapshotError(stream, err)
	}
	sstFile, err := kvSS.sss.CreateFile()

Please add a TODO somewhere in here to limit the size of a single SST (sstMaxFileSize). For now, we probably won't ever hit this limit because the size of ranges isn't large enough to need it, but we'll want to have the code structure in place to support it. Doing so will probably inspire us to pull the SST creation code in this method into something a little more structured.


pkg/storage/store_snapshot.go, line 157 at r5 (raw file):

				return noSnap, sendSnapshotError(stream, err)
			}
			// All operations in the batch are guaranteed to be puts.

Should we assert that using BatchType()?


pkg/storage/store_snapshot.go, line 166 at r5 (raw file):

				// Add the key to the sst table if it is not a part of the local key
				// range.
				if key.Key.Compare(keys.LocalMax) > 0 {

It looks like keys.LocalMax is considered a non-local key because this is exclusive. See isSysLocal.
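In other words, the local key space ends just before keys.LocalMax, so LocalMax itself is the first global key and the check likely wants to be inclusive. A tiny standalone sketch of the difference:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
)

func main() {
	k := keys.LocalMax

	// The exclusive check quoted above skips k == keys.LocalMax, so that key
	// would never make it into the data SST.
	fmt.Println("exclusive keeps LocalMax:", k.Compare(keys.LocalMax) > 0) // false

	// An inclusive check matches isSysLocal's view that everything at or
	// above LocalMax is non-local.
	fmt.Println("inclusive keeps LocalMax:", k.Compare(keys.LocalMax) >= 0) // true
}
```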


pkg/storage/store_snapshot.go, line 188 at r5 (raw file):

					return noSnap, sendSnapshotError(stream, err)
				}
			}

Should we flush a batch we have accumulated here?

@jeffrey-xiao jeffrey-xiao changed the title from "[WIP] storage: build SSTable from KV_BATCH snapshots" to "[WIP] storage: build SSTable from KV_BATCH snapshot" on Jul 22, 2019
@jeffrey-xiao jeffrey-xiao force-pushed the kv-batch-stream branch 2 times, most recently from 9eca813 to be43855 on July 22, 2019 at 21:37
Contributor Author

@jeffrey-xiao jeffrey-xiao left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @nvanbenschoten and @tbg)


pkg/storage/client_raft_test.go, line 1074 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Is this really needed?

The test fails without this line. The new code expects the UUID of the snapshot to be known from the Data field on the receiver side, so instead of failing with the expected error, it fails with uuid: UUID must be exactly 16 bytes long, got 0 bytes.


pkg/storage/replica_init.go, line 163 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I think we'll want to build the keys.RangeSSTSnapshotInProgress manipulation code in as methods on the r.mu.stateLoader. It provides some degree of abstraction here.

Also, we might as well pull this into a method that lives near the rest of snapshot ingestion logic.

I've added code to manipulate SSTSnapshotInProgress to the stateloader. I think the new logic after this change is not that reusable and is simple enough that it doesn't warrant a separate method.


pkg/storage/replica_raftstorage.go, line 485 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

What are these? Filenames? Do we need to carry these around here if we can already look up all of the SSTs using an SstSnapshotStorage?

Constructing an SSTSnapshotStorage enables us to look up which SSTs we created, but it uses a glob to determine the SSTs, which is probably not ideal.


pkg/storage/replica_raftstorage.go, line 852 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I'm realizing that this might have a downside of causing the SST we ingest to always get stuck high in the LSM instead of making its way lower down. We're going to want to play with both approaches to deleting this data (in the batch, in the SSTs) to determine which way performs better.

Left a TODO for this.


pkg/storage/replica_sst_snapshot_storage.go, line 28 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

s/SstSnapshotStorage/SSTSnapshotStorage/

I think it will be worth pulling out an interface here like we do with SideloadStorage so that we can create a real and an in-memory implementation.

Made the substitution change. I refactored SSTSnapshotStorage so that it's more easily made into an interface. I'm just wondering what an appropriate in-memory implementation would look like. SSTSnapshotStorage would have to output SSTs to disk for ingestion, so I'm finding it hard to imagine how an in-memory implementation would be useful.


pkg/storage/store_snapshot.go, line 188 at r5 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Should we flush a batch we have accumulated here?

Discussed offline. Previously the batches were chunked to be streamed. Since we are constructing a batch on the receiver side, there isn't a need to chunk it. It's probably good to change the KV strategy to only have a single batch instead of an array of batches.

Member

@tbg tbg left a comment

This came out well! Thanks for splitting out the first two commits; that made reviewing a lot easier (the third commit needs a fat comment, but it's a WIP so no complaints). I left plenty of comments, but none of them are substantial; if I had to summarize them I'd go with "Make it a pleasure to understand precisely how it all works from the code". I'm pretty excited about getting this in, cranking the range size to 10gb and seeing all sorts of fires start to burn. I know of a few places where they will already (looking at you, consistency checker), but pretty sure not all.

Reviewed 10 of 14 files at r1, 15 of 15 files at r6, 4 of 4 files at r7, 13 of 13 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jeffrey-xiao and @nvanbenschoten)


c-deps/libroach/include/libroach.h, line 471 at r7 (raw file):

// Truncates the writer and stores the constructed file's contents in *data.
// May be called multiple times. The function may not truncate and return all

This is saying that the returned data won't necessarily reflect the latest writes (only those in completed blocks, though that's an impl detail), right? Maybe you can use that wording, I was thrown off by "may not truncate and return all keys" trying to figure out what exactly was meant.
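For context, the usage pattern Truncate enables on the Go side is roughly the following; the writer surface assumed here (Put/Truncate/Finish) is taken from this PR's description rather than copied from the code.

```go
package sstsketch

// sstChunkWriter captures only the surface this sketch assumes the SST file
// writer exposes after this PR.
type sstChunkWriter interface {
	Put(key, value []byte) error
	Truncate() ([]byte, error) // bytes of blocks completed since the last call
	Finish() ([]byte, error)   // remaining bytes; all chunks concatenated form a valid SST
}

// streamSST builds an SST and streams it in chunks as blocks complete,
// instead of buffering the whole file in memory.
func streamSST(w sstChunkWriter, kvs [][2][]byte, send func([]byte) error) error {
	for _, kv := range kvs {
		if err := w.Put(kv[0], kv[1]); err != nil {
			return err
		}
		// Truncate may return nothing if no block has completed since the
		// last call; only finished blocks are handed back, which is the
		// "won't necessarily reflect the latest writes" behavior above.
		chunk, err := w.Truncate()
		if err != nil {
			return err
		}
		if len(chunk) > 0 {
			if err := send(chunk); err != nil {
				return err
			}
		}
	}
	tail, err := w.Finish()
	if err != nil {
		return err
	}
	return send(tail)
}
```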


pkg/keys/keys.go, line 979 at r8 (raw file):

// RangeSSTSnapshotInProgress returns a range-local key for the snapshot in
// progress, in any.

"in any"


pkg/storage/client_raft_test.go, line 1074 at r5 (raw file):

Previously, jeffrey-xiao (Jeffrey Xiao) wrote…

The test fails without this line. The new code expects the UUID of the snapshot to be known from the Data field on the receiver side, so instead of failing with the expected error, it fails with uuid: UUID must be exactly 16 bytes long, got 0 bytes.

This isn't a migration concern because outside of tests, the Data field is always properly populated, right?


pkg/storage/replica_init.go, line 168 at r8 (raw file):

	// be idempotent.
	if found {
		sss, err := newSSTSnapshotStorage(r.store.cfg.Settings, desc.RangeID, sstSnapshotInProgressData.ID,

Surprised that you need to make a whole new thing here where the "other place" that does the ingestion gets the SSTs spoonfed. It'd be nice to have more symmetry.


pkg/storage/replica_init.go, line 173 at r8 (raw file):

			return err
		}
		if err := r.store.Engine().IngestExternalFiles(ctx, sss.ssts, true /* skipWritingSeqNo */, true /* modify */); err != nil {

This also deserves targeted crash testing, even though it looks very similar to the other code.


pkg/storage/replica_raftstorage.go, line 485 at r5 (raw file):

Previously, jeffrey-xiao (Jeffrey Xiao) wrote…

Constructing a SSTSnapshotStorage enables us to look up which SSTs we created, but it uses a glob to determine the SSTs which is probably not ideal.

Improve the comment, including whether the filenames are relative or absolute and whether this field is always populated. Also clarify what's in the Batch field.


pkg/storage/replica_raftstorage.go, line 904 at r8 (raw file):
Logic in this sentence is off. Suggestion

If there are no SSTs to ingest, don't write the snapshot progress key.


pkg/storage/replica_raftstorage.go, line 963 at r8 (raw file):
There are many more reasons we need to sync the WAL (some known, some probably unknown). I don't think it's worth listing them now, but just write

Sync to make the snapshot durably applied.

to make this less misleading.


pkg/storage/replica_raftstorage.go, line 970 at r8 (raw file):

SSTs have


pkg/storage/replica_raftstorage.go, line 972 at r8 (raw file):

	// has not yet been updated and the data SST has not been ingested. Any
	// errors past this point must therefore be treated as fatal. If the node
	// crashes before the data SST is ingested, the unreplicated range-ID local

ditto


pkg/storage/replica_raftstorage.go, line 977 at r8 (raw file):

// If there are no SSTs, there's nothing to ingest and we didn't write the snapshot in progress key earlier.


pkg/storage/replica_raftstorage.go, line 983 at r8 (raw file):

		// crash here is safe because the SSTs should be ingested on replica
		// startup.
		if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTs, true /* skipWritingSeqNo */, true /* modify */); err != nil {

Is modify the option that hard-links into Rocks' dir and removes the original file? That seems fine then. I see you're passing skipWritingSeqNo; not sure how this works in the case in which it needs to be assigned a global seqno, but you've probably looked into this and it just works.


pkg/storage/replica_sst_snapshot_storage.go, line 130 at r8 (raw file):

The current file must have been written to before being closed.


pkg/storage/store_snapshot.go, line 103 at r8 (raw file):

	// Fields used when sending snapshots.
	batchSize    int64
	sstChunkSize int64

comment


pkg/storage/store_snapshot.go, line 119 at r8 (raw file):

	var err error

	emptySST := true

comment (there isn't an obvious notion of empty since Truncate() can return empty even if something has been written)


pkg/storage/store_snapshot.go, line 120 at r8 (raw file):

	emptySST := true
	b := kvSS.newBatch()

Is it still necessary to provide this as a method? Just curious, it doesn't seem like it, but I'm also a fan of only creating a batch when it's being used immediately


pkg/storage/store_snapshot.go, line 123 at r8 (raw file):

// At the moment, we'll write at most one SST.
//
// TODO(jeffreyxiao): re-evaluate as the default range size grows.


pkg/storage/store_snapshot.go, line 127 at r8 (raw file):

	}

	lastSizeCheck := int64(0)

comment


pkg/storage/store_snapshot.go, line 164 at r8 (raw file):

					}
					emptySST = false
					if sst.DataSize-lastSizeCheck > kvSS.sstChunkSize {

Does sst.DataSize > 0 imply that Truncate returns a nonempty chunk? Call that out, and make sure it's actually true in the code below (i.e. assert).

Add a comment on kvSS.sstChunkSize that indicates that this is roughly how much data will be buffered in memory at any given time.


pkg/storage/store_snapshot.go, line 711 at r8 (raw file):

	switch header.Strategy {
	case SnapshotRequest_KV_BATCH:
		snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)

Just confirmed to myself that this UUID has been sent forever, so this just moved earlier but will always be set in relevant old versions of CRDB.


pkg/storage/store_test.go, line 3374 at r8 (raw file):

	defer leaktest.AfterTest(t)()

	testKey := testutils.MakeKey(keys.MakeTablePrefix(50), roachpb.RKey("a"))

This test setup seems unnecessarily pedestrian. Can't you start a TestServer with on-disk storage, split the range, stop the server, then create an engine from the dir, set up the in-progress-key, close the engine, start the TestServer again and then run your store.MVCCGet via the TestServer (plus check that the snapshot dir you wrote in is gone, along with the marker key)?


pkg/storage/store_test.go, line 3430 at r8 (raw file):

		t.Fatal(err)
	}
	if err := sss.NewFile(); err != nil {

Your call, but require.NoError would've really helped you in this test.


pkg/storage/engine/rocksdb.go, line 2954 at r6 (raw file):

// ClearIterRange implements the Writer interface
func (fw *RocksDBSstFileWriter) ClearIterRange(iter Iterator, start, end MVCCKey) error {
	panic("unimplemented")

I don't feel great about making something an interface when it doesn't actually support lots of the operations (granted, the interfaces here have lots of room for improvement, too). You probably have a reason, but I don't (yet) know what it is. Add that to the commit message. Or, if it isn't needed, just don't implement the interface and remove the panics. If you'd like to check statically that the receiver implements the "relevant" part of the interface, you can do

type rocksDBSstFileWriterAdapter struct{*RocksDBSstFileWriter}
var _ Writer = (*rocksDBSstFileWriterAdapter)(nil)
func(fw *rocksDBSstFileWriterAdapter) Merge(...) error { panic("unimplemented") }
...

pkg/storage/engine/rocksdb.go, line 2980 at r6 (raw file):

// LogLogicalOp implements the Writer interface.
func (fw *RocksDBSstFileWriter) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) {}

panic?


pkg/storage/engine/rocksdb_test.go, line 740 at r7 (raw file):

	const keyLen = 10
	const valLen = 1000
	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

Use a fixed timestamp here, unless you're going for randomness, in which case t.Log the timestamp so it's available when the test fails.


pkg/storage/engine/rocksdb_test.go, line 801 at r7 (raw file):

			"was not (len=%d)", len(sst1FinishBuf), len(resBuf2))
	}
}

Nice test.


pkg/storage/stateloader/stateloader.go, line 634 at r8 (raw file):
remove , if any, add:

If no record is found, returns true.

@jeffrey-xiao jeffrey-xiao force-pushed the kv-batch-stream branch 4 times, most recently from 5da9171 to 3f8b876 on July 24, 2019 at 19:39
Contributor Author

@jeffrey-xiao jeffrey-xiao left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jeffrey-xiao, @nvanbenschoten, and @tbg)


pkg/storage/client_raft_test.go, line 1074 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This isn't a migration concern because outside of tests, the Data field is always properly populated, right?

Yes, I don't think we need to do any migrations for this new expectation that the UUID is populated on the receiver side.


pkg/storage/replica_init.go, line 168 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Surprised that you need to make a whole new thing here where the "other place" that does the ingestion gets the SSTs spoonfed. It'd be nice to have more symmetry.

in replica_raftstorage.go the SSTs are known, whereas here the SSTs must be "rediscovered" based on the UUID that we stored. I wanted to delegate newSSTSnapshotStorage the responsibility of recovering using a UUID which is why I constructed a new one. Let me know if there's a better alternative though.


pkg/storage/replica_raftstorage.go, line 983 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Is modify the option that hard-links into Rocks' dir and removes the original file? That seems fine then. I see you're skipWritingSeqNo, not sure how this works in the case in which it needs to be assigned a global seqno but you've probably looked into this and it just works.

I wasn't exactly sure about these options, but it works for the tests I wrote and the workloads I ran. From https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files#what-happens-when-you-ingest-a-file, it seems like setting it to true is a backwards compatibility concern. All the other uses of IngestExternalFiles in our codebase have modify = true and skipWritingSeqNo = true.


pkg/storage/store_snapshot.go, line 120 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Is it still necessary to provide this as a method? Just curious, it doesn't seem like it, but I'm also a fan of only creating a batch when it's being used immediately

I think newBatch is being overridden in tests.


pkg/storage/store_snapshot.go, line 164 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Does sst.DataSize > 0 imply that Truncate returns a nonempty chunk? Call that out, and make sure it's actually true in the code below (i.e. assert).

Add a comment on kvSS.sstChunkSize that indicates that this is roughly how much data will be buffered in memory at any given time.

I think you mean sst.DataSize-lastSizeCheck > 0 right? I don't think that sst.DataSize-lastSizeCheck > 0 implies that Truncate returns a non-empty chunk. It really depends on when the writer decides to flush. DataSize is something we maintain and will not necessarily reflect what we would get from truncate.


pkg/storage/store_snapshot.go, line 711 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Just confirmed to myself that this UUID has been sent forever, so this just moved earlier but will always be set in relevant old versions of CRDB.

Thanks for confirming!


pkg/storage/store_test.go, line 3374 at r8 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This test setup seems unnecessarily pedestrian. Can't you start a TestServer with on-disk storage, split the range, stop the server, then create an engine from the dir, set up the in-progress-key, close the engine, start the TestServer again and then run your store.MVCCGet via the TestServer (plus check that the snapshot dir you wrote in is gone, along with the marker key)?

Thanks for the suggestion. This turned out to be a much better way to test it than what I did.


pkg/storage/engine/rocksdb.go, line 2954 at r6 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I don't feel great about making something an interface when it doesn't actually support lots of the operations (granted, the interfaces here have lots of room for improvement, too). You probably have a reason, but I don't (yet) know what it is. Add that to the commit message. Or, if it isn't needed, just don't implement the interface and remove the panics. If you'd like to check statically that the receiver implements the "relevant" part of the interface, you can do

type rocksDBSstFileWriterAdapter struct{*RocksDBSstFileWriter}
var _ Writer = (*rocksDBSstFileWriterAdapter)(nil)
func(fw *rocksDBSstFileWriterAdapter) Merge(...) error { panic("unimplemented") }
...

I forgot why this should be a Writer. I've removed the panics.

@jeffrey-xiao
Contributor Author

After an offline discussion with @nvanbenschoten and @bdarnell, it seems like it would be ideal for the entire operation to be atomic (and avoid writing SSTSnapshotInProgress for recovery). The problem with the previous approach was that the SSTs were created on the sender side, making it difficult to create SSTs to clear the subsumed replicas that are non-overlapping. Since we are building the SSTs on the receiver side, this becomes a non-issue. We would have three SSTs (range local keys, replicated range-id local keys, and data keys) that include range deletion tombstones for the subsumed replicas. The range deletion tombstones in the SSTs might overlap, but the SSTs won't overlap. How does this sound @tbg?

@tbg
Member

tbg commented Jul 25, 2019

Oh, interesting. I thought that was the approach originally taken but that there were obstacles with it; I had no idea this was because we originally thought to construct SSTs at the sender. Apologies for not checking this assumption.
Yes, if we can make this work that would be preferable. Let's forget about subsumed replicas for a second and just consider the case in which the existing replica gets "shortened", i.e. it split and we're catching up the recipient replica across the split - I think that already covers 100% of the complexity in the sense that the remaining key range will act just the same as a replica you need to subsume.

You are essentially suggesting that we get the SSTs that we need by just taking the SSTs that assume no subsumption and extending them via suitable range deletion tombstones so that everything we need to subsume is covered, correct?

I think you need four snapshots to cover a range in general:

  1. unreplicated rangeID-based keys
  2. replicated rangeID-based keys
  3. replicated key-based parallel records (range descriptor, txn records, etc)
  4. user data

You are receiving a large snapshot, which potentially takes minutes. At this point, you don't want to lock the replicas you subsume, so while you can "peek" and get an idea of what they will be, you won't know for sure yet. Now you've written a bunch of SSTs (or you've received all the data, maybe you're not finishing the writers just yet but you've synced all to disk that you can); now you go into the apply phase where you actually learn what you need to subsume. Can you still throw appropriate range deletion tombstones into the SSTs, then finish them, and immediately move to ingestion? From what I understand, you can.
Since subsumption is not a common case (or so we hope) it might be easier if you could actually finish the SSTs whenever it's convenient (i.e. well before applying) and add new SSTs to your set if you need to cover additional keyspace (the SSTs contain only a range deletion or, if we need an actual datum to avoid it being "empty", it could contain a bogus deletion also implied by the range deletion tombstone).

Now back to subsuming replicas, does this add anything? As far as I can tell you'd just have to add more tombstones to the SSTs for some additional key ranges. I don't even think the tombstones would need to be overlapping since with each step you know what you already have (the original range plus anything subsumed so far).
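A minimal sketch of what extending the SSTs with suitable range deletion tombstones could look like, using the ClearRange operation that the first commit below adds to the SST file writer; the helper and the interface shape here are hypothetical.

```go
package storagesketch

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

// addSubsumedClears lays a range deletion tombstone over each span of
// subsumed keyspace that the snapshot's own data does not already cover, so
// that ingesting the finished SSTs atomically clears the subsumed replicas.
func addSubsumedClears(
	sst interface {
		ClearRange(start, end engine.MVCCKey) error
	},
	subsumedSpans []roachpb.Span,
) error {
	for _, sp := range subsumedSpans {
		start := engine.MakeMVCCMetadataKey(sp.Key)
		end := engine.MakeMVCCMetadataKey(sp.EndKey)
		if err := sst.ClearRange(start, end); err != nil {
			return err
		}
	}
	return nil
}
```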

@tbg
Member

tbg commented Jul 25, 2019

Regarding your split-merge example above, I think your precise example can't happen as written, but a similar one can. Here's how the keyspace is partitioned in the steps in your example:

a      b      c      d
|--1---|---2---------|    S1
|--1---|---2--|---3--|    S2
|--1----------|---3--|    S3

(Pipe denotes range boundary, numbers are rangeIDs, S1 is step 1, etc). The thing that can't happen is that r2 merges into r1 while n2 is down. This is because the merge will not commit unless all replicas catch up to the "beginning" of the merge transaction. You're saying that a snapshot of r1 from S3 can meet S1 on n2, but this means that r2 has never caught up to the beginning of the merge - if it had, it would've split off r3. So this particular example is bust, but there is another one that we should talk about next.

This next example is interesting because it shows us that there are very likely bugs. The example is actually in a comment in today's code (there's a typo at the beginning, should be [c,e) not [b,e):

// TODO(benesch): we may be unnecessarily forcing another Raft snapshot here
// by subsuming too much. Consider the case where [a, b) and [c, e) first
// merged into [a, e), then split into [a, d) and [d, e), and we're applying a
// snapshot that spans this merge and split. The bounds of this snapshot will
// be [a, d), so we'll subsume [c, e). But we're still a member of [d, e)!
// We'll currently be forced to get a Raft snapshot to catch up. Ideally, we'd
// subsume only half of [c, e) and synthesize a new RHS [d, e), effectively
// applying both the split and merge during snapshot application. This isn't a
// huge deal, though: we're probably behind enough that the RHS would need to
// get caught up with a Raft snapshot anyway, even if we synthesized it
// properly.

and it goes like this:

a      b      c      d     e
|--1----------|--2---------|  S1
|--1-----------------------|  S2
|--1-----------------|--3--|  S3

So the ranges merge, then split again at key that was formerly in the RHS. Since the merge is the first operation that happens, a follower could be down when it completes, which means reasonably a r1-snapshot from S3 can hit r1 from S1. It will overlap r2 (from S1) which will become a subsuming replica.

Now the comment in the code goes on to claim that this is a problem because [c,e) from S1 is basically just an older version of [d,e) from S3 that would catch up (i.e. we're gc'ing a live replica, which would actually constitute a fat correctness problem, which the comment doesn't recognize), but I think that's wrong actually - they're different ranges. In S1, [c,e) is r2; in S3 [d,e) is r3. Subsuming is the right thing to do. In fact, r2@S1 also blocks incoming snapshots from r3@S3, and these snapshots cannot just remove r2@S1 because they have to prove that r1@S1 cannot catch up to the merge trigger any more (if the merge trigger fires and the subsumed replica is missing, things go boom). In practice this is enforced by the replica GC queue, which will know to check whether the LHS of the merge is still on that store and prevents GC in that case.

But remember why we were looking at this example originally: yes, the subsuming replicas can definitely extend past the snapshot - even though a comment a bit further up claims the exact opposite and even gives an argument:

// Any replicas that overlap with the bounds of the incoming snapshot are ours
// to subsume; further, the end of the last overlapping replica will exactly
// align with the end of the snapshot. How are we guaranteed this? Each merge

The comment fails to recognize that the snapshot can reflect a split (i.e. S2->S3), in the absence of splits it would check out. However, it seems that the code that actually collects the subsumed replicas

for endKey.Less(inSnap.State.Desc.EndKey) {
	sRepl := r.store.LookupReplica(endKey)
	if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
		log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
	}
	sRepl.raftMu.Lock()
	subsumedRepls = append(subsumedRepls, sRepl)
	endKey = sRepl.Desc().EndKey
}

does pick up an "extending" last replica without problems. We'll have to check whether this "lines up exactly" comment is actually relied upon anywhere.

@tbg
Member

tbg commented Jul 25, 2019

PS #36611 (comment) has some relevant example about a bug that exists today where we allow a snapshot in that shouldn't. This doesn't imply anything on this PR (we just have to fix the checks) but might be interesting to look at anyway.

Rename Add to Put and Delete to Clear. Additionally implement ClearRange
using DBSstFileWriterDeleteRange and ClearRangeIter using a Clear on all
iterated keys on the Go side.

Release note: None
This method truncates the SST file being written and returns the data
that was truncated. It can be called multiple times when writing an SST
file and can be used to chunk an SST file into pieces. Since SSTs are
built in an append-only manner, the concatenated chunks are equivalent
to an SST built with Finish alone, without using Truncate.

Release note: None
SSTSnapshotStorage is associated with a store and can be used to create
SSTSnapshotStorageScratches. Each SSTSnapshotStorageScratch is
associated with a snapshot and keeps track of the SSTs incrementally
created when receiving a snapshot.

Release note: None
Incrementally build SSTs from the batches sent in a KV_BATCH snapshot.
This logic is only on the receiver side for ease of testing and
compatibility.

The complications of subsumed replicas that are not fully contained by
the current replica are also handled. The following is an example of
this case happening.

a       b       c       d
|---1---|-------2-------|  S1
|---1-------------------|  S2
|---1-----------|---3---|  S3

Since the merge is the first operation to happen, a follower could be
down before it completes. It is reasonable for r1-snapshot from S3 to
subsume both r1 and r2 in S1. Note that it's impossible for a replica to
subsume anything to its left.

The maximum number of SSTs created using the strategy is 4 + SR + 2
where SR is the number of subsumed replicas.

- Three SSTs get created when the snapshot is being received (range
  local keys, replicated range-id local keys, and user keys).
- One SST is constructed for the unreplicated range-id local keys when
  the snapshot is being applied.
- One SST is constructed for every subsumed replica to clear the
  range-id local keys. These SSTs consist of one range deletion
  tombstone and one RaftTombstoneKey.
- A maximum of two SSTs for all subsumed replicas are constructed to
  account for the case of not fully contained subsumed replicas. We need to
  delete the key space of the subsumed replicas that we did not delete
  in the previous SSTs. We need one for the range-local keys and one for
  the user keys. These SSTs consist of normal tombstones, one range
  deletion tombstone, or they could be empty.

This commit also introduced a cluster setting
"kv.snapshot_sst.sync_size" which defines the maximum SST chunk size
before fsync-ing. Fsync-ing is necessary to prevent the OS from
accumulating such a large buffer that it blocks unrelated small/fast
writes for a long time when it flushes.

Release note (performance improvement): Snapshots sent between replicas
are now applied more performantly and use less memory.
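For reference, registering a byte-size cluster setting like the one named above looks roughly like this; the description and default are placeholders rather than what the PR actually ships.

```go
package storagesketch

import "github.com/cockroachdb/cockroach/pkg/settings"

// Sketch of the setting named in the commit message; the description and
// default here are placeholders.
var snapshotSSTSyncSize = settings.RegisterByteSizeSetting(
	"kv.snapshot_sst.sync_size",
	"threshold after which an SST being built for a snapshot is synced to disk",
	2<<20, // 2 MB placeholder
)
```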
@jeffrey-xiao
Contributor Author

bors try

craig bot pushed a commit that referenced this pull request Aug 9, 2019
@craig
Contributor

craig bot commented Aug 9, 2019

try

Build succeeded

@jeffrey-xiao
Contributor Author

Thanks for all your help reviewing this PR!

bors r+

craig bot pushed a commit that referenced this pull request Aug 9, 2019
38932: storage: build SSTs from KV_BATCH snapshot r=jeffrey-xiao a=jeffrey-xiao

Implements the SST snapshot strategy discussed in #16954 and partially implemented in #25134 and #38873, but only includes the logic on the receiver side for ease of testing and compatibility. This PR also handles the complications of subsumed replicas that are not fully contained by the current replica.

The maximum number of SSTs created using this strategy is 4 + SR + 2 where SR is the number of subsumed replicas.

- Three SSTs get streamed from the sender (range local keys, replicated range-id local keys, and data keys)
- One SST is constructed for the unreplicated range-id local keys.
- One SST is constructed for every subsumed replica to clear the range-id local keys. These SSTs consist of one range deletion tombstone and one `RaftTombstone` key.
- A maximum of two SSTs for all subsumed replicas to account for the case of not fully contained subsumed replicas. Note that currently, subsumed replicas can have keys right of the current replica, but not left of, so there will be a maximum of one SST created for the range-local keys and one for the data keys. These SSTs consist of one range deletion tombstone.

This number can be further reduced to 3 + SR if we pass the file handles and sst writers from the receiving step to the application step. We can combine the SSTs of the unreplicated range id and replicated id, and the range local of the subsumed replicas and data SSTs of the subsumed replicas. We probably don't want to do this optimization since we'll have to undo this optimization if we start constructing the SSTs from the sender or start chunking large SSTs into smaller SSTs.

Blocked by facebook/rocksdb#5649.

# Test Plan

- [x] Testing knob to inspect SSTs before ingestion. Ensure that expected SSTs for subsumed replicas are ingested.
- [x] Unit tests for `SSTSnapshotStorage`.
 
# Metrics and Evaluation

One way to evaluate this change is the following steps:

1. Setup 3 node cluster
2. Set default Raft log truncation threshold to some low constant:
```go
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
    "COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 128<<10 /* 128 KB */)
```
3. Set `range_min_bytes` to 0 and `range_max_bytes` to some large number.
4. Increase `kv.snapshot_recovery.max_rate` and `kv.snapshot_rebalance.max_rate` to some large number.
5. Disable load-based splitting.
6. Stop node 2.
7. Run an insert heavy workload (kv0) on the cluster.
8. Start node 2.
9. Time how long it takes for node 2 to have all the ranges.

Roachtest: https://gist.github.com/jeffrey-xiao/e69fcad04968822d603f6807ca77ef3b

We can have two independent variables

1. Fixed total data size (4000000 ops; ~3.81 GiB), variable number of splits
- 32 splits (~121 MiB ranges)
- 64 splits (~61.0 MiB ranges)
- 128 splits (~31.2 MiB ranges)
- 256 splits (~15.7 MiB ranges)
- 512 splits (~7.9 MiB ranges)
- 1024 splits (~3.9 MiB ranges)
2. Fixed number of splits (32), variable total data size
- 125000 (~ 3.7 MiB ranges)
- 250000 (~7.5 MiB ranges)
- 500000 (~15 MiB ranges)
- 1000000 (~30 MiB ranges)
- 2000000 (60 MiB ranges)
- 4000000 (121 MiB ranges)

# Fsync Chunk Size

The size of the SST chunk that we write before fsync-ing impacts how fast node 2 has all the ranges. I've experimented with 32 splits and a median range size of 121 MB: no fsync-ing (~27s recovery), fsync-ing in 8 MB chunks (~30s recovery), fsync-ing in 2 MB chunks (~40s recovery), and fsync-ing in 256 KB chunks (~42s recovery). The default bulk SST sync rate is 2 MB and #20352 sets `bytes_per_sync` to 512 KB, so something between those options is probably good. The reason we would want to fsync is to prevent the OS from accumulating such a large buffer that it blocks unrelated small/fast writes for a long time when it flushes.

# Impact on Foreground Traffic

For testing the impact on foreground traffic, I ran kv0 on a four node cluster with the merge queue and split queue disabled and starting with a constant number of splits. After 5 minutes, I decommissioned node 1 so its replicas would drain to other nodes using snapshots.

Roachtest: https://gist.github.com/jeffrey-xiao/5d9443a37b0929884aca927f9c320b6c

**Average Range Size of 3 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398633-41a2bb00-b547-11e9-9e3d-747ee724943b.png)
- [After](https://user-images.githubusercontent.com/8853434/62398634-41a2bb00-b547-11e9-85e7-445b7989d173.png)

**Average Range Size of 32 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398631-410a2480-b547-11e9-9019-86d3bd2e6f73.png)
- [After](https://user-images.githubusercontent.com/8853434/62398632-410a2480-b547-11e9-9513-8763e132e76b.png)

**Average Range Size 128 MiB**
- [Before](https://user-images.githubusercontent.com/8853434/62398558-15873a00-b547-11e9-8ab6-2e8e9bae658c.png)
- [After](https://user-images.githubusercontent.com/8853434/62398559-15873a00-b547-11e9-9c72-b3e90fce1acc.png)

We see p99 latency wins for larger range sizes and comparable performance for smaller range sizes.

Release note (performance improvement): Snapshots sent between replicas are now applied more performantly and use less memory.

Co-authored-by: Jeffrey Xiao <jeffrey.xiao1998@gmail.com>
@craig
Contributor

craig bot commented Aug 9, 2019

Build succeeded

@craig craig bot merged commit b320ff5 into cockroachdb:master Aug 9, 2019
@tbg
Member

tbg commented Aug 9, 2019 via email

)
unreplicatedStateLog := fmt.Sprintf(
"unreplicatedState=%0.0fms ",
stats.unreplicatedState.Sub(start).Seconds()*1000,

I thought we didn't want to print this unless the legacy state is present (i.e. never). I'm also OK nuking this unconditionally since it's such a small write and won't ever matter

tbg added a commit to tbg/cockroach that referenced this pull request Aug 12, 2019
This reverts commit 717c185.

Apparently we violate the assertions. This needs to be fixed, but until
then, let's keep the ball rolling.

One likely culprit is cockroachdb#38932, see:

cockroachdb#39034 (comment)

Release note: None
craig bot pushed a commit that referenced this pull request Aug 12, 2019
39562: Revert "c-deps: fix assertion-enabled builds" r=knz a=tbg

This reverts commit 717c185.

Apparently we violate the assertions. This needs to be fixed, but until
then, let's keep the ball rolling.

The assertion failures typically take the form

> L0 file with seqno 90 90 vs. file with global_seqno 90
SIGABRT: abort

See for example #39559

One likely culprit is #38932, see:

#39034 (comment)

Release note: None

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
@nvanbenschoten
Member

When we revive this PR, we can update this comment:

// of keeping it all in memory at once. (Well, at least on the sender side. On
// the recipient side, we do still buffer it, but we'll fix that at some point).

craig bot pushed a commit that referenced this pull request Aug 15, 2019
39689: storage: reintroduce building SSTs from KV_BATCH snapshot r=jeffrey-xiao a=jeffrey-xiao

The final commit of #38932 was previously reverted to due an underlying bug in RocksDB with ingesting range deletion tombstones with a global seqno. See #39604 for discussion on the bug and cockroachdb/rocksdb#43 for the temporary short-term resolution of the bug.

Release note: None


Co-authored-by: Jeffrey Xiao <jeffrey.xiao1998@gmail.com>
@jeffrey-xiao jeffrey-xiao deleted the kv-batch-stream branch August 15, 2019 17:47