Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
return cloud::bthread_fork_join(tasks, 10);
}

Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
if (row_idxs.empty()) [[unlikely]] {
Status CloudDeltaWriter::write(const Block* block, const TabletAddRowsPayload& rows) {
if (rows.row_idxs.empty()) [[unlikely]] {
return Status::OK();
}
std::lock_guard lock(_mtx);
Expand All @@ -77,7 +77,7 @@ Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>&
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return _memtable_writer->write(block, row_idxs);
return _memtable_writer->write(block, rows);
}

Status CloudDeltaWriter::close() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
const UniqueId& load_id);
~CloudDeltaWriter() override;

Status write(const Block* block, const DorisVector<uint32_t>& row_idxs) override;
Status write(const Block* block, const TabletAddRowsPayload& rows) override;

Status close() override;

Expand Down
8 changes: 4 additions & 4 deletions be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::OK();
}

std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
std::unordered_map<int64_t, TabletAddRowsPayload> tablet_to_rows;
_build_tablet_to_rows(request, &tablet_to_rows);

std::unordered_set<int64_t> partition_ids;
std::vector<CloudDeltaWriter*> writers;
{
// add_batch may run concurrently with inc_open without holding _lock,
// so we need to protect this section with _tablet_writers_lock.
std::lock_guard<std::mutex> l(_tablet_writers_lock);
for (auto& [tablet_id, _] : tablet_to_rowidxs) {
for (auto& [tablet_id, _] : tablet_to_rows) {
auto tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
Expand All @@ -88,7 +88,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
}
}

return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
return _write_block_data(request, cur_seq, tablet_to_rows, response);
}

Status CloudTabletsChannel::_init_writers_by_partition_ids(
Expand Down
51 changes: 39 additions & 12 deletions be/src/exec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "core/column/column.h"
#include "core/column/column_const.h"
#include "core/data_type/data_type_nullable.h"
#include "exec/sink/autoinc_buffer.h"
#include "exec/sink/vtablet_block_convertor.h"
#include "exec/sink/vtablet_finder.h"
#include "exprs/vexpr.h"
Expand All @@ -76,6 +77,7 @@
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "storage/binlog.h"
#include "storage/tablet_info.h"
#include "util/brpc_closure.h"
#include "util/debug_points.h"
Expand Down Expand Up @@ -728,7 +730,7 @@ Status VNodeChannel::open_wait() {

Status VNodeChannel::add_block(Block* block, const Payload* payload) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
if (payload->second.empty()) {
if (payload->tablet_ids.empty()) {
return Status::OK();
}
// If add_block() is called when _eos_is_produced==true, something must be wrong; we can only mark this channel as failed.
Expand Down Expand Up @@ -777,14 +779,17 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) {
}

SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
st = block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first));
st = block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->row_ids));
if (!st.ok()) {
_cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string()));
return st;
}
for (auto tablet_id : payload->second) {
for (auto tablet_id : payload->tablet_ids) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
for (auto row_binlog_lsn : payload->row_binlog_lsns) {
_cur_add_block_request->add_row_binlog_lsns(row_binlog_lsn);
}
_write_bytes.fetch_add(_cur_mutable_block->bytes());

if (_cur_mutable_block->rows() >= _batch_size ||
Expand All @@ -808,6 +813,7 @@ Status VNodeChannel::add_block(Block* block, const Payload* payload) {
}
_cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
_cur_add_block_request->clear_tablet_ids();
_cur_add_block_request->clear_row_binlog_lsns();
}

return Status::OK();
Expand Down Expand Up @@ -1508,6 +1514,12 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
_write_file_cache = table_sink.write_file_cache;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
bool has_row_binlog = std::any_of(_schema->indexes().begin(), _schema->indexes().end(),
[](const auto* index) { return index->row_binlog_id > 0; });
if (has_row_binlog) {
_row_binlog_lsn_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
_schema->db_id(), _schema->table_id(), kBinlogLsnAutoIncId);
}
_schema->set_timestamp_ms(state->timestamp_ms());
_schema->set_nano_seconds(state->nano_seconds());
_schema->set_timezone(state->timezone());
Expand Down Expand Up @@ -2012,13 +2024,19 @@ Status VTabletWriter::close(Status exec_status) {
return _close_status;
}

