Skip to content

Commit

Permalink
Merge pull request #40151 from ClickHouse/fix_cannot_quickly_remove_d…
Browse files Browse the repository at this point in the history
…irectory

Fix "Cannot quickly remove directory"
  • Loading branch information
alesapin committed Aug 17, 2022
2 parents caa270b + c18f3be commit 4398a35
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 83 deletions.
26 changes: 13 additions & 13 deletions src/Storages/MergeTree/DataPartStorageOnDisk.cpp
Expand Up @@ -208,6 +208,8 @@ void DataPartStorageOnDisk::remove(
const NameSet & names_not_to_remove,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const
{
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
Expand Down Expand Up @@ -276,7 +278,7 @@ void DataPartStorageOnDisk::remove(

clearDirectory(
fs::path(to) / proj_dir_name,
can_remove_shared_data, names_not_to_remove, projection.checksums, {}, log, true);
can_remove_shared_data, names_not_to_remove, projection.checksums, {}, is_temp, state, log, true);
}

/// It is possible that we are removing a part which has a written but not loaded projection.
Expand All @@ -303,7 +305,7 @@ void DataPartStorageOnDisk::remove(

clearDirectory(
fs::path(to) / name,
can_remove_shared_data, names_not_to_remove, tmp_checksums, {}, log, true);
can_remove_shared_data, names_not_to_remove, tmp_checksums, {}, is_temp, state, log, true);
}
catch (...)
{
Expand All @@ -313,7 +315,7 @@ void DataPartStorageOnDisk::remove(
}
}

clearDirectory(to, can_remove_shared_data, names_not_to_remove, checksums, projection_directories, log, false);
clearDirectory(to, can_remove_shared_data, names_not_to_remove, checksums, projection_directories, is_temp, state, log, false);
}

void DataPartStorageOnDisk::clearDirectory(
Expand All @@ -322,24 +324,22 @@ void DataPartStorageOnDisk::clearDirectory(
const NameSet & names_not_to_remove,
const MergeTreeDataPartChecksums & checksums,
const std::unordered_set<String> & skip_directories,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log,
bool is_projection) const
{
auto disk = volume->getDisk();

if (checksums.empty())
/// It does not make sense to try fast path for incomplete temporary parts, because some files are probably absent.
/// Sometimes we add something to checksums.files before actually writing checksums and columns on disk.
/// Also sometimes we write checksums.txt and columns.txt in arbitrary order, so this check becomes complex...
bool is_temporary_part = is_temp || state == MergeTreeDataPartState::Temporary;
bool incomplete_temporary_part = is_temporary_part && (!disk->exists(fs::path(dir) / "checksums.txt") || !disk->exists(fs::path(dir) / "columns.txt"));
if (checksums.empty() || incomplete_temporary_part)
{
if (is_projection)
{
LOG_ERROR(
log,
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
fullPath(disk, dir));
}

/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(dir) / "", !can_remove_shared_data, names_not_to_remove);

return;
}

Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/DataPartStorageOnDisk.h
Expand Up @@ -49,6 +49,8 @@ class DataPartStorageOnDisk final : public IDataPartStorage
const NameSet & names_not_to_remove,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const override;

std::string getRelativePathForPrefix(Poco::Logger * log, const String & prefix, bool detached) const override;
Expand Down Expand Up @@ -120,6 +122,8 @@ class DataPartStorageOnDisk final : public IDataPartStorage
const NameSet & names_not_to_remove,
const MergeTreeDataPartChecksums & checksums,
const std::unordered_set<String> & skip_directories,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log,
bool is_projection) const;
};
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/IDataPartStorage.h
Expand Up @@ -3,6 +3,7 @@
#include <base/types.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <optional>

namespace DB
Expand Down Expand Up @@ -116,6 +117,8 @@ class IDataPartStorage
const NameSet & names_not_to_remove,
const MergeTreeDataPartChecksums & checksums,
std::list<ProjectionChecksums> projections,
bool is_temp,
MergeTreeDataPartState state,
Poco::Logger * log) const = 0;

/// Get a name like 'prefix_partdir_tryN' which does not exist in a root dir.
Expand Down
52 changes: 26 additions & 26 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Expand Up @@ -207,55 +207,55 @@ void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, St
}


static void incrementStateMetric(IMergeTreeDataPart::State state)
static void incrementStateMetric(MergeTreeDataPartState state)
{
switch (state)
{
case IMergeTreeDataPart::State::Temporary:
case MergeTreeDataPartState::Temporary:
CurrentMetrics::add(CurrentMetrics::PartsTemporary);
return;
case IMergeTreeDataPart::State::PreActive:
case MergeTreeDataPartState::PreActive:
CurrentMetrics::add(CurrentMetrics::PartsPreActive);
CurrentMetrics::add(CurrentMetrics::PartsPreCommitted);
return;
case IMergeTreeDataPart::State::Active:
case MergeTreeDataPartState::Active:
CurrentMetrics::add(CurrentMetrics::PartsActive);
CurrentMetrics::add(CurrentMetrics::PartsCommitted);
return;
case IMergeTreeDataPart::State::Outdated:
case MergeTreeDataPartState::Outdated:
CurrentMetrics::add(CurrentMetrics::PartsOutdated);
return;
case IMergeTreeDataPart::State::Deleting:
case MergeTreeDataPartState::Deleting:
CurrentMetrics::add(CurrentMetrics::PartsDeleting);
return;
case IMergeTreeDataPart::State::DeleteOnDestroy:
case MergeTreeDataPartState::DeleteOnDestroy:
CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy);
return;
}
}

