Skip to content

Commit

Permalink
Add statement 'SYSTEM RELOAD ASYNCHRONOUS METRICS'
Browse files Browse the repository at this point in the history
  • Loading branch information
rschu1ze committed Jan 22, 2024
1 parent 748371d commit 90a0ea3
Show file tree
Hide file tree
Showing 19 changed files with 108 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Columns:
- `hostname` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) — Hostname of the server executing the query.
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `metric` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.

**Example**
Expand Down
1 change: 1 addition & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,7 @@ try

/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
global_context->setAsynchronousMetrics(&async_metrics);

main_config_reloader->start();
access_control.startPeriodicReloading();
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ enum class AccessType
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\
M(SYSTEM_RELOAD_ASYNCHRONOUS_METRICS, "RELOAD ASYNCHRONOUS METRICS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD, "", GROUP, SYSTEM) \
M(SYSTEM_RESTART_DISK, "SYSTEM RESTART DISK", GLOBAL, SYSTEM) \
M(SYSTEM_MERGES, "SYSTEM STOP MERGES, SYSTEM START MERGES, STOP MERGES, START MERGES", TABLE, SYSTEM) \
Expand Down
6 changes: 3 additions & 3 deletions src/Common/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void AsynchronousMetrics::start()
{
/// Update once right now, to make metrics available just after server start
/// (without waiting for asynchronous_metrics_update_period_s).
update(std::chrono::system_clock::now());
update(std::chrono::system_clock::now(), false);
thread = std::make_unique<ThreadFromGlobalPool>([this] { run(); });
}

Expand Down Expand Up @@ -557,7 +557,7 @@ AsynchronousMetrics::NetworkInterfaceStatValues::operator-(const AsynchronousMet
#endif


void AsynchronousMetrics::update(TimePoint update_time)
void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
{
Stopwatch watch;

Expand Down Expand Up @@ -1584,7 +1584,7 @@ void AsynchronousMetrics::update(TimePoint update_time)

/// Add more metrics as you wish.

updateImpl(update_time, current_time, first_run, new_values);
updateImpl(update_time, current_time, force_update, first_run, new_values);

new_values["AsynchronousMetricsCalculationTimeSpent"] = { watch.elapsedSeconds(), "Time in seconds spent for calculation of asynchronous metrics (this is the overhead of asynchronous metrics)." };

Expand Down
13 changes: 8 additions & 5 deletions src/Common/AsynchronousMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ struct ProtocolServerMetrics
*/
class AsynchronousMetrics
{
protected:
using Duration = std::chrono::seconds;
using TimePoint = std::chrono::system_clock::time_point;

public:
using ProtocolServerMetricsFunc = std::function<std::vector<ProtocolServerMetrics>()>;

AsynchronousMetrics(
int update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
Expand All @@ -69,18 +74,17 @@ class AsynchronousMetrics

void stop();

void update(TimePoint update_time, bool force_update = false);

/// Returns copy of all values.
AsynchronousMetricValues getValues() const;

protected:
using Duration = std::chrono::seconds;
using TimePoint = std::chrono::system_clock::time_point;

const Duration update_period;

Poco::Logger * log;
private:
virtual void updateImpl(TimePoint update_time, TimePoint current_time, bool first_run, AsynchronousMetricValues & new_values) = 0;
virtual void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) = 0;
virtual void logImpl(AsynchronousMetricValues &) {}

ProtocolServerMetricsFunc protocol_server_metrics_func;
Expand Down Expand Up @@ -213,7 +217,6 @@ class AsynchronousMetrics
#endif

void run();
void update(TimePoint update_time);
};

}
2 changes: 1 addition & 1 deletion src/Coordination/KeeperAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ KeeperAsynchronousMetrics::~KeeperAsynchronousMetrics()
stop();
}

void KeeperAsynchronousMetrics::updateImpl(TimePoint /*update_time*/, TimePoint /*current_time*/, bool /*first_run*/, AsynchronousMetricValues & new_values)
void KeeperAsynchronousMetrics::updateImpl(TimePoint /*update_time*/, TimePoint /*current_time*/, bool /*force_update*/, bool /*first_run*/, AsynchronousMetricValues & new_values)
{
#if USE_NURAFT
{
Expand Down
2 changes: 1 addition & 1 deletion src/Coordination/KeeperAsynchronousMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KeeperAsynchronousMetrics : public AsynchronousMetrics
private:
ContextPtr context;

void updateImpl(TimePoint update_time, TimePoint current_time, bool first_run, AsynchronousMetricValues & new_values) override;
void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) override;
};


Expand Down
13 changes: 13 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ struct ContextSharedPart : boost::noncopyable
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
AsynchronousMetrics * asynchronous_metrics TSA_GUARDED_BY(mutex) = nullptr; /// Points to asynchronous metrics
ProcessList process_list; /// Executing queries at the moment.
SessionTracker session_tracker;
GlobalOvercommitTracker global_overcommit_tracker;
Expand Down Expand Up @@ -2860,6 +2861,18 @@ void Context::clearCaches() const
/// Intentionally not clearing the query cache which is transactionally inconsistent by design.
}

