Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,12 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
return Status::OK();
}

void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
int64_t tablet_id) {
LOG(INFO) << "mark node_id:" << node_id << " tablet_id: " << tablet_id
DCHECK(node_channel != nullptr);
LOG(INFO) << "mark node_id:" << node_channel->channel_info() << " tablet_id: " << tablet_id
<< " as failed, err: " << err;
auto node_id = node_channel->node_id();
const auto& it = _tablets_by_channel.find(node_id);
if (it == _tablets_by_channel.end()) {
return;
Expand All @@ -157,15 +159,16 @@ void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, cons
if (tablet_id == -1) {
for (const auto the_tablet_id : it->second) {
_failed_channels[the_tablet_id].insert(node_id);
_failed_channels_msgs.emplace(the_tablet_id, err + ", host: " + host);
_failed_channels_msgs.emplace(the_tablet_id,
err + ", host: " + node_channel->host());
if (_failed_channels[the_tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
_intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[the_tablet_id]);
}
}
} else {
_failed_channels[tablet_id].insert(node_id);
_failed_channels_msgs.emplace(tablet_id, err + ", host: " + host);
_failed_channels_msgs.emplace(tablet_id, err + ", host: " + node_channel->host());
if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas + 1) / 2)) {
_intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[tablet_id]);
Expand Down Expand Up @@ -383,7 +386,7 @@ Status VNodeChannel::open_wait() {
}
SCOPED_ATTACH_TASK(_state);
// If rpc failed, mark all tablets on this node channel as failed
_index_channel->mark_as_failed(this->node_id(), this->host(),
_index_channel->mark_as_failed(this,
fmt::format("rpc failed, error coed:{}, error text:{}",
_add_block_closure->cntl.ErrorCode(),
_add_block_closure->cntl.ErrorText()),
Expand Down Expand Up @@ -411,8 +414,8 @@ Status VNodeChannel::open_wait() {
if (status.ok()) {
// if has error tablet, handle them first
for (auto& error : result.tablet_errors()) {
_index_channel->mark_as_failed(this->node_id(), this->host(),
"tablet error: " + error.msg(), error.tablet_id());
_index_channel->mark_as_failed(this, "tablet error: " + error.msg(),
error.tablet_id());
}

Status st = _index_channel->check_intolerable_failure();
Expand Down Expand Up @@ -1086,7 +1089,7 @@ Status VOlapTableSink::open(RuntimeState* state) {
// This phase will not fail due to a single tablet.
// Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed.
index_channel->mark_as_failed(
ch->node_id(), ch->host(),
ch.get(),
fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()),
-1);
}
Expand Down Expand Up @@ -1369,8 +1372,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
// if it is load single tablet, then append this whole block
load_block_to_single_tablet);
if (!st.ok()) {
_channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(),
st.to_string());
_channels[i]->mark_as_failed(entry.first, st.to_string());
}
}
}
Expand All @@ -1386,7 +1388,7 @@ Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
Status status, const std::string& err_msg, const std::shared_ptr<IndexChannel> ich,
const std::shared_ptr<VNodeChannel> nch) {
LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << err_msg;
ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
ich->mark_as_failed(nch.get(), err_msg, -1);
// cancel the node channel in best effort
nch->cancel(err_msg);

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vtablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class IndexChannel {
}
}

void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
void mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
int64_t tablet_id = -1);
Status check_intolerable_failure();

Expand Down