static void decrementStateMetric(IMergeTreeDataPart::State state)
static void decrementStateMetric(MergeTreeDataPartState state)
{
switch (state)
{
case IMergeTreeDataPart::State::Temporary:
case MergeTreeDataPartState::Temporary:
CurrentMetrics::sub(CurrentMetrics::PartsTemporary);
return;
case IMergeTreeDataPart::State::PreActive:
case MergeTreeDataPartState::PreActive:
CurrentMetrics::sub(CurrentMetrics::PartsPreActive);
CurrentMetrics::sub(CurrentMetrics::PartsPreCommitted);
return;
case IMergeTreeDataPart::State::Active:
case MergeTreeDataPartState::Active:
CurrentMetrics::sub(CurrentMetrics::PartsActive);
CurrentMetrics::sub(CurrentMetrics::PartsCommitted);
return;
case IMergeTreeDataPart::State::Outdated:
case MergeTreeDataPartState::Outdated:
CurrentMetrics::sub(CurrentMetrics::PartsOutdated);
return;
case IMergeTreeDataPart::State::Deleting:
case MergeTreeDataPartState::Deleting:
CurrentMetrics::sub(CurrentMetrics::PartsDeleting);
return;
case IMergeTreeDataPart::State::DeleteOnDestroy:
case MergeTreeDataPartState::DeleteOnDestroy:
CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy);
return;
}
Expand Down Expand Up @@ -313,7 +313,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);
incrementTypeMetric(part_type);

Expand All @@ -339,7 +339,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);
incrementTypeMetric(part_type);

Expand Down Expand Up @@ -381,14 +381,14 @@ std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & colum
}


void IMergeTreeDataPart::setState(IMergeTreeDataPart::State new_state) const
void IMergeTreeDataPart::setState(MergeTreeDataPartState new_state) const
{
decrementStateMetric(state);
state = new_state;
incrementStateMetric(state);
}

IMergeTreeDataPart::State IMergeTreeDataPart::getState() const
MergeTreeDataPartState IMergeTreeDataPart::getState() const
{
return state;
}
Expand Down Expand Up @@ -496,7 +496,7 @@ SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_n
void IMergeTreeDataPart::removeIfNeeded()
{
assert(assertHasValidVersionMetadata());
if (!is_temp && state != State::DeleteOnDestroy)
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;

try
Expand Down Expand Up @@ -526,7 +526,7 @@ void IMergeTreeDataPart::removeIfNeeded()

remove();

if (state == State::DeleteOnDestroy)
if (state == MergeTreeDataPartState::DeleteOnDestroy)
{
LOG_TRACE(storage.log, "Removed part from old location {}", path);
}
Expand All @@ -539,8 +539,8 @@ void IMergeTreeDataPart::removeIfNeeded()
/// Seems like it's especially important for remote disks, because removal may fail due to network issues.
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(!is_temp);
assert(state != State::DeleteOnDestroy);
assert(state != State::Temporary);
assert(state != MergeTreeDataPartState::DeleteOnDestroy);
assert(state != MergeTreeDataPartState::Temporary);
}
}

Expand All @@ -561,7 +561,7 @@ UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
return res;
}

