Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alter table drop detached part #6158

Merged
merged 19 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value") \
M(SettingBool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries") \
\
M(SettingBool, allow_experimental_live_view, false, "Enable LIVE VIEW. Not mature enough.") \
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
}


Expand Down Expand Up @@ -56,7 +57,13 @@ BlockIO InterpreterAlterQuery::execute()
if (auto alter_command = AlterCommand::parse(command_ast))
alter_commands.emplace_back(std::move(*alter_command));
else if (auto partition_command = PartitionCommand::parse(command_ast))
{
if (partition_command->type == PartitionCommand::DROP_DETACHED_PARTITION
&& !context.getSettingsRef().allow_drop_detached)
throw DB::Exception("Cannot execute query: DROP DETACHED PART is disabled "
"(see allow_drop_detached setting)", ErrorCodes::SUPPORT_IS_DISABLED);
partition_commands.emplace_back(std::move(*partition_command));
}
else if (auto mut_command = MutationCommand::parse(command_ast))
mutation_commands.emplace_back(std::move(*mut_command));
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ void ASTAlterCommand::formatImpl(
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::DROP_DETACHED_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DROP DETACHED" << (part ? " PART " : " PARTITION ")
<< (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::ATTACH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ATTACH "
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ASTAlterCommand : public IAST
MATERIALIZE_INDEX,

DROP_PARTITION,
DROP_DETACHED_PARTITION,
ATTACH_PARTITION,
REPLACE_PARTITION,
FETCH_PARTITION,
Expand Down Expand Up @@ -115,7 +116,7 @@ class ASTAlterCommand : public IAST

bool detach = false; /// true for DETACH PARTITION

bool part = false; /// true for ATTACH PART
bool part = false; /// true for ATTACH PART and DROP DETACHED PART

bool clear_column = false; /// for CLEAR COLUMN (do not drop column from metadata)

Expand Down
21 changes: 19 additions & 2 deletions dbms/src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_attach_partition("ATTACH PARTITION");
ParserKeyword s_detach_partition("DETACH PARTITION");
ParserKeyword s_drop_partition("DROP PARTITION");
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_attach_part("ATTACH PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_replace_partition("REPLACE PARTITION");
Expand Down Expand Up @@ -155,6 +157,21 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected

command->type = ASTAlterCommand::DROP_PARTITION;
}
else if (s_drop_detached_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;

command->type = ASTAlterCommand::DROP_DETACHED_PARTITION;
}
else if (s_drop_detached_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;

command->type = ASTAlterCommand::DROP_DETACHED_PARTITION;
command->part = true;
}
else if (s_drop_column.ignore(pos, expected))
{
if (s_if_exists.ignore(pos, expected))
Expand All @@ -163,8 +180,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
if (!parser_name.parse(pos, command->column, expected))
return false;

command->type = ASTAlterCommand::DROP_COLUMN;
command->detach = false;
command->type = ASTAlterCommand::DROP_COLUMN;
command->detach = false;
}
else if (s_clear_column.ignore(pos, expected))
{
Expand Down
182 changes: 166 additions & 16 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ namespace ErrorCodes
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
extern const int BAD_TTL_EXPRESSION;
extern const int INCORRECT_FILE_NAME;
extern const int BAD_DATA_PART_NAME;
}


Expand Down Expand Up @@ -1766,6 +1768,52 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
}
}

void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
{
old_and_new_names.push_back({old_name, new_name});
}

void MergeTreeData::PartsTemporaryRename::tryRenameAll()
{
renamed = true;
for (size_t i = 0; i < old_and_new_names.size(); ++i)
{
try
{
const auto & names = old_and_new_names[i];
if (names.first.empty() || names.second.empty())
throw DB::Exception("Empty part name. Most likely it's a bug.", ErrorCodes::INCORRECT_FILE_NAME);
Poco::File(base_dir + names.first).renameTo(base_dir + names.second);
}
catch (...)
{
old_and_new_names.resize(i);
LOG_WARNING(storage.log, "Cannot rename parts to perform operation on them: " << getCurrentExceptionMessage(false));
throw;
}
}
}

MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
{
// TODO what if server had crashed before this destructor was called?
if (!renamed)
return;
for (const auto & names : old_and_new_names)
{
if (names.first.empty())
continue;
try
{
Poco::File(base_dir + names.second).renameTo(base_dir + names.first);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}


MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
const MergeTreePartInfo & new_part_info,
Expand Down Expand Up @@ -2386,6 +2434,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const St
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this, Poco::Path(relative_path).getFileName());
part->relative_path = relative_path;
loadPartAndFixMetadata(part);
return part;
}

void MergeTreeData::loadPartAndFixMetadata(MutableDataPartPtr part)
{
String full_part_path = part->getFullPath();

/// Earlier the list of columns was written incorrectly. Delete it and re-create.
Expand All @@ -2407,8 +2461,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const St

Poco::File(full_part_path + "checksums.txt.tmp").renameTo(full_part_path + "checksums.txt");
}

return part;
}


Expand Down Expand Up @@ -2628,24 +2680,122 @@ MergeTreeData::getDetachedParts() const
res.emplace_back();
auto & part = res.back();

/// First, try to parse as <part_name>.
if (MergeTreePartInfo::tryParsePartName(dir_name, &part, format_version))
continue;
DetachedPartInfo::tryParseDetachedPartName(dir_name, part, format_version);
}
return res;
}