void Context::setAsynchronousMetrics(AsynchronousMetrics * asynchronous_metrics_)
{
std::lock_guard lock(shared->mutex);
shared->asynchronous_metrics = asynchronous_metrics_;
}

AsynchronousMetrics * Context::getAsynchronousMetrics() const
{
SharedLockGuard lock(shared->mutex);
return shared->asynchronous_metrics;
}

ThreadPool & Context::getPrefetchThreadpool() const
{
callOnce(shared->prefetch_threadpool_initialized, [&] {
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class IUserDefinedSQLObjectsStorage;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
class AsynchronousMetrics;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
Expand Down Expand Up @@ -1014,6 +1015,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>

/// -----------------------------------------------------------------------------------------------------

void setAsynchronousMetrics(AsynchronousMetrics * asynchronous_metrics_);
AsynchronousMetrics * getAsynchronousMetrics() const;

ThreadPool & getPrefetchThreadpool() const;

/// Note: prefetchThreadpool is different from threadpoolReader
Expand Down
13 changes: 13 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_USERS);
system_context->getAccessControl().reload(AccessControl::ReloadMode::ALL);
break;
case Type::RELOAD_ASYNCHRONOUS_METRICS:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_ASYNCHRONOUS_METRICS);
auto * asynchronous_metrics = system_context->getAsynchronousMetrics();
if (asynchronous_metrics)
asynchronous_metrics->update(std::chrono::system_clock::now(), /*force_update*/ true);
break;
}
case Type::STOP_MERGES:
startStopAction(ActionLocks::PartsMerge, false);
break;
Expand Down Expand Up @@ -1225,6 +1233,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_RELOAD_USERS);
break;
}
case Type::RELOAD_ASYNCHRONOUS_METRICS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_ASYNCHRONOUS_METRICS);
break;
}
case Type::STOP_MERGES:
case Type::START_MERGES:
{
Expand Down
10 changes: 5 additions & 5 deletions src/Interpreters/ServerAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ServerAsynchronousMetrics::~ServerAsynchronousMetrics()
stop();
}

void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint current_time, bool first_run, AsynchronousMetricValues & new_values)
void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values)
{
if (auto mark_cache = getContext()->getMarkCache())
{
Expand Down Expand Up @@ -377,7 +377,7 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
}
#endif

updateHeavyMetricsIfNeeded(current_time, update_time, first_run, new_values);
updateHeavyMetricsIfNeeded(current_time, update_time, force_update, first_run, new_values);
}

void ServerAsynchronousMetrics::logImpl(AsynchronousMetricValues & new_values)
Expand Down Expand Up @@ -421,13 +421,13 @@ void ServerAsynchronousMetrics::updateDetachedPartsStats()
detached_parts_stats = current_values;
}

