Skip to content

Commit

Permalink
Added abilty to automatically update clusters configuration. [#METR-2…
Browse files Browse the repository at this point in the history
…2802]
  • Loading branch information
ludv1x committed Oct 11, 2016
1 parent 311f1b1 commit ebbc9b9
Show file tree
Hide file tree
Showing 21 changed files with 364 additions and 166 deletions.
6 changes: 6 additions & 0 deletions dbms/include/DB/Common/ConfigProcessor.h
Expand Up @@ -44,6 +44,12 @@ class ConfigProcessor
*/
ConfigurationPtr loadConfig(const std::string & path);

public:

using Files = std::list<std::string>;

static Files getConfigMergeFiles(const std::string & config_path);

private:
Logger * log;
Poco::AutoPtr<Poco::Channel> channel_ptr;
Expand Down
2 changes: 2 additions & 0 deletions dbms/include/DB/DataStreams/RemoteBlockInputStream.h
Expand Up @@ -7,6 +7,8 @@
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/MultiplexedConnections.h>
#include <DB/Interpreters/Cluster.h>


namespace DB
{
Expand Down
27 changes: 22 additions & 5 deletions dbms/include/DB/Interpreters/Cluster.h
Expand Up @@ -16,7 +16,7 @@ namespace DB
class Cluster
{
public:
Cluster(const Settings & settings, const String & cluster_name);
Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);

/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
Expand Down Expand Up @@ -56,7 +56,7 @@ class Cluster
String default_database; /// this database is selected when no database is specified for Distributed table
UInt32 replica_num;

Address(const String & config_prefix);
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
};

Expand Down Expand Up @@ -142,20 +142,37 @@ class Cluster
size_t local_shard_count = 0;
};

using ClusterPtr = std::shared_ptr<Cluster>;


class Clusters
{
public:
Clusters(const Settings & settings, const String & config_name = "remote_servers");
Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");

Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;

public:
using Impl = std::map<String, Cluster>;
ClusterPtr getCluster(const std::string & cluster_name) const
{
std::lock_guard<std::mutex> lock(mutex);

auto it = impl.find(cluster_name);
return (it != impl.end()) ? it->second : nullptr;
}

void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");

public:
using Impl = std::map<String, ClusterPtr>;

Impl getClusters() const;

protected:
Impl impl;
mutable std::mutex mutex;
};

using ClustersPtr = std::shared_ptr<Clusters>;

}
5 changes: 3 additions & 2 deletions dbms/include/DB/Interpreters/ClusterProxy/Query.h
Expand Up @@ -2,6 +2,7 @@

#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Cluster.h>

namespace DB
{
Expand All @@ -24,7 +25,7 @@ class IQueryConstructor;
class Query
{
public:
Query(IQueryConstructor & query_constructor_, const Cluster & cluster_,
Query(IQueryConstructor & query_constructor_, const ClusterPtr & cluster_,
ASTPtr query_ast_, const Context & context_, const Settings & settings_, bool enable_shard_multiplexing_);

/// For each location at which we perform the query, create an input stream
Expand All @@ -33,7 +34,7 @@ class Query

private:
IQueryConstructor & query_constructor;
const Cluster & cluster;
ClusterPtr cluster;
ASTPtr query_ast;
const Context & context;
const Settings & settings;
Expand Down
5 changes: 3 additions & 2 deletions dbms/include/DB/Interpreters/Context.h
Expand Up @@ -276,8 +276,9 @@ class Context
*/
void resetCaches() const;

const Cluster & getCluster(const std::string & cluster_name) const;
std::shared_ptr<Clusters> getClusters() const;
Clusters & getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name, bool throw_on_error = true) const;
void setClustersConfig(ConfigurationPtr config);

Compiler & getCompiler();
QueryLog & getQueryLog();
Expand Down
Expand Up @@ -3,6 +3,7 @@
#include <DB/Parsers/formatAST.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Core/Block.h>
#include <DB/Interpreters/Cluster.h>

namespace DB
{
Expand All @@ -20,7 +21,7 @@ class StorageDistributed;
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast);
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_);

void write(const Block & block) override;

Expand All @@ -38,6 +39,7 @@ class DistributedBlockOutputStream : public IBlockOutputStream
private:
StorageDistributed & storage;
ASTPtr query_ast;
ClusterPtr cluster;
};

}
21 changes: 12 additions & 9 deletions dbms/include/DB/Storages/StorageDistributed.h
Expand Up @@ -7,6 +7,7 @@
#include <DB/Client/ConnectionPoolWithFailover.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/ExpressionActions.h>
#include <common/logger_useful.h>

