Skip to content

Commit

Permalink
Merge pull request #49122 from CurtizJ/add-async-insert-mt-setting
Browse files Browse the repository at this point in the history
Add `MergeTree` setting `async_insert`
  • Loading branch information
CurtizJ committed May 3, 2023
2 parents 17d6e2c + 36d53e0 commit 7896d30
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Expand Up @@ -650,7 +650,7 @@ class IColumn;
M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
Expand Down
8 changes: 7 additions & 1 deletion src/Interpreters/executeQuery.cpp
Expand Up @@ -57,6 +57,7 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Common/ProfileEvents.h>

#include <IO/CompressionMethod.h>
Expand Down Expand Up @@ -526,6 +527,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context->initializeExternalTablesIfSet();

auto * insert_query = ast->as<ASTInsertQuery>();
bool async_insert_enabled = settings.async_insert;

/// Resolve database before trying to use async insert feature - to properly hash the query.
if (insert_query)
Expand All @@ -534,6 +536,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
else if (auto table = insert_query->getTable(); !table.empty())
insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table});

if (insert_query->table_id)
if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query->table_id, context))
async_insert_enabled |= table->areAsynchronousInsertsEnabled();
}

if (insert_query && insert_query->select)
Expand Down Expand Up @@ -568,7 +574,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto * queue = context->getAsynchronousInsertQueue();
auto * logger = &Poco::Logger::get("executeQuery");

if (insert_query && settings.async_insert)
if (insert_query && async_insert_enabled)
{
String reason;

Expand Down
2 changes: 2 additions & 0 deletions src/Storages/IStorage.h
Expand Up @@ -178,6 +178,8 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Tells whether this is a system storage, i.e. one that cannot be the target of SHOW CREATE TABLE.
/// The default implementation answers false.
virtual bool isSystemStorage() const
{
    return false;
}

/// Tells whether asynchronous inserts are enabled for this table.
/// The default implementation answers false.
virtual bool areAsynchronousInsertsEnabled() const
{
    return false;
}

/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -431,6 +431,8 @@ class MergeTreeData : public IStorage, public WithMutableContext

bool supportsLightweightDelete() const override;

/// Asynchronous inserts are enabled for the table when its `async_insert` setting is set.
bool areAsynchronousInsertsEnabled() const override
{
    return getSettings()->async_insert;
}

NamesAndTypesList getVirtuals() const override;

bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const override;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -84,6 +84,7 @@ struct Settings;
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
\
/* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
Expand Down
@@ -0,0 +1,4 @@
2
2
default.t_mt_async_insert 1
default.t_mt_sync_insert 0
35 changes: 35 additions & 0 deletions tests/queries/0_stateless/02725_async_insert_table_setting.sh
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Checks the MergeTree table-level setting `async_insert`: an INSERT sent with the
# query-level `async_insert=0` is still expected to be processed asynchronously for a
# table created with `SETTINGS async_insert = 1`, and synchronously for a table created
# with `SETTINGS async_insert = 0`.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Recreate two tables with the same schema that differ only in the table-level setting.
${CLICKHOUSE_CLIENT} -n --query "
DROP TABLE IF EXISTS t_mt_async_insert;
DROP TABLE IF EXISTS t_mt_sync_insert;
CREATE TABLE t_mt_async_insert (id UInt64, s String)
ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 1;
CREATE TABLE t_mt_sync_insert (id UInt64, s String)
ENGINE = MergeTree ORDER BY id SETTINGS async_insert = 0;"

# The query-level async_insert is explicitly switched off, so only the table setting can
# enable asynchronous mode. wait_for_async_insert=1 presumably makes each request return
# only after the data has been processed, keeping the counts below stable.
url="${CLICKHOUSE_URL}&async_insert=0&wait_for_async_insert=1"

# Send the same two-row INSERT over HTTP to each table.
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_mt_async_insert VALUES (1, 'aa'), (2, 'bb')"
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_mt_sync_insert VALUES (1, 'aa'), (2, 'bb')"

# Print the row count of each table, then (after flushing logs) the AsyncInsertQuery
# profile event recorded in system.query_log for the finished INSERT queries, ordered by
# table name; finally drop both tables.
${CLICKHOUSE_CLIENT} -n --query "
SELECT count() FROM t_mt_async_insert;
SELECT count() FROM t_mt_sync_insert;
SYSTEM FLUSH LOGS;
SELECT tables[1], ProfileEvents['AsyncInsertQuery'] FROM system.query_log
WHERE
type = 'QueryFinish' AND
current_database = currentDatabase() AND
query ILIKE 'INSERT INTO t_mt_%sync_insert%'
ORDER BY tables[1];
DROP TABLE IF EXISTS t_mt_async_insert;
DROP TABLE IF EXISTS t_mt_sync_insert;"

0 comments on commit 7896d30

Please sign in to comment.