void ServerAsynchronousMetrics::updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, bool first_run, AsynchronousMetricValues & new_values)
void ServerAsynchronousMetrics::updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values)
{
const auto time_since_previous_update = current_time - heavy_metric_previous_update_time;
const bool update_heavy_metric = (time_since_previous_update >= heavy_metric_update_period) || first_run;
const bool update_heavy_metrics = (time_since_previous_update >= heavy_metric_update_period) || force_update || first_run;

Stopwatch watch;
if (update_heavy_metric)
if (update_heavy_metrics)
{
heavy_metric_previous_update_time = update_time;
if (first_run)
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/ServerAsynchronousMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ServerAsynchronousMetrics : WithContext, public AsynchronousMetrics
~ServerAsynchronousMetrics() override;

private:
void updateImpl(TimePoint update_time, TimePoint current_time, bool first_run, AsynchronousMetricValues & new_values) override;
void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) override;
void logImpl(AsynchronousMetricValues & new_values) override;

const Duration heavy_metric_update_period;
Expand All @@ -34,7 +34,7 @@ class ServerAsynchronousMetrics : WithContext, public AsynchronousMetrics
DetachedPartsStats detached_parts_stats{};

void updateDetachedPartsStats();
void updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, bool first_run, AsynchronousMetricValues & new_values);
void updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values);
};

}
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
RELOAD_EMBEDDED_DICTIONARIES,
RELOAD_CONFIG,
RELOAD_USERS,
RELOAD_ASYNCHRONOUS_METRICS,
RESTART_DISK,
STOP_MERGES,
START_MERGES,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<clickhouse>

<asynchronous_metrics_update_period_s>60000</asynchronous_metrics_update_period_s>
<asynchronous_heavy_metrics_update_period_s>60000</asynchronous_heavy_metrics_update_period_s>

</clickhouse>
46 changes: 46 additions & 0 deletions tests/integration/test_system_reload_async_metrics/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import pytest
import shutil
import time
from helpers.cluster import ClickHouseCluster

# Tests that SYSTEM RELOAD ASYNCHRONOUS METRICS works.

# Config default.xml sets a large refresh interval of asynchronous metrics, so that the periodic updates don't interfere with the manual
# update below.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/default.xml"],
stay_alive=True,
)


@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()


SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_DIR = os.path.join(SCRIPT_DIR, "configs")


def test_query_cache_size_is_runtime_configurable(start_cluster):
node.query("SYSTEM DROP QUERY CACHE")

res1 = node.query(
"SELECT value FROM system.asynchronous_metrics WHERE metric = 'NumberOfTables'"
)

node.query("CREATE TABLE tab (col UInt64) ENGINE MergeTree ORDER BY tuple()") # do anything dumb

node.query("SYSTEM RELOAD ASYNCHRONOUS METRICS")

res2 = node.query(
"SELECT value FROM system.asynchronous_metrics WHERE metric = 'NumberOfTables'"
)
assert int(res1.rstrip()) + 1 == int(res2.rstrip())
1 change: 1 addition & 0 deletions tests/queries/0_stateless/01271_show_privileges.reference
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ SYSTEM RELOAD DICTIONARY ['SYSTEM RELOAD DICTIONARIES','RELOAD DICTIONARY','RELO
SYSTEM RELOAD MODEL ['SYSTEM RELOAD MODELS','RELOAD MODEL','RELOAD MODELS'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD FUNCTION ['SYSTEM RELOAD FUNCTIONS','RELOAD FUNCTION','RELOAD FUNCTIONS'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD EMBEDDED DICTIONARIES ['RELOAD EMBEDDED DICTIONARIES'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD ASYNCHRONOUS METRICS ['RELOAD ASYNCHRONOUS METRICS'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD [] \N SYSTEM
SYSTEM RESTART DISK ['SYSTEM RESTART DISK'] GLOBAL SYSTEM
SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START MERGES'] TABLE SYSTEM
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SYSTEM RELOAD ASYNCHRONOUS METRICS;

0 comments on commit 90a0ea3

Please sign in to comment.