Skip to content

Commit

Permalink
Fixed error with cancelling merges on ALTERs [#METR-22524].
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-milovidov committed Aug 26, 2016
1 parent 0c39e52 commit 867b73b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 50 deletions.
65 changes: 33 additions & 32 deletions dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
Expand Up @@ -83,22 +83,41 @@ class MergeTreeDataMerger
/// Примерное количество места на диске, нужное для мерджа. С запасом.
static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);

/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
*/
void cancel() { cancelled = true; }
void uncancel() { cancelled = false; }
bool isCancelled() const { return cancelled; }

void abortIfRequested();

private:
/** Выбрать все куски принадлежащие одной партиции.
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition);

private:
using FrozenPartitions = std::unordered_set<DayNum_t>;
/** Temporarily cancel merges.
*/
class BlockerImpl
{
public:
BlockerImpl(MergeTreeDataMerger * merger_) : merger(merger_)
{
++merger->cancelled;
}

~BlockerImpl()
{
--merger->cancelled;
}
private:
MergeTreeDataMerger * merger;
};

public:
/** Cancel all merges. All currently running 'mergeParts' methods will throw exception soon.
* All new calls to 'mergeParts' will throw exception till all 'Blocker' objects will be destroyed.
*/
using Blocker = std::unique_ptr<BlockerImpl>;
Blocker cancel() { return std::make_unique<BlockerImpl>(this); }

/** Cancel all merges forever.
*/
void cancelForever() { ++cancelled; }

bool isCancelled() const { return cancelled > 0; }

private:
MergeTreeData & data;
Expand All @@ -110,28 +129,10 @@ class MergeTreeDataMerger

CancellationHook cancellation_hook;

std::atomic<bool> cancelled {false};
};

std::atomic<int> cancelled {0};

/** Временно приостанавливает мерджи.
*/
class MergeTreeMergeBlocker
{
public:
MergeTreeMergeBlocker(MergeTreeDataMerger & merger_)
: merger(merger_)
{
merger.cancel();
}

~MergeTreeMergeBlocker()
{
merger.uncancel();
}

private:
MergeTreeDataMerger & merger;
void abortReshardPartitionIfRequested();
};


}
10 changes: 5 additions & 5 deletions dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
Expand Up @@ -812,13 +812,13 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(

while (Block block = merged_stream->read())
{
abortIfRequested();
abortReshardPartitionIfRequested();

ShardedBlocksWithDateIntervals blocks = sharder.shardBlock(block);

for (ShardedBlockWithDateInterval & block_with_dates : blocks)
{
abortIfRequested();
abortReshardPartitionIfRequested();

size_t shard_no = block_with_dates.shard_no;
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
Expand All @@ -845,7 +845,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// Завершить инициализацию куски новых партиций.
for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
{
abortIfRequested();
abortReshardPartitionIfRequested();

MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
if (0 == output_stream->marksCount())
Expand Down Expand Up @@ -896,9 +896,9 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}

void MergeTreeDataMerger::abortIfRequested()
void MergeTreeDataMerger::abortReshardPartitionIfRequested()
{
if (cancelled)
if (isCancelled())
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);

if (cancellation_hook)
Expand Down
Expand Up @@ -67,14 +67,14 @@ void ReplicatedMergeTreeAlterThread::run()

{
/// Если потребуется блокировать структуру таблицы, то приостановим мерджи.
std::unique_ptr<MergeTreeMergeBlocker> merge_blocker;
std::unique_ptr<MergeTreeMergeBlocker> unreplicated_merge_blocker;
MergeTreeDataMerger::Blocker merge_blocker;
MergeTreeDataMerger::Blocker unreplicated_merge_blocker;

if (changed_version || force_recheck_parts)
{
merge_blocker = std::make_unique<MergeTreeMergeBlocker>(storage.merger);
merge_blocker = storage.merger.cancel();
if (storage.unreplicated_merger)
unreplicated_merge_blocker = std::make_unique<MergeTreeMergeBlocker>(*storage.unreplicated_merger);
unreplicated_merge_blocker = storage.unreplicated_merger->cancel();
}

MergeTreeData::DataParts parts;
Expand Down
Expand Up @@ -171,9 +171,9 @@ void ReplicatedMergeTreeRestartingThread::run()
storage.remote_part_checker_endpoint_holder->cancel();
storage.remote_part_checker_endpoint_holder = nullptr;

storage.merger.cancel();
storage.merger.cancelForever();
if (storage.unreplicated_merger)
storage.unreplicated_merger->cancel();
storage.unreplicated_merger->cancelForever();

partialShutdown();
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageMergeTree.cpp
Expand Up @@ -109,7 +109,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.cancel();
merger.cancelForever();
background_pool.removeTask(merge_task_handle);
}

Expand Down Expand Up @@ -168,7 +168,7 @@ void StorageMergeTree::alter(
const Context & context)
{
/// NOTE: Здесь так же как в ReplicatedMergeTree можно сделать ALTER, не блокирующий запись данных надолго.
const MergeTreeMergeBlocker merge_blocker{merger};
auto merge_blocker = merger.cancel();

auto table_soft_lock = lockDataForAlter();

Expand Down Expand Up @@ -360,7 +360,7 @@ void StorageMergeTree::dropPartition(ASTPtr query, const Field & partition, bool

/// Просит завершить мерджи и не позволяет им начаться.
/// Это защищает от "оживания" данных за удалённую партицию после завершения мерджа.
const MergeTreeMergeBlocker merge_blocker{merger};
auto merge_blocker = merger.cancel();
/// Дожидается завершения мерджей и не даёт начаться новым.
auto lock = lockForAlter();

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/StorageReplicatedMergeTree.cpp
Expand Up @@ -2383,9 +2383,9 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
assertNotReadonly();

auto zookeeper = getZooKeeper();
const MergeTreeMergeBlocker merge_blocker{merger};
const auto unreplicated_merge_blocker = unreplicated_merger ?
std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
auto merge_blocker = merger.cancel();
auto unreplicated_merge_blocker = unreplicated_merger ?
unreplicated_merger->cancel() : MergeTreeDataMerger::Blocker();

LOG_DEBUG(log, "Doing ALTER");

Expand Down Expand Up @@ -2576,7 +2576,7 @@ void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partiti

/// Просит завершить мерджи и не позволяет им начаться.
/// Это защищает от "оживания" данных за удалённую партицию после завершения мерджа.
const MergeTreeMergeBlocker merge_blocker{*unreplicated_merger};
auto merge_blocker = unreplicated_merger->cancel();
auto structure_lock = lockStructure(true);

const DayNum_t month = MergeTreeData::getMonthDayNum(partition);
Expand Down

0 comments on commit 867b73b

Please sign in to comment.