Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/storage/compaction/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,20 @@ Status CompactionMixin::modify_rowsets() {
}

tablet()->merge_delete_bitmap(output_rowset_delete_bitmap);
std::vector<RowsetSharedPtr> visible_rowsets;
tablet()->traverse_rowsets_unlocked(
[&visible_rowsets](const RowsetSharedPtr& rowset) {
visible_rowsets.emplace_back(rowset);
},
false);
std::vector<RowsetSharedPtr> compacted_output_rowsets {_output_rowset};
for (const auto& rowset : visible_rowsets) {
if (rowset->start_version() <= _output_version.second) {
continue;
}
RETURN_IF_ERROR(tablet()->update_delete_bitmap_without_lock(
_tablet, rowset, &compacted_output_rowsets));
}
RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
int64_t continuous_size = 0;
std::vector<RowsetMetaSharedPtr> level1_rowsets;
int64_t earliest_level1_rowset_creation_time = INT64_MAX;
bool ignore_level2_timeout = false;
DBUG_EXECUTE_IF("time_series_level2_file_count", { ignore_level2_timeout = true; });
for (const auto& rs_meta : checked_rs_metas) {
if (rs_meta->compaction_level() == 0) {
break;
}
if (rs_meta->compaction_level() == 1 &&
if (!ignore_level2_timeout && rs_meta->compaction_level() == 1 &&
(now - rs_meta->creation_time()) <= MAX_LEVEL2_COMPACTION_TIMEOUT) {
continue;
}
Expand All @@ -145,6 +147,12 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
}
}

DBUG_EXECUTE_IF("time_series_level2_file_count", {
if (level1_rowsets.size() >= compaction_file_count) {
return cast_set<int32_t>(level1_rowsets.size());
}
})

