Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <memory>
#include <mutex>
#include <random>
#include <ranges>
#include <thread>

#include "cloud/cloud_meta_mgr.h"
Expand Down Expand Up @@ -515,6 +516,28 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
// Mirror MS behavior: delete rowsets in [2, alter_version] before adding
// SC output rowsets to avoid stale compaction rowsets remaining visible.
{
int64_t alter_ver = sc_job->alter_version();
std::vector<RowsetSharedPtr> to_delete;
for (auto& [v, rs] : _new_tablet->rowset_map()) {
if (v.first >= 2 && v.second <= alter_ver) {
to_delete.push_back(rs);
}
}
if (!to_delete.empty()) {
LOG_INFO(
"schema change: delete {} local rowsets in [2, {}] before adding SC "
"output, tablet_id={}, versions=[{}]",
to_delete.size(), alter_ver, _new_tablet->tablet_id(),
fmt::join(to_delete | std::views::transform([](const auto& rs) {
return rs->version().to_string();
}),
", "));
_new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
}
}
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
_new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
Expand Down
28 changes: 28 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,34 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
_tablet_meta->modify_rs_metas({}, rs_metas, false);
}

void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>&) {
if (to_delete.empty()) {
return;
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
for (auto&& rs : to_delete) {
rs_metas.push_back(rs->rowset_meta());
_rs_version_map.erase(rs->version());
// Remove edge from version graph so that the greedy capture algorithm
// won't prefer the wider stale compaction rowset over individual SC
// output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
_timestamped_version_tracker.delete_version(rs->version());
}

// Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
// stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
// because SC output will create new rowsets with identical version ranges;
// a later compaction could put those into stale as well, causing two stale
// paths to reference the same version key -- when one path is cleaned first,
// the other hits a DCHECK(false) in delete_expired_stale_rowsets().
_tablet_meta->modify_rs_metas({}, rs_metas, true);

// Schedule for direct cache cleanup. MS has already recycled these rowsets.
add_unused_rowsets(to_delete);
}

uint64_t CloudTablet::delete_expired_stale_rowsets() {
if (config::enable_mow_verbose_log) {
LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
Expand Down
7 changes: 7 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ class CloudTablet final : public BaseTablet {
void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>& meta_lock);

// Like delete_rowsets, but also removes edges from the version graph.
// Used by schema change to prevent the greedy capture algorithm from
// preferring stale compaction rowsets over individual SC output rowsets.
// MUST hold EXCLUSIVE `_meta_lock`.
void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>& meta_lock);

// When the tablet is dropped, we need to recycle cached data:
// 1. The data in file cache
// 2. The memory in tablet cache
Expand Down
248 changes: 248 additions & 0 deletions be/test/cloud/cloud_tablet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,4 +997,252 @@ TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) {
EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
}

class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test {
public:
CloudTabletDeleteRowsetsForSchemaChangeTest() : _engine(CloudStorageEngine(EngineOptions {})) {}

void SetUp() override {
_tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
_tablet =
std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*_tablet_meta));
}
void TearDown() override {}

RowsetSharedPtr create_rowset(Version version) {
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_rowset_type(BETA_ROWSET);
rs_meta->set_version(version);
rs_meta->set_rowset_id(_engine.next_rowset_id());
RowsetSharedPtr rowset;
Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
if (!st.ok()) {
return nullptr;
}
return rowset;
}

protected:
TabletMetaSharedPtr _tablet_meta;
std::shared_ptr<CloudTablet> _tablet;
CloudStorageEngine _engine;
};

// Simulate the DORIS-25014 scenario:
// - New tablet has compacted rowset [2-6] from compaction during SC
// - SC produces individual output rowsets [2],[3],[4],[5],[6]
// - Without the fix, add_rowsets fails to remove [2-6] because
// [2].contains([2-6]) = false
// - With delete_rowsets_for_schema_change, the stale compaction rowset is
// removed from both _rs_version_map and version graph before add_rowsets
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompactionRowset) {
// Setup: add placeholder [0-1] and compacted rowset [2-6]
auto rs_placeholder = create_rowset(Version(0, 1));
auto rs_compacted = create_rowset(Version(2, 6));
ASSERT_NE(rs_placeholder, nullptr);
ASSERT_NE(rs_compacted, nullptr);

{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false);
}
// Verify initial state
ASSERT_EQ(_tablet->rowset_map().size(), 2);
ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));

// SC produces individual rowsets
std::vector<RowsetSharedPtr> sc_output;
for (int v = 2; v <= 6; v++) {
auto rs = create_rowset(Version(v, v));
ASSERT_NE(rs, nullptr);
sc_output.push_back(rs);
}

// Simulate delete_rowsets_for_schema_change + add_rowsets
int64_t alter_version = 6;
{
std::unique_lock wlock(_tablet->get_header_lock());
// Collect rowsets in [2, alter_version]
std::vector<RowsetSharedPtr> to_delete;
for (auto& [v, rs] : _tablet->rowset_map()) {
if (v.first >= 2 && v.second <= alter_version) {
to_delete.push_back(rs);
}
}
ASSERT_EQ(to_delete.size(), 1); // only [2-6]
ASSERT_EQ(to_delete[0]->version(), Version(2, 6));

_tablet->delete_rowsets_for_schema_change(to_delete, wlock);

// [2-6] should be removed from rs_version_map
ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
// Should NOT go to stale (to avoid stale path conflicts), but to unused
ASSERT_FALSE(_tablet->has_stale_rowsets());
ASSERT_TRUE(_tablet->need_remove_unused_rowsets());

_tablet->add_rowsets(std::move(sc_output), false, wlock, false);
}