/// Next, as <prefix>_<partname>. Use entire name as prefix if it fails.
part.prefix = dir_name;
const auto first_separator = dir_name.find_first_of('_');
if (first_separator == String::npos)
continue;
void MergeTreeData::validateDetachedPartName(const String & name) const
{
if (name.find('/') != std::string::npos || name == "." || name == "..")
throw DB::Exception("Invalid part name", ErrorCodes::INCORRECT_FILE_NAME);

const auto part_name = dir_name.substr(first_separator + 1,
dir_name.size() - first_separator - 1);
if (!MergeTreePartInfo::tryParsePartName(part_name, &part, format_version))
continue;
Poco::File detached_part_dir(full_path + "detached/" + name);
if (!detached_part_dir.exists())
throw DB::Exception("Detached part \"" + name + "\" not found" , ErrorCodes::BAD_DATA_PART_NAME);

part.prefix = dir_name.substr(0, first_separator);
if (startsWith(name, "attaching_") || startsWith(name, "deleting_"))
throw DB::Exception("Cannot drop part " + name + ": "
"most likely it is used by another DROP or ATTACH query.",
ErrorCodes::BAD_DATA_PART_NAME);
}

void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, const Context & context)
{
PartsTemporaryRename renamed_parts(*this, full_path + "detached/");

if (part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_name);
renamed_parts.addPart(part_name, "deleting_" + part_name);
}
return res;
else
{
String partition_id = getPartitionIDFromQuery(partition, context);
DetachedPartsInfo detached_parts = getDetachedParts();
for (const auto & part_info : detached_parts)
if (part_info.valid_name && part_info.partition_id == partition_id
&& part_info.prefix != "attaching" && part_info.prefix != "deleting")
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name);
}

LOG_DEBUG(log, "Will drop " << renamed_parts.old_and_new_names.size() << " detached parts.");

renamed_parts.tryRenameAll();

for (auto & names : renamed_parts.old_and_new_names)
{
Poco::File(renamed_parts.base_dir + names.second).remove(true);
LOG_DEBUG(log, "Dropped detached part " << names.first);
names.first.clear();
}
}

MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
const Context & context, PartsTemporaryRename & renamed_parts)
{
String source_dir = "detached/";

/// Let's compose a list of parts that should be added.
if (attach_part)
{
String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id);
}
else
{
String partition_id = getPartitionIDFromQuery(partition, context);
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(format_version);

std::set<String> part_names;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
MergeTreePartInfo part_info;
// TODO what if name contains "_tryN" suffix?
/// Parts with prefix in name (e.g. attaching_1_3_3_0, deleting_1_3_3_0) will be ignored
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version))
continue;
if (part_info.partition_id != partition_id)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
part_names.insert(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");

/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
for (const auto & name : part_names)
{
String containing_part = active_parts.getContainingPart(name);
if (!containing_part.empty() && containing_part != name)
// TODO maybe use PartsTemporaryRename here?
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
else
renamed_parts.addPart(name, "attaching_" + name);
}
}

/// Try to rename all parts before attaching to prevent race with DROP DETACHED and another ATTACH.
renamed_parts.tryRenameAll();

/// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
LOG_DEBUG(log, "Checking parts");
MutableDataPartsVector loaded_parts;
loaded_parts.reserve(renamed_parts.old_and_new_names.size());
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);
MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_names.first);
part->relative_path = source_dir + part_names.second;
loadPartAndFixMetadata(part);
loaded_parts.push_back(part);
}

return loaded_parts;
}

MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
Expand Down
27 changes: 26 additions & 1 deletion dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,23 @@ class MergeTreeData : public IStorage

using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;

struct PartsTemporaryRename : private boost::noncopyable
{
PartsTemporaryRename(const MergeTreeData & storage_, const String & base_dir_) : storage(storage_), base_dir(base_dir_) {}

void addPart(const String & old_name, const String & new_name);

/// Renames part from old_name to new_name
void tryRenameAll();

/// Renames all added parts from new_name to old_name if old name is not empty
~PartsTemporaryRename();

const MergeTreeData & storage;
String base_dir;
std::vector<std::pair<String, String>> old_and_new_names;
bool renamed = false;
};

/// Parameters for various modes.
struct MergingParams
Expand Down Expand Up @@ -388,7 +405,14 @@ class MergeTreeData : public IStorage
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;

/// Returns all detached parts
std::vector<DetachedPartInfo> getDetachedParts() const;
DetachedPartsInfo getDetachedParts() const;

void validateDetachedPartName(const String & name) const;

void dropDetached(const ASTPtr & partition, bool part, const Context & context);

MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
const Context & context, PartsTemporaryRename & renamed_parts);

/// Returns Committed parts
DataParts getDataParts() const;
Expand Down Expand Up @@ -536,6 +560,7 @@ class MergeTreeData : public IStorage

/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
void loadPartAndFixMetadata(MutableDataPartPtr part);

/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
{
}

MergeTreeDataPart::MergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_)
MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_)
, name(name_)
, info(info_)
Expand Down Expand Up @@ -367,6 +364,8 @@ void MergeTreeDataPart::remove() const
* And a race condition can happen that will lead to "File not found" error here.
*/

// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function

String from = storage.full_path + relative_path;
String to = storage.full_path + "delete_tmp_" + name;

Expand Down