Expand Down Expand Up @@ -45,7 +46,7 @@ class StorageDistributed : private ext::shared_ptr_helper<StorageDistributed>, p
NamesAndTypesListPtr columns_, /// Список столбцов.
const String & remote_database_, /// БД на удалённых серверах.
const String & remote_table_, /// Имя таблицы на удалённых серверах.
std::shared_ptr<Cluster> & owned_cluster_,
ClusterPtr & owned_cluster_,
Context & context_);

std::string getName() const override { return "Distributed"; }
Expand Down Expand Up @@ -95,15 +96,15 @@ class StorageDistributed : private ext::shared_ptr_helper<StorageDistributed>, p
const String & getPath() const { return path; }
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }

std::string getClusterName() const { return cluster_name; } /// Returns empty string if tables is used by TableFunctionRemote

private:
StorageDistributed(
const std::string & name_,
NamesAndTypesListPtr columns_,
const String & remote_database_,
const String & remote_table_,
const Cluster & cluster_,
const String & cluster_name_,
Context & context_,
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
Expand All @@ -116,7 +117,7 @@ class StorageDistributed : private ext::shared_ptr_helper<StorageDistributed>, p
const ColumnDefaults & column_defaults_,
const String & remote_database_,
const String & remote_table_,
const Cluster & cluster_,
const String & cluster_name_,
Context & context_,
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
Expand All @@ -129,6 +130,9 @@ class StorageDistributed : private ext::shared_ptr_helper<StorageDistributed>, p
/// ensure directory monitor creation
void requireDirectoryMonitor(const std::string & name);

ClusterPtr getCluster() const;

private:
String name;
NamesAndTypesListPtr columns;
String remote_database;
Expand All @@ -137,16 +141,15 @@ class StorageDistributed : private ext::shared_ptr_helper<StorageDistributed>, p
Context & context;
Logger * log = &Logger::get("StorageDistributed");

/// Используется только, если таблица должна владеть объектом Cluster,
/// которым больше никто не владеет - для реализации TableFunctionRemote.
/// для реализации TableFunctionRemote.
std::shared_ptr<Cluster> owned_cluster;

/// Соединения с удалёнными серверами.
const Cluster & cluster;
/// Пусто если используется для реализации TableFunctionRemote.
const String cluster_name;

bool has_sharding_key;
ExpressionActionsPtr sharding_key_expr;
String sharding_key_column_name;
bool write_enabled;
String path; /// Может быть пустым, если data_path_ пустой. В этом случае, директория для данных для отправки не создаётся.

class DirectoryMonitor;
Expand Down
62 changes: 38 additions & 24 deletions dbms/src/Common/ConfigProcessor.cpp
Expand Up @@ -282,27 +282,20 @@ void ConfigProcessor::doIncludes(DocumentPtr config, DocumentPtr include_from)
doIncludesRecursive(config, include_from, getRootNode(&*config));
}

