Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo
return _close_status;
}

LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
<< ", backend id: " << req.backend_id();
for (auto pid : req.partition_ids()) {
_partition_ids.emplace(pid);
}
Expand All @@ -113,6 +111,10 @@ Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlo
_num_remaining_senders--;
*finished = (_num_remaining_senders == 0);

LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
<< ", backend id: " << req.backend_id()
<< " remaining sender: " << _num_remaining_senders;

if (!*finished) {
return Status::OK();
}
Expand Down
17 changes: 10 additions & 7 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,18 +385,21 @@ Status VOlapTablePartitionParam::init() {
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;

// initial partitions
// initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);

if (!_t_param.partitions_is_fake) {
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
} else {
_partitions_map->emplace(
std::tuple {part->end_key.first, part->end_key.second, false}, part);
}
} else {
_partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
part);
}
}

Expand Down
25 changes: 22 additions & 3 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>

#include "bvar/bvar.h"
#include "cloud/cloud_tablets_channel.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
Expand All @@ -35,11 +35,11 @@ namespace doris {
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");

LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile)
std::string sender_ip, int64_t backend_id, bool enable_profile)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_sender_ip(std::move(sender_ip)),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
Expand Down Expand Up @@ -174,6 +174,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
}

// 3. handle eos
// if channel is incremental, maybe hang on close until all close request arrived.
if (request.has_eos() && request.eos()) {
st = _handle_eos(channel.get(), request, response);
_report_profile(response);
Expand All @@ -195,6 +196,23 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
auto index_id = request.index_id();

RETURN_IF_ERROR(channel->close(this, request, response, &finished));

// for init node, we close waiting(hang on) all close request and let them return together.
if (request.has_hang_wait() && request.hang_wait()) {
DCHECK(!channel->is_incremental_channel());
VLOG_TRACE << "reciever close waiting!" << request.sender_id();
int count = 0;
while (!channel->is_finished()) {
bthread_usleep(1000);
count++;
}
// now maybe finished or cancelled.
VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec
return Status::InternalError("Tablets channel didn't wait all close");
}
}

if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
Expand All @@ -204,6 +222,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
Expand Down
9 changes: 1 addition & 8 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,19 @@

#pragma once

#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"

namespace doris {
Expand All @@ -52,7 +45,7 @@ class BaseTabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile);
std::string sender_ip, int64_t backend_id, bool enable_profile);
~LoadChannel();

// open a new load channel if not exist
Expand Down
8 changes: 0 additions & 8 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,17 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <queue>
#include <string>
#include <tuple>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/thread.h"

namespace doris {
Expand Down
43 changes: 37 additions & 6 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,29 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();

_num_remaining_senders = request.num_senders();
_next_seqs.resize(_num_remaining_senders, 0);
_closed_senders.Reset(_num_remaining_senders);
int max_sender = request.num_senders();
/*
* a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none.
* there are two possibilities:
* 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be
* called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never
* be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc,
* the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and
* return together to avoid close-then-incremental-open problem.
* 2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition
* (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data
* distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true.
* then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic
* and also need same number of senders' close to close. but will not hang.
*/
if (_open_by_incremental) {
DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
} else {
_num_remaining_senders = max_sender;
}
// just use max_sender no matter incremental or not cuz we dont know how many senders will open.
_next_seqs.resize(max_sender, 0);
_closed_senders.Reset(max_sender);

RETURN_IF_ERROR(_open_all_writers(request));

Expand All @@ -154,10 +174,19 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {

Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
SCOPED_TIMER(_incremental_open_timer);
if (_state == kInitialized) { // haven't opened

// current node first opened by incremental open
if (_state == kInitialized) {
_open_by_incremental = true;
RETURN_IF_ERROR(open(params));
}

std::lock_guard<std::mutex> l(_lock);

if (_open_by_incremental) {
_num_remaining_senders++;
}

std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
Expand Down Expand Up @@ -231,15 +260,17 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
*finished = (_num_remaining_senders == 0);
return _close_status;
}
LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
<< ", backend id: " << backend_id;

for (auto pid : partition_ids) {
_partition_ids.emplace(pid);
}
_closed_senders.Set(sender_id, true);
_num_remaining_senders--;
*finished = (_num_remaining_senders == 0);

LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
<< ", backend id: " << backend_id << " remaining sender: " << _num_remaining_senders;

if (!*finished) {
return Status::OK();
}
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <ostream>
#include <shared_mutex>
Expand Down Expand Up @@ -116,6 +114,11 @@ class BaseTabletsChannel {

size_t num_rows_filtered() const { return _num_rows_filtered; }

// means this tablets in this BE is incremental opened partitions.
bool is_incremental_channel() const { return _open_by_incremental; }

bool is_finished() const { return _state == kFinished; }

protected:
Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
std::unordered_map<int64_t, std::vector<uint32_t>>& tablet_to_rowidxs,
Expand Down Expand Up @@ -158,8 +161,8 @@ class BaseTabletsChannel {
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::shared_ptr<OlapTableSchemaParam> _schema;

TupleDescriptor* _tuple_desc = nullptr;
bool _open_by_incremental = false;

// next sequence we expect
int _num_remaining_senders = 0;
Expand Down
Loading