Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split databases from Context #9065

Merged
merged 37 commits into from
Mar 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
93dba03
SHOW CREATE for DatabaseMemory
tavplubix Jan 30, 2020
a508c25
store temporary tables in DatabaseMemory
tavplubix Jan 30, 2020
b7197ba
fix build
tavplubix Feb 10, 2020
4ff7bf7
add DatabaseCatalog
tavplubix Feb 3, 2020
869e20d
move databases from Context to DatabaseCatalog
tavplubix Feb 10, 2020
45338ad
fixes
tavplubix Feb 10, 2020
18dd0c8
move DDLGuard to DatabaseCatalog
tavplubix Feb 10, 2020
3c5d6cf
fix
tavplubix Feb 11, 2020
2e6796e
move ViewDependencies to DatabaseCatalog
tavplubix Feb 12, 2020
b6039f8
remove tryGetExternalTable
tavplubix Feb 12, 2020
1d6afe9
Merge branch 'master' into split_databases_from_context
tavplubix Feb 12, 2020
c479bbe
make Context::getLock() private
tavplubix Feb 13, 2020
964b775
Merge branch 'master' into split_databases_from_context
tavplubix Feb 17, 2020
d710bd1
better get... methods
tavplubix Feb 17, 2020
e6718e1
less Context::getTable() usages
tavplubix Feb 17, 2020
d5e7915
Merge branch 'master' into split_databases_from_context
tavplubix Feb 21, 2020
29a993a
remove another method from Context
tavplubix Feb 21, 2020
8548089
Merge branch 'master' into split_databases_from_context
tavplubix Feb 25, 2020
a4d12b3
Merge branch 'master' into split_databases_from_context
tavplubix Mar 2, 2020
fa27ecf
do not allow custom database for system tables
tavplubix Mar 2, 2020
70dc82f
add StorageID to ASTInsertQuery
tavplubix Mar 2, 2020
273333b
fixes
tavplubix Mar 3, 2020
67c63a7
Merge branch 'master' into split_databases_from_context
tavplubix Mar 4, 2020
2d5ed78
remove tryGetTable
tavplubix Mar 4, 2020
2a18ada
fix test_part_log_table
tavplubix Mar 5, 2020
367b358
remove Context::getTable()
tavplubix Mar 6, 2020
c7468d2
Merge branch 'master' into split_databases_from_context
tavplubix Mar 10, 2020
8b3a245
improve temporary tables
tavplubix Mar 10, 2020
4972dae
fixes
tavplubix Mar 11, 2020
cf28bfe
Merge branch 'master' into split_databases_from_context
tavplubix Mar 12, 2020
e98d4f4
remove isExternalTableExist
tavplubix Mar 12, 2020
158cde6
enable UUIDs for temporary tables
tavplubix Mar 12, 2020
2208ff6
fix build
tavplubix Mar 13, 2020
00b7190
minor improvements
tavplubix Mar 13, 2020
339c101
add test for tmp tables
tavplubix Mar 13, 2020
729e4fc
fix
tavplubix Mar 13, 2020
99e0f7b
fix
tavplubix Mar 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ class Client : public Poco::Util::Application

configReadClient(config(), home_path);

context.makeGlobalContext();
context.setApplicationType(Context::ApplicationType::CLIENT);
context.setQueryParameters(query_parameters);

Expand Down Expand Up @@ -1730,6 +1729,7 @@ class Client : public Poco::Util::Application
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
;

context.makeGlobalContext();
context.getSettingsRef().addProgramOptions(main_description);

/// Commandline options related to external tables.
Expand Down
2 changes: 1 addition & 1 deletion dbms/programs/copier/ClusterCopierApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void ClusterCopierApp::mainImpl()
registerDisks();

static const std::string default_database = "_local";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);

/// Initialize query scope just in case.
Expand Down
10 changes: 6 additions & 4 deletions dbms/programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
Expand Down Expand Up @@ -187,7 +188,7 @@ try
* if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons.
*/
std::string default_database = config().getString("default_database", "_local");
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
applyCmdOptions();

Expand All @@ -200,6 +201,7 @@ try
loadMetadataSystem(*context);
attachSystemTables();
loadMetadata(*context);
DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata.");
}
else
Expand Down Expand Up @@ -248,12 +250,12 @@ std::string LocalServer::getInitialCreateTableQuery()