void VTabletWriter::_generate_one_index_channel_payload(
Status VTabletWriter::_generate_one_index_channel_payload(
RowPartTabletIds& row_part_tablet_id, int32_t index_idx,
ChannelDistributionPayload& channel_payload) {
auto& row_ids = row_part_tablet_id.row_ids;
auto& tablet_ids = row_part_tablet_id.tablet_ids;

size_t row_cnt = row_ids.size();
bool has_row_binlog = _schema->indexes()[index_idx]->row_binlog_id > 0;
std::vector<int64_t> row_binlog_lsns;
if (has_row_binlog && row_cnt > 0) {
DCHECK(_row_binlog_lsn_buffer != nullptr);
RETURN_IF_ERROR(allocate_binlog_lsn(_row_binlog_lsn_buffer, row_cnt, row_binlog_lsns));
}

for (size_t i = 0; i < row_ids.size(); i++) {
// (tablet_id, VNodeChannel) where this tablet is located
Expand All @@ -2032,23 +2050,32 @@ void VTabletWriter::_generate_one_index_channel_payload(
if (payload_it == channel_payload.end()) {
auto [tmp_it, _] = channel_payload.emplace(
locate_node.get(),
Payload {std::make_unique<IColumn::Selector>(), std::vector<int64_t>()});
Payload {std::make_unique<IColumn::Selector>(), std::vector<int64_t>(),
std::vector<int64_t>()});
payload_it = tmp_it;
payload_it->second.first->reserve(row_cnt);
payload_it->second.second.reserve(row_cnt);
payload_it->second.row_ids->reserve(row_cnt);
payload_it->second.tablet_ids.reserve(row_cnt);
if (has_row_binlog) {
payload_it->second.row_binlog_lsns.reserve(row_cnt);
}
}
payload_it->second.row_ids->push_back(row_ids[i]);
payload_it->second.tablet_ids.push_back(tablet_ids[i]);
if (has_row_binlog) {
payload_it->second.row_binlog_lsns.push_back(row_binlog_lsns[i]);
}
payload_it->second.first->push_back(row_ids[i]);
payload_it->second.second.push_back(tablet_ids[i]);
}
}
return Status::OK();
}

void VTabletWriter::_generate_index_channels_payloads(
Status VTabletWriter::_generate_index_channels_payloads(
std::vector<RowPartTabletIds>& row_part_tablet_ids,
ChannelDistributionPayloadVec& payload) {
for (int i = 0; i < _schema->indexes().size(); i++) {
_generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]);
RETURN_IF_ERROR(_generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]));
}
return Status::OK();
}

Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) {
Expand Down Expand Up @@ -2088,7 +2115,7 @@ Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) {
ChannelDistributionPayloadVec channel_to_payload;

channel_to_payload.resize(_channels.size());
_generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload);
RETURN_IF_ERROR(_generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload));
_row_distribution_watch.stop();

// Add block to node channel
Expand Down
20 changes: 13 additions & 7 deletions be/src/exec/sink/writer/vtablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TExpr;
class Thread;
class ThreadPoolToken;
class TupleDescriptor;
class AutoIncIDBuffer;

// The counter of add_batch rpc of a single node
struct AddBatchCounter {
Expand Down Expand Up @@ -220,8 +221,11 @@ struct WriterStats {
VNodeChannelStat channel_stat;
};

// pair<row_id,tablet_id>
using Payload = std::pair<std::unique_ptr<IColumn::Selector>, std::vector<int64_t>>;
// Per-node-channel slice of one input block: which source rows to send and,
// for each of those rows, the tablet it is routed to. row_ids and tablet_ids
// are appended together, one entry per routed row, so they stay index-aligned.
struct Payload {
// Selector of source-block row indices; used to append the selected rows to the channel's pending block.
std::unique_ptr<IColumn::Selector> row_ids;
// Destination tablet id for each selected row, in the same order as row_ids.
std::vector<int64_t> tablet_ids;
// Row-binlog LSN for each selected row, in the same order as row_ids.
// Only filled when the index has a row binlog; otherwise left empty.
std::vector<int64_t> row_binlog_lsns;
};

// Every NodeChannel keeps a data transmission channel with one BE. When it is opened multiple times, it holds multiple requests and their corresponding closures.
class VNodeChannel {
Expand Down Expand Up @@ -651,12 +655,12 @@ class VTabletWriter final : public AsyncResultWriter {

Status _init(RuntimeState* state, RuntimeProfile* profile);

void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
int32_t index_idx,
ChannelDistributionPayload& channel_payload);
Status _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
int32_t index_idx,
ChannelDistributionPayload& channel_payload);

void _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
ChannelDistributionPayloadVec& payload);
Status _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
ChannelDistributionPayloadVec& payload);

void _cancel_all_channel(Status status);