// Verify: individual SC rowsets are now in rs_version_map
ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual
for (int v = 2; v <= 6; v++) {
ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v)))
<< "Missing version " << v << "-" << v;
}
ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));

// Verify: capture_consistent_versions works correctly (no stale edges)
auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 6), {});
ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
auto& versions = versions_result.value();
ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6]
ASSERT_EQ(versions[0], Version(0, 1));
for (int i = 0; i < 5; i++) {
ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
}
}

// Test that delete_rowsets_for_schema_change with empty input is a no-op
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
auto rs = create_rowset(Version(0, 1));
ASSERT_NE(rs, nullptr);
{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->add_rowsets({rs}, false, wlock, false);
}
ASSERT_EQ(_tablet->rowset_map().size(), 1);

{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->delete_rowsets_for_schema_change({}, wlock);
}
ASSERT_EQ(_tablet->rowset_map().size(), 1);
ASSERT_FALSE(_tablet->has_stale_rowsets());
}

// Test with multiple compaction rowsets spanning different version ranges
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestMultipleCompactionRowsets) {
auto rs_placeholder = create_rowset(Version(0, 1));
auto rs_comp1 = create_rowset(Version(2, 5));
auto rs_comp2 = create_rowset(Version(6, 10));
auto rs_post = create_rowset(Version(11, 11)); // after alter_version, should NOT be deleted
ASSERT_NE(rs_placeholder, nullptr);
ASSERT_NE(rs_comp1, nullptr);
ASSERT_NE(rs_comp2, nullptr);
ASSERT_NE(rs_post, nullptr);

{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post}, false, wlock, false);
}
ASSERT_EQ(_tablet->rowset_map().size(), 4);

// SC output: individual rowsets for versions 2-10
std::vector<RowsetSharedPtr> sc_output;
for (int v = 2; v <= 10; v++) {
auto rs = create_rowset(Version(v, v));
ASSERT_NE(rs, nullptr);
sc_output.push_back(rs);
}

int64_t alter_version = 10;
{
std::unique_lock wlock(_tablet->get_header_lock());
std::vector<RowsetSharedPtr> to_delete;
for (auto& [v, rs] : _tablet->rowset_map()) {
if (v.first >= 2 && v.second <= alter_version) {
to_delete.push_back(rs);
}
}
ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10]

_tablet->delete_rowsets_for_schema_change(to_delete, wlock);

// Post-alter rowset should survive
ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11)));
ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5)));
ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10)));

_tablet->add_rowsets(std::move(sc_output), false, wlock, false);
}

// Verify: [0-1], [2],[3],...,[10], [11-11]
ASSERT_EQ(_tablet->rowset_map().size(), 11);

// Verify capture
auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 11), {});
ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
auto& versions = versions_result.value();
ASSERT_EQ(versions.size(), 11);
ASSERT_EQ(versions[0], Version(0, 1));
for (int i = 0; i < 9; i++) {
ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
}
ASSERT_EQ(versions[10], Version(11, 11));
}

// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then
// compaction creates a new stale path with overlapping version keys. When
// one stale path is cleaned, the other hits DCHECK(false) because the
// version is already removed from _stale_rs_version_map.
// With the fix (bypassing stale tracking), this should not happen.
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestNoStalePathConflictWithCompaction) {
// Setup: [0-1] placeholder, [2-6] compaction product during SC
auto rs_placeholder = create_rowset(Version(0, 1));
auto rs_compacted = create_rowset(Version(2, 6));
ASSERT_NE(rs_placeholder, nullptr);
ASSERT_NE(rs_compacted, nullptr);

{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock, false);
}

// SC output: individual rowsets [2],[3],[4],[5],[6]
std::vector<RowsetSharedPtr> sc_output;
for (int v = 2; v <= 6; v++) {
sc_output.push_back(create_rowset(Version(v, v)));
}

// Step 1: delete_rowsets_for_schema_change + add SC output
{
std::unique_lock wlock(_tablet->get_header_lock());
_tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock);
_tablet->add_rowsets(std::move(sc_output), false, wlock, false);
}
// Stale should be empty — SC delete bypasses stale tracking
ASSERT_FALSE(_tablet->has_stale_rowsets());

// Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6]
auto rs_new_compacted = create_rowset(Version(2, 6));
std::vector<RowsetSharedPtr> compaction_input;
{
std::unique_lock wlock(_tablet->get_header_lock());
for (auto& [v, rs] : _tablet->rowset_map()) {
if (v.first >= 2 && v.second <= 6) {
compaction_input.push_back(rs);
}
}
ASSERT_EQ(compaction_input.size(), 5);
// Normal compaction delete_rowsets — this WILL use stale tracking
_tablet->delete_rowsets(compaction_input, wlock);
_tablet->add_rowsets({rs_new_compacted}, false, wlock, false);
}
// Now stale has the compaction inputs
ASSERT_TRUE(_tablet->has_stale_rowsets());

// Step 3: delete_expired_stale_rowsets — this is where CI crashed
// With old code: stale path from SC and compaction both reference [2-6] key,
// causing DCHECK(false). With fix: only compaction stale path exists, no conflict.
config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately
ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets());

// Verify final state: [0-1] and [2-6] active, no stale left
ASSERT_EQ(_tablet->rowset_map().size(), 2);
ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1)));
ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
ASSERT_FALSE(_tablet->has_stale_rowsets());
}

} // namespace doris