Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CREATE TABLE AS table_function() #6057

Merged
merged 16 commits into from Jul 22, 2019
8 changes: 8 additions & 0 deletions dbms/src/Databases/DatabasesCommon.cpp
Expand Up @@ -9,6 +9,7 @@
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>

#include <sstream>

Expand Down Expand Up @@ -68,6 +69,13 @@ std::pair<String, StoragePtr> createTableFromDefinition(
ast_create_query.attach = true;
ast_create_query.database = database_name;

if (ast_create_query.as_table_function)
{
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
return {ast_create_query.table, storage};
}
/// We do not directly use `InterpreterCreateQuery::execute`, because
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
Expand Up @@ -99,7 +99,8 @@ void SelectStreamFactory::createForShard(
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.cpp
Expand Up @@ -963,7 +963,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);

/// Run it and remember the result
res = table_function_ptr->execute(table_expression, *this);
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
}

return res;
Expand Down
34 changes: 24 additions & 10 deletions dbms/src/Interpreters/InterpreterCreateQuery.cpp
Expand Up @@ -46,6 +46,8 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/addTypeConversionToAST.h>

#include <TableFunctions/TableFunctionFactory.h>


namespace DB
{
Expand Down Expand Up @@ -384,7 +386,7 @@ ColumnsDescription InterpreterCreateQuery::setColumns(
indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
}
else if (!create.as_table.empty())
else if (!create.as_table.empty() || create.as_table_function)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

{
columns = as_storage->getColumns();
indices = as_storage->getIndices();
Expand Down Expand Up @@ -518,6 +520,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)

StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;

if (create.as_table_function)
{
const auto * table_function = create.as_table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
as_storage = factory.get(table_function->name, context)->execute(create.as_table_function, context, create.table);
}
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
Expand Down Expand Up @@ -585,15 +594,20 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};

res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
if (create.as_table_function)
res = as_storage;
else
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
create.attach,
false);
}

if (create.temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/InterpreterDescribeQuery.cpp
Expand Up @@ -79,7 +79,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
/// Run the table function and remember the result
table = table_function_ptr->execute(table_expression.table_function, context);
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterInsertQuery.cpp
Expand Up @@ -48,7 +48,8 @@ StoragePtr InterpreterInsertQuery::getTable(const ASTInsertQuery & query)
{
const auto * table_function = query.table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
return factory.get(table_function->name, context)->execute(query.table_function, context);
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}

/// Into what table to write.
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Parsers/ASTCreateQuery.cpp
Expand Up @@ -216,7 +216,11 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
formatOnCluster(settings);
}

if (as_table_function)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
as_table_function->formatImpl(settings, state, frame);
}
if (!to_table.empty())
{
settings.ostr
Expand All @@ -231,7 +235,7 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
<< (!as_database.empty() ? backQuoteIfNeed(as_database) + "." : "") << backQuoteIfNeed(as_table);
}

if (columns_list)
if (columns_list && !as_table_function)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be able to parse CREATE table (columns_list) AS table_function(...)
Please add negative test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be able to parse CREATE table (columns_list) AS table_function(...)
Please add negative test.

Test will be added, but this code actually prevents printing a frame with columns in metadata sql file, so it can simply be ATTACH table AS table_function(). Here comes the next question, why was columns_list initialized. (It responds to your last comment on this topic) It is initialized with setColumns() to avoid duplicating some code in initialization of a table. But I am not sure if it is right.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried to fix it with if clause

{
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTCreateQuery.h
Expand Up @@ -63,6 +63,7 @@ class ASTCreateQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnC
ASTStorage * storage = nullptr;
String as_database;
String as_table;
ASTPtr as_table_function;
ASTSelectWithUnionQuery * select = nullptr;

/** Get the text that identifies this element. */
Expand Down
30 changes: 19 additions & 11 deletions dbms/src/Parsers/ParserCreateQuery.cpp
Expand Up @@ -319,6 +319,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifier name_p;
ParserColumnsOrIndicesDeclarationList columns_or_indices_p;
ParserSelectWithUnionQuery select_p;
ParserFunction table_function_p;

ASTPtr database;
ASTPtr table;
Expand All @@ -328,6 +329,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr storage;
ASTPtr as_database;
ASTPtr as_table;
ASTPtr as_table_function;
ASTPtr select;
String cluster_str;
bool attach = false;
Expand Down Expand Up @@ -407,22 +409,25 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!s_as.ignore(pos, expected))
return false;

if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
if (!table_function_p.parse(pos, as_table_function, expected))
{
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
return false;

if (s_dot.ignore(pos, expected))
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
as_database = as_table;
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
return false;
}

/// Optional - ENGINE can be specified.
if (!storage)
storage_p.parse(pos, storage, expected);
if (s_dot.ignore(pos, expected))
{
as_database = as_table;
if (!name_p.parse(pos, as_table, expected))
return false;
}