void IMergeTreeDataPart::assertState(const std::initializer_list<IMergeTreeDataPart::State> & affordable_states) const
void IMergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const
{
if (!checkState(affordable_states))
{
Expand Down Expand Up @@ -1295,7 +1295,7 @@ catch (Exception & e)

bool IMergeTreeDataPart::wasInvolvedInTransaction() const
{
assert(!version.creation_tid.isEmpty() || (state == State::Temporary /* && std::uncaught_exceptions() */));
assert(!version.creation_tid.isEmpty() || (state == MergeTreeDataPartState::Temporary /* && std::uncaught_exceptions() */));
bool created_by_transaction = !version.creation_tid.isPrehistoric();
bool removed_by_transaction = version.isRemovalTIDLocked() && version.removal_tid_lock != Tx::PrehistoricTID.getHash();
return created_by_transaction || removed_by_transaction;
Expand All @@ -1319,7 +1319,7 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
if (part_is_probably_removed_from_disk)
return true;

if (state == State::Temporary)
if (state == MergeTreeDataPartState::Temporary)
return true;

if (!data_part_storage->exists())
Expand Down Expand Up @@ -1455,7 +1455,7 @@ void IMergeTreeDataPart::remove() const
projection_checksums.emplace_back(IDataPartStorage::ProjectionChecksums{.name = p_name, .checksums = projection_part->checksums});
}

data_part_storage->remove(can_remove, files_not_to_remove, checksums, projection_checksums, storage.log);
data_part_storage->remove(can_remove, files_not_to_remove, checksums, projection_checksums, is_temp, getState(), storage.log);
}

String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const
Expand Down
37 changes: 7 additions & 30 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
Expand Up @@ -6,6 +6,7 @@
#include <Storages/IStorage.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
Expand Down Expand Up @@ -223,45 +224,22 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// Flag for keeping S3 data when zero-copy replication over S3 is turned on.
mutable bool force_keep_shared_data = false;

/**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex.
*
* Possible state transitions:
* Temporary -> PreActive: we are trying to add a fetched, inserted or merged part to active set
* PreActive -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* PreActive -> Active: we successfully added a part to active dataset
* Active -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if a ZooKeeper error occurred during the deletion, we will retry deletion
* Active -> DeleteOnDestroy: if part was moved to another disk
*/
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
PreActive, /// the part is in data_parts, but not used for SELECTs
Active, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};

using TTLInfo = MergeTreeDataPartTTLInfo;
using TTLInfos = MergeTreeDataPartTTLInfos;

mutable TTLInfos ttl_infos;

/// Current state of the part. If the part is in working set already, it should be accessed via data_parts mutex
void setState(State new_state) const;
State getState() const;
void setState(MergeTreeDataPartState new_state) const;
MergeTreeDataPartState getState() const;

static constexpr std::string_view stateString(State state) { return magic_enum::enum_name(state); }
static constexpr std::string_view stateString(MergeTreeDataPartState state) { return magic_enum::enum_name(state); }
constexpr std::string_view stateString() const { return stateString(state); }

String getNameWithState() const { return fmt::format("{} (state {})", name, stateString()); }

/// Returns true if state of part is one of affordable_states
bool checkState(const std::initializer_list<State> & affordable_states) const
bool checkState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const
{
for (auto affordable_state : affordable_states)
{
Expand All @@ -272,7 +250,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
}

/// Throws an exception if state of the part is not in affordable_states
void assertState(const std::initializer_list<State> & affordable_states) const;
void assertState(const std::initializer_list<MergeTreeDataPartState> & affordable_states) const;

/// Primary key (correspond to primary.idx file).
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
Expand Down Expand Up @@ -592,13 +570,12 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;

mutable State state{State::Temporary};
mutable MergeTreeDataPartState state{MergeTreeDataPartState::Temporary};

/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
};

using MergeTreeDataPartState = IMergeTreeDataPart::State;
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;

Expand Down
14 changes: 7 additions & 7 deletions src/Storages/MergeTree/MergeTreeData.cpp
Expand Up @@ -2952,18 +2952,18 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
if (part->version.creation_csn != Tx::RolledBackCSN)
MergeTreeTransaction::removeOldPart(shared_from_this(), part, txn);

if (part->getState() == IMergeTreeDataPart::State::Active)
if (part->getState() == MergeTreeDataPartState::Active)
{
removePartContributionToColumnAndSecondaryIndexSizes(part);
removePartContributionToDataVolume(part);
removed_active_part = true;
}

if (part->getState() == IMergeTreeDataPart::State::Active || clear_without_timeout)
if (part->getState() == MergeTreeDataPartState::Active || clear_without_timeout)
part->remove_time.store(remove_time, std::memory_order_relaxed);

if (part->getState() != IMergeTreeDataPart::State::Outdated)
modifyPartState(part, IMergeTreeDataPart::State::Outdated);
if (part->getState() != MergeTreeDataPartState::Outdated)
modifyPartState(part, MergeTreeDataPartState::Outdated);

if (isInMemoryPart(part) && getSettings()->in_memory_parts_enable_wal)
getWriteAheadLog()->dropPart(part->name);
Expand All @@ -2983,9 +2983,9 @@ void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(con
if (it_part == data_parts_by_info.end())
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);

assert(part->getState() == IMergeTreeDataPart::State::PreActive);
assert(part->getState() == MergeTreeDataPartState::PreActive);

modifyPartState(part, IMergeTreeDataPart::State::Temporary);
modifyPartState(part, MergeTreeDataPartState::Temporary);
/// Erase immediately
data_parts_indexes.erase(it_part);
}
Expand Down Expand Up @@ -6111,7 +6111,7 @@ void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) con
broken_part_callback(part->name);
}
}
else if (data_part && data_part->getState() == IMergeTreeDataPart::State::Active)
else if (data_part && data_part->getState() == MergeTreeDataPartState::Active)
broken_part_callback(data_part->name);
else
LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState());
Expand Down

0 comments on commit 4398a35

Please sign in to comment.