Skip to content

Commit

Permalink
Merge pull request #53710 from rschu1ze/system-reload-asynchronous-metrics
Browse files Browse the repository at this point in the history

Add statement `SYSTEM RELOAD ASYNCHRONOUS METRICS`
  • Loading branch information
rschu1ze committed Jan 23, 2024
2 parents 6962859 + 8fc918b commit e23e7e6
Show file tree
Hide file tree
Showing 17 changed files with 175 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Columns:
- `hostname` ([LowCardinality(String)](../../sql-reference/data-types/string.md)) — Hostname of the server executing the query.
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `metric` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.

**Example**
Expand Down
1 change: 1 addition & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,7 @@ try

/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
global_context->setAsynchronousMetrics(&async_metrics);

main_config_reloader->start();
access_control.startPeriodicReloading();
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ enum class AccessType
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\
M(SYSTEM_RELOAD_ASYNCHRONOUS_METRICS, "RELOAD ASYNCHRONOUS METRICS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD, "", GROUP, SYSTEM) \
M(SYSTEM_RESTART_DISK, "SYSTEM RESTART DISK", GLOBAL, SYSTEM) \
M(SYSTEM_MERGES, "SYSTEM STOP MERGES, SYSTEM START MERGES, STOP MERGES, START MERGES", TABLE, SYSTEM) \
Expand Down
49 changes: 30 additions & 19 deletions src/Common/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ AsynchronousMetrics::AsynchronousMetrics(
}

#if defined(OS_LINUX)
void AsynchronousMetrics::openSensors()
void AsynchronousMetrics::openSensors() TSA_REQUIRES(data_mutex)
{
LOG_TRACE(log, "Scanning /sys/class/thermal");

Expand Down Expand Up @@ -136,7 +136,7 @@ void AsynchronousMetrics::openSensors()
}
}

void AsynchronousMetrics::openBlockDevices()
void AsynchronousMetrics::openBlockDevices() TSA_REQUIRES(data_mutex)
{
LOG_TRACE(log, "Scanning /sys/block");

Expand All @@ -163,7 +163,7 @@ void AsynchronousMetrics::openBlockDevices()
}
}

void AsynchronousMetrics::openEDAC()
void AsynchronousMetrics::openEDAC() TSA_REQUIRES(data_mutex)
{
LOG_TRACE(log, "Scanning /sys/devices/system/edac");

Expand Down Expand Up @@ -194,7 +194,7 @@ void AsynchronousMetrics::openEDAC()
}
}

