Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,9 @@ DEFINE_mInt32(load_trigger_compaction_version_percent, "66");
DEFINE_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
DEFINE_mBool(enable_compaction_pause_on_high_memory, "true");

DEFINE_mDouble(max_wait_time_multiplier, "0.5");
DEFINE_mInt32(load_timeout_remaining_seconds, "30");

DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");

// clang-format off
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,9 @@ DECLARE_mInt32(load_trigger_compaction_version_percent);
DECLARE_mInt64(base_compaction_interval_seconds_since_last_operation);
DECLARE_mBool(enable_compaction_pause_on_high_memory);

DECLARE_mDouble(max_wait_time_multiplier);
DECLARE_mInt32(load_timeout_remaining_seconds);

DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);

#ifdef BE_TEST
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ class LoadStreamMap {
// only call this method after release() returns true.
void close_load(bool incremental);

std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> get_streams_for_node() {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
snapshot = _streams_for_node;
}
return snapshot;
}

private:
const UniqueId _load_id;
const int64_t _src_id;
Expand Down
52 changes: 16 additions & 36 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
LOG(WARNING) << "stub is not exist when on_closed, " << *this;
return;
}
std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
stub->_is_closed.store(true);
stub->_close_cv.notify_all();
}

inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
Expand Down Expand Up @@ -330,37 +328,28 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
return Status::OK();
}

Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
*is_closed = true;
if (!_is_open.load()) {
// we don't need to close wait on non-open streams
return Status::OK();
}
if (!_is_closing.load()) {
*is_closed = false;
return _status;
}
if (state->get_query_ctx()->is_cancelled()) {
return state->get_query_ctx()->exec_status();
}
if (_is_closed.load()) {
return _check_cancel();
}
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
auto timeout_sec = timeout_ms / 1000;
while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
//the query maybe cancel, so need check after wait 1s
timeout_sec = timeout_sec - 1;
LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec
<< ", is_closed=" << _is_closed.load()
<< ", is_cancelled=" << state->get_query_ctx()->is_cancelled();
int ret = _close_cv.wait_for(lock, 1000000);
if (ret != 0 && timeout_sec <= 0) {
return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}",
ret, timeout_ms, to_string());
RETURN_IF_ERROR(_check_cancel());
if (!_is_eos.load()) {
return Status::InternalError("Stream closed without EOS, {}", to_string());
}
return Status::OK();
}
RETURN_IF_ERROR(_check_cancel());
if (!_is_eos.load()) {
return Status::InternalError("stream closed without eos, {}", to_string());
}
*is_closed = false;
return Status::OK();
}

Expand All @@ -374,15 +363,12 @@ void LoadStreamStub::cancel(Status reason) {
_cancel_st = reason;
_is_cancelled.store(true);
}
{
std::lock_guard<bthread::Mutex> lock(_close_mutex);
_is_closed.store(true);
_close_cv.notify_all();
}
_is_closed.store(true);
}

Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
butil::IOBuf buf;
// append data to buffer
size_t header_len = header.ByteSizeLong();
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
buf.append(header.SerializeAsString());
Expand All @@ -394,6 +380,9 @@ Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const S
}
bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
// update bytes written
_bytes_written += buf.size();
// send buffer
return _send_with_buffer(buf, eos || get_schema);
}

Expand Down Expand Up @@ -556,13 +545,4 @@ Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_comm
return status;
}

Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
MonotonicStopWatch watch;
watch.start();
for (auto& stream : _streams) {
RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
}
return Status::OK();
}

} // namespace doris
12 changes: 7 additions & 5 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <atomic>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <map>
Expand Down Expand Up @@ -155,7 +156,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {

// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
Status close_finish_check(RuntimeState* state, bool* is_closed);

// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
void cancel(Status reason);
Expand Down Expand Up @@ -216,6 +217,8 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
_failed_tablets[tablet_id] = reason;
}

int64_t bytes_written() const { return _bytes_written; }

private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Expand Down Expand Up @@ -247,9 +250,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
Status _cancel_st;

bthread::Mutex _open_mutex;
bthread::Mutex _close_mutex;
bthread::Mutex _cancel_mutex;
bthread::ConditionVariable _close_cv;

std::mutex _buffer_mutex;
std::mutex _send_mutex;
Expand All @@ -266,6 +267,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
std::unordered_map<int64_t, Status> _failed_tablets;

bool _is_incremental = false;
size_t _bytes_written = 0;
};

// a collection of LoadStreams connect to the same node
Expand Down Expand Up @@ -310,8 +312,6 @@ class LoadStreamStubs {

Status close_load(const std::vector<PTabletID>& tablets_to_commit);

Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);

std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
Expand All @@ -330,6 +330,8 @@ class LoadStreamStubs {
return m;
}

std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }

private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
Expand Down
173 changes: 153 additions & 20 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
_write_tablets.insert(tablet_id);
}

