Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JoinedTables related changes #9471

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 38 additions & 5 deletions dbms/src/Interpreters/DatabaseAndTableWithAlias.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ struct TableWithColumnNames
, columns(columns_)
{}

void addHiddenColumns(const NamesAndTypesList & addition)
{
for (auto & column : addition)
hidden_columns.push_back(column.name);
}
TableWithColumnNames(const DatabaseAndTableWithAlias table_, Names && columns_, Names && hidden_columns_)
: table(table_)
, columns(columns_)
, hidden_columns(hidden_columns_)
{}

bool hasColumn(const String & name) const
{
Expand All @@ -69,9 +69,42 @@ struct TableWithColumnNames
mutable NameSet columns_set;
};

struct TableWithColumnNamesAndTypes
{
DatabaseAndTableWithAlias table;
NamesAndTypesList columns;
NamesAndTypesList hidden_columns;

TableWithColumnNamesAndTypes(const DatabaseAndTableWithAlias & table_, const NamesAndTypesList & columns_)
: table(table_)
, columns(columns_)
{}

void addHiddenColumns(const NamesAndTypesList & addition)
{
hidden_columns.insert(hidden_columns.end(), addition.begin(), addition.end());
}

TableWithColumnNames removeTypes() const
{
Names out_columns;
out_columns.reserve(columns.size());
for (auto & col : columns)
out_columns.push_back(col.name);

Names out_hidden_columns;
out_hidden_columns.reserve(hidden_columns.size());
for (auto & col : hidden_columns)
out_hidden_columns.push_back(col.name);

return TableWithColumnNames(table, std::move(out_columns), std::move(out_hidden_columns));
}
};

std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);
std::optional<DatabaseAndTableWithAlias> getDatabaseAndTable(const ASTSelectQuery & select, size_t table_number);

using TablesWithColumnNames = std::vector<TableWithColumnNames>;
using TablesWithColumnNamesAndTypes = std::vector<TableWithColumnNames>;

}
92 changes: 26 additions & 66 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/IStorage.h>
#include <Storages/StorageValues.h>

#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
Expand Down Expand Up @@ -236,25 +235,22 @@ InterpreterSelectQuery::InterpreterSelectQuery(
throw Exception("Too deep subqueries. Maximum: " + settings.max_subquery_depth.toString(),
ErrorCodes::TOO_DEEP_SUBQUERIES);

CrossToInnerJoinVisitor::Data cross_to_inner;
CrossToInnerJoinVisitor(cross_to_inner).visit(query_ptr);
JoinedTables joined_tables(getSelectQuery());
if (joined_tables.hasJoins())
{
CrossToInnerJoinVisitor::Data cross_to_inner;
CrossToInnerJoinVisitor(cross_to_inner).visit(query_ptr);

JoinToSubqueryTransformVisitor::Data join_to_subs_data{*context};
JoinToSubqueryTransformVisitor(join_to_subs_data).visit(query_ptr);
JoinToSubqueryTransformVisitor::Data join_to_subs_data{*context};
JoinToSubqueryTransformVisitor(join_to_subs_data).visit(query_ptr);

max_streams = settings.max_threads;
auto & query = getSelectQuery();
joined_tables.reset(getSelectQuery());
}

ASTPtr table_expression = extractTableExpression(query, 0);
String database_name, table_name;
max_streams = settings.max_threads;
ASTSelectQuery & query = getSelectQuery();

bool is_table_func = false;
bool is_subquery = false;
if (table_expression)
{
is_table_func = table_expression->as<ASTFunction>();
is_subquery = table_expression->as<ASTSelectWithUnionQuery>();
}
const ASTPtr & left_table_expression = joined_tables.leftTableExpression();

if (input)
{
Expand All @@ -266,55 +262,35 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Read from prepared input.
source_header = input_pipe->getHeader();
}
else if (is_subquery)
else if (joined_tables.isLeftTableSubquery())
{
/// Read from subquery.
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression, getSubqueryContext(*context), options.subquery(), required_columns);
left_table_expression, getSubqueryContext(*context), options.subquery(), required_columns);

