Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More correct alters for non replicated MergeTree #8361

Merged
merged 5 commits into from Dec 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/MutationsInterpreter.cpp
Expand Up @@ -339,6 +339,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
affected_materialized.emplace(mat_column);
}

/// Just to be sure, that we don't change type
/// after update expression execution.
const auto & update_expr = kv.second;
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/MutationsInterpreter.h
Expand Up @@ -65,8 +65,9 @@ class MutationsInterpreter
/// Each stage has output_columns that contain columns that are changed at the end of that stage
/// plus columns needed for the next mutations.
///
/// First stage is special: it can contain only DELETEs and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any).
/// First stage is special: it can contain only filters and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any). It's necessary because all mutations have
/// `WHERE clause` part.

struct Stage
{
Expand All @@ -83,7 +84,7 @@ class MutationsInterpreter

/// A chain of actions needed to execute this stage.
/// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`),
/// then there is (possibly) an UPDATE stage, and finally a projection stage.
/// then there is (possibly) an UPDATE step, and finally a projection step.
ExpressionActionsChain expressions_chain;
Names filter_column_names;
};
Expand Down
43 changes: 25 additions & 18 deletions dbms/src/Storages/AlterCommands.cpp
Expand Up @@ -109,7 +109,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
if (ast_col_decl.comment)
{
const auto & ast_comment = ast_col_decl.comment->as<ASTLiteral &>();
command.comment = ast_comment.value.get<String>();
command.comment.emplace(ast_comment.value.get<String>());
}

if (ast_col_decl.ttl)
Expand Down Expand Up @@ -225,7 +225,9 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
}
column.comment = comment;
if (comment)
column.comment = *comment;

column.codec = codec;
column.ttl = ttl;

Expand All @@ -251,19 +253,22 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.codec = codec;
}

if (!isMutable())
{
column.comment = comment;
return;
}
if (comment)
column.comment = *comment;

if (ttl)
column.ttl = ttl;

column.type = data_type;
if (data_type)
column.type = data_type;

column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
/// User specified default expression or changed
/// datatype. We have to replace default.
if (default_expression || data_type)
{
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
}
});
}
else if (type == MODIFY_ORDER_BY)
Expand All @@ -279,7 +284,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
}
else if (type == COMMENT_COLUMN)
{
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = comment; });
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = *comment; });
}
else if (type == ADD_INDEX)
{
Expand Down Expand Up @@ -390,13 +395,15 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}

bool AlterCommand::isMutable() const
bool AlterCommand::isModifyingData() const
{
if (type == COMMENT_COLUMN || type == MODIFY_SETTING)
return false;
/// Possible change data representation on disk
if (type == MODIFY_COLUMN)
return data_type.get() || default_expression;
return true;
return data_type != nullptr;

return type == ADD_COLUMN /// We need to change columns.txt in each part for MergeTree
|| type == DROP_COLUMN /// We need to change columns.txt in each part for MergeTree
|| type == DROP_INDEX; /// We need to remove file from filesystem for MergeTree
}

bool AlterCommand::isSettingsAlter() const
Expand Down Expand Up @@ -666,11 +673,11 @@ void AlterCommands::applyForSettingsOnly(SettingsChanges & changes) const
changes = std::move(out_changes);
}

bool AlterCommands::isMutable() const
bool AlterCommands::isModifyingData() const
{
for (const auto & param : *this)
{
if (param.isMutable())
if (param.isModifyingData())
return true;
}

Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Storages/AlterCommands.h
Expand Up @@ -47,7 +47,7 @@ struct AlterCommand

ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
String comment;
std::optional<String> comment;

/// For ADD - after which column to add a new one. If an empty string, add to the end. To add to the beginning now it is impossible.
String after_column;
Expand Down Expand Up @@ -102,8 +102,11 @@ struct AlterCommand
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;

/// Checks that not only metadata touched by that command
bool isMutable() const;
/// Checks that alter query changes data. For MergeTree:
/// * column files (data and marks)
/// * each part meta (columns.txt)
/// in each part on disk (it's not lightweight alter).
bool isModifyingData() const;

/// checks that only settings changed by alter
bool isSettingsAlter() const;
Expand All @@ -124,7 +127,7 @@ class AlterCommands : public std::vector<AlterCommand>
void applyForSettingsOnly(SettingsChanges & changes) const;

void validate(const IStorage & table, const Context & context);
bool isMutable() const;
bool isModifyingData() const;
bool isSettingsAlter() const;
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/IStorage.cpp
Expand Up @@ -402,7 +402,7 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
if (params.isMutable())
if (params.isModifyingData())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);

const String database_name = getDatabaseName();
Expand Down
86 changes: 40 additions & 46 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Expand Up @@ -1342,78 +1342,69 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);

/// Set of columns that shouldn't be altered.
NameSet columns_alter_forbidden;
NameSet columns_alter_type_forbidden;

/// Primary key columns can be ALTERed only if they are used in the key as-is
/// (and not as a part of some expression) and if the ALTER only affects column metadata.
NameSet columns_alter_metadata_only;
NameSet columns_alter_type_metadata_only;

if (partition_key_expr)
{
/// Forbid altering partition key columns because it can change partition ID format.
/// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
/// We should allow it.
for (const String & col : partition_key_expr->getRequiredColumns())
columns_alter_forbidden.insert(col);
columns_alter_type_forbidden.insert(col);
}

for (const auto & index : skip_indices)
{
for (const String & col : index->expr->getRequiredColumns())
columns_alter_forbidden.insert(col);
columns_alter_type_forbidden.insert(col);
}

if (sorting_key_expr)
{
for (const ExpressionAction & action : sorting_key_expr->getActions())
{
auto action_columns = action.getNeededColumns();
columns_alter_forbidden.insert(action_columns.begin(), action_columns.end());
columns_alter_type_forbidden.insert(action_columns.begin(), action_columns.end());
}
for (const String & col : sorting_key_expr->getRequiredColumns())
columns_alter_metadata_only.insert(col);
columns_alter_type_metadata_only.insert(col);

/// We don't process sample_by_ast separately because it must be among the primary key columns
/// and we don't process primary_key_expr separately because it is a prefix of sorting_key_expr.
}

if (!merging_params.sign_column.empty())
columns_alter_forbidden.insert(merging_params.sign_column);
columns_alter_type_forbidden.insert(merging_params.sign_column);

std::map<String, const IDataType *> old_types;
for (const auto & column : getColumns().getAllPhysical())
old_types.emplace(column.name, column.type.get());

for (const AlterCommand & command : commands)
{
if (!command.isMutable())
if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned)
{
continue;
throw Exception(
"ALTER MODIFY ORDER BY is not supported for default-partitioned tables created with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
}

if (columns_alter_forbidden.count(command.column_name))
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);

if (columns_alter_metadata_only.count(command.column_name))
else if (command.isModifyingData())
{
if (command.type == AlterCommand::MODIFY_COLUMN)
if (columns_alter_type_forbidden.count(command.column_name))
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);

if (columns_alter_type_metadata_only.count(command.column_name))
{
auto it = old_types.find(command.column_name);
if (it != old_types.end() && isMetadataOnlyConversion(it->second, command.data_type.get()))
continue;
if (command.type == AlterCommand::MODIFY_COLUMN)
{
auto it = old_types.find(command.column_name);
if (it == old_types.end() || !isMetadataOnlyConversion(it->second, command.data_type.get()))
throw Exception("ALTER of key column " + command.column_name + " must be metadata-only", ErrorCodes::ILLEGAL_COLUMN);
}
}

throw Exception(
"ALTER of key column " + command.column_name + " must be metadata-only",
ErrorCodes::ILLEGAL_COLUMN);
}

if (command.type == AlterCommand::MODIFY_ORDER_BY)
{
if (!is_custom_partitioned)
throw Exception(
"ALTER MODIFY ORDER BY is not supported for default-partitioned tables created with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
}
}

Expand All @@ -1425,17 +1416,20 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
for (const auto & setting : new_changes)
checkSettingCanBeChanged(setting.name);

/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
if (commands.isModifyingData())
{
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
}
}

void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns,
const NamesAndTypesList & new_columns, const IndicesASTs & old_indices, const IndicesASTs & new_indices,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
{
const auto settings = getSettings();
out_expression = nullptr;
Expand All @@ -1457,16 +1451,16 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name


/// Remove old indices
std::set<String> new_indices_set;
std::unordered_set<String> new_indices_set;
for (const auto & index_decl : new_indices)
new_indices_set.emplace(index_decl->as<ASTIndexDeclaration &>().name);
for (const auto & index_decl : old_indices)
{
const auto & index = index_decl->as<ASTIndexDeclaration &>();
if (!new_indices_set.count(index.name))
{
out_rename_map["skp_idx_" + index.name + ".idx"] = "";
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = "";
out_rename_map["skp_idx_" + index.name + ".idx"] = ""; /// drop this file
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = ""; /// and this one
}
}

Expand Down Expand Up @@ -1494,8 +1488,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
/// Delete files if they are no longer shared with another column.
if (--stream_counts[file_name] == 0)
{
out_rename_map[file_name + ".bin"] = "";
out_rename_map[file_name + part_mrk_file_extension] = "";
out_rename_map[file_name + ".bin"] = ""; /// drop this file
out_rename_map[file_name + part_mrk_file_extension] = ""; /// and this one
}
}, {});
}
Expand Down Expand Up @@ -1847,7 +1841,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.checksums = new_checksums;
mutable_part.columns = new_columns;

/// 3) Delete the old files.
/// 3) Delete the old files and drop required columns (DROP COLUMN)
for (const auto & from_to : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -234,7 +234,7 @@ class MergeTreeData : public IStorage
const NamesAndTypesList & getNewColumns() const { return new_columns; }
const DataPart::Checksums & getNewChecksums() const { return new_checksums; }

AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
const DataPartPtr & getDataPart() const { return data_part; }
bool isValid() const;

Expand All @@ -244,9 +244,7 @@ class MergeTreeData : public IStorage

bool valid = true;

//don't interchange order of data_part & alter_lock
DataPartPtr data_part;
DataPartsLock alter_lock;

DataPart::Checksums new_checksums;
NamesAndTypesList new_columns;
Expand Down
10 changes: 0 additions & 10 deletions dbms/src/Storages/MergeTree/MergeTreeDataPart.h
Expand Up @@ -227,16 +227,6 @@ struct MergeTreeDataPart
*/
mutable std::shared_mutex columns_lock;

/** It is taken for the whole time ALTER a part: from the beginning of the recording of the temporary files to their renaming to permanent.
* It is taken with unlocked `columns_lock`.
*
* NOTE: "You can" do without this mutex if you could turn ReadRWLock into WriteRWLock without removing the lock.
* This transformation is impossible, because it would create a deadlock, if you do it from two threads at once.
* Taking this mutex means that we want to lock columns_lock on read with intention then, not
* unblocking, block it for writing.
*/
mutable std::mutex alter_mutex;
Copy link
Member Author

@alesapin alesapin Dec 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this clumsy mutex and corresponding lock, because they are not required (anymore?). Data part cannot be altered from different threads simultaneously because for non replicated MergeTree we execute them sequentially with alter_intention_lock and for ReplicatedMergeTree we use ReplicatedMergeTreeAltreThread executed in schedule_pool which also cannot run concurrently.

UPD: Actually data can be altered concurrently (ALTER PARTITION and ALTER MODIFY COLUMN), but I don't find any cases where alter thread can take columns_lock for read, and then upgrade this lock for write. BTW columns lock will be completely removed with alter on top of mutations.


MergeTreeIndexGranularityInfo index_granularity_info;

~MergeTreeDataPart();
Expand Down