/// Optional - ENGINE can be specified.
if (!storage)
storage_p.parse(pos, storage, expected);
}
}
}
}
Expand Down Expand Up @@ -526,6 +531,9 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTCreateQuery>();
node = query;

if (as_table_function)
query->as_table_function = as_table_function;

query->attach = attach;
query->if_not_exists = if_not_exists;
query->is_view = is_view;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/getStructureOfRemoteTable.cpp
Expand Up @@ -40,7 +40,8 @@ ColumnsDescription getStructureOfRemoteTable(
if (shard_info.isLocal())
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
return table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName())->getColumns();
}

auto table_func_name = queryToString(table_func_ptr);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/ITableFunction.cpp
Expand Up @@ -10,10 +10,10 @@ namespace ProfileEvents
namespace DB
{

StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
return executeImpl(ast_function, context);
return executeImpl(ast_function, context, table_name);
}

}
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/ITableFunction.h
Expand Up @@ -32,12 +32,12 @@ class ITableFunction
virtual std::string getName() const = 0;

/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const;
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;

virtual ~ITableFunction() {}

private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const = 0;
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
};

using TableFunctionPtr = std::shared_ptr<ITableFunction>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/ITableFunctionFileLike.cpp
Expand Up @@ -19,7 +19,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
// Parse args
ASTs & args_func = ast_function->children;
Expand Down Expand Up @@ -60,7 +60,7 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
}

// Create table
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context));
StoragePtr storage = getStorage(filename, format, sample_block, const_cast<Context &>(context), table_name);

storage->startup();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/ITableFunctionFileLike.h
Expand Up @@ -12,8 +12,8 @@ namespace DB
class ITableFunctionFileLike : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const = 0;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const = 0;
};
}
15 changes: 9 additions & 6 deletions dbms/src/TableFunctions/ITableFunctionXDBC.cpp
Expand Up @@ -27,7 +27,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
const auto & args_func = ast_function->as<ASTFunction &>();

Expand All @@ -45,18 +45,18 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co

std::string connection_string;
std::string schema_name;
std::string table_name;
std::string remote_table_name;

if (args.size() == 3)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else if (args.size() == 2)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}

/* Infer external table structure */
Expand All @@ -68,15 +68,18 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
columns_info_uri.addQueryParameter("connection_string", connection_string);
if (!schema_name.empty())
columns_info_uri.addQueryParameter("schema", schema_name);
columns_info_uri.addQueryParameter("table", table_name);
columns_info_uri.addQueryParameter("table", remote_table_name);

ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, nullptr);

std::string columns_info;
readStringBinary(columns_info, buf);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);

auto result = std::make_shared<StorageXDBC>(getDatabaseName(), table_name, schema_name, table_name, ColumnsDescription{columns}, context, helper);
///If table_name was not specified by user, it will have the same name as remote_table_name
std::string local_table_name = (table_name == getName()) ? remote_table_name : table_name;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is strange.


auto result = std::make_shared<StorageXDBC>(getDatabaseName(), local_table_name, schema_name, remote_table_name, ColumnsDescription{columns}, context, helper);

if (!result)
throw Exception("Failed to instantiate storage from table function " + getName(), ErrorCodes::UNKNOWN_EXCEPTION);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TableFunctions/ITableFunctionXDBC.h
Expand Up @@ -15,7 +15,7 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;

/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(Context & context,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/TableFunctionCatBoostPool.cpp
Expand Up @@ -16,7 +16,7 @@ namespace ErrorCodes
}


StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, const Context & context) const
StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
ASTs & args_func = ast_function->children;

Expand Down Expand Up @@ -45,7 +45,7 @@ StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, c
String column_descriptions_file = getStringLiteral(*args[0], "Column descriptions file");
String dataset_description_file = getStringLiteral(*args[1], "Dataset description file");

return StorageCatBoostPool::create(getDatabaseName(), getName(), context, column_descriptions_file, dataset_description_file);
return StorageCatBoostPool::create(getDatabaseName(), table_name, context, column_descriptions_file, dataset_description_file);
}

void registerTableFunctionCatBoostPool(TableFunctionFactory & factory)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TableFunctions/TableFunctionCatBoostPool.h
Expand Up @@ -15,7 +15,7 @@ class TableFunctionCatBoostPool : public ITableFunction
static constexpr auto name = "catBoostPool";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};

}
4 changes: 2 additions & 2 deletions dbms/src/TableFunctions/TableFunctionFile.cpp
Expand Up @@ -5,13 +5,13 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const
{
return StorageFile::create(source,
-1,
global_context.getUserFilesPath(),
getDatabaseName(),
getName(),
table_name,
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TableFunctions/TableFunctionFile.h
Expand Up @@ -24,6 +24,6 @@ class TableFunctionFile : public ITableFunctionFileLike

private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
const String & source, const String & format, const Block & sample_block, Context & global_context, const std::string & table_name) const override;
};
}