source_header = interpreter_subquery->getSampleBlock();
}
else if (!storage)
{
if (is_table_func)
if (joined_tables.isLeftTableFunction())
{
/// Read from table function. propagate all settings from initSettings(),
/// alternative is to call on current `context`, but that can potentially pollute it.
storage = getSubqueryContext(*context).executeTableFunction(table_expression);
storage = getSubqueryContext(*context).executeTableFunction(left_table_expression);
}
else
{
getDatabaseAndTableNames(query, database_name, table_name, *context);

if (auto view_source = context->getViewSource())
{
auto & storage_values = static_cast<const StorageValues &>(*view_source);
auto tmp_table_id = storage_values.getStorageID();
if (tmp_table_id.database_name == database_name && tmp_table_id.table_name == table_name)
{
/// Read from view source.
storage = context->getViewSource();
}
}

if (!storage)
{
/// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
storage = context->getTable(database_name, table_name);
}
}
storage = joined_tables.getLeftTableStorage(*context);
}

if (storage)
{
table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
table_id = storage->getStorageID();
}

/// Extract joined tables colunms if any.
/// It could get storage from context without lockStructureForShare(). TODO: add lock there or rewrite this logic.
JoinedTables joined_tables;
joined_tables.resolveTables(*query_ptr->as<ASTSelectQuery>(), storage, *context, source_header.getNamesAndTypesList());
joined_tables.resolveTables(getSubqueryContext(*context), storage);
}
else
joined_tables.resolveTables(getSubqueryContext(*context), source_header.getNamesAndTypesList());

auto analyze = [&] (bool try_move_to_prewhere = true)
{
Expand Down Expand Up @@ -353,9 +329,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (syntax_analyzer_result->rewrite_subqueries)
{
/// remake interpreter_subquery when PredicateOptimizer rewrites subqueries and main table is subquery
if (is_subquery)
if (joined_tables.isLeftTableSubquery())
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
table_expression,
left_table_expression,
getSubqueryContext(*context),
options.subquery(),
required_columns);
Expand Down Expand Up @@ -430,6 +406,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (query.prewhere() && !query.where())
analysis_result.prewhere_info->need_filter = true;

const String & database_name = joined_tables.leftTableDatabase();
const String & table_name = joined_tables.leftTableName();

if (!table_name.empty() && !database_name.empty() /* always allow access to temporary tables */)
context->checkAccess(AccessType::SELECT, database_name, table_name, required_columns);

Expand All @@ -447,25 +426,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}


void InterpreterSelectQuery::getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context)
{
if (auto db_and_table = getDatabaseAndTable(query, 0))
{
table_name = db_and_table->table;
database_name = db_and_table->database;

/// If the database is not specified - use the current database.
if (database_name.empty() && !context.isExternalTableExist(table_name))
database_name = context.getCurrentDatabase();
}
else /// If the table is not specified - use the table `system.one`.
{
database_name = "system";
table_name = "one";
}
}


Block InterpreterSelectQuery::getSampleBlock()
{
return result_header;
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ class InterpreterSelectQuery : public IInterpreter
template <typename TPipeline>
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);

/** From which table to read. With JOIN, the "left" table is returned.
*/
static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context);

/// Different stages of query execution.

/// dry_run - don't read from table, use empty header block instead.
Expand Down
113 changes: 81 additions & 32 deletions dbms/src/Interpreters/JoinedTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <Interpreters/getTableExpressions.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageValues.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectWithUnionQuery.h>

