From 0eeb0f57ef8482f6be11637e81879935948058fb Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 10 Apr 2026 11:31:52 +0800 Subject: [PATCH] [fix](cloud) Delete local rowsets before add_rowsets in cloud schema change (#62256) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Problem Summary: During cloud schema change, the MS (Meta Service) side correctly recycles rowsets in `[2, alter_version]` on the new tablet when committing the SC job. However, the BE side did not mirror this behavior — it directly called `add_rowsets` for the SC output without first removing existing local rowsets. This could leave stale rowsets (e.g., compaction outputs on the new tablet) visible in `_rs_version_map`, and since their delete bitmap does not cover the SC output rows, duplicate keys may appear in MOW tables. PR #61089 increased the likelihood of triggering this issue by enabling compaction on new tablets during SC, which makes it more common for the new tablet to have compaction rowsets with wider version ranges (e.g., `[818-822]`) that overlap with individual SC output rowsets (e.g., `[818],[819],...,[822]`). The `add_rowsets` overlap check (`to_add_v.contains(v)`) is one-directional: `[818].contains([818-822])` evaluates to false, so the stale compaction rowset was not removed. Fix: Before calling `add_rowsets` for SC output, delete all local rowsets in `[2, alter_version]` from the new tablet, mirroring the MS-side recycle behavior. A new `CloudTablet::delete_rowsets_for_schema_change` method is added that also removes edges from the version graph, preventing the greedy capture algorithm from preferring the wider stale compaction path over the individual SC output rowsets. --- be/src/cloud/cloud_schema_change_job.cpp | 23 +++ be/src/cloud/cloud_tablet.cpp | 28 +++ be/src/cloud/cloud_tablet.h | 7 + be/test/cloud/cloud_tablet_test.cpp | 248 +++++++++++++++++++++++ 4 files changed, 306 insertions(+) diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 5903ba2ba313c8..0719f08237bc99 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "cloud/cloud_meta_mgr.h" @@ -515,6 +516,28 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread std::unique_lock lock {_new_tablet->get_sync_meta_lock()}; std::unique_lock wlock(_new_tablet->get_header_lock()); + // Mirror MS behavior: delete rowsets in [2, alter_version] before adding + // SC output rowsets to avoid stale compaction rowsets remaining visible. + { + int64_t alter_ver = sc_job->alter_version(); + std::vector to_delete; + for (auto& [v, rs] : _new_tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_ver) { + to_delete.push_back(rs); + } + } + if (!to_delete.empty()) { + LOG_INFO( + "schema change: delete {} local rowsets in [2, {}] before adding SC " + "output, tablet_id={}, versions=[{}]", + to_delete.size(), alter_ver, _new_tablet->tablet_id(), + fmt::join(to_delete | std::views::transform([](const auto& rs) { + return rs->version().to_string(); + }), + ", ")); + _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock); + } + } _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false); _new_tablet->set_cumulative_layer_point(_output_cumulative_point); _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 75f3218c5f3cda..0c4d8547afd612 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -677,6 +677,34 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, _tablet_meta->modify_rs_metas({}, rs_metas, false); } +void CloudTablet::delete_rowsets_for_schema_change(const std::vector& to_delete, + std::unique_lock&) { + if (to_delete.empty()) { + return; + } + std::vector rs_metas; + rs_metas.reserve(to_delete.size()); + for (auto&& rs : to_delete) { + rs_metas.push_back(rs->rowset_meta()); + _rs_version_map.erase(rs->version()); + // Remove edge from version graph so that the greedy capture algorithm + // won't prefer the wider stale compaction rowset over individual SC + // output rowsets (e.g. [818-822] vs [818],[819],...,[822]). + _timestamped_version_tracker.delete_version(rs->version()); + } + + // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the + // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map) + // because SC output will create new rowsets with identical version ranges; + // a later compaction could put those into stale as well, causing two stale + // paths to reference the same version key -- when one path is cleaned first, + // the other hits a DCHECK(false) in delete_expired_stale_rowsets(). + _tablet_meta->modify_rs_metas({}, rs_metas, true); + + // Schedule for direct cache cleanup. MS has already recycled these rowsets. + add_unused_rowsets(to_delete); +} + uint64_t CloudTablet::delete_expired_stale_rowsets() { if (config::enable_mow_verbose_log) { LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id()); diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index b27bd7c5b55d89..5def9eabea266e 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -159,6 +159,13 @@ class CloudTablet final : public BaseTablet { void delete_rowsets(const std::vector& to_delete, std::unique_lock& meta_lock); + // Like delete_rowsets, but also removes edges from the version graph. + // Used by schema change to prevent the greedy capture algorithm from + // preferring stale compaction rowsets over individual SC output rowsets. + // MUST hold EXCLUSIVE `_meta_lock`. + void delete_rowsets_for_schema_change(const std::vector& to_delete, + std::unique_lock& meta_lock); + // When the tablet is dropped, we need to recycle cached data: // 1. The data in file cache // 2. The memory in tablet cache diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp index 904dc2e3fdf1e0..356bbc2e040d42 100644 --- a/be/test/cloud/cloud_tablet_test.cpp +++ b/be/test/cloud/cloud_tablet_test.cpp @@ -997,4 +997,252 @@ TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) { EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id())); } +class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test { +public: + CloudTabletDeleteRowsetsForSchemaChangeTest() : _engine(CloudStorageEngine(EngineOptions {})) {} + + void SetUp() override { + _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + _tablet = + std::make_shared(_engine, std::make_shared(*_tablet_meta)); + } + void TearDown() override {} + + RowsetSharedPtr create_rowset(Version version) { + auto rs_meta = std::make_shared(); + rs_meta->set_rowset_type(BETA_ROWSET); + rs_meta->set_version(version); + rs_meta->set_rowset_id(_engine.next_rowset_id()); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; + } + +protected: + TabletMetaSharedPtr _tablet_meta; + std::shared_ptr _tablet; + CloudStorageEngine _engine; +}; + +// Simulate the DORIS-25014 scenario: +// - New tablet has compacted rowset [2-6] from compaction during SC +// - SC produces individual output rowsets [2],[3],[4],[5],[6] +// - Without the fix, add_rowsets fails to remove [2-6] because +// [2].contains([2-6]) = false +// - With delete_rowsets_for_schema_change, the stale compaction rowset is +// removed from both _rs_version_map and version graph before add_rowsets +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompactionRowset) { + // Setup: add placeholder [0-1] and compacted rowset [2-6] + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_compacted = create_rowset(Version(2, 6)); + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_compacted, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false); + } + // Verify initial state + ASSERT_EQ(_tablet->rowset_map().size(), 2); + ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6))); + + // SC produces individual rowsets + std::vector sc_output; + for (int v = 2; v <= 6; v++) { + auto rs = create_rowset(Version(v, v)); + ASSERT_NE(rs, nullptr); + sc_output.push_back(rs); + } + + // Simulate delete_rowsets_for_schema_change + add_rowsets + int64_t alter_version = 6; + { + std::unique_lock wlock(_tablet->get_header_lock()); + // Collect rowsets in [2, alter_version] + std::vector to_delete; + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_version) { + to_delete.push_back(rs); + } + } + ASSERT_EQ(to_delete.size(), 1); // only [2-6] + ASSERT_EQ(to_delete[0]->version(), Version(2, 6)); + + _tablet->delete_rowsets_for_schema_change(to_delete, wlock); + + // [2-6] should be removed from rs_version_map + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6))); + // Should NOT go to stale (to avoid stale path conflicts), but to unused + ASSERT_FALSE(_tablet->has_stale_rowsets()); + ASSERT_TRUE(_tablet->need_remove_unused_rowsets()); + + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + + // Verify: individual SC rowsets are now in rs_version_map + ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual + for (int v = 2; v <= 6; v++) { + ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v))) + << "Missing version " << v << "-" << v; + } + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6))); + + // Verify: capture_consistent_versions works correctly (no stale edges) + auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 6), {}); + ASSERT_TRUE(versions_result.has_value()) << versions_result.error(); + auto& versions = versions_result.value(); + ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6] + ASSERT_EQ(versions[0], Version(0, 1)); + for (int i = 0; i < 5; i++) { + ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i)); + } +} + +// Test that delete_rowsets_for_schema_change with empty input is a no-op +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) { + auto rs = create_rowset(Version(0, 1)); + ASSERT_NE(rs, nullptr); + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs}, false, wlock, false); + } + ASSERT_EQ(_tablet->rowset_map().size(), 1); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->delete_rowsets_for_schema_change({}, wlock); + } + ASSERT_EQ(_tablet->rowset_map().size(), 1); + ASSERT_FALSE(_tablet->has_stale_rowsets()); +} + +// Test with multiple compaction rowsets spanning different version ranges +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestMultipleCompactionRowsets) { + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_comp1 = create_rowset(Version(2, 5)); + auto rs_comp2 = create_rowset(Version(6, 10)); + auto rs_post = create_rowset(Version(11, 11)); // after alter_version, should NOT be deleted + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_comp1, nullptr); + ASSERT_NE(rs_comp2, nullptr); + ASSERT_NE(rs_post, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post}, false, wlock, false); + } + ASSERT_EQ(_tablet->rowset_map().size(), 4); + + // SC output: individual rowsets for versions 2-10 + std::vector sc_output; + for (int v = 2; v <= 10; v++) { + auto rs = create_rowset(Version(v, v)); + ASSERT_NE(rs, nullptr); + sc_output.push_back(rs); + } + + int64_t alter_version = 10; + { + std::unique_lock wlock(_tablet->get_header_lock()); + std::vector to_delete; + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= alter_version) { + to_delete.push_back(rs); + } + } + ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10] + + _tablet->delete_rowsets_for_schema_change(to_delete, wlock); + + // Post-alter rowset should survive + ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11))); + ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5))); + ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10))); + + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + + // Verify: [0-1], [2],[3],...,[10], [11-11] + ASSERT_EQ(_tablet->rowset_map().size(), 11); + + // Verify capture + auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 11), {}); + ASSERT_TRUE(versions_result.has_value()) << versions_result.error(); + auto& versions = versions_result.value(); + ASSERT_EQ(versions.size(), 11); + ASSERT_EQ(versions[0], Version(0, 1)); + for (int i = 0; i < 9; i++) { + ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i)); + } + ASSERT_EQ(versions[10], Version(11, 11)); +} + +// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then +// compaction creates a new stale path with overlapping version keys. When +// one stale path is cleaned, the other hits DCHECK(false) because the +// version is already removed from _stale_rs_version_map. +// With the fix (bypassing stale tracking), this should not happen. +TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestNoStalePathConflictWithCompaction) { + // Setup: [0-1] placeholder, [2-6] compaction product during SC + auto rs_placeholder = create_rowset(Version(0, 1)); + auto rs_compacted = create_rowset(Version(2, 6)); + ASSERT_NE(rs_placeholder, nullptr); + ASSERT_NE(rs_compacted, nullptr); + + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false); + } + + // SC output: individual rowsets [2],[3],[4],[5],[6] + std::vector sc_output; + for (int v = 2; v <= 6; v++) { + sc_output.push_back(create_rowset(Version(v, v))); + } + + // Step 1: delete_rowsets_for_schema_change + add SC output + { + std::unique_lock wlock(_tablet->get_header_lock()); + _tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock); + _tablet->add_rowsets(std::move(sc_output), false, wlock, false); + } + // Stale should be empty — SC delete bypasses stale tracking + ASSERT_FALSE(_tablet->has_stale_rowsets()); + + // Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6] + auto rs_new_compacted = create_rowset(Version(2, 6)); + std::vector compaction_input; + { + std::unique_lock wlock(_tablet->get_header_lock()); + for (auto& [v, rs] : _tablet->rowset_map()) { + if (v.first >= 2 && v.second <= 6) { + compaction_input.push_back(rs); + } + } + ASSERT_EQ(compaction_input.size(), 5); + // Normal compaction delete_rowsets — this WILL use stale tracking + _tablet->delete_rowsets(compaction_input, wlock); + _tablet->add_rowsets({rs_new_compacted}, false, wlock, false); + } + // Now stale has the compaction inputs + ASSERT_TRUE(_tablet->has_stale_rowsets()); + + // Step 3: delete_expired_stale_rowsets — this is where CI crashed + // With old code: stale path from SC and compaction both reference [2-6] key, + // causing DCHECK(false). With fix: only compaction stale path exists, no conflict. + config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately + ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets()); + + // Verify final state: [0-1] and [2-6] active, no stale left + ASSERT_EQ(_tablet->rowset_map().size(), 2); + ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1))); + ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6))); + ASSERT_FALSE(_tablet->has_stale_rowsets()); +} + } // namespace doris