COUNTER_SET(_input_rows_counter, _number_input_rows);
Expand Down Expand Up @@ -707,30 +708,162 @@ Status VTabletWriterV2::close(Status exec_status) {

Status VTabletWriterV2::_close_wait(bool incremental) {
SCOPED_TIMER(_close_load_timer);
auto st = _load_stream_map->for_each_st(
[this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
if (streams.is_incremental() != incremental) {
return Status::OK();
auto streams_for_node = _load_stream_map->get_streams_for_node();
auto check_timeout = [this]() -> Status {
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
return Status::OK();
};
auto check_streams_finish =
[this](std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams,
Status& status,
const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>&
streams_for_node) -> Status {
for (const auto& [dst_id, streams] : streams_for_node) {
for (const auto& stream : streams->streams()) {
if (!unfinished_streams.contains(stream)) {
continue;
}
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id="
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
bool is_closed = false;
auto stream_st = stream->close_finish_check(_state, &is_closed);
if (!stream_st.ok()) {
status = stream_st;
unfinished_streams.erase(stream);
LOG(WARNING) << "close_wait failed: " << stream_st
<< ", load_id=" << print_id(_load_id);
}
auto st = streams.close_wait(_state, remain_ms);
if (!st.ok()) {
LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id
<< ", load_id=" << print_id(_load_id) << ": " << st;
if (is_closed) {
unfinished_streams.erase(stream);
}
return st;
});
if (!st.ok()) {
LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
}
}
return status;
};

std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams;
for (const auto& [dst_id, streams] : streams_for_node) {
if (streams->is_incremental() != incremental) {
continue;
}
for (const auto& stream : streams->streams()) {
unfinished_streams.insert(stream);
}
}
return st;

Status status;
// First wait for quorum success
while (true) {
RETURN_IF_ERROR(check_timeout());
RETURN_IF_ERROR(check_streams_finish(unfinished_streams, status, streams_for_node));
if (_quorum_success(unfinished_streams) || !status.ok() || unfinished_streams.empty()) {
break;
}
bthread_usleep(1000 * 10);
}

// Then wait for remaining streams as much as possible
if (status.ok() && !unfinished_streams.empty()) {
double max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
while (true) {
RETURN_IF_ERROR(check_timeout());
RETURN_IF_ERROR(check_streams_finish(unfinished_streams, status, streams_for_node));

if (unfinished_streams.empty() || !status.ok()) {
break;
}

// Check if we should stop waiting
if (static_cast<double>(UnixMillis() - _timeout_watch.elapsed_time()) >
max_wait_time_ms ||
_state->execution_timeout() * 1000 - _timeout_watch.elapsed_time() <
config::load_timeout_remaining_seconds * 1000) {
std::stringstream unfinished_streams_str;
for (const auto& stream : unfinished_streams) {
unfinished_streams_str << stream->stream_id() << ",";
}
LOG(INFO) << "reach max wait time"
<< ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
<< ", unfinished streams: " << unfinished_streams_str.str();
break;
}
bthread_usleep(1000 * 10);
}
}

if (!status.ok()) {
LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
}
return status;
}

bool VTabletWriterV2::_quorum_success(
const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
if (_write_tablets.empty()) {
return false;
}
const int num_replicas = _num_replicas;
const int quorum = num_replicas / 2 + 1;
std::unordered_map<int64_t, int> success_counts;
auto streams_for_node = _load_stream_map->get_streams_for_node();
for (const auto& [dst_id, streams] : streams_for_node) {
for (const auto& stream : streams->streams()) {
if (unfinished_streams.contains(stream)) {
continue;
}
for (auto tablet_id : stream->success_tablets()) {
if (_write_tablets.contains(tablet_id)) {
success_counts[tablet_id]++;
}
}
}
}
for (auto tablet_id : _write_tablets) {
int success = success_counts[tablet_id];
if (success < quorum) {
return false;
}
}
return true;
}

double VTabletWriterV2::_calc_max_wait_time_ms(
const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node,
const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
double avg_speed = 0.0;
int64_t total_time_ms = std::max<int64_t>(
1L, static_cast<int64_t>(UnixMillis() - _timeout_watch.elapsed_time()));
int64_t finished_streams_count = 0;

for (const auto& [dst_id, streams] : streams_for_node) {
for (const auto& stream : streams->streams()) {
if (unfinished_streams.contains(stream)) {
continue;
}
avg_speed += static_cast<double>(stream->bytes_written()) /
static_cast<double>(total_time_ms);
finished_streams_count++;
}
}
DCHECK(finished_streams_count > 0) << "no finished streams";
avg_speed /= static_cast<double>(finished_streams_count);

double max_wait_time_ms = 0.0;
for (const auto& [dst_id, streams] : streams_for_node) {
for (const auto& stream : streams->streams()) {
if (unfinished_streams.contains(stream)) {
max_wait_time_ms = std::max(
max_wait_time_ms, static_cast<double>(stream->bytes_written()) / avg_speed);
}
}
}
max_wait_time_ms += config::max_wait_time_multiplier * max_wait_time_ms;

return max_wait_time_ms;
}

void VTabletWriterV2::_calc_tablets_to_commit() {
Expand Down
Loading