namespace DB
{
Expand All @@ -15,61 +18,107 @@ namespace ErrorCodes
namespace
{

std::vector<TableWithColumnNames> getTablesWithColumns(const std::vector<const ASTTableExpression * > & table_expressions,
const Context & context)
template <typename T>
void checkTablesWithColumns(const std::vector<T> & tables_with_columns, const Context & context)
{
std::vector<TableWithColumnNames> tables_with_columns = getDatabaseAndTablesWithColumnNames(table_expressions, context);

auto & settings = context.getSettingsRef();
if (settings.joined_subquery_requires_alias && tables_with_columns.size() > 1)
{
for (auto & pr : tables_with_columns)
if (pr.table.table.empty() && pr.table.alias.empty())
for (auto & t : tables_with_columns)
if (t.table.table.empty() && t.table.alias.empty())
throw Exception("No alias for subquery or table function in JOIN (set joined_subquery_requires_alias=0 to disable restriction).",
ErrorCodes::ALIAS_REQUIRED);
}
}

return tables_with_columns;
}

JoinedTables::JoinedTables(const ASTSelectQuery & select_query)
: table_expressions(getTableExpressions(select_query))
, left_table_expression(extractTableExpression(select_query, 0))
, left_db_and_table(getDatabaseAndTable(select_query, 0))
{}

bool JoinedTables::isLeftTableSubquery() const
{
return left_table_expression && left_table_expression->as<ASTSelectWithUnionQuery>();
}

bool JoinedTables::isLeftTableFunction() const
{
return left_table_expression && left_table_expression->as<ASTFunction>();
}

void JoinedTables::resolveTables(const ASTSelectQuery & select_query, StoragePtr storage, const Context & context,
const NamesAndTypesList & source_columns)
StoragePtr JoinedTables::getLeftTableStorage(Context & context)
{
StoragePtr storage;

if (left_db_and_table)
{
database_name = left_db_and_table->database;
table_name = left_db_and_table->table;

/// If the database is not specified - use the current database.
if (database_name.empty() && !context.isExternalTableExist(table_name))
database_name = context.getCurrentDatabase();
}
else /// If the table is not specified - use the table `system.one`.
{
database_name = "system";
table_name = "one";
}

if (auto view_source = context.getViewSource())
{
auto & storage_values = static_cast<const StorageValues &>(*view_source);
auto tmp_table_id = storage_values.getStorageID();
if (tmp_table_id.database_name == database_name && tmp_table_id.table_name == table_name)
{
/// Read from view source.
storage = context.getViewSource();
}
}

if (!storage)
{
if (auto db_and_table = getDatabaseAndTable(select_query, 0))
storage = context.tryGetTable(db_and_table->database, db_and_table->table);
/// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
storage = context.getTable(database_name, table_name);
}

std::vector<const ASTTableExpression *> table_expressions = getTableExpressions(select_query);
tables_with_columns = getTablesWithColumns(table_expressions, context);
return storage;
}

const NamesAndTypesList & JoinedTables::secondTableColumns() const
{
static const NamesAndTypesList empty;
if (tables_with_columns.size() > 1)
return tables_with_columns[1].columns;
return empty;
}

void JoinedTables::resolveTables(const Context & context, StoragePtr storage)
{
tables_with_columns = getDatabaseAndTablesWithColumns(table_expressions, context);
checkTablesWithColumns(tables_with_columns, context);

if (tables_with_columns.empty())
{
if (storage)
{
const ColumnsDescription & storage_columns = storage->getColumns();
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, storage_columns.getOrdinary().getNames());
auto & table = tables_with_columns.back();
table.addHiddenColumns(storage_columns.getMaterialized());
table.addHiddenColumns(storage_columns.getAliases());
table.addHiddenColumns(storage_columns.getVirtuals());
}
else
{
Names columns;
columns.reserve(source_columns.size());
for (const auto & column : source_columns)
columns.push_back(column.name);
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, columns);
}
const ColumnsDescription & storage_columns = storage->getColumns();
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, storage_columns.getOrdinary());
auto & table = tables_with_columns.back();
table.addHiddenColumns(storage_columns.getMaterialized());
table.addHiddenColumns(storage_columns.getAliases());
table.addHiddenColumns(storage_columns.getVirtuals());
}
}

void JoinedTables::resolveTables(const Context & context, const NamesAndTypesList & source_columns)
{
tables_with_columns = getDatabaseAndTablesWithColumns(table_expressions, context);
checkTablesWithColumns(tables_with_columns, context);

if (table_expressions.size() > 1)
columns_from_joined_table = getColumnsFromTableExpression(*table_expressions[1], context);
if (tables_with_columns.empty())
tables_with_columns.emplace_back(DatabaseAndTableWithAlias{}, source_columns);
}

}