Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added reloading storage configuration from configuration file #8594

Merged
merged 3 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

if (config->has("max_partition_size_to_drop"))
global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));

global_context->updateStorageConfiguration(*config);
},
/* already_loaded = */ true);

Expand Down
117 changes: 110 additions & 7 deletions dbms/src/Disks/DiskSpaceMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Common/quoteString.h>

#include <set>

#include <Poco/File.h>


Expand All @@ -15,6 +16,7 @@ namespace DB

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_DISK;
extern const int UNKNOWN_POLICY;
Expand Down Expand Up @@ -48,7 +50,65 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
}


const DiskPtr & DiskSelector::operator[](const String & name) const
DiskSelectorPtr DiskSelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);

auto & factory = DiskFactory::instance();

std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);

std::set<String> old_disks_minus_new_disks;
for (const auto & [disk_name, _] : result->disks)
{
old_disks_minus_new_disks.insert(disk_name);
}

for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);

if (result->disks.count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
}
else
{
old_disks_minus_new_disks.erase(disk_name);

/// TODO: Ideally ClickHouse shall complain if disk has changed, but
/// implementing that may appear as not trivial task.
}
}

if (!old_disks_minus_new_disks.empty())
{
WriteBufferFromOwnString warning;
if (old_disks_minus_new_disks.size() == 1)
writeString("Disk ", warning);
else
writeString("Disks ", warning);

int index = 0;
for (const String & name : old_disks_minus_new_disks)
{
if (index++ > 0)
writeString(", ", warning);
writeBackQuotedString(name, warning);
}

writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
LOG_WARNING(&Logger::get("DiskSelector"), warning.str());
}

return result;
}


DiskPtr DiskSelector::get(const String & name) const
{
auto it = disks.find(name);
if (it == disks.end())
Expand All @@ -61,7 +121,7 @@ Volume::Volume(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const DiskSelector & disk_selector)
DiskSelectorPtr disk_selector)
: name(std::move(name_))
{
Poco::Util::AbstractConfiguration::Keys keys;
Expand All @@ -74,7 +134,7 @@ Volume::Volume(
if (startsWith(disk, "disk"))
{
auto disk_name = config.getString(config_prefix + "." + disk);
disks.push_back(disk_selector[disk_name]);
disks.push_back(disk_selector->get(disk_name));
}
}

Expand Down Expand Up @@ -162,7 +222,7 @@ StoragePolicy::StoragePolicy(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const DiskSelector & disks)
DiskSelectorPtr disks)
: name(std::move(name_))
{
String volumes_prefix = config_prefix + ".volumes";
Expand Down Expand Up @@ -330,6 +390,28 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
}


void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const
{
std::unordered_set<String> new_volume_names;
for (const auto & volume : new_storage_policy->getVolumes())
new_volume_names.insert(volume->getName());

for (const auto & volume : getVolumes())
{
if (new_volume_names.count(volume->getName()) == 0)
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::LOGICAL_ERROR);

std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->disks)
new_disk_names.insert(disk->getName());

for (const auto & disk : volume->disks)
if (new_disk_names.count(disk->getName()) == 0)
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::LOGICAL_ERROR);
}
}


size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
{
for (size_t i = 0; i < volumes.size(); ++i)
Expand All @@ -346,7 +428,7 @@ size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
StoragePolicySelector::StoragePolicySelector(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const DiskSelector & disks)
DiskSelectorPtr disks)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
Expand All @@ -368,18 +450,39 @@ StoragePolicySelector::StoragePolicySelector(
/// Add default policy if it's not specified explicetly
if (policies.find(default_storage_policy_name) == policies.end())
{
auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks[default_disk_name]}, 0);
auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);

auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
policies.emplace(default_storage_policy_name, default_policy);
}
}

const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name) const

StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);

std::shared_ptr<StoragePolicySelector> result = std::make_shared<StoragePolicySelector>(config, config_prefix, disks);

for (const auto & [name, policy] : policies)
{
if (result->policies.count(name) == 0)
throw Exception("Storage policy " + backQuote(name) + " is missing in new configuration", ErrorCodes::BAD_ARGUMENTS);

policy->checkCompatibleWith(result->policies[name]);
}

return result;
}


StoragePolicyPtr StoragePolicySelector::get(const String & name) const
{
auto it = policies.find(name);
if (it == policies.end())
throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY);

return it->second;
}