XMLDocumentPtr ConfigProcessor::processConfig(const std::string & path_str)
ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string & config_path)
{
/// We need larger name pool to allow to support vast amount of users in users.xml files for ClickHouse.
/// Size is prime because Poco::XML::NamePool uses bad (inefficient, low quality)
/// hash function internally, and its size was prime by default.
Poco::AutoPtr<Poco::XML::NamePool> name_pool(new Poco::XML::NamePool(65521));
Poco::XML::DOMParser dom_parser(name_pool);

DocumentPtr config = dom_parser.parse(path_str);
Files res;

std::vector<std::string> contributing_files;
contributing_files.push_back(path_str);
Poco::Path merge_dir_path(path_str);
Poco::Path merge_dir_path(config_path);
merge_dir_path.setExtension("d");

std::vector<std::string> merge_dirs;
merge_dirs.push_back(merge_dir_path.toString());
if (merge_dir_path.getBaseName() != "conf")
{
if (merge_dir_path.getBaseName() != "conf") {
merge_dir_path.setBaseName("conf");
merge_dirs.push_back(merge_dir_path.toString());
}

for (const std::string & merge_dir_name : merge_dirs)
{
Poco::File merge_dir(merge_dir_name);
Expand All @@ -311,22 +304,43 @@ XMLDocumentPtr ConfigProcessor::processConfig(const std::string & path_str)
for (Poco::DirectoryIterator it(merge_dir_name); it != Poco::DirectoryIterator(); ++it)
{
Poco::File & file = *it;
try
{
if (file.isFile() && (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf")))
{
contributing_files.push_back(file.path());
DocumentPtr with = dom_parser.parse(file.path());
merge(config, with);
}
}
catch (Poco::Exception & e)
if (file.isFile() && (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf")))
{
throw Poco::Exception("Failed to merge config with " + file.path() + ": " + e.displayText());
res.push_back(file.path());
}
}
}

return res;
}

XMLDocumentPtr ConfigProcessor::processConfig(const std::string & path_str)
{
/// We need larger name pool to allow to support vast amount of users in users.xml files for ClickHouse.
/// Size is prime because Poco::XML::NamePool uses bad (inefficient, low quality)
/// hash function internally, and its size was prime by default.
Poco::AutoPtr<Poco::XML::NamePool> name_pool(new Poco::XML::NamePool(65521));
Poco::XML::DOMParser dom_parser(name_pool);

DocumentPtr config = dom_parser.parse(path_str);

std::vector<std::string> contributing_files;
contributing_files.push_back(path_str);

for (auto & merge_file : getConfigMergeFiles(path_str))
{
try
{
DocumentPtr with = dom_parser.parse(merge_file);
merge(config, with);
contributing_files.push_back(merge_file);
}
catch (Poco::Exception & e)
{
throw Poco::Exception("Failed to merge config with " + merge_file + ": " + e.displayText());
}
}

try
{
Node * node = config->getNodeByPath("yandex/include_from");
Expand Down
46 changes: 34 additions & 12 deletions dbms/src/Interpreters/Cluster.cpp
Expand Up @@ -76,10 +76,8 @@ Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)

/// Implementation of Cluster::Address class

Cluster::Address::Address(const String & config_prefix)
Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
const auto & config = Poco::Util::Application::instance().config();

host_name = config.getString(config_prefix + ".host");
port = config.getInt(config_prefix + ".port");
resolved_address = resolveSocketAddress(host_name, port);
Expand Down Expand Up @@ -111,23 +109,47 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const

/// Implementation of Clusters class

Clusters::Clusters(const Settings & settings, const String & config_name)
Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);

for (const auto & key : config_keys)
impl.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple(settings, config_name + "." + key));
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_name + "." + key));
}

void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);

std::lock_guard<std::mutex> lock(mutex);

for (const auto & key : config_keys)
{
auto it = impl.find(key);
auto new_cluster = std::make_shared<Cluster>(config, settings, config_name + "." + key);

if (it == impl.end())
impl.emplace(key, std::move(new_cluster));
else
{
//TODO: Check that cluster update is necessarily
it->second = std::move(new_cluster);
}
}
}

Clusters::Impl Clusters::getClusters() const
{
std::lock_guard<std::mutex> lock(mutex);
return impl;
}

/// Реализация класса Cluster

Cluster::Cluster(const Settings & settings, const String & cluster_name)
Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);

Expand All @@ -149,7 +171,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
if (weight == 0)
continue;

addresses.emplace_back(prefix);
addresses.emplace_back(config, prefix);
addresses.back().replica_num = 1;
const auto & address = addresses.back();

Expand Down Expand Up @@ -206,7 +228,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)

if (startsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(partial_prefix + replica_key);
replica_addresses.emplace_back(config, partial_prefix + replica_key);
replica_addresses.back().replica_num = current_replica_num;
++current_replica_num;

Expand Down

0 comments on commit ebbc9b9

Please sign in to comment.