Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up to #46168 #46409

Merged
merged 3 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 48 additions & 13 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ Pipe ReadFromMergeTree::readFromPool(
total_rows = query_info.limit;

const auto & settings = context->getSettingsRef();
MergeTreeReadPool::BackoffSettings backoff_settings(settings);

/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
/// If granularity is adaptive it doesn't make sense
Expand All @@ -295,18 +294,54 @@ Pipe ReadFromMergeTree::readFromPool(
/ max_block_size * max_block_size / fixed_index_granularity;
}

auto pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
backoff_settings,
settings.preferred_block_size_bytes,
false);
bool all_parts_are_remote = true;
bool all_parts_are_local = true;
for (const auto & part : parts_with_range)
{
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
all_parts_are_local &= !is_remote;
all_parts_are_remote &= is_remote;
}

MergeTreeReadPoolPtr pool;

if ((all_parts_are_remote
&& settings.allow_prefetched_read_pool_for_remote_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|| (!all_parts_are_local
&& settings.allow_prefetched_read_pool_for_local_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method)))
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
settings.preferred_block_size_bytes,
reader_settings,
context,
use_uncompressed_cache,
all_parts_are_remote,
*data.getSettings());
}
else
{
pool = std::make_shared<MergeTreeReadPool>(
max_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
storage_snapshot,
prewhere_info,
required_columns,
virt_column_names,
context,
false);
}

auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, max_streams);
Expand Down
29 changes: 29 additions & 0 deletions src/Storages/MergeTree/IMergeTreeReadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <boost/noncopyable.hpp>
#include <Core/Block.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/MergeTree/MergeTreeData.h>


namespace DB
{
struct MergeTreeReadTask;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;


class IMergeTreeReadPool : private boost::noncopyable
{
public:
virtual ~IMergeTreeReadPool() = default;

virtual Block getHeader() const = 0;

virtual MergeTreeReadTaskPtr getTask(size_t thread) = 0;

virtual void profileFeedback(ReadBufferFromFileBase::ProfileInfo info) = 0;
};

using MergeTreeReadPoolPtr = std::shared_ptr<IMergeTreeReadPool>;

}
15 changes: 5 additions & 10 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
bool use_uncompressed_cache_,
bool is_remote_read_,
const MergeTreeSettings & storage_settings_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
parts_,
(preferred_block_size_bytes_ > 0),
/*do_not_steal_tasks_*/false)
, WithContext(context_)
: WithContext(context_)
, log(&Poco::Logger::get("MergeTreePrefetchedReadPool(" + (parts_.empty() ? "" : parts_.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
, header(storage_snapshot_->getSampleBlockForColumns(column_names_))
, mark_cache(context_->getGlobalContext()->getMarkCache().get())
Expand All @@ -57,6 +48,10 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
, profile_callback([this](ReadBufferFromFileBase::ProfileInfo info_) { profileFeedback(info_); })
, index_granularity_bytes(storage_settings_.index_granularity_bytes)
, fixed_index_granularity(storage_settings_.index_granularity)
, storage_snapshot(storage_snapshot_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, prewhere_info(prewhere_info_)
, is_remote_read(is_remote_read_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
{
Expand Down
8 changes: 8 additions & 0 deletions src/Storages/MergeTree/MergeTreePrefetchedReadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,20 @@ class MergeTreePrefetchedReadPool : public IMergeTreeReadPool, private WithConte
ReadBufferFromFileBase::ProfileCallback profile_callback;
size_t index_granularity_bytes;
size_t fixed_index_granularity;

StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;
PrewhereInfoPtr prewhere_info;
RangesInDataParts parts_ranges;

[[ maybe_unused ]] const bool is_remote_read;
ThreadPool & prefetch_threadpool;

PartsInfos parts_infos;

ThreadsTasks threads_tasks;
std::mutex mutex;

struct TaskHolder
{
Expand Down
71 changes: 41 additions & 30 deletions src/Storages/MergeTree/MergeTreeReadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,47 @@ namespace ErrorCodes
namespace DB
{

std::vector<size_t> IMergeTreeReadPool::fillPerPartInfo(const RangesInDataParts & parts)
MergeTreeReadPool::MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
ContextPtr context_,
bool do_not_steal_tasks_)
: storage_snapshot(storage_snapshot_)
, column_names(column_names_)
, virtual_column_names(virtual_column_names_)
, min_marks_for_concurrent_read(min_marks_for_concurrent_read_)
, prewhere_info(prewhere_info_)
, parts_ranges(std::move(parts_))
, predict_block_size_bytes(context_->getSettingsRef().preferred_block_size_bytes > 0)
, do_not_steal_tasks(do_not_steal_tasks_)
, backoff_settings{context_->getSettingsRef()}
, backoff_state{threads_}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(
parts_ranges, storage_snapshot, is_part_on_remote_disk,
do_not_steal_tasks, predict_block_size_bytes,
column_names, virtual_column_names, prewhere_info, per_part_params);

fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges);
}

std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
const RangesInDataParts & parts,
const StorageSnapshotPtr & storage_snapshot,
std::vector<bool> & is_part_on_remote_disk,
bool & do_not_steal_tasks,
bool & predict_block_size_bytes,
const Names & column_names,
const Names & virtual_column_names,
const PrewhereInfoPtr & prewhere_info,
std::vector<MergeTreeReadPool::PerPartParams> & per_part_params)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Expand Down Expand Up @@ -65,35 +105,6 @@ std::vector<size_t> IMergeTreeReadPool::fillPerPartInfo(const RangesInDataParts
return per_part_sum_marks;
}

MergeTreeReadPool::MergeTreeReadPool(
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
bool do_not_steal_tasks_)
: IMergeTreeReadPool(
storage_snapshot_,
column_names_,
virtual_column_names_,
min_marks_for_concurrent_read_,
prewhere_info_,
std::move(parts_),
(preferred_block_size_bytes_ > 0),
do_not_steal_tasks_)
, backoff_settings{backoff_settings_}
, backoff_state{threads_}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts_ranges);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges);
}


MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t thread)
{
Expand Down