Expand Down Expand Up @@ -704,6 +708,8 @@ class VTabletWriter final : public AsyncResultWriter {
bthread::Mutex _stop_check_channel;
std::vector<std::shared_ptr<IndexChannel>> _channels;
std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> _index_id_to_channel;
// Table-level row-binlog LSN buffer
std::shared_ptr<AutoIncIDBuffer> _row_binlog_lsn_buffer;

std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;

Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,11 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
Rows rows;
rows.partition_id = partition_ids[i];
rows.index_id = _schema->indexes()[index_idx]->index_id;
rows.row_idxes.reserve(row_ids.size());
rows.row_payload.row_idxs.reserve(row_ids.size());
auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
it = tmp_it;
}
it->second.row_idxes.push_back(row_ids[i]);
it->second.row_payload.row_idxs.push_back(row_ids[i]);
_number_output_rows++;
}
}
Expand Down Expand Up @@ -590,7 +590,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
}
}
SCOPED_TIMER(_write_memtable_timer);
st = delta_writer->write(block.get(), rows.row_idxes);
st = delta_writer->write(block.get(), rows.row_payload);
return st;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "exec/sink/vrow_distribution.h"
#include "exec/sink/writer/async_result_writer.h"
#include "exprs/vexpr_fwd.h"
#include "load/delta_writer/delta_writer_context.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_profile.h"
Expand Down Expand Up @@ -85,7 +86,7 @@ class DeltaWriterV2Map;
struct Rows {
int64_t partition_id;
int64_t index_id;
DorisVector<uint32_t> row_idxes;
TabletAddRowsPayload row_payload;
};

using RowsForTablet = std::unordered_map<int64_t, Rows>;
Expand Down
43 changes: 26 additions & 17 deletions be/src/load/channel/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {

Status BaseTabletsChannel::_write_block_data(
const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
std::unordered_map<int64_t, TabletAddRowsPayload>& tablet_to_rows,
PTabletWriterAddBlockResult* response) {
Block send_data;
[[maybe_unused]] size_t uncompressed_size = 0;
Expand All @@ -617,6 +617,12 @@ Status BaseTabletsChannel::_write_block_data(
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
bool has_row_binlog_lsn = request.row_binlog_lsns_size() > 0;
if (has_row_binlog_lsn) {
CHECK(send_data.rows() == request.row_binlog_lsns_size())
<< "block rows: " << send_data.rows()
<< ", row_binlog_lsns_size: " << request.row_binlog_lsns_size();
}

g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
Defer defer {
Expand Down Expand Up @@ -657,12 +663,12 @@ Status BaseTabletsChannel::_write_block_data(

SCOPED_TIMER(_write_block_timer);
auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) {
return writer->write(&send_data, tablet_to_rowidxs_it.second);
for (const auto& tablet_to_rows_it : tablet_to_rows) {
RETURN_IF_ERROR(write_tablet_data(tablet_to_rows_it.first, [&](BaseDeltaWriter* writer) {
return writer->write(&send_data, tablet_to_rows_it.second);
}));

auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
auto tablet_writer_it = _tablet_writers.find(tablet_to_rows_it.first);
if (tablet_writer_it != _tablet_writers.end()) {
tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos);
}
Expand Down Expand Up @@ -694,11 +700,10 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
return Status::OK();
}

std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>
tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
std::unordered_map<int64_t /* tablet_id */, TabletAddRowsPayload> tablet_to_rows;
_build_tablet_to_rows(request, &tablet_to_rows);

return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
return _write_block_data(request, cur_seq, tablet_to_rows, response);
}

void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
Expand All @@ -710,17 +715,22 @@ bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
return _broken_tablets.find(tablet_id) != _broken_tablets.end();
}

void BaseTabletsChannel::_build_tablet_to_rowidxs(
void BaseTabletsChannel::_build_tablet_to_rows(
const PTabletWriterAddBlockRequest& request,
std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) {
std::unordered_map<int64_t, TabletAddRowsPayload>* tablet_to_rows) {
// just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
// tests show that a relatively coarse-grained read lock here performs better under multicore scenario
// see: https://github.com/apache/doris/pull/28552
std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
bool has_row_binlog_lsn = request.row_binlog_lsns_size() > 0;
if (request.is_single_tablet_block()) {
// Cloud mode needs the tablet ids to prepare rowsets.
int64_t tablet_id = request.tablet_ids(0);
tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
auto& rows = (*tablet_to_rows)[tablet_id];
rows.row_idxs.emplace_back(0);
if (has_row_binlog_lsn) {
rows.row_binlog_lsns.emplace_back(request.row_binlog_lsns(0));
}
return;
}
for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
Expand All @@ -730,11 +740,10 @@ void BaseTabletsChannel::_build_tablet_to_rowidxs(
VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
continue;
}
auto it = tablet_to_rowidxs->find(tablet_id);
if (it == tablet_to_rowidxs->end()) {
tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
} else {
it->second.emplace_back(i);
auto& rows = (*tablet_to_rows)[tablet_id];
rows.row_idxs.emplace_back(i);
if (has_row_binlog_lsn) {
rows.row_binlog_lsns.emplace_back(request.row_binlog_lsns(i));
}
}
}
Expand Down
Loading
Loading