// Condition 5: level1 achieve compaction_time_threshold_seconds
if (level1_rowsets.size() >= 2) {
int64_t cumu_interval = now - earliest_level1_rowset_creation_time;
Expand Down Expand Up @@ -373,12 +381,14 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
if (compaction_level >= 2) {
int64_t continuous_size = 0;
std::vector<RowsetSharedPtr> level1_rowsets;
bool ignore_level2_timeout = false;
DBUG_EXECUTE_IF("time_series_level2_file_count", { ignore_level2_timeout = true; });
for (const auto& rowset : candidate_rowsets) {
const auto& rs_meta = rowset->rowset_meta();
if (rs_meta->compaction_level() == 0) {
break;
}
if (rs_meta->compaction_level() == 1 &&
if (!ignore_level2_timeout && rs_meta->compaction_level() == 1 &&
(now - rs_meta->creation_time()) <= MAX_LEVEL2_COMPACTION_TIMEOUT) {
continue;
}
Expand Down Expand Up @@ -478,4 +488,4 @@ int64_t TimeSeriesCumulativeCompactionPolicy::get_compaction_level(
}
#include "common/compile_check_end.h"

} // namespace doris
} // namespace doris
65 changes: 54 additions & 11 deletions be/src/storage/delete/delete_bitmap_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,38 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
_heap->pop();
Slice cur_key;
RETURN_IF_ERROR(cur_ctx->get_current_key(cur_key));
RowLocation cur_loc {static_cast<uint32_t>(cur_ctx->segment_id()), cur_ctx->row_id()};
if (_rowid_length > 0) {
Slice key_without_seq =
Slice(cur_key.get_data(), cur_key.get_size() - _seq_col_length - _rowid_length);
Slice rowid_slice =
Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1,
_rowid_length - 1);
RETURN_IF_ERROR(_rowid_coder->decode_ascending(&rowid_slice, _rowid_length,
(uint8_t*)&cur_loc.row_id));
}
if (!_last_key.empty() && _comparator.is_key_same(cur_key, _last_key)) {
loc.segment_id = cur_ctx->segment_id();
loc.row_id = cur_ctx->row_id();
if (_rowid_length > 0) {
Slice key_without_seq = Slice(cur_key.get_data(),
cur_key.get_size() - _seq_col_length - _rowid_length);
Slice rowid_slice =
Slice(cur_key.get_data() + key_without_seq.get_size() + _seq_col_length + 1,
_rowid_length - 1);
RETURN_IF_ERROR(_rowid_coder->decode_ascending(&rowid_slice, _rowid_length,
(uint8_t*)&loc.row_id));
loc = cur_loc;
bool cur_row_wins = false;
if (_seq_col_length > 0) {
Slice cur_key_without_seq = Slice(
cur_key.get_data(), cur_key.get_size() - _seq_col_length - _rowid_length);
Slice last_key_without_seq =
Slice(_last_key.data(), _last_key.size() - _seq_col_length - _rowid_length);
Slice cur_sequence_val =
Slice(cur_key.get_data() + cur_key_without_seq.get_size() + 1,
_seq_col_length - 1);
Slice last_sequence_val =
Slice(_last_key.data() + last_key_without_seq.get_size() + 1,
_seq_col_length - 1);
cur_row_wins = cur_sequence_val.compare(last_sequence_val) > 0;
} else if (_rowid_length > 0 && _last_row_location.segment_id == cur_loc.segment_id) {
cur_row_wins = cur_loc.row_id > _last_row_location.row_id;
}
if (cur_row_wins) {
loc = _last_row_location;
_last_key = cur_key.to_string();
_last_row_location = cur_loc;
}
auto st = cur_ctx->advance();
if (st.ok()) {
Expand All @@ -195,10 +216,32 @@ Status MergeIndexDeleteBitmapCalculator::calculate_one(RowLocation& loc) {
}
return Status::OK();
}
_last_key = cur_key.to_string();
_last_row_location = cur_loc;
if (_heap->empty()) {
if (_rowid_length > 0) {
Status st = cur_ctx->advance();
if (st.is<ErrorCode::END_OF_FILE>()) {
continue;
}
RETURN_IF_ERROR(st);
_heap->push(cur_ctx);
continue;
}
break;
}
_last_key = cur_key.to_string();
if (_rowid_length > 0) {
// Cluster-key MOW primary-key indexes append rowid, so one segment may contain
// consecutive entries for the same unique key. Visit them in order so the older
// rowid can be deleted and the newest rowid remains visible.
Status st = cur_ctx->advance();
if (st.is<ErrorCode::END_OF_FILE>()) {
continue;
}
RETURN_IF_ERROR(st);
_heap->push(cur_ctx);
continue;
}
auto nxt_ctx = _heap->top();
Slice nxt_key;
RETURN_IF_ERROR(nxt_ctx->get_current_key(nxt_key));
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delete/delete_bitmap_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class MergeIndexDeleteBitmapCalculator {
RowsetId _rowset_id;
std::unique_ptr<Heap> _heap;
std::string _last_key;
RowLocation _last_row_location;
size_t _seq_col_length;
size_t _rowid_length;
const KeyCoder* _rowid_coder = nullptr;
Expand Down
21 changes: 20 additions & 1 deletion be/src/storage/tablet/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,17 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
}
std::vector<uint32_t> picked_segments;
for (int j = num_segments - 1; j >= 0; j--) {
if (_key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
auto segment_key_bounds = segments_key_bounds[j];
if (rowid_length > 0) {
// Cluster-key MOW segment bounds are built from primary-key index entries, which
// include sequence value and rowid. The lookup range key has already stripped
// those suffixes, so compare only the unique-key prefix.
segment_key_bounds.mutable_min_key()->resize(
std::min(segment_key_bounds.min_key().size(), key_without_seq.get_size()));
segment_key_bounds.mutable_max_key()->resize(
std::min(segment_key_bounds.max_key().size(), key_without_seq.get_size()));
}
if (_key_is_not_in_segment(key_without_seq, segment_key_bounds,
rs->rowset_meta()->is_segments_key_bounds_truncated())) {
continue;
}
Expand Down Expand Up @@ -1363,6 +1373,7 @@ Status BaseTablet::commit_phase_update_delete_bitmap(
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
bool recalc_all_cur_rowsets = false;
int64_t cur_version;

std::vector<RowsetSharedPtr> specified_rowsets;
Expand All @@ -1385,6 +1396,13 @@ Status BaseTablet::commit_phase_update_delete_bitmap(
RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(cur_version, &cur_rowset_ids));
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
if (!rowset_ids_to_del.empty()) {
// Some rowsets seen in flush phase were compacted before commit phase. Surviving
// rowsets remain in both sets and would be skipped by the delta path, so rebuild
// this txn's base-rowset delete bitmap against the current tablet rowsets.
recalc_all_cur_rowsets = true;
rowset_ids_to_add = cur_rowset_ids;
}
specified_rowsets = tablet->get_rowset_by_ids(&rowset_ids_to_add);
}
for (const auto& to_del : rowset_ids_to_del) {
Expand All @@ -1399,6 +1417,7 @@ Status BaseTablet::commit_phase_update_delete_bitmap(
LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << tablet->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", recalc all cur rowsets: " << recalc_all_cur_rowsets
<< ", cur max_version: " << cur_version << ", transaction_id: " << txn_id
<< ", total rows: " << total_rows;
pre_rowset_ids = cur_rowset_ids;
Expand Down
1 change: 1 addition & 0 deletions be/test/storage/compaction/vertical_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ class TestCompactionMixin : public CompactionMixin {
std::unique_ptr<RowIdConversion> rowid_conversion) {
_input_rowsets = std::move(input_rowsets);
_output_rowset = std::move(output_rowset);
_output_version = _output_rowset->version();
_rowid_conversion = std::move(rowid_conversion);
_stats.rowid_conversion = _rowid_conversion.get();
auto st = modify_rowsets();
Expand Down
3 changes: 1 addition & 2 deletions regression-test/data/audit/test_audit_log_behavior.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !audit_log_schema --
query_id varchar(48) Yes true \N
query_id varchar(128) Yes true \N
time datetime(3) Yes true \N
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line introduces trailing whitespace, and git diff --check reports it as an error. Please regenerate or trim the expected output so whitespace checks pass.

client_ip varchar(128) Yes true \N
user varchar(128) Yes false \N NONE
Expand Down Expand Up @@ -42,4 +42,3 @@ is_internal tinyint Yes false \N NONE
workload_group text Yes false \N NONE
compute_group text Yes false \N NONE
stmt text Yes false \N NONE

Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,3 @@
3 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10}
4 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10}
5 {"a":1,"b":{"c":1},"ddd":1,"sala":0,"z":10}

Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") {
}
}

def getRowsetId = { tablet, rowsetIndex ->
String compactionUrl = tablet["CompactionStatus"]
def (code, out, err) = curl("GET", compactionUrl)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
return rowset.split("\\s+")[4]
}

def getLocalDeleteBitmapStatus = { tablet ->
String tablet_id = tablet.TabletId
String trigger_backend_id = tablet.BackendId
Expand Down Expand Up @@ -232,6 +244,7 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") {
getTabletStatus(tablet, 3, 6)
def local_dm = getLocalDeleteBitmapStatus(tablet)
logger.info("local delete bitmap 1: " + local_dm)
def compacted_rowset_id = getRowsetId(tablet, 3)

// trigger compaction for load 2
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
Expand All @@ -255,5 +268,9 @@ suite("test_mow_compact_multi_segments", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local
local_dm = getLocalDeleteBitmapStatus(tablet)
logger.info("local delete bitmap 2: " + local_dm)
assertEquals(1, local_dm["delete_bitmap_count"])
assertTrue(local_dm["delete_bitmap_count"] >= 1)
assertTrue(local_dm["delete_bitmap_count"] <= 2)
local_dm["delete_bitmap"].keySet().each {
assertFalse(it.contains(compacted_rowset_id))
}
}
Loading