Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to change storage_policy to not less rich one #8107

Merged
merged 2 commits into from Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 58 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Expand Up @@ -682,6 +682,57 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
}


void MergeTreeData::setStoragePolicy(const String & new_storage_policy_name, bool only_check)
{
const auto old_storage_policy = getStoragePolicy();
const auto & new_storage_policy = global_context.getStoragePolicySelector()[new_storage_policy_name];

std::unordered_set<String> new_volume_names;
for (const auto & volume : new_storage_policy->getVolumes())
new_volume_names.insert(volume->getName());

for (const auto & volume : old_storage_policy->getVolumes())
{
if (new_volume_names.count(volume->getName()) == 0)
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::LOGICAL_ERROR);

std::unordered_set<String> new_disk_names;
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->disks)
new_disk_names.insert(disk->getName());

for (const auto & disk : volume->disks)
if (new_disk_names.count(disk->getName()) == 0)
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::LOGICAL_ERROR);
}

std::unordered_set<String> all_diff_disk_names;
for (const auto & disk : new_storage_policy->getDisks())
all_diff_disk_names.insert(disk->getName());
for (const auto & disk : old_storage_policy->getDisks())
all_diff_disk_names.erase(disk->getName());

for (const String & disk_name : all_diff_disk_names)
{
const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name));
if (Poco::File(path).exists())
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
}

if (!only_check)
{
for (const String & disk_name : all_diff_disk_names)
{
const auto & path = getFullPathOnDisk(new_storage_policy->getDiskByName(disk_name));
Poco::File(path).createDirectories();
Poco::File(path + "detached").createDirectory();
}

storage_policy = new_storage_policy;
/// TODO: Query lock is fine but what about background moves??? And downloading of parts?
}
}


void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
Expand Down Expand Up @@ -1479,6 +1530,9 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
throw Exception{"Setting '" + changed_setting.name + "' is readonly for storage '" + getName() + "'",
ErrorCodes::READONLY_SETTING};
}

if (changed_setting.name == "storage_policy")
setStoragePolicy(changed_setting.value.safeGet<String>(), /* only_check = */ true);
}
}

Expand Down Expand Up @@ -1815,6 +1869,10 @@ void MergeTreeData::changeSettings(
copy.applyChanges(new_changes);
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
settings_ast = new_settings;

for (const auto & change : new_changes)
if (change.name == "storage_policy")
setStoragePolicy(change.value.safeGet<String>());
}
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -899,6 +899,8 @@ class MergeTreeData : public IStorage
void setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
const ASTPtr & new_ttl_table_ast, bool only_check = false);

void setStoragePolicy(const String & new_storage_policy_name, bool only_check = false);

/// Expression for column type conversion.
/// If no conversions are needed, out_expression=nullptr.
/// out_rename_map maps column files for the out_expression onto new table files.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -105,7 +105,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
/// We check settings after storage creation
static bool isReadonlySetting(const String & name)
{
return name == "index_granularity" || name == "index_granularity_bytes" || name == "storage_policy";
return name == "index_granularity" || name == "index_granularity_bytes";
}
};

Expand Down
Expand Up @@ -30,6 +30,17 @@
</volumes>
</small_jbod_with_external>

<one_more_small_jbod_with_external>
<volumes>
<m>
<disk>jbod1</disk>
</m>
<e>
<disk>external</disk>
</e>
</volumes>
</one_more_small_jbod_with_external>

<!-- store on JBOD by default (round-robin), store big parts on external -->
<jbods_with_external>
<volumes>
Expand Down
50 changes: 50 additions & 0 deletions dbms/tests/integration/test_multiple_disks/test.py
Expand Up @@ -84,6 +84,22 @@ def test_system_tables(start_cluster):
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "one_more_small_jbod_with_external",
"volume_name": "m",
"volume_priority": "1",
"disks": ["jbod1"],
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "one_more_small_jbod_with_external",
"volume_name": "e",
"volume_priority": "2",
"disks": ["external"],
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "jbods_with_external",
"volume_name": "main",
Expand Down Expand Up @@ -223,6 +239,40 @@ def test_query_parser(start_cluster):
node1.query("DROP TABLE IF EXISTS table_with_normal_policy")


@pytest.mark.parametrize("name,engine", [
("test_alter_policy","MergeTree()"),
("replicated_test_alter_policy","ReplicatedMergeTree('/clickhouse/test_alter_policy', '1')",),
])
def test_alter_policy(start_cluster, name, engine):
try:
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(name=name)) == "small_jbod_with_external\n"

with pytest.raises(QueryRuntimeException):
node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='one_more_small_jbod_with_external'""".format(name=name))

assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(name=name)) == "small_jbod_with_external\n"

node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='jbods_with_external'""".format(name=name))

assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(name=name)) == "jbods_with_external\n"

with pytest.raises(QueryRuntimeException):
node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='small_jbod_with_external'""".format(name=name))

assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(name=name)) == "jbods_with_external\n"

finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))


def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))

Expand Down