diff --git a/db/version_edit.cc b/db/version_edit.cc index ddaadc58dac..284b65f7183 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -226,13 +226,17 @@ bool VersionEdit::EncodeTo(std::string* dst) const { } for (const auto& wal_addition : wal_additions_) { - PutVarint32(dst, kWalAddition); - wal_addition.EncodeTo(dst); + PutVarint32(dst, kWalAddition2); + std::string encoded; + wal_addition.EncodeTo(&encoded); + PutLengthPrefixedSlice(dst, encoded); } if (!wal_deletion_.IsEmpty()) { - PutVarint32(dst, kWalDeletion); - wal_deletion_.EncodeTo(dst); + PutVarint32(dst, kWalDeletion2); + std::string encoded; + wal_deletion_.EncodeTo(&encoded); + PutLengthPrefixedSlice(dst, encoded); } // 0 is default and does not need to be explicitly written @@ -375,6 +379,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { Status VersionEdit::DecodeFrom(const Slice& src) { Clear(); +#ifndef NDEBUG + bool ignore_ignorable_tags = false; + TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:IgnoreIgnorableTags", + &ignore_ignorable_tags); +#endif Slice input = src; const char* msg = nullptr; uint32_t tag = 0; @@ -385,6 +394,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) { Slice str; InternalKey key; while (msg == nullptr && GetVarint32(&input, &tag)) { +#ifndef NDEBUG + if (ignore_ignorable_tags && tag > kTagSafeIgnoreMask) { + tag = kTagSafeIgnoreMask; + } +#endif switch (tag) { case kDbId: if (GetLengthPrefixedSlice(&input, &str)) { @@ -575,6 +589,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; } + case kWalAddition2: { + Slice encoded; + if (!GetLengthPrefixedSlice(&input, &encoded)) { + msg = "WalAddition not prefixed by length"; + break; + } + + WalAddition wal_addition; + const Status s = wal_addition.DecodeFrom(&encoded); + if (!s.ok()) { + return s; + } + + wal_additions_.emplace_back(std::move(wal_addition)); + break; + } + case kWalDeletion: { WalDeletion wal_deletion; const Status s = wal_deletion.DecodeFrom(&input); @@ -586,6 +617,23 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; } + case kWalDeletion2: { + Slice encoded; + if (!GetLengthPrefixedSlice(&input, &encoded)) { + msg = "WalDeletion not prefixed by length"; + break; + } + + WalDeletion wal_deletion; + const Status s = wal_deletion.DecodeFrom(&encoded); + if (!s.ok()) { + return s; + } + + wal_deletion_ = std::move(wal_deletion); + break; + } + case kColumnFamily: if (!GetVarint32(&input, &column_family_)) { if (!msg) { diff --git a/db/version_edit.h b/db/version_edit.h index 6b045878bc0..a80543a0da9 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -62,6 +62,8 @@ enum Tag : uint32_t { kWalAddition, kWalDeletion, kFullHistoryTsLow, + kWalAddition2, + kWalDeletion2, }; enum NewFileCustomTag : uint32_t { diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index a0869b3c76f..43ae6840fb1 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -324,14 +324,22 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) { TestEncodeDecode(edit); } +static std::string PrefixEncodedWalAdditionWithLength( + const std::string& encoded) { + std::string ret; + PutVarint32(&ret, Tag::kWalAddition2); + PutLengthPrefixedSlice(&ret, encoded); + return ret; +} + TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) { std::string encoded; - PutVarint32(&encoded, Tag::kWalAddition); { // No log number. + std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded); VersionEdit edit; - Status s = edit.DecodeFrom(encoded); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != std::string::npos) @@ -345,8 +353,10 @@ TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) { unsigned char* ptr = reinterpret_cast(&c); *ptr = 128; encoded.append(1, c); + + std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded); VersionEdit edit; - Status s = edit.DecodeFrom(encoded); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != std::string::npos) @@ -358,14 +368,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) { constexpr WalNumber kLogNumber = 100; constexpr uint64_t kSizeInBytes = 100; - std::string encoded_without_tag; - PutVarint32(&encoded_without_tag, Tag::kWalAddition); - PutVarint64(&encoded_without_tag, kLogNumber); + std::string encoded; + PutVarint64(&encoded, kLogNumber); { // No tag. + std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded); VersionEdit edit; - Status s = edit.DecodeFrom(encoded_without_tag); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) << s.ToString(); @@ -373,12 +383,15 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) { { // Only has size tag, no terminate tag. - std::string encoded_with_size = encoded_without_tag; + std::string encoded_with_size = encoded; PutVarint32(&encoded_with_size, static_cast(WalAdditionTag::kSyncedSize)); PutVarint64(&encoded_with_size, kSizeInBytes); + + std::string encoded_edit = + PrefixEncodedWalAdditionWithLength(encoded_with_size); VersionEdit edit; - Status s = edit.DecodeFrom(encoded_with_size); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) << s.ToString(); @@ -386,11 +399,14 @@ TEST_F(VersionEditTest, AddWalDecodeBadTag) { { // Only has terminate tag. - std::string encoded_with_terminate = encoded_without_tag; + std::string encoded_with_terminate = encoded; PutVarint32(&encoded_with_terminate, static_cast(WalAdditionTag::kTerminate)); + + std::string encoded_edit = + PrefixEncodedWalAdditionWithLength(encoded_with_terminate); VersionEdit edit; - ASSERT_OK(edit.DecodeFrom(encoded_with_terminate)); + ASSERT_OK(edit.DecodeFrom(encoded_edit)); auto& wal_addition = edit.GetWalAdditions()[0]; ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber); ASSERT_FALSE(wal_addition.GetMetadata().HasSyncedSize()); @@ -401,15 +417,15 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) { constexpr WalNumber kLogNumber = 100; std::string encoded; - PutVarint32(&encoded, Tag::kWalAddition); PutVarint64(&encoded, kLogNumber); PutVarint32(&encoded, static_cast(WalAdditionTag::kSyncedSize)); // No real size after the size tag. { // Without terminate tag. + std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded); VersionEdit edit; - Status s = edit.DecodeFrom(encoded); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") != std::string::npos) @@ -419,8 +435,10 @@ TEST_F(VersionEditTest, AddWalDecodeNoSize) { { // With terminate tag. PutVarint32(&encoded, static_cast(WalAdditionTag::kTerminate)); + + std::string encoded_edit = PrefixEncodedWalAdditionWithLength(encoded); VersionEdit edit; - Status s = edit.DecodeFrom(encoded); + Status s = edit.DecodeFrom(encoded_edit); ASSERT_TRUE(s.IsCorruption()); // The terminate tag is misunderstood as the size. ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) @@ -515,6 +533,62 @@ TEST_F(VersionEditTest, FullHistoryTsLow) { TestEncodeDecode(edit); } +// Tests that if RocksDB is downgraded, the new types of VersionEdits +// that have a tag larger than kTagSafeIgnoreMask can be safely ignored. +TEST_F(VersionEditTest, IgnorableTags) { + SyncPoint::GetInstance()->SetCallBack( + "VersionEdit::EncodeTo:IgnoreIgnorableTags", [&](void* arg) { + bool* ignore = static_cast(arg); + *ignore = true; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr uint64_t kPrevLogNumber = 100; + constexpr uint64_t kLogNumber = 200; + constexpr uint64_t kNextFileNumber = 300; + constexpr uint64_t kColumnFamilyId = 400; + + VersionEdit edit; + // Add some ignorable entries. + for (int i = 0; i < 2; i++) { + edit.AddWal(i + 1, WalMetadata(i + 2)); + } + edit.SetDBId("db_id"); + // Add unignorable entries. + edit.SetPrevLogNumber(kPrevLogNumber); + edit.SetLogNumber(kLogNumber); + // Add more ignorable entries. + edit.DeleteWalsBefore(100); + // Add unignorable entry. + edit.SetNextFile(kNextFileNumber); + // Add more ignorable entries. + edit.SetFullHistoryTsLow("ts"); + // Add unignorable entry. + edit.SetColumnFamily(kColumnFamilyId); + + std::string encoded; + ASSERT_TRUE(edit.EncodeTo(&encoded)); + + VersionEdit decoded; + ASSERT_OK(decoded.DecodeFrom(encoded)); + + // Check that all ignorable entries are ignored. + ASSERT_FALSE(decoded.HasDbId()); + ASSERT_FALSE(decoded.HasFullHistoryTsLow()); + ASSERT_FALSE(decoded.IsWalAddition()); + ASSERT_FALSE(decoded.IsWalDeletion()); + ASSERT_TRUE(decoded.GetWalAdditions().empty()); + ASSERT_TRUE(decoded.GetWalDeletion().IsEmpty()); + + // Check that unignorable entries are still present. + ASSERT_EQ(edit.GetPrevLogNumber(), kPrevLogNumber); + ASSERT_EQ(edit.GetLogNumber(), kLogNumber); + ASSERT_EQ(edit.GetNextFile(), kNextFileNumber); + ASSERT_EQ(edit.GetColumnFamily(), kColumnFamilyId); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {