Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking alter for vanilla merge tree #9606

Merged
merged 41 commits into from
Mar 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
406b48a
First stupid implementation of non-blocking alter for vanilla merge tree
alesapin Mar 11, 2020
9f67498
Merge branch 'master' into alter_on_top_of_mutations_merge_tree
alesapin Mar 12, 2020
7a9f7bd
test
alesapin Mar 12, 2020
74a908f
Reslove conflicts
alesapin Mar 12, 2020
dbf940d
Merge branch 'master' of github.com:ClickHouse/ClickHouse into alter_…
alesapin Mar 13, 2020
71e2af3
Some trach code
alesapin Mar 13, 2020
814e4c0
Return back some log messages
alesapin Mar 13, 2020
c777191
Get rid of removeEmptyColumnsFromPart method based on alterDataPart.
alesapin Mar 13, 2020
d7a13fb
Merge branch 'remove_empty_columns_when_part_written' into alter_on_t…
alesapin Mar 13, 2020
c7c945a
Fix style check
alesapin Mar 13, 2020
b0b81be
Merge with master and fix
alesapin Mar 17, 2020
5877a5a
Remove alter transaction!!!
alesapin Mar 17, 2020
8d0cb42
Remove more code
alesapin Mar 17, 2020
d5636fb
Fix style check
alesapin Mar 17, 2020
347d2a3
Remove columns lock!!!
alesapin Mar 17, 2020
17e505d
Fix build errors
alesapin Mar 17, 2020
87e9a84
Unblock waiting mutations
alesapin Mar 18, 2020
b3d52cf
Merge branch 'master' into alter_on_top_of_mutations_merge_tree
alesapin Mar 18, 2020
e6acfd4
Return new test
alesapin Mar 18, 2020
13a0151
Remove garbage
alesapin Mar 18, 2020
3babac1
Fix query hang
alesapin Mar 18, 2020
eb938f6
Better mutations interface
alesapin Mar 18, 2020
d004062
Splitting mutate part to temporarty part
alesapin Mar 18, 2020
f64c005
Split merger mutator
alesapin Mar 18, 2020
baec35a
Better locking in alter
alesapin Mar 18, 2020
04494c6
Merge with master after strange conflict
alesapin Mar 18, 2020
37cc49e
Fix clang-tidy
alesapin Mar 18, 2020
ce8eb92
Merge branch 'master' into alter_on_top_of_mutations_merge_tree
alesapin Mar 19, 2020
204d0ac
Fix bugs after method split
alesapin Mar 19, 2020
c6d10e3
Revert accident changes
alesapin Mar 19, 2020
610a727
Fix tidy error
alesapin Mar 19, 2020
5a3216b
Merge with master
alesapin Mar 20, 2020
90c436f
Fix style
alesapin Mar 20, 2020
bafd51d
Merge branch 'master' into alter_on_top_of_mutations_merge_tree
alesapin Mar 20, 2020
62f39b6
Correct merge with master
alesapin Mar 20, 2020
2a53326
Correct merge with master (try 2)
alesapin Mar 20, 2020
20970f0
Remove files from part correctly
alesapin Mar 20, 2020
fbf7301
Fix integration test
alesapin Mar 21, 2020
7a92428
Update MutationCommands.h
alexey-milovidov Mar 22, 2020
46829a6
Update StorageMergeTree.cpp
alexey-milovidov Mar 22, 2020
03aa8d4
Update AlterCommands.cpp
alexey-milovidov Mar 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ BlockIO InterpreterAlterQuery::execute()

if (!partition_commands.empty())
{
partition_commands.validate(*table);
table->alterPartition(query_ptr, partition_commands, context);
}

Expand Down
74 changes: 44 additions & 30 deletions dbms/src/Storages/AlterCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,18 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_

return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && !command_ast->partition)
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR COLUMN column" queries are not supported yet. Use "CLEAR COLUMN column IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);

AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
command.if_exists = command_ast->if_exists;
if (command_ast->clear_column)
command.clear = true;

if (command_ast->partition)
command.partition = command_ast->partition;
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN)
Expand Down Expand Up @@ -186,11 +188,8 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_

return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_CONSTRAINT && !command_ast->partition)
else if (command_ast->type == ASTAlterCommand::DROP_CONSTRAINT)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR COLUMN column" queries are not supported yet. Use "CLEAR COLUMN column IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);

AlterCommand command;
command.ast = command_ast->clone();
command.if_exists = command_ast->if_exists;
Expand All @@ -199,16 +198,18 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_

return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_INDEX && !command_ast->partition)
else if (command_ast->type == ASTAlterCommand::DROP_INDEX)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR INDEX index" queries are not supported yet. Use "CLEAR INDEX index IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);

AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_INDEX;
command.index_name = command_ast->index->as<ASTIdentifier &>().name;
command.if_exists = command_ast->if_exists;
if (command_ast->clear_index)
command.clear = true;

if (command_ast->partition)
command.partition = command_ast->partition;

return command;
}
Expand Down Expand Up @@ -263,7 +264,9 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata) const
}
else if (type == DROP_COLUMN)
{
metadata.columns.remove(column_name);
/// Otherwise just clear data on disk
if (!clear && !partition)
metadata.columns.remove(column_name);
}
else if (type == MODIFY_COLUMN)
{
Expand Down Expand Up @@ -354,23 +357,25 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata) const
}
else if (type == DROP_INDEX)
{
auto erase_it = std::find_if(
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
});

if (erase_it == metadata.indices.indices.end())
if (!partition && !clear)
{
if (if_exists)
return;
throw Exception("Wrong index name. Cannot find index " + backQuote(index_name) + " to drop.",
ErrorCodes::BAD_ARGUMENTS);
}
auto erase_it = std::find_if(
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
});

if (erase_it == metadata.indices.indices.end())
{
if (if_exists)
return;
throw Exception("Wrong index name. Cannot find index " + backQuote(index_name) + " to drop.", ErrorCodes::BAD_ARGUMENTS);
}

metadata.indices.indices.erase(erase_it);
metadata.indices.indices.erase(erase_it);
}
}
else if (type == ADD_CONSTRAINT)
{
Expand Down Expand Up @@ -515,7 +520,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
if (ignore)
return false;

if (type == DROP_COLUMN)
if (type == DROP_COLUMN || type == DROP_INDEX)
return true;

if (type != MODIFY_COLUMN || data_type == nullptr)
Expand Down Expand Up @@ -564,12 +569,21 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
{
result.type = MutationCommand::Type::DROP_COLUMN;
result.column_name = column_name;
if (clear)
result.clear = true;
if (partition)
result.partition = partition;
result.predicate = nullptr;
}
else if (type == DROP_INDEX)
{
result.type = MutationCommand::Type::DROP_INDEX;
result.column_name = column_name;
result.column_name = index_name;
if (clear)
result.clear = true;
if (partition)
result.partition = partition;

result.predicate = nullptr;
}

Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/AlterCommands.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ struct AlterCommand

String column_name;

/// For DROP COLUMN ... FROM PARTITION
String partition_name;
/// For DROP/CLEAR COLUMN/INDEX ... IN PARTITION
ASTPtr partition;

/// For ADD and MODIFY, a new column type.
DataTypePtr data_type = nullptr;
Expand Down Expand Up @@ -84,6 +84,9 @@ struct AlterCommand
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;

/// Clear columns or index (don't drop from metadata)
bool clear = false;

/// For ADD and MODIFY
CompressionCodecPtr codec = nullptr;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ using SettingsChanges = std::vector<SettingChange>;

class AlterCommands;
class MutationCommands;
class PartitionCommands;
struct PartitionCommand;
using PartitionCommands = std::vector<PartitionCommand>;

class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
Expand Down
24 changes: 0 additions & 24 deletions dbms/src/Storages/MergeTree/AlterAnalysisResult.h

This file was deleted.

2 changes: 0 additions & 2 deletions dbms/src/Storages/MergeTree/DataPartsExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo

MergeTreeData::DataPartPtr part = findPart(part_name);

std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};

/// We'll take a list of files from the list of checksums.
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,6 @@ void IMergeTreeDataPart::remove() const
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
std::shared_lock<std::shared_mutex> lock(columns_lock);

for (const auto & [file, _] : checksums.files)
disk->remove(to + "/" + file);
#if !__clang__
Expand Down
14 changes: 1 addition & 13 deletions dbms/src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/AlterAnalysisResult.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Columns/IColumn.h>

Expand Down Expand Up @@ -94,19 +93,13 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
/// (by locking table structure).
virtual ColumnSize getColumnSize(const String & /* name */, const IDataType & /* type */) const { return {}; }

virtual ColumnSize getTotalColumnsSize() const { return {}; }

virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;

/// Returns rename map of column files for the alter converting expression onto new table files.
/// Files to be deleted are mapped to an empty string in rename map.
virtual NameToNameMap createRenameMapForAlter(
AlterAnalysisResult & /* analysis_result */,
const NamesAndTypesList & /* old_columns */) const { return {}; }

virtual ~IMergeTreeDataPart();

using ColumnToSize = std::map<std::string, UInt64>;
Expand Down Expand Up @@ -283,11 +276,6 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// Columns with values, that all have been zeroed by expired ttl
NameSet expired_columns;

/** It is blocked for writing when changing columns, checksums or any part files.
* Locked to read when reading columns, checksums or any part files.
*/
mutable std::shared_mutex columns_lock;

/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
Expand Down
70 changes: 70 additions & 0 deletions dbms/src/Storages/MergeTree/IMergedBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,74 @@ Block IMergedBlockOutputStream::getBlockAndPermute(const Block & block, const Na
return result;
}

NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
const MergeTreeDataPartPtr & data_part,
NamesAndTypesList & columns,
MergeTreeData::DataPart::Checksums & checksums)
{
const NameSet & empty_columns = data_part->expired_columns;

/// For compact part we have to override whole file with data, it's not
/// worth it
if (empty_columns.empty() || isCompactPart(data_part))
return {};

/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std::map<String, size_t> stream_counts;
for (const NameAndTypePair & column : columns)
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
{});
}

NameSet remove_files;
const String mrk_extension = data_part->getMarksFileExtension();
for (const auto & column_name : empty_columns)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
}
};
IDataType::SubstreamPath stream_path;
auto column_with_type = columns.tryGetByName(column_name);
if (column_with_type)
column_with_type->type->enumerateStreams(callback, stream_path);
}

/// Remove files on disk and checksums
for (const String & removed_file : remove_files)
{
if (checksums.files.count(removed_file))
{
data_part->disk->remove(data_part->getFullRelativePath() + removed_file);
checksums.files.erase(removed_file);
}
}

/// Remove columns from columns array
for (const String & empty_column_name : empty_columns)
{
auto find_func = [&empty_column_name](const auto & pair) -> bool
{
return pair.name == empty_column_name;
};
auto remove_it
= std::find_if(columns.begin(), columns.end(), find_func);

if (remove_it != columns.end())
columns.erase(remove_it);
}
return remove_files;
}

}
7 changes: 7 additions & 0 deletions dbms/src/Storages/MergeTree/IMergedBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class IMergedBlockOutputStream : public IBlockOutputStream

IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);

/// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names.
NameSet removeEmptyColumnsFromPart(
const MergeTreeDataPartPtr & data_part,
NamesAndTypesList & columns,
MergeTreeData::DataPart::Checksums & checksums);

protected:
const MergeTreeData & storage;

Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/MergeTree/MergeList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());

std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);

total_size_bytes_compressed += source_part->bytes_on_disk;
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
Expand Down