void LocalServer::attachSystemTables()
{
DatabasePtr system_database = context->tryGetDatabase("system");
DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(DatabaseCatalog::SYSTEM_DATABASE);
if (!system_database)
{
/// TODO: add attachTableDelayed into DatabaseMemory to speedup loading
system_database = std::make_shared<DatabaseMemory>("system");
context->addDatabase("system", system_database);
system_database = std::make_shared<DatabaseMemory>(DatabaseCatalog::SYSTEM_DATABASE);
DatabaseCatalog::instance().attachDatabase(DatabaseCatalog::SYSTEM_DATABASE, system_database);
}

attachSystemTablesLocal(*system_database);
Expand Down
2 changes: 1 addition & 1 deletion dbms/programs/server/MySQLHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ void MySQLHandler::comFieldList(ReadBuffer & payload)
ComFieldList packet;
packet.readPayload(payload);
String database = connection_context.getCurrentDatabase();
StoragePtr tablePtr = connection_context.getTable(database, packet.table);
StoragePtr tablePtr = DatabaseCatalog::instance().getTable({database, packet.table});
for (const NameAndTypePair & column: tablePtr->getColumns().getAll())
{
ColumnDefinition column_definition(
Expand Down
2 changes: 1 addition & 1 deletion dbms/programs/server/ReplicasStatusHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
bool ok = true;
std::stringstream message;

auto databases = context.getDatabases();
auto databases = DatabaseCatalog::instance().getDatabases();

/// Iterate through all the replicated tables.
for (const auto & db : databases)
Expand Down
6 changes: 4 additions & 2 deletions dbms/programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <Interpreters/ExternalModelsLoader.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/SystemLog.cpp>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
Expand Down Expand Up @@ -559,9 +560,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);
attachSystemTablesServer(*DatabaseCatalog::instance().getSystemDatabase(), has_zookeeper);
/// Then, load remaining databases
loadMetadata(*global_context);
DatabaseCatalog::instance().loadDatabases();
}
catch (...)
{
Expand Down Expand Up @@ -721,7 +723,7 @@ int Server::main(const std::vector<std::string> & /*args*/)

/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(*global_context);
attachSystemTablesAsync(*global_context->getDatabase("system"), async_metrics);
attachSystemTablesAsync(*DatabaseCatalog::instance().getSystemDatabase(), async_metrics);

for (const auto & listen_host : listen_hosts)
{
Expand Down
28 changes: 16 additions & 12 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void TCPHandler::runImpl()
/// When connecting, the default database can be specified.
if (!default_database.empty())
{
if (!connection_context.isDatabaseExist(default_database))
if (!DatabaseCatalog::instance().isDatabaseExist(default_database))
{
Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText()
Expand Down Expand Up @@ -463,11 +463,11 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
/// Send ColumnsDescription for insertion table
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{
const auto & db_and_table = query_context->getInsertionTable();
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!db_and_table.second.empty())
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
if (!table_id.empty())
sendTableColumns(DatabaseCatalog::instance().getTable(table_id)->getColumns());
}
}

Expand Down Expand Up @@ -673,7 +673,8 @@ void TCPHandler::processTablesStatusRequest()
TablesStatusResponse response;
for (const QualifiedTableName & table_name: request.tables)
{
StoragePtr table = connection_context.tryGetTable(table_name.database, table_name.table);
auto resolved_id = connection_context.tryResolveStorageID({table_name.database, table_name.table});
StoragePtr table = DatabaseCatalog::instance().tryGetTable(resolved_id);
if (!table)
continue;

Expand Down Expand Up @@ -972,30 +973,33 @@ bool TCPHandler::receiveData(bool scalar)
initBlockInput();

/// The name of the temporary table for writing data, default to empty string
String name;
readStringBinary(name, *in);
auto temporary_id = StorageID::createEmpty();
readStringBinary(temporary_id.table_name, *in);

/// Read one block from the network and write it down
Block block = state.block_in->read();

if (block)
{
if (scalar)
query_context->addScalar(name, block);
query_context->addScalar(temporary_id.table_name, block);
else
{
/// If there is an insert request, then the data should be written directly to `state.io.out`.
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
{
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (!(storage = query_context->tryGetExternalTable(name)))
if (resolved)
storage = DatabaseCatalog::instance().getTable(resolved);
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create(StorageID("_external", name), ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
query_context->addExternalTable(name, storage);
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
/// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), *query_context);
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Access/AccessRightsContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
#include <boost/algorithm/string/join.hpp>
Expand Down Expand Up @@ -435,8 +436,12 @@ boost::shared_ptr<const AccessRights> AccessRightsContext::calculateResultAccess
| AccessType::DROP_ROLE | AccessType::DROP_POLICY | AccessType::DROP_QUOTA | AccessType::ROLE_ADMIN;

/// Anyone has access to the "system" database.
if (!result.isGranted(AccessType::SELECT, "system"))
result.grant(AccessType::SELECT, "system");
if (!result.isGranted(AccessType::SELECT, DatabaseCatalog::SYSTEM_DATABASE))
result.grant(AccessType::SELECT, DatabaseCatalog::SYSTEM_DATABASE);

/// User has access to temporary or external table if such table was resolved in session or query context
if (!result.isGranted(AccessType::SELECT, DatabaseCatalog::TEMPORARY_DATABASE))
result.grant(AccessType::SELECT, DatabaseCatalog::TEMPORARY_DATABASE);

if (readonly_)
result.fullRevoke(write_table_access | all_dcl | AccessType::SYSTEM | AccessType::KILL);
Expand All @@ -453,6 +458,11 @@ boost::shared_ptr<const AccessRights> AccessRightsContext::calculateResultAccess
/// For example, for readonly = 2 - allowed.
result.fullRevoke(AccessType::CREATE_TEMPORARY_TABLE | AccessType::TABLE_FUNCTIONS);
}
else if (readonly_ == 2)
{
/// Allow INSERT into temporary tables
result.grant(AccessType::INSERT, DatabaseCatalog::TEMPORARY_DATABASE);
}

if (!allow_introspection_)
result.fullRevoke(AccessType::INTROSPECTION);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Core/ExternalTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,

/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create(StorageID("_external", data->table_name), ColumnsDescription{columns}, ConstraintsDescription{});
storage->startup();
context.addExternalTable(data->table_name, storage);
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns});
auto storage = temporary_table.getTable();
context.addExternalTable(data->table_name, std::move(temporary_table));
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);

/// Write data
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Core/QualifiedTableName.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace DB
{

//TODO replace with StorageID
struct QualifiedTableName
{
std::string database;
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Core/UUID.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@

#include <Common/UInt128.h>
#include <common/strong_typedef.h>
#include <Common/thread_local_rng.h>

namespace DB
{

STRONG_TYPEDEF(UInt128, UUID)

namespace UUIDHelpers
{
inline UUID generateV4()
{
UInt128 res{thread_local_rng(), thread_local_rng()};
res.low = (res.low & 0xffffffffffff0fffull) | 0x0000000000004000ull;
res.high = (res.high & 0x3fffffffffffffffull) | 0x8000000000000000ull;
return UUID{res};
}

const UUID Nil = UUID(UInt128(0, 0));
}

}
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/CheckConstraintsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ namespace ErrorCodes


CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
const String & table_,
const StorageID & table_id_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
: table_id(table_id_),
output(output_),
header(header_),
constraints(constraints_),
Expand Down Expand Up @@ -62,7 +62,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
std::stringstream exception_message;

exception_message << "Constraint " << backQuote(constraints.constraints[i]->name)
<< " for table " << backQuote(table)
<< " for table " << table_id.getNameForLogs()
<< " is violated at row " << (rows_written + row_idx + 1)
<< ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")"
<< ". Column values";
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/DataStreams/CheckConstraintsBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <DataStreams/IBlockOutputStream.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/StorageID.h>


namespace DB
Expand All @@ -15,7 +16,7 @@ class CheckConstraintsBlockOutputStream : public IBlockOutputStream
{
public:
CheckConstraintsBlockOutputStream(
const String & table_,
const StorageID & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
Expand All @@ -30,7 +31,7 @@ class CheckConstraintsBlockOutputStream : public IBlockOutputStream
void writeSuffix() override;

private:
String table;
StorageID table_id;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(

res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);

if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table.empty() && !input_function)
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && ast_insert_query->table_id && !input_function)
{
StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id);
auto column_defaults = storage->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
disable_deduplication_for_children = !no_destination && storage->supportsDeduplication();

auto table_id = storage->getStorageID();
Dependencies dependencies = context.getDependencies(table_id);
Dependencies dependencies = DatabaseCatalog::instance().getDependencies(table_id);

/// We need special context for materialized views insertions
if (!dependencies.empty())
Expand All @@ -47,7 +47,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(

for (const auto & database_table : dependencies)
{
auto dependent_table = context.getTable(database_table);
auto dependent_table = DatabaseCatalog::instance().getTable(database_table);

ASTPtr query;
BlockOutputStreamPtr out;
Expand All @@ -61,8 +61,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
query = materialized_view->getInnerQuery();

std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table_id.database_name;
insert->table = inner_table_id.table_name;
insert->table_id = inner_table_id;

/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *views_context, SelectQueryOptions().analyze())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/RemoteBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, *main_table);
auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, main_table.getQualifiedName());
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/RemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class RemoteBlockInputStream : public IBlockInputStream
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; }

void setMainTable(QualifiedTableName main_table_) { main_table = std::move(main_table_); }
void setMainTable(StorageID main_table_) { main_table = std::move(main_table_); }

/// Sends query (initiates calculation) before read()
void readPrefix() override;
Expand Down Expand Up @@ -148,7 +148,7 @@ class RemoteBlockInputStream : public IBlockInputStream
std::atomic<bool> got_unknown_packet_from_replica { false };

PoolMode pool_mode = PoolMode::GET_MANY;
std::optional<QualifiedTableName> main_table;
StorageID main_table = StorageID::createEmpty();

Logger * log = &Logger::get("RemoteBlockInputStream");
};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/tests/expression_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ try
chain.finalize();
ExpressionActionsPtr expression = chain.getLastActions();

StoragePtr table = StorageSystemNumbers::create("numbers", false);
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);

Names column_names;
column_names.push_back("number");
Expand Down