Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
}

void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
Expand Down Expand Up @@ -207,6 +210,9 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
Expand Down Expand Up @@ -304,6 +310,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
Expand Down Expand Up @@ -390,6 +399,9 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
if (in.has_commit_tso()) {
out->set_commit_tso(in.commit_tso());
}
if (in.has_is_row_binlog()) {
out->set_is_row_binlog(in.is_row_binlog());
}
}

TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
Expand Down Expand Up @@ -670,6 +682,18 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, const TabletMetaPB& in)
if (in.has_binlog_config()) {
out->mutable_binlog_config()->CopyFrom(in.binlog_config());
}
if (in.has_row_binlog_schema()) {
doris_tablet_schema_to_cloud(out->mutable_row_binlog_schema(), in.row_binlog_schema());
}
if (in.row_binlog_rs_metas_size()) {
out->mutable_row_binlog_rs_metas()->Reserve(in.row_binlog_rs_metas_size());
for (const auto& rs_meta : in.row_binlog_rs_metas()) {
doris_rowset_meta_to_cloud(out->add_row_binlog_rs_metas(), rs_meta);
}
}
if (in.has_row_binlog_schema_hash()) {
out->set_row_binlog_schema_hash(in.row_binlog_schema_hash());
}
out->set_compaction_policy(in.compaction_policy());
out->set_time_series_compaction_goal_size_mbytes(in.time_series_compaction_goal_size_mbytes());
out->set_time_series_compaction_file_count_threshold(
Expand Down Expand Up @@ -749,6 +773,21 @@ void doris_tablet_meta_to_cloud(TabletMetaCloudPB* out, TabletMetaPB&& in) {
if (in.has_binlog_config()) {
out->mutable_binlog_config()->Swap(in.mutable_binlog_config());
}
if (in.has_row_binlog_schema()) {
doris_tablet_schema_to_cloud(out->mutable_row_binlog_schema(),
std::move(*in.mutable_row_binlog_schema()));
}
if (in.row_binlog_rs_metas_size()) {
int row_binlog_rs_metas_size = in.row_binlog_rs_metas_size();
out->mutable_row_binlog_rs_metas()->Reserve(row_binlog_rs_metas_size);
for (int i = 0; i < row_binlog_rs_metas_size; ++i) {
doris_rowset_meta_to_cloud(out->add_row_binlog_rs_metas(),
std::move(*in.mutable_row_binlog_rs_metas(i)));
}
}
if (in.has_row_binlog_schema_hash()) {
out->set_row_binlog_schema_hash(in.row_binlog_schema_hash());
}
out->set_compaction_policy(in.compaction_policy());
out->set_time_series_compaction_goal_size_mbytes(in.time_series_compaction_goal_size_mbytes());
out->set_time_series_compaction_file_count_threshold(
Expand Down Expand Up @@ -835,6 +874,18 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, const TabletMetaCloudPB& in)
if (in.has_binlog_config()) {
out->mutable_binlog_config()->CopyFrom(in.binlog_config());
}
if (in.has_row_binlog_schema()) {
cloud_tablet_schema_to_doris(out->mutable_row_binlog_schema(), in.row_binlog_schema());
}
if (in.row_binlog_rs_metas_size()) {
out->mutable_row_binlog_rs_metas()->Reserve(in.row_binlog_rs_metas_size());
for (const auto& rs_meta : in.row_binlog_rs_metas()) {
cloud_rowset_meta_to_doris(out->add_row_binlog_rs_metas(), rs_meta);
}
}
if (in.has_row_binlog_schema_hash()) {
out->set_row_binlog_schema_hash(in.row_binlog_schema_hash());
}
out->set_compaction_policy(in.compaction_policy());
out->set_time_series_compaction_goal_size_mbytes(in.time_series_compaction_goal_size_mbytes());
out->set_time_series_compaction_file_count_threshold(
Expand Down Expand Up @@ -914,6 +965,21 @@ void cloud_tablet_meta_to_doris(TabletMetaPB* out, TabletMetaCloudPB&& in) {
if (in.has_binlog_config()) {
out->mutable_binlog_config()->Swap(in.mutable_binlog_config());
}
if (in.has_row_binlog_schema()) {
cloud_tablet_schema_to_doris(out->mutable_row_binlog_schema(),
std::move(*in.mutable_row_binlog_schema()));
}
if (in.row_binlog_rs_metas_size()) {
int row_binlog_rs_metas_size = in.row_binlog_rs_metas_size();
out->mutable_row_binlog_rs_metas()->Reserve(row_binlog_rs_metas_size);
for (int i = 0; i < row_binlog_rs_metas_size; ++i) {
cloud_rowset_meta_to_doris(out->add_row_binlog_rs_metas(),
std::move(*in.mutable_row_binlog_rs_metas(i)));
}
}
if (in.has_row_binlog_schema_hash()) {
out->set_row_binlog_schema_hash(in.row_binlog_schema_hash());
}
out->set_compaction_policy(in.compaction_policy());
out->set_time_series_compaction_goal_size_mbytes(in.time_series_compaction_goal_size_mbytes());
out->set_time_series_compaction_file_count_threshold(
Expand Down
2 changes: 2 additions & 0 deletions be/src/load/channel/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <initializer_list>
#include <optional>
#include <set>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -203,6 +204,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para

std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;

for (const auto& index : _schema->indexes()) {
if (index->index_id == _index_id) {
index_slots = &index->slots;
Expand Down
6 changes: 1 addition & 5 deletions be/src/load/delta_writer/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,10 @@ Status BaseDeltaWriter::wait_calc_delete_bitmap() {
return _rowset_builder->wait_calc_delete_bitmap();
}

RowsetBuilder* DeltaWriter::rowset_builder() {
return static_cast<RowsetBuilder*>(_rowset_builder.get());
}

Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes) {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
RETURN_IF_ERROR(rowset_builder()->commit_txn());
RETURN_IF_ERROR(_rowset_builder->commit_txn());

for (auto&& node_info : slave_tablet_nodes.slave_nodes()) {
_request_slave_tablet_pull_rowset(node_info);
Expand Down
3 changes: 0 additions & 3 deletions be/src/load/delta_writer/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,6 @@ class DeltaWriter final : public BaseDeltaWriter {

void _request_slave_tablet_pull_rowset(const PNodeInfo& node_info);

// Convert `_rowset_builder` from `BaseRowsetBuilder` to `RowsetBuilder`
RowsetBuilder* rowset_builder();

std::mutex _lock;

StorageEngine& _engine;
Expand Down
12 changes: 12 additions & 0 deletions be/src/load/delta_writer/delta_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;

enum class WriteRequestType {
DATA = 0,
ROW_BINLOG = 1,
GROUP = 2,
};

struct WriteRequest {
int64_t tablet_id = 0;
int32_t schema_hash = 0;
Expand All @@ -43,7 +49,13 @@ struct WriteRequest {
std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr;
bool is_high_priority = false;
bool write_file_cache = false;
WriteRequestType write_req_type = WriteRequestType::DATA;
std::string storage_vault_id;
};

struct GroupWriteRequest : public WriteRequest {
WriteRequest data_req;
WriteRequest row_binlog_req;
};

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/storage/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ namespace doris {
constexpr std::string_view kBinlogPrefix = "binlog_";
constexpr std::string_view kBinlogMetaPrefix = "binlog_meta_";
constexpr std::string_view kBinlogDataPrefix = "binlog_data_";
constexpr std::string_view kRowBinlogPrefix = "binlog_row_";
// used in file directory
constexpr std::string_view FDRowBinlogSuffix = "_row_binlog";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need both suffix and prefix?

Copy link
Copy Markdown
Contributor Author

@Userwhite Userwhite Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kRowBinlogPrefix is used for rowset meta, it will be used in next write module PR
FDRowBinlogSuffix is used for data path, like ccr binlog


inline auto make_binlog_meta_key(const std::string_view tablet, int64_t version,
const std::string_view rowset) {
Expand Down
28 changes: 26 additions & 2 deletions be/src/storage/binlog_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>

#include "common/logging.h"

namespace doris {
BinlogConfig& BinlogConfig::operator=(const TBinlogConfig& config) {
if (config.__isset.enable) {
Expand All @@ -35,6 +37,18 @@ BinlogConfig& BinlogConfig::operator=(const TBinlogConfig& config) {
if (config.__isset.max_history_nums) {
_max_history_nums = config.max_history_nums;
}
if (config.__isset.binlog_format) {
if (config.binlog_format == TBinlogFormat::ROW) {
_binlog_format = BinlogFormatPB::ROW;
} else if (config.binlog_format == TBinlogFormat::STATEMENT_AND_SNAPSHOT) {
_binlog_format = BinlogFormatPB::STATEMENT_AND_SNAPSHOT;
} else {
DCHECK(false) << "can not identify the binlog format " << config.binlog_format;
}
}
if (config.__isset.need_historical_value) {
_need_historical_value = config.need_historical_value;
}
return *this;
}

Expand All @@ -51,6 +65,12 @@ BinlogConfig& BinlogConfig::operator=(const BinlogConfigPB& config) {
if (config.has_max_history_nums()) {
_max_history_nums = config.max_history_nums();
}
if (config.has_binlog_format()) {
_binlog_format = config.binlog_format();
}
if (config.has_need_historical_value()) {
_need_historical_value = config.need_historical_value();
}
return *this;
}

Expand All @@ -59,12 +79,16 @@ void BinlogConfig::to_pb(BinlogConfigPB* config_pb) const {
config_pb->set_ttl_seconds(_ttl_seconds);
config_pb->set_max_bytes(_max_bytes);
config_pb->set_max_history_nums(_max_history_nums);
config_pb->set_binlog_format(_binlog_format);
config_pb->set_need_historical_value(_need_historical_value);
}

std::string BinlogConfig::to_string() const {
return fmt::format(
"BinlogConfig enable: {}, ttl_seconds: {}, max_bytes: {}, max_history_nums: {}",
_enable, _ttl_seconds, _max_bytes, _max_history_nums);
"BinlogConfig enable: {}, ttl_seconds: {}, max_bytes: {}, max_history_nums: {}, "
"binlog_format: {}, need_historical_value: {}",
_enable, _ttl_seconds, _max_bytes, _max_history_nums, _binlog_format,
_need_historical_value);
}

} // namespace doris
25 changes: 23 additions & 2 deletions be/src/storage/binlog_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#pragma once

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <limits>
#include <string>
Expand All @@ -29,11 +32,14 @@ class BinlogConfigPB;
class BinlogConfig {
public:
BinlogConfig() = default;
BinlogConfig(bool enable, int64_t ttl_seconds, int64_t max_bytes, int64_t max_history_nums)
BinlogConfig(bool enable, int64_t ttl_seconds, int64_t max_bytes, int64_t max_history_nums,
BinlogFormatPB binlog_format, bool need_historical_value)
: _enable(enable),
_ttl_seconds(ttl_seconds),
_max_bytes(max_bytes),
_max_history_nums(max_history_nums) {}
_max_history_nums(max_history_nums),
_binlog_format(binlog_format),
_need_historical_value(need_historical_value) {}
BinlogConfig(const BinlogConfig&) = default;
BinlogConfig& operator=(const BinlogConfig&) = default;
BinlogConfig(BinlogConfig&&) = default;
Expand All @@ -52,6 +58,19 @@ class BinlogConfig {
int64_t max_history_nums() const { return _max_history_nums; }
void set_max_history_nums(int64_t max_history_nums) { _max_history_nums = max_history_nums; }

int32_t binlog_format() const { return _binlog_format; }
void set_binlog_format(BinlogFormatPB binlog_format) { _binlog_format = binlog_format; }

bool need_historical_value() const { return _need_historical_value; }
void set_need_historical_value(bool need_historical_value) {
_need_historical_value = need_historical_value;
}

bool isCCRBinlogFormat() const {
return _binlog_format == BinlogFormatPB::STATEMENT_AND_SNAPSHOT;
}
bool isRowBinlogFormat() const { return _binlog_format == BinlogFormatPB::ROW; }

BinlogConfig& operator=(const TBinlogConfig& config);
BinlogConfig& operator=(const BinlogConfigPB& config);

Expand All @@ -63,6 +82,8 @@ class BinlogConfig {
int64_t _ttl_seconds {std::numeric_limits<int64_t>::max()};
int64_t _max_bytes {std::numeric_limits<int64_t>::max()};
int64_t _max_history_nums {std::numeric_limits<int64_t>::max()};
BinlogFormatPB _binlog_format = BinlogFormatPB::STATEMENT_AND_SNAPSHOT;
bool _need_historical_value {false};
};

} // namespace doris
28 changes: 23 additions & 5 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,13 @@ Status DataDir::load() {
if (!tablet) {
return true;
}
const auto& all_rowsets = tablet->tablet_meta()->all_rs_metas();
const auto& all_data_rowsets = tablet->tablet_meta()->all_rs_metas();
const auto& all_row_binlogs = tablet->tablet_meta()->all_row_binlog_rs_metas();
RowsetIdUnorderedSet rowset_ids;
for (const auto& [_, rowset_meta] : all_rowsets) {
for (const auto& [_, rowset_meta] : all_data_rowsets) {
rowset_ids.insert(rowset_meta->rowset_id());
}
for (auto& [_, rowset_meta] : all_row_binlogs) {
rowset_ids.insert(rowset_meta->rowset_id());
}

Expand All @@ -626,12 +630,14 @@ Status DataDir::load() {
int rst_ids_size = delete_bitmap_pb.rowset_ids_size();
int seg_ids_size = delete_bitmap_pb.segment_ids_size();
int seg_maps_size = delete_bitmap_pb.segment_delete_bitmaps_size();
int binlog_mark_size = delete_bitmap_pb.is_binlog_delvec_size();
CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size);
CHECK(binlog_mark_size == 0 || binlog_mark_size == rst_ids_size);

for (int i = 0; i < rst_ids_size; ++i) {
RowsetId rst_id;
rst_id.init(delete_bitmap_pb.rowset_ids(i));
// only process the rowset in _rs_metas
// only process the rowset in _rs_metas and _row_binlog_rs_metas
if (rowset_ids.find(rst_id) == rowset_ids.end()) {
++unknown_dbm_cnt;
continue;
Expand All @@ -645,8 +651,20 @@ Status DataDir::load() {
continue;
}
auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
roaring::Roaring::read(bitmap);

bool from_binlog = delete_bitmap_pb.is_binlog_delvec_size() > 0
? delete_bitmap_pb.is_binlog_delvec(i)
: false;
if (!from_binlog) {
tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id, seg_id, version}] =
roaring::Roaring::read(bitmap);
} else {
tablet->tablet_meta()->binlog_delvec().delete_bitmap[{rst_id, seg_id, version}] =
roaring::Roaring::read(bitmap);
}
VLOG_ROW << "successfully to add delete_bitmap, tablet_id=" << tablet->tablet_id()
<< ", rowset_id=" << rst_id << ", seg_id=" << seg_id << ", version=" << version
<< ", from_binlog=" << from_binlog;
}
return true;
};
Expand Down
Loading
Loading