Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AsyncPipelineExecutor for all dictionaries #55839

Merged
merged 4 commits into from Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/en/operations/settings/settings.md
Expand Up @@ -4746,3 +4746,18 @@ a Tuple(
l Nullable(String)
)
```

## dictionary_use_async_executor {#dictionary_use_async_executor}

Execute a pipeline for reading the dictionary source in several threads. It is supported only by dictionaries with a local CLICKHOUSE source.

You may specify it in the `SETTINGS` section of the dictionary definition:

```sql
CREATE DICTIONARY t1_dict ( key String, attr UInt64 )
PRIMARY KEY key
SOURCE(CLICKHOUSE(QUERY `SELECT key, attr FROM t1 GROUP BY key`))
LIFETIME(MIN 0 MAX 3600)
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
SETTINGS(dictionary_use_async_executor=1, max_threads=8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear how many threads will be used if I don't specify max_threads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should follow the default behavior (max_threads=0 or 'auto', which is set to the number of CPUs). Let me recheck it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPD: the behavior is the same as for a regular query; max_threads is set automatically.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would someone not want to use the async executor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the query itself is more lightweight than the dictionary filling: it lets us consume fewer resources when executing the source query and avoids spawning additional threads. But if we realize that it's always beneficial, we may enable it by default in the future.

```
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Expand Up @@ -1069,7 +1069,7 @@ class IColumn;
M(Bool, regexp_dict_flag_case_insensitive, false, "Use case-insensitive matching for a regexp_tree dictionary. Can be overridden in individual expressions with (?i) and (?-i).", 0) \
M(Bool, regexp_dict_flag_dotall, false, "Allow '.' to match newline characters for a regexp_tree dictionary.", 0) \
\
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading from a dictionary with several threads. It's supported only by DIRECT dictionary with CLICKHOUSE source.", 0) \
M(Bool, dictionary_use_async_executor, false, "Execute a pipeline for reading dictionary source in several threads. It's supported only by dictionaries with local CLICKHOUSE source.", 0) \
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \

// End of FORMAT_FACTORY_SETTINGS
Expand Down
16 changes: 7 additions & 9 deletions src/Dictionaries/CacheDictionary.cpp
Expand Up @@ -10,10 +10,10 @@
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>

#include <Dictionaries//DictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>

#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace ProfileEvents
Expand Down Expand Up @@ -50,8 +50,7 @@ CacheDictionary<dictionary_key_type>::CacheDictionary(
DictionarySourcePtr source_ptr_,
CacheDictionaryStoragePtr cache_storage_ptr_,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
DictionaryLifetime dict_lifetime_,
bool allow_read_expired_keys_)
CacheDictionaryConfiguration configuration_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
Expand All @@ -63,9 +62,8 @@ CacheDictionary<dictionary_key_type>::CacheDictionary(
{
update(unit_to_update);
})
, dict_lifetime(dict_lifetime_)
, configuration(configuration_)
, log(&Poco::Logger::get("ExternalDictionaries"))
, allow_read_expired_keys(allow_read_expired_keys_)
, rnd_engine(randomSeed())
{
if (!source_ptr->supportsSelectiveLoad())
Expand Down Expand Up @@ -209,7 +207,7 @@ Columns CacheDictionary<dictionary_key_type>::getColumns(
HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
MutableColumns fetched_columns_during_update = request.makeAttributesResultColumns();

if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
if (not_found_keys_size == 0 && expired_keys_size > 0 && configuration.allow_read_expired_keys)
{
/// Start an async update only if reading expired keys is allowed and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
Expand Down Expand Up @@ -314,7 +312,7 @@ ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::hasKeys(const Columns & k

allow_expired_keys_during_aggregation = true;
}
else if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
else if (not_found_keys_size == 0 && expired_keys_size > 0 && configuration.allow_read_expired_keys)
{
/// Start an async update only if reading expired keys is allowed and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
Expand Down Expand Up @@ -589,7 +587,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d

Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();

PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
Block block;
while (executor.pull(block))
{
Expand Down
20 changes: 12 additions & 8 deletions src/Dictionaries/CacheDictionary.h
Expand Up @@ -24,6 +24,14 @@

namespace DB
{

/// Immutable per-dictionary settings of CacheDictionary, bundled so they can be passed
/// to the constructor (and forwarded when the dictionary is cloned) as a single value.
struct CacheDictionaryConfiguration
{
/// When true, a lookup in which every key is found but some are expired pushes the
/// update unit to the update queue (asynchronous update).
const bool allow_read_expired_keys;
/// Lifetime reported by CacheDictionary::getLifetime().
const DictionaryLifetime lifetime;
/// Passed to DictionaryPipelineExecutor in CacheDictionary::update() to choose between
/// the asynchronous and the synchronous pulling executor. Off by default.
const bool use_async_executor = false;
};

/** CacheDictionary stores keys in cache storage and can perform asynchronous and synchronous updates during keys fetch.

If keys are not found in storage during fetch, the dictionary starts an update operation using the update queue.
Expand Down Expand Up @@ -58,8 +66,7 @@ class CacheDictionary final : public IDictionary
DictionarySourcePtr source_ptr_,
CacheDictionaryStoragePtr cache_storage_ptr_,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
DictionaryLifetime dict_lifetime_,
bool allow_read_expired_keys_);
CacheDictionaryConfiguration configuration_);

~CacheDictionary() override;

Expand Down Expand Up @@ -99,13 +106,12 @@ class CacheDictionary final : public IDictionary
getSourceAndUpdateIfNeeded()->clone(),
cache_storage_ptr,
update_queue.getConfiguration(),
dict_lifetime,
allow_read_expired_keys);
configuration);
}

DictionarySourcePtr getSource() const override;

const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; }

const DictionaryStructure & getStructure() const override { return dict_struct; }

Expand Down Expand Up @@ -194,12 +200,10 @@ class CacheDictionary final : public IDictionary
CacheDictionaryStoragePtr cache_storage_ptr;
mutable CacheDictionaryUpdateQueue<dictionary_key_type> update_queue;

const DictionaryLifetime dict_lifetime;
const CacheDictionaryConfiguration configuration;

Poco::Logger * log;

const bool allow_read_expired_keys;

mutable pcg64 rnd_engine;

/// This lock is used for the inner cache state update function lock it for
Expand Down
2 changes: 2 additions & 0 deletions src/Dictionaries/ClickHouseDictionarySource.h
Expand Up @@ -59,6 +59,8 @@ class ClickHouseDictionarySource final : public IDictionarySource

bool hasUpdateField() const override;

/// Returns the `is_local` flag stored in this source's configuration.
bool isLocal() const { return configuration.is_local; }

DictionarySourcePtr clone() const override { return std::make_shared<ClickHouseDictionarySource>(*this); }

std::string toString() const override;
Expand Down
30 changes: 30 additions & 0 deletions src/Dictionaries/DictionarySourceHelpers.cpp
Expand Up @@ -9,11 +9,15 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/SettingsChanges.h>

#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}

Expand Down Expand Up @@ -130,4 +134,30 @@ String TransformWithAdditionalColumns::getName() const
{
return "TransformWithAdditionalColumns";
}

/// Creates exactly one underlying executor over `pipeline_`:
/// the asynchronous one when `async` is set, the synchronous one otherwise.
/// The other pointer stays null.
DictionaryPipelineExecutor::DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async)
{
    if (async)
        async_executor = std::make_unique<PullingAsyncPipelineExecutor>(pipeline_);
    else
        executor = std::make_unique<PullingPipelineExecutor>(pipeline_);
}

bool DictionaryPipelineExecutor::pull(Block & block)
{
if (async_executor)
{
while (true)
{
bool has_data = async_executor->pull(block);
if (has_data && !block)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we skip empty blocks when pulling from the sync executor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sync executor never returns them. Actually, the async one shouldn't do it either. Perhaps this fix can help with #55945, but I'm not sure that it is a correct and complete fix.

continue;
return has_data;
}
}
else if (executor)
return executor->pull(block);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "DictionaryPipelineExecutor is not initialized");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's do it directly in the ctor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the ctor it's clear from the initializer list that one of the executors is initialized. I added this branch just in case a new ctor is added and the initialization is forgotten there.

}

DictionaryPipelineExecutor::~DictionaryPipelineExecutor() = default;

}
17 changes: 17 additions & 0 deletions src/Dictionaries/DictionarySourceHelpers.h
Expand Up @@ -16,6 +16,10 @@ namespace DB
struct DictionaryStructure;
class SettingsChanges;

class PullingPipelineExecutor;
class PullingAsyncPipelineExecutor;
class QueryPipeline;

/// For simple key

Block blockForIds(
Expand Down Expand Up @@ -51,4 +55,17 @@ class TransformWithAdditionalColumns final : public ISimpleTransform
size_t current_range_index = 0;
};

/// Wrapper for `Pulling(Async)PipelineExecutor` to dynamically dispatch calls to the right executor
/// Exactly one of the two underlying executors is created by the constructor; the other pointer stays null.
class DictionaryPipelineExecutor
{
public:
/// Creates the asynchronous executor over `pipeline_` when `async` is true,
/// otherwise the synchronous one.
DictionaryPipelineExecutor(QueryPipeline & pipeline_, bool async);
/// Pulls the next block from whichever executor was created and returns that executor's result.
/// In asynchronous mode, a pull that reports data but yields an empty block is retried,
/// so such empty blocks are not handed to the caller.
/// Throws LOGICAL_ERROR if neither executor is initialized.
bool pull(Block & block);

/// Declared here and defaulted in the .cpp file
/// (presumably because the executor types are only forward-declared in this header).
~DictionaryPipelineExecutor();
private:
/// Non-null only when constructed with `async == true`.
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor;
/// Non-null only when constructed with `async == false`.
std::unique_ptr<PullingPipelineExecutor> executor;
};

}
4 changes: 2 additions & 2 deletions src/Dictionaries/DirectDictionary.cpp
Expand Up @@ -366,10 +366,10 @@ Pipe DirectDictionary<dictionary_key_type>::read(const Names & /* column_names *
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::applySettings(const Settings & settings)
{
if (dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get()))
if (const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get()))
{
/// Only applicable for CLICKHOUSE dictionary source.
use_async_executor = settings.dictionary_use_async_executor;
use_async_executor = settings.dictionary_use_async_executor && clickhouse_source->isLocal();
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Dictionaries/FlatDictionary.cpp
Expand Up @@ -12,9 +12,9 @@
#include <Functions/FunctionHelpers.h>

#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>

#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>

Expand Down Expand Up @@ -395,7 +395,7 @@ void FlatDictionary::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;

Expand Down Expand Up @@ -436,7 +436,7 @@ void FlatDictionary::loadData()
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);

Block block;
while (executor.pull(block))
Expand Down
1 change: 1 addition & 0 deletions src/Dictionaries/FlatDictionary.h
Expand Up @@ -27,6 +27,7 @@ class FlatDictionary final : public IDictionary
size_t max_array_size;
bool require_nonempty;
DictionaryLifetime dict_lifetime;
bool use_async_executor = false;
};

FlatDictionary(
Expand Down
57 changes: 47 additions & 10 deletions src/Dictionaries/HashedArrayDictionary.cpp
Expand Up @@ -7,11 +7,12 @@
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>

#include <Dictionaries/ClickHouseDictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>


namespace DB
{

Expand Down Expand Up @@ -409,7 +410,7 @@ void HashedArrayDictionary<dictionary_key_type>::updateData()
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
update_field_loaded_block.reset();
Block block;

Expand Down Expand Up @@ -533,12 +534,12 @@ void HashedArrayDictionary<dictionary_key_type>::blockToAttributes(const Block &
}

template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::resize(size_t added_rows)
void HashedArrayDictionary<dictionary_key_type>::resize(size_t total_rows)
{
if (unlikely(!added_rows))
if (unlikely(!total_rows))
return;

key_attribute.container.reserve(added_rows);
key_attribute.container.reserve(total_rows);
}

template <DictionaryKeyType dictionary_key_type>
Expand Down Expand Up @@ -727,14 +728,37 @@ void HashedArrayDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline;
pipeline = QueryPipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);

UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;

size_t total_rows = 0;
size_t total_blocks = 0;

PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
while (true)
{
resize(block.rows());
Stopwatch watch_pull;
bool has_data = executor.pull(block);
pull_time_microseconds += watch_pull.elapsedMicroseconds();

if (!has_data)
break;

++total_blocks;
total_rows += block.rows();

Stopwatch watch_process;
resize(total_rows);
blockToAttributes(block);
process_time_microseconds += watch_process.elapsedMicroseconds();
}

LOG_DEBUG(&Poco::Logger::get("HashedArrayDictionary"),
"Finished {}reading {} blocks with {} rows from pipeline in {:.2f} sec and inserted into hashtable in {:.2f} sec",
configuration.use_async_executor ? "asynchronous " : "",
total_blocks, total_rows, pull_time_microseconds / 1000000.0, process_time_microseconds / 1000000.0);
}
else
{
Expand Down Expand Up @@ -843,6 +867,7 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr global_context,
DictionarySourcePtr source_ptr,
DictionaryKeyType dictionary_key_type) -> DictionaryPtr
{
Expand All @@ -863,6 +888,12 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)

HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime};

ContextMutablePtr context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings = context->getSettingsRef();

const auto * clickhouse_source = dynamic_cast<const ClickHouseDictionarySource *>(source_ptr.get());
configuration.use_async_executor = clickhouse_source && clickhouse_source->isLocal() && settings.dictionary_use_async_executor;

if (dictionary_key_type == DictionaryKeyType::Simple)
return std::make_unique<HashedArrayDictionary<DictionaryKeyType::Simple>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
Expand All @@ -872,9 +903,15 @@ void registerDictionaryArrayHashed(DictionaryFactory & factory)
using namespace std::placeholders;

factory.registerLayout("hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple); }, false);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/)
{
return create_layout(a, b, c, d, global_context, std::move(e), DictionaryKeyType::Simple);
}, false);
factory.registerLayout("complex_key_hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex); }, true);
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr global_context, bool /*created_from_ddl*/)
{
return create_layout(a, b, c, d, global_context, std::move(e), DictionaryKeyType::Complex);
}, true);
}

}