Skip to content

Commit

Permalink
Merge pull request #54655 from Avogar/fix-use-structure-from-insertio…
Browse files Browse the repository at this point in the history
…n-table

Fix using structure from insertion tables in case of defaults and explicit insert columns
  • Loading branch information
Avogar committed Sep 18, 2023
2 parents 6564743 + 7cfdd88 commit 3aaaf76
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 41 deletions.
36 changes: 19 additions & 17 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
Expand Up @@ -6243,28 +6243,28 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
const auto & insertion_table = scope_context->getInsertionTable();
if (!insertion_table.empty())
{
const auto & insert_structure = DatabaseCatalog::instance()
.getTable(insertion_table, scope_context)
->getInMemoryMetadataPtr()
->getColumns()
.getInsertable();
const auto & insert_columns = DatabaseCatalog::instance()
.getTable(insertion_table, scope_context)
->getInMemoryMetadataPtr()
->getColumns();
const auto & insert_column_names = scope_context->hasInsertionTableColumnNames() ? *scope_context->getInsertionTableColumnNames() : insert_columns.getInsertable().getNames();
DB::ColumnsDescription structure_hint;

bool use_columns_from_insert_query = true;

/// Insert table matches columns against SELECT expression by position, so we want to map
/// insert table columns to table function columns through names from SELECT expression.

auto insert_column = insert_structure.begin();
auto insert_structure_end = insert_structure.end(); /// end iterator of the range covered by possible asterisk
auto insert_column_name_it = insert_column_names.begin();
auto insert_column_names_end = insert_column_names.end(); /// end iterator of the range covered by possible asterisk
auto virtual_column_names = table_function_ptr->getVirtualsToCheckBeforeUsingStructureHint();
bool asterisk = false;
const auto & expression_list = scope.scope_node->as<QueryNode &>().getProjection();
auto expression = expression_list.begin();

/// We want to go through SELECT expression list and correspond each expression to column in insert table
/// which type will be used as a hint for the file structure inference.
for (; expression != expression_list.end() && insert_column != insert_structure_end; ++expression)
for (; expression != expression_list.end() && insert_column_name_it != insert_column_names_end; ++expression)
{
if (auto * identifier_node = (*expression)->as<IdentifierNode>())
{
Expand All @@ -6280,15 +6280,17 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
break;
}

structure_hint.add({ identifier_node->getIdentifier().getFullName(), insert_column->type });
ColumnDescription column = insert_columns.get(*insert_column_name_it);
column.name = identifier_node->getIdentifier().getFullName();
structure_hint.add(std::move(column));
}

/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
else if (auto * matcher_node = (*expression)->as<MatcherNode>(); matcher_node && matcher_node->getMatcherType() == MatcherNodeType::ASTERISK)
{
Expand Down Expand Up @@ -6322,18 +6324,18 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
else
{
/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
}

Expand All @@ -6353,8 +6355,8 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
/// Append tail of insert structure to the hint
if (asterisk)
{
for (; insert_column != insert_structure_end; ++insert_column)
structure_hint.add({ insert_column->name, insert_column->type });
for (; insert_column_name_it != insert_column_names_end; ++insert_column_name_it)
structure_hint.add(insert_columns.get(*insert_column_name_it));
}

if (!structure_hint.empty())
Expand Down
37 changes: 20 additions & 17 deletions src/Interpreters/Context.cpp
Expand Up @@ -1587,28 +1587,29 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
uint64_t use_structure_from_insertion_table_in_table_functions = getSettingsRef().use_structure_from_insertion_table_in_table_functions;
if (use_structure_from_insertion_table_in_table_functions && table_function_ptr->needStructureHint() && hasInsertionTable())
{
const auto & insert_structure = DatabaseCatalog::instance()
.getTable(getInsertionTable(), shared_from_this())
->getInMemoryMetadataPtr()
->getColumns()
.getInsertable();
const auto & insert_columns = DatabaseCatalog::instance()
.getTable(getInsertionTable(), shared_from_this())
->getInMemoryMetadataPtr()
->getColumns();

const auto & insert_column_names = hasInsertionTableColumnNames() ? *getInsertionTableColumnNames() : insert_columns.getInsertable().getNames();
DB::ColumnsDescription structure_hint;

bool use_columns_from_insert_query = true;

/// Insert table matches columns against SELECT expression by position, so we want to map
/// insert table columns to table function columns through names from SELECT expression.

auto insert_column = insert_structure.begin();
auto insert_structure_end = insert_structure.end(); /// end iterator of the range covered by possible asterisk
auto insert_column_name_it = insert_column_names.begin();
auto insert_column_names_end = insert_column_names.end(); /// end iterator of the range covered by possible asterisk
auto virtual_column_names = table_function_ptr->getVirtualsToCheckBeforeUsingStructureHint();
bool asterisk = false;
const auto & expression_list = select_query_hint->select()->as<ASTExpressionList>()->children;
const auto * expression = expression_list.begin();

/// We want to go through SELECT expression list and correspond each expression to column in insert table
/// which type will be used as a hint for the file structure inference.
for (; expression != expression_list.end() && insert_column != insert_structure_end; ++expression)
for (; expression != expression_list.end() && insert_column_name_it != insert_column_names_end; ++expression)
{
if (auto * identifier = (*expression)->as<ASTIdentifier>())
{
Expand All @@ -1623,15 +1624,17 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
break;
}

structure_hint.add({ identifier->name(), insert_column->type });
ColumnDescription column = insert_columns.get(*insert_column_name_it);
column.name = identifier->name();
structure_hint.add(std::move(column));
}

/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
else if ((*expression)->as<ASTAsterisk>())
{
Expand Down Expand Up @@ -1665,18 +1668,18 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
else
{
/// Once we hit asterisk we want to find end of the range covered by asterisk
/// contributing every further SELECT expression to the tail of insert structure
if (asterisk)
--insert_structure_end;
--insert_column_names_end;
else
++insert_column;
++insert_column_name_it;
}
}

Expand All @@ -1696,8 +1699,8 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
/// Append tail of insert structure to the hint
if (asterisk)
{
for (; insert_column != insert_structure_end; ++insert_column)
structure_hint.add({ insert_column->name, insert_column->type });
for (; insert_column_name_it != insert_column_names_end; ++insert_column_name_it)
structure_hint.add(insert_columns.get(*insert_column_name_it));
}

if (!structure_hint.empty())
Expand Down
16 changes: 12 additions & 4 deletions src/Interpreters/Context.h
Expand Up @@ -265,7 +265,13 @@ class Context: public std::enable_shared_from_this<Context>

std::weak_ptr<QueryStatus> process_list_elem; /// For tracking total resource usage for query.
bool has_process_list_elem = false; /// It's impossible to check if weak_ptr was initialized or not
StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context
struct InsertionTableInfo
{
StorageID table = StorageID::createEmpty();
std::optional<Names> column_names;
};

InsertionTableInfo insertion_table_info; /// Saved information about insertion table in query context
bool is_distributed = false; /// Whether the current context it used for distributed query

String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
Expand Down Expand Up @@ -713,9 +719,11 @@ class Context: public std::enable_shared_from_this<Context>

void killCurrentQuery() const;

bool hasInsertionTable() const { return !insertion_table.empty(); }
void setInsertionTable(StorageID db_and_table) { insertion_table = std::move(db_and_table); }
const StorageID & getInsertionTable() const { return insertion_table; }
bool hasInsertionTable() const { return !insertion_table_info.table.empty(); }
bool hasInsertionTableColumnNames() const { return insertion_table_info.column_names.has_value(); }
void setInsertionTable(StorageID db_and_table, std::optional<Names> column_names = std::nullopt) { insertion_table_info = {std::move(db_and_table), std::move(column_names)}; }
const StorageID & getInsertionTable() const { return insertion_table_info.table; }
const std::optional<Names> & getInsertionTableColumnNames() const{ return insertion_table_info.column_names; }

void setDistributed(bool is_distributed_) { is_distributed = is_distributed_; }
bool isDistributed() const { return is_distributed; }
Expand Down
24 changes: 22 additions & 2 deletions src/Interpreters/InterpreterInsertQuery.cpp
Expand Up @@ -138,8 +138,9 @@ Block InterpreterInsertQuery::getSampleBlock(
}

/// Form the block based on the column names from the query
Names names;
const auto columns_ast = processColumnTransformers(getContext()->getCurrentDatabase(), table, metadata_snapshot, query.columns);
Names names;
names.reserve(columns_ast->children.size());
for (const auto & identifier : columns_ast->children)
{
std::string current_name = identifier->getColumnName();
Expand All @@ -149,6 +150,25 @@ Block InterpreterInsertQuery::getSampleBlock(
return getSampleBlock(names, table, metadata_snapshot);
}

std::optional<Names> InterpreterInsertQuery::getInsertColumnNames() const
{
auto const * insert_query = query_ptr->as<ASTInsertQuery>();
if (!insert_query || !insert_query->columns)
return std::nullopt;

auto table = DatabaseCatalog::instance().getTable(getDatabaseTable(), getContext());
const auto columns_ast = processColumnTransformers(getContext()->getCurrentDatabase(), table, table->getInMemoryMetadataPtr(), insert_query->columns);
Names names;
names.reserve(columns_ast->children.size());
for (const auto & identifier : columns_ast->children)
{
std::string current_name = identifier->getColumnName();
names.emplace_back(std::move(current_name));
}

return names;
}

Block InterpreterInsertQuery::getSampleBlock(
const Names & names,
const StoragePtr & table,
Expand Down Expand Up @@ -456,7 +476,7 @@ BlockIO InterpreterInsertQuery::execute()

auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
new_context->setInsertionTable(getContext()->getInsertionTable());
new_context->setInsertionTable(getContext()->getInsertionTable(), getContext()->getInsertionTableColumnNames());

auto select_query_options = SelectQueryOptions(QueryProcessingStage::Complete, 1);

Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/InterpreterInsertQuery.h
Expand Up @@ -37,6 +37,10 @@ class InterpreterInsertQuery : public IInterpreter, WithContext

StorageID getDatabaseTable() const;

/// Return explicitly specified column names to insert.
/// It not explicit names were specified, return nullopt.
std::optional<Names> getInsertColumnNames() const;

Chain buildChain(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/executeQuery.cpp
Expand Up @@ -1074,7 +1074,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
context->setInsertionTable(std::move(table_id), insert_interpreter->getInsertColumnNames());

if (insert_data_buffer_holder)
insert_interpreter->addBuffer(std::move(insert_data_buffer_holder));
Expand Down
@@ -0,0 +1,4 @@
0 0
0 0
42 0
42 0
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_LOCAL -q "select 42 as x format Native" > $CLICKHOUSE_TEST_UNIQUE_NAME.native
$CLICKHOUSE_LOCAL -n -q "
create table test (x UInt64, y UInt64) engine=Memory;
insert into test (x) select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.native');
insert into test (y) select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.native');
insert into test (* except(x)) select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.native');
insert into test (* except(y)) select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.native');
select * from test order by x;
"

rm $CLICKHOUSE_TEST_UNIQUE_NAME.native
@@ -0,0 +1 @@
1 42
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_LOCAL -q "select 1 as x format Native" > $CLICKHOUSE_TEST_UNIQUE_NAME.native
$CLICKHOUSE_LOCAL -n -q "
create table test (x UInt64, y UInt64 default 42) engine=Memory;
insert into test select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.native');
select * from test;
"

rm $CLICKHOUSE_TEST_UNIQUE_NAME.native

0 comments on commit 3aaaf76

Please sign in to comment.