void AsynchronousMetrics::openSensorsChips()
void AsynchronousMetrics::openSensorsChips() TSA_REQUIRES(data_mutex)
{
LOG_TRACE(log, "Scanning /sys/class/hwmon");

Expand Down Expand Up @@ -272,7 +272,7 @@ void AsynchronousMetrics::start()
{
/// Update once right now, to make metrics available just after server start
/// (without waiting for asynchronous_metrics_update_period_s).
update(std::chrono::system_clock::now());
update(std::chrono::system_clock::now(), false);
thread = std::make_unique<ThreadFromGlobalPool>([this] { run(); });
}

Expand All @@ -281,7 +281,7 @@ void AsynchronousMetrics::stop()
try
{
{
std::lock_guard lock{mutex};
std::lock_guard lock(thread_mutex);
quit = true;
}

Expand All @@ -306,11 +306,14 @@ AsynchronousMetrics::~AsynchronousMetrics()

AsynchronousMetricValues AsynchronousMetrics::getValues() const
{
std::lock_guard lock{mutex};
std::lock_guard lock(data_mutex);
return values;
}

static auto get_next_update_time(std::chrono::seconds update_period)
namespace
{

auto get_next_update_time(std::chrono::seconds update_period)
{
using namespace std::chrono;

Expand All @@ -334,6 +337,8 @@ static auto get_next_update_time(std::chrono::seconds update_period)
return time_next;
}

}

void AsynchronousMetrics::run()
{
setThreadName("AsyncMetrics");
Expand All @@ -344,9 +349,9 @@ void AsynchronousMetrics::run()

{
// Wait first, so that the first metric collection is also on even time.
std::unique_lock lock{mutex};
std::unique_lock lock(thread_mutex);
if (wait_cond.wait_until(lock, next_update_time,
[this] { return quit; }))
[this] TSA_REQUIRES(thread_mutex) { return quit; }))
{
break;
}
Expand All @@ -364,6 +369,9 @@ void AsynchronousMetrics::run()
}

#if USE_JEMALLOC
namespace
{

uint64_t updateJemallocEpoch()
{
uint64_t value = 0;
Expand All @@ -373,7 +381,7 @@ uint64_t updateJemallocEpoch()
}

template <typename Value>
static Value saveJemallocMetricImpl(
Value saveJemallocMetricImpl(
AsynchronousMetricValues & values,
const std::string & jemalloc_full_name,
const std::string & clickhouse_full_name)
Expand All @@ -386,7 +394,7 @@ static Value saveJemallocMetricImpl(
}

template<typename Value>
static Value saveJemallocMetric(AsynchronousMetricValues & values,
Value saveJemallocMetric(AsynchronousMetricValues & values,
const std::string & metric_name)
{
return saveJemallocMetricImpl<Value>(values,
Expand All @@ -395,13 +403,15 @@ static Value saveJemallocMetric(AsynchronousMetricValues & values,
}

template<typename Value>
static Value saveAllArenasMetric(AsynchronousMetricValues & values,
Value saveAllArenasMetric(AsynchronousMetricValues & values,
const std::string & metric_name)
{
return saveJemallocMetricImpl<Value>(values,
fmt::format("stats.arenas.{}.{}", MALLCTL_ARENAS_ALL, metric_name),
fmt::format("jemalloc.arenas.all.{}", metric_name));
}

}
#endif


Expand Down Expand Up @@ -547,21 +557,23 @@ AsynchronousMetrics::NetworkInterfaceStatValues::operator-(const AsynchronousMet
#endif


void AsynchronousMetrics::update(TimePoint update_time)
void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
{
Stopwatch watch;

AsynchronousMetricValues new_values;

std::lock_guard lock(data_mutex);

auto current_time = std::chrono::system_clock::now();
auto time_after_previous_update = current_time - previous_update_time;
auto time_since_previous_update = current_time - previous_update_time;
previous_update_time = update_time;

double update_interval = 0.;
if (first_run)
update_interval = update_period.count();
else
update_interval = std::chrono::duration_cast<std::chrono::microseconds>(time_after_previous_update).count() / 1e6;
update_interval = std::chrono::duration_cast<std::chrono::microseconds>(time_since_previous_update).count() / 1e6;
new_values["AsynchronousMetricsUpdateInterval"] = { update_interval, "Metrics update interval" };

/// This is also a good indicator of system responsiveness.
Expand Down Expand Up @@ -815,7 +827,7 @@ void AsynchronousMetrics::update(TimePoint update_time)
if (-1 == hz)
throw ErrnoException(ErrorCodes::CANNOT_SYSCONF, "Cannot call 'sysconf' to obtain system HZ");

double multiplier = 1.0 / hz / (std::chrono::duration_cast<std::chrono::nanoseconds>(time_after_previous_update).count() / 1e9);
double multiplier = 1.0 / hz / (std::chrono::duration_cast<std::chrono::nanoseconds>(time_since_previous_update).count() / 1e9);
size_t num_cpus = 0;

ProcStatValuesOther current_other_values{};
Expand Down Expand Up @@ -1572,7 +1584,7 @@ void AsynchronousMetrics::update(TimePoint update_time)

/// Add more metrics as you wish.

updateImpl(new_values, update_time, current_time);
updateImpl(update_time, current_time, force_update, first_run, new_values);

new_values["AsynchronousMetricsCalculationTimeSpent"] = { watch.elapsedSeconds(), "Time in seconds spent for calculation of asynchronous metrics (this is the overhead of asynchronous metrics)." };

Expand All @@ -1581,7 +1593,6 @@ void AsynchronousMetrics::update(TimePoint update_time)
first_run = false;

// Finally, update the current metrics.
std::lock_guard lock(mutex);
values = new_values;
}

Expand Down
86 changes: 46 additions & 40 deletions src/Common/AsynchronousMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ struct ProtocolServerMetrics
*/
class AsynchronousMetrics
{
protected:
using Duration = std::chrono::seconds;
using TimePoint = std::chrono::system_clock::time_point;

public:
using ProtocolServerMetricsFunc = std::function<std::vector<ProtocolServerMetrics>()>;

AsynchronousMetrics(
int update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
Expand All @@ -69,62 +74,66 @@ class AsynchronousMetrics

void stop();

void update(TimePoint update_time, bool force_update = false);

/// Returns copy of all values.
AsynchronousMetricValues getValues() const;

protected:
using Duration = std::chrono::seconds;
using TimePoint = std::chrono::system_clock::time_point;

const Duration update_period;

/// Some values are incremental and we have to calculate the difference.
/// On first run we will only collect the values to subtract later.
bool first_run = true;
TimePoint previous_update_time;

Poco::Logger * log;
private:
virtual void updateImpl(AsynchronousMetricValues & new_values, TimePoint update_time, TimePoint current_time) = 0;
virtual void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) = 0;
virtual void logImpl(AsynchronousMetricValues &) {}

ProtocolServerMetricsFunc protocol_server_metrics_func;

mutable std::mutex mutex;
std::unique_ptr<ThreadFromGlobalPool> thread;

mutable std::mutex thread_mutex;
std::condition_variable wait_cond;
bool quit {false};
AsynchronousMetricValues values;
bool quit TSA_GUARDED_BY(thread_mutex) = false;

mutable std::mutex data_mutex;

/// Some values are incremental and we have to calculate the difference.
/// On first run we will only collect the values to subtract later.
bool first_run TSA_GUARDED_BY(data_mutex) = true;
TimePoint previous_update_time TSA_GUARDED_BY(data_mutex);

AsynchronousMetricValues values TSA_GUARDED_BY(data_mutex);

#if defined(OS_LINUX) || defined(OS_FREEBSD)
MemoryStatisticsOS memory_stat;
MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex);
#endif

#if defined(OS_LINUX)
std::optional<ReadBufferFromFilePRead> meminfo;
std::optional<ReadBufferFromFilePRead> loadavg;
std::optional<ReadBufferFromFilePRead> proc_stat;
std::optional<ReadBufferFromFilePRead> cpuinfo;
std::optional<ReadBufferFromFilePRead> file_nr;
std::optional<ReadBufferFromFilePRead> uptime;
std::optional<ReadBufferFromFilePRead> net_dev;

std::optional<ReadBufferFromFilePRead> cgroupmem_limit_in_bytes;
std::optional<ReadBufferFromFilePRead> cgroupmem_usage_in_bytes;
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_period;
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_quota;
std::optional<ReadBufferFromFilePRead> cgroupcpu_max;

std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal;
std::optional<ReadBufferFromFilePRead> meminfo TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> loadavg TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> proc_stat TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cpuinfo TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> file_nr TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> uptime TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> net_dev TSA_GUARDED_BY(data_mutex);

std::optional<ReadBufferFromFilePRead> cgroupmem_limit_in_bytes TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupmem_usage_in_bytes TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_period TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_quota TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupcpu_max TSA_GUARDED_BY(data_mutex);

std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal TSA_GUARDED_BY(data_mutex);

std::unordered_map<String /* device name */,
std::unordered_map<String /* label name */,
std::unique_ptr<ReadBufferFromFilePRead>>> hwmon_devices;
std::unique_ptr<ReadBufferFromFilePRead>>> hwmon_devices TSA_GUARDED_BY(data_mutex);

std::vector<std::pair<
std::unique_ptr<ReadBufferFromFilePRead> /* correctable errors */,
std::unique_ptr<ReadBufferFromFilePRead> /* uncorrectable errors */>> edac;
std::unique_ptr<ReadBufferFromFilePRead> /* uncorrectable errors */>> edac TSA_GUARDED_BY(data_mutex);

std::unordered_map<String /* device name */, std::unique_ptr<ReadBufferFromFilePRead>> block_devs;
std::unordered_map<String /* device name */, std::unique_ptr<ReadBufferFromFilePRead>> block_devs TSA_GUARDED_BY(data_mutex);

/// TODO: socket statistics.

Expand Down Expand Up @@ -154,9 +163,9 @@ class AsynchronousMetrics
ProcStatValuesOther operator-(const ProcStatValuesOther & other) const;
};

ProcStatValuesCPU proc_stat_values_all_cpus{};
ProcStatValuesOther proc_stat_values_other{};
std::vector<ProcStatValuesCPU> proc_stat_values_per_cpu;
ProcStatValuesCPU proc_stat_values_all_cpus TSA_GUARDED_BY(data_mutex) {};
ProcStatValuesOther proc_stat_values_other TSA_GUARDED_BY(data_mutex) {};
std::vector<ProcStatValuesCPU> proc_stat_values_per_cpu TSA_GUARDED_BY(data_mutex);

/// https://www.kernel.org/doc/Documentation/block/stat.txt
struct BlockDeviceStatValues
Expand All @@ -181,7 +190,7 @@ class AsynchronousMetrics
BlockDeviceStatValues operator-(const BlockDeviceStatValues & other) const;
};

std::unordered_map<String /* device name */, BlockDeviceStatValues> block_device_stats;
std::unordered_map<String /* device name */, BlockDeviceStatValues> block_device_stats TSA_GUARDED_BY(data_mutex);

struct NetworkInterfaceStatValues
{
Expand All @@ -197,20 +206,17 @@ class AsynchronousMetrics
NetworkInterfaceStatValues operator-(const NetworkInterfaceStatValues & other) const;
};

std::unordered_map<String /* device name */, NetworkInterfaceStatValues> network_interface_stats;
std::unordered_map<String /* device name */, NetworkInterfaceStatValues> network_interface_stats TSA_GUARDED_BY(data_mutex);

Stopwatch block_devices_rescan_delay;
Stopwatch block_devices_rescan_delay TSA_GUARDED_BY(data_mutex);

void openSensors();
void openBlockDevices();
void openSensorsChips();
void openEDAC();
#endif

std::unique_ptr<ThreadFromGlobalPool> thread;

void run();
void update(TimePoint update_time);
};

}
2 changes: 1 addition & 1 deletion src/Coordination/KeeperAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ KeeperAsynchronousMetrics::~KeeperAsynchronousMetrics()
stop();
}

void KeeperAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values, TimePoint /*update_time*/, TimePoint /*current_time*/)
void KeeperAsynchronousMetrics::updateImpl(TimePoint /*update_time*/, TimePoint /*current_time*/, bool /*force_update*/, bool /*first_run*/, AsynchronousMetricValues & new_values)
{
#if USE_NURAFT
{
Expand Down
2 changes: 1 addition & 1 deletion src/Coordination/KeeperAsynchronousMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KeeperAsynchronousMetrics : public AsynchronousMetrics
private:
ContextPtr context;

void updateImpl(AsynchronousMetricValues & new_values, TimePoint update_time, TimePoint current_time) override;
void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) override;
};


Expand Down

0 comments on commit e23e7e6

Please sign in to comment.