Skip to content

Commit

Permalink
Merge pull request #51851 from ClickHouse/add_delay_for_replicated
Browse files Browse the repository at this point in the history
Make shutdown of `ReplicatedMergeTree` tables more soft
  • Loading branch information
tavplubix committed Jul 26, 2023
2 parents 93e1007 + 6bbed62 commit 4d03c23
Show file tree
Hide file tree
Showing 25 changed files with 394 additions and 33 deletions.
2 changes: 2 additions & 0 deletions docker/test/upgrade/run.sh
Expand Up @@ -62,6 +62,7 @@ configure

# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml

start
Expand Down Expand Up @@ -91,6 +92,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau

# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml

start
Expand Down
1 change: 1 addition & 0 deletions programs/server/Server.cpp
Expand Up @@ -747,6 +747,7 @@ try

std::lock_guard lock(servers_lock);
metrics.reserve(servers_to_start_before_tables.size() + servers.size());

for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});

Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DatabasesCommon.cpp
Expand Up @@ -292,7 +292,7 @@ void DatabaseWithOwnTablesBase::shutdown()

for (const auto & kv : tables_snapshot)
{
kv.second->flush();
kv.second->flushAndPrepareForShutdown();
}

for (const auto & kv : tables_snapshot)
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterDropQuery.cpp
Expand Up @@ -361,7 +361,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
std::vector<std::pair<String, bool>> tables_to_drop;
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
iterator->table()->flush();
iterator->table()->flushAndPrepareForShutdown();
tables_to_drop.push_back({iterator->name(), iterator->table()->isDictionary()});
}

Expand Down
8 changes: 4 additions & 4 deletions src/Storages/IStorage.h
Expand Up @@ -553,15 +553,15 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/**
* If the storage requires some complicated work on destroying,
* then you have two virtual methods:
* - flush()
* - flushAndPrepareForShutdown()
* - shutdown()
*
* @see shutdown()
* @see flush()
* @see flushAndPrepareForShutdown()
*/
void flushAndShutdown()
{
flush();
flushAndPrepareForShutdown();
shutdown();
}

Expand All @@ -574,7 +574,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo

/// Called before shutdown() to flush data to underlying storage
/// Data in memory need to be persistent
virtual void flush() {}
virtual void flushAndPrepareForShutdown() {}

/// Asks table to stop executing some action identified by action_type
/// If table does not support such type of lock, and empty lock is returned
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/DataPartsExchange.cpp
Expand Up @@ -203,6 +203,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);

data.addLastSentPart(part->info);
}
catch (const NetException &)
{
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -119,6 +119,7 @@ struct Settings;
M(Bool, detach_not_byte_identical_parts, false, "Do not remove non byte-idential parts for ReplicatedMergeTree, instead detach them (maybe useful for further analysis).", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -576,7 +576,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
/// It's ok if replica became readonly due to connection loss after we got current zookeeper (in this case zookeeper must be expired).
/// And it's ok if replica became readonly after shutdown.
/// In other cases it's likely that someone called pullLogsToQueue(...) when queue is not initialized yet by RestartingThread.
bool not_completely_initialized = storage.is_readonly && !zookeeper->expired() && !storage.shutdown_called;
bool not_completely_initialized = storage.is_readonly && !zookeeper->expired() && !storage.shutdown_prepared_called;
if (not_completely_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tried to pull logs to queue (reason: {}) on readonly replica {}, it's a bug",
reason, storage.getStorageID().getNameForLogs());
Expand Down
11 changes: 8 additions & 3 deletions src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
Expand Up @@ -329,7 +329,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()

void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
setReadonly(/* on_shutdown = */ part_of_full_shutdown);
storage.partialShutdown();
}

Expand All @@ -339,10 +339,15 @@ void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();

/// Explicitly set the event, because the restarting thread will not set it again
if (part_of_full_shutdown)
storage.startup_event.set();

LOG_TRACE(log, "Restarting thread finished");

/// Stop other tasks.
partialShutdown(part_of_full_shutdown);
setReadonly(part_of_full_shutdown);

}

void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h
Expand Up @@ -5,6 +5,7 @@
#include <base/types.h>
#include <thread>
#include <atomic>
#include <Common/logger_useful.h>


namespace DB
Expand All @@ -25,6 +26,7 @@ class ReplicatedMergeTreeRestartingThread

void start(bool schedule = true)
{
LOG_TRACE(log, "Starting restating thread, schedule: {}", schedule);
if (schedule)
task->activateAndSchedule();
else
Expand All @@ -36,6 +38,7 @@ class ReplicatedMergeTreeRestartingThread
void shutdown(bool part_of_full_shutdown);

void run();

private:
StorageReplicatedMergeTree & storage;
String log_name;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageBuffer.cpp
Expand Up @@ -682,7 +682,7 @@ void StorageBuffer::startup()
}


void StorageBuffer::flush()
void StorageBuffer::flushAndPrepareForShutdown()
{
if (!flush_handle)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageBuffer.h
Expand Up @@ -92,7 +92,7 @@ friend class BufferSink;

void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.
void flush() override;
void flushAndPrepareForShutdown() override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageDistributed.cpp
Expand Up @@ -1432,7 +1432,7 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}

void StorageDistributed::flush()
void StorageDistributed::flushAndPrepareForShutdown()
{
try
{
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageDistributed.h
Expand Up @@ -135,7 +135,7 @@ class StorageDistributed final : public IStorage, WithContext

void initializeFromDisk();
void shutdown() override;
void flush() override;
void flushAndPrepareForShutdown() override;
void drop() override;

bool storesDataOnDisk() const override { return data_volume != nullptr; }
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageProxy.h
Expand Up @@ -139,7 +139,7 @@ class StorageProxy : public IStorage

void startup() override { getNested()->startup(); }
void shutdown() override { getNested()->shutdown(); }
void flush() override { getNested()->flush(); }
void flushAndPrepareForShutdown() override { getNested()->flushAndPrepareForShutdown(); }

ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }

Expand Down

0 comments on commit 4d03c23

Please sign in to comment.