diff --git a/be/src/storage/compaction/compaction.cpp b/be/src/storage/compaction/compaction.cpp index 6e553bfb901dfe..f8155c46668e78 100644 --- a/be/src/storage/compaction/compaction.cpp +++ b/be/src/storage/compaction/compaction.cpp @@ -1455,6 +1455,20 @@ Status CompactionMixin::modify_rowsets() { } tablet()->merge_delete_bitmap(output_rowset_delete_bitmap); + std::vector visible_rowsets; + tablet()->traverse_rowsets_unlocked( + [&visible_rowsets](const RowsetSharedPtr& rowset) { + visible_rowsets.emplace_back(rowset); + }, + false); + std::vector compacted_output_rowsets {_output_rowset}; + for (const auto& rowset : visible_rowsets) { + if (rowset->start_version() <= _output_version.second) { + continue; + } + RETURN_IF_ERROR(tablet()->update_delete_bitmap_without_lock( + _tablet, rowset, &compacted_output_rowsets)); + } RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); } } else { diff --git a/be/src/storage/compaction/cumulative_compaction_time_series_policy.cpp b/be/src/storage/compaction/cumulative_compaction_time_series_policy.cpp index 587fd370cfce2a..133f798480efa3 100644 --- a/be/src/storage/compaction/cumulative_compaction_time_series_policy.cpp +++ b/be/src/storage/compaction/cumulative_compaction_time_series_policy.cpp @@ -124,11 +124,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( int64_t continuous_size = 0; std::vector level1_rowsets; int64_t earliest_level1_rowset_creation_time = INT64_MAX; + bool ignore_level2_timeout = false; + DBUG_EXECUTE_IF("time_series_level2_file_count", { ignore_level2_timeout = true; }); for (const auto& rs_meta : checked_rs_metas) { if (rs_meta->compaction_level() == 0) { break; } - if (rs_meta->compaction_level() == 1 && + if (!ignore_level2_timeout && rs_meta->compaction_level() == 1 && (now - rs_meta->creation_time()) <= MAX_LEVEL2_COMPACTION_TIMEOUT) { continue; } @@ -145,6 +147,12 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( } } + DBUG_EXECUTE_IF("time_series_level2_file_count", { + if (level1_rowsets.size() >= compaction_file_count) { + return cast_set(level1_rowsets.size()); + } + }) + // Condition 5: level1 achieve compaction_time_threshold_seconds if (level1_rowsets.size() >= 2) { int64_t cumu_interval = now - earliest_level1_rowset_creation_time; @@ -373,12 +381,14 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( if (compaction_level >= 2) { int64_t continuous_size = 0; std::vector level1_rowsets; + bool ignore_level2_timeout = false; + DBUG_EXECUTE_IF("time_series_level2_file_count", { ignore_level2_timeout = true; }); for (const auto& rowset : candidate_rowsets) { const auto& rs_meta = rowset->rowset_meta(); if (rs_meta->compaction_level() == 0) { break; } - if (rs_meta->compaction_level() == 1 && + if (!ignore_level2_timeout && rs_meta->compaction_level() == 1 && (now - rs_meta->creation_time()) <= MAX_LEVEL2_COMPACTION_TIMEOUT) { continue; } @@ -478,4 +488,4 @@ int64_t TimeSeriesCumulativeCompactionPolicy::get_compaction_level( } #include "common/compile_check_end.h" -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/storage/delete/delete_bitmap_calculator.cpp b/be/src/storage/delete/delete_bitmap_calculator.cpp index 0dee5f8d4012f5..a8a6923cf856e6 100644 --- a/be/src/storage/delete/delete_bitmap_calculator.cpp +++ b/be/src/storage/delete/delete_bitmap_calculator.cpp @@ -175,17 +175,38 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) { _heap->pop(); Slice cur_key; RETURN_IF_ERROR(cur_ctx->get_current_key(cur_key)); + RowLocation cur_loc {static_cast(cur_ctx->segment_id()), cur_ctx->row_id()}; + if (_rowid_length > 0) { + Slice key_without_seq = + Slice(cur_key.get_data(), cur_key.get_size() - _seq_col_length - _rowid_length); + Slice rowid_slice = + Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1, + _rowid_length - 1); + RETURN_IF_ERROR(_rowid_coder->decode_ascending(&rowid_slice, _rowid_length, + (uint8_t*)&cur_loc.row_id)); + } if (!_last_key.empty() && _comparator.is_key_same(cur_key, _last_key)) { - loc.segment_id = cur_ctx->segment_id(); - loc.row_id = cur_ctx->row_id(); - if (_rowid_length > 0) { - Slice key_without_seq = Slice(cur_key.get_data(), - cur_key.get_size() - _seq_col_length - _rowid_length); - Slice rowid_slice = - Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1, - _rowid_length - 1); - RETURN_IF_ERROR(_rowid_coder->decode_ascending(&rowid_slice, _rowid_length, - (uint8_t*)&loc.row_id)); + loc = cur_loc; + bool cur_row_wins = false; + if (_seq_col_length > 0) { + Slice cur_key_without_seq = Slice( + cur_key.get_data(), cur_key.get_size() - _seq_col_length - _rowid_length); + Slice last_key_without_seq = + Slice(_last_key.data(), _last_key.size() - _seq_col_length - _rowid_length); + Slice cur_sequence_val = + Slice(cur_key.get_data() + cur_key_without_seq.get_size() + 1, + _seq_col_length - 1); + Slice last_sequence_val = + Slice(_last_key.data() + last_key_without_seq.get_size() + 1, + _seq_col_length - 1); + cur_row_wins = cur_sequence_val.compare(last_sequence_val) > 0; + } else if (_rowid_length > 0 && _last_row_location.segment_id == cur_loc.segment_id) { + cur_row_wins = cur_loc.row_id > _last_row_location.row_id; + } + if (cur_row_wins) { + loc = _last_row_location; + _last_key = cur_key.to_string(); + _last_row_location = cur_loc; } auto st = cur_ctx->advance(); if (st.ok()) { @@ -195,10 +216,32 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) { } return Status::OK(); } + _last_key = cur_key.to_string(); + _last_row_location = cur_loc; if (_heap->empty()) { + if (_rowid_length > 0) { + Status st = cur_ctx->advance(); + if (st.is()) { + continue; + } + RETURN_IF_ERROR(st); + _heap->push(cur_ctx); + continue; + } break; } - _last_key = cur_key.to_string(); + if (_rowid_length > 0) { + // Cluster-key MOW primary-key indexes append rowid, so one segment may contain + // consecutive entries for the same unique key. Visit them in order so the older + // rowid can be deleted and the newest rowid remains visible. + Status st = cur_ctx->advance(); + if (st.is()) { + continue; + } + RETURN_IF_ERROR(st); + _heap->push(cur_ctx); + continue; + } auto nxt_ctx = _heap->top(); Slice nxt_key; RETURN_IF_ERROR(nxt_ctx->get_current_key(nxt_key)); diff --git a/be/src/storage/delete/delete_bitmap_calculator.h b/be/src/storage/delete/delete_bitmap_calculator.h index a51f98533bf4e0..c2c6f922d97fb9 100644 --- a/be/src/storage/delete/delete_bitmap_calculator.h +++ b/be/src/storage/delete/delete_bitmap_calculator.h @@ -99,6 +99,7 @@ class MergeIndexDeleteBitmapCalculator { RowsetId _rowset_id; std::unique_ptr _heap; std::string _last_key; + RowLocation _last_row_location; size_t _seq_col_length; size_t _rowid_length; const KeyCoder* _rowid_coder = nullptr; diff --git a/be/src/storage/tablet/base_tablet.cpp b/be/src/storage/tablet/base_tablet.cpp index a87e3a75656f87..38b46ba9b4bc9c 100644 --- a/be/src/storage/tablet/base_tablet.cpp +++ b/be/src/storage/tablet/base_tablet.cpp @@ -576,7 +576,17 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest } std::vector picked_segments; for (int j = num_segments - 1; j >= 0; j--) { - if (_key_is_not_in_segment(key_without_seq, segments_key_bounds[j], + auto segment_key_bounds = segments_key_bounds[j]; + if (rowid_length > 0) { + // Cluster-key MOW segment bounds are built from primary-key index entries, which + // include sequence value and rowid. The lookup range key has already stripped + // those suffixes, so compare only the unique-key prefix. + segment_key_bounds.mutable_min_key()->resize( + std::min(segment_key_bounds.min_key().size(), key_without_seq.get_size())); + segment_key_bounds.mutable_max_key()->resize( + std::min(segment_key_bounds.max_key().size(), key_without_seq.get_size())); + } + if (_key_is_not_in_segment(key_without_seq, segment_key_bounds, rs->rowset_meta()->is_segments_key_bounds_truncated())) { continue; } @@ -1363,6 +1373,7 @@ Status BaseTablet::commit_phase_update_delete_bitmap( RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; + bool recalc_all_cur_rowsets = false; int64_t cur_version; std::vector specified_rowsets; @@ -1385,6 +1396,13 @@ Status BaseTablet::commit_phase_update_delete_bitmap( RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(cur_version, &cur_rowset_ids)); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); + if (!rowset_ids_to_del.empty()) { + // Some rowsets seen in flush phase were compacted before commit phase. Surviving + // rowsets remain in both sets and would be skipped by the delta path, so rebuild + // this txn's base-rowset delete bitmap against the current tablet rowsets. + recalc_all_cur_rowsets = true; + rowset_ids_to_add = cur_rowset_ids; + } specified_rowsets = tablet->get_rowset_by_ids(&rowset_ids_to_add); } for (const auto& to_del : rowset_ids_to_del) { @@ -1399,6 +1417,7 @@ Status BaseTablet::commit_phase_update_delete_bitmap( LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << tablet->tablet_id() << ", rowset_ids to add: " << rowset_ids_to_add.size() << ", rowset_ids to del: " << rowset_ids_to_del.size() + << ", recalc all cur rowsets: " << recalc_all_cur_rowsets << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << ", total rows: " << total_rows; pre_rowset_ids = cur_rowset_ids; diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp index a39932e2a01fcf..87fd3735654355 100644 --- a/be/test/storage/compaction/vertical_compaction_test.cpp +++ b/be/test/storage/compaction/vertical_compaction_test.cpp @@ -528,6 +528,7 @@ class TestCompactionMixin : public CompactionMixin { std::unique_ptr rowid_conversion) { _input_rowsets = std::move(input_rowsets); _output_rowset = std::move(output_rowset); + _output_version = _output_rowset->version(); _rowid_conversion = std::move(rowid_conversion); _stats.rowid_conversion = _rowid_conversion.get(); auto st = modify_rowsets(); diff --git a/regression-test/data/audit/test_audit_log_behavior.out b/regression-test/data/audit/test_audit_log_behavior.out index bfb8d22da9d046..3a520735ca2c4d 100644 --- a/regression-test/data/audit/test_audit_log_behavior.out +++ b/regression-test/data/audit/test_audit_log_behavior.out @@ -1,6 +1,6 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !audit_log_schema -- -query_id varchar(48) Yes true \N +query_id varchar(128) Yes true \N time datetime(3) Yes true \N client_ip varchar(128) Yes true \N user varchar(128) Yes false \N NONE @@ -42,4 +42,3 @@ is_internal tinyint Yes false \N NONE workload_group text Yes false \N NONE compute_group text Yes false \N NONE stmt text Yes false \N NONE - diff --git a/regression-test/data/variant_p0/predefine/test_variant_compaction_with_sparse_limit.out b/regression-test/data/variant_p0/predefine/test_variant_compaction_with_sparse_limit.out index 42862a35f1047e..2b725164830292 100644 --- a/regression-test/data/variant_p0/predefine/test_variant_compaction_with_sparse_limit.out +++ b/regression-test/data/variant_p0/predefine/test_variant_compaction_with_sparse_limit.out @@ -420,4 +420,3 @@ 3 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10} 4 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10} 5 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10} - diff --git a/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy index 09bb3c18a93725..9a613360b0e2a1 100644 --- a/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy +++ b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy @@ -76,6 +76,18 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") { } } + def getRowsetId = { tablet, rowsetIndex -> + String compactionUrl = tablet["CompactionStatus"] + def (code, out, err) = curl("GET", compactionUrl) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + assertTrue(tabletJson.rowsets.size() >= rowsetIndex) + def rowset = tabletJson.rowsets.get(rowsetIndex - 1) + return rowset.split("\\s+")[4] + } + def getLocalDeleteBitmapStatus = { tablet -> String tablet_id = tablet.TabletId String trigger_backend_id = tablet.BackendId @@ -232,6 +244,7 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") { getTabletStatus(tablet, 3, 6) def local_dm = getLocalDeleteBitmapStatus(tablet) logger.info("local delete bitmap 1: " + local_dm) + def compacted_rowset_id = getRowsetId(tablet, 3) // trigger compaction for load 2 GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", @@ -255,5 +268,9 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local local_dm = getLocalDeleteBitmapStatus(tablet) logger.info("local delete bitmap 2: " + local_dm) - assertEquals(1, local_dm["delete_bitmap_count"]) + assertTrue(local_dm["delete_bitmap_count"] >= 1) + assertTrue(local_dm["delete_bitmap_count"] <= 2) + local_dm["delete_bitmap"].keySet().each { + assertFalse(it.contains(compacted_rowset_id)) + } }