Expand Down
26 changes: 20 additions & 6 deletions dbms/src/Disks/DiskSpaceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@
namespace DB
{

class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;

/// Parse .xml configuration and store information about disks
/// Mostly used for introspection.
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
DiskSelector(const DiskSelector & from): disks(from.disks) {}

DiskSelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const;

/// Get disk by name
const DiskPtr & operator[](const String & name) const;
DiskPtr get(const String & name) const;

/// Get all disks with names
const auto & getDisksMap() const { return disks; }
Expand Down Expand Up @@ -54,7 +60,7 @@ class Volume : public Space
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const DiskSelector & disk_selector);
DiskSelectorPtr disk_selector);

/// Next disk (round-robin)
///
Expand Down Expand Up @@ -87,6 +93,8 @@ class Volume : public Space
using VolumePtr = std::shared_ptr<Volume>;
using Volumes = std::vector<VolumePtr>;

class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;

/**
* Contains all information about volumes configuration for Storage.
Expand All @@ -95,7 +103,7 @@ using Volumes = std::vector<VolumePtr>;
class StoragePolicy
{
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);

StoragePolicy(String name_, Volumes volumes_, double move_factor_);

Expand Down Expand Up @@ -146,6 +154,9 @@ class StoragePolicy
return getVolume(it->second);
}

/// Checks if storage policy can be replaced by another one.
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;

private:
Volumes volumes;
const String name;
Expand All @@ -158,17 +169,20 @@ class StoragePolicy
};


using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;

/// Parse .xml configuration and store information about policies
/// Mostly used for introspection.
class StoragePolicySelector
{
public:
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);

StoragePolicySelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const;

/// Policy by name
const StoragePolicyPtr & operator[](const String & name) const;
StoragePolicyPtr get(const String & name) const;

/// All policies
const std::map<String, StoragePolicyPtr> & getPoliciesMap() const { return policies; }
Expand Down
51 changes: 36 additions & 15 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ struct ContextShared
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser for MergeTree engines
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
mutable std::shared_ptr<const DiskSelector> merge_tree_disk_selector;
/// Storage policy chooser for MergeTree engines
mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
mutable std::shared_ptr<const StoragePolicySelector> merge_tree_storage_policy_selector;

std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
Expand Down Expand Up @@ -577,7 +577,7 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector()[policy_name];
StoragePolicyPtr tmp_policy = getStoragePolicySelector()->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->tmp_volume = tmp_policy->getVolume(0);
Expand Down Expand Up @@ -1892,17 +1892,17 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par
}


const DiskPtr & Context::getDisk(const String & name) const
DiskPtr Context::getDisk(const String & name) const
{
auto lock = getLock();

const auto & disk_selector = getDiskSelector();
auto disk_selector = getDiskSelector();

return disk_selector[name];
return disk_selector->get(name);
}


DiskSelector & Context::getDiskSelector() const
DiskSelectorPtr Context::getDiskSelector() const
{
auto lock = getLock();

Expand All @@ -1911,23 +1911,23 @@ DiskSelector & Context::getDiskSelector() const
constexpr auto config_name = "storage_configuration.disks";
auto & config = getConfigRef();

shared->merge_tree_disk_selector = std::make_unique<DiskSelector>(config, config_name, *this);
shared->merge_tree_disk_selector = std::make_shared<DiskSelector>(config, config_name, *this);
}
return *shared->merge_tree_disk_selector;
return shared->merge_tree_disk_selector;
}


const StoragePolicyPtr & Context::getStoragePolicy(const String & name) const
StoragePolicyPtr Context::getStoragePolicy(const String & name) const
{
auto lock = getLock();

auto & policy_selector = getStoragePolicySelector();
auto policy_selector = getStoragePolicySelector();

return policy_selector[name];
return policy_selector->get(name);
}


StoragePolicySelector & Context::getStoragePolicySelector() const
StoragePolicySelectorPtr Context::getStoragePolicySelector() const
{
auto lock = getLock();

Expand All @@ -1936,9 +1936,30 @@ StoragePolicySelector & Context::getStoragePolicySelector() const
constexpr auto config_name = "storage_configuration.policies";
auto & config = getConfigRef();

shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector());
shared->merge_tree_storage_policy_selector = std::make_shared<StoragePolicySelector>(config, config_name, getDiskSelector());
}
return shared->merge_tree_storage_policy_selector;
}


void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();

if (shared->merge_tree_disk_selector)
shared->merge_tree_disk_selector = shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", *this);

if (shared->merge_tree_storage_policy_selector)
{
try
{
shared->merge_tree_storage_policy_selector = shared->merge_tree_storage_policy_selector->updateFromConfig(config, "storage_configuration.policies", shared->merge_tree_disk_selector);
}
catch (Exception & e)
{
LOG_ERROR(shared->log, "An error has occured while reloading storage policies, storage policies were not applied: " << e.message());
}
}
return *shared->merge_tree_storage_policy